Working with Spans
Complete guide to creating, managing, and instrumenting spans in your AI applications
Spans are the building blocks of traces. This guide covers everything you need to know about creating and managing spans in your application.
For conceptual understanding of spans, see Core Concepts: Spans. This guide focuses on implementation.
Creating Spans
Basic Span Creation
from brokle import Brokle
client = Brokle(api_key="bk_...")
# Using context manager (recommended)
with client.start_as_current_span(name="process_document") as span:
    result = process(document)
    span.update(output=result)
# Using decorator
from brokle import observe
@observe(name="analyze_text")
def analyze_text(text: str) -> dict:
    return {"sentiment": "positive", "confidence": 0.95}

JavaScript:

import { Brokle } from 'brokle';
const client = new Brokle({ apiKey: 'bk_...' });
// Start and end span explicitly
const span = client.startSpan({ name: 'process_document' });
try {
  const result = await process(document);
  span.end({ output: result });
} catch (error) {
  span.end({ error });
  throw error;
}

Span Types
Brokle supports specialized span types for different operations:
| Type | Constructor | Use Case |
|---|---|---|
| span | start_as_current_span() | General operations |
| generation | start_as_current_generation() | LLM calls |
| retrieval | start_as_current_span(as_type="retrieval") | Vector/document retrieval |
| tool | start_as_current_span(as_type="tool") | Tool/function execution |
| agent | start_as_current_span(as_type="agent") | Agent operations |
| event | create_event() | Discrete events |
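For example, a tool call can be wrapped in a tool-typed span with the same context-manager API, and a discrete moment can be recorded as an event. This is a minimal sketch: get_forecast is a placeholder function, and the exact create_event signature may differ from what is assumed here.

# Sketch: a tool-typed span around a function call, plus a discrete event.
# get_forecast is a placeholder; create_event's signature is assumed.
with client.start_as_current_span(name="lookup_weather", as_type="tool") as tool_span:
    tool_span.set_attribute("tool_name", "weather_api")
    forecast = get_forecast(city="Berlin")
    tool_span.update(output=forecast)

# Events capture a point in time rather than a duration
client.create_event(name="cache_hit", metadata={"key": "forecast:Berlin"})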
Generation Spans
For LLM calls, use generation spans to capture model-specific data:
with client.start_as_current_generation(
    name="summarize",
    model="gpt-4",
    input={"text": document_text}
) as gen:
    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "Summarize the following text."},
            {"role": "user", "content": document_text}
        ]
    )
    gen.update(
        output=response.choices[0].message.content,
        usage={
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens
        },
        metadata={
            "temperature": 0.7,
            "max_tokens": 500
        }
    )

JavaScript:

const gen = client.startGeneration({
  name: 'summarize',
  model: 'gpt-4',
  input: { text: documentText }
});
const response = await openai.chat.completions.create({
  model: 'gpt-4',
  messages: [
    { role: 'system', content: 'Summarize the following text.' },
    { role: 'user', content: documentText }
  ]
});
gen.end({
  output: response.choices[0].message.content,
  usage: {
    promptTokens: response.usage.prompt_tokens,
    completionTokens: response.usage.completion_tokens
  },
  attributes: {
    temperature: 0.7,
    maxTokens: 500
  }
});

Nested Spans
Create hierarchical traces with nested spans:
with client.start_as_current_span(name="rag_pipeline") as parent:
    # Child span 1: Query embedding
    with client.start_as_current_span(name="embed_query") as embed_span:
        embedding = embed(query)
        embed_span.update(
            output={"dimensions": len(embedding)},
            metadata={"model": "text-embedding-3-small"}
        )
    # Child span 2: Vector search
    with client.start_as_current_span(
        name="vector_search",
        as_type="retrieval"
    ) as search_span:
        search_span.set_attribute("index", "products")
        search_span.set_attribute("top_k", 5)
        results = vector_store.search(embedding, top_k=5)
        search_span.update(output={
            "count": len(results),
            "scores": [r.score for r in results]
        })
    # Child span 3: LLM generation
    with client.start_as_current_generation(
        name="generate_answer",
        model="gpt-4"
    ) as gen_span:
        answer = generate(query, results)
        gen_span.update(output=answer)
    parent.update(output=answer)

JavaScript:

const parent = client.startSpan({ name: 'rag_pipeline' });
// Child span 1: Query embedding
const embedSpan = client.startSpan({
  name: 'embed_query',
  parentSpanId: parent.spanId
});
const embedding = await embed(query);
embedSpan.end({
  output: { dimensions: embedding.length },
  attributes: { model: 'text-embedding-3-small' }
});
// Child span 2: Vector search
const searchSpan = client.startSpan({
  name: 'vector_search',
  type: 'retrieval',
  parentSpanId: parent.spanId,
  attributes: { index: 'products', topK: 5 }
});
const results = await vectorStore.search(embedding, 5);
searchSpan.end({
  output: {
    count: results.length,
    scores: results.map(r => r.score)
  }
});
// Child span 3: LLM generation
const genSpan = client.startGeneration({
  name: 'generate_answer',
  model: 'gpt-4',
  parentSpanId: parent.spanId
});
const answer = await generate(query, results);
genSpan.end({ output: answer });
parent.end({ output: answer });

Resulting Trace Structure
Trace: rag_pipeline (1,850ms)
├── embed_query (45ms)
│   ├── output: {dimensions: 1536}
│   └── model: text-embedding-3-small
├── vector_search (120ms) [retrieval]
│   ├── index: products
│   ├── top_k: 5
│   └── output: {count: 5, scores: [...]}
└── generate_answer (1,685ms) [generation]
    ├── model: gpt-4
    ├── tokens: 1,523
    ├── cost: $0.0456
    └── output: "Based on the search results..."

Span Attributes
Setting Attributes
Add custom key-value pairs to spans:
with client.start_as_current_span(name="process_order") as span:
    # Set individual attributes
    span.set_attribute("order_id", order.id)
    span.set_attribute("customer_id", customer.id)
    span.set_attribute("item_count", len(order.items))
    span.set_attribute("total_value", order.total)
    span.set_attribute("priority", "high")
    # Or via metadata in update()
    span.update(
        metadata={
            "payment_method": order.payment_method,
            "shipping_tier": order.shipping_tier
        }
    )
    result = process_order(order)
    span.update(output=result)

JavaScript:

const span = client.startSpan({
  name: 'process_order',
  attributes: {
    orderId: order.id,
    customerId: customer.id,
    itemCount: order.items.length,
    totalValue: order.total,
    priority: 'high'
  }
});
// Add more attributes later
span.setAttributes({
  paymentMethod: order.paymentMethod,
  shippingTier: order.shippingTier
});
const result = await processOrder(order);
span.end({ output: result });

Common Attribute Patterns
| Category | Attributes | Purpose |
|---|---|---|
| Identity | user_id, session_id, request_id | Link to users/sessions |
| Business | customer_tier, feature, product | Business segmentation |
| Technical | model, cache_hit, retry_count | Technical debugging |
| Quality | confidence, relevance_score | Quality metrics |
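Combining several of these categories on a single span keeps later filtering and segmentation straightforward. A minimal sketch, where request, customer, and handle_chat are placeholder names:

# Sketch: one span tagged with identity, business, technical, and quality attributes.
# request, customer, and handle_chat stand in for your own objects and logic.
with client.start_as_current_span(name="chat_completion") as span:
    # Identity: link to users and sessions
    span.set_attribute("user_id", request.user_id)
    span.set_attribute("session_id", request.session_id)
    # Business: segmentation
    span.set_attribute("customer_tier", customer.tier)
    span.set_attribute("feature", "chat")
    # Technical: debugging context
    span.set_attribute("model", "gpt-4")
    span.set_attribute("cache_hit", False)
    reply, confidence = handle_chat(request)
    # Quality: score the result
    span.set_attribute("confidence", confidence)
    span.update(output=reply)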
Updating Spans
Update During Execution
with client.start_as_current_span(name="multi_step_process") as span:
    # Update as you progress
    span.set_attribute("step", "preprocessing")
    preprocessed = preprocess(data)
    span.set_attribute("step", "analysis")
    analysis = analyze(preprocessed)
    span.set_attribute("step", "postprocessing")
    result = postprocess(analysis)
    # Final update with output
    span.update(
        output=result,
        metadata={
            "preprocessing_time_ms": 100,
            "analysis_time_ms": 500,
            "postprocessing_time_ms": 50
        }
    )

JavaScript:

const span = client.startSpan({ name: 'multi_step_process' });
span.setAttributes({ step: 'preprocessing' });
const preprocessed = await preprocess(data);
span.setAttributes({ step: 'analysis' });
const analysis = await analyze(preprocessed);
span.setAttributes({ step: 'postprocessing' });
const result = await postprocess(analysis);
span.end({
  output: result,
  attributes: {
    preprocessingTimeMs: 100,
    analysisTimeMs: 500,
    postprocessingTimeMs: 50
  }
});

Updating Trace Context
Associate spans with traces, sessions, and users:
with client.start_as_current_span(name="user_interaction") as span:
    # Link to user and session
    span.update_trace(
        user_id="user_123",
        session_id="session_456"
    )
    # Set trace-level tags
    span.set_attribute("environment", "production")
    span.set_attribute("version", "2.1.0")
    response = handle_interaction(request)
    span.update(output=response)

JavaScript:

const span = client.startSpan({
  name: 'user_interaction',
  attributes: {
    userId: 'user_123',
    sessionId: 'session_456',
    environment: 'production',
    version: '2.1.0'
  }
});
const response = await handleInteraction(request);
span.end({ output: response });
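Because update_trace applies to the whole trace, reusing the same session_id across separate requests lets them be grouped under one session. A minimal sketch, where incoming_requests and handle_interaction are placeholders:

# Sketch: grouping several interactions under one session via a shared session_id.
# incoming_requests and handle_interaction are placeholders for your own request handling.
session_id = "session_456"
for request in incoming_requests:
    with client.start_as_current_span(name="user_interaction") as span:
        span.update_trace(user_id=request.user_id, session_id=session_id)
        response = handle_interaction(request)
        span.update(output=response)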
Error Handling
Capturing Errors
with client.start_as_current_span(name="risky_operation") as span:
    try:
        result = perform_operation()
        span.update(output=result)
    except ValidationError as e:
        span.update(
            error=str(e),
            metadata={"error_type": "validation", "field": e.field}
        )
        raise
    except ExternalAPIError as e:
        span.update(
            error=str(e),
            metadata={
                "error_type": "external_api",
                "status_code": e.status_code,
                "retryable": e.retryable
            }
        )
        raise
    except Exception as e:
        span.update(error=f"Unexpected error: {str(e)}")
        raise

JavaScript:

const span = client.startSpan({ name: 'risky_operation' });
try {
  const result = await performOperation();
  span.end({ output: result });
} catch (error) {
  if (error instanceof ValidationError) {
    span.end({
      error: error.message,
      attributes: { errorType: 'validation', field: error.field }
    });
  } else if (error instanceof ExternalAPIError) {
    span.end({
      error: error.message,
      attributes: {
        errorType: 'external_api',
        statusCode: error.statusCode,
        retryable: error.retryable
      }
    });
  } else {
    span.end({ error: `Unexpected error: ${error.message}` });
  }
  throw error;
}

Partial Success
Handle cases where some operations succeed:
with client.start_as_current_span(name="batch_process") as span:
    results = []
    errors = []
    for item in items:
        try:
            result = process_item(item)
            results.append(result)
        except Exception as e:
            errors.append({"item_id": item.id, "error": str(e)})
    span.update(
        output={"processed": len(results), "failed": len(errors)},
        metadata={"errors": errors} if errors else {}
    )
    # Mark as error only if all failed
    if len(errors) == len(items):
        span.update(error="All items failed to process")

Streaming Support
Track streaming LLM responses:
import time

with client.start_as_current_generation(
    name="streaming_response",
    model="gpt-4"
) as gen:
    start_time = time.time()  # used to compute time to first token
    stream = openai.chat.completions.create(
        model="gpt-4",
        messages=messages,
        stream=True
    )
    collected_content = []
    first_token_time = None
    for chunk in stream:
        if chunk.choices[0].delta.content:
            if first_token_time is None:
                first_token_time = time.time()
            collected_content.append(chunk.choices[0].delta.content)
            yield chunk.choices[0].delta.content
    full_response = "".join(collected_content)
    gen.update(
        output=full_response,
        metadata={
            "streaming": True,
            "time_to_first_token_ms": (first_token_time - start_time) * 1000
        }
    )

JavaScript:

const gen = client.startGeneration({
  name: 'streaming_response',
  model: 'gpt-4'
});
const startTime = Date.now();  // used to compute time to first token
const stream = await openai.chat.completions.create({
  model: 'gpt-4',
  messages,
  stream: true
});
const chunks = [];
let firstTokenTime = null;
for await (const chunk of stream) {
  if (chunk.choices[0]?.delta?.content) {
    if (firstTokenTime === null) {
      firstTokenTime = Date.now();
    }
    chunks.push(chunk.choices[0].delta.content);
    yield chunk.choices[0].delta.content;
  }
}
const fullResponse = chunks.join('');
gen.end({
  output: fullResponse,
  attributes: {
    streaming: true,
    timeToFirstTokenMs: firstTokenTime - startTime
  }
});

Async Operations
Python Async
import asyncio
from brokle import AsyncBrokle

# For async code, use AsyncBrokle
client = AsyncBrokle(api_key="bk_...")

async def process_requests(requests):
    async with client.start_as_current_span(name="parallel_processing") as span:
        # Process requests in parallel
        tasks = [process_single(req) for req in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        successes = [r for r in results if not isinstance(r, Exception)]
        failures = [r for r in results if isinstance(r, Exception)]
        span.update(
            output={"success_count": len(successes), "failure_count": len(failures)},
            metadata={"parallel_tasks": len(requests)}
        )
        return successes

JavaScript Async
const client = new Brokle({ apiKey: 'bk_...' });
async function processRequests(requests) {
  const span = client.startSpan({ name: 'parallel_processing' });
  try {
    // Process requests in parallel
    const results = await Promise.allSettled(
      requests.map(req => processSingle(req))
    );
    const successes = results.filter(r => r.status === 'fulfilled');
    const failures = results.filter(r => r.status === 'rejected');
    span.end({
      output: {
        successCount: successes.length,
        failureCount: failures.length
      },
      attributes: { parallelTasks: requests.length }
    });
    return successes.map(r => r.value);
  } catch (error) {
    span.end({ error: error.message });
    throw error;
  }
}

Best Practices
1. Meaningful Names
# Good: Descriptive, action-oriented names
with client.start_as_current_span(name="search_product_catalog"):
    ...
with client.start_as_current_span(name="validate_user_input"):
    ...

# Bad: Vague or generic names
with client.start_as_current_span(name="process"):
    ...
with client.start_as_current_span(name="step1"):
    ...

2. Appropriate Granularity
# Good: Separate spans for distinct operations
with client.start_as_current_span(name="order_processing") as parent:
    with client.start_as_current_span(name="validate_order"):
        validate(order)
    with client.start_as_current_span(name="calculate_totals"):
        totals = calculate(order)
    with client.start_as_current_span(name="apply_discounts"):
        final = apply_discounts(totals, customer)

# Bad: Everything in one span
with client.start_as_current_span(name="do_everything"):
    validate(order)
    totals = calculate(order)
    final = apply_discounts(totals, customer)

3. Always Close Spans
# Good: Context manager ensures closure
with client.start_as_current_span(name="operation") as span:
    result = do_work()
    span.update(output=result)

# If not using context manager, use try/finally
span = client.start_span(name="operation")
try:
    result = do_work()
    span.update(output=result)
finally:
    span.end()

4. Capture Relevant Context
with client.start_as_current_span(name="api_request") as span:
    # Add context that helps with debugging
    span.set_attribute("endpoint", request.path)
    span.set_attribute("method", request.method)
    span.set_attribute("user_agent", request.headers.get("User-Agent"))
    span.set_attribute("request_size", len(request.body))
    response = handle_request(request)
    span.set_attribute("response_status", response.status_code)
    span.set_attribute("response_size", len(response.body))
    span.update(output={"status": response.status_code})

Related Documentation
- Core Concepts: Spans - Conceptual overview
- Automatic Tracing - Integration-based tracing
- Manual Tracing - Manual instrumentation patterns
- SDK Reference: Python - Full Python API
- SDK Reference: JavaScript - Full JavaScript API