A Coding Guide to Building an AI-Powered Cryptographic Agent System with Hybrid Encryption, Digital Signatures, and Adaptive Security Intelligence
In this tutorial, we build an AI-driven cryptographic agent system that combines classical cryptography with adaptive intelligence. We design the agent to perform hybrid encryption using RSA and AES, generate digital signatures, detect anomalies in message patterns, and recommend key rotations. As we progress, we see these autonomous agents establish secure communication channels, exchange encrypted data, and continuously assess security risks, all in a compact implementation. Check out the complete code here.
import hashlib, hmac, json, time, secrets
import numpy as np
from dataclasses import dataclass
from typing import Dict, List
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend


@dataclass
class SecurityEvent:
    """One entry in an agent's security log."""
    timestamp: float
    event_type: str
    risk_score: float
    details: Dict
We first import the libraries needed for cryptography, numerical analysis, and data handling. We also define a SecurityEvent data class that records each notable event in the cryptographic system so it can be analyzed later.
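To make the role of the event log concrete, here is a minimal sketch of how SecurityEvent records could be collected and filtered. The dataclass is repeated so the snippet runs on its own. The high_risk helper is hypothetical and not part of the tutorial's code; its default cutoff of 0.7 is the same threshold the tutorial uses later for high-risk events.

```python
import time
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class SecurityEvent:
    timestamp: float
    event_type: str
    risk_score: float
    details: Dict


def high_risk(events: List[SecurityEvent], threshold: float = 0.7) -> List[SecurityEvent]:
    """Hypothetical helper: return the events whose risk score exceeds the threshold."""
    return [e for e in events if e.risk_score > threshold]


# Two sample events, shaped like the ones the agent logs.
events = [
    SecurityEvent(time.time(), "SESSION_ESTABLISHED", 0.1, {"partner": "Bob"}),
    SecurityEvent(time.time(), "HIGH_RISK_MESSAGE", 0.8, {"sender": "Alice"}),
]

flagged = high_risk(events)
print([e.event_type for e in flagged])
```

Keeping events as plain dataclass instances means any later analysis, such as averaging risk scores or counting event types, is an ordinary list operation.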
class CryptoAgent:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        # Long-term RSA key pair: used to unwrap session keys and to sign messages.
        self.private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
        self.public_key = self.private_key.public_key()
        self.session_keys = {}       # partner_id -> 32-byte AES session key
        self.security_events = []    # list of SecurityEvent records
        self.encryption_count = 0
        self.key_rotation_threshold = 100

    def get_public_key_bytes(self) -> bytes:
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

    def establish_session(self, partner_id: str, partner_public_key_bytes: bytes) -> bytes:
        # Generate a random 256-bit AES key and wrap it with the partner's RSA public key.
        session_key = secrets.token_bytes(32)
        self.session_keys[partner_id] = session_key
        partner_public_key = serialization.load_pem_public_key(partner_public_key_bytes, backend=default_backend())
        encrypted_session_key = partner_public_key.encrypt(
            session_key,
            padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
        )
        self.log_security_event("SESSION_ESTABLISHED", 0.1, {"partner": partner_id})
        return encrypted_session_key

    def receive_session_key(self, partner_id: str, encrypted_session_key: bytes):
        # Unwrap the session key with our RSA private key (same OAEP parameters as the sender).
        session_key = self.private_key.decrypt(
            encrypted_session_key,
            padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
        )
        self.session_keys[partner_id] = session_key
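We define the CryptoAgent class and initialize its RSA key pair, session-key storage, and security event log. We then implement methods that let agents export their RSA public keys and establish a hybrid encryption session: one agent generates a random 256-bit AES session key and encrypts it with the partner's public key using RSA-OAEP, and the partner decrypts it with its own private key.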
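The key exchange described above can be sketched in isolation as follows. This is a minimal illustration that assumes a recent version of the cryptography package (where the backend argument is optional); the variable names are ours and the recipient's key pair is generated locally only so the snippet is self-contained.

```python
import secrets

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa

# Both sides must use identical OAEP parameters, so define them once.
OAEP = padding.OAEP(
    mgf=padding.MGF1(algorithm=hashes.SHA256()),
    algorithm=hashes.SHA256(),
    label=None,
)

# The recipient's long-term RSA key pair (generated here for the example).
recipient_private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
recipient_public_key = recipient_private_key.public_key()

# Sender: create a random 256-bit AES key and wrap it with the recipient's public key.
session_key = secrets.token_bytes(32)
wrapped_key = recipient_public_key.encrypt(session_key, OAEP)

# Recipient: unwrap the session key with the matching private key.
unwrapped_key = recipient_private_key.decrypt(wrapped_key, OAEP)

print(len(wrapped_key), unwrapped_key == session_key)
```

With a 2048-bit RSA key the wrapped key is always 256 bytes, which is the length the demo later prints for the encrypted session key. RSA only ever encrypts this short key; the messages themselves are protected with AES.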
    def encrypt_message(self, partner_id: str, plaintext: str) -> Dict:
        if partner_id not in self.session_keys:
            raise ValueError(f"No session established with {partner_id}")
        self.encryption_count += 1
        if self.encryption_count >= self.key_rotation_threshold:
            self.log_security_event("KEY_ROTATION_NEEDED", 0.3, {"count": self.encryption_count})
        # Fresh 96-bit nonce per message; AES-GCM must never reuse a nonce with the same key.
        iv = secrets.token_bytes(12)
        cipher = Cipher(algorithms.AES(self.session_keys[partner_id]), modes.GCM(iv), backend=default_backend())
        encryptor = cipher.encryptor()
        ciphertext = encryptor.update(plaintext.encode()) + encryptor.finalize()
        # Sign IV + ciphertext + GCM tag with the sender's RSA private key.
        message_data = iv + ciphertext + encryptor.tag
        signature = self.sign_data(message_data)
        risk_score = self.analyze_encryption_pattern(len(plaintext))
        return {
            "sender": self.agent_id,
            "recipient": partner_id,
            "iv": iv.hex(),
            "ciphertext": ciphertext.hex(),
            "tag": encryptor.tag.hex(),
            "signature": signature.hex(),
            "timestamp": time.time(),
            "risk_score": risk_score
        }

    def decrypt_message(self, encrypted_msg: Dict) -> str:
        sender_id = encrypted_msg["sender"]
        if sender_id not in self.session_keys:
            raise ValueError(f"No session established with {sender_id}")
        iv = bytes.fromhex(encrypted_msg["iv"])
        ciphertext = bytes.fromhex(encrypted_msg["ciphertext"])
        tag = bytes.fromhex(encrypted_msg["tag"])
        # finalize() raises InvalidTag if the ciphertext or tag was modified.
        cipher = Cipher(algorithms.AES(self.session_keys[sender_id]), modes.GCM(iv, tag), backend=default_backend())
        decryptor = cipher.decryptor()
        plaintext = decryptor.update(ciphertext) + decryptor.finalize()
        if encrypted_msg.get("risk_score", 0) > 0.7:
            self.log_security_event("HIGH_RISK_MESSAGE", 0.8, {"sender": sender_id})
        return plaintext.decode()
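We implement the encryption and decryption logic that lets agents communicate securely using AES-GCM. Each message is encrypted under the shared session key with a fresh 12-byte IV, signed, and risk-scored by the sender, then decrypted by the recipient. The GCM tag gives the recipient confidentiality and integrity: decryption fails if the ciphertext or tag has been altered. The message also carries an RSA-PSS signature over the IV, ciphertext, and tag, but decrypt_message as written does not verify it, so sender authentication on the receiving side rests on possession of the session key alone.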
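Since decrypt_message does not check the signature field, a receiver-side check could look like the sketch below. The verify_signature function is hypothetical and not part of the tutorial's CryptoAgent; it assumes the receiver already holds the sender's RSA public key (for example, from the PEM bytes exchanged during session setup) and uses the same PSS parameters as sign_data.

```python
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa


def pss_padding() -> padding.PSS:
    """PSS parameters matching the tutorial's sign_data method."""
    return padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)


def verify_signature(public_key, data: bytes, signature: bytes) -> bool:
    """Hypothetical helper: True if the signature is valid for data, else False."""
    try:
        public_key.verify(signature, data, pss_padding(), hashes.SHA256())
        return True
    except InvalidSignature:
        return False


# Sender side: sign the bytes that travel on the wire (placeholder content here).
sender_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
message_data = b"iv || ciphertext || tag"
signature = sender_key.sign(message_data, pss_padding(), hashes.SHA256())

# Receiver side: the genuine message verifies, a modified one does not.
genuine_ok = verify_signature(sender_key.public_key(), message_data, signature)
tampered_ok = verify_signature(sender_key.public_key(), message_data + b"x", signature)
print(genuine_ok, tampered_ok)
```

In the agent, the data to verify would be rebuilt as IV + ciphertext + tag from the received dictionary before decrypting, and a failed check would be a natural place to log a high-risk security event.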
    def sign_data(self, data: bytes) -> bytes:
        # RSA-PSS signature with SHA-256 over the given bytes.
        return self.private_key.sign(
            data,
            padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
            hashes.SHA256()
        )

    def analyze_encryption_pattern(self, message_length: int) -> float:
        # Average the risk of (up to) the last ten logged events.
        recent_events = self.security_events[-10:] if len(self.security_events) >= 10 else self.security_events
        avg_risk = np.mean([e.risk_score for e in recent_events]) if recent_events else 0.1
        risk_score = 0.1
        if message_length > 10000:
            risk_score += 0.3
        if self.encryption_count % 50 == 0 and self.encryption_count > 0:
            risk_score += 0.2
        # Blend the rule-based score with the recent average.
        risk_score = (risk_score + avg_risk) / 2
        self.log_security_event("ENCRYPTION_ANALYSIS", risk_score, {"msg_len": message_length})
        return min(risk_score, 1.0)

    def log_security_event(self, event_type: str, risk_score: float, details: Dict):
        event = SecurityEvent(timestamp=time.time(), event_type=event_type, risk_score=risk_score, details=details)
        self.security_events.append(event)

    def generate_security_report(self) -> Dict:
        if not self.security_events:
            return {"status": "No events recorded"}
        total_events = len(self.security_events)
        high_risk_events = [e for e in self.security_events if e.risk_score > 0.7]
        avg_risk = np.mean([e.risk_score for e in self.security_events])
        # Count how many times each event type occurred.
        event_types = {}
        for event in self.security_events:
            event_types[event.event_type] = event_types.get(event.event_type, 0) + 1
        return {
            "agent_id": self.agent_id,
            "total_events": total_events,
            "high_risk_events": len(high_risk_events),
            "average_risk_score": round(avg_risk, 3),
            "encryption_count": self.encryption_count,
            "key_rotation_needed": self.encryption_count >= self.key_rotation_threshold,
            "event_breakdown": event_types,
            "security_status": "CRITICAL" if avg_risk > 0.7 else "WARNING" if avg_risk > 0.4 else "NORMAL"
        }
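We add the adaptive components that analyze cryptographic behavior, flag anomalies, and log risk events. The sign_data method produces RSA-PSS signatures, analyze_encryption_pattern scores each message from its length, the running encryption count, and the average risk of the last ten logged events, and generate_security_report condenses the event log into counts, an average risk score, a key-rotation flag, and an overall status of NORMAL, WARNING, or CRITICAL.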
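To see how the scoring rules combine, the following sketch restates analyze_encryption_pattern as a standalone function and works through two cases. The function name score_message and its explicit parameters are ours; the constants are copied from the method above, and statistics.mean stands in for np.mean so the snippet needs only the standard library.

```python
from statistics import mean
from typing import Sequence


def score_message(message_length: int, encryption_count: int, recent_scores: Sequence[float]) -> float:
    """Standalone restatement of the tutorial's scoring rules (illustrative only)."""
    # Baseline: average risk of recent events, or 0.1 if nothing has been logged yet.
    avg_risk = mean(recent_scores) if recent_scores else 0.1
    risk = 0.1
    if message_length > 10000:          # unusually long message
        risk += 0.3
    if encryption_count > 0 and encryption_count % 50 == 0:  # every 50th encryption
        risk += 0.2
    # Blend the rule-based score with the recent average and cap at 1.0.
    return min((risk + avg_risk) / 2, 1.0)


# A 2,800-character message early in a session: no rule fires.
typical = score_message(2800, 4, [])
# A 20,000-character message that is also the 50th encryption: both rules fire.
unusual = score_message(20000, 50, [0.1, 0.1])

print(round(typical, 3), round(unusual, 3))
```

The first case gives (0.1 + 0.1) / 2 = 0.1 and the second gives (0.6 + 0.1) / 2 = 0.35. Because the result is averaged with recent history, a single unusual message raises the score only moderately, while a run of risky events pushes later scores up. The longest message in the demo below is 2,800 characters, so the length rule does not fire there.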
def demo_crypto_agent_system():
    print("Advanced Cryptographic Agent System Demo\n")
    print("=" * 60)
    alice = CryptoAgent("Alice")
    bob = CryptoAgent("Bob")
    print("\n1. Agents Created")
    print(f" Alice ID: {alice.agent_id}")
    print(f" Bob ID: {bob.agent_id}")
    print("\n2. Establishing Secure Session (Hybrid Encryption)")
    alice_public_key = alice.get_public_key_bytes()
    bob_public_key = bob.get_public_key_bytes()
    # Alice wraps a new session key for Bob; Bob unwraps it with his private key.
    encrypted_session_key = alice.establish_session("Bob", bob_public_key)
    bob.receive_session_key("Alice", encrypted_session_key)
    print(f" ✓ Session established with {len(encrypted_session_key)} byte encrypted key")
    print("\n3. Encrypting and Transmitting Messages")
    messages = [
        "Hello Bob! This is a secure message.",
        "The launch codes are: Alpha-7-Charlie-9",
        "Meeting at 3 PM tomorrow.",
        "This is a very long message " * 100
    ]
    for i, msg in enumerate(messages, 1):
        encrypted = alice.encrypt_message("Bob", msg)
        print(f"\n Message {i}:")
        print(f" - Plaintext length: {len(msg)} chars")
        print(f" - Ciphertext: {encrypted['ciphertext'][:60]}...")
        print(f" - Risk Score: {encrypted['risk_score']:.3f}")
        print(f" - Signature: {encrypted['signature'][:40]}...")
        decrypted = bob.decrypt_message(encrypted)
        print(f" - Decrypted: {decrypted[:60]}{'...' if len(decrypted) > 60 else ''}")
        print(f" - Verification: {'✓ SUCCESS' if decrypted == msg else '✗ FAILED'}")
    print("\n4. AI-Powered Security Analysis")
    print("\n Alice's Security Report:")
    alice_report = alice.generate_security_report()
    for k, v in alice_report.items():
        print(f" - {k}: {v}")
    print("\n Bob's Security Report:")
    bob_report = bob.generate_security_report()
    for k, v in bob_report.items():
        print(f" - {k}: {v}")
    print("\n" + "=" * 60)
    print("Demo Complete! Key Features Demonstrated:")
    print("✓ Hybrid encryption (RSA + AES-GCM)")
    print("✓ Digital signatures for authentication")
    print("✓ AI-powered anomaly detection")
    print("✓ Intelligent key rotation recommendations")
    print("✓ Real-time security monitoring")


if __name__ == "__main__":
    demo_crypto_agent_system()
We demonstrate a complete workflow in which two agents establish a session, exchange encrypted messages, score each message for risk, and print their security reports. The "Verification" line in the demo compares the decrypted text with the original plaintext. The demo ends with a summary of the features the system exercises.
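The security report includes a key_rotation_needed flag, but the agent as written only records that rotation is due and keeps using the same session key. One way the recommendation could be acted on is sketched below. SessionKeyStore is a hypothetical helper, not part of the tutorial's code, and the per-partner use counter is our assumption (the tutorial keeps a single encryption_count per agent).

```python
import secrets
from typing import Dict, Tuple


class SessionKeyStore:
    """Hypothetical helper that replaces a session key after a fixed number of uses."""

    def __init__(self, rotation_threshold: int = 100):
        self.rotation_threshold = rotation_threshold
        self.keys: Dict[str, bytes] = {}
        self.use_counts: Dict[str, int] = {}

    def new_key(self, partner_id: str) -> bytes:
        # Generate a fresh 256-bit key and reset the usage counter.
        self.keys[partner_id] = secrets.token_bytes(32)
        self.use_counts[partner_id] = 0
        return self.keys[partner_id]

    def key_for(self, partner_id: str) -> Tuple[bytes, bool]:
        """Return the key to encrypt with and whether it was just rotated."""
        rotated = self.use_counts.get(partner_id, 0) >= self.rotation_threshold
        if rotated or partner_id not in self.keys:
            self.new_key(partner_id)
        self.use_counts[partner_id] += 1
        return self.keys[partner_id], rotated


# A small threshold makes the rotation visible after three uses.
store = SessionKeyStore(rotation_threshold=3)
results = [store.key_for("Bob") for _ in range(4)]
keys = [key for key, _ in results]
rotated_flags = [rotated for _, rotated in results]
print(rotated_flags)
```

This sketch covers only the local bookkeeping. In a real exchange the new key would also have to reach the partner, for example by repeating the RSA-OAEP session setup, before any message encrypted under it is sent.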
In summary, we show how artificial intelligence can complement traditional cryptography by adding adaptability and context awareness. We not only encrypt and authenticate messages securely, but also let our agents track communication behavior and flag when security measures such as key rotation are due. Ultimately, we see how combining AI-driven analytics with hybrid encryption can lead to a new generation of smart, self-monitoring cryptographic systems.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for the benefit of society. His most recent endeavor is the launch of Marktechpost, an AI media platform that stands out for its in-depth coverage of machine learning and deep learning news that is technically sound and easy to understand for a broad audience. The platform has more than 2 million monthly views, which shows that it is very popular among viewers.