Batch Processing

January 6, 2026 ยท View on GitHub

Problem

Processing many requests sequentially is slow and inefficient:

  • High latency (wait for each request)
  • Underutilized rate limits
  • Poor resource usage
  • Slow time-to-completion

Applications often need to process:

  • Multiple documents
  • Large datasets
  • User-generated content
  • Batch analytics

Solution

Implement efficient batch processing with:

  1. Concurrent request handling
  2. Rate limit awareness
  3. Progress tracking
  4. Error recovery
  5. Result aggregation

Code

import { NeuroLink } from "@juspay/neurolink";

type BatchConfig = {
  concurrency?: number; // Max parallel requests
  rateLimit?: number; // Max requests per second
  onProgress?: (completed: number, total: number) => void;
  onError?: (error: Error, item: any, index: number) => void;
  retryFailures?: boolean;
};

type BatchResult<T, R> = {
  results: R[];
  errors: Array<{ item: T; error: Error; index: number }>;
  duration: number;
  successRate: number;
};

class BatchProcessor {
  private neurolink: NeuroLink;

  constructor() {
    this.neurolink = new NeuroLink();
  }

  /**
   * Process items in batches with concurrency control
   */
  async processBatch<T, R>(
    items: T[],
    processFn: (item: T, index: number) => Promise<R>,
    config: BatchConfig = {},
  ): Promise<BatchResult<T, R>> {
    const {
      concurrency = 5,
      rateLimit = 10, // requests per second
      onProgress,
      onError,
      retryFailures = true,
    } = config;

    const startTime = Date.now();
    const results: R[] = new Array(items.length);
    const errors: Array<{ item: T; error: Error; index: number }> = [];

    let completed = 0;
    let inFlight = 0;
    let currentIndex = 0;

    const minDelay = 1000 / rateLimit; // ms between requests

    return new Promise((resolve) => {
      const processNext = async () => {
        if (currentIndex >= items.length && inFlight === 0) {
          // All done
          const duration = Date.now() - startTime;
          const successRate =
            (results.filter((r) => r !== undefined).length / items.length) *
            100;

          resolve({
            results,
            errors,
            duration,
            successRate,
          });
          return;
        }

        if (inFlight >= concurrency || currentIndex >= items.length) {
          return;
        }

        const index = currentIndex++;
        const item = items[index];
        inFlight++;

        try {
          const result = await processFn(item, index);
          results[index] = result;

          completed++;
          onProgress?.(completed, items.length);
        } catch (error: any) {
          errors.push({ item, error, index });
          onError?.(error, item, index);

          if (retryFailures) {
            // Add to end of queue for retry
            items.push(item);
          }
        } finally {
          inFlight--;

          // Rate limiting
          await new Promise((r) => setTimeout(r, minDelay));

          processNext();
        }

        processNext();
      };

      // Start concurrent workers
      for (let i = 0; i < concurrency; i++) {
        processNext();
      }
    });
  }

  /**
   * Process text items with AI
   */
  async processTexts(
    texts: string[],
    prompt: string,
    config: BatchConfig & { provider?: string } = {},
  ): Promise<BatchResult<string, string>> {
    return this.processBatch(
      texts,
      async (text, index) => {
        const result = await this.neurolink.generate({
          input: { text: `${prompt}\n\n${text}` },
          provider: config.provider || "anthropic",
          model: "claude-3-haiku-20240307", // Fast, cheap model
        });

        return result.content;
      },
      config,
    );
  }

  /**
   * Process with structured output
   */
  async processStructured<T>(
    items: string[],
    prompt: string,
    schema: any,
    config: BatchConfig = {},
  ): Promise<BatchResult<string, T>> {
    return this.processBatch(
      items,
      async (item) => {
        const result = await this.neurolink.generate({
          input: { text: `${prompt}\n\n${item}` },
          provider: "openai",
          structuredOutput: { type: "json", schema },
        });

        return JSON.parse(result.content) as T;
      },
      config,
    );
  }

