Tracing Recipes
Streaming Responses
Trace streaming LLM outputs while capturing complete responses and accurate token counts.
Problem
When using streaming responses:
- You need to capture the full output as it streams
- Token counts are only available at the end
- You want to track time-to-first-token
- The response should be traced even if streaming is interrupted
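The Solution below covers the first three points directly. For the last one, here is a minimal sketch (assuming the same stream and gen objects created in the Solution) of one way to keep the trace when a stream is cut off: record the partial output on the span before re-raising.
full_response = ""
try:
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            full_response += chunk.choices[0].delta.content
except Exception as exc:
    # Keep whatever arrived before the interruption on the span
    gen.update(output=full_response, metadata={"interrupted": True, "error": str(exc)})
    raise
gen.update(output=full_response)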
Solution
import time
from brokle import Brokle, wrap_openai
import openai
brokle = Brokle()
openai_client = wrap_openai(openai.OpenAI(), brokle=brokle)
def stream_with_tracing(prompt: str) -> str:
"""Stream a response with full tracing."""
with brokle.start_as_current_generation(
name="streaming_generation",
model="gpt-4o",
input=prompt
) as gen:
start_time = time.time()
first_token_time = None
full_response = ""
chunk_count = 0
# Create streaming request
stream = openai_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True,
stream_options={"include_usage": True} # Get token counts
)
for chunk in stream:
# Track time to first token
if first_token_time is None and chunk.choices:
first_token_time = time.time()
ttft = (first_token_time - start_time) * 1000
gen.set_attribute("time_to_first_token_ms", ttft)
# Accumulate response
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_response += content
chunk_count += 1
# Optional: Update span periodically for long responses
if chunk_count % 50 == 0:
gen.set_attribute("chunks_received", chunk_count)
# Capture usage from final chunk
if chunk.usage:
gen.update(
usage={
"input_tokens": chunk.usage.prompt_tokens,
"output_tokens": chunk.usage.completion_tokens
}
)
# Final update with complete response
total_time = (time.time() - start_time) * 1000
gen.update(
output=full_response,
metadata={
"streaming": True,
"chunk_count": chunk_count,
"total_time_ms": total_time,
"time_to_first_token_ms": ttft if first_token_time else None
}
)
return full_response
# Usage
response = stream_with_tracing("Write a short poem about AI")
print(response)
brokle.flush()
import { Brokle } from 'brokle';
import { wrapOpenAI } from 'brokle-openai';
import OpenAI from 'openai';
const brokle = new Brokle();
const openai = wrapOpenAI(new OpenAI(), { brokle });
async function streamWithTracing(prompt) {
  return brokle.withGeneration(
    { name: 'streaming_generation', model: 'gpt-4o', input: prompt },
    async (gen) => {
      const startTime = Date.now();
      let firstTokenTime = null;
      let fullResponse = '';
      let chunkCount = 0;

      const stream = await openai.chat.completions.create({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: prompt }],
        stream: true,
        stream_options: { include_usage: true },
      });

      for await (const chunk of stream) {
        // Track time to first token
        if (firstTokenTime === null && chunk.choices?.length) {
          firstTokenTime = Date.now();
          const ttft = firstTokenTime - startTime;
          gen.setAttribute('time_to_first_token_ms', ttft);
        }

        // Accumulate response
        if (chunk.choices?.[0]?.delta?.content) {
          fullResponse += chunk.choices[0].delta.content;
          chunkCount++;
        }

        // Capture usage from final chunk
        if (chunk.usage) {
          gen.update({
            usage: {
              inputTokens: chunk.usage.prompt_tokens,
              outputTokens: chunk.usage.completion_tokens,
            },
          });
        }
      }

      const totalTime = Date.now() - startTime;
      gen.update({
        output: fullResponse,
        metadata: {
          streaming: true,
          chunkCount,
          totalTimeMs: totalTime,
        },
      });

      return fullResponse;
    }
  );
}
// Usage
const response = await streamWithTracing('Write a short poem about AI');
console.log(response);
await brokle.shutdown();
How It Works
- Span Created Upfront: Generation span is created before streaming starts
- TTFT Tracking: First token arrival time is captured
- Incremental Updates: Response is accumulated as chunks arrive
- Final Usage: Token counts come in the last chunk when stream_options={"include_usage": True} is set
- Complete Output: Full response is saved when the stream completes
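On the fourth point: with stream_options={"include_usage": True}, the OpenAI SDK typically delivers usage on a final chunk whose choices list is empty, which is why the loop guards chunk.usage independently of chunk.choices. A minimal sketch of that shape, reusing the stream, gen, and full_response objects from the Solution above:
# Content chunks carry choices[0].delta; the usage chunk usually
# arrives last with an empty choices list, so guard both separately.
for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        full_response += chunk.choices[0].delta.content
    if chunk.usage:  # typically only the final chunk
        gen.update(usage={
            "input_tokens": chunk.usage.prompt_tokens,
            "output_tokens": chunk.usage.completion_tokens
        })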
Trace Output
{
"name": "streaming_generation",
"type": "generation",
"model": "gpt-4o",
"input": "Write a short poem about AI",
"output": "In circuits deep...",
"metadata": {
"streaming": true,
"chunk_count": 45,
"total_time_ms": 2340,
"time_to_first_token_ms": 180
},
"usage": {
"input_tokens": 12,
"output_tokens": 89
}
}Variations
Generator-Based Streaming
def stream_as_generator(prompt: str):
"""Yield chunks while tracing."""
with brokle.start_as_current_generation(
name="generator_stream",
model="gpt-4o"
) as gen:
full_response = ""
stream = openai_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True,
stream_options={"include_usage": True}
)
for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_response += content
yield content # Yield to caller
if chunk.usage:
gen.update(usage={
"input_tokens": chunk.usage.prompt_tokens,
"output_tokens": chunk.usage.completion_tokens
})
gen.update(output=full_response)
# Usage - stream to console
for chunk in stream_as_generator("Tell me a story"):
print(chunk, end="", flush=True)FastAPI Streaming Endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/chat/stream")
async def stream_chat(prompt: str):
"""Streaming endpoint with tracing."""
async def generate():
with brokle.start_as_current_generation(
name="api_stream",
model="gpt-4o"
) as gen:
full_response = ""
stream = openai_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True,
stream_options={"include_usage": True}
)
for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_response += content
yield f"data: {content}\n\n"
if chunk.usage:
gen.update(usage={
"input_tokens": chunk.usage.prompt_tokens,
"output_tokens": chunk.usage.completion_tokens
})
gen.update(output=full_response)
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)With Timeout Handling
import asyncio

# Assumes an async client wrapped the same way as the sync one above
async_openai_client = wrap_openai(openai.AsyncOpenAI(), brokle=brokle)

async def stream_with_timeout(prompt: str, timeout: float = 30.0) -> str:
    """Stream with timeout protection."""
    async with brokle.start_as_current_generation(
        name="stream_timeout",
        model="gpt-4o"
    ) as gen:
        full_response = ""
        start_time = time.time()

        try:
            stream = await asyncio.wait_for(
                async_openai_client.chat.completions.create(
                    model="gpt-4o",
                    messages=[{"role": "user", "content": prompt}],
                    stream=True
                ),
                timeout=timeout
            )

            async for chunk in stream:
                if time.time() - start_time > timeout:
                    gen.update(
                        output=full_response,
                        metadata={"timeout": True, "partial": True}
                    )
                    gen.score(name="timeout", value=1, comment="Stream timeout")
                    break

                if chunk.choices and chunk.choices[0].delta.content:
                    full_response += chunk.choices[0].delta.content

        except asyncio.TimeoutError:
            gen.update(
                output=full_response,
                metadata={"timeout": True, "partial": bool(full_response)}
            )
            gen.score(name="timeout", value=1)

        gen.update(output=full_response)
        return full_response
Streaming with Tool Calls
def stream_with_tools(prompt: str) -> dict:
"""Stream response that may include tool calls."""
with brokle.start_as_current_generation(
name="stream_tools",
model="gpt-4o"
) as gen:
full_response = ""
tool_calls = []
stream = openai_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
tools=[{"type": "function", "function": {"name": "get_weather", "parameters": {}}}],
stream=True
)
current_tool_call = None
for chunk in stream:
delta = chunk.choices[0].delta if chunk.choices else None
if delta:
# Handle content
if delta.content:
full_response += delta.content
# Handle tool calls
if delta.tool_calls:
for tc in delta.tool_calls:
if tc.index >= len(tool_calls):
tool_calls.append({
"id": tc.id,
"name": tc.function.name if tc.function else "",
"arguments": ""
})
if tc.function and tc.function.arguments:
tool_calls[tc.index]["arguments"] += tc.function.arguments
gen.update(
output=full_response,
metadata={
"tool_calls": tool_calls,
"has_tool_calls": len(tool_calls) > 0
}
)
return {"content": full_response, "tool_calls": tool_calls}Time-to-first-token (TTFT) is a key metric for user experience. Track it to optimize perceived latency.
Related
- Async Tracing - Async streaming
- Production Monitoring - Monitor TTFT
- OpenAI Integration - OpenAI streaming details