# Async Tracing

Trace asynchronous and concurrent AI operations while maintaining proper span hierarchy.
## Problem
When running multiple LLM calls concurrently, you need to:
- Maintain proper parent-child relationships between spans
- Handle async context correctly
- Aggregate results from parallel operations
## Solution
```python
import asyncio
from brokle import AsyncBrokle, wrap_openai
import openai

# Use AsyncBrokle for async operations
brokle = AsyncBrokle()
openai_client = wrap_openai(openai.AsyncOpenAI(), brokle=brokle)

async def process_single_item(item: str) -> str:
    """Process a single item with its own span."""
    async with brokle.start_as_current_generation(
        name="process_item",
        model="gpt-4o-mini",
        metadata={"item": item}
    ) as gen:
        response = await openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": f"Summarize: {item}"}]
        )
        result = response.choices[0].message.content
        gen.update(output=result)
        return result

async def process_batch(items: list[str]) -> list[str]:
    """Process multiple items concurrently with proper tracing."""
    async with brokle.start_as_current_span(
        name="batch_process",
        metadata={"batch_size": len(items)}
    ) as span:
        # Run all items concurrently
        # Each task inherits the current span as parent
        tasks = [process_single_item(item) for item in items]
        results = await asyncio.gather(*tasks)
        span.update(
            output=f"Processed {len(results)} items",
            metadata={"success_count": len(results)}
        )
        return results

# Usage
async def main():
    items = ["Article 1 content...", "Article 2 content...", "Article 3 content..."]
    results = await process_batch(items)
    await brokle.flush()
    return results

asyncio.run(main())
```

```javascript
import { Brokle } from 'brokle';
import { wrapOpenAI } from 'brokle-openai';
import OpenAI from 'openai';

const brokle = new Brokle();
const openai = wrapOpenAI(new OpenAI(), { brokle });

async function processSingleItem(item) {
  return brokle.withGeneration(
    { name: 'process_item', model: 'gpt-4o-mini', metadata: { item } },
    async (gen) => {
      const response = await openai.chat.completions.create({
        model: 'gpt-4o-mini',
        messages: [{ role: 'user', content: `Summarize: ${item}` }],
      });
      const result = response.choices[0].message.content;
      gen.update({ output: result });
      return result;
    }
  );
}

async function processBatch(items) {
  return brokle.withSpan(
    { name: 'batch_process', metadata: { batchSize: items.length } },
    async (span) => {
      // Run all items concurrently
      const results = await Promise.all(items.map(processSingleItem));
      span.update({
        output: `Processed ${results.length} items`,
        metadata: { successCount: results.length },
      });
      return results;
    }
  );
}

// Usage
async function main() {
  const items = ['Article 1...', 'Article 2...', 'Article 3...'];
  const results = await processBatch(items);
  await brokle.shutdown();
  return results;
}

main();
```

## How It Works
- **Context Propagation**: The `start_as_current_span` context manager sets the current span, which child spans inherit (illustrated in the sketch below)
- **Concurrent Execution**: `asyncio.gather()` (Python) or `Promise.all()` (JS) runs the tasks concurrently
- **Span Hierarchy**: Each `process_single_item` call creates a child span under the parent `batch_process` span
- **Automatic Timing**: Span durations are calculated automatically
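Brokle's internals aren't shown in this recipe, but OpenTelemetry-style SDKs typically store the current span in a `contextvars.ContextVar`, and asyncio copies that context into every task it creates. The minimal sketch below demonstrates the mechanism with plain `contextvars` (no Brokle APIs); `current_span` here is an illustrative stand-in, not a real SDK attribute.

```python
import asyncio
import contextvars

# Stand-in for the SDK's internal "current span" slot
current_span = contextvars.ContextVar("current_span", default=None)

async def child(name: str) -> str:
    # Each task copied the context at creation time, so it sees the
    # value that was current inside parent()
    return f"{name}: parent={current_span.get()}"

async def parent() -> None:
    current_span.set("batch_process")
    # gather() wraps each coroutine in a Task, snapshotting the context
    print(await asyncio.gather(child("a"), child("b")))
    # -> ['a: parent=batch_process', 'b: parent=batch_process']

asyncio.run(parent())
```

This is why the concurrent `process_single_item` calls land under `batch_process` without any explicit parent argument.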
## Trace Structure

```
batch_process (span) 2.1s
├── process_item (generation) 800ms
├── process_item (generation) 750ms
└── process_item (generation) 820ms
```

## Variations
### With Error Handling
```python
async def process_with_retry(item: str, max_retries: int = 3) -> str:
    """Process with retry and error tracking."""
    async with brokle.start_as_current_span(
        name="process_with_retry",
        metadata={"item": item, "max_retries": max_retries}
    ) as span:
        last_error = None
        for attempt in range(max_retries):
            try:
                result = await process_single_item(item)
                span.update(metadata={"attempts": attempt + 1, "success": True})
                return result
            except Exception as e:
                last_error = e
                span.set_attribute(f"error_attempt_{attempt}", str(e))
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
        span.update(metadata={"success": False, "error": str(last_error)})
        span.score(name="error", value=1, comment=str(last_error))
        raise last_error
```
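If you want the batch to survive individual failures rather than raising on the first one, standard `asyncio.gather(..., return_exceptions=True)` returns exceptions in place of results. A small sketch building on `process_with_retry` (the name `process_batch_tolerant` is illustrative, not part of the SDK):

```python
async def process_batch_tolerant(items: list[str]) -> tuple[list[str], list[BaseException]]:
    """Run the whole batch, collecting failures instead of raising."""
    results = await asyncio.gather(
        *[process_with_retry(item) for item in items],
        return_exceptions=True,  # exceptions are returned, not raised
    )
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [r for r in results if isinstance(r, BaseException)]
    return successes, failures
```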
### With Concurrency Limit

```python
import asyncio

async def process_with_semaphore(items: list[str], max_concurrent: int = 5) -> list[str]:
    """Process with limited concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_process(item: str) -> str:
        async with semaphore:
            return await process_single_item(item)

    async with brokle.start_as_current_span(
        name="batch_limited",
        metadata={"batch_size": len(items), "max_concurrent": max_concurrent}
    ):
        results = await asyncio.gather(*[limited_process(item) for item in items])
        return results
```
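A reasonable starting point for `max_concurrent` is whatever keeps you under your provider's rate limits: the semaphore caps in-flight requests while each item still gets its own child span under `batch_limited`.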
### With Progress Tracking

```python
async def process_with_progress(items: list[str]) -> list[str]:
    """Process sequentially with progress tracking."""
    async with brokle.start_as_current_span(
        name="batch_with_progress",
        metadata={"total": len(items)}
    ) as span:
        results = []
        completed = 0
        # Items are awaited one at a time: this trades concurrency for
        # ordered, per-item progress updates
        for item in items:
            result = await process_single_item(item)
            results.append(result)
            completed += 1
            # Update progress
            span.set_attribute("progress", f"{completed}/{len(items)}")
            span.set_attribute("percent_complete", completed / len(items) * 100)
        return results
```

For very high concurrency, consider using `start_span()` instead of `start_as_current_span()` to avoid context overhead.
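The exact signature of `start_span()` isn't shown in this recipe (see Manual Tracing below). The sketch that follows assumes it returns a span object with `update()` and an explicit `end()`, and that it does not set the async-context "current span":

```python
async def process_many_lightweight(items: list[str]) -> None:
    # Hypothetical sketch: the span lifecycle is managed manually, so no
    # context variable is set or restored per item
    for item in items:
        span = brokle.start_span(name="lightweight_item", metadata={"item": item})
        try:
            ...  # per-item work goes here
            span.update(output="done")
        finally:
            span.end()
```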
## Related
- Batch Processing - Efficient batch tracing
- Manual Tracing - Creating spans manually
- Python SDK - Full async API reference