How to design autonomous multi-agent data and infrastructure policy systems using lightweight Qwen models to achieve efficient pipeline intelligence?
In this tutorial, we use the lightweight Qwen2.5-0.5B-Instruct model to build an agent data and infrastructure policy system for efficient execution. We first created a flexible LLM agent framework and then developed specialized agents to handle different layers of data management, from ingestion and quality analysis to infrastructure optimization. We integrate these agents into a coordinator that coordinates their interactions, ensuring smooth multi-agent collaboration across data pipelines. Through practical examples such as e-commerce and IoT pipelines, we explore how autonomous decision-making can simplify complex data operations. Check The complete code is here.
!pip install -q transformers torch accelerate datasets huggingface_hub
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import json, time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
import pandas as pd
class LightweightLLMAgent:
   def __init__(self, role: str, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct"):
       self.role = role
       self.model_name = model_name
       self.device = "cuda" if torch.cuda.is_available() else "cpu"
       print(f"Loading {model_name} for {role} agent on {self.device}...")
       self.tokenizer = AutoTokenizer.from_pretrained(model_name)
       self.model = AutoModelForCausalLM.from_pretrained(
           model_name,
           torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
           device_map="auto"
       )
       self.conversation_history = []
   def generate_response(self, prompt: str, max_tokens: int = 150) -> str:
       messages = [
           {"role": "system", "content": f"You are a {self.role} agent in a data infrastructure system."},
           {"role": "user", "content": prompt}
       ]
       text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
       model_inputs = self.tokenizer([text], return_tensors="pt").to(self.device)
       with torch.no_grad():
           generated_ids = self.model.generate(
               model_inputs.input_ids,
               max_new_tokens=max_tokens,
               temperature=0.7,
               do_sample=True,
               top_p=0.95
           )
       generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)]
       response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
       self.conversation_history.append({"prompt": prompt, "response": response})
       return responseWe first set up a lightweight LLM agent infrastructure using the Qwen2.5-0.5B-Instruct model. We load the model and tokenizer, and define a basic agent class capable of handling contextual conversations and generating intelligent responses. This forms the core basis for our professional agents to operate efficiently within Colab. Check The complete code is here.
class DataIngestionAgent(LightweightLLMAgent):
   def __init__(self):
       super().__init__(role="Data Ingestion Specialist")
   def analyze_data_source(self, source_info: Dict) -> Dict:
       prompt = f"""Analyze this data source and provide ingestion strategy:
Source Type: {source_info.get('type', 'unknown')}
Volume: {source_info.get('volume', 'unknown')}
Frequency: {source_info.get('frequency', 'unknown')}
Provide a brief strategy focusing on: 1) Ingestion method, 2) Key considerations."""
       strategy = self.generate_response(prompt, max_tokens=100)
       return {"source": source_info, "strategy": strategy, "timestamp": datetime.now().isoformat()}
class DataQualityAgent(LightweightLLMAgent):
   def __init__(self):
       super().__init__(role="Data Quality Analyst")
   def assess_data_quality(self, data_sample: Dict) -> Dict:
       prompt = f"""Assess data quality for this sample:
Completeness: {data_sample.get('completeness', 'N/A')}%
Consistency: {data_sample.get('consistency', 'N/A')}%
Issues Found: {data_sample.get('issues', 0)}
Provide brief quality assessment and top 2 recommendations."""
       assessment = self.generate_response(prompt, max_tokens=100)
       return {"assessment": assessment, "severity": self._calculate_severity(data_sample), "timestamp": datetime.now().isoformat()}
   def _calculate_severity(self, data_sample: Dict) -> str:
       completeness = data_sample.get('completeness', 100)
       consistency = data_sample.get('consistency', 100)
       avg_score = (completeness + consistency) / 2
       if avg_score >= 90: return "LOW"
       elif avg_score >= 70: return "MEDIUM"
       else: return "HIGH"We design data ingestion and data quality agents to focus on structured analysis of data pipelines. We let ingest agents determine the best way to flow data, while quality agents evaluate data completeness, consistency, and issues to provide actionable insights. Together they establish the first two layers of autonomous data management. Check The complete code is here.
class InfrastructureOptimizationAgent(LightweightLLMAgent):
   def __init__(self):
       super().__init__(role="Infrastructure Optimization Specialist")
   def optimize_resources(self, metrics: Dict) -> Dict:
       prompt = f"""Analyze infrastructure metrics and suggest optimizations:
CPU Usage: {metrics.get('cpu_usage', 0)}%
Memory Usage: {metrics.get('memory_usage', 0)}%
Storage: {metrics.get('storage_used', 0)}GB / {metrics.get('storage_total', 0)}GB
Query Latency: {metrics.get('query_latency', 0)}ms
Provide 2 optimization recommendations."""
       recommendations = self.generate_response(prompt, max_tokens=100)
       return {"current_metrics": metrics, "recommendations": recommendations, "priority": self._calculate_priority(metrics), "timestamp": datetime.now().isoformat()}
   def _calculate_priority(self, metrics: Dict) -> str:
       cpu = metrics.get('cpu_usage', 0)
       memory = metrics.get('memory_usage', 0)
       if cpu > 85 or memory > 85: return "CRITICAL"
       elif cpu > 70 or memory > 70: return "HIGH"
       else: return "NORMAL"We develop infrastructure optimization agents to continuously analyze key metrics such as CPU, memory, and storage utilization. We use it to generate intelligent optimization recommendations that help us maintain high performance and resource efficiency. This agent ensures that our infrastructure remains responsive and scalable during data operations. Check The complete code is here.
