Cost Optimization
Reduce LLM API costs by 30-50% using token analysis, model routing, prompt optimization, and caching strategies
Learn how to analyze, optimize, and reduce your AI spending while maintaining response quality using Brokle's cost tracking and analytics.
What You'll Build
A comprehensive cost optimization system that:
- Identifies high-cost operations
- Implements smart model routing
- Caches expensive responses
- Monitors cost-quality tradeoffs
- Reduces costs by 40-60%
```
┌─────────────────────────────────────────────┐
│ Cost Optimization Results                   │
├─────────────────────────────────────────────┤
│ Before Optimization                         │
│ ├─ Monthly Cost: $5,200                     │
│ ├─ Avg Cost/Request: $0.052                 │
│ └─ Model Mix: 100% GPT-4o                   │
├─────────────────────────────────────────────┤
│ After Optimization                          │
│ ├─ Monthly Cost: $2,340 (-55%)              │
│ ├─ Avg Cost/Request: $0.023                 │
│ ├─ Model Mix: 30% GPT-4o, 70% GPT-4o-mini   │
│ └─ Quality Score: 94% (maintained)          │
└─────────────────────────────────────────────┘
```

Prerequisites
- Brokle account with historical trace data
- Python 3.9+ with pip
- OpenAI API key
- At least 1000 traced requests for analysis
Setup
Install Dependencies
```bash
pip install brokle openai pandas numpy
```

Configure Environment
```bash
export BROKLE_API_KEY=bk_...
export OPENAI_API_KEY=sk_...
```

Initialize Clients
```python
from brokle import Brokle, wrap_openai
import openai
import pandas as pd
from datetime import datetime, timedelta

# Initialize Brokle
brokle = Brokle()

# Wrap OpenAI for tracing
openai_client = wrap_openai(openai.OpenAI(), brokle=brokle)
```

Implementation
Step 1: Analyze Current Costs
Start by understanding your current spending patterns:
```python
def analyze_costs(days: int = 30) -> dict:
    """Analyze cost patterns over time."""
    # Get cost data from Brokle
    costs = brokle.analytics.get_costs(
        start_time=datetime.now() - timedelta(days=days)
    )

    # Get detailed breakdowns
    by_model = brokle.analytics.get_costs(
        start_time=datetime.now() - timedelta(days=days),
        group_by="model"
    )
    by_feature = brokle.analytics.get_costs(
        start_time=datetime.now() - timedelta(days=days),
        group_by="metadata.feature"
    )

    # Calculate key metrics
    analysis = {
        "total_cost": costs.total,
        "total_requests": costs.request_count,
        "avg_cost_per_request": costs.total / costs.request_count,
        "input_tokens": costs.input_tokens,
        "output_tokens": costs.output_tokens,
        "models": [
            {"name": m.name, "cost": m.cost, "percentage": m.cost / costs.total * 100}
            for m in by_model
        ],
        "features": [
            {"name": f.name, "cost": f.cost, "percentage": f.cost / costs.total * 100}
            for f in by_feature
        ],
    }
    return analysis
```
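Because the breakdown is a plain list of dicts, it drops straight into pandas (installed in Setup) if you want to slice it further. A small illustrative sketch:

```python
import pandas as pd

# Per-model spend as a DataFrame for ad-hoc slicing and sorting
models_df = pd.DataFrame(analyze_costs(days=30)["models"])
print(models_df.sort_values("cost", ascending=False).to_string(index=False))

# Share of total spend going to the single most expensive model
print(f"Top model share: {models_df['percentage'].max():.1f}%")
```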
```python
# Run analysis
cost_analysis = analyze_costs()

print("=== COST ANALYSIS ===")
print(f"Total Cost (30d): ${cost_analysis['total_cost']:.2f}")
print(f"Avg Cost/Request: ${cost_analysis['avg_cost_per_request']:.4f}")
print(f"Total Requests: {cost_analysis['total_requests']:,}")
print("\nCost by Model:")
for m in cost_analysis['models']:
    print(f"  {m['name']}: ${m['cost']:.2f} ({m['percentage']:.1f}%)")
```

Step 2: Identify Optimization Opportunities
Analyze traces to find optimization targets:
```python
def find_optimization_opportunities() -> dict:
    """Identify high-cost traces and patterns."""
    # Get the most expensive traces from the last week
    traces = brokle.traces.list(
        start_time=datetime.now() - timedelta(days=7),
        order_by="cost",
        order="desc",
        limit=100
    )

    opportunities = {
        "high_cost_traces": [],
        "long_outputs": [],
        "redundant_inputs": 0,
        "simple_tasks_expensive_models": []
    }

    for trace in traces:
        # High-cost single requests
        if trace.cost > 0.10:
            opportunities["high_cost_traces"].append({
                "trace_id": trace.id,
                "cost": trace.cost,
                "model": trace.model,
                "tokens": trace.total_tokens
            })

        # Long outputs that might be trimmed
        if trace.output_tokens > 2000:
            opportunities["long_outputs"].append({
                "trace_id": trace.id,
                "output_tokens": trace.output_tokens,
                "feature": trace.metadata.get("feature")
            })

    # Check for redundant calls (same input, multiple requests)
    input_counts = {}
    for trace in traces:
        input_hash = hash(str(trace.input)[:500])
        input_counts[input_hash] = input_counts.get(input_hash, 0) + 1
    opportunities["redundant_inputs"] = sum(1 for c in input_counts.values() if c > 1)

    return opportunities

opportunities = find_optimization_opportunities()

print("\n=== OPTIMIZATION OPPORTUNITIES ===")
print(f"High-cost traces (>$0.10): {len(opportunities['high_cost_traces'])}")
print(f"Long outputs (>2000 tokens): {len(opportunities['long_outputs'])}")
print(f"Potentially redundant inputs: {opportunities['redundant_inputs']}")
```

Step 3: Implement Smart Model Routing
Route requests to appropriate models based on complexity:
```python
class SmartModelRouter:
    """Route requests to cost-effective models based on complexity."""

    def __init__(self):
        self.models = {
            "simple": "gpt-4o-mini",    # $0.15/$0.60 per 1M tokens
            "standard": "gpt-4o-mini",  # Default for most tasks
            "complex": "gpt-4o",        # $2.50/$10.00 per 1M tokens
            "critical": "gpt-4o"        # When quality is paramount
        }
        self.complexity_indicators = {
            "simple": [
                "summarize", "translate", "classify", "extract",
                "list", "format", "convert"
            ],
            "complex": [
                "analyze", "compare", "evaluate", "synthesize",
                "design", "architect", "reason", "debug"
            ],
            "critical": [
                "legal", "medical", "financial", "security",
                "compliance", "critical"
            ]
        }

    def classify_complexity(self, prompt: str, metadata: dict = None) -> str:
        """Classify request complexity."""
        prompt_lower = prompt.lower()

        # Check metadata overrides first
        if metadata:
            if metadata.get("priority") == "critical":
                return "critical"
            if metadata.get("model_tier"):
                return metadata["model_tier"]

        # Check for critical indicators
        for indicator in self.complexity_indicators["critical"]:
            if indicator in prompt_lower:
                return "critical"

        # Check for complexity indicators
        for indicator in self.complexity_indicators["complex"]:
            if indicator in prompt_lower:
                return "complex"

        # Check for simplicity indicators
        for indicator in self.complexity_indicators["simple"]:
            if indicator in prompt_lower:
                return "simple"

        # Default to standard
        return "standard"

    def route(self, prompt: str, metadata: dict = None) -> str:
        """Select the appropriate model for a request."""
        complexity = self.classify_complexity(prompt, metadata)
        return self.models[complexity]

router = SmartModelRouter()
```
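A quick way to sanity-check the keyword heuristics before routing real traffic is to call the router directly; the prompts below are illustrative:

```python
# Expected routing given the indicator lists above
print(router.route("Summarize this support ticket."))            # gpt-4o-mini (simple)
print(router.route("Debug this race condition in my worker."))   # gpt-4o (complex)
print(router.route("Review this contract for legal exposure."))  # gpt-4o (critical)
print(router.route("Draft a friendly reminder email."))          # gpt-4o-mini (standard fallback)
```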
```python
def optimized_completion(prompt: str, metadata: dict = None, **kwargs) -> str:
    """Execute completion with smart model routing."""
    # Determine the optimal model (classify once, then look up the model)
    complexity = router.classify_complexity(prompt, metadata)
    model = router.models[complexity]

    with brokle.start_as_current_generation(
        name="optimized_completion",
        model=model,
        metadata={
            "complexity": complexity,
            "routing": "smart_router",
            **(metadata or {})
        }
    ) as gen:
        response = openai_client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            **kwargs
        )
        result = response.choices[0].message.content

        gen.update(
            output=result,
            usage={
                "input_tokens": response.usage.prompt_tokens,
                "output_tokens": response.usage.completion_tokens
            }
        )

        # Log cost savings estimate
        gen.set_attribute(
            "estimated_savings",
            calculate_savings(model, response.usage)
        )

        return result

def calculate_savings(model: str, usage) -> float:
    """Calculate estimated savings vs. always using GPT-4o."""
    gpt4o_cost = (usage.prompt_tokens * 2.50 + usage.completion_tokens * 10.00) / 1_000_000
    actual_cost = calculate_cost(model, usage)
    return max(0, gpt4o_cost - actual_cost)

def calculate_cost(model: str, usage) -> float:
    """Calculate actual cost for a model."""
    pricing = {
        "gpt-4o": (2.50, 10.00),
        "gpt-4o-mini": (0.15, 0.60),
    }
    input_price, output_price = pricing.get(model, (2.50, 10.00))
    return (usage.prompt_tokens * input_price + usage.completion_tokens * output_price) / 1_000_000
```

Step 4: Implement Response Caching
Cache expensive responses to avoid redundant API calls:
```python
import hashlib
import time
from typing import Optional

class SemanticCache:
    """Response cache with exact-match lookups (semantic matching can be layered on top)."""

    def __init__(self, similarity_threshold: float = 0.95):
        self.cache = {}
        self.similarity_threshold = similarity_threshold
        self.hits = 0
        self.misses = 0

    def _hash_prompt(self, prompt: str) -> str:
        """Create hash for exact matching."""
        return hashlib.sha256(prompt.encode()).hexdigest()

    def get(self, prompt: str) -> tuple[Optional[str], bool]:
        """Get cached response if available and not expired."""
        key = self._hash_prompt(prompt)
        entry = self.cache.get(key)
        if entry is not None:
            response, expires_at = entry
            if expires_at > time.time():
                self.hits += 1
                return response, True
            # Expired entry - evict it
            del self.cache[key]
        self.misses += 1
        return None, False

    def set(self, prompt: str, response: str, ttl: int = 3600):
        """Cache a response with a time-to-live in seconds."""
        key = self._hash_prompt(prompt)
        self.cache[key] = (response, time.time() + ttl)

    def stats(self) -> dict:
        """Get cache statistics."""
        total = self.hits + self.misses
        return {
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": self.hits / total if total > 0 else 0,
            "size": len(self.cache)
        }

cache = SemanticCache()

def cached_completion(prompt: str, metadata: dict = None, **kwargs) -> str:
    """Execute completion with caching."""
    # Check the cache first
    cached_response, hit = cache.get(prompt)
    if hit:
        # Log the cache hit
        with brokle.start_as_current_span(
            name="cached_completion",
            metadata={"cache_hit": True, **(metadata or {})}
        ) as span:
            span.update(output=cached_response)
            span.set_attribute("cost", 0)  # Zero cost for a cache hit
        return cached_response

    # Cache miss - make the API call
    response = optimized_completion(prompt, metadata, **kwargs)

    # Cache the response
    cache.set(prompt, response)
    return response
```
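The class above stores a `similarity_threshold` but only performs exact-hash lookups. If you want true semantic matching, one option is to embed prompts and fall back to a nearest-neighbour check on a miss. The sketch below is not part of Brokle's SDK: it assumes the OpenAI embeddings endpoint (the `text-embedding-3-small` model name is illustrative) and the numpy package installed in Setup, and it uses a simple linear scan.

```python
import numpy as np

def embed_text(text: str):
    # Illustrative helper: any embedding model works; embedding calls cost far less than completions
    resp = openai_client.embeddings.create(model="text-embedding-3-small", input=text)
    return np.array(resp.data[0].embedding)

class EmbeddingCache(SemanticCache):
    """Sketch: exact match first, then cosine-similarity lookup over stored prompts."""

    def __init__(self, similarity_threshold: float = 0.95):
        super().__init__(similarity_threshold)
        self.embeddings = {}  # prompt hash -> embedding vector

    def get(self, prompt: str):
        response, hit = super().get(prompt)
        if hit:
            return response, True

        query = embed_text(prompt)
        for key, emb in self.embeddings.items():
            entry = self.cache.get(key)
            if entry is None or entry[1] <= time.time():
                continue  # missing or expired
            sim = float(np.dot(query, emb) / (np.linalg.norm(query) * np.linalg.norm(emb)))
            if sim >= self.similarity_threshold:
                self.hits += 1
                self.misses -= 1  # super().get() already counted this lookup as a miss
                return entry[0], True
        return None, False

    def set(self, prompt: str, response: str, ttl: int = 3600):
        super().set(prompt, response, ttl)
        self.embeddings[self._hash_prompt(prompt)] = embed_text(prompt)
```

Swapping `cache = SemanticCache()` for `cache = EmbeddingCache()` is enough for `cached_completion` to pick it up; at any real volume you would want a vector index rather than a linear scan.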
```python
# Example usage with cache monitoring
print("\n=== CACHE PERFORMANCE ===")
print(cache.stats())
```

Step 5: Optimize Token Usage
Reduce token usage without sacrificing quality:
```python
def optimize_prompt(prompt: str, max_context: int = 4000) -> str:
    """Optimize prompt to reduce tokens while maintaining quality."""
    # Remove redundant whitespace
    optimized = " ".join(prompt.split())

    # Truncate context if too long
    if len(optimized) > max_context * 4:  # Rough char-to-token ratio
        # Keep the instruction and truncate the context
        parts = optimized.split("Context:")
        if len(parts) == 2:
            instruction = parts[0]
            context = parts[1]
            max_context_chars = (max_context - len(instruction) // 4) * 4
            truncated_context = context[:max_context_chars]
            optimized = f"{instruction}Context:{truncated_context}..."

    return optimized
```
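The four-characters-per-token heuristic above is deliberately rough. If you want exact counts before deciding whether to truncate, the tiktoken package (an extra dependency, not in the Setup step) can tokenize text the way OpenAI models do. A minimal sketch:

```python
import tiktoken

def count_tokens(text: str, model: str = "gpt-4o-mini") -> int:
    """Count tokens using the model's actual tokenizer."""
    try:
        enc = tiktoken.encoding_for_model(model)
    except KeyError:
        # Fall back to a general-purpose encoding for unrecognized model names
        enc = tiktoken.get_encoding("cl100k_base")
    return len(enc.encode(text))

# Compare the heuristic estimate with the exact count
sample = "Summarize the quarterly report. Context: revenue grew 12% year over year..."
print(len(sample) // 4, count_tokens(sample))
```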
```python
def efficient_completion(
    prompt: str,
    max_tokens: Optional[int] = None,
    metadata: dict = None
) -> str:
    """Execute completion with token optimization."""
    # Optimize the prompt
    optimized_prompt = optimize_prompt(prompt)
    token_savings = (len(prompt) - len(optimized_prompt)) // 4

    # Route once so the same model is used for tracing and the API call
    complexity = router.classify_complexity(optimized_prompt, metadata)
    model = router.models[complexity]

    with brokle.start_as_current_generation(
        name="efficient_completion",
        model=model,
        metadata={
            "prompt_optimization": True,
            "token_savings_estimate": token_savings,
            **(metadata or {})
        }
    ) as gen:
        # Set a reasonable max_tokens if not specified, based on task complexity
        if max_tokens is None:
            max_tokens = {"simple": 150, "standard": 300, "complex": 800, "critical": 1000}[complexity]

        response = openai_client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": optimized_prompt}],
            max_tokens=max_tokens
        )
        result = response.choices[0].message.content

        gen.update(
            output=result,
            usage={
                "input_tokens": response.usage.prompt_tokens,
                "output_tokens": response.usage.completion_tokens
            }
        )

        return result
```

Step 6: Monitor Cost-Quality Tradeoffs
Ensure optimizations don't degrade quality:
```python
from brokle.evaluation import LLMJudge

quality_judge = LLMJudge(
    name="response_quality",
    prompt="""Evaluate the quality of this AI response.
Task: {input}
Response: {output}
Rate from 0-1 based on:
- Accuracy and correctness
- Completeness of answer
- Clarity and helpfulness
- Appropriate level of detail
Provide score and brief explanation.""",
    model="gpt-4o-mini"  # Use a cheaper model for evaluation
)

def track_cost_quality(
    prompt: str,
    metadata: dict = None,
    evaluate: bool = True
) -> dict:
    """Execute completion while tracking cost and quality metrics."""
    with brokle.start_as_current_span(
        name="cost_quality_tracking",
        metadata={"evaluation_enabled": evaluate, **(metadata or {})}
    ) as span:
        # Execute optimized completion (cost is tracked automatically by Brokle)
        response = efficient_completion(prompt, metadata=metadata)

        # Evaluate quality if enabled
        if evaluate:
            quality = quality_judge.evaluate(
                input=prompt,
                output=response
            )
            span.score(name="quality", value=quality.score, comment=quality.comment)

        return {
            "response": response,
            "quality_score": quality.score if evaluate else None
        }

# A/B testing optimized vs. unoptimized
def ab_test_optimization(prompts: list[str], sample_size: int = 100) -> dict:
    """Compare optimized vs. baseline performance."""
    results = {"optimized": [], "baseline": []}

    for prompt in prompts[:sample_size]:
        # Optimized path
        with brokle.start_as_current_span(name="ab_test_optimized"):
            opt_result = track_cost_quality(prompt, metadata={"variant": "optimized"})
            results["optimized"].append(opt_result)

        # Baseline path (always GPT-4o)
        with brokle.start_as_current_generation(
            name="ab_test_baseline",
            model="gpt-4o",
            metadata={"variant": "baseline"}
        ) as gen:
            baseline_response = openai_client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}]
            )
            baseline_quality = quality_judge.evaluate(
                input=prompt,
                output=baseline_response.choices[0].message.content
            )
            gen.score(name="quality", value=baseline_quality.score)
            results["baseline"].append({
                "response": baseline_response.choices[0].message.content,
                "quality_score": baseline_quality.score
            })

    # Calculate the comparison
    opt_scores = [r["quality_score"] for r in results["optimized"] if r["quality_score"] is not None]
    opt_avg_quality = sum(opt_scores) / len(opt_scores)
    baseline_avg_quality = sum(r["quality_score"] for r in results["baseline"]) / len(results["baseline"])

    return {
        "optimized_quality": opt_avg_quality,
        "baseline_quality": baseline_avg_quality,
        "quality_difference": opt_avg_quality - baseline_avg_quality,
        "sample_size": len(results["optimized"])
    }
```

Complete Example
```python
from brokle import Brokle, wrap_openai
from brokle.evaluation import LLMJudge
import openai
from datetime import datetime, timedelta

# Initialize
brokle = Brokle()
openai_client = wrap_openai(openai.OpenAI(), brokle=brokle)

# Initialize router and cache
router = SmartModelRouter()
cache = SemanticCache()

# Test prompts
test_prompts = [
    "Summarize the main points of machine learning.",
    "Translate 'Hello, how are you?' to French.",
    "Analyze the competitive landscape of the EV market.",
    "Design a microservices architecture for an e-commerce platform.",
]

# Process with optimization
print("=== PROCESSING WITH OPTIMIZATION ===")
for prompt in test_prompts:
    complexity = router.classify_complexity(prompt)
    model = router.route(prompt)
    result = cached_completion(prompt, metadata={"feature": "demo"})

    print(f"\nComplexity: {complexity} | Model: {model}")
    print(f"Prompt: {prompt[:50]}...")
    print(f"Response: {result[:100]}...")

# Show cache stats
print("\n=== CACHE STATS ===")
print(cache.stats())

# Analyze cost savings
print("\n=== COST ANALYSIS ===")
analysis = analyze_costs(days=1)
print(f"Today's cost: ${analysis['total_cost']:.2f}")

brokle.flush()
```

Dashboard Setup
Create a cost optimization dashboard in Brokle:
- Navigate to Analytics → Dashboards
- Create widgets for:
  - Daily cost trend
  - Cost by model distribution
  - Cache hit rate over time
  - Quality score vs. cost scatter
  - Savings from routing
```python
# Programmatic dashboard creation
brokle.dashboards.create(
    name="Cost Optimization",
    widgets=[
        {
            "type": "line_chart",
            "title": "Daily Cost Trend",
            "metric": "cost",
            "group_by": "day"
        },
        {
            "type": "pie_chart",
            "title": "Cost by Model",
            "metric": "cost",
            "group_by": "model"
        },
        {
            "type": "number",
            "title": "Total Savings",
            "metric": "metadata.estimated_savings",
            "aggregation": "sum"
        }
    ]
)
```

Best Practices
1. Start with Analysis
```python
# Always analyze before optimizing
analysis = analyze_costs(days=30)
opportunities = find_optimization_opportunities()

# Focus on the highest-impact areas first
print("Top optimization targets:")
for model in sorted(analysis["models"], key=lambda x: x["cost"], reverse=True)[:3]:
    print(f"  {model['name']}: ${model['cost']:.2f}")
```

2. Maintain Quality Guardrails
```python
# Set minimum quality thresholds
QUALITY_THRESHOLD = 0.85

def safe_optimization(prompt: str, **kwargs) -> str:
    """Optimize with quality safeguards."""
    result = track_cost_quality(prompt, evaluate=True)

    if result["quality_score"] < QUALITY_THRESHOLD:
        # Fall back to the premium model
        return optimized_completion(
            prompt,
            metadata={"model_tier": "critical"},
            **kwargs
        )

    return result["response"]
```

3. Monitor Continuously
```python
# Set up cost alerts
brokle.alerts.create(
    name="Cost spike detection",
    condition="daily_cost > (avg_daily_cost_7d * 1.5)",
    channels=["slack", "email"],
    message="Daily cost 50% above 7-day average"
)

brokle.alerts.create(
    name="Quality degradation",
    condition="avg_quality_score_24h < 0.8",
    channels=["slack"],
    message="Quality scores dropping below threshold"
)
```

Most organizations see 40-60% cost reduction with these techniques while maintaining or improving response quality.
Next Steps
- Cost Tracking - Detailed cost analytics
- Evaluation - Quality measurement
- Production Monitoring - Full monitoring setup
- Dashboards - Custom visualizations