# FastAPI Service
Add comprehensive tracing to a FastAPI backend service with middleware, dependency injection, and background tasks.
## Problem
FastAPI services with AI features need:
- Request-level tracing with correlation IDs
- User authentication context in traces
- Background task tracing
- Clean error handling without breaking responses
## Solution

### 1. Install Dependencies

```bash
pip install brokle openai fastapi uvicorn python-jose
```
### 2. Create Project Structure

```text
app/
├── main.py             # FastAPI app
├── dependencies.py     # DI providers
├── middleware.py       # Tracing middleware
├── routers/
│   └── chat.py         # Chat endpoints
├── services/
│   └── ai_service.py   # AI service layer
└── config.py           # Configuration
```
### 3. Configuration

```python
# app/config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    brokle_api_key: str
    openai_api_key: str
    environment: str = "development"

    class Config:
        env_file = ".env"

settings = Settings()
```
### 4. Brokle Setup with Dependency Injection

```python
# app/dependencies.py
from functools import lru_cache

from brokle import Brokle, wrap_openai
import openai

from .config import settings

@lru_cache
def get_brokle() -> Brokle:
    """Singleton Brokle client."""
    return Brokle(
        api_key=settings.brokle_api_key,
        environment=settings.environment,
    )

@lru_cache
def get_openai_client():
    """Wrapped OpenAI client."""
    brokle = get_brokle()
    return wrap_openai(
        openai.OpenAI(api_key=settings.openai_api_key),
        brokle=brokle,
    )
```
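Because both providers are plain functions behind `@lru_cache`, the same singletons work outside the request cycle too (scripts, workers). A minimal smoke test, assuming valid keys in `.env`; the script path and model choice here are illustrative:

```python
# scripts/smoke_test.py (illustrative; not part of the recipe)
from app.dependencies import get_brokle, get_openai_client

client = get_openai_client()
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "ping"}],
)
print(response.choices[0].message.content)

# Flush before the script exits so the wrapped call is exported
get_brokle().flush()
```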
### 5. Tracing Middleware

```python
# app/middleware.py
import time
import uuid

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware

from .dependencies import get_brokle

class TracingMiddleware(BaseHTTPMiddleware):
    """Add tracing to all requests."""

    async def dispatch(self, request: Request, call_next) -> Response:
        brokle = get_brokle()

        # Generate a correlation ID for this request
        request_id = str(uuid.uuid4())
        request.state.request_id = request_id

        # Extract user info from the auth header if present
        user_id = None
        auth_header = request.headers.get("Authorization")
        if auth_header:
            # Parse the JWT or API key to get the user ID
            user_id = self._extract_user_id(auth_header)

        start_time = time.time()

        with brokle.start_as_current_span(
            name=f"{request.method} {request.url.path}",
            metadata={
                "http.method": request.method,
                "http.url": str(request.url),
                "http.path": request.url.path,
                "request_id": request_id,
            },
        ) as span:
            # Set user context if available
            if user_id:
                span.update_trace(user_id=user_id)

            # Store the span in request state so routes can attach child spans
            request.state.span = span
            request.state.brokle = brokle

            try:
                response = await call_next(request)

                # Record response info
                latency_ms = (time.time() - start_time) * 1000
                span.update(
                    metadata={
                        "http.status_code": response.status_code,
                        "latency_ms": latency_ms,
                    }
                )

                if response.status_code >= 400:
                    span.score(name="http_error", value=1)

                return response
            except Exception as e:
                span.update(
                    metadata={
                        "error": True,
                        "error_type": type(e).__name__,
                        "error_message": str(e),
                    }
                )
                span.score(name="error", value=1, comment=str(e))
                raise

    def _extract_user_id(self, auth_header: str) -> str | None:
        """Extract the user ID from the auth header."""
        try:
            # Example: Bearer token parsing
            if auth_header.startswith("Bearer "):
                token = auth_header[7:]
                # Read the JWT claims without verifying the signature;
                # signature verification belongs in your auth dependency
                from jose import jwt
                payload = jwt.get_unverified_claims(token)
                return payload.get("sub")
        except Exception:
            pass
        return None
```
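If clients should be able to report the correlation ID back to you, echo it on the response. A small optional addition to `dispatch` (a sketch; the `X-Request-ID` header name is a common convention, not required by anything above):

```python
# Optional: inside TracingMiddleware.dispatch, right after
#     response = await call_next(request)
# echo the correlation ID so client logs can be joined with traces:
response.headers["X-Request-ID"] = request_id
```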
### 6. AI Service Layer

```python
# app/services/ai_service.py
from dataclasses import dataclass
from typing import AsyncGenerator

from brokle import Brokle
from openai import OpenAI

@dataclass
class ChatResponse:
    content: str
    tokens_used: int
    model: str

class AIService:
    def __init__(self, brokle: Brokle, openai_client: OpenAI):
        self.brokle = brokle
        self.openai = openai_client

    async def chat(
        self,
        message: str,
        system_prompt: str = "You are a helpful assistant.",
        model: str = "gpt-4o",
    ) -> ChatResponse:
        """Generate a chat response with tracing."""
        with self.brokle.start_as_current_generation(
            name="chat_completion",
            model=model,
            input={"message": message, "system": system_prompt},
        ) as gen:
            # Note: the sync OpenAI client blocks the event loop while the
            # request is in flight; see the async variant after this section
            response = self.openai.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": message},
                ],
            )

            content = response.choices[0].message.content
            tokens = response.usage.total_tokens

            gen.update(
                output=content,
                usage={
                    "input_tokens": response.usage.prompt_tokens,
                    "output_tokens": response.usage.completion_tokens,
                },
            )

            return ChatResponse(
                content=content,
                tokens_used=tokens,
                model=model,
            )

    async def chat_stream(
        self,
        message: str,
        model: str = "gpt-4o",
    ) -> AsyncGenerator[str, None]:
        """Stream a chat response with tracing."""
        with self.brokle.start_as_current_generation(
            name="chat_stream",
            model=model,
        ) as gen:
            full_response = ""
            stream = self.openai.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": message}],
                stream=True,
                stream_options={"include_usage": True},
            )

            for chunk in stream:
                if chunk.choices and chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    full_response += content
                    yield content
                # The final chunk carries usage when include_usage is set
                if chunk.usage:
                    gen.update(
                        usage={
                            "input_tokens": chunk.usage.prompt_tokens,
                            "output_tokens": chunk.usage.completion_tokens,
                        }
                    )

            gen.update(output=full_response)
```
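The methods above are `async def` but call the synchronous OpenAI client, which blocks the event loop for the duration of each request. For high-concurrency services, `openai.AsyncOpenAI` avoids this with the same manual generation spans. A sketch; whether `wrap_openai` also accepts the async client depends on the SDK, so this version skips the wrapper:

```python
# Sketch: non-blocking variant of AIService.chat using AsyncOpenAI.
# Construct with: AsyncAIService(get_brokle(), AsyncOpenAI(api_key=...))
from brokle import Brokle
from openai import AsyncOpenAI

class AsyncAIService:
    def __init__(self, brokle: Brokle, openai_client: AsyncOpenAI):
        self.brokle = brokle
        self.openai = openai_client

    async def chat(self, message: str, model: str = "gpt-4o") -> str:
        with self.brokle.start_as_current_generation(
            name="chat_completion", model=model, input={"message": message}
        ) as gen:
            # awaiting here yields the event loop to other requests
            response = await self.openai.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": message}],
            )
            content = response.choices[0].message.content
            gen.update(output=content)
            return content
```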
### 7. Chat Router

```python
# app/routers/chat.py
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from ..dependencies import get_brokle, get_openai_client
from ..services.ai_service import AIService

router = APIRouter(prefix="/chat", tags=["chat"])

class ChatRequest(BaseModel):
    message: str
    model: str = "gpt-4o"
    stream: bool = False

class ChatResponse(BaseModel):
    content: str
    tokens_used: int
    model: str

def get_ai_service(
    brokle=Depends(get_brokle),
    openai_client=Depends(get_openai_client),
) -> AIService:
    return AIService(brokle, openai_client)

@router.post("/", response_model=ChatResponse)
async def chat(
    request: Request,
    body: ChatRequest,
    ai_service: AIService = Depends(get_ai_service),
):
    """Send a chat message and get a response."""
    # Access the parent span created by the middleware
    parent_span = getattr(request.state, "span", None)

    if body.stream:
        async def generate():
            async for chunk in ai_service.chat_stream(body.message, body.model):
                yield chunk

        return StreamingResponse(
            generate(),
            media_type="text/plain",
        )

    try:
        result = await ai_service.chat(
            message=body.message,
            model=body.model,
        )
        return ChatResponse(
            content=result.content,
            tokens_used=result.tokens_used,
            model=result.model,
        )
    except Exception as e:
        if parent_span:
            parent_span.score(name="error", value=1, comment=str(e))
        raise HTTPException(status_code=500, detail="Failed to generate response")

@router.post("/feedback")
async def submit_feedback(
    request: Request,
    trace_id: str,
    score: float,
    comment: str | None = None,
    brokle=Depends(get_brokle),
):
    """Submit user feedback for a response."""
    # Record feedback against the original trace
    brokle.traces.score(
        trace_id=trace_id,
        name="user_feedback",
        value=score,
        comment=comment,
    )
    return {"status": "feedback recorded"}
```
### 8. Main Application

```python
# app/main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI

from .middleware import TracingMiddleware
from .routers import chat
from .dependencies import get_brokle

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    yield
    # Shutdown: flush any buffered traces before the process exits
    brokle = get_brokle()
    brokle.flush()
    brokle.shutdown()

app = FastAPI(
    title="AI Chat Service",
    lifespan=lifespan,
)

# Add tracing middleware
app.add_middleware(TracingMiddleware)

# Include routers
app.include_router(chat.router)

@app.get("/health")
async def health_check():
    return {"status": "healthy"}
```
### 9. Background Task Tracing

```python
# app/tasks.py
from celery import Celery

from .dependencies import get_brokle, get_openai_client

celery_app = Celery("tasks", broker="redis://localhost:6379")

@celery_app.task
def process_document(document_id: str, user_id: str):
    """Background task with tracing."""
    brokle = get_brokle()
    openai_client = get_openai_client()

    with brokle.start_as_current_span(
        name="process_document",
        metadata={"document_id": document_id, "task": "celery"},
    ) as span:
        span.update_trace(user_id=user_id)

        # Fetch the document (fetch_document is your own storage helper)
        document = fetch_document(document_id)

        # Process with AI
        with brokle.start_as_current_generation(
            name="summarize_document",
            model="gpt-4o",
        ) as gen:
            response = openai_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "Summarize this document."},
                    {"role": "user", "content": document.content},
                ],
            )
            summary = response.choices[0].message.content
            gen.update(output=summary)

        # Save the result (save_summary is your own storage helper)
        save_summary(document_id, summary)
        span.update(output="Document processed successfully")

    # Celery workers are long-lived; flush so spans aren't held in the buffer
    brokle.flush()
```
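Enqueueing works as usual; the worker builds its own trace when the task runs. This requires a running Redis broker and a Celery worker, and the IDs below are placeholders:

```python
from app.tasks import process_document

# Runs on a Celery worker, not in the API process
process_document.delay(document_id="doc_123", user_id="user_42")
```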
## Running the Application

```bash
# Start the server
uvicorn app.main:app --reload --port 8000

# Test the endpoint
curl -X POST http://localhost:8000/chat/ \
  -H "Content-Type: application/json" \
  -d '{"message": "Hello, how are you?"}'
```
## Environment Variables

```bash
# .env
BROKLE_API_KEY=bk_...
OPENAI_API_KEY=sk_...
ENVIRONMENT=development
```

The middleware automatically creates parent spans for all requests, and the AI service creates child spans for each LLM call.
## Best Practices

- **Use Middleware**: Centralize tracing in middleware for consistency
- **Dependency Injection**: Use FastAPI's DI for the Brokle and OpenAI clients, which also makes them easy to override in tests (see the sketch after this list)
- **Service Layer**: Keep AI logic in a service layer, not in routes
- **Flush on Shutdown**: Ensure traces are flushed when the app shuts down
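For tests, FastAPI's `dependency_overrides` lets you swap the real clients for fakes without touching the service layer. A sketch: the fake factories are placeholders you would flesh out to match the calls your tests exercise, and note that `TracingMiddleware` calls `get_brokle()` directly, so the override only affects route-level dependencies:

```python
# tests/conftest.py (illustrative)
from app.dependencies import get_brokle, get_openai_client
from app.main import app

def fake_brokle():
    ...  # return a stub exposing start_as_current_generation, flush, etc.

def fake_openai():
    ...  # return a stub whose chat.completions.create returns canned data

app.dependency_overrides[get_brokle] = fake_brokle
app.dependency_overrides[get_openai_client] = fake_openai
```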
## Related

- Python SDK - Full SDK reference
- Production Monitoring - Monitoring setup
- Async Tracing - Async patterns