class AgenticDataOrchestrator:
   def __init__(self):
       print("n" + "="*70)
       print("Initializing Agentic Data Infrastructure System")
       print("="*70 + "n")
       self.ingestion_agent = DataIngestionAgent()
       self.quality_agent = DataQualityAgent()
       self.optimization_agent = InfrastructureOptimizationAgent()
       self.execution_log = []
   def process_data_pipeline(self, pipeline_config: Dict) -> Dict:
       results = {"pipeline_id": pipeline_config.get("id", "unknown"), "start_time": datetime.now().isoformat(), "stages": []}
       print("n[Stage 1] Data Ingestion Analysis")
       ingestion_result = self.ingestion_agent.analyze_data_source(pipeline_config.get("source", {}))
       print(f"Strategy: {ingestion_result['strategy'][:150]}...")
       results["stages"].append({"stage": "ingestion", "result": ingestion_result})
       print("n[Stage 2] Data Quality Assessment")
       quality_result = self.quality_agent.assess_data_quality(pipeline_config.get("quality_metrics", {}))
       print(f"Assessment: {quality_result['assessment'][:150]}...")
       print(f"Severity: {quality_result['severity']}")
       results["stages"].append({"stage": "quality", "result": quality_result})
       print("n[Stage 3] Infrastructure Optimization")
       optimization_result = self.optimization_agent.optimize_resources(pipeline_config.get("infrastructure_metrics", {}))
       print(f"Recommendations: {optimization_result['recommendations'][:150]}...")
       print(f"Priority: {optimization_result['priority']}")
       results["stages"].append({"stage": "optimization", "result": optimization_result})
       results["end_time"] = datetime.now().isoformat()
       results["status"] = "completed"
       self.execution_log.append(results)
       return results
   def generate_summary_report(self) -> pd.DataFrame:
       if not self.execution_log: return pd.DataFrame()
       summary_data = []
       for log in self.execution_log:
           summary_data.append({"Pipeline ID": log["pipeline_id"], "Start Time": log["start_time"], "Status": log["status"], "Stages Completed": len(log["stages"])})
       return pd.DataFrame(summary_data)We built an agent data coordinator to coordinate all professional agents under a unified workflow. We use it to manage end-to-end pipeline execution, sequentially trigger ingestion, QA, and optimization. By doing so, we bring structure, collaboration, and automation to the entire multi-agent system. Check The complete code is here.
def main():
   orchestrator = AgenticDataOrchestrator()
   print("n" + "="*70)
   print("EXAMPLE 1: E-commerce Data Pipeline")
   print("="*70)
   ecommerce_pipeline = {
       "id": "ecommerce_pipeline_001",
       "source": {"type": "REST API", "volume": "10GB/day", "frequency": "real-time"},
       "quality_metrics": {"completeness": 87, "consistency": 92, "issues": 15},
       "infrastructure_metrics": {"cpu_usage": 78, "memory_usage": 82, "storage_used": 450, "storage_total": 1000, "query_latency": 250}
   }
   result1 = orchestrator.process_data_pipeline(ecommerce_pipeline)
   print("nn" + "="*70)
   print("EXAMPLE 2: IoT Sensor Data Pipeline")
   print("="*70)
   iot_pipeline = {
       "id": "iot_pipeline_002",
       "source": {"type": "Message Queue (Kafka)", "volume": "50GB/day", "frequency": "streaming"},
       "quality_metrics": {"completeness": 95, "consistency": 88, "issues": 8},
       "infrastructure_metrics": {"cpu_usage": 65, "memory_usage": 71, "storage_used": 780, "storage_total": 2000, "query_latency": 180}
   }
   result2 = orchestrator.process_data_pipeline(iot_pipeline)
   print("nn" + "="*70)
   print("EXECUTION SUMMARY REPORT")
   print("="*70 + "n")
   summary_df = orchestrator.generate_summary_report()
   print(summary_df.to_string(index=False))
   print("n" + "="*70)
   print("Tutorial Complete!")
   print("="*70)
   print("nKey Concepts Demonstrated:")
   print("✓ Lightweight LLM agent architecture")
   print("✓ Specialized agents for different data tasks")
   print("✓ Multi-agent orchestration")
   print("✓ Infrastructure monitoring and optimization")
   print("✓ Autonomous decision-making in data pipelines")
if __name__ == "__main__":
   main()We demonstrate our complete system through two real-world examples (e-commerce and IoT data pipelines). We observe how each agent performs its duties autonomously while contributing to a common goal. Finally, we generate a summary report confirming the efficiency of orchestration and the power of lightweight agent intelligence.
In summary, we designed and implemented an intelligent, multi-agent data infrastructure framework powered by a compact open source model. We witnessed how independent but cooperative agents autonomously analyze, evaluate, and optimize real-world data systems. The entire setup demonstrates how the lightweight LLM can effectively handle infrastructure intelligence, while also highlighting how agent orchestration can transform traditional data workflows into adaptive, self-optimizing systems that are ready for scalable enterprise applications.
Check The complete code is here. Please feel free to check out our GitHub page for tutorials, code, and notebooks. In addition, welcome to follow us twitter And don’t forget to join our 100k+ ML SubReddit and subscribe our newsletter. wait! Are you using Telegram? Now you can also join us via telegram.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for the benefit of society. His most recent endeavor is the launch of Marktechpost, an artificial intelligence media platform that stands out for its in-depth coverage of machine learning and deep learning news that is technically sound and easy to understand for a broad audience. The platform has more than 2 million monthly views, which shows that it is very popular among viewers.
🙌 FOLLOW MARKTECHPOST: Add us as your go-to source on Google.
 
																								 
																								