Production Monitoring
Set up monitoring, alerts, and dashboards for production AI systems
Learn how to set up comprehensive monitoring, alerting, and observability for AI applications in production environments.
What You'll Build
A complete production monitoring system including:
- Real-time performance dashboards
- Automated quality monitoring
- Cost and usage alerts
- Error tracking and debugging
- SLO/SLA tracking
┌─────────────────────────────────────────────────────────────────┐
│ Production Monitoring Stack │
├─────────────────────────────────────────────────────────────────┤
│ Real-Time Metrics │
│ ├─ Request Rate: 1,234/min │
│ ├─ P95 Latency: 1.2s │
│ ├─ Error Rate: 0.3% │
│ └─ Quality Score: 94% │
├─────────────────────────────────────────────────────────────────┤
│ Alerts (Last 24h) │
│ ├─ ⚠️ Latency spike (2h ago) - Resolved │
│ ├─ ✅ All quality checks passing │
│ └─ ✅ Cost within budget │
└─────────────────────────────────────────────────────────────────┘
Prerequisites
- Brokle account with API key
- Production AI application with tracing enabled
- Python 3.9+ or Node.js 18+
- Slack/Email for alerts (optional)
Setup
Install Dependencies
pip install brokle openai
npm install brokle openai
Configure Environment
export BROKLE_API_KEY=bk_...
export BROKLE_ENVIRONMENT=production
export OPENAI_API_KEY=sk_...
Initialize with Production Settings
from brokle import Brokle, wrap_openai
import openai
# Production configuration
brokle = Brokle(
environment="production",
sample_rate=1.0, # Trace 100% in production
flush_interval=5.0, # Flush every 5 seconds
flush_at=50, # Or every 50 events
)
openai_client = wrap_openai(openai.OpenAI(), brokle=brokle)
import { Brokle } from 'brokle';
import { wrapOpenAI } from 'brokle-openai';
import OpenAI from 'openai';
const brokle = new Brokle({
environment: 'production',
sampleRate: 1.0,
flushInterval: 5000,
flushAt: 50,
});
const openai = wrapOpenAI(new OpenAI(), { brokle });
Implementation
Step 1: Set Up Core Monitoring
Create a monitoring wrapper for all AI operations:
import time
import functools
from typing import Callable, Any
def monitored_operation(
name: str,
feature: str = None,
slo_latency_ms: int = 3000
):
"""Decorator for monitored AI operations."""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
start_time = time.time()
with brokle.start_as_current_span(
name=name,
metadata={
"feature": feature,
"environment": "production",
"slo_latency_ms": slo_latency_ms
}
) as span:
try:
result = func(*args, **kwargs)
# Calculate latency
latency_ms = (time.time() - start_time) * 1000
# Record metrics
span.update(
output=result if isinstance(result, str) else str(result)[:500],
metadata={
"latency_ms": latency_ms,
"slo_met": latency_ms <= slo_latency_ms,
"success": True
}
)
# Score SLO compliance
span.score(
name="slo_latency",
value=1 if latency_ms <= slo_latency_ms else 0,
comment=f"Latency: {latency_ms:.0f}ms (SLO: {slo_latency_ms}ms)"
)
return result
except Exception as e:
latency_ms = (time.time() - start_time) * 1000
span.update(
metadata={
"latency_ms": latency_ms,
"success": False,
"error_type": type(e).__name__,
"error_message": str(e)
}
)
span.score(name="error", value=1, comment=str(e))
raise
return wrapper
return decorator
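# Note: the wrapper above is synchronous. Step 5 and the Complete Example apply
# monitoring to async functions, which a sync wrapper would not await. A minimal
# async-aware sketch using the same span API (error handling omitted; mirror the
# except branch above if you need it):
def monitored_async_operation(
    name: str,
    feature: str = None,
    slo_latency_ms: int = 3000
):
    """Async-aware counterpart to monitored_operation (illustrative sketch)."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            start_time = time.time()
            with brokle.start_as_current_span(
                name=name,
                metadata={
                    "feature": feature,
                    "environment": "production",
                    "slo_latency_ms": slo_latency_ms
                }
            ) as span:
                result = await func(*args, **kwargs)
                latency_ms = (time.time() - start_time) * 1000
                span.score(
                    name="slo_latency",
                    value=1 if latency_ms <= slo_latency_ms else 0,
                    comment=f"Latency: {latency_ms:.0f}ms (SLO: {slo_latency_ms}ms)"
                )
                return result
        return wrapper
    return decorator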
# Usage example
@monitored_operation(name="chat_completion", feature="customer_support", slo_latency_ms=2000)
def handle_customer_query(query: str) -> str:
response = openai_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a helpful customer support agent."},
{"role": "user", "content": query}
]
)
    return response.choices[0].message.content
function monitoredOperation(
name,
{ feature = null, sloLatencyMs = 3000 } = {}
) {
return function decorator(target, propertyKey, descriptor) {
const originalMethod = descriptor.value;
descriptor.value = async function (...args) {
const startTime = Date.now();
return brokle.withSpan(
{ name, metadata: { feature, environment: 'production', sloLatencyMs } },
async (span) => {
try {
const result = await originalMethod.apply(this, args);
const latencyMs = Date.now() - startTime;
span.update({
output: typeof result === 'string' ? result : JSON.stringify(result).slice(0, 500),
metadata: {
latencyMs,
sloMet: latencyMs <= sloLatencyMs,
success: true,
},
});
span.score({
name: 'slo_latency',
value: latencyMs <= sloLatencyMs ? 1 : 0,
comment: `Latency: ${latencyMs}ms (SLO: ${sloLatencyMs}ms)`,
});
return result;
} catch (error) {
const latencyMs = Date.now() - startTime;
span.update({
metadata: {
latencyMs,
success: false,
errorType: error.name,
errorMessage: error.message,
},
});
span.score({ name: 'error', value: 1, comment: error.message });
throw error;
}
}
);
};
return descriptor;
};
}
Step 2: Configure Alerts
Set up alerts for critical metrics:
def setup_production_alerts():
"""Configure production monitoring alerts."""
# Error rate alert
brokle.alerts.create(
name="High Error Rate",
condition="error_rate_1h > 0.05", # 5% error rate
severity="critical",
channels=["slack", "pagerduty"],
message="Error rate exceeded 5% in the last hour",
runbook_url="https://wiki.example.com/runbooks/ai-errors"
)
# Latency alert
brokle.alerts.create(
name="High P95 Latency",
condition="p95_latency_15m > 5000", # 5 seconds
severity="warning",
channels=["slack"],
message="P95 latency exceeded 5s in the last 15 minutes"
)
# Quality degradation alert
brokle.alerts.create(
name="Quality Degradation",
condition="avg_quality_score_1h < 0.8",
severity="warning",
channels=["slack", "email"],
message="Average quality score dropped below 80%"
)
# Cost spike alert
brokle.alerts.create(
name="Cost Spike",
condition="hourly_cost > (avg_hourly_cost_7d * 2)",
severity="warning",
channels=["slack"],
message="Hourly cost is 2x the 7-day average"
)
# SLO breach alert
brokle.alerts.create(
name="SLO Breach",
condition="slo_compliance_24h < 0.99", # 99% SLO
severity="critical",
channels=["slack", "pagerduty", "email"],
message="SLO compliance dropped below 99%"
)
# Rate limit warning
brokle.alerts.create(
name="Approaching Rate Limit",
condition="request_rate_1m > 900", # Assuming 1000/min limit
severity="warning",
channels=["slack"],
message="Request rate approaching rate limit (900/min)"
)
print("Production alerts configured successfully")
setup_production_alerts()
Step 3: Build Monitoring Dashboard
Create a comprehensive production dashboard:
def create_production_dashboard():
"""Create production monitoring dashboard."""
dashboard = brokle.dashboards.create(
name="AI Production Monitoring",
description="Real-time monitoring for production AI systems"
)
# Row 1: Key Metrics
dashboard.add_widget({
"type": "number",
"title": "Request Rate",
"metric": "request_count",
"timeframe": "1m",
"aggregation": "count",
"format": "{value}/min"
})
dashboard.add_widget({
"type": "number",
"title": "P95 Latency",
"metric": "latency_ms",
"timeframe": "15m",
"aggregation": "p95",
"format": "{value}ms",
"thresholds": {"warning": 2000, "critical": 5000}
})
dashboard.add_widget({
"type": "number",
"title": "Error Rate",
"metric": "error_rate",
"timeframe": "1h",
"format": "{value}%",
"thresholds": {"warning": 1, "critical": 5}
})
dashboard.add_widget({
"type": "number",
"title": "Avg Quality Score",
"metric": "quality_score",
"timeframe": "1h",
"aggregation": "avg",
"format": "{value}%",
"thresholds": {"warning": 85, "critical": 75}
})
# Row 2: Trends
dashboard.add_widget({
"type": "line_chart",
"title": "Request Volume",
"metric": "request_count",
"timeframe": "24h",
"group_by": "5m",
"width": "half"
})
dashboard.add_widget({
"type": "line_chart",
"title": "Latency Distribution",
"metrics": ["p50_latency", "p95_latency", "p99_latency"],
"timeframe": "24h",
"group_by": "5m",
"width": "half"
})
# Row 3: Breakdown
dashboard.add_widget({
"type": "pie_chart",
"title": "Requests by Feature",
"metric": "request_count",
"group_by": "metadata.feature",
"timeframe": "24h"
})
dashboard.add_widget({
"type": "bar_chart",
"title": "Error by Type",
"metric": "error_count",
"group_by": "metadata.error_type",
"timeframe": "24h"
})
dashboard.add_widget({
"type": "table",
"title": "Recent Errors",
"query": "errors",
"timeframe": "1h",
"columns": ["timestamp", "feature", "error_type", "message"],
"limit": 10
})
# Row 4: Cost & SLO
dashboard.add_widget({
"type": "line_chart",
"title": "Hourly Cost",
"metric": "cost",
"timeframe": "7d",
"group_by": "1h"
})
dashboard.add_widget({
"type": "gauge",
"title": "SLO Compliance (24h)",
"metric": "slo_compliance",
"timeframe": "24h",
"target": 99.0,
"ranges": [
{"min": 0, "max": 95, "color": "red"},
{"min": 95, "max": 99, "color": "yellow"},
{"min": 99, "max": 100, "color": "green"}
]
})
return dashboard
dashboard = create_production_dashboard()
print(f"Dashboard created: {dashboard.url}")Step 4: Implement Health Checks
Add health check endpoints for your AI service:
from datetime import datetime, timedelta
import asyncio
class AIHealthChecker:
"""Health checker for AI services."""
def __init__(self):
self.checks = {}
async def check_model_availability(self) -> dict:
"""Check if models are responding."""
try:
start = datetime.now()
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "ping"}],
max_tokens=5
)
latency = (datetime.now() - start).total_seconds() * 1000
return {
"status": "healthy",
"latency_ms": latency,
"model": "gpt-4o-mini"
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e)
}
async def check_trace_pipeline(self) -> dict:
"""Verify tracing is working."""
try:
with brokle.start_as_current_span(
name="health_check",
metadata={"type": "health_check"}
) as span:
span.update(output="health check successful")
return {"status": "healthy"}
except Exception as e:
return {"status": "unhealthy", "error": str(e)}
async def check_recent_metrics(self) -> dict:
"""Check recent performance metrics."""
try:
# Get metrics from last 15 minutes
metrics = brokle.analytics.get_metrics(
start_time=datetime.now() - timedelta(minutes=15)
)
issues = []
if metrics.error_rate > 0.05:
issues.append(f"High error rate: {metrics.error_rate:.1%}")
if metrics.p95_latency > 5000:
issues.append(f"High latency: {metrics.p95_latency}ms")
if metrics.request_count == 0:
issues.append("No requests in last 15 minutes")
return {
"status": "unhealthy" if issues else "healthy",
"issues": issues,
"metrics": {
"request_count": metrics.request_count,
"error_rate": metrics.error_rate,
"p95_latency": metrics.p95_latency
}
}
except Exception as e:
return {"status": "unknown", "error": str(e)}
async def full_health_check(self) -> dict:
"""Run all health checks."""
checks = await asyncio.gather(
self.check_model_availability(),
self.check_trace_pipeline(),
self.check_recent_metrics()
)
return {
"timestamp": datetime.now().isoformat(),
"model_availability": checks[0],
"trace_pipeline": checks[1],
"recent_metrics": checks[2],
"overall": "healthy" if all(c.get("status") == "healthy" for c in checks) else "degraded"
}
health_checker = AIHealthChecker()
# FastAPI health endpoint example
import json
from fastapi import FastAPI, Response
app = FastAPI()
@app.get("/health")
async def health_check():
result = await health_checker.full_health_check()
status_code = 200 if result["overall"] == "healthy" else 503
return Response(
content=json.dumps(result),
status_code=status_code,
media_type="application/json"
)
@app.get("/health/live")
async def liveness():
"""Basic liveness probe."""
return {"status": "alive"}
@app.get("/health/ready")
async def readiness():
"""Readiness probe checking dependencies."""
model_check = await health_checker.check_model_availability()
if model_check["status"] != "healthy":
return Response(status_code=503)
return {"status": "ready"}Step 5: Set Up Quality Monitoring
Monitor response quality in real-time:
from brokle.evaluation import evaluate
import random
class QualityMonitor:
"""Real-time quality monitoring with sampling."""
def __init__(self, sample_rate: float = 0.1):
self.sample_rate = sample_rate # Evaluate 10% of requests
def should_evaluate(self) -> bool:
"""Determine if this request should be evaluated."""
return random.random() < self.sample_rate
async def evaluate_response(
self,
input: str,
output: str,
trace_id: str
) -> dict:
"""Evaluate response quality."""
# Run multiple evaluators
relevance = evaluate(
evaluator="relevance",
input=input,
output=output
)
helpfulness = evaluate(
evaluator="helpfulness",
input=input,
output=output
)
# Record scores
brokle.traces.score(
trace_id=trace_id,
name="relevance",
value=relevance.score,
comment=relevance.comment
)
brokle.traces.score(
trace_id=trace_id,
name="helpfulness",
value=helpfulness.score,
comment=helpfulness.comment
)
return {
"relevance": relevance.score,
"helpfulness": helpfulness.score,
"avg_quality": (relevance.score + helpfulness.score) / 2
}
quality_monitor = QualityMonitor(sample_rate=0.1)
# Integration with monitored operation
@monitored_operation(name="chat", feature="main", slo_latency_ms=2000)
async def monitored_chat(query: str) -> str:
with brokle.start_as_current_generation(
name="chat_generation",
model="gpt-4o"
) as gen:
response = openai_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": query}]
)
result = response.choices[0].message.content
# Sample-based quality evaluation
if quality_monitor.should_evaluate():
quality = await quality_monitor.evaluate_response(
input=query,
output=result,
trace_id=gen.trace_id
)
gen.set_attribute("quality_evaluated", True)
gen.set_attribute("quality_score", quality["avg_quality"])
        return result
Step 6: Create Runbooks
Document troubleshooting procedures:
# Store runbooks alongside alerts
runbooks = {
"high_error_rate": """
## High Error Rate Runbook
### Symptoms
- Error rate > 5% over 1 hour
- Alert: "High Error Rate"
### Investigation Steps
1. Check Brokle dashboard for error breakdown by type
2. Identify most common error types
3. Check if errors are concentrated on specific features/models
### Common Causes & Fixes
- **Rate limiting**: Scale up or implement backoff
- **Model issues**: Check OpenAI status page
- **Input validation**: Review recent input patterns
- **Timeout**: Increase timeout or optimize prompts
### Escalation
If not resolved in 30 minutes, page on-call engineer.
""",
"high_latency": """
## High Latency Runbook
### Symptoms
- P95 latency > 5 seconds
- Alert: "High P95 Latency"
### Investigation Steps
1. Check Brokle traces for slow requests
2. Look for patterns in slow traces
3. Check model provider status
### Common Causes & Fixes
- **Long prompts**: Review context length
- **High output**: Reduce max_tokens
- **Model overload**: Consider fallback model
- **Network issues**: Check connectivity
### Mitigation
Switch traffic to faster model if quality allows.
""",
"quality_degradation": """
## Quality Degradation Runbook
### Symptoms
- Average quality score < 80%
- Alert: "Quality Degradation"
### Investigation Steps
1. Review low-scoring traces in Brokle
2. Identify patterns in failing evaluations
3. Check for prompt changes or model updates
### Common Causes & Fixes
- **Prompt drift**: Review and restore prompts
- **Model change**: Verify model version
- **Input shift**: Analyze input distribution
- **Evaluation issue**: Verify evaluator config
### Recovery
Consider rollback to previous prompt version.
"""
}
# Store runbooks in Brokle
for name, content in runbooks.items():
    brokle.runbooks.create(name=name, content=content)
Complete Example
from brokle import Brokle, wrap_openai
from brokle.evaluation import evaluate
import openai
import asyncio
from datetime import datetime
# Initialize
brokle = Brokle(environment="production")
openai_client = wrap_openai(openai.OpenAI(), brokle=brokle)
# Set up monitoring
setup_production_alerts()
dashboard = create_production_dashboard()
health_checker = AIHealthChecker()
quality_monitor = QualityMonitor(sample_rate=0.1)
# Main application entry point
@monitored_operation(name="main_chat", feature="customer_support", slo_latency_ms=2000)
async def handle_request(user_id: str, query: str) -> str:
with brokle.start_as_current_span(name="request") as span:
span.update_trace(user_id=user_id)
response = openai_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": query}]
)
result = response.choices[0].message.content
# Quality sampling
if quality_monitor.should_evaluate():
await quality_monitor.evaluate_response(query, result, span.trace_id)
return result
# Run periodic health checks
async def periodic_health_check():
while True:
result = await health_checker.full_health_check()
if result["overall"] != "healthy":
print(f"Health check failed: {result}")
await asyncio.sleep(60) # Check every minute
# Start background health monitoring
asyncio.create_task(periodic_health_check())
print("Production monitoring initialized")
print(f"Dashboard: {dashboard.url}")
brokle.flush()
Best Practices
1. Use Structured Logging
import structlog
logger = structlog.get_logger()
@monitored_operation(name="operation", feature="main")
def my_operation(input: str) -> str:
logger.info(
"processing_request",
input_length=len(input),
feature="main"
)
    # ... operation logic
2. Set Appropriate SLOs
| Metric | Recommended SLO | Critical Threshold |
|---|---|---|
| Availability | 99.9% | 99% |
| P95 Latency | 2 seconds | 5 seconds |
| Error Rate | < 1% | < 5% |
| Quality Score | > 85% | > 75% |
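One way to keep these targets consistent between the decorator from Step 1 and the alerts from Step 2 is to define them once and reference them everywhere. A minimal sketch (the SLO_TARGETS name and the answer_support_query helper are illustrative, not part of the Brokle SDK):
# Shared SLO targets based on the table above (names are illustrative).
SLO_TARGETS = {
    "p95_latency_ms": 2000,   # critical at 5000
    "error_rate": 0.01,       # critical at 0.05
    "quality_score": 0.85,    # critical at 0.75
    "availability": 0.999,    # critical at 0.99
}

@monitored_operation(
    name="chat_completion",
    feature="customer_support",
    slo_latency_ms=SLO_TARGETS["p95_latency_ms"]
)
def answer_support_query(query: str) -> str:
    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": query}]
    )
    return response.choices[0].message.content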
3. Implement Graceful Degradation
from openai import RateLimitError

# primary_completion, fallback_completion, and cached_response are
# application-specific helpers you define elsewhere.
async def resilient_completion(prompt: str) -> str:
    """Completion with fallback strategies."""
    try:
        return await primary_completion(prompt)
    except RateLimitError:
        return await fallback_completion(prompt)
    except TimeoutError:
        return await cached_response(prompt)
    except Exception:
        return "I apologize, but I'm unable to help right now. Please try again."
Production monitoring should capture at least 1% of requests for quality evaluation while tracing 100% of requests for performance metrics.
Next Steps
- Cost Optimization - Reduce costs
- Dashboards - Custom visualizations
- Evaluation - Quality measurement
- Self-Hosting - Deploy your own Brokle