How to Design a Mini Reinforcement Learning Environment with Intelligent Agent Feedback, Adaptive Decision-Making, and Multi-Agent Coordination
In this tutorial, we build a mini reinforcement learning setup in which a multi-agent system learns to navigate a grid world through interaction, feedback, and hierarchical decision-making. We build everything from the ground up and combine three agent roles, an action agent, a tool agent, and a supervisor agent, so we can observe how simple heuristics, analysis, and supervision combine to produce smarter behavior. Along the way, we watch the agents collaborate, refine their strategies, and gradually learn to reach the goal while handling obstacles and uncertainty. Check out the complete code here.
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output
import time
from collections import defaultdict

class GridWorld:
    def __init__(self, size=8):
        self.size = size
        self.agent_pos = [0, 0]
        self.goal_pos = [size - 1, size - 1]
        self.obstacles = self._generate_obstacles()
        self.visited = set()
        self.step_count = 0
        self.max_steps = size * size * 2

    def _generate_obstacles(self):
        # Randomly scatter obstacles, never on the start or goal cell
        obstacles = set()
        n_obstacles = self.size
        while len(obstacles) < n_obstacles:
            pos = (np.random.randint(0, self.size), np.random.randint(0, self.size))
            if pos != (0, 0) and pos != (self.size - 1, self.size - 1):
                obstacles.add(pos)
        return obstacles
We set up the GridWorld environment and define how the agent, the goal, and the obstacles exist within it. We build the structures for state representation and efficient movement, and prepare the environment so we can interact with it dynamically. As we run this section, we see the world take shape, ready for the agents to explore. Check out the complete code here.
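The snippet above does not show two helpers that the later cells rely on: env.reset(), called at the start of every episode, and self._get_state(), which packages the observation handed to the agents. Below is a minimal sketch of what they could look like, assuming the state only needs the position, goal, and can_move fields referenced later; the exact structure is an assumption, not the article's verbatim code.

# Minimal sketch (assumed helpers): reset() and _get_state() used by the later cells
class GridWorld(GridWorld):
    def reset(self):
        # Return the agent to the start and clear episode bookkeeping
        self.agent_pos = [0, 0]
        self.visited = {(0, 0)}
        self.step_count = 0
        return self._get_state()

    def _get_state(self):
        # Directions that keep the agent on the grid (collisions are handled in step())
        moves = {'up': [-1, 0], 'down': [1, 0], 'left': [0, -1], 'right': [0, 1]}
        can_move = []
        for name, (dr, dc) in moves.items():
            r, c = self.agent_pos[0] + dr, self.agent_pos[1] + dc
            if 0 <= r < self.size and 0 <= c < self.size:
                can_move.append(name)
        return {
            'position': list(self.agent_pos),
            'goal': list(self.goal_pos),
            'can_move': can_move,
        }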
class GridWorld(GridWorld):
    def step(self, action):
        self.step_count += 1
        moves = {'up': [-1, 0], 'down': [1, 0], 'left': [0, -1], 'right': [0, 1]}
        if action not in moves:
            return self._get_state(), -1, False, "Invalid action"
        delta = moves[action]
        new_pos = [self.agent_pos[0] + delta[0], self.agent_pos[1] + delta[1]]
        # Reward shaping (values are representative): penalize walls and obstacles,
        # reward new cells, lightly penalize revisits, and give a large goal bonus
        if not (0 <= new_pos[0] < self.size and 0 <= new_pos[1] < self.size):
            reward, info = -1.0, "Hit wall"
        elif tuple(new_pos) in self.obstacles:
            reward, info = -1.0, "Hit obstacle"
        else:
            self.agent_pos = new_pos
            cell = tuple(new_pos)
            reward = 0.5 if cell not in self.visited else -0.1
            self.visited.add(cell)
            info = "Moved"
        done = False
        if self.agent_pos == self.goal_pos:
            reward += 10.0
            done = True
            info = "Goal reached!"
        elif self.step_count >= self.max_steps:
            done = True
            info = "Max steps reached"
        return self._get_state(), reward, done, info

    def render(self, agent_thoughts=None):
        grid = np.zeros((self.size, self.size, 3))
        for pos in self.visited:
            grid[pos[0], pos[1]] = [0.7, 0.9, 1.0]
        for obs in self.obstacles:
            grid[obs[0], obs[1]] = [0.2, 0.2, 0.2]
        grid[self.goal_pos[0], self.goal_pos[1]] = [0, 1, 0]
        grid[self.agent_pos[0], self.agent_pos[1]] = [1, 0, 0]
        plt.figure(figsize=(10, 8))
        plt.imshow(grid, interpolation='nearest')
        plt.title(f"Step: {self.step_count} | Visited: {len(self.visited)}/{self.size*self.size}")
        for i in range(self.size + 1):
            plt.axhline(i - 0.5, color="gray", linewidth=0.5)
            plt.axvline(i - 0.5, color="gray", linewidth=0.5)
        if agent_thoughts:
            plt.text(0.5, -1.5, agent_thoughts, ha="center", fontsize=9,
                     bbox=dict(boxstyle="round", facecolor="wheat", alpha=0.8),
                     wrap=True, transform=plt.gca().transData)
        plt.axis('off')
        plt.tight_layout()
        plt.show()
We define how each step in the environment works and how the world is rendered. We calculate rewards, detect collisions, track progress, and display everything through a clean grid visualization. As we execute this logic, we watch the agent's journey in real time and get clear feedback at every step. Check out the complete code here.
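Before wiring in the agents, we can drive the environment by hand to see these mechanics in isolation. The short check below is not part of the tutorial's pipeline; the three hand-picked moves are arbitrary, and the printed rewards depend on the randomly placed obstacles.

# Quick manual sanity check of the environment dynamics (illustrative only)
demo_env = GridWorld(size=8)
demo_state = demo_env.reset()
for move in ['right', 'right', 'down']:
    demo_state, reward, done, info = demo_env.step(move)
    print(f"{move:>5} -> pos={demo_state['position']} reward={reward:.2f} info={info}")
demo_env.render("Manual walk: three hand-picked moves")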
class ActionAgent:
    def __init__(self):
        self.q_values = defaultdict(lambda: defaultdict(float))
        self.epsilon = 0.3
        self.learning_rate = 0.1
        self.discount = 0.95

    def choose_action(self, state):
        valid_actions = state['can_move']
        if not valid_actions:
            return None
        pos = tuple(state['position'])
        # Epsilon-greedy: explore with probability epsilon, otherwise exploit Q-values
        if np.random.random() < self.epsilon:
            action = np.random.choice(valid_actions)
            return action, f"Exploring: trying '{action}'"
        action = max(valid_actions, key=lambda a: self.q_values[pos][a])
        return action, f"Exploiting: best known action is '{action}'"

    def learn(self, state, action, reward, next_state):
        # Tabular Q-learning update toward the one-step TD target
        pos, next_pos = tuple(state['position']), tuple(next_state['position'])
        best_next = max(self.q_values[next_pos].values(), default=0.0)
        td_error = reward + self.discount * best_next - self.q_values[pos][action]
        self.q_values[pos][action] += self.learning_rate * td_error

class ToolAgent:
    def analyze(self, state, proposed_action, total_reward, history):
        # Heuristic feedback on recent behavior (thresholds are representative)
        suggestions = []
        pos, goal = state['position'], state['goal']
        if abs(goal[0] - pos[0]) + abs(goal[1] - pos[1]) <= 3:
            suggestions.append("Goal is close! Prioritize moving toward it.")
        if len(history) >= 10 and len({tuple(h[0]['position']) for h in history[-10:]}) < 5:
            suggestions.append("Low exploration rate. Consider exploring more.")
        if len(history) >= 5:
            recent_rewards = [h[2] for h in history[-5:]]
            avg_reward = np.mean(recent_rewards)
            if avg_reward > 0.3:
                suggestions.append("Good progress! Current strategy working.")
        if len(state['can_move']) <= 1:
            suggestions.append("Few movement options here; watch for dead ends.")
        return suggestions
We implement the ActionAgent and ToolAgent to give the system learning and feedback-analysis capabilities. We observe how the action agent selects actions through a balance of exploration and exploitation, while the tool agent evaluates performance and recommends improvements. Together, they create a learning loop that evolves with experience. Check out the complete code here.
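To make the exploration-exploitation split concrete, here is a small smoke test of the two agents on a hand-built state dictionary. The field values are made up purely for illustration and are not part of the tutorial's training run.

# Hypothetical check of the two agents on a hand-built state (values are made up)
dummy_state = {'position': [6, 6], 'goal': [7, 7], 'can_move': ['down', 'right']}
agent, tool = ActionAgent(), ToolAgent()
proposal = agent.choose_action(dummy_state)   # returns (action, reasoning) or None
print("Proposed:", proposal)
print("Suggestions:", tool.analyze(dummy_state, proposal[0], total_reward=2.0, history=[]))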
class SupervisorAgent:
    def decide(self, state, proposed_action, tool_suggestions):
        if not proposed_action:
            return None, "No valid actions available"
        decision = proposed_action
        reasoning = f"Approved action '{proposed_action}'"
        for suggestion in tool_suggestions:
            if "goal" in suggestion.lower() and "close" in suggestion.lower():
                goal_direction = self._get_goal_direction(state)
                if goal_direction in state['can_move']:
                    decision = goal_direction
                    reasoning = f"Override: Moving '{goal_direction}' toward goal"
                break
        return decision, reasoning

    def _get_goal_direction(self, state):
        pos = state['position']
        goal = state['goal']
        if goal[0] > pos[0]:
            return 'down'
        elif goal[0] < pos[0]:
            return 'up'
        elif goal[1] > pos[1]:
            return 'right'
        else:
            return 'left'
We introduce the SupervisorAgent, which acts as the final decision-maker in the system. We see how it interprets the tool agent's recommendations, overrides risky or suboptimal choices, and keeps actions aligned with the overall goal. With this component in place, we get a coordinated multi-agent decision-making process. Check out the complete code here.
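As a quick illustrative check (again with made-up inputs), the supervisor approves the proposed action unless a "goal is close" suggestion is present and a goal-directed move is available, in which case it overrides the proposal.

# Hypothetical check of the override rule with made-up inputs
sup = SupervisorAgent()
demo_state = {'position': [6, 6], 'goal': [7, 7], 'can_move': ['down', 'right']}
print(sup.decide(demo_state, 'right', []))   # approved as proposed
print(sup.decide(demo_state, 'right', ["Goal is close! Prioritize moving toward it."]))  # overridden to 'down'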
def train_multi_agent(episodes=5, visualize=True):
    env = GridWorld(size=8)
    action_agent = ActionAgent()
    tool_agent = ToolAgent()
    supervisor = SupervisorAgent()
    episode_rewards = []
    episode_steps = []
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        done = False
        history = []
        print(f"\n{'='*60}")
        print(f"EPISODE {episode + 1}/{episodes}")
        print(f"{'='*60}")
        while not done:
            action_result = action_agent.choose_action(state)
            if action_result is None:
                break
            proposed_action, action_reasoning = action_result
            suggestions = tool_agent.analyze(state, proposed_action, total_reward, history)
            final_action, supervisor_reasoning = supervisor.decide(state, proposed_action, suggestions)
            if final_action is None:
                break
            next_state, reward, done, info = env.step(final_action)
            total_reward += reward
            action_agent.learn(state, final_action, reward, next_state)
            history.append((state, final_action, reward, next_state))
            if visualize:
                clear_output(wait=True)
                thoughts = (f"Action Agent: {action_reasoning}\n"
                            f"Supervisor: {supervisor_reasoning}\n"
                            f"Tool Agent: {', '.join(suggestions[:2]) if suggestions else 'No suggestions'}\n"
                            f"Reward: {reward:.2f} | Total: {total_reward:.2f}")
                env.render(thoughts)
                time.sleep(0.3)
            state = next_state
        episode_rewards.append(total_reward)
        episode_steps.append(env.step_count)
        print(f"\nEpisode {episode+1} Complete!")
        print(f"Total Reward: {total_reward:.2f}")
        print(f"Steps Taken: {env.step_count}")
        print(f"Cells Visited: {len(env.visited)}/{env.size**2}")
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(episode_rewards, marker="o")
    plt.title('Episode Rewards')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.grid(True, alpha=0.3)
    plt.subplot(1, 2, 2)
    plt.plot(episode_steps, marker="s", color="orange")
    plt.title('Episode Steps')
    plt.xlabel('Episode')
    plt.ylabel('Steps to Complete')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    return action_agent, tool_agent, supervisor

if __name__ == "__main__":
    print("🤖 Multi-Agent RL System: Grid World Navigation")
    print("=" * 60)
    print("Components:")
    print("  • Action Agent: Proposes actions using Q-learning")
    print("  • Tool Agent: Analyzes performance and suggests improvements")
    print("  • Supervisor Agent: Makes final decisions")
    print("=" * 60)
    trained_agents = train_multi_agent(episodes=5, visualize=True)
We run a complete training loop in which all three agents collaborate across multiple episodes. We track rewards, observe movement patterns, and visualize learning progress after every episode. As the loop completes, we see the multi-agent system become steadily more efficient at navigating the grid world.
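As an optional follow-up that is not part of the original walkthrough, we can switch off exploration and run one greedy evaluation episode to see what the action agent has actually learned; it uses only the pieces defined above and the trained_agents tuple returned by the training run.

# Optional follow-up (assumed usage): one greedy episode with exploration disabled
action_agent, tool_agent, supervisor = trained_agents
action_agent.epsilon = 0.0   # pure exploitation of the learned Q-table
eval_env = GridWorld(size=8)
state, done, total, info = eval_env.reset(), False, 0.0, ""
while not done:
    proposal = action_agent.choose_action(state)
    if proposal is None:
        break
    suggestions = tool_agent.analyze(state, proposal[0], total, [])
    action, _ = supervisor.decide(state, proposal[0], suggestions)
    state, reward, done, info = eval_env.step(action)
    total += reward
print(f"Greedy episode finished: reward={total:.2f}, steps={eval_env.step_count}, info={info}")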
In summary, we see how a multi-agent RL system emerges from clean components, and how each layer contributes to smarter navigation: the action agent learns through Q-value updates, the tool agent guides improvement, and the supervisor ensures safe, goal-oriented action selection. This simple yet dynamic grid world helps us visualize learning, exploration, and decision-making in real time.
Check out the complete code here, and feel free to browse our GitHub page for more tutorials, code, and notebooks.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for the benefit of society. His most recent endeavor is the launch of Marktechpost, an artificial intelligence media platform that stands out for its in-depth coverage of machine learning and deep learning news that is technically sound and easy to understand for a broad audience. The platform has more than 2 million monthly views, reflecting its popularity among readers.