  /**
   * Process files in parallel
   */
  async processFiles(
    filePaths: string[],
    processFn: (content: string, path: string) => Promise<any>,
    config: BatchConfig = {},
  ) {
    const fs = await import("fs/promises");

    return this.processBatch(
      filePaths,
      async (path, index) => {
        const content = await fs.readFile(path, "utf-8");
        return processFn(content, path);
      },
      config,
    );
  }
}

// Usage Example 1: Sentiment Analysis
async function example1_SentimentAnalysis() {
  const processor = new BatchProcessor();

  const reviews = [
    "This product is amazing! Highly recommend.",
    "Terrible quality, waste of money.",
    "It's okay, nothing special.",
    "Best purchase I've made this year!",
    "Disappointed, expected much better.",
  ];

  console.log("=== Sentiment Analysis ===");

  const result = await processor.processTexts(
    reviews,
    "Classify the sentiment of this review as positive, negative, or neutral. Return only the sentiment.",
    {
      concurrency: 3,
      rateLimit: 5,
      onProgress: (completed, total) => {
        console.log(
          `Progress: ${completed}/${total} (${((completed / total) * 100).toFixed(0)}%)`,
        );
      },
    },
  );

  console.log("\nโœ… Results:");
  result.results.forEach((sentiment, i) => {
    console.log(`  ${i + 1}. ${reviews[i].slice(0, 30)}... โ†’ ${sentiment}`);
  });

  console.log(`\n๐Ÿ“Š Stats:`);
  console.log(`  Duration: ${result.duration}ms`);
  console.log(`  Success rate: ${result.successRate.toFixed(1)}%`);
  console.log(`  Errors: ${result.errors.length}`);
}

// Example 2: Data Extraction
type ProductInfo = {
  name: string;
  price: number;
  category: string;
};

const productSchema = {
  type: "object",
  properties: {
    name: { type: "string" },
    price: { type: "number" },
    category: { type: "string" },
  },
  required: ["name", "price", "category"],
};

async function example2_DataExtraction() {
  const processor = new BatchProcessor();

  const descriptions = [
    "The UltraBook Pro laptop costs \$1299 and is perfect for professionals.",
    "Get the SmartWatch X for only \$299 - the best fitness tracker available.",
    "Premium wireless headphones, \$199, audiophile quality sound.",
  ];

  console.log("\n=== Data Extraction ===");

  const result = await processor.processStructured<ProductInfo>(
    descriptions,
    "Extract product information:",
    productSchema,
    {
      concurrency: 2,
      rateLimit: 3,
    },
  );

  console.log("\nโœ… Extracted Products:");
  result.results.forEach((product, i) => {
    console.log(
      `  ${i + 1}. ${product.name} - $${product.price} (${product.category})`,
    );
  });
}

// Example 3: Document Summarization
async function example3_DocumentSummarization() {
  const processor = new BatchProcessor();

  const documents = [
    "Long document about artificial intelligence and machine learning...",
    "Article discussing climate change impacts on global economy...",
    "Research paper on quantum computing applications in cryptography...",
  ];

  console.log("\n=== Document Summarization ===");

  let startTime = Date.now();

  const result = await processor.processTexts(
    documents,
    "Summarize this in 1-2 sentences:",
    {
      concurrency: 3,
      rateLimit: 10,
      onProgress: (completed, total) => {
        const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
        console.log(`Progress: ${completed}/${total} (${elapsed}s)`);
      },
      onError: (error, item, index) => {
        console.error(`โŒ Error processing item ${index}:`, error.message);
      },
    },
  );

  console.log("\nโœ… Summaries:");
  result.results.forEach((summary, i) => {
    console.log(`  ${i + 1}. ${summary}`);
  });
}

// Main
async function main() {
  await example1_SentimentAnalysis();
  await example2_DataExtraction();
  await example3_DocumentSummarization();
}

