LangChain Integration
Integrate Brokle with LangChain to trace chains, agents, retrievers, and all LLM interactions within your LangChain applications.
Supported Features
| Feature | Supported | Notes |
|---|---|---|
| LLM Calls | ✅ | All LLM providers |
| Chains | ✅ | LCEL and legacy chains |
| Agents | ✅ | Full agent execution traces |
| Retrievers | ✅ | Vector stores, documents |
| Tools | ✅ | Tool calls and results |
| Memory | ✅ | Conversation memory |
| Callbacks | ✅ | Native callback support |
Quick Start
Install Dependencies
```bash
pip install brokle langchain langchain-openai
```

```bash
npm install brokle brokle-langchain langchain @langchain/openai
```

Set Up Callback Handler
```python
from brokle import Brokle
from brokle.integrations.langchain import BrokleCallbackHandler
from langchain_openai import ChatOpenAI

# Initialize Brokle
brokle = Brokle(api_key="bk_...")

# Create callback handler
handler = BrokleCallbackHandler(brokle=brokle)

# Use with any LangChain component
llm = ChatOpenAI(model="gpt-4", callbacks=[handler])
```

```typescript
import { Brokle } from 'brokle';
import { BrokleCallbackHandler } from 'brokle-langchain';
import { ChatOpenAI } from '@langchain/openai';
// Initialize Brokle
const brokle = new Brokle({ apiKey: 'bk_...' });
// Create callback handler
const handler = new BrokleCallbackHandler({ brokle });
// Use with any LangChain component
const llm = new ChatOpenAI({
model: 'gpt-4',
callbacks: [handler]
});
```

Run a Chain
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Create a simple chain
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("user", "{question}")
])
chain = prompt | llm | StrOutputParser()
# Run with tracing
result = chain.invoke(
{"question": "What is LangChain?"},
config={"callbacks": [handler]}
)
print(result)
brokle.flush()
```

```typescript
import { ChatPromptTemplate } from '@langchain/core/prompts';
import { StringOutputParser } from '@langchain/core/output_parsers';
// Create a simple chain
const prompt = ChatPromptTemplate.fromMessages([
['system', 'You are a helpful assistant.'],
['user', '{question}']
]);
const chain = prompt.pipe(llm).pipe(new StringOutputParser());
// Run with tracing
const result = await chain.invoke(
{ question: 'What is LangChain?' },
{ callbacks: [handler] }
);
console.log(result);
await brokle.shutdown();
```

Integration Methods
Method 1: Callback Handler (Recommended)
Use the callback handler for fine-grained control:
```python
from brokle.integrations.langchain import BrokleCallbackHandler
handler = BrokleCallbackHandler(
brokle=brokle,
# Configuration options
trace_name="my_langchain_app",
capture_input=True,
capture_output=True,
metadata={"environment": "production"}
)
# Pass to specific invocations
result = chain.invoke(input, config={"callbacks": [handler]})
```

Method 2: Global Configuration
Attach a callback manager when constructing components so every invocation of those components is traced:

```python
from langchain_core.callbacks import CallbackManager

# Build a callback manager around the Brokle handler
callback_manager = CallbackManager([handler])

# Every call on this LLM now reports to Brokle
llm = ChatOpenAI(callbacks=callback_manager)
```

Method 3: Context Manager
Use within a specific context:
```python
from brokle.integrations.langchain import BrokleTracer

with BrokleTracer(brokle=brokle, name="rag_pipeline") as tracer:
    # All LangChain operations in this block are traced
    result = chain.invoke(input, config={"callbacks": [tracer.handler]})
```

Tracing Chains
LCEL Chains
LangChain Expression Language (LCEL) chains are fully traced:
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Build chain with LCEL
chain = (
ChatPromptTemplate.from_template("Summarize: {text}")
| llm
| StrOutputParser()
)
# Each component creates a span
result = chain.invoke(
{"text": "Long article text..."},
config={"callbacks": [handler]}
)
```

Trace structure:
```
chain
├── ChatPromptTemplate
├── ChatOpenAI (generation)
└── StrOutputParser
```

Sequential Chains
```python
from langchain_core.runnables import RunnableSequence
# Multi-step chain
analysis_chain = prompt1 | llm | parser1
summary_chain = prompt2 | llm | parser2
full_chain = RunnableSequence(first=analysis_chain, last=summary_chain)
result = full_chain.invoke(input, config={"callbacks": [handler]})
```

Parallel Chains
```python
from langchain_core.runnables import RunnableParallel
# Parallel execution
parallel = RunnableParallel(
summary=summary_chain,
keywords=keyword_chain,
sentiment=sentiment_chain
)
# All branches traced in parallel
result = parallel.invoke(input, config={"callbacks": [handler]})
```

Tracing Agents
Agents with tool use are fully traced:
```python
from langchain.agents import create_openai_functions_agent, AgentExecutor
from langchain_core.tools import tool

@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

@tool
def calculator(expression: str) -> str:
    """Calculate a math expression."""
    return str(eval(expression))  # demo only; never eval untrusted input

tools = [search, calculator]

# Create agent
agent = create_openai_functions_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools)

# Full agent execution is traced
result = executor.invoke(
    {"input": "What is 25 * 4?"},
    config={"callbacks": [handler]}
)
```

Agent traces include:
- Agent reasoning steps
- Tool selection decisions
- Tool inputs and outputs
- Final answer generation
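To make a specific agent execution easy to find later, you can name and tag the run with the same config keys covered under Run Names and Run Tags below; a minimal sketch (the name and tags are illustrative):

```python
# Name and tag the agent run so its reasoning and tool spans
# group under a recognizable trace
result = executor.invoke(
    {"input": "What is 25 * 4?"},
    config={
        "callbacks": [handler],
        "run_name": "math_agent",         # illustrative
        "tags": ["agent", "calculator"],  # illustrative
    },
)
```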
Tracing Retrievers
Vector Store Retrieval
```python
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
# Create vector store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(documents, embeddings)
retriever = vectorstore.as_retriever()
# Retrieval is traced
docs = retriever.invoke(
"What is AI?",
config={"callbacks": [handler]}
)
```

Retrieval traces capture:
- Query embedding generation
- Vector search execution
- Retrieved document metadata
- Relevance scores
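What the retrieval span contains follows from how the retriever is configured; a minimal sketch using the Chroma store from above (assuming Brokle records the search parameters alongside the retrieved documents):

```python
# Cap retrieval at the top 4 documents; k=4 is an illustrative choice
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

docs = retriever.invoke(
    "What is AI?",
    config={"callbacks": [handler]}
)
```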
RAG Pipeline
```python
from langchain_core.runnables import RunnablePassthrough
# RAG chain
rag_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
# Full RAG pipeline traced
result = rag_chain.invoke(
"Explain quantum computing",
config={"callbacks": [handler]}
)
```

Trace structure:
```
rag_chain
├── retriever (retrieval)
│   ├── embedding
│   └── vector_search
├── ChatPromptTemplate
├── ChatOpenAI (generation)
└── StrOutputParser
```

Streaming
Streaming is fully supported:
```python
# Streaming with callbacks
for chunk in chain.stream(
    {"question": "Tell me a story"},
    config={"callbacks": [handler]}
):
    print(chunk, end="", flush=True)
```

Streaming traces include:
- Time to first token
- Chunk count
- Total streaming duration
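If you want to cross-check the recorded time to first token on the client side, you can time the first chunk yourself; a minimal sketch using only the standard library:

```python
import time

start = time.perf_counter()
first_token_at = None

for chunk in chain.stream(
    {"question": "Tell me a story"},
    config={"callbacks": [handler]}
):
    # Record the latency of the first chunk only
    if first_token_at is None:
        first_token_at = time.perf_counter() - start
    print(chunk, end="", flush=True)

print(f"\nTime to first token: {first_token_at:.3f}s")
```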
Adding Context
User and Session Context
```python
handler = BrokleCallbackHandler(
brokle=brokle,
user_id="user_123",
session_id="session_456",
metadata={
"feature": "chatbot",
"version": "1.0"
}
)
```

Dynamic Metadata
```python
# Update metadata per invocation
result = chain.invoke(
input,
config={
"callbacks": [handler],
"metadata": {
"conversation_id": "conv_789",
"turn": 3
}
}
)
```

Run Names
```python
# Name specific runs
result = chain.invoke(
input,
config={
"callbacks": [handler],
"run_name": "customer_inquiry"
}
)
```

Error Handling
Errors are automatically captured with full context:
```python
try:
    result = chain.invoke(input, config={"callbacks": [handler]})
except Exception as e:
    # The error is captured in the trace:
    # - Error type
    # - Error message
    # - Stack trace
    # - Input that caused the error
    print(f"Error: {e}")
```

Async Support
Full async support:
```python
import asyncio

async def process():
    result = await chain.ainvoke(
        {"question": "Hello!"},
        config={"callbacks": [handler]}
    )
    return result

# Run async
result = asyncio.run(process())
```

Async Streaming
```python
async def stream_response():
    async for chunk in chain.astream(
        {"question": "Tell me about Python"},
        config={"callbacks": [handler]}
    ):
        print(chunk, end="", flush=True)
```

Multiple LLM Providers
Track calls across different providers:
```python
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
# OpenAI
openai_llm = ChatOpenAI(model="gpt-4", callbacks=[handler])
# Anthropic
anthropic_llm = ChatAnthropic(model="claude-3-sonnet-20240229", callbacks=[handler])
# Use both in the same chain
comparison_chain = RunnableParallel(
openai=prompt | openai_llm | parser,
anthropic=prompt | anthropic_llm | parser
)
# Both are traced in the same trace
result = comparison_chain.invoke(input, config={"callbacks": [handler]})
```

Configuration Options
```python
handler = BrokleCallbackHandler(
brokle=brokle,
# Trace configuration
trace_name="my_app", # Root trace name
capture_input=True, # Capture inputs
capture_output=True, # Capture outputs
# Context
user_id="user_123", # User identifier
session_id="session_456", # Session identifier
# Metadata
metadata={
"environment": "production",
"version": "1.0"
},
# Privacy
mask_inputs=False, # Mask input content
mask_outputs=False, # Mask output content
)
```

Best Practices
1. Use Descriptive Run Names
```python
result = chain.invoke(
input,
config={
"callbacks": [handler],
"run_name": "summarize_document" # Not just "chain"
}
)
```

2. Add Business Context
```python
handler = BrokleCallbackHandler(
brokle=brokle,
metadata={
"product": "customer_support",
"feature": "ticket_classification"
}
)
```

3. Handle Cleanup
```python
import atexit

# Flush pending traces and close the client on interpreter exit
atexit.register(brokle.shutdown)
```

4. Use Run Tags
```python
result = chain.invoke(
input,
config={
"callbacks": [handler],
"tags": ["production", "high-priority"]
}
)
```

Troubleshooting
Traces Not Appearing
- Ensure the callback is passed in the `invoke()` config
- Call `brokle.flush()` before exit
- Enable debug logging: `Brokle(debug=True)`
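A minimal end-to-end check combining those three points, using only options shown earlier on this page:

```python
from brokle import Brokle
from brokle.integrations.langchain import BrokleCallbackHandler
from langchain_openai import ChatOpenAI

# debug=True enables client-side logging so you can watch events being sent
brokle = Brokle(api_key="bk_...", debug=True)
handler = BrokleCallbackHandler(brokle=brokle)

llm = ChatOpenAI(model="gpt-4", callbacks=[handler])
print(llm.invoke("ping").content)

# Flush before exit so no buffered traces are dropped
brokle.flush()
```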
Missing Components
Some components may not emit callbacks. Wrap them:
```python
from langchain_core.runnables import RunnableLambda

def my_function(x):
    return process(x)  # process() is a placeholder for your own logic

# Wrap to enable tracing
traceable_fn = RunnableLambda(my_function)
```

Duplicate Traces
Avoid passing callbacks at multiple levels:
```python
# Bad: callbacks at both levels
llm = ChatOpenAI(callbacks=[handler])
result = chain.invoke(input, config={"callbacks": [handler]})

# Good: callbacks at one level
llm = ChatOpenAI()
result = chain.invoke(input, config={"callbacks": [handler]})
```