Batch Processing
Trace batch LLM operations efficiently with proper aggregation and error handling.
Problem
When processing large batches of items:
- You need to track overall batch progress and metrics
- Individual item failures shouldn't stop the batch
- Token usage and costs should be aggregated
- You want to identify slow or failing items
Solution
from brokle import Brokle, wrap_openai
import openai
from dataclasses import dataclass
from typing import Optional
brokle = Brokle()
openai_client = wrap_openai(openai.OpenAI(), brokle=brokle)
@dataclass
class BatchResult:
    item: str
    output: Optional[str]
    success: bool
    error: Optional[str] = None
    tokens: int = 0
def process_item(item: str, index: int) -> BatchResult:
    """Process a single item with error handling."""
    with brokle.start_as_current_generation(
        name="batch_item",
        model="gpt-4o-mini",
        metadata={"index": index, "item_preview": item[:50]}
    ) as gen:
        try:
            response = openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": f"Classify: {item}"}],
                max_tokens=100
            )
            output = response.choices[0].message.content
            tokens = response.usage.total_tokens
            gen.update(
                output=output,
                usage={
                    "input_tokens": response.usage.prompt_tokens,
                    "output_tokens": response.usage.completion_tokens
                }
            )
            return BatchResult(
                item=item,
                output=output,
                success=True,
                tokens=tokens
            )
        except Exception as e:
            gen.update(
                output=str(e),
                metadata={"error": True, "error_type": type(e).__name__}
            )
            gen.score(name="error", value=1, comment=str(e))
            return BatchResult(
                item=item,
                output=None,
                success=False,
                error=str(e)
            )
def process_batch(
    items: list[str],
    batch_name: str = "batch_job"
) -> dict:
    """Process a batch with aggregated metrics."""
    with brokle.start_as_current_span(
        name=batch_name,
        metadata={
            "batch_size": len(items),
            "operation": "classification"
        }
    ) as span:
        results = []
        total_tokens = 0
        success_count = 0
        for i, item in enumerate(items):
            result = process_item(item, i)
            results.append(result)
            if result.success:
                success_count += 1
                total_tokens += result.tokens
            # Update progress periodically
            if (i + 1) % 10 == 0:
                span.set_attribute("progress", f"{i + 1}/{len(items)}")
        # Final metrics
        success_rate = success_count / len(items)
        span.update(
            output=f"Processed {len(items)} items",
            metadata={
                "success_count": success_count,
                "failure_count": len(items) - success_count,
                "success_rate": success_rate,
                "total_tokens": total_tokens,
                "avg_tokens_per_item": total_tokens / max(success_count, 1)
            }
        )
        # Score batch quality
        span.score(
            name="batch_success_rate",
            value=success_rate,
            comment=f"{success_count}/{len(items)} successful"
        )
        return {
            "results": results,
            "metrics": {
                "total": len(items),
                "success": success_count,
                "failed": len(items) - success_count,
                "success_rate": success_rate,
                "total_tokens": total_tokens
            }
        }
# Usage
items = [f"Product description {i}" for i in range(100)]
batch_output = process_batch(items, batch_name="product_classification")
print(f"Success rate: {batch_output['metrics']['success_rate']:.1%}")
brokle.flush()

import { Brokle } from 'brokle';
import { wrapOpenAI } from 'brokle-openai';
import OpenAI from 'openai';
const brokle = new Brokle();
const openai = wrapOpenAI(new OpenAI(), { brokle });
async function processItem(item, index) {
  return brokle.withGeneration(
    {
      name: 'batch_item',
      model: 'gpt-4o-mini',
      metadata: { index, itemPreview: item.slice(0, 50) },
    },
    async (gen) => {
      try {
        const response = await openai.chat.completions.create({
          model: 'gpt-4o-mini',
          messages: [{ role: 'user', content: `Classify: ${item}` }],
          max_tokens: 100,
        });
        const output = response.choices[0].message.content;
        const tokens = response.usage.total_tokens;
        gen.update({
          output,
          usage: {
            inputTokens: response.usage.prompt_tokens,
            outputTokens: response.usage.completion_tokens,
          },
        });
        return { item, output, success: true, tokens };
      } catch (error) {
        gen.update({
          output: error.message,
          metadata: { error: true, errorType: error.name },
        });
        gen.score({ name: 'error', value: 1, comment: error.message });
        return { item, output: null, success: false, error: error.message };
      }
    }
  );
}
async function processBatch(items, batchName = 'batch_job') {
  return brokle.withSpan(
    { name: batchName, metadata: { batchSize: items.length } },
    async (span) => {
      const results = [];
      let totalTokens = 0;
      let successCount = 0;
      for (let i = 0; i < items.length; i++) {
        const result = await processItem(items[i], i);
        results.push(result);
        if (result.success) {
          successCount++;
          totalTokens += result.tokens;
        }
      }
      const successRate = successCount / items.length;
      span.update({
        output: `Processed ${items.length} items`,
        metadata: {
          successCount,
          failureCount: items.length - successCount,
          successRate,
          totalTokens,
        },
      });
      span.score({
        name: 'batch_success_rate',
        value: successRate,
        comment: `${successCount}/${items.length} successful`,
      });
      return { results, metrics: { total: items.length, success: successCount } };
    }
  );
}

How It Works
- Parent Span: Wraps entire batch for aggregated metrics
- Child Generations: Each item gets its own generation span
- Error Isolation: Individual failures don't crash the batch
- Progress Tracking: Periodic updates show batch progress
- Aggregated Scores: Batch-level success rate scoring
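The same pattern in miniature: the sketch below stubs out the model call so the error isolation is easy to see. It assumes the `brokle` client configured in the Solution above; `classify_stub` and the `"stub"` model label are illustrative placeholders, not part of the SDK.

# Stripped-down sketch of the parent-span / child-generation pattern.
# Assumes the `brokle` client from the Solution above; `classify_stub` is a
# hypothetical stand-in for the real model call.
def classify_stub(item: str) -> str:
    if "bad" in item:
        raise ValueError("simulated failure")
    return "ok"

demo_items = ["good item", "bad item", "another good item"]

with brokle.start_as_current_span(
    name="mini_batch",
    metadata={"batch_size": len(demo_items)}
) as span:
    successes = 0
    for item in demo_items:
        with brokle.start_as_current_generation(name="batch_item", model="stub") as gen:
            try:
                gen.update(output=classify_stub(item))
                successes += 1
            except ValueError as e:
                gen.update(output=str(e), metadata={"error": True})  # failure recorded; loop continues
    span.score(name="batch_success_rate", value=successes / len(demo_items))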
Trace Structure
product_classification (span) 45.2s
├── batch_item (generation) [index=0] 450ms
├── batch_item (generation) [index=1] 380ms
├── batch_item (generation) [index=2] [error] 120ms
├── batch_item (generation) [index=3] 410ms
└── ... (96 more items)

Variations
Chunked Processing
def process_in_chunks(
    items: list[str],
    chunk_size: int = 20
) -> dict:
    """Process in chunks to manage memory and provide checkpoints."""
    with brokle.start_as_current_span(
        name="chunked_batch",
        metadata={"total_items": len(items), "chunk_size": chunk_size}
    ) as span:
        all_results = []
        chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
        for chunk_idx, chunk in enumerate(chunks):
            with brokle.start_as_current_span(
                name=f"chunk_{chunk_idx}",
                metadata={"chunk_index": chunk_idx, "chunk_size": len(chunk)}
            ) as chunk_span:
                chunk_results = process_batch(chunk)
                all_results.extend(chunk_results["results"])
                # Checkpoint after each chunk
                chunk_span.update(
                    metadata={"completed": True, "items_processed": len(chunk)}
                )
return {"results": all_results, "chunks_processed": len(chunks)}With Retry Queue
def process_with_retry_queue(items: list[str], max_retries: int = 2) -> dict:
"""Process batch with automatic retry for failures."""
with brokle.start_as_current_span(name="batch_with_retries") as span:
results = []
retry_queue = list(enumerate(items))
attempt = 0
while retry_queue and attempt <= max_retries:
current_batch = retry_queue
retry_queue = []
for index, item in current_batch:
result = process_item(item, index)
if result.success:
results.append(result)
elif attempt < max_retries:
retry_queue.append((index, item))
else:
results.append(result) # Keep failed result
attempt += 1
if retry_queue:
span.set_attribute(f"retry_attempt_{attempt}", len(retry_queue))
return {"results": results, "retries": attempt}Parallel Batch Processing
from concurrent.futures import ThreadPoolExecutor

def process_batch_parallel(
    items: list[str],
    max_workers: int = 10
) -> dict:
    """Process batch items in parallel using a thread pool."""
    with brokle.start_as_current_span(
        name="parallel_batch",
        metadata={"items": len(items), "workers": max_workers}
    ) as span:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(process_item, item, i)
                for i, item in enumerate(items)
            ]
            results = [f.result() for f in futures]
        success_count = sum(1 for r in results if r.success)
        span.update(metadata={"success_rate": success_count / len(results)})
return {"results": results}For very large batches (1000+ items), consider using streaming inserts or processing in background jobs.
Related
- Async Tracing - Concurrent operations
- Cost Optimization - Reduce batch costs
- Production Monitoring - Monitor batch jobs