# AI Agent Architecture Patterns: From Simple to Complex
Choosing the right architecture pattern is crucial for building reliable AI agents. This guide covers the essential patterns, from simple request-response systems to complex multi-agent networks, with trade-offs and use cases for each.
## Pattern 1: Simple Request-Response

The most basic agent pattern: perfect for single-shot tasks.

```
User Request → Agent → LLM → Response
```
### When to Use
- Simple Q&A and information retrieval
- One-off tasks without state
- Prototyping and learning
- Low-complexity workflows
### Implementation

```python
class SimpleAgent:
    def __init__(self, llm):
        self.llm = llm

    async def process(self, user_input):
        response = await self.llm.generate(user_input)
        return response
```
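A minimal end-to-end run of this pattern, using a hypothetical `EchoLLM` stub in place of a real model client (the agent class is repeated so the snippet is self-contained):

```python
import asyncio

class SimpleAgent:
    def __init__(self, llm):
        self.llm = llm

    async def process(self, user_input):
        return await self.llm.generate(user_input)

class EchoLLM:
    # Hypothetical stand-in for a real LLM client
    async def generate(self, prompt):
        return f"echo: {prompt}"

async def main():
    agent = SimpleAgent(EchoLLM())
    print(await agent.process("hello"))  # echo: hello

asyncio.run(main())
```

Because there is no shared state, each `process` call is independent, which is exactly what makes this pattern easy to scale and to test.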
### Pros & Cons
✅ Simple to implement
✅ Fast response time
✅ Easy to debug
❌ No memory between requests
❌ Limited capabilities
❌ No error recovery
## Pattern 2: Sequential Workflow

Agents that execute steps in a predefined order.

```
User Request → Plan → Step 1 → Step 2 → Step N → Response
```
### When to Use
- Multi-step processes (research → summarize → email)
- Data processing pipelines
- Content creation workflows
- Automated reporting
### Implementation

```python
class SequentialAgent:
    def __init__(self, llm, skills):
        self.llm = llm
        self.skills = skills

    async def process(self, user_input):
        # Plan the workflow
        plan = await self.llm.create_plan(user_input)

        # Execute steps sequentially
        results = []
        for step in plan.steps:
            skill = self.skills[step.skill_name]
            result = await skill.execute(step.params)
            results.append(result)

        # Generate final response
        return await self.llm.synthesize(results)
```
### Example: Research Agent

```python
class ResearchAgent(SequentialAgent):
    def __init__(self, llm):
        super().__init__(llm, {
            "search": WebSearchSkill(),
            "summarize": SummarySkill(),
            "write": WritingSkill(),
            "save": FileSkill()
        })

    async def research_topic(self, topic):
        plan = WorkflowPlan([
            Step("search", {"query": topic}),
            Step("summarize", {"text": "previous_result"}),
            Step("write", {"content": "previous_result"}),
            Step("save", {"filename": f"{topic}_research.md"})
        ])
        return await self.execute_plan(plan)

    async def execute_plan(self, plan):
        # Run a pre-built plan through the same step loop as process()
        results = []
        for step in plan.steps:
            results.append(await self.skills[step.skill_name].execute(step.params))
        return await self.llm.synthesize(results)
```
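To watch the sequential flow end to end, here is a self-contained sketch with stub LLM and skills. Every class below is a hypothetical stand-in, and `SimpleNamespace` fills in for the plan and step types:

```python
import asyncio
from types import SimpleNamespace

class StubLLM:
    # Hypothetical LLM: returns a fixed two-step plan
    async def create_plan(self, task):
        return SimpleNamespace(steps=[
            SimpleNamespace(skill_name="search", params={"query": task}),
            SimpleNamespace(skill_name="summarize", params={}),
        ])

    async def synthesize(self, results):
        return " | ".join(results)

class StubSkill:
    def __init__(self, label):
        self.label = label

    async def execute(self, params):
        return f"{self.label} done"

class SequentialAgent:
    def __init__(self, llm, skills):
        self.llm = llm
        self.skills = skills

    async def process(self, user_input):
        plan = await self.llm.create_plan(user_input)
        results = []
        for step in plan.steps:
            results.append(await self.skills[step.skill_name].execute(step.params))
        return await self.llm.synthesize(results)

skills = {"search": StubSkill("search"), "summarize": StubSkill("summarize")}
agent = SequentialAgent(StubLLM(), skills)
print(asyncio.run(agent.process("python tutorials")))  # search done | summarize done
```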
### Pros & Cons
✅ Predictable execution
✅ Easy to follow logic
✅ Good for structured tasks
❌ Rigid - can’t adapt
❌ Single point of failure
❌ No parallel processing
## Pattern 3: State Machine

Agents with defined states and transitions between them.

```
[Idle] → [Planning] → [Executing] → [Reviewing] → [Complete]
              ↓            ↓             ↓
           [Error]      [Error]      [Error]
```
### When to Use
- Complex workflows with decision points
- Error recovery and retry logic
- Human-in-the-loop processes
- Approval workflows
### Implementation

```python
from enum import Enum
from dataclasses import dataclass, field

class AgentState(Enum):
    IDLE = "idle"
    PLANNING = "planning"
    EXECUTING = "executing"
    REVIEWING = "reviewing"
    ERROR = "error"
    COMPLETE = "complete"

@dataclass
class AgentContext:
    current_task: str = ""
    results: list = field(default_factory=list)
    errors: list = field(default_factory=list)
    retry_count: int = 0
    plan: object = None  # set during the PLANNING state

class StateMachineAgent:
    def __init__(self, llm, skills):
        self.llm = llm
        self.skills = skills
        self.state = AgentState.IDLE
        self.context = AgentContext()

    async def process(self, user_input):
        self.state = AgentState.PLANNING
        self.context.current_task = user_input
        while self.state != AgentState.COMPLETE:
            try:
                await self.handle_current_state()
            except Exception as e:
                await self.handle_error(e)
        return self.context.results[-1]

    async def handle_current_state(self):
        if self.state == AgentState.PLANNING:
            plan = await self.llm.create_plan(self.context.current_task)
            self.context.plan = plan
            self.state = AgentState.EXECUTING
        elif self.state == AgentState.EXECUTING:
            for step in self.context.plan.steps:
                result = await self.skills[step.skill].execute(step.params)
                self.context.results.append(result)
            self.state = AgentState.REVIEWING
        elif self.state == AgentState.REVIEWING:
            review = await self.llm.review_results(self.context.results)
            if review.needs_revision:
                self.state = AgentState.PLANNING
            else:
                self.state = AgentState.COMPLETE

    async def handle_error(self, error):
        self.context.errors.append(error)
        if self.context.retry_count < 3:
            self.context.retry_count += 1
            self.state = AgentState.PLANNING
        else:
            self.state = AgentState.ERROR
            raise RuntimeError(f"Agent failed after 3 retries: {error}")
```
### Pros & Cons
✅ Robust error handling
✅ Clear execution flow
✅ Good for complex workflows
❌ More complex to implement
❌ Can get stuck in loops
❌ Harder to debug
## Pattern 4: Tool-Calling Agent

Modern pattern using LLM tool-calling capabilities.

```
User Request → LLM (with tools) → Tool Call → Tool Result → LLM → Response
```
### When to Use
- Dynamic workflow selection
- Agent needs to choose tools at runtime
- Complex decision making
- When using modern LLMs with tool support
### Implementation

```python
import json

class ToolCallingAgent:
    def __init__(self, llm, tools):
        self.llm = llm
        self.tools = tools
        self.messages = []

    async def process(self, user_input):
        self.messages.append({"role": "user", "content": user_input})
        while True:
            response = await self.llm.generate_with_tools(
                messages=self.messages,
                tools=self.tools
            )
            self.messages.append(response)
            if response.finish_reason == "tool_calls":
                # Execute each requested tool call
                for tool_call in response.tool_calls:
                    tool = self.tools[tool_call.function.name]
                    # Tool arguments typically arrive as a JSON string
                    args = json.loads(tool_call.function.arguments)
                    result = await tool.execute(**args)
                    self.messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": json.dumps(result)
                    })
            else:
                # No more tool calls: return the final response
                return response.content
```
### Tool Definition Example

```python
def get_weather_tool():
    return {
        "name": "get_weather",
        "description": "Get current weather for a city",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "City name"}
            },
            "required": ["city"]
        }
    }

async def execute_get_weather(city):
    # Actual weather API call
    weather_api = WeatherAPI()
    return await weather_api.get_current(city)
```
### Pros & Cons
✅ Dynamic tool selection
✅ Natural language reasoning
✅ Flexible workflows
❌ Depends on LLM tool quality
❌ Can be unpredictable
❌ Harder to test
## Pattern 5: Multi-Agent Orchestration

Multiple specialized agents working together.

```
User Request → Orchestrator → Agent 1 → Agent 2 → Agent N → Orchestrator → Response
```
### When to Use
- Complex tasks requiring different expertise
- Parallel processing needs
- Different agents for different domains
- Scalable systems
### Implementation

```python
import asyncio

class OrchestratorAgent:
    def __init__(self, llm, agents):
        self.llm = llm
        self.agents = agents

    async def process(self, user_input):
        # Analyze the request and decide which agents to involve
        analysis = await self.llm.analyze_request(user_input)

        # Create one task per required agent
        # (simplified: every agent receives the same subtask)
        tasks = []
        for agent_name in analysis.required_agents:
            agent = self.agents[agent_name]
            tasks.append(asyncio.create_task(agent.process(analysis.subtask)))

        # Execute in parallel
        results = await asyncio.gather(*tasks)

        # Synthesize results into a single response
        return await self.llm.synthesize(results)

class ResearchAgent:
    async def process(self, task):
        # Specialized research logic
        return {"research": "research findings"}

class WritingAgent:
    async def process(self, task):
        # Specialized writing logic
        return {"content": "written content"}
```
### Agent Communication Patterns

1. **Hierarchical**: Orchestrator → Specialist Agents → Orchestrator
2. **Peer-to-Peer**: Agent 1 ↔ Agent 2 ↔ Agent 3
3. **Pipeline**: Agent 1 → Agent 2 → Agent 3 → Agent 4
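The pipeline topology is the simplest to sketch: each agent's output becomes the next agent's input. The two stub agents below are hypothetical stand-ins for real specialists:

```python
import asyncio

class UppercaseAgent:
    async def process(self, text):
        return text.upper()

class ExclaimAgent:
    async def process(self, text):
        return text + "!"

async def run_pipeline(agents, user_input):
    # Each agent's output feeds the next agent's input
    result = user_input
    for agent in agents:
        result = await agent.process(result)
    return result

print(asyncio.run(run_pipeline([UppercaseAgent(), ExclaimAgent()], "done")))  # DONE!
```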
### Pros & Cons
✅ Specialized expertise
✅ Parallel processing
✅ Scalable architecture
❌ Complex coordination
❌ Communication overhead
❌ Harder to debug
## Pattern 6: Event-Driven Agent

Agents that react to events and messages.

```
Event → Message Bus → Agent(s) → Action → New Event
```
### When to Use
- Real-time systems
- Microservices architecture
- Reactive systems
- Event-driven workflows
### Implementation

```python
class EventDrivenAgent:
    def __init__(self, llm, skills, event_bus):
        self.llm = llm
        self.skills = skills
        self.event_bus = event_bus
        self.state = {}

    async def start(self):
        # Subscribe to the events this agent reacts to
        await self.event_bus.subscribe("user_request", self.handle_request)
        await self.event_bus.subscribe("task_complete", self.handle_completion)
        await self.event_bus.subscribe("error", self.handle_error)

    async def handle_request(self, event):
        user_input = event.data["request"]
        # Process the request (process_request elided for brevity)
        response = await self.process_request(user_input)
        # Publish the response as a new event
        await self.event_bus.publish("response", {
            "request_id": event.id,
            "response": response
        })

    async def handle_completion(self, event):
        # Update state based on completion
        self.state[event.data["task_id"]] = "complete"
        # Trigger next steps if needed (helpers elided for brevity)
        if self.should_continue(event.data):
            await self.start_next_task(event.data)
```
### Event Types

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Event:
    type: str
    data: dict
    timestamp: datetime
    source: str

# Common event types
USER_REQUEST = "user_request"
TASK_STARTED = "task_started"
TASK_COMPLETE = "task_complete"
ERROR_OCCURRED = "error"
STATE_CHANGED = "state_changed"
```
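The `event_bus` the agent subscribes to can be backed by something as small as an in-memory pub/sub map. The class below is a sketch only, with none of the persistence, ordering, or error-isolation guarantees a production bus needs:

```python
import asyncio
from collections import defaultdict

class InMemoryEventBus:
    def __init__(self):
        self._handlers = defaultdict(list)

    async def subscribe(self, event_type, handler):
        self._handlers[event_type].append(handler)

    async def publish(self, event_type, data):
        # Fan the event out to every subscribed handler
        for handler in self._handlers[event_type]:
            await handler(data)

async def main():
    bus = InMemoryEventBus()
    completed = []

    async def on_complete(data):
        completed.append(data["task_id"])

    await bus.subscribe("task_complete", on_complete)
    await bus.publish("task_complete", {"task_id": "t1"})
    print(completed)  # ['t1']

asyncio.run(main())
```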
### Pros & Cons
✅ Real-time responsiveness
✅ Loose coupling
✅ Scalable
❌ Complex event flow
❌ Harder to trace
❌ Event ordering issues
## Pattern 7: Hybrid Architecture

Combining multiple patterns for complex systems.

```
User Request → Orchestrator → Event Bus → Specialized Agents → Tool Calling → Response
```
### When to Use
- Enterprise-grade systems
- Complex real-world applications
- Systems requiring multiple capabilities
- Production workloads
### Implementation

```python
class HybridAgent:
    def __init__(self):
        # Multiple patterns combined
        # (simplified: constructor arguments for the sub-agents are omitted)
        self.orchestrator = OrchestratorAgent()
        self.event_bus = EventBus()
        self.tool_agents = {
            "research": ToolCallingAgent(research_tools),
            "writing": ToolCallingAgent(writing_tools)
        }
        # Connect components
        self.orchestrator.connect_to_event_bus(self.event_bus)
        for agent in self.tool_agents.values():
            agent.connect_to_event_bus(self.event_bus)

    async def process(self, user_input):
        # Start orchestration; events then flow through the system
        await self.orchestrator.process(user_input)
        # The final response is published as an event
        return await self.wait_for_response()
```
## Choosing the Right Pattern

### Decision Matrix
| Requirement | Best Pattern | Why |
|---|---|---|
| Simple Q&A | Request-Response | Minimal complexity |
| Multi-step task | Sequential Workflow | Predictable flow |
| Error recovery | State Machine | Robust handling |
| Dynamic tool use | Tool-Calling | LLM-driven selection |
| Specialized tasks | Multi-Agent | Domain expertise |
| Real-time needs | Event-Driven | Immediate response |
| Complex system | Hybrid | Maximum flexibility |
### Evolution Path
Most systems evolve from simple to complex:
1. Start with Request-Response (prototype)
2. Add Sequential Workflow (multi-step)
3. Implement State Machine (robustness)
4. Introduce Tool-Calling (flexibility)
5. Scale to Multi-Agent (specialization)
6. Migrate to Hybrid (production)
## Implementation Best Practices

### 1. Start Simple

```python
# Begin with the basic pattern
agent = SimpleAgent(llm)

# Add complexity as needed
if needs_workflow:
    agent = SequentialAgent(llm, skills)
if needs_error_handling:
    agent = StateMachineAgent(llm, skills)
```
### 2. Clear Interfaces

```python
from abc import ABC, abstractmethod

class AgentInterface(ABC):
    @abstractmethod
    async def process(self, request: Request) -> Response:
        pass

class SkillInterface(ABC):
    @abstractmethod
    async def execute(self, params: dict) -> Result:
        pass
```
### 3. Observability

```python
import time

class ObservableAgent:
    def __init__(self, agent):
        self.agent = agent
        self.metrics = MetricsCollector()

    async def process(self, request):
        start_time = time.time()
        try:
            result = await self.agent.process(request)
            self.metrics.record_success(time.time() - start_time)
            return result
        except Exception as e:
            self.metrics.record_error(e)
            raise
```
### 4. Configuration-Driven

```yaml
# agent_config.yaml
pattern: "multi_agent"
agents:
  research:
    type: "tool_calling"
    tools: ["web_search", "document_reader"]
  writing:
    type: "sequential"
    skills: ["summarize", "format"]
orchestration:
  type: "event_driven"
  events: ["task_complete", "error"]
```
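One way such a config can drive construction is a small registry keyed on the `type` field. This sketch uses a plain dict mirroring the YAML (to avoid a parser dependency) and placeholder factories, since the real agent classes depend on your framework:

```python
# Placeholder factories; real ones would construct actual agent objects
AGENT_REGISTRY = {
    "tool_calling": lambda cfg: {"kind": "ToolCallingAgent", "tools": cfg.get("tools", [])},
    "sequential": lambda cfg: {"kind": "SequentialAgent", "skills": cfg.get("skills", [])},
}

def build_agents(config):
    agents = {}
    for name, cfg in config["agents"].items():
        factory = AGENT_REGISTRY[cfg["type"]]  # dispatch on the "type" field
        agents[name] = factory(cfg)
    return agents

config = {
    "agents": {
        "research": {"type": "tool_calling", "tools": ["web_search", "document_reader"]},
        "writing": {"type": "sequential", "skills": ["summarize", "format"]},
    }
}

agents = build_agents(config)
print(agents["research"]["kind"])  # ToolCallingAgent
```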
## Testing Strategies

### Unit Testing
```python
import pytest

# Requires the pytest-asyncio plugin for async test support
@pytest.mark.asyncio
async def test_sequential_agent():
    mock_llm = MockLLM()
    mock_skills = {"search": MockSkill()}
    agent = SequentialAgent(mock_llm, mock_skills)
    result = await agent.process("search for python tutorials")
    assert result.success
```
### Integration Testing
```python
@pytest.mark.asyncio
async def test_multi_agent_integration():
    orchestrator = OrchestratorAgent(llm, agents)
    result = await orchestrator.process("research and write about AI")
    assert "research" in result
    assert "content" in result
```
### Load Testing
```python
import asyncio

async def test_agent_scalability():
    agent = ToolCallingAgent(llm, tools)
    # Simulate 100 concurrent requests
    tasks = [agent.process(f"task {i}") for i in range(100)]
    results = await asyncio.gather(*tasks)
    assert all(r.success for r in results)
```
## Performance Considerations

### 1. Caching
```python
class CachedAgent:
    def __init__(self, agent, cache):
        self.agent = agent
        self.cache = cache

    async def process(self, request):
        cache_key = hash(request)
        # Check "is not None" so falsy cached results still count as hits
        if (cached := await self.cache.get(cache_key)) is not None:
            return cached
        result = await self.agent.process(request)
        await self.cache.set(cache_key, result)
        return result
```
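A quick check that the wrapper short-circuits repeat requests, using a dict-backed cache and a counting stub agent (all three classes below are hypothetical, repeated so the snippet runs on its own):

```python
import asyncio

class CachedAgent:
    def __init__(self, agent, cache):
        self.agent = agent
        self.cache = cache

    async def process(self, request):
        cache_key = hash(request)
        if (cached := await self.cache.get(cache_key)) is not None:
            return cached
        result = await self.agent.process(request)
        await self.cache.set(cache_key, result)
        return result

class DictCache:
    # Minimal async cache backed by a plain dict
    def __init__(self):
        self._store = {}

    async def get(self, key):
        return self._store.get(key)

    async def set(self, key, value):
        self._store[key] = value

class CountingAgent:
    def __init__(self):
        self.calls = 0

    async def process(self, request):
        self.calls += 1
        return request.upper()

async def main():
    inner = CountingAgent()
    agent = CachedAgent(inner, DictCache())
    await agent.process("hello")
    await agent.process("hello")  # served from cache
    print(inner.calls)  # 1

asyncio.run(main())
```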
### 2. Connection Pooling
```python
import asyncio

class PooledAgent:
    def __init__(self, agent_factory, pool_size=10):
        self.pool = asyncio.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            self.pool.put_nowait(agent_factory())

    async def process(self, request):
        agent = await self.pool.get()
        try:
            return await agent.process(request)
        finally:
            # Always return the agent to the pool, even on failure
            self.pool.put_nowait(agent)
```
### 3. Lazy Loading
```python
class LazyAgent:
    def __init__(self):
        self._agent = None
        self._skills = {}

    @property
    def agent(self):
        # Build the underlying agent on first use
        # (_create_agent elided for brevity)
        if self._agent is None:
            self._agent = self._create_agent()
        return self._agent

    async def process(self, request):
        return await self.agent.process(request)
```
**Key Takeaway:** Start simple and evolve complexity as needed. Each pattern solves specific problems; understand your requirements before choosing an architecture. The best architecture is the simplest one that meets your needs.
Next: Compare the major frameworks that implement these patterns.