Python SDK
Complete reference for the Brokle Python SDK - tracing, integrations, and advanced configuration
Python SDK
The Brokle Python SDK provides comprehensive AI observability for Python applications, with native async support, decorators, and LLM integrations.
Installation
pip install broklepoetry add brokleRequirements:
- Python 3.9+
- No additional dependencies for core functionality
Quick Start
from brokle import Brokle, wrap_openai
import openai
# Initialize client
client = Brokle(api_key="bk_...")
# Wrap OpenAI for automatic tracing
openai_client = wrap_openai(openai.OpenAI())
# All LLM calls are now traced
response = openai_client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello!"}]
)
print(response.choices[0].message.content)
# Ensure traces are sent
client.flush()Client Initialization
Brokle Client
from brokle import Brokle
client = Brokle(
api_key="bk_...", # Required: Your API key
base_url=None, # Optional: Custom API URL
environment="production", # Optional: Environment name
sample_rate=1.0, # Optional: Sampling rate (0.0-1.0)
flush_at=100, # Optional: Batch size before flush
flush_interval=5.0, # Optional: Auto-flush interval (seconds)
debug=False, # Optional: Enable debug logging
compression="gzip", # Optional: Payload compression
)Async Client
For async applications:
from brokle import AsyncBrokle
client = AsyncBrokle(api_key="bk_...")
async def main():
async with client.start_as_current_span(name="async_operation") as span:
result = await async_operation()
span.update(output=result)Environment Variables
The client reads from environment variables if not provided:
export BROKLE_API_KEY=bk_...
export BROKLE_BASE_URL=https://api.brokle.com
export BROKLE_ENVIRONMENT=production
export BROKLE_SAMPLE_RATE=1.0
export BROKLE_DEBUG=false# No arguments needed if env vars are set
client = Brokle()Tracing
Context Manager (Recommended)
with client.start_as_current_span(name="operation") as span:
# Set input
span.update(input={"query": user_query})
# Add attributes
span.set_attribute("user_id", user.id)
span.set_attribute("feature", "search")
# Perform operation
result = perform_operation(user_query)
# Set output
span.update(output=result)Decorator Pattern
from brokle import observe
@observe(name="process_query")
def process_query(query: str) -> dict:
"""Function is automatically traced"""
result = search(query)
return {"query": query, "results": result}
# Each call creates a trace
result = process_query("find documents")Manual Span Management
span = client.start_as_current_span(name="manual_operation")
try:
result = do_work()
span.update(output=result)
except Exception as e:
span.update(error=str(e))
raise
finally:
span.end()Span Types
General Spans
with client.start_as_current_span(
name="operation",
as_type="span" # default
) as span:
span.set_attribute("key", "value")
span.update(output=result)Generation Spans (LLM Calls)
with client.start_as_current_generation(
name="chat_completion",
model="gpt-4",
input={"messages": messages}
) as gen:
response = call_llm(messages)
gen.update(
output=response.content,
usage={
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens
},
metadata={
"temperature": 0.7,
"max_tokens": 500
}
)Retrieval Spans
with client.start_as_current_span(
name="vector_search",
as_type="retrieval"
) as span:
span.set_attribute("index", "documents")
span.set_attribute("top_k", 5)
results = vector_store.search(query, top_k=5)
span.update(output={
"count": len(results),
"scores": [r.score for r in results]
})Tool Spans
with client.start_as_current_span(
name="calculator",
as_type="tool"
) as span:
span.set_attribute("tool_name", "calculator")
span.update(input={"expression": "2 + 2"})
result = evaluate("2 + 2")
span.update(output={"result": result})Nested Spans
Create hierarchical traces:
with client.start_as_current_span(name="pipeline") as parent:
parent.update_trace(user_id="user_123", session_id="session_456")
with client.start_as_current_span(name="step_1") as step1:
result1 = step_one()
step1.update(output=result1)
with client.start_as_current_span(name="step_2") as step2:
result2 = step_two(result1)
step2.update(output=result2)
parent.update(output=result2)Span Methods
set_attribute
Add key-value pairs:
span.set_attribute("key", "value")
span.set_attribute("count", 42)
span.set_attribute("is_premium", True)update
Update span data:
span.update(
input={"query": "..."}, # Input data
output={"result": "..."}, # Output data
metadata={"key": "value"}, # Custom metadata
error="Error message", # Error description
usage={ # Token usage (for generations)
"prompt_tokens": 100,
"completion_tokens": 50
}
)update_trace
Set trace-level context:
span.update_trace(
user_id="user_123",
session_id="session_456"
)end
Manually end a span:
span.end()LLM Integrations
OpenAI
from brokle import Brokle, wrap_openai
import openai
client = Brokle(api_key="bk_...")
openai_client = wrap_openai(openai.OpenAI())
# Sync
response = openai_client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello!"}]
)
# Streaming
stream = openai_client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello!"}],
stream=True
)
for chunk in stream:
print(chunk.choices[0].delta.content or "", end="")
# Async
async_client = wrap_openai(openai.AsyncOpenAI())
response = await async_client.chat.completions.create(...)Anthropic
from brokle import Brokle, wrap_anthropic
import anthropic
client = Brokle(api_key="bk_...")
claude = wrap_anthropic(anthropic.Anthropic())
response = claude.messages.create(
model="claude-3-sonnet-20240229",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello!"}]
)Prompts
Fetching Prompts
# Get a prompt by name
prompt = client.prompts.get("customer_support")
# Get a specific version
prompt = client.prompts.get("customer_support", label="production")
prompt = client.prompts.get("customer_support", version=3)Using Prompts
# Convert to OpenAI format
messages = prompt.to_openai_messages({
"customer_name": "John",
"issue": "billing question"
})
response = openai_client.chat.completions.create(
model=prompt.model or "gpt-4",
messages=messages
)Lifecycle Management
Flushing
Force send pending traces:
# Synchronous flush
client.flush()
# With timeout
client.flush(timeout=10.0)Shutdown
Graceful shutdown:
# Flush and close connections
client.shutdown()Integration with Web Frameworks
FastAPI:
from contextlib import asynccontextmanager
from fastapi import FastAPI
client = Brokle(api_key="bk_...")
@asynccontextmanager
async def lifespan(app: FastAPI):
yield
client.shutdown()
app = FastAPI(lifespan=lifespan)Flask:
from flask import Flask
import atexit
client = Brokle(api_key="bk_...")
app = Flask(__name__)
atexit.register(client.shutdown)Django:
# settings.py
from brokle import Brokle
BROKLE_CLIENT = Brokle()
# apps.py
from django.apps import AppConfig
class MyAppConfig(AppConfig):
def ready(self):
import atexit
from django.conf import settings
atexit.register(settings.BROKLE_CLIENT.shutdown)Async Support
Async Client
from brokle import AsyncBrokle
client = AsyncBrokle(api_key="bk_...")
async def process():
async with client.start_as_current_span(name="async_op") as span:
result = await async_operation()
span.update(output=result)Async Decorator
from brokle import observe
@observe(name="async_function")
async def async_function(query: str):
result = await async_search(query)
return resultParallel Operations
import asyncio
async def process_batch(items):
async with client.start_as_current_span(name="batch") as span:
tasks = [process_item(item) for item in items]
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [r for r in results if not isinstance(r, Exception)]
span.update(output={"processed": len(successes)})
return successesError Handling
Capturing Errors
with client.start_as_current_span(name="operation") as span:
try:
result = risky_operation()
span.update(output=result)
except ValidationError as e:
span.update(
error=str(e),
metadata={"error_type": "validation"}
)
raise
except Exception as e:
span.update(error=f"Unexpected: {str(e)}")
raiseSDK Errors
The SDK is designed to never break your application:
# SDK errors are logged but don't propagate
client = Brokle(api_key="invalid_key")
# This still works - SDK failures are silent
with client.start_as_current_span(name="operation") as span:
result = do_work() # Your code runs normallyConfiguration Reference
| Option | Type | Default | Description |
|---|---|---|---|
| Required | |||
api_key | str | Required | Brokle API key (must start with bk_) |
| Connection | |||
base_url | str | "https://api.brokle.com" | API endpoint |
timeout | int | 30 | HTTP request timeout in seconds |
| Control | |||
enabled | bool | True | Master switch to disable SDK completely |
tracing_enabled | bool | True | Enable/disable tracing |
metrics_enabled | bool | True | Enable/disable metrics collection |
logs_enabled | bool | False | Enable/disable OTLP log export |
| Project | |||
environment | str | "default" | Environment name (e.g., production, staging) |
release | str | None | Release identifier for deployment tracking |
sample_rate | float | 1.0 | Sampling rate (0.0-1.0) |
debug | bool | False | Enable debug logging |
| Privacy | |||
mask | callable | None | Function to mask sensitive data before transmission |
| Batching | |||
flush_at | int | 100 | Maximum batch size before flush (1-1000) |
flush_interval | float | 5.0 | Auto-flush interval in seconds (0.1-60.0) |
| Transport | |||
compression | str | "gzip" | Compression algorithm (gzip, deflate, or None) |
transport | str | "http" | Transport protocol: http or grpc (guide) |
grpc_endpoint | str | None | Explicit gRPC endpoint (guide) |
use_protobuf | bool | True | Use Protobuf format for OTLP export |
Best Practices
1. Initialize Once
# Good - module level
client = Brokle()
def process():
with client.start_as_current_span(...):
...
# Bad - creates new client each call
def process():
client = Brokle() # Don't do this2. Use Context Managers
# Good - automatic cleanup
with client.start_as_current_span(name="op") as span:
...
# Bad - manual cleanup required
span = client.start_span(name="op")
# What if exception occurs before end()?
span.end()3. Add Meaningful Context
with client.start_as_current_span(name="process_order") as span:
span.set_attribute("order_id", order.id)
span.set_attribute("customer_tier", customer.tier)
span.update_trace(user_id=user.id)4. Handle Shutdown
import atexit
atexit.register(client.shutdown)Always call client.flush() or client.shutdown() before your script exits to ensure all traces are sent.
Troubleshooting
Traces Not Appearing
- Check API key:
print(os.getenv("BROKLE_API_KEY")) - Ensure flush: Add
client.flush()before exit - Check debug mode:
client = Brokle(debug=True)
High Memory Usage
Reduce batch size and flush interval:
client = Brokle(
flush_at=50,
flush_interval=2.0
)Slow Performance
Enable sampling for high-throughput:
client = Brokle(sample_rate=0.1) # 10% sampling