
Building Custom AI Agents for Enterprise Workflows with Monitoring, Orchestration, and Scalability

In this tutorial, we walk through the design and implementation of a custom agent framework built on PyTorch and key Python tooling, from web intelligence and data science modules to an advanced code generator. We learn how to wrap core functionality in monitored CustomTool classes, coordinate multiple agents with tailored system prompts, and define end-to-end workflows that automate tasks such as competitive website analysis and data processing pipelines. Along the way, we demonstrate real-world examples with retry logic, logging, and performance metrics, so you can confidently deploy and scale these agents on your organization's existing infrastructure.

!pip install -q torch transformers datasets pillow requests beautifulsoup4 pandas numpy scikit-learn openai


import os, json, asyncio, threading, time
import torch, pandas as pd, numpy as np
from PIL import Image
import requests
from io import BytesIO, StringIO
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, lru_cache
from typing import Dict, List, Optional, Any, Callable, Union
import logging
from dataclasses import dataclass
import inspect


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


API_TIMEOUT = 15
MAX_RETRIES = 3

We first install and import all the core libraries, including PyTorch and Transformers, the data processing libraries Pandas and NumPy, BeautifulSoup for web scraping, and scikit-learn for machine learning. We configure a standardized logging setup to capture info and error messages, and define global constants for API timeouts and retry limits so that our tools behave predictably in production.
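MAX_RETRIES is only declared at this point; as a minimal sketch of how a helper might consume these constants, here is a hypothetical retry decorator with exponential backoff (with_retries is our own illustration, not part of the original code):

def with_retries(func: Callable) -> Callable:
   """Hypothetical decorator: retry a flaky call up to MAX_RETRIES times."""
   @wraps(func)
   def wrapper(*args, **kwargs):
       for attempt in range(MAX_RETRIES):
           try:
               return func(*args, **kwargs)
           except Exception:
               if attempt == MAX_RETRIES - 1:
                   raise  # out of attempts: surface the original error
               time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...
   return wrapper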

@dataclass
class ToolResult:
   """Standardized tool result structure"""
   success: bool
   data: Any
   error: Optional[str] = None
   execution_time: float = 0.0
   metadata: Optional[Dict[str, Any]] = None


class CustomTool:
   """Base class for custom tools"""
   def __init__(self, name: str, description: str, func: Callable):
       self.name = name
       self.description = description
       self.func = func
       self.calls = 0
       self.avg_execution_time = 0.0
       self.error_rate = 0.0
      
   def execute(self, *args, **kwargs) -> ToolResult:
       """Execute tool with monitoring"""
       start_time = time.time()
       self.calls += 1
      
       try:
           result = self.func(*args, **kwargs)
           execution_time = time.time() - start_time
          
           self.avg_execution_time = ((self.avg_execution_time * (self.calls - 1)) + execution_time) / self.calls
           self.error_rate = (self.error_rate * (self.calls - 1)) / self.calls  # a successful call lowers the running error rate
          
           return ToolResult(
               success=True,
               data=result,
               execution_time=execution_time,
               metadata={'tool_name': self.name, 'call_count': self.calls}
           )
       except Exception as e:
           execution_time = time.time() - start_time
           self.error_rate = (self.error_rate * (self.calls - 1) + 1) / self.calls
          
           logger.error(f"Tool {self.name} failed: {str(e)}")
           return ToolResult(
               success=False,
               data=None,
               error=str(e),
               execution_time=execution_time,
               metadata={'tool_name': self.name, 'call_count': self.calls}
           )

We define a ToolResult dataclass to encapsulate the outcome of every execution: whether it succeeded, how long it took, any data returned, and error details if it failed. Our CustomTool class then wraps individual functions with a unified execute() method that tracks call counts, measures execution time, computes a running average runtime, and logs any errors. By standardizing tool results and performance metrics in this way, we ensure consistency and observability across all our custom utilities.
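To see the wrapper in action, here is a quick smoke test; the word_count helper is a hypothetical stand-in, not part of the framework:

def word_count(text: str) -> Dict[str, int]:
   """Toy function to wrap: counts the words in a string."""
   return {'words': len(text.split())}


wc_tool = CustomTool("word_count", "Counts words in a text string", word_count)

ok = wc_tool.execute("agents orchestrate tools")
print(ok.success, ok.data)      # True {'words': 3}

bad = wc_tool.execute(None)     # None.split() raises; captured as a failed ToolResult
print(bad.success, bad.error)   # False "'NoneType' object has no attribute 'split'"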

class CustomAgent:
   """Custom agent implementation with tool management"""
   def __init__(self, name: str, system_prompt: str = "", max_iterations: int = 5):
       self.name = name
       self.system_prompt = system_prompt
       self.max_iterations = max_iterations
       self.tools = {}
       self.conversation_history = []
       self.performance_metrics = {}
      
   def add_tool(self, tool: CustomTool):
       """Add a tool to the agent"""
       self.tools[tool.name] = tool
      
   def run(self, task: str) -> Dict[str, Any]:
       """Execute a task using available tools"""
       logger.info(f"Agent {self.name} executing task: {task}")
      
       task_lower = task.lower()
       results = []
      
       # Each branch also checks tool availability, so a task falls through to the
       # branch whose tool this agent actually owns (the keyword lists overlap).
       if any(keyword in task_lower for keyword in ['analyze', 'website', 'url', 'web']) and 'advanced_web_intelligence' in self.tools:
           import re
           url_pattern = r'https?://[^\s]+'
           urls = re.findall(url_pattern, task)
           if urls:
               result = self.tools['advanced_web_intelligence'].execute(urls[0])
               results.append(result)

       elif any(keyword in task_lower for keyword in ['data', 'analyze', 'stats', 'csv']) and 'advanced_data_science_toolkit' in self.tools:
           if 'name,age,salary' in task:
               data_start = task.find('name,age,salary')
               data_part = task[data_start:]
               result = self.tools['advanced_data_science_toolkit'].execute(data_part, 'stats')
               results.append(result)

       elif any(keyword in task_lower for keyword in ['generate', 'code', 'api', 'client']) and 'advanced_code_generator' in self.tools:
           result = self.tools['advanced_code_generator'].execute(task)
           results.append(result)
      
       return {
           'agent': self.name,
           'task': task,
           'results': [r.data if r.success else {'error': r.error} for r in results],
           'execution_summary': {
               'tools_used': len(results),
               'success_rate': sum(1 for r in results if r.success) / len(results) if results else 0,
               'total_time': sum(r.execution_time for r in results)
           }
       }

We encapsulate the agent logic in a CustomAgent class that holds a set of tools, a system prompt, and an execution history, then route each incoming task to the right tool through simple keyword matching. In the run() method, we log the task, select the appropriate tool (web intelligence, data analysis, or code generation), execute it, and aggregate the results into a standardized response with success rate and timing metrics. This design lets us extend the agent simply by adding new tools and keeps our orchestration transparent and measurable.
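As a quick sanity check of this routing, we can register a stub under the tool name the dispatcher expects (the real advanced_web_intelligence function arrives later in this tutorial) and confirm that a URL-bearing task reaches it:

stub_tool = CustomTool(
   name="advanced_web_intelligence",
   description="Stub standing in for the real web tool defined later",
   func=lambda url, analysis_type="comprehensive": {'stub_url': url}
)

routing_agent = CustomAgent(name="routing_demo", system_prompt="Demo agent.")
routing_agent.add_tool(stub_tool)

summary = routing_agent.run("Analyze the website https://example.com please")
print(summary['execution_summary'])  # {'tools_used': 1, 'success_rate': 1.0, ...}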

print("🏗️ Building Advanced Tool Architecture")


def performance_monitor(func):
   """Decorator for monitoring tool performance"""
   @wraps(func)
   def wrapper(*args, **kwargs):
       start_time = time.time()
       try:
           result = func(*args, **kwargs)
           execution_time = time.time() - start_time
           logger.info(f"{func.__name__} executed in {execution_time:.2f}s")
           return result
       except Exception as e:
           logger.error(f"{func.__name__} failed: {str(e)}")
           raise
   return wrapper


@performance_monitor
def advanced_web_intelligence(url: str, analysis_type: str = "comprehensive") -> Dict[str, Any]:
   """
   Advanced web intelligence gathering with multiple analysis modes.
  
   Args:
       url: Target URL for analysis
       analysis_type: Type of analysis (comprehensive, sentiment, technical, seo)
  
   Returns:
       Dict containing structured analysis results
   """
   try:
       response = requests.get(url, timeout=API_TIMEOUT, headers={
           'User-Agent': 'Mozilla/5.0'
       })
      
       from bs4 import BeautifulSoup
       soup = BeautifulSoup(response.content, 'html.parser')
      
       title = soup.find('title').text if soup.find('title') else 'No title'
       meta_desc = soup.find('meta', attrs={'name': 'description'})
       meta_desc = meta_desc.get('content') if meta_desc else 'No description'
      
       if analysis_type == "comprehensive":
           return {
               'title': title,
               'description': meta_desc,
               'word_count': len(soup.get_text().split()),
               'image_count': len(soup.find_all('img')),
               'link_count': len(soup.find_all('a')),
               'headers': [h.text.strip() for h in soup.find_all(['h1', 'h2', 'h3'])[:5]],
               'status_code': response.status_code,
               'content_type': response.headers.get('content-type', 'unknown'),
               'page_size': len(response.content)
           }
       elif analysis_type == "sentiment":
           text = soup.get_text()[:2000] 
           positive_words = ['good', 'great', 'excellent', 'amazing', 'wonderful', 'fantastic']
           negative_words = ['bad', 'terrible', 'awful', 'horrible', 'disappointing']
          
           pos_count = sum(text.lower().count(word) for word in positive_words)
           neg_count = sum(text.lower().count(word) for word in negative_words)
          
           return {
               'sentiment_score': pos_count - neg_count,
               'positive_indicators': pos_count,
               'negative_indicators': neg_count,
               'text_sample': text[:200],
               'analysis_type': 'sentiment'
           }
          
   except Exception as e:
       return {'error': f"Analysis failed: {str(e)}"}


@performance_monitor
def advanced_data_science_toolkit(data: str, operation: str) -> Dict[str, Any]:
   """
   Comprehensive data science operations with statistical analysis.
  
   Args:
       data: CSV-like string or JSON data
       operation: Type of analysis (stats, correlation, forecast, clustering)
  
   Returns:
       Dict with analysis results
   """
   try:
       if data.startswith('{') or data.startswith('['):
           parsed_data = json.loads(data)
           df = pd.DataFrame(parsed_data)
       else:
           df = pd.read_csv(StringIO(data))
      
       if operation == "stats":
           numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist()
          
           result = {
               'shape': df.shape,
               'columns': df.columns.tolist(),
               'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
               'missing_values': df.isnull().sum().to_dict(),
               'numeric_columns': numeric_columns
           }
          
           if len(numeric_columns) > 0:
               result['summary_stats'] = df[numeric_columns].describe().to_dict()
               if len(numeric_columns) > 1:
                   result['correlation_matrix'] = df[numeric_columns].corr().to_dict()
          
           return result
          
       elif operation == "clustering":
           from sklearn.cluster import KMeans
           from sklearn.preprocessing import StandardScaler

           numeric_df = df.select_dtypes(include=[np.number])
           if numeric_df.shape[1] < 2:
               return {'error': 'Clustering requires at least 2 numeric columns'}

           # Standardize features, then fit a small KMeans model
           scaled = StandardScaler().fit_transform(numeric_df.fillna(0))
           n_clusters = min(3, len(numeric_df))
           kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
           labels = kmeans.fit_predict(scaled)

           return {
               'n_clusters': n_clusters,
               'cluster_labels': labels.tolist(),
               'cluster_centers': kmeans.cluster_centers_.tolist(),
               'columns_used': numeric_df.columns.tolist()
           }

   except Exception as e:
       return {'error': f"Data analysis failed: {str(e)}"}


@performance_monitor
def advanced_code_generator(task_description: str, language: str = "python") -> Dict[str, str]:
   """
   Advanced code generation with multiple language support and optimization.
  
   Args:
       task_description: Description of coding task
       language: Target programming language
  
   Returns:
       Dict with generated code and metadata
   """
   templates = {
       'python': {
           'api_client': '''
import requests
import json
import time
from typing import Dict, Any, Optional


class APIClient:
   """Production-ready API client with retry logic and error handling"""
  
   def __init__(self, base_url: str, api_key: Optional[str] = None, timeout: int = 30):
       self.base_url = base_url.rstrip('/')
       self.timeout = timeout
       self.session = requests.Session()
      
       if api_key:
           self.session.headers.update({'Authorization': f'Bearer {api_key}'})
      
       self.session.headers.update({
           'Content-Type': 'application/json',
           'User-Agent': 'CustomAPIClient/1.0'
       })
  
   def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
       """Make HTTP request with retry logic"""
       url = f'{self.base_url}/{endpoint.lstrip("/")}'
      
       for attempt in range(3):
           try:
               response = self.session.request(method, url, timeout=self.timeout, **kwargs)
               response.raise_for_status()
               return response.json() if response.content else {}
           except requests.exceptions.RequestException as e:
               if attempt == 2:  # Last attempt
                   raise
               time.sleep(2 ** attempt)  # Exponential backoff
  
   def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
       return self._make_request('GET', endpoint, params=params)
  
   def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
       return self._make_request('POST', endpoint, json=data)
  
   def put(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
       return self._make_request('PUT', endpoint, json=data)
  
   def delete(self, endpoint: str) -> Dict[str, Any]:
       return self._make_request('DELETE', endpoint)
''',
           'data_processor': '''
import pandas as pd
import numpy as np
from typing import List, Dict, Any, Optional
import logging


logger = logging.getLogger(__name__)


class DataProcessor:
   """Advanced data processor with comprehensive cleaning and analysis"""
  
   def __init__(self, data: pd.DataFrame):
       self.original_data = data.copy()
       self.processed_data = data.copy()
       self.processing_log = []
  
   def clean_data(self, strategy: str="auto") -> 'DataProcessor':
       """Clean data with configurable strategies"""
       initial_shape = self.processed_data.shape
      
       # Remove duplicates
       self.processed_data = self.processed_data.drop_duplicates()
      
       # Handle missing values based on strategy
       if strategy == 'auto':
           # For numeric columns, use mean
           numeric_cols = self.processed_data.select_dtypes(include=[np.number]).columns
           self.processed_data[numeric_cols] = self.processed_data[numeric_cols].fillna(
               self.processed_data[numeric_cols].mean()
           )
          
           # For categorical columns, use mode
           categorical_cols = self.processed_data.select_dtypes(include=['object']).columns
           for col in categorical_cols:
               mode_value = self.processed_data[col].mode()
               if len(mode_value) > 0:
                   self.processed_data[col] = self.processed_data[col].fillna(mode_value[0])
      
       final_shape = self.processed_data.shape
       self.processing_log.append(f"Cleaned data: {initial_shape} -> {final_shape}")
       return self
  
   def normalize(self, method: str="minmax", columns: Optional[List[str]] = None) -> 'DataProcessor':
       """Normalize numerical columns"""
       cols = columns or self.processed_data.select_dtypes(include=[np.number]).columns.tolist()
      
       if method == 'minmax':
           # Min-max normalization
           for col in cols:
               col_min, col_max = self.processed_data[col].min(), self.processed_data[col].max()
               if col_max != col_min:
                   self.processed_data[col] = (self.processed_data[col] - col_min) / (col_max - col_min)
       elif method == 'zscore':
           # Z-score normalization
           for col in cols:
               mean_val, std_val = self.processed_data[col].mean(), self.processed_data[col].std()
               if std_val != 0:
                   self.processed_data[col] = (self.processed_data[col] - mean_val) / std_val
      
       self.processing_log.append(f"Normalized columns {cols} using {method}")
       return self
  
   def get_insights(self) -> Dict[str, Any]:
       """Generate comprehensive data insights"""
       insights = {
           'basic_info': {
               'shape': self.processed_data.shape,
               'columns': self.processed_data.columns.tolist(),
               'dtypes': {col: str(dtype) for col, dtype in self.processed_data.dtypes.items()}
           },
           'data_quality': {
               'missing_values': self.processed_data.isnull().sum().to_dict(),
               'duplicate_rows': self.processed_data.duplicated().sum(),
               'memory_usage': self.processed_data.memory_usage(deep=True).to_dict()
           },
           'processing_log': self.processing_log
       }
      
       # Add statistical summary for numeric columns
       numeric_data = self.processed_data.select_dtypes(include=[np.number])
       if len(numeric_data.columns) > 0:
           insights['statistical_summary'] = numeric_data.describe().to_dict()
      
       return insights
'''
       }
   }
  
   task_lower = task_description.lower()
   if any(keyword in task_lower for keyword in ['api', 'client', 'http', 'request']):
       code = templates[language]['api_client']
       description = "Production-ready API client with retry logic and comprehensive error handling"
   elif any(keyword in task_lower for keyword in ['data', 'process', 'clean', 'analyze']):
       code = templates[language]['data_processor']
       description = "Advanced data processor with cleaning, normalization, and insight generation"
   else:
       code = f'''# Generated code template for: {task_description}
# Language: {language}


class CustomSolution:
   """Auto-generated solution template"""
  
   def __init__(self):
       self.initialized = True
  
   def execute(self, *args, **kwargs):
       """Main execution method - implement your logic here"""
       return {{"message": "Implement your custom logic here", "task": "{task_description}"}}


# Usage example:
# solution = CustomSolution()
# result = solution.execute()
'''
       description = f"Custom template for {task_description}"
  
   return {
       'code': code,
       'language': language,
       'description': description,
       'complexity': 'production-ready',
       'estimated_lines': len(code.split('\n')),
       'features': ['error_handling', 'logging', 'type_hints', 'documentation']
   }

We wrap each core function in a @performance_monitor decorator so we can log execution times and catch failures. We then implement three specialized tools: advanced_web_intelligence for comprehensive or sentiment-oriented web scraping, advanced_data_science_toolkit for statistical analysis and clustering of CSV or JSON data, and advanced_code_generator for producing production-ready code templates. This keeps performance monitoring and behavior consistent across all our analytics and code generation utilities.
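Because the data toolkit needs no network access, it is easy to smoke-test directly with a few rows of inline CSV before wiring it into an agent; this quick check is our own addition:

sample_csv = "x,y\n1,2\n2,4\n3,6"
stats = advanced_data_science_toolkit(sample_csv, "stats")
print(stats['shape'], stats['numeric_columns'])  # (3, 2) ['x', 'y']
print(stats['correlation_matrix']['x']['y'])     # 1.0, since y is exactly 2*x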

print("🤖 Setting up Custom Agent Framework")


class AgentOrchestrator:
   """Manages multiple specialized agents with workflow coordination"""
  
   def __init__(self):
       self.agents = {}
       self.workflows = {}
       self.results_cache = {}
       self.performance_metrics = {}
      
   def create_specialist_agent(self, name: str, tools: List[CustomTool], system_prompt: str = None):
       """Create domain-specific agents"""
       agent = CustomAgent(
           name=name,
           system_prompt=system_prompt or f"You are a specialist {name} agent.",
           max_iterations=5
       )
      
       for tool in tools:
           agent.add_tool(tool)
      
       self.agents[name] = agent
       return agent
  
   def execute_workflow(self, workflow_name: str, inputs: Dict) -> Dict:
       """Execute multi-step workflows across agents"""
       if workflow_name not in self.workflows:
           raise ValueError(f"Workflow {workflow_name} not found")
      
       workflow = self.workflows[workflow_name]
       results = {}
       workflow_start = time.time()
      
       for step in workflow['steps']:
           agent_name = step['agent']
           task = step['task'].format(**inputs, **results)
          
           if agent_name in self.agents:
               step_start = time.time()
               result = self.agents[agent_name].run(task)
               step_time = time.time() - step_start
              
               results[step['output_key']] = result
               results[f"{step['output_key']}_time"] = step_time
      
       total_time = time.time() - workflow_start
      
       return {
           'workflow': workflow_name,
           'inputs': inputs,
           'results': results,
           'metadata': {
               'total_execution_time': total_time,
               'steps_completed': len(workflow['steps']),
               'success': True
           }
       }
  
   def get_system_status(self) -> Dict[str, Any]:
       """Get comprehensive system status"""
       return {
           'agents': {name: {'tools': len(agent.tools)} for name, agent in self.agents.items()},
           'workflows': list(self.workflows.keys()),
           'cache_size': len(self.results_cache),
           'total_tools': sum(len(agent.tools) for agent in self.agents.values())
       }


orchestrator = AgentOrchestrator()


web_tool = CustomTool(
   name="advanced_web_intelligence",
   description="Advanced web analysis and intelligence gathering",
   func=advanced_web_intelligence
)


data_tool = CustomTool(
   name="advanced_data_science_toolkit",
   description="Comprehensive data science and statistical analysis",
   func=advanced_data_science_toolkit
)


code_tool = CustomTool(
   name="advanced_code_generator",
   description="Advanced code generation and architecture",
   func=advanced_code_generator
)


web_agent = orchestrator.create_specialist_agent(
   "web_analyst",
   [web_tool],
   "You are a web analysis specialist. Provide comprehensive website analysis and insights."
)


data_agent = orchestrator.create_specialist_agent(
   "data_scientist",
   [data_tool],
   "You are a data science expert. Perform statistical analysis and machine learning tasks."
)


code_agent = orchestrator.create_specialist_agent(
   "code_architect",
   [code_tool],
   "You are a senior software architect. Generate optimized, production-ready code."
)

We initialize an AgentOrchestrator to manage our agent suite, register a CustomTool wrapper for each of the web intelligence, data science, and code generation functions, and then spin up three domain-specific agents: web_analyst, data_scientist, and code_architect. Each agent is seeded with its own tool set and a clear system prompt. This setup lets us coordinate and execute multi-step workflows across areas of expertise within a unified framework.
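Because agent creation is data-driven, adding a fourth specialist is a single call against the same API. As a hypothetical illustration, a generalist agent carrying all three tools:

generalist = orchestrator.create_specialist_agent(
   "full_stack_analyst",
   [web_tool, data_tool, code_tool],
   "You are a generalist agent with web, data, and code capabilities."
)
print(len(generalist.tools))  # 3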

print("⚡ Defining Advanced Workflows")


orchestrator.workflows['competitive_analysis'] = {
   'steps': [
       {
           'agent': 'web_analyst',
           'task': 'Analyze website {target_url} with comprehensive analysis',
           'output_key': 'website_analysis'
       },
       {
           'agent': 'code_architect',
           'task': 'Generate monitoring code for website analysis automation',
           'output_key': 'monitoring_code'
       }
   ]
}


orchestrator.workflows['data_pipeline'] = {
   'steps': [
       {
           'agent': 'data_scientist',
           'task': 'Analyze the following CSV data with stats operation: {data_input}',
           'output_key': 'data_analysis'
       },
       {
           'agent': 'code_architect',
           'task': 'Generate data processing pipeline code',
           'output_key': 'pipeline_code'
       }
   ]
}

We define two key multi-agent workflows: competitive_analysis, in which our web analyst scrapes and analyzes a target URL before passing insights to our code architect, who generates a monitoring script; and data_pipeline, in which our data scientist runs statistical analysis on CSV input and our code architect then produces the corresponding ETL pipeline code. These declarative step sequences let us orchestrate complex tasks end to end with minimal boilerplate.
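The same declarative convention extends to new workflows: each step names an agent, a task template whose placeholders are filled from the workflow inputs (and prior step outputs), and an output key. A hypothetical single-step example:

orchestrator.workflows['client_scaffold'] = {
   'steps': [
       {
           'agent': 'code_architect',
           'task': 'Generate an API client for the {service_name} service',
           'output_key': 'client_code'
       }
   ]
}
# result = orchestrator.execute_workflow('client_scaffold', {'service_name': 'billing'})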

print("🚀 Running Production Examples")


print("n📊 Advanced Web Intelligence Demo")
try:
   web_result = web_agent.run("Analyze https://example.com with comprehensive analysis type")  # placeholder URL
   print(f"✅ Web Analysis Success: {json.dumps(web_result, indent=2)}")
except Exception as e:
   print(f"❌ Web analysis error: {e}")


print("n🔬 Data Science Pipeline Demo")
sample_data = """name,age,salary,department
Alice,25,50000,Engineering
Bob,30,60000,Engineering 
Carol,35,70000,Marketing
David,28,55000,Engineering
Eve,32,65000,Marketing"""


try:
   data_result = data_agent.run(f"Analyze this data with stats operation: {sample_data}")
   print(f"✅ Data Analysis Success: {json.dumps(data_result, indent=2)}")
except Exception as e:
   print(f"❌ Data analysis error: {e}")


print("n💻 Code Architecture Demo")
try:
   code_result = code_agent.run("Generate an API client for data processing tasks")
   print(f"✅ Code Generation Success: Generated {len(code_result['results'][0]['code'].split())} lines of code")
except Exception as e:
   print(f"❌ Code generation error: {e}")


print("n🔄 Multi-Agent Workflow Demo")
try:
   workflow_inputs = {'target_url': 'https://example.com'}  # placeholder URL
   workflow_result = orchestrator.execute_workflow('competitive_analysis', workflow_inputs)
   print(f"✅ Workflow Success: Completed in {workflow_result['metadata']['total_execution_time']:.2f}s")
except Exception as e:
   print(f"❌ Workflow error: {e}")

We run a set of production demos to validate each component: first, our web_analyst performs a full website analysis; next, our data_scientist computes statistics over sample CSV data; then our code_architect generates an API client; finally, we orchestrate the end-to-end competitive_analysis workflow, capturing success metrics, outputs, and execution time for every step.

print("n📈 System Performance Metrics")


system_status = orchestrator.get_system_status()
print(f"System Status: {json.dumps(system_status, indent=2)}")


print("nTool Performance:")
for agent_name, agent in orchestrator.agents.items():
   print(f"n{agent_name}:")
   for tool_name, tool in agent.tools.items():
       print(f"  - {tool_name}: {tool.calls} calls, {tool.avg_execution_time:.3f}s avg, {tool.error_rate:.1%} error rate")


print("n✅ Advanced Custom Agent Framework Complete!")
print("🚀 Production-ready implementation with full monitoring and error handling!")

We retrieve and print the orchestrator's overall system status, listing registered agents, workflows, and cache size, and then loop through each agent's tools to display call counts, average execution time, and error rate. This gives us a real-time view of performance and reliability before we finally confirm that our production-ready agent framework is complete.

In short, we now have a blueprint for building specialized AI agents that perform complex analyses, generate production-quality code, and monitor their own execution health and resource usage. The AgentOrchestrator ties everything together, letting you coordinate multi-step workflows and capture granular performance insights across agents. Whether you are automating market research, ETL tasks, or API client generation, this framework provides the scalability, reliability, and observability needed for enterprise-grade AI deployments.




