Async Queue
Queue for processing async tasks with concurrency control and error handling
Description
A queue implementation for processing asynchronous tasks with configurable concurrency. Supports task prioritization, error handling, and progress tracking.
Features
- Concurrency control
- Task prioritization
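- Error handling: a failed task rejects the promise returned by `add()` without stalling the queue (automatic retries are not built in)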
- Progress tracking via the `size` (waiting tasks) and `active` (running tasks) getters
Code
/** A unit of asynchronous work: a zero-argument function that returns a promise. */
type Task<T> = () => Promise<T>;

/**
 * A waiting task together with its priority and the settle callbacks of the
 * promise that `add()` handed back to the caller.
 */
type TaskWithPriority<T> = {
  task: Task<T>;
  priority: number;
  resolve: (value: T) => void;
  reject: (error: unknown) => void;
};

/**
 * Runs asynchronous tasks with a cap on how many are in flight at once.
 *
 * Waiting tasks are started in descending priority order; tasks with equal
 * priority start in the order they were added. A task that fails rejects only
 * its own promise and frees its slot for the next waiting task.
 *
 * The `any` default for `T` is kept so existing untyped callers keep compiling.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
class AsyncQueue<T = any> {
  /** Waiting tasks, kept sorted by descending priority. */
  private readonly queue: TaskWithPriority<T>[] = [];
  /** Number of tasks that have been started and have not settled yet. */
  private running = 0;
  private readonly concurrency: number;

  /**
   * @param concurrency Maximum number of tasks running at the same time.
   *   Must be at least 1; `Infinity` removes the limit.
   * @throws {RangeError} If `concurrency` is `NaN` or less than 1. Such a value
   *   would either leave every task waiting forever or silently disable the limit.
   */
  constructor(concurrency: number = 3) {
    // Written as a negated comparison so that NaN is rejected as well.
    if (!(concurrency >= 1)) {
      throw new RangeError(
        `AsyncQueue concurrency must be a number >= 1, received ${concurrency}`,
      );
    }
    this.concurrency = concurrency;
  }

  /**
   * Enqueues a task and starts it as soon as a slot is free.
   *
   * @param task Function that performs the work.
   * @param priority Higher values run earlier. Defaults to 0.
   * @returns A promise that settles with the task's result or its error.
   */
  add(task: Task<T>, priority: number = 0): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const entry: TaskWithPriority<T> = { task, priority, resolve, reject };

      // Insert before the first strictly lower priority. This keeps the queue
      // sorted and preserves FIFO order among equal priorities without
      // re-sorting the whole array on every call.
      const insertAt = this.queue.findIndex((queued) => queued.priority < priority);
      if (insertAt === -1) {
        this.queue.push(entry);
      } else {
        this.queue.splice(insertAt, 0, entry);
      }

      // process() handles task errors itself, so its promise never rejects.
      void this.process();
    });
  }

  /**
   * Starts the next waiting task if a slot is free, then re-invokes itself
   * when that task settles so the freed slot is refilled.
   */
  private async process(): Promise<void> {
    if (this.running >= this.concurrency) {
      return;
    }

    const next = this.queue.shift();
    if (next === undefined) {
      return;
    }

    this.running++;
    try {
      // Calling the task inside `try` also captures synchronous throws.
      next.resolve(await next.task());
    } catch (error) {
      next.reject(error);
    } finally {
      this.running--;
      void this.process();
    }
  }

  /** Number of tasks waiting to start (running tasks are not counted). */
  get size(): number {
    return this.queue.length;
  }

  /** Number of tasks currently running. */
  get active(): number {
    return this.running;
  }
}

export default AsyncQueue;
import AsyncQueue from './AsyncQueue';

// Usage example: fetch three JSON resources through a queue that runs at most
// three tasks at a time.
const queue = new AsyncQueue(3);

// Builds a task that fetches `url` and parses the body as JSON. fetch() only
// rejects on network failures, so HTTP error statuses are turned into errors here.
const fetchJson = (url: string) => () =>
  fetch(url).then(r => {
    if (!r.ok) {
      throw new Error(`Request to ${url} failed with status ${r.status}`);
    }
    return r.json();
  });

const tasks = [
  fetchJson('https://api.example.com/data1'),
  fetchJson('https://api.example.com/data2'),
  fetchJson('https://api.example.com/data3'),
];

// Wrapped in an async function so the example does not depend on top-level
// await being enabled for the module.
async function main(): Promise<void> {
  // Add tasks with priorities (the array index is used as the priority).
  const results = await Promise.all(
    tasks.map((task, i) => queue.add(task, i))
  );
  console.log('Results:', results);
}

main().catch((error: unknown) => {
  console.error('Failed to process tasks:', error);
});
Comments
No comments yet. Be the first to comment!
Please login to leave a comment.