main();

Explanation

1. Concurrency Control

Process multiple requests simultaneously:

concurrency: 5; // 5 requests in parallel

Benefits:

  • 5x faster than sequential
  • Efficient resource usage
  • Respects provider limits

2. Rate Limiting

Prevent exceeding provider rate limits:

rateLimit: 10  // 10 requests per second
minDelay = 1000 / 10 = 100ms between requests

3. Progress Tracking

Monitor batch processing in real-time:

onProgress: (completed, total) => {
  console.log(`${completed}/${total} (${percentage}%)`);
};

4. Error Handling

Individual failures don't stop the batch:

onError: (error, item, index) => {
  // Log, retry, or skip
};

5. Retry Logic

Automatically retry failed items:

retryFailures: true; // Add to queue end

Variations

Chunked Batch Processing

Process very large datasets in chunks:

async function processInChunks<T, R>(
  items: T[],
  chunkSize: number,
  processFn: (items: T[]) => Promise<R[]>,
): Promise<R[]> {
  const results: R[] = [];

  for (let i = 0; i < items.length; i += chunkSize) {
    const chunk = items.slice(i, i + chunkSize);
    console.log(`Processing chunk ${i / chunkSize + 1}...`);

    const chunkResults = await processFn(chunk);
    results.push(...chunkResults);

    // Break between chunks
    await new Promise((r) => setTimeout(r, 1000));
  }

  return results;
}

// Usage
const results = await processInChunks(allItems, 100, async (chunk) =>
  processor.processBatch(chunk, processFn).then((r) => r.results),
);

Priority Queue

Process high-priority items first:

type PriorityItem<T> = {
  item: T;
  priority: number;
};

async function processPriorityBatch<T, R>(
  items: PriorityItem<T>[],
  processFn: (item: T) => Promise<R>,
) {
  // Sort by priority (higher first)
  const sorted = items.sort((a, b) => b.priority - a.priority);

  return processor.processBatch(
    sorted.map((p) => p.item),
    processFn,
  );
}

Result Streaming

Stream results as they complete:

async function* processBatchStreaming<T, R>(
  items: T[],
  processFn: (item: T) => Promise<R>,
): AsyncIterable<{ index: number; result: R }> {
  const promises = items.map((item, index) =>
    processFn(item).then((result) => ({ index, result })),
  );

  for (const promise of promises) {
    yield await promise;
  }
}

// Usage
for await (const { index, result } of processBatchStreaming(items, processFn)) {
  console.log(`Completed item ${index}:`, result);
}

Cost Tracking

Track costs per batch:

class CostTrackingProcessor extends BatchProcessor {
  private totalCost = 0;

  async processBatch<T, R>(
    items: T[],
    processFn: Function,
    config: BatchConfig,
  ) {
    const startCost = this.totalCost;

    const result = await super.processBatch(
      items,
      async (item, index) => {
        const result = await processFn(item, index);

        // Estimate cost (rough)
        const cost = 0.001; // \$0.001 per request
        this.totalCost += cost;

        return result;
      },
      config,
    );

    const batchCost = this.totalCost - startCost;
    console.log(`๐Ÿ’ฐ Batch cost: $${batchCost.toFixed(4)}`);

    return result;
  }
}

Performance Comparison

Approach100 Items1000 ItemsNotes
Sequential200s2000sBaseline
Concurrency: 540s400s5x faster
Concurrency: 1020s200s10x faster
Concurrency: 2015s150sMay hit rate limits

Best Practices

  1. Start conservative: Begin with low concurrency (3-5)
  2. Monitor rate limits: Track 429 errors
  3. Implement retries: Handle transient failures
  4. Track progress: Show completion status
  5. Use cheap models: Batch processing doesn't need GPT-4
  6. Cache results: Save completed work
  7. Handle partial failures: Don't block on errors

See Also