RAG Frameworks
CrewAI Integration
Trace and monitor CrewAI agent workflows with Brokle
CrewAI Integration
Integrate Brokle with CrewAI to capture traces across your multi-agent workflows. Monitor agent interactions, task executions, and tool usage with comprehensive observability.
Supported Features
| Feature | Supported | Notes |
|---|---|---|
| Crew Execution | ✅ | Full workflow tracing |
| Agent Actions | ✅ | Per-agent spans |
| Task Execution | ✅ | Task-level tracing |
| Tool Usage | ✅ | Tool calls traced |
| LLM Calls | ✅ | Underlying model calls |
| Memory | ✅ | Memory operations |
| Delegation | ✅ | Agent-to-agent delegation |
| Token Tracking | ✅ | Aggregated usage |
Quick Start
Install Dependencies
pip install brokle crewai crewai-toolsConfigure Brokle Callback
from brokle import Brokle
from brokle.integrations.crewai import BrokleCrewAICallback
# Initialize Brokle
brokle = Brokle(api_key="bk_...")
# Create callback handler
callback = BrokleCrewAICallback(brokle=brokle)Add to Your Crew
from crewai import Agent, Task, Crew
# Define agents
researcher = Agent(
role="Researcher",
goal="Research topics thoroughly",
backstory="Expert researcher with attention to detail",
verbose=True
)
writer = Agent(
role="Writer",
goal="Write engaging content",
backstory="Creative writer with clear communication",
verbose=True
)
# Define tasks
research_task = Task(
description="Research the latest AI trends",
agent=researcher,
expected_output="Comprehensive research summary"
)
writing_task = Task(
description="Write an article based on research",
agent=writer,
expected_output="Engaging article",
context=[research_task]
)
# Create crew with Brokle callback
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
callbacks=[callback] # Add Brokle tracing
)
# Run the crew
result = crew.kickoff()
print(result)
# Ensure traces are sent
brokle.flush()What Gets Traced
Crew Level
| Attribute | Description |
|---|---|
crew.name | Crew identifier |
crew.agents_count | Number of agents |
crew.tasks_count | Number of tasks |
crew.process | Sequential or hierarchical |
crew.duration_ms | Total execution time |
Agent Level
| Attribute | Description |
|---|---|
agent.role | Agent's role |
agent.goal | Agent's goal |
agent.llm | Underlying LLM model |
agent.tools | Available tools |
agent.iterations | Number of iterations |
Task Level
| Attribute | Description |
|---|---|
task.description | Task description |
task.agent | Assigned agent |
task.status | Completion status |
task.output | Task output |
task.duration_ms | Execution time |
Tool Level
| Attribute | Description |
|---|---|
tool.name | Tool name |
tool.input | Tool input |
tool.output | Tool result |
tool.duration_ms | Execution time |
Agent Configuration
Basic Agent
from crewai import Agent
agent = Agent(
role="Data Analyst",
goal="Analyze data and provide insights",
backstory="Expert data analyst with statistical background",
verbose=True,
allow_delegation=False
)Agent with Custom LLM
from crewai import Agent
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4-turbo", temperature=0.7)
agent = Agent(
role="Creative Writer",
goal="Write creative content",
backstory="Award-winning creative writer",
llm=llm,
verbose=True
)Agent with Tools
from crewai import Agent
from crewai_tools import SerperDevTool, WebsiteSearchTool
search_tool = SerperDevTool()
web_tool = WebsiteSearchTool()
agent = Agent(
role="Research Analyst",
goal="Research and analyze information",
backstory="Expert researcher with web skills",
tools=[search_tool, web_tool],
verbose=True
)Task Configuration
Basic Task
from crewai import Task
task = Task(
description="Analyze market trends for Q4 2024",
agent=analyst,
expected_output="Detailed market analysis report"
)Task with Context
research_task = Task(
description="Research competitor products",
agent=researcher,
expected_output="Competitor analysis"
)
analysis_task = Task(
description="Analyze research findings",
agent=analyst,
expected_output="Strategic recommendations",
context=[research_task] # Uses research output
)Task with Output File
task = Task(
description="Generate financial report",
agent=analyst,
expected_output="Financial analysis",
output_file="reports/financial_analysis.md"
)Crew Processes
Sequential Process
crew = Crew(
agents=[researcher, writer, editor],
tasks=[research_task, writing_task, editing_task],
process=Process.sequential, # Tasks run in order
callbacks=[callback]
)Hierarchical Process
from crewai import Crew, Process
manager = Agent(
role="Project Manager",
goal="Coordinate team efficiently",
backstory="Experienced project manager"
)
crew = Crew(
agents=[researcher, writer, editor],
tasks=[research_task, writing_task, editing_task],
process=Process.hierarchical,
manager_agent=manager,
callbacks=[callback]
)Tool Tracing
All tool executions are traced:
from crewai_tools import (
SerperDevTool,
WebsiteSearchTool,
FileReadTool,
CodeInterpreterTool
)
# Search tools
search = SerperDevTool()
web_search = WebsiteSearchTool()
# File tools
file_reader = FileReadTool()
# Code tools
code_interpreter = CodeInterpreterTool()
agent = Agent(
role="Full-Stack Assistant",
tools=[search, web_search, file_reader, code_interpreter],
# ... other config
)
# Each tool call creates a span with:
# - tool.name
# - tool.input
# - tool.output
# - tool.duration_ms
# - tool.error (if any)Memory Tracing
CrewAI memory operations are traced:
crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
memory=True, # Enable memory
callbacks=[callback]
)
# Memory traces include:
# - memory.type (short_term, long_term, entity)
# - memory.operation (store, retrieve)
# - memory.content_previewDelegation Tracing
Agent delegation is captured:
senior_agent = Agent(
role="Senior Developer",
goal="Oversee development",
allow_delegation=True # Can delegate to other agents
)
junior_agent = Agent(
role="Junior Developer",
goal="Implement features"
)
# Delegation traces show:
# - delegation.from_agent
# - delegation.to_agent
# - delegation.task
# - delegation.reasonCustom Callbacks
Extend the callback for custom behavior:
from brokle.integrations.crewai import BrokleCrewAICallback
class CustomCallback(BrokleCrewAICallback):
def on_task_start(self, task):
super().on_task_start(task)
# Custom logic
print(f"Starting task: {task.description}")
def on_agent_action(self, agent, action):
super().on_agent_action(agent, action)
# Custom logic
if action.tool == "dangerous_tool":
self.alert_security_team(agent, action)
callback = CustomCallback(brokle=brokle)Adding Context
Add custom metadata to traces:
from brokle import Brokle
from brokle.integrations.crewai import BrokleCrewAICallback
brokle = Brokle(api_key="bk_...")
# Add trace context
with brokle.start_as_current_span(name="customer_support_workflow") as span:
span.update_trace(
user_id="user_123",
session_id="session_456",
workflow_type="support"
)
callback = BrokleCrewAICallback(brokle=brokle)
crew = Crew(
agents=[...],
tasks=[...],
callbacks=[callback]
)
result = crew.kickoff()Error Handling
Errors are automatically captured:
try:
result = crew.kickoff()
except Exception as e:
# Error is captured in trace with:
# - error.type
# - error.message
# - error.agent (if agent-specific)
# - error.task (if task-specific)
print(f"Crew execution failed: {e}")
finally:
brokle.flush()Configuration Options
from brokle import Brokle
from brokle.integrations.crewai import BrokleCrewAICallback
brokle = Brokle(
api_key="bk_...",
environment="production",
sample_rate=1.0,
debug=False
)
callback = BrokleCrewAICallback(
brokle=brokle,
# Callback-specific options
capture_agent_thoughts=True, # Capture agent reasoning
capture_task_output=True, # Capture full task outputs
capture_tool_output=True, # Capture tool results
max_output_length=10000 # Truncate long outputs
)Best Practices
1. Name Your Crews
crew = Crew(
name="content_creation_crew", # Helps identify in traces
agents=[...],
tasks=[...],
callbacks=[callback]
)2. Use Descriptive Roles
# Good: Specific role
agent = Agent(role="SEO Content Specialist", ...)
# Bad: Generic role
agent = Agent(role="Writer", ...)3. Add Task Descriptions
task = Task(
description="Research and compile a report on renewable energy trends in Europe for 2024",
# More descriptive = better traces
)4. Handle Async Crews
import asyncio
async def run_crew():
crew = Crew(
agents=[...],
tasks=[...],
callbacks=[callback]
)
result = await crew.kickoff_async()
await brokle.flush_async()
return result
result = asyncio.run(run_crew())Troubleshooting
Missing Agent Traces
- Ensure callback is added to crew
- Check agent
verbose=Truefor detailed output - Verify Brokle API key is set
Incomplete Task Traces
- Ensure
expected_outputis defined - Check for task errors in logs
- Verify context dependencies are correct
Tool Traces Missing
- Confirm tools are properly instantiated
- Check tool execution didn't throw silent errors
- Verify tool is in agent's tools list
Example: Complete Workflow
from brokle import Brokle
from brokle.integrations.crewai import BrokleCrewAICallback
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool
# Initialize Brokle
brokle = Brokle(api_key="bk_...")
callback = BrokleCrewAICallback(brokle=brokle)
# Tools
search_tool = SerperDevTool()
# Agents
researcher = Agent(
role="Market Research Analyst",
goal="Conduct thorough market research",
backstory="Expert analyst with 10 years experience",
tools=[search_tool],
verbose=True
)
strategist = Agent(
role="Business Strategist",
goal="Develop actionable strategies",
backstory="MBA with strategy consulting background",
verbose=True
)
# Tasks
research_task = Task(
description="Research the AI observability market landscape",
agent=researcher,
expected_output="Comprehensive market analysis"
)
strategy_task = Task(
description="Develop go-to-market strategy based on research",
agent=strategist,
expected_output="GTM strategy document",
context=[research_task]
)
# Crew
crew = Crew(
name="market_analysis_crew",
agents=[researcher, strategist],
tasks=[research_task, strategy_task],
process=Process.sequential,
callbacks=[callback]
)
# Execute with context
with brokle.start_as_current_span(name="market_analysis") as span:
span.set_attribute("analysis_type", "competitive")
result = crew.kickoff()
print(result)
brokle.flush()Related Integrations
- LangChain - LangChain framework
- LlamaIndex - LlamaIndex framework
- OpenAI - Direct OpenAI tracing