A Code Implementation of an Intelligent Multi-Agent Workflow with the BeeAI Framework

In this tutorial, we explore the BeeAI Framework by building a fully functional multi-agent system from scratch. We walk through the essential components: custom agents, tools, memory management, and event monitoring, to show how BeeAI simplifies the development of intelligent, cooperating agents. Along the way, we demonstrate how these agents perform complex tasks such as market research, code analysis, and strategic planning using modular, production-ready patterns.
import subprocess
import sys
import asyncio
import json
from typing import Dict, List, Any, Optional
from datetime import datetime
import os

def install_packages():
    packages = [
        "beeai-framework",
        "requests",
        "beautifulsoup4",
        "numpy",
        "pandas",
        "pydantic"
    ]
    print("Installing required packages...")
    for package in packages:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
            print(f"✅ {package} installed successfully")
        except subprocess.CalledProcessError as e:
            print(f"❌ Failed to install {package}: {e}")
    print("Installation complete!")

install_packages()
try:
    from beeai_framework import ChatModel
    from beeai_framework.agents import Agent
    from beeai_framework.tools import Tool
    from beeai_framework.workflows import Workflow
    BEEAI_AVAILABLE = True
    print("✅ BeeAI Framework imported successfully")
except ImportError as e:
    print(f"⚠️ BeeAI Framework import failed: {e}")
    print("Falling back to custom implementation...")
    BEEAI_AVAILABLE = False
We first install all the required packages, including beeai-framework, so the environment is ready for multi-agent development. After installation, we attempt to import BeeAI's core modules; if the import fails, we fall back gracefully to a custom implementation so the rest of the workflow keeps working.
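As a side note, re-running the installer cell reinstalls every package from scratch. A common refinement is to skip anything that already imports cleanly; here is a minimal sketch of that pattern (the ensure helper and the package-to-module mapping are ours, not part of the tutorial):

import importlib.util

def ensure(package: str, module: str = "") -> None:
    # Install only if the importable module is missing; the module name can
    # differ from the PyPI package name (e.g. beautifulsoup4 -> bs4).
    if importlib.util.find_spec(module or package) is None:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])

ensure("beeai-framework", "beeai_framework")
ensure("beautifulsoup4", "bs4")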
class MockChatModel:
    """Mock LLM for demonstration purposes"""
    def __init__(self, model_name: str = "mock-llm"):
        self.model_name = model_name

    async def generate(self, messages: List[Dict[str, str]]) -> str:
        """Generate a mock response"""
        last_message = messages[-1]['content'] if messages else ""
        if "market" in last_message.lower():
            return "Market analysis shows strong growth in AI frameworks with 42% YoY increase. Key competitors include LangChain, CrewAI, and AutoGen."
        elif "code" in last_message.lower():
            return "Code analysis reveals good structure with async patterns. Consider adding more error handling and documentation."
        elif "strategy" in last_message.lower():
            return "Strategic recommendation: Focus on ease of use, strong documentation, and enterprise features to compete effectively."
        else:
            return f"Analyzed: {last_message[:100]}... Recommendation: Implement best practices for scalability and maintainability."

class CustomTool:
    """Base class for custom tools"""
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    async def run(self, input_data: str) -> str:
        """Override this method in subclasses"""
        raise NotImplementedError
When BeeAI is unavailable, we define a mock LLM that simulates model behavior, allowing us to test and prototype the workflow without relying on external APIs. Alongside it, we create a custom base class that serves as the blueprint for the task-specific tools our agents will use, laying the foundation for modular, tool-equipped agent capabilities.
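To see how the base class is meant to be extended, here is a minimal sketch of a custom tool; the keyword-based sentiment heuristic is purely illustrative and not part of the tutorial:

class SentimentTool(CustomTool):
    """Toy tool: keyword-based sentiment tagging (illustrative only)."""
    def __init__(self):
        super().__init__(
            name="sentiment",
            description="Rough positive/neutral classification of text"
        )

    async def run(self, input_data: str) -> str:
        # Count a few hand-picked positive keywords; real tools would call
        # an API or model here.
        hits = sum(word in input_data.lower() for word in ("growth", "strong", "opportunity"))
        return "positive" if hits else "neutral"

# Usage (inside any coroutine): label = await SentimentTool().run("Strong growth ahead")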
class MarketResearchTool(CustomTool):
    """Custom tool for market research and competitor analysis"""
    def __init__(self):
        super().__init__(
            name="market_research",
            description="Analyzes market trends and competitor information"
        )
        self.market_data = {
            "AI_frameworks": {
                "competitors": ["LangChain", "CrewAI", "AutoGen", "Haystack", "Semantic Kernel"],
                "market_size": "$2.8B",
                "growth_rate": "42% YoY",
                "key_trends": ["Multi-agent systems", "Production deployment", "Tool integration", "Enterprise adoption"]
            },
            "enterprise_adoption": {
                "rate": "78%",
                "top_use_cases": ["Customer support", "Data analysis", "Code generation", "Document processing"],
                "challenges": ["Reliability", "Cost control", "Integration complexity", "Governance"]
            }
        }

    async def run(self, query: str) -> str:
        """Simulate market research based on query"""
        query_lower = query.lower()
        if "competitor" in query_lower or "competition" in query_lower:
            data = self.market_data["AI_frameworks"]
            return f"""Market Analysis Results:
Key Competitors: {', '.join(data['competitors'])}
Market Size: {data['market_size']}
Growth Rate: {data['growth_rate']}
Key Trends: {', '.join(data['key_trends'])}
Recommendation: Focus on differentiating features like simplified deployment, better debugging tools, and enterprise-grade security."""
        elif "adoption" in query_lower or "enterprise" in query_lower:
            data = self.market_data["enterprise_adoption"]
            return f"""Enterprise Adoption Analysis:
Adoption Rate: {data['rate']}
Top Use Cases: {', '.join(data['top_use_cases'])}
Main Challenges: {', '.join(data['challenges'])}
Recommendation: Address reliability and cost control concerns through better monitoring and resource management features."""
        else:
            return "Market research available for: competitor analysis, enterprise adoption, or specific trend analysis. Please specify your focus area."
MarketResearchTool is our first specialized extension of the CustomTool base class. It simulates real-world market intelligence with predefined insights on AI framework trends, key competitors, adoption rates, and industry challenges, letting our agents make informed, data-driven recommendations during workflow execution.
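Before wiring the tool into an agent, we can exercise it directly. A quick sanity check might look like the following (in a notebook with a running event loop, you would simply await the coroutine instead of calling asyncio.run):

async def _check_market_tool():
    tool = MarketResearchTool()
    # "competitor" triggers the first branch of the tool's keyword routing.
    print(await tool.run("competitor landscape for AI frameworks"))

asyncio.run(_check_market_tool())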
class CodeAnalysisTool(CustomTool):
    """Custom tool for analyzing code patterns and suggesting improvements"""
    def __init__(self):
        super().__init__(
            name="code_analysis",
            description="Analyzes code structure and suggests improvements"
        )

    async def run(self, code_snippet: str) -> str:
        """Analyze code and provide insights"""
        analysis = {
            "lines": len(code_snippet.split('\n')),
            "complexity": "High" if len(code_snippet) > 500 else "Medium" if len(code_snippet) > 200 else "Low",
            "async_usage": "Yes" if "async" in code_snippet or "await" in code_snippet else "No",
            "error_handling": "Present" if "try:" in code_snippet or "except:" in code_snippet else "Missing",
            "documentation": "Good" if '"""' in code_snippet or "'''" in code_snippet else "Needs improvement",
            "imports": "Present" if "import " in code_snippet else "None detected",
            "classes": len([line for line in code_snippet.split('\n') if line.strip().startswith('class ')]),
            "functions": len([line for line in code_snippet.split('\n') if line.strip().startswith('def ') or line.strip().startswith('async def ')])
        }
        suggestions = []
        if analysis["error_handling"] == "Missing":
            suggestions.append("Add try-except blocks for error handling")
        if analysis["documentation"] == "Needs improvement":
            suggestions.append("Add docstrings and comments")
        if "print(" in code_snippet:
            suggestions.append("Consider using proper logging instead of print statements")
        if analysis["async_usage"] == "Yes" and "await" not in code_snippet:
            suggestions.append("Ensure proper await usage with async functions")
        if analysis["complexity"] == "High":
            suggestions.append("Consider breaking down into smaller functions")
        return f"""Code Analysis Report:
Structure:
- Lines of code: {analysis['lines']}
- Complexity: {analysis['complexity']}
- Classes: {analysis['classes']}
- Functions: {analysis['functions']}
Quality Metrics:
- Async usage: {analysis['async_usage']}
- Error handling: {analysis['error_handling']}
- Documentation: {analysis['documentation']}
Suggestions:
{chr(10).join(f"• {suggestion}" for suggestion in suggestions) if suggestions else "• Code looks good! Following best practices."}
Overall Score: {10 - len(suggestions) * 2}/10"""
class CustomAgent:
    """Custom agent implementation"""
    def __init__(self, name: str, role: str, instructions: str, tools: List[CustomTool], llm=None):
        self.name = name
        self.role = role
        self.instructions = instructions
        self.tools = tools
        self.llm = llm or MockChatModel()
        self.memory = []

    async def run(self, task: str) -> Dict[str, Any]:
        """Execute agent task"""
        print(f"🤖 {self.name} ({self.role}) processing task...")
        self.memory.append({"type": "task", "content": task, "timestamp": datetime.now()})
        task_lower = task.lower()
        tool_used = None
        tool_result = None
        for tool in self.tools:
            if tool.name == "market_research" and ("market" in task_lower or "competitor" in task_lower):
                tool_result = await tool.run(task)
                tool_used = tool.name
                break
            elif tool.name == "code_analysis" and ("code" in task_lower or "analyze" in task_lower):
                tool_result = await tool.run(task)
                tool_used = tool.name
                break
        messages = [
            {"role": "system", "content": f"You are {self.role}. {self.instructions}"},
            {"role": "user", "content": task}
        ]
        if tool_result:
            messages.append({"role": "system", "content": f"Tool {tool_used} provided: {tool_result}"})
        response = await self.llm.generate(messages)
        self.memory.append({"type": "response", "content": response, "timestamp": datetime.now()})
        return {
            "agent": self.name,
            "task": task,
            "tool_used": tool_used,
            "tool_result": tool_result,
            "response": response,
            "success": True
        }
Next we implement CodeAnalysisTool, which lets our agents evaluate code snippets for structure, complexity, documentation, and error handling, and generates concrete suggestions for improving code quality. We also define the CustomAgent class, giving each agent its own role, instructions, memory, tools, and access to an LLM. This design lets each agent intelligently decide whether a tool is needed, then synthesize a response from the tool's output and LLM inference, ensuring adaptable, context-aware behavior.
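Because agents are self-contained, we can unit-test a single role before composing the full workflow. A minimal sketch (the agent name and task text are ours):

async def _check_single_agent():
    analyst = CustomAgent(
        name="SoloAnalyst",
        role="Market Research Analyst",
        instructions="Summarize market conditions briefly.",
        tools=[MarketResearchTool()],
    )
    result = await analyst.run("Give a competitor overview of the AI framework market")
    print(result["tool_used"])   # -> "market_research" (keyword match on "competitor")
    print(result["response"])    # MockChatModel reply, grounded by the tool output

asyncio.run(_check_single_agent())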
class WorkflowMonitor:
    """Monitor and log workflow events"""
    def __init__(self):
        self.events = []
        self.start_time = datetime.now()

    def log_event(self, event_type: str, data: Dict[str, Any]):
        """Log workflow events"""
        timestamp = datetime.now()
        self.events.append({
            "timestamp": timestamp,
            "duration": (timestamp - self.start_time).total_seconds(),
            "event_type": event_type,
            "data": data
        })
        print(f"[{timestamp.strftime('%H:%M:%S')}] {event_type}: {data.get('agent', 'System')}")

    def get_summary(self):
        """Get monitoring summary"""
        return {
            "total_events": len(self.events),
            "total_duration": (datetime.now() - self.start_time).total_seconds(),
            "event_types": list(set([e["event_type"] for e in self.events])),
            "events": self.events
        }
class CustomWorkflow:
    """Custom workflow implementation"""
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self.agents = []
        self.monitor = WorkflowMonitor()

    def add_agent(self, agent: CustomAgent):
        """Add agent to workflow"""
        self.agents.append(agent)
        self.monitor.log_event("agent_added", {"agent": agent.name, "role": agent.role})

    async def run(self, tasks: List[str]) -> Dict[str, Any]:
        """Execute workflow with tasks"""
        self.monitor.log_event("workflow_started", {"tasks": len(tasks)})
        results = []
        context = {"shared_insights": []}
        for i, task in enumerate(tasks):
            agent = self.agents[i % len(self.agents)]
            if context["shared_insights"]:
                enhanced_task = f"{task}\n\nContext from previous analysis:\n" + "\n".join(context["shared_insights"][-2:])
            else:
                enhanced_task = task
            result = await agent.run(enhanced_task)
            results.append(result)
            context["shared_insights"].append(f"{agent.name}: {result['response'][:200]}...")
            self.monitor.log_event("task_completed", {
                "agent": agent.name,
                "task_index": i,
                "success": result["success"]
            })
        self.monitor.log_event("workflow_completed", {"total_tasks": len(tasks)})
        return {
            "workflow": self.name,
            "results": results,
            "context": context,
            "summary": self._generate_summary(results)
        }

    def _generate_summary(self, results: List[Dict[str, Any]]) -> str:
        """Generate workflow summary"""
        summary_parts = []
        for result in results:
            summary_parts.append(f"• {result['agent']}: {result['response'][:150]}...")
        return f"""Workflow Summary for {self.name}:
{chr(10).join(summary_parts)}
Key Insights:
• Market opportunities identified in AI framework space
• Technical architecture recommendations provided
• Strategic implementation plan outlined
• Multi-agent collaboration demonstrated successfully"""
We implement WorkflowMonitor to record and timestamp events throughout execution, giving us real-time visibility into what each agent is doing. With the CustomWorkflow class, we orchestrate the entire multi-agent process: assigning tasks, carrying shared context from agent to agent, and capturing every relevant insight. This structure ensures that tasks execute in a coordinated, transparent manner and produces a comprehensive summary highlighting the collaboration and key outcomes.
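As a compact illustration of the API above, the following sketch runs a two-agent, two-task workflow; note the round-robin assignment, where task i goes to agent i % len(agents):

async def _check_mini_workflow():
    wf = CustomWorkflow(name="Mini Demo", description="Two-agent smoke test")
    wf.add_agent(CustomAgent("A1", "Analyst", "Analyze markets.", [MarketResearchTool()]))
    wf.add_agent(CustomAgent("A2", "Reviewer", "Review code.", [CodeAnalysisTool()]))
    out = await wf.run([
        "Summarize the competitor landscape",          # index 0 -> A1
        "Analyze this code: async def f(): return 1",  # index 1 -> A2
    ])
    print(out["summary"])

asyncio.run(_check_mini_workflow())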
async def advanced_workflow_demo():
    """Demonstrate advanced multi-agent workflow"""
    print("🚀 Advanced Multi-Agent Workflow Demo")
    print("=" * 50)
    workflow = CustomWorkflow(
        name="Advanced Business Intelligence System",
        description="Multi-agent system for comprehensive business analysis"
    )
    market_agent = CustomAgent(
        name="MarketAnalyst",
        role="Senior Market Research Analyst",
        instructions="Analyze market trends, competitor landscape, and business opportunities. Provide data-driven insights with actionable recommendations.",
        tools=[MarketResearchTool()],
        llm=MockChatModel()
    )
    tech_agent = CustomAgent(
        name="TechArchitect",
        role="Technical Architecture Specialist",
        instructions="Evaluate technical solutions, code quality, and architectural decisions. Focus on scalability, maintainability, and best practices.",
        tools=[CodeAnalysisTool()],
        llm=MockChatModel()
    )
    strategy_agent = CustomAgent(
        name="StrategicPlanner",
        role="Strategic Business Planner",
        instructions="Synthesize market and technical insights into comprehensive strategic recommendations. Focus on ROI, risk assessment, and implementation roadmaps.",
        tools=[],
        llm=MockChatModel()
    )
    workflow.add_agent(market_agent)
    workflow.add_agent(tech_agent)
    workflow.add_agent(strategy_agent)
    tasks = [
        "Analyze the current AI framework market landscape and identify key opportunities for a new multi-agent framework targeting enterprise users.",
        """Analyze this code architecture pattern and provide technical assessment:
async def multi_agent_workflow():
    agents = [ResearchAgent(), AnalysisAgent(), SynthesisAgent()]
    context = SharedContext()
    for agent in agents:
        try:
            result = await agent.run(context.get_task())
            if result.success:
                context.add_insight(result.data)
            else:
                context.add_error(result.error)
        except Exception as e:
            logger.error(f"Agent {agent.name} failed: {e}")
    return context.synthesize_recommendations()""",
        "Based on the market analysis and technical assessment, create a comprehensive strategic plan for launching a competitive AI framework with focus on multi-agent capabilities and enterprise adoption."
    ]
    print("\n🔄 Executing Advanced Workflow...")
    result = await workflow.run(tasks)
    print("\n✅ Workflow Completed Successfully!")
    print("=" * 50)
    print("📊 COMPREHENSIVE ANALYSIS RESULTS")
    print("=" * 50)
    print(result["summary"])
    print("\n📈 WORKFLOW MONITORING SUMMARY")
    print("=" * 30)
    summary = workflow.monitor.get_summary()
    print(f"Total Events: {summary['total_events']}")
    print(f"Total Duration: {summary['total_duration']:.2f} seconds")
    print(f"Event Types: {', '.join(summary['event_types'])}")
    return workflow, result
async def simple_tool_demo():
    """Demonstrate individual tool functionality"""
    print("\n🛠️ Individual Tool Demo")
    print("=" * 30)
    market_tool = MarketResearchTool()
    code_tool = CodeAnalysisTool()
    print("Available Tools:")
    print(f"• {market_tool.name}: {market_tool.description}")
    print(f"• {code_tool.name}: {code_tool.description}")
    print("\n🔍 Market Research Analysis:")
    market_result = await market_tool.run("competitor analysis in AI frameworks")
    print(market_result)
    print("\n🔍 Code Analysis:")
    sample_code = '''
import asyncio
from typing import List, Dict

class AgentManager:
    """Manages multiple AI agents"""
    def __init__(self):
        self.agents = []
        self.results = []

    async def add_agent(self, agent):
        """Add agent to manager"""
        self.agents.append(agent)

    async def run_all(self, task: str) -> List[Dict]:
        """Run task on all agents"""
        results = []
        for agent in self.agents:
            try:
                result = await agent.execute(task)
                results.append(result)
            except Exception as e:
                print(f"Agent failed: {e}")
                results.append({"error": str(e)})
        return results
'''
    code_result = await code_tool.run(sample_code)
    print(code_result)
We then run two demonstrations. First, in the individual tool demo, we test MarketResearchTool and CodeAnalysisTool directly to confirm that each generates relevant insights on its own. Then we put everything together: in the advanced workflow demo, we deploy three specialized agents, MarketAnalyst, TechArchitect, and StrategicPlanner, to collaborate on business analysis tasks.
async def main():
    """Main demo function"""
    print("🐝 Advanced BeeAI Framework Tutorial")
    print("=" * 40)
    print("This tutorial demonstrates:")
    print("• Multi-agent workflows")
    print("• Custom tool development")
    print("• Memory management")
    print("• Event monitoring")
    print("• Production-ready patterns")
    if BEEAI_AVAILABLE:
        print("• Using real BeeAI Framework")
    else:
        print("• Using custom implementation (BeeAI not available)")
    print("=" * 40)
    await simple_tool_demo()
    print("\n" + "=" * 50)
    await advanced_workflow_demo()
    print("\n🎉 Tutorial Complete!")
    print("\nNext Steps:")
    print("1. Install BeeAI Framework properly: pip install beeai-framework")
    print("2. Configure your preferred LLM (OpenAI, Anthropic, local models)")
    print("3. Explore the official BeeAI documentation")
    print("4. Build custom agents for your specific use case")
    print("5. Deploy to production with proper monitoring")

if __name__ == "__main__":
    try:
        import nest_asyncio
        nest_asyncio.apply()
        print("✅ Applied nest_asyncio for Colab compatibility")
    except ImportError:
        print("⚠️ nest_asyncio not available - may not work in some environments")
    asyncio.run(main())
The main() function ties the tutorial together, linking everything we built and demonstrating both tool-level capabilities and the complete multi-agent business intelligence workflow. Whether we are running the real BeeAI Framework or the custom fallback, nest_asyncio keeps the demo compatible with environments like Google Colab. With this structure in place, we are ready to extend the agent system, explore deeper use cases, and deploy production-ready AI workflows with confidence.
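When the real framework is installed, a natural next step is to replace MockChatModel with an actual backend. The sketch below is an assumption-laden adapter: ChatModel.from_name, UserMessage, get_text_content(), and the "ollama:granite3.3:8b" model id come from BeeAI's documented backend API, not from this tutorial's code, so verify them against the version you install:

# Hedged sketch: adapt a real BeeAI ChatModel to the .generate() interface
# our CustomAgent expects. All BeeAI symbols below are assumed from the
# framework's docs; check the import paths for your installed version.
from beeai_framework.backend import ChatModel, UserMessage  # assumed path

class BeeAILLMAdapter:
    def __init__(self, model_id: str = "ollama:granite3.3:8b"):  # assumed model id
        self.model = ChatModel.from_name(model_id)

    async def generate(self, messages: List[Dict[str, str]]) -> str:
        # Flatten the dict-style messages into one prompt for simplicity.
        prompt = "\n".join(m["content"] for m in messages)
        output = await self.model.create(messages=[UserMessage(prompt)])  # assumed call
        return output.get_text_content()  # assumed accessor

# Usage: CustomAgent(..., llm=BeeAILLMAdapter()) when BEEAI_AVAILABLE is True.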
All in all, we built and executed a complete multi-agent workflow using the BeeAI Framework (or a custom equivalent when it is unavailable), demonstrating its potential in real-world business intelligence applications. We have seen how straightforward it is to create agents with specific roles, attach tools to enhance their tasks, and monitor execution transparently.