How to Design an Advanced Multi-Agent Reasoning System with spaCy, Featuring Planning, Reflection, Memory, and Knowledge Graphs
In this tutorial, we build an advanced agentic AI system using spaCy, designed to allow multiple intelligent agents to reason, collaborate, reflect, and learn from experience. We walk through the process step by step, observing how each agent uses planning, memory, communication, and semantic reasoning to handle its task. By the end, the system has grown into a multi-agent architecture that extracts entities, interprets context, forms reasoning chains, and builds knowledge graphs, while recording reflections and episodic observations that it can draw on in later steps. Check out the complete code here.
!pip install spacy networkx matplotlib -q
# The agents load the small English pipeline. If it is not installed yet, run:
# !python -m spacy download en_core_web_sm

import spacy
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from collections import defaultdict, deque
from enum import Enum
import json
import hashlib
from datetime import datetime
class MessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    BROADCAST = "broadcast"
    QUERY = "query"


@dataclass
class Message:
    sender: str
    receiver: str
    msg_type: MessageType
    content: Dict[str, Any]
    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
    priority: int = 1

    def get_id(self) -> str:
        return hashlib.md5(f"{self.sender}{self.timestamp}".encode()).hexdigest()[:8]


@dataclass
class AgentTask:
    task_id: str
    task_type: str
    data: Any
    priority: int = 1
    dependencies: List[str] = field(default_factory=list)
    metadata: Dict = field(default_factory=dict)


@dataclass
class Observation:
    state: str
    action: str
    result: Any
    confidence: float
    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
class WorkingMemory:
    def __init__(self, capacity: int = 10):
        self.capacity = capacity
        self.items = deque(maxlen=capacity)
        self.attention_scores = {}

    def add(self, key: str, value: Any, attention: float = 1.0):
        self.items.append((key, value))
        self.attention_scores[key] = attention

    def recall(self, n: int = 5) -> List[Tuple[str, Any]]:
        sorted_items = sorted(self.items, key=lambda x: self.attention_scores.get(x[0], 0), reverse=True)
        return sorted_items[:n]

    def get(self, key: str) -> Optional[Any]:
        for k, v in self.items:
            if k == key:
                return v
        return None


class EpisodicMemory:
    def __init__(self):
        self.episodes = []
        self.success_patterns = defaultdict(int)

    def store(self, observation: Observation):
        self.episodes.append(observation)
        if observation.confidence > 0.7:
            pattern = f"{observation.state}→{observation.action}"
            self.success_patterns[pattern] += 1

    def query_similar(self, state: str, top_k: int = 3) -> List[Observation]:
        scored = [(obs, self._similarity(state, obs.state)) for obs in self.episodes[-50:]]
        scored.sort(key=lambda x: x[1], reverse=True)
        return [obs for obs, _ in scored[:top_k]]

    def _similarity(self, state1: str, state2: str) -> float:
        words1, words2 = set(state1.split()), set(state2.split())
        if not words1 or not words2:
            return 0.0
        return len(words1 & words2) / len(words1 | words2)
Here we build the core structures that the agent system depends on. We import the key libraries, define the message and task formats, and implement the working and episodic memory modules. These pieces give every agent a common basis for reasoning, storage, and communication.
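To make the two memory behaviours concrete, here is a minimal standalone sketch of attention-ranked recall from a bounded buffer and of the word-overlap (Jaccard) score used to compare states. The names `BoundedMemory` and `jaccard` are hypothetical stand-ins for illustration, not part of the tutorial's classes, and the keys and scores are made-up inputs.

```python
from collections import deque
from typing import Any, Dict, List, Tuple


def jaccard(state1: str, state2: str) -> float:
    """Word-overlap similarity: shared words divided by all distinct words."""
    words1, words2 = set(state1.split()), set(state2.split())
    if not words1 or not words2:
        return 0.0
    return len(words1 & words2) / len(words1 | words2)


class BoundedMemory:
    """Hypothetical stand-in for WorkingMemory: bounded deque plus attention scores."""

    def __init__(self, capacity: int = 3):
        self.items: deque = deque(maxlen=capacity)
        self.attention: Dict[str, float] = {}

    def add(self, key: str, value: Any, attention: float = 1.0) -> None:
        self.items.append((key, value))  # the oldest entry is dropped once the deque is full
        self.attention[key] = attention

    def recall(self, n: int = 2) -> List[Tuple[str, Any]]:
        # Rank the surviving items by their attention score, highest first.
        ranked = sorted(self.items, key=lambda kv: self.attention.get(kv[0], 0), reverse=True)
        return ranked[:n]


memory = BoundedMemory(capacity=3)
memory.add("entities_ORG", ["OpenAI", "DeepMind"], attention=0.5)
memory.add("entities_GPE", ["London"], attention=0.2)
memory.add("entities_PERSON", ["Sam Altman"], attention=0.3)
memory.add("entities_DATE", ["2017"], attention=0.1)  # evicts entities_ORG

top_keys = [key for key, _ in memory.recall(2)]
spaced = jaccard("entity extraction 40 tokens", "entity extraction 55 tokens")
unspaced = jaccard("entity_extraction_40tokens", "entity_extraction_55tokens")
print(top_keys, spaced, unspaced)
```

Note that the highest-attention item is lost here simply because it was the oldest, since eviction follows insertion order rather than attention. Also, because the comparison splits on whitespace, state strings without spaces (such as `entity_extraction_40tokens`) can only score 0 or 1.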
class ReflectionModule:
    def __init__(self):
        self.performance_log = []

    def reflect(self, task_type: str, confidence: float, result: Any) -> Dict[str, Any]:
        self.performance_log.append({'task': task_type, 'confidence': confidence, 'timestamp': datetime.now().timestamp()})
        recent = [p for p in self.performance_log if p['task'] == task_type][-5:]
        avg_conf = sum(p['confidence'] for p in recent) / len(recent) if recent else 0.5
        insights = {
            'performance_trend': 'improving' if confidence > avg_conf else 'declining',
            'avg_confidence': avg_conf,
            'recommendation': self._get_recommendation(confidence, avg_conf)
        }
        return insights

    def _get_recommendation(self, current: float, average: float) -> str:
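        # NOTE: the source text between "if current <" and "-> List[str]:" was lost.
        # The thresholds and messages below, the AdvancedAgent constructor, and the
        # method name `plan` are assumptions inferred from how the rest of the code
        # uses them; they are not the author's original lines.
        if current < 0.4:
            return "Low confidence: consider an alternative approach"
        if current < average:
            return "Below recent average: review the latest inputs"
        return "At or above recent average: keep the current strategy"


class AdvancedAgent:
    def __init__(self, name: str, specialty: str, nlp):
        self.name = name
        self.specialty = specialty
        self.nlp = nlp
        self.working_memory = WorkingMemory()
        self.episodic_memory = EpisodicMemory()
        self.reflector = ReflectionModule()
        self.message_queue = deque()
        self.collaboration_graph = defaultdict(int)

    def plan(self, task: AgentTask) -> List[str]:
        # Reuse the action of the most similar past episode if it went well.
        similar = self.episodic_memory.query_similar(str(task.data))
        if similar and similar[0].confidence > 0.7:
            return [similar[0].action]
        return self._default_plan(task)

    def _default_plan(self, task: AgentTask) -> List[str]:
        return ['analyze', 'extract', 'validate']

    def send_message(self, receiver: str, msg_type: MessageType, content: Dict):
        msg = Message(self.name, receiver, msg_type, content)
        self.message_queue.append(msg)
        return msg

    def receive_message(self, message: Message):
        self.message_queue.append(message)
        self.collaboration_graph[message.sender] += 1

    def process(self, task: AgentTask) -> Dict[str, Any]:
        raise NotImplementedError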
class CognitiveEntityAgent(AdvancedAgent):
    def process(self, task: AgentTask) -> Dict[str, Any]:
        doc = self.nlp(task.data)
        entities = defaultdict(list)
        entity_contexts = []
        for ent in doc.ents:
            context_start = max(0, ent.start - 5)
            context_end = min(len(doc), ent.end + 5)
            context = doc[context_start:context_end].text
            entities[ent.label_].append(ent.text)
            entity_contexts.append({
                'entity': ent.text,
                'type': ent.label_,
                'context': context,
                'position': (ent.start_char, ent.end_char)
            })
        for ent_type, ents in entities.items():
            attention = len(ents) / len(doc.ents) if doc.ents else 0
            self.working_memory.add(f"entities_{ent_type}", ents, attention)
        confidence = min(len(entities) / 4, 1.0) if entities else 0.3
        obs = Observation(
            state=f"entity_extraction_{len(doc)}tokens",
            action="extract_with_context",
            result=len(entity_contexts),
            confidence=confidence
        )
        self.episodic_memory.store(obs)
        reflection = self.reflector.reflect('entity_extraction', confidence, entities)
        return {
            'entities': dict(entities),
            'contexts': entity_contexts,
            'confidence': confidence,
            'reflection': reflection,
            'next_actions': ['semantic_analysis', 'knowledge_graph'] if confidence > 0.5 else []
        }
We build a reflection engine and a base agent class that give each agent reasoning, planning, and memory capabilities. We then implement a cognitive entity agent that processes text, extracts entities together with their surrounding context, and stores each run as an observation. Running this part shows the agent recording its experience and comparing each new confidence score against its recent average.
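The trend logic in the reflection module can be sketched on its own. The helper below is a simplified, hypothetical version that keeps a single list of scores instead of a per-task log, but it follows the same order of operations as `reflect`: append the new score first, then compare it with the mean of the last five.

```python
from typing import Dict, List, Union


def reflect(log: List[float], confidence: float, window: int = 5) -> Dict[str, Union[str, float]]:
    """Append the new score, then compare it with the mean of the last `window` scores."""
    log.append(confidence)
    recent = log[-window:]
    avg = sum(recent) / len(recent)
    trend = "improving" if confidence > avg else "declining"
    return {"performance_trend": trend, "avg_confidence": avg}


scores: List[float] = []
first = reflect(scores, 0.75)   # average is 0.75, and 0.75 > 0.75 is False
second = reflect(scores, 0.25)  # average is 0.5
third = reflect(scores, 1.0)    # average is 2/3, and 1.0 exceeds it

for step in (first, second, third):
    print(step)
```

Because the current score is already part of the average, the first call for any task type compares a value with itself and is therefore labelled "declining". A later score has to beat a mean that already includes it before the trend flips to "improving".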
class SemanticReasoningAgent(AdvancedAgent):
    def process(self, task: AgentTask) -> Dict[str, Any]:
        doc = self.nlp(task.data)
        reasoning_chains = []
        for sent in doc.sents:
            chain = self._extract_reasoning_chain(sent)
            if chain:
                reasoning_chains.append(chain)
        entity_memory = self.working_memory.recall(3)
        semantic_clusters = self._cluster_by_semantics(doc)
        confidence = min(len(reasoning_chains) / 3, 1.0) if reasoning_chains else 0.4
        obs = Observation(
            state=f"semantic_analysis_{len(list(doc.sents))}sents",
            action="reason_and_cluster",
            result=len(reasoning_chains),
            confidence=confidence
        )
        self.episodic_memory.store(obs)
        return {
            'reasoning_chains': reasoning_chains,
            'semantic_clusters': semantic_clusters,
            'memory_context': entity_memory,
            'confidence': confidence,
            'next_actions': ['knowledge_integration']
        }

    def _extract_reasoning_chain(self, sent) -> Optional[Dict]:
        # Keep the last subject, verb, and object seen in the sentence.
        subj, verb, obj = None, None, None
        for token in sent:
            if token.dep_ == 'nsubj':
                subj = token
            elif token.pos_ == 'VERB':
                verb = token
            elif token.dep_ in ['dobj', 'attr', 'pobj']:
                obj = token
        if subj and verb and obj:
            return {'subject': subj.text, 'predicate': verb.lemma_, 'object': obj.text, 'confidence': 0.8}
        return None

    def _cluster_by_semantics(self, doc) -> List[Dict]:
        # Greedy single-pass clustering: each unvisited noun seeds a cluster.
        clusters = []
        nouns = [token for token in doc if token.pos_ in ['NOUN', 'PROPN']]
        visited = set()
        for noun in nouns:
            if noun.i in visited:
                continue
            cluster = [noun.text]
            visited.add(noun.i)
            for other in nouns:
                if other.i != noun.i and other.i not in visited:
                    # en_core_web_sm ships without word vectors, so similarity() is
                    # only a rough signal here; en_core_web_md or en_core_web_lg
                    # give more meaningful scores.
                    if noun.similarity(other) > 0.5:
                        cluster.append(other.text)
                        visited.add(other.i)
            if len(cluster) > 1:
                clusters.append({'concepts': cluster, 'size': len(cluster)})
        return clusters
We design a semantic reasoning agent that analyzes sentence structure, forms subject-predicate-object reasoning chains, and groups nouns by semantic similarity. We also recall the highest-attention items from working memory and return them alongside the analysis. This moves the system from surface-level extraction toward deeper reasoning.
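The chain extraction is easier to follow on a single sentence. The sketch below mirrors the loop in `_extract_reasoning_chain` but runs on hand-annotated tokens so that it needs no spaCy model. The `Tok` tuple and the dependency and part-of-speech labels are assumptions written by hand for illustration; a real parser may label the sentence differently.

```python
from typing import Dict, List, NamedTuple, Optional


class Tok(NamedTuple):
    """Hypothetical stand-in for a spaCy token with only the fields the loop reads."""
    text: str
    lemma_: str
    pos_: str
    dep_: str


def extract_chain(sent: List[Tok]) -> Optional[Dict[str, str]]:
    subj = verb = obj = None
    for token in sent:
        if token.dep_ == "nsubj":
            subj = token
        elif token.pos_ == "VERB":
            verb = token
        elif token.dep_ in ("dobj", "attr", "pobj"):
            obj = token  # a later match overwrites an earlier one
    if subj and verb and obj:
        return {"subject": subj.text, "predicate": verb.lemma_, "object": obj.text}
    return None


# "Sam Altman leads OpenAI in San Francisco", annotated by hand.
sentence = [
    Tok("Sam", "Sam", "PROPN", "compound"),
    Tok("Altman", "Altman", "PROPN", "nsubj"),
    Tok("leads", "lead", "VERB", "ROOT"),
    Tok("OpenAI", "OpenAI", "PROPN", "dobj"),
    Tok("in", "in", "ADP", "prep"),
    Tok("San", "San", "PROPN", "compound"),
    Tok("Francisco", "Francisco", "PROPN", "pobj"),
]

chain = extract_chain(sentence)
print(chain)
```

With these annotations the object ends up as "Francisco" rather than "OpenAI", because the loop keeps the last matching token and works on single tokens instead of full entity spans. Stopping at the first object, or mapping tokens back to their entity spans, would be possible refinements.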
class KnowledgeGraphAgent(AdvancedAgent):
    def process(self, task: AgentTask) -> Dict[str, Any]:
        doc = self.nlp(task.data)
        graph = {'nodes': set(), 'edges': []}
        for sent in doc.sents:
            entities = list(sent.ents)
            if len(entities) >= 2:
                for ent in entities:
                    graph['nodes'].add((ent.text, ent.label_))
                root = sent.root
                if root.pos_ == 'VERB':
                    for i in range(len(entities) - 1):
                        graph['edges'].append({
                            'from': entities[i].text,
                            'relation': root.lemma_,
                            'to': entities[i+1].text,
                            'sentence': sent.text[:100]
                        })
        graph['nodes'] = list(graph['nodes'])
        confidence = min(len(graph['edges']) / 5, 1.0) if graph['edges'] else 0.3
        obs = Observation(
            state=f"knowledge_graph_{len(graph['nodes'])}nodes",
            action="construct_graph",
            result=len(graph['edges']),
            confidence=confidence
        )
        self.episodic_memory.store(obs)
        return {
            'graph': graph,
            'node_count': len(graph['nodes']),
            'edge_count': len(graph['edges']),
            'confidence': confidence,
            'next_actions': []
        }
class MetaController:
    def __init__(self):
        self.nlp = spacy.load('en_core_web_sm')
        self.agents = {
            'cognitive_entity': CognitiveEntityAgent('CognitiveEntity', 'entity_analysis', self.nlp),
            'semantic_reasoning': SemanticReasoningAgent('SemanticReasoner', 'reasoning', self.nlp),
            'knowledge_graph': KnowledgeGraphAgent('KnowledgeBuilder', 'graph_construction', self.nlp)
        }
        self.task_history = []
        self.global_memory = WorkingMemory(capacity=20)

    def execute_with_planning(self, text: str) -> Dict[str, Any]:
        initial_task = AgentTask(task_id="task_001", task_type="cognitive_entity", data=text, metadata={'source': 'user_input'})
        results = {}
        task_queue = [initial_task]
        iterations = 0
        max_iterations = 10
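        # NOTE: the loop body and the signature of generate_insights were lost in the
        # source. The lines below are an assumed reconstruction based on the fields
        # used elsewhere (task_history, global_memory, next_actions); they are not
        # the author's original code.
        while task_queue and iterations < max_iterations:
            iterations += 1
            task = task_queue.pop(0)
            agent = self.agents.get(task.task_type)
            if agent is None or task.task_type in results:
                continue
            result = agent.process(task)
            results[task.task_type] = result
            self.task_history.append({'task_id': task.task_id, 'task_type': task.task_type, 'confidence': result['confidence']})
            self.global_memory.add(task.task_type, result, result['confidence'])
            # Follow-ups that do not name a registered agent (for example
            # 'semantic_analysis') are skipped.
            for next_action in result.get('next_actions', []):
                if next_action in self.agents and next_action not in results:
                    task_queue.append(AgentTask(task_id=f"task_{iterations + 1:03d}", task_type=next_action, data=text, dependencies=[task.task_id]))
        return results

    def generate_insights(self, results: Dict[str, Any]) -> str: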
        report = "=" * 70 + "\n"
        report += " ADVANCED AGENTIC AI SYSTEM - ANALYSIS REPORT\n"
        report += "=" * 70 + "\n\n"
        for agent_type, result in results.items():
            agent = self.agents[agent_type]
            report += f"{agent.name}\n"
            report += f" Specialty: {agent.specialty}\n"
            report += f" Confidence: {result['confidence']:.2%}\n"
            if 'reflection' in result:
                report += f" Performance: {result['reflection'].get('performance_trend', 'N/A')}\n"
            report += " Key Findings:\n"
            report += json.dumps({k: v for k, v in result.items() if k not in ['reflection', 'next_actions']}, indent=6) + "\n\n"
        report += "System-Level Insights:\n"
        report += f" Total iterations: {len(self.task_history)}\n"
        report += f" Active agents: {len(results)}\n"
        report += f" Global memory size: {len(self.global_memory.items)}\n"
        return report
We implement a knowledge graph agent that connects entities through relations extracted from the text. We then build the meta-controller, which coordinates all agents, manages planning, and handles multi-step execution. With this component in place, the system behaves as a multi-agent pipeline in which each agent's suggested next actions determine what runs next.
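The control flow of the meta-controller can be illustrated with stub agents. The sketch below is an assumption about how such a loop might look, not the tutorial's exact implementation: `run_pipeline` and the lambda agents are hypothetical, while the agent keys and `next_actions` values are copied from the code above.

```python
from typing import Any, Callable, Dict, List

Agent = Callable[[str], Dict[str, Any]]


def run_pipeline(agents: Dict[str, Agent], start: str, text: str, max_iterations: int = 10) -> List[str]:
    """Run agents from a FIFO queue; each result may enqueue follow-up task types."""
    results: Dict[str, Dict[str, Any]] = {}
    queue: List[str] = [start]
    order: List[str] = []
    iterations = 0
    while queue and iterations < max_iterations:
        iterations += 1
        task_type = queue.pop(0)
        # Skip follow-ups that name no registered agent or that already ran.
        if task_type not in agents or task_type in results:
            continue
        result = agents[task_type](text)
        results[task_type] = result
        order.append(task_type)
        queue.extend(result.get("next_actions", []))
    return order


# Stub agents that only return the fields the loop reads.
stub_agents: Dict[str, Agent] = {
    "cognitive_entity": lambda text: {"confidence": 0.75, "next_actions": ["semantic_analysis", "knowledge_graph"]},
    "semantic_reasoning": lambda text: {"confidence": 0.67, "next_actions": ["knowledge_integration"]},
    "knowledge_graph": lambda text: {"confidence": 0.6, "next_actions": []},
}

order = run_pipeline(stub_agents, "cognitive_entity", "sample text")
print(order)
```

Under this dispatch rule, the `semantic_analysis` follow-up matches no key in the agent registry, so the `semantic_reasoning` agent is never scheduled. If the real controller matches follow-ups by key, renaming the follow-up to `semantic_reasoning` (or adding an alias table) would be needed to reach all three agents.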
if __name__ == "__main__":
    sample_text = """
Artificial intelligence researchers at OpenAI and DeepMind are developing
advanced language models. Sam Altman leads OpenAI in San Francisco, while
Demis Hassabis heads DeepMind in London. These organizations collaborate
with universities like MIT and Stanford. Their research focuses on machine
learning, neural networks, and reinforcement learning. The breakthrough
came when transformers revolutionized natural language processing in 2017.
"""
    controller = MetaController()
    results = controller.execute_with_planning(sample_text)
    print(controller.generate_insights(results))
    print("Advanced multi-agent analysis complete with reflection and learning!")
We run the entire agent system end to end on the sample text. The controller plans the work, dispatches the agents in turn, and prints a consolidated analysis report. At this stage, we can see the whole multi-agent architecture working together on a single input.
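Once a run completes, the `graph` dictionary returned by the knowledge graph agent can be post-processed with plain Python. The sketch below turns the edge list into an adjacency mapping. The `to_adjacency` helper is hypothetical, and the sample `graph` is written by hand in the agent's output format rather than taken from a real run.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

# Hand-written example in the shape returned by KnowledgeGraphAgent.process.
graph: Dict[str, Any] = {
    "nodes": [("Sam Altman", "PERSON"), ("OpenAI", "ORG"), ("San Francisco", "GPE")],
    "edges": [
        {"from": "Sam Altman", "relation": "lead", "to": "OpenAI", "sentence": "Sam Altman leads OpenAI in San Francisco"},
        {"from": "OpenAI", "relation": "lead", "to": "San Francisco", "sentence": "Sam Altman leads OpenAI in San Francisco"},
    ],
}


def to_adjacency(graph: Dict[str, Any]) -> Dict[str, List[Tuple[str, str]]]:
    """Group outgoing (relation, target) pairs by source entity."""
    adjacency: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
    for edge in graph["edges"]:
        adjacency[edge["from"]].append((edge["relation"], edge["to"]))
    return dict(adjacency)


adjacency = to_adjacency(graph)
for source, links in adjacency.items():
    for relation, target in links:
        print(f"{source} --{relation}--> {target}")
```

The second edge shows a property of the construction rule: every pair of consecutive entities in a sentence is linked with the sentence's root verb, so some edges (here "OpenAI lead San Francisco") are structural artifacts rather than stated facts.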
In summary, we developed a multi-agent reasoning framework that uses spaCy to operate on real-world text, integrating planning, learning, and memory into a cohesive workflow. Each agent contributes its own layer of understanding, and the meta-controller orchestrates them to produce rich, interpretable insights. The design is flexible, so we can adapt it to more complex tasks and larger datasets, or integrate language models to extend the system's capabilities.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for the benefit of society. His most recent endeavor is the launch of Marktechpost, an AI media platform that stands out for its in-depth coverage of machine learning and deep learning news that is technically sound and easy to understand for a broad audience. The platform has more than 2 million monthly views, which shows that it is very popular among viewers.