Coding Guide for Using Agent Communication Protocol (ACP) to Build a Scalable Multi-Agent Communication System

In this tutorial, we implement the Proxy Communication Protocol (ACP) by establishing a flexible, ACP-compliant messaging system in Python, and use Google’s Gemini API for natural language processing. Starting with the installation and configuration of the Google-generativeai library, the tutorial introduces the core summary, message types, performance, and ACPMESSAGE data classes that standardize inter-agent communication. By defining Acpaget and AcpMessageBroker classes, this guide demonstrates how to create, send, route, and process structured messages in multiple autonomous agents. With clear code examples, users learn to implement queries, request operations and broadcast information, while maintaining conversation threads, confirmation and error handling.
import google.generativeai as genai
import json
import time
import uuid
from enum import Enum
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
GEMINI_API_KEY = "Use Your Gemini API Key"
genai.configure(api_key=GEMINI_API_KEY)
We import basic Python modules, from JSON processing and timing to unique identifier generation and type annotations to support structured ACP implementations. It then retrieves the user’s Gemini API key placeholder and configures the Google-generativeai client to subsequently call the Gemini language model.
class ACPMessageType(Enum):
"""Standard ACP message types"""
REQUEST = "request"
RESPONSE = "response"
INFORM = "inform"
QUERY = "query"
SUBSCRIBE = "subscribe"
UNSUBSCRIBE = "unsubscribe"
ERROR = "error"
ACK = "acknowledge"
ACPMessAgeType enumeration defines the core message categories used in the proxy communication protocol, including requests, responses, information broadcasts, querying and control operations such as subscription management, error signals and acknowledgements. By centralizing these message types, the protocol ensures consistent processing and routing of inter-agent communications across the system.
class ACPPerformative(Enum):
"""ACP speech acts (performatives)"""
TELL = "tell"
ASK = "ask"
REPLY = "reply"
REQUEST_ACTION = "request-action"
AGREE = "agree"
REFUSE = "refuse"
PROPOSE = "propose"
ACCEPT = "accept"
REJECT = "reject"
Acpperformative enumeration captures various voice behaviors that agents can use when interacting under the ACP framework, drawing advanced intents such as making a request, asking questions, asking commands, or negotiating agreements. This clear taxonomy enables agents to interpret and respond to information in a context-appropriate manner, ensuring robust and semantically rich communication.
@dataclass
class ACPMessage:
"""Agent Communication Protocol Message Structure"""
message_id: str
sender: str
receiver: str
performative: str
content: Dict[str, Any]
protocol: str = "ACP-1.0"
conversation_id: str = None
reply_to: str = None
language: str = "english"
encoding: str = "json"
timestamp: float = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
if self.conversation_id is None:
self.conversation_id = str(uuid.uuid4())
def to_acp_format(self) -> str:
"""Convert to standard ACP message format"""
acp_msg = {
"message-id": self.message_id,
"sender": self.sender,
"receiver": self.receiver,
"performative": self.performative,
"content": self.content,
"protocol": self.protocol,
"conversation-id": self.conversation_id,
"reply-to": self.reply_to,
"language": self.language,
"encoding": self.encoding,
"timestamp": self.timestamp
}
return json.dumps(acp_msg, indent=2)
@classmethod
def from_acp_format(cls, acp_string: str) -> 'ACPMessage':
"""Parse ACP message from string format"""
data = json.loads(acp_string)
return cls(
message_id=data["message-id"],
sender=data["sender"],
receiver=data["receiver"],
performative=data["performative"],
content=data["content"],
protocol=data.get("protocol", "ACP-1.0"),
conversation_id=data.get("conversation-id"),
reply_to=data.get("reply-to"),
language=data.get("language", "english"),
encoding=data.get("encoding", "json"),
timestamp=data.get("timestamp", time.time())
)
The ACPMESSAGE data class encapsulates all fields required for structured ACP exchange, including identifiers, participants, performances, payloads and metadata, such as protocol versions, languages, and timestamps. Its __post_init__ method automatically fills in the missing timestamps and dialogue_id values, ensuring that each message is tracked uniquely. Utility methods to_acp_format and from_acp_format handle serialized and standardized JSON representations for seamless transmission and parsing.
class ACPAgent:
"""Agent implementing Agent Communication Protocol"""
def __init__(self, agent_id: str, name: str, capabilities: List[str]):
self.agent_id = agent_id
self.name = name
self.capabilities = capabilities
self.model = genai.GenerativeModel("gemini-1.5-flash")
self.message_queue: List[ACPMessage] = []
self.subscriptions: Dict[str, List[str]] = {}
self.conversations: Dict[str, List[ACPMessage]] = {}
def create_message(self, receiver: str, performative: str,
content: Dict[str, Any], conversation_id: str = None,
reply_to: str = None) -> ACPMessage:
"""Create a new ACP-compliant message"""
return ACPMessage(
message_id=str(uuid.uuid4()),
sender=self.agent_id,
receiver=receiver,
performative=performative,
content=content,
conversation_id=conversation_id,
reply_to=reply_to
)
def send_inform(self, receiver: str, fact: str, data: Any = None) -> ACPMessage:
"""Send an INFORM message (telling someone a fact)"""
content = {"fact": fact, "data": data}
return self.create_message(receiver, ACPPerformative.TELL.value, content)
def send_query(self, receiver: str, question: str, query_type: str = "yes-no") -> ACPMessage:
"""Send a QUERY message (asking for information)"""
content = {"question": question, "query-type": query_type}
return self.create_message(receiver, ACPPerformative.ASK.value, content)
def send_request(self, receiver: str, action: str, parameters: Dict = None) -> ACPMessage:
"""Send a REQUEST message (asking someone to perform an action)"""
content = {"action": action, "parameters": parameters or {}}
return self.create_message(receiver, ACPPerformative.REQUEST_ACTION.value, content)
def send_reply(self, original_msg: ACPMessage, response_data: Any) -> ACPMessage:
"""Send a REPLY message in response to another message"""
content = {"response": response_data, "original-question": original_msg.content}
return self.create_message(
original_msg.sender,
ACPPerformative.REPLY.value,
content,
conversation_id=original_msg.conversation_id,
reply_to=original_msg.message_id
)
def process_message(self, message: ACPMessage) -> Optional[ACPMessage]:
"""Process incoming ACP message and generate appropriate response"""
self.message_queue.append(message)
conv_id = message.conversation_id
if conv_id not in self.conversations:
self.conversations[conv_id] = []
self.conversations[conv_id].append(message)
if message.performative == ACPPerformative.ASK.value:
return self._handle_query(message)
elif message.performative == ACPPerformative.REQUEST_ACTION.value:
return self._handle_request(message)
elif message.performative == ACPPerformative.TELL.value:
return self._handle_inform(message)
return None
def _handle_query(self, message: ACPMessage) -> ACPMessage:
"""Handle incoming query messages"""
question = message.content.get("question", "")
prompt = f"As agent {self.name} with capabilities {self.capabilities}, answer: {question}"
try:
response = self.model.generate_content(prompt)
answer = response.text.strip()
except:
answer = "Unable to process query at this time"
return self.send_reply(message, {"answer": answer, "confidence": 0.8})
def _handle_request(self, message: ACPMessage) -> ACPMessage:
"""Handle incoming action requests"""
action = message.content.get("action", "")
parameters = message.content.get("parameters", {})
if any(capability in action.lower() for capability in self.capabilities):
result = f"Executing {action} with parameters {parameters}"
status = "agreed"
else:
result = f"Cannot perform {action} - not in my capabilities"
status = "refused"
return self.send_reply(message, {"status": status, "result": result})
def _handle_inform(self, message: ACPMessage) -> Optional[ACPMessage]:
"""Handle incoming information messages"""
fact = message.content.get("fact", "")
print(f"[{self.name}] Received information: {fact}")
ack_content = {"status": "received", "fact": fact}
return self.create_message(message.sender, "acknowledge", ack_content,
conversation_id=message.conversation_id)
The Acpaget class encapsulates an autonomous entity that can send, receive and process ACP compatible messages using Gemini’s language model. It manages its own message queues, conversation history and subscriptions, and provides helper methods (send_inform, send_query, send_request, send_reply) to build a correct formatted ACPMESSAGE instance. Incoming messages are routed through Process_message, which is delegated to a special handler for querying, operating requests and information messages.
class ACPMessageBroker:
"""Message broker implementing ACP routing and delivery"""
def __init__(self):
self.agents: Dict[str, ACPAgent] = {}
self.message_log: List[ACPMessage] = []
self.routing_table: Dict[str, str] = {}
def register_agent(self, agent: ACPAgent):
"""Register an agent with the message broker"""
self.agents[agent.agent_id] = agent
self.routing_table[agent.agent_id] = "local"
print(f"✓ Registered agent: {agent.name} ({agent.agent_id})")
def route_message(self, message: ACPMessage) -> bool:
"""Route ACP message to appropriate recipient"""
if message.receiver not in self.agents:
print(f"✗ Receiver {message.receiver} not found")
return False
print(f"n📨 ACP MESSAGE ROUTING:")
print(f"From: {message.sender} → To: {message.receiver}")
print(f"Performative: {message.performative}")
print(f"Content: {json.dumps(message.content, indent=2)}")
receiver_agent = self.agents[message.receiver]
response = receiver_agent.process_message(message)
self.message_log.append(message)
if response:
print(f"n📤 GENERATED RESPONSE:")
print(f"From: {response.sender} → To: {response.receiver}")
print(f"Content: {json.dumps(response.content, indent=2)}")
if response.receiver in self.agents:
self.agents[response.receiver].process_message(response)
self.message_log.append(response)
return True
def broadcast_message(self, message: ACPMessage, recipients: List[str]):
"""Broadcast message to multiple recipients"""
for recipient in recipients:
msg_copy = ACPMessage(
message_id=str(uuid.uuid4()),
sender=message.sender,
receiver=recipient,
performative=message.performative,
content=message.content.copy(),
conversation_id=message.conversation_id
)
self.route_message(msg_copy)
ACPMESSAGEBROKR is the central router for ACP messages, maintaining proxy registry and message logs. It provides a way to register a proxy, pass a single message through Route_Message, which handles lookup, record and response chains and sends the same message to multiple recipients using Broadcast_message.
def demonstrate_acp():
"""Comprehensive demonstration of Agent Communication Protocol"""
print("🤖 AGENT COMMUNICATION PROTOCOL (ACP) DEMONSTRATION")
print("=" * 60)
broker = ACPMessageBroker()
researcher = ACPAgent("agent-001", "Dr. Research", ["analysis", "research", "data-processing"])
assistant = ACPAgent("agent-002", "AI Assistant", ["information", "scheduling", "communication"])
calculator = ACPAgent("agent-003", "MathBot", ["calculation", "mathematics", "computation"])
broker.register_agent(researcher)
broker.register_agent(assistant)
broker.register_agent(calculator)
print(f"n📋 REGISTERED AGENTS:")
for agent_id, agent in broker.agents.items():
print(f" • {agent.name} ({agent_id}): {', '.join(agent.capabilities)}")
print(f"n🔬 SCENARIO 1: Information Query (ASK performative)")
query_msg = assistant.send_query("agent-001", "What are the key factors in AI research?")
broker.route_message(query_msg)
print(f"n🔢 SCENARIO 2: Action Request (REQUEST-ACTION performative)")
calc_request = researcher.send_request("agent-003", "calculate", {"expression": "sqrt(144) + 10"})
broker.route_message(calc_request)
print(f"n📢 SCENARIO 3: Information Sharing (TELL performative)")
info_msg = researcher.send_inform("agent-002", "New research paper published on quantum computing")
broker.route_message(info_msg)
print(f"n📊 PROTOCOL STATISTICS:")
print(f" • Total messages processed: {len(broker.message_log)}")
print(f" • Active conversations: {len(set(msg.conversation_id for msg in broker.message_log))}")
print(f" • Message types used: {len(set(msg.performative for msg in broker.message_log))}")
print(f"n📋 SAMPLE ACP MESSAGE FORMAT:")
sample_msg = assistant.send_query("agent-001", "Sample question for format demonstration")
print(sample_msg.to_acp_format())
The Expivenate_ACP function curates hands-on walkthroughs of the entire ACP framework: it initializes brokers and three different agents (researchers, AI assistants, and Mathbots), registers them, and describes three key interactions, querying information to get information, requesting calculations, and sharing updates. Once each message is routed and the response is processed, it prints the summary statistics about the message flow. It shows a format of ACP messages, providing users with a clear, end-to-end example of how agents communicate under protocols.
def setup_guide():
print("""
🚀 GOOGLE COLAB SETUP GUIDE:
1. Get Gemini API Key:
2. Replace: GEMINI_API_KEY = "YOUR_ACTUAL_API_KEY"
3. Run: demonstrate_acp()
🔧 ACP PROTOCOL FEATURES:
• Standardized message format with required fields
• Speech act performatives (TELL, ASK, REQUEST-ACTION, etc.)
• Conversation tracking and message threading
• Error handling and acknowledgments
• Message routing and delivery confirmation
📝 EXTEND THE PROTOCOL:
```python
# Create custom agent
my_agent = ACPAgent("my-001", "CustomBot", ["custom-capability"])
broker.register_agent(my_agent)
# Send custom message
msg = my_agent.send_query("agent-001", "Your question here")
broker.route_message(msg)
```
""")
if __name__ == "__main__":
setup_guide()
demonstrate_acp()
Finally, the Setup_Guide function provides a quick start reference for running an ACP demonstration in Google COLAB, outlining how to get and configure a Gemini API key and call the explore_acp routine. It also summarizes key protocol features such as standardized message formats, performance words, and message routing. It provides a neat code snippet of how to register a custom proxy and send customized messages.
In short, the tutorial implements a multi-institutional system based on ACP, capable of conducting research, computing and collaborative tasks. The example schemes provided illustrate common use cases, information queries, computation requests, and fact sharing, while brokers ensure reliable messaging and logging. Readers are encouraged to extend the framework by adding new proxy capabilities, integrating domain-specific actions, or combining more complex subscription and notification mechanisms.
Download the notebook on Github. All credits for this study are to the researchers on the project. Also, please stay tuned for us twitter And don’t forget to join us 95k+ ml reddit And subscribe Our newsletter.
Asif Razzaq is CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, ASIF is committed to harnessing the potential of artificial intelligence to achieve social benefits. His recent effort is to launch Marktechpost, an artificial intelligence media platform that has an in-depth coverage of machine learning and deep learning news that can sound both technically, both through technical voices and be understood by a wide audience. The platform has over 2 million views per month, demonstrating its popularity among its audience.
