# Python SDK
Complete reference for the Brokle Python SDK - tracing, integrations, and advanced configuration
The Brokle Python SDK provides comprehensive AI observability for Python applications, with native async support, decorators, and LLM integrations.
## Installation
```bash
# pip
pip install brokle

# poetry
poetry add brokle

# conda
conda install -c conda-forge brokle
```

Requirements:
- Python 3.8+
- No additional dependencies for core functionality
## Quick Start
```python
from brokle import Brokle, wrap_openai
import openai

# Initialize client
client = Brokle(api_key="bk_...")

# Wrap OpenAI for automatic tracing
openai_client = wrap_openai(openai.OpenAI(), brokle=client)

# All LLM calls are now traced
response = openai_client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)
print(response.choices[0].message.content)

# Ensure traces are sent
client.flush()
```

## Client Initialization
### Brokle Client
```python
from brokle import Brokle

client = Brokle(
    api_key="bk_...",          # Required: Your API key
    base_url=None,             # Optional: Custom API URL
    environment="production",  # Optional: Environment name
    sample_rate=1.0,           # Optional: Sampling rate (0.0-1.0)
    flush_at=100,              # Optional: Batch size before flush
    flush_interval=5.0,        # Optional: Auto-flush interval (seconds)
    debug=False,               # Optional: Enable debug logging
    compression="gzip",        # Optional: Payload compression
)
```

### Async Client
For async applications:
```python
import asyncio

from brokle import AsyncBrokle

client = AsyncBrokle(api_key="bk_...")

async def main():
    async with client.start_as_current_span(name="async_operation") as span:
        result = await async_operation()
        span.update(output=result)

asyncio.run(main())
```

### Environment Variables
If arguments are not passed explicitly, the client reads them from environment variables:
```bash
export BROKLE_API_KEY=bk_...
export BROKLE_BASE_URL=https://api.brokle.com
export BROKLE_ENVIRONMENT=production
export BROKLE_SAMPLE_RATE=1.0
export BROKLE_DEBUG=false
```

```python
# No arguments needed if env vars are set
client = Brokle()
```

## Tracing
### Context Manager (Recommended)
with client.start_as_current_span(name="operation") as span:
# Set input
span.update(input={"query": user_query})
# Add attributes
span.set_attribute("user_id", user.id)
span.set_attribute("feature", "search")
# Perform operation
result = perform_operation(user_query)
# Set output
span.update(output=result)Decorator Pattern
```python
from brokle import observe

@observe(name="process_query")
def process_query(query: str) -> dict:
    """Function is automatically traced"""
    result = search(query)
    return {"query": query, "results": result}

# Each call creates a trace
result = process_query("find documents")
```
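Decorated functions compose naturally. A minimal sketch, assuming `@observe` participates in the same context propagation as the context-manager API (so nested calls appear as child spans; this behavior is implied but not shown explicitly above):

```python
from brokle import observe

@observe(name="fetch_documents")
def fetch_documents(query: str) -> list:
    return search(query)  # placeholder search function, as above

@observe(name="answer_question")
def answer_question(question: str) -> dict:
    # Assumption: this nested call is recorded as a child span
    docs = fetch_documents(question)
    return {"question": question, "sources": len(docs)}

result = answer_question("find documents")
```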
### Manual Span Management

```python
span = client.start_span(name="manual_operation")
try:
    result = do_work()
    span.update(output=result)
except Exception as e:
    span.update(error=str(e))
    raise
finally:
    span.end()
```

## Span Types
### General Spans
```python
with client.start_as_current_span(
    name="operation",
    as_type="span"  # default
) as span:
    span.set_attribute("key", "value")
    span.update(output=result)
```

### Generation Spans (LLM Calls)
```python
with client.start_as_current_generation(
    name="chat_completion",
    model="gpt-4",
    input={"messages": messages}
) as gen:
    response = call_llm(messages)
    gen.update(
        output=response.content,
        usage={
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens
        },
        metadata={
            "temperature": 0.7,
            "max_tokens": 500
        }
    )
```

### Retrieval Spans
```python
with client.start_as_current_span(
    name="vector_search",
    as_type="retrieval"
) as span:
    span.set_attribute("index", "documents")
    span.set_attribute("top_k", 5)
    results = vector_store.search(query, top_k=5)
    span.update(output={
        "count": len(results),
        "scores": [r.score for r in results]
    })
```

### Tool Spans
```python
with client.start_as_current_span(
    name="calculator",
    as_type="tool"
) as span:
    span.set_attribute("tool_name", "calculator")
    span.update(input={"expression": "2 + 2"})
    result = evaluate("2 + 2")
    span.update(output={"result": result})
```

## Nested Spans
Create hierarchical traces:
with client.start_as_current_span(name="pipeline") as parent:
parent.update_trace(user_id="user_123", session_id="session_456")
with client.start_as_current_span(name="step_1") as step1:
result1 = step_one()
step1.update(output=result1)
with client.start_as_current_span(name="step_2") as step2:
result2 = step_two(result1)
step2.update(output=result2)
parent.update(output=result2)Span Methods
### `set_attribute`
Add key-value pairs:
span.set_attribute("key", "value")
span.set_attribute("count", 42)
span.set_attribute("is_premium", True)update
Update span data:
```python
span.update(
    input={"query": "..."},     # Input data
    output={"result": "..."},   # Output data
    metadata={"key": "value"},  # Custom metadata
    error="Error message",      # Error description
    usage={                     # Token usage (for generations)
        "prompt_tokens": 100,
        "completion_tokens": 50
    }
)
```

### `update_trace`
Set trace-level context:
```python
span.update_trace(
    user_id="user_123",
    session_id="session_456"
)
```

### `end`
Manually end a span:
```python
span.end()
```
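A sketch that pulls these methods together on a manually managed span; `load_user`, `handle_request`, and `request` are hypothetical placeholders, not SDK functions:

```python
span = client.start_span(name="handle_request")
try:
    user = load_user(request)                   # hypothetical helper
    span.set_attribute("user_tier", user.tier)  # key-value attributes
    span.update_trace(user_id=user.id)          # trace-level context
    result = handle_request(request)            # hypothetical helper
    span.update(input={"path": request.path}, output=result)
finally:
    span.end()  # always end manually managed spans
```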
## LLM Integrations

### OpenAI
```python
from brokle import Brokle, wrap_openai
import openai

client = Brokle(api_key="bk_...")
openai_client = wrap_openai(openai.OpenAI(), brokle=client)

# Sync
response = openai_client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)

# Streaming
stream = openai_client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
)
for chunk in stream:
    print(chunk.choices[0].delta.content or "", end="")

# Async (inside an async function)
async_client = wrap_openai(openai.AsyncOpenAI(), brokle=client)
response = await async_client.chat.completions.create(...)
```

### Anthropic
```python
from brokle import Brokle, wrap_anthropic
import anthropic

client = Brokle(api_key="bk_...")
claude = wrap_anthropic(anthropic.Anthropic(), brokle=client)

response = claude.messages.create(
    model="claude-3-sonnet-20240229",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello!"}]
)
```
## Prompts

### Fetching Prompts
```python
# Get a prompt by name
prompt = client.prompts.get("customer_support")

# Get a specific version
prompt = client.prompts.get("customer_support", label="production")
prompt = client.prompts.get("customer_support", version=3)
```

### Using Prompts
```python
# Convert to OpenAI format
messages = prompt.to_openai_messages({
    "customer_name": "John",
    "issue": "billing question"
})

response = openai_client.chat.completions.create(
    model=prompt.model or "gpt-4",
    messages=messages
)
```
## Lifecycle Management

### Flushing
Force send pending traces:
```python
# Synchronous flush
client.flush()

# With timeout
client.flush(timeout=10.0)
```

### Shutdown
Graceful shutdown:
```python
# Flush and close connections
client.shutdown()
```

### Integration with Web Frameworks
FastAPI:
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from brokle import Brokle

client = Brokle(api_key="bk_...")

@asynccontextmanager
async def lifespan(app: FastAPI):
    yield
    client.shutdown()

app = FastAPI(lifespan=lifespan)
```

Flask:
```python
import atexit

from flask import Flask

from brokle import Brokle

client = Brokle(api_key="bk_...")
app = Flask(__name__)

atexit.register(client.shutdown)
```

Django:
```python
# settings.py
from brokle import Brokle

BROKLE_CLIENT = Brokle()
```

```python
# apps.py
from django.apps import AppConfig

class MyAppConfig(AppConfig):
    name = "myapp"  # your app's package name

    def ready(self):
        import atexit
        from django.conf import settings
        atexit.register(settings.BROKLE_CLIENT.shutdown)
```

## Async Support
### Async Client
```python
from brokle import AsyncBrokle

client = AsyncBrokle(api_key="bk_...")

async def process():
    async with client.start_as_current_span(name="async_op") as span:
        result = await async_operation()
        span.update(output=result)
```

### Async Decorator
```python
from brokle import observe

@observe(name="async_function")
async def async_function(query: str):
    result = await async_search(query)
    return result
```

### Parallel Operations
```python
import asyncio

async def process_batch(items):
    async with client.start_as_current_span(name="batch") as span:
        tasks = [process_item(item) for item in items]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        successes = [r for r in results if not isinstance(r, Exception)]
        span.update(output={"processed": len(successes)})
        return successes
```
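For per-item visibility, `process_item` itself can be decorated. A sketch, assuming `@observe` nests under the currently active span (consistent with the nested-span behavior shown earlier), so each item appears as a child of `batch`:

```python
from brokle import observe

@observe(name="process_item")
async def process_item(item):
    # Assumption: this span is recorded as a child of the active "batch" span
    return await async_operation(item)  # placeholder async work
```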
## Error Handling

### Capturing Errors
with client.start_as_current_span(name="operation") as span:
try:
result = risky_operation()
span.update(output=result)
except ValidationError as e:
span.update(
error=str(e),
metadata={"error_type": "validation"}
)
raise
except Exception as e:
span.update(error=f"Unexpected: {str(e)}")
raiseSDK Errors
The SDK is designed to never break your application:
```python
# SDK errors are logged but don't propagate
client = Brokle(api_key="invalid_key")

# This still works - SDK failures are silent
with client.start_as_current_span(name="operation") as span:
    result = do_work()  # Your code runs normally
```

## Configuration Reference
| Option | Type | Default | Description |
|---|---|---|---|
| `api_key` | `str` | Required | Brokle API key |
| `base_url` | `str` | `https://api.brokle.com` | API endpoint |
| `environment` | `str` | `"default"` | Environment name |
| `sample_rate` | `float` | `1.0` | Sampling rate (0.0-1.0) |
| `flush_at` | `int` | `100` | Batch size before flush |
| `flush_interval` | `float` | `5.0` | Auto-flush interval (seconds) |
| `debug` | `bool` | `False` | Enable debug logging |
| `compression` | `str` | `"gzip"` | Payload compression |
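As an illustration, these options can be combined for a high-volume production service. The specific values below are assumptions to tune per workload, not recommendations from this reference:

```python
import os

from brokle import Brokle

client = Brokle(
    api_key=os.environ["BROKLE_API_KEY"],
    environment="production",
    sample_rate=0.25,     # assumed value: keep 25% of traces
    flush_at=200,         # assumed value: larger batches, fewer requests
    flush_interval=10.0,  # assumed value: flush at least every 10 seconds
    compression="gzip",
)
```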
## Best Practices
### 1. Initialize Once
```python
# Good - module level
client = Brokle()

def process():
    with client.start_as_current_span(...):
        ...
```

```python
# Bad - creates new client each call
def process():
    client = Brokle()  # Don't do this
```

### 2. Use Context Managers
```python
# Good - automatic cleanup
with client.start_as_current_span(name="op") as span:
    ...
```

```python
# Bad - manual cleanup required
span = client.start_span(name="op")
# What if an exception occurs before end()?
span.end()
```

### 3. Add Meaningful Context
with client.start_as_current_span(name="process_order") as span:
span.set_attribute("order_id", order.id)
span.set_attribute("customer_tier", customer.tier)
span.update_trace(user_id=user.id)4. Handle Shutdown
```python
import atexit

atexit.register(client.shutdown)
```

Always call `client.flush()` or `client.shutdown()` before your script exits to ensure all traces are sent.
## Troubleshooting
### Traces Not Appearing
- Check API key: `print(os.getenv("BROKLE_API_KEY"))`
- Ensure flush: add `client.flush()` before exit
- Check debug mode: `client = Brokle(debug=True)`
### High Memory Usage
Reduce batch size and flush interval:
```python
client = Brokle(
    flush_at=50,
    flush_interval=2.0
)
```

### Slow Performance
Enable sampling for high-throughput applications:

```python
client = Brokle(sample_rate=0.1)  # 10% sampling
```
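A common pattern (a sketch, not prescribed by this reference) is to sample fully in development and partially in production, keyed off the same environment setting:

```python
import os

from brokle import Brokle

env = os.getenv("BROKLE_ENVIRONMENT", "development")

client = Brokle(
    environment=env,
    sample_rate=1.0 if env == "development" else 0.1,  # assumed 10% in prod
)
```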