I remember the first time I tried to build a multi-agent system without any architecture. It was 2023, GPT-4 had just dropped, and I thought: “I’ll just chain a few agents together and let them talk to each other.” Three days later, I had a beautiful mess. Agents repeating work. Agents contradicting each other. An agent that kept apologizing to another agent for not understanding what it wanted. It was, as my team put it, “like herding cats — except the cats were writing production code.”
The lesson I took from that experience: AI agents are software components. And software components without architecture become legacy systems very, very quickly.
In Part 1, we introduced the concept of a full AI software team. In Part 2, we mapped out each agent’s role — the Product Owner, Business Analyst, Tech Architect, Tech Lead, Senior Software Engineer, QC Engineer, DevOps Engineer, and Project Manager. Now in Part 3, we need to answer the harder question: how do these agents actually work together as a coherent system?
The answer, I’ll argue, comes from a set of ideas the software engineering world has been refining for 20 years: Domain-Driven Design.
1. Why Architecture Matters for AI Teams
Let me describe two scenarios.
Scenario A — No Architecture: You have 8 agents. Each agent can call any other agent. Each agent has its own notion of what a “requirement” means. When the SSE agent finishes writing code, it broadcasts a message to everyone. The QC agent gets the message but isn’t sure if the code is ready for testing. The PM agent gets the same message and tries to update a status tracker it built internally. The TL agent gets it and starts a code review that conflicts with the QC agent’s test run. The PO agent, inexplicably, also gets notified and starts asking clarifying questions about business logic during active code review.
Scenario B — Designed System: Requirements flow through defined states. Each agent operates within its bounded context and communicates via well-defined domain events. When the SSE agent finishes code, it emits a CodeArtifactReady event. Only the QC agent and TL agent subscribe to this event. The PM agent learns about it through a state change, not a direct message. The PO agent is not in the loop until a human checkpoint gate is reached.
The difference between Scenario A and Scenario B is not about which LLM you’re using. It’s about architecture. It’s about having a vocabulary for what each part of the system does, and enforcing boundaries so that components don’t step on each other.
This is exactly what Domain-Driven Design gives us.
The Microservices Parallel
If you’ve built microservices, you’ve already faced this problem. When you decompose a monolith into services, you face questions like: what does this service own? How does it communicate? What happens when it fails? Who calls whom?
The answers that emerged from the microservices era — bounded contexts, domain events, the outbox pattern, saga orchestration — are exactly the patterns we need for multi-agent AI systems. The agents are the services. LangGraph is the orchestrator. The TeamState is the shared data model. Domain events are the triggers that move work between agents.
We’re not reinventing anything here. We’re applying proven patterns to a new execution environment.
2. Applying DDD to Multi-Agent Systems
Domain-Driven Design, at its core, is about aligning software structure with business domains. Eric Evans’ 2003 book introduced a vocabulary for doing this. Let me translate that vocabulary into agent terms.
The DDD-to-Agent Vocabulary
Ubiquitous Language in DDD means that developers, domain experts, and stakeholders all use the same terms. In our AI team, this means every agent and every piece of code uses the same vocabulary: a UserStory is a UserStory, not a ticket or a task or a requirement_item. When the BA agent creates user stories, when the SSE agent reads them, and when the QC agent validates against them — they all use the same UserStory data structure.
Aggregates are clusters of domain objects that are treated as a single unit. In our system, the Requirement aggregate contains a UserBrief, ClarifiedRequirements, and a list of UserStory objects. The CodeArtifact aggregate contains SourceFiles, TestResults, and DependencyManifest. Aggregates enforce consistency boundaries — you don’t update individual UserStory objects in isolation; you update them through the Requirement aggregate.
Entities are objects with a distinct identity that persists over time. Each UserStory has a story_id that stays constant even as its content, status, and acceptance criteria evolve. Each CodeArtifact has an artifact_id that lets us track it through review, revision, and deployment.
Value Objects are objects defined entirely by their attributes, with no identity. A Priority (High/Medium/Low), a StoryPoints value, a TestCoverage percentage — these are value objects. Two Priority.HIGH values are identical and interchangeable.
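To make the entity versus value object distinction concrete, here is a tiny illustration (a sketch for intuition only; the system’s actual definitions are the TypedDicts in Section 6):
from dataclasses import dataclass, field
import uuid

@dataclass(frozen=True)
class Priority:
    """Value object: defined entirely by its attributes."""
    level: str  # "high", "medium", or "low"

@dataclass
class UserStory:
    """Entity: the story_id identity persists as content evolves."""
    title: str
    priority: Priority
    story_id: str = field(default_factory=lambda: str(uuid.uuid4()))

# Two value objects with equal attributes are interchangeable...
assert Priority("high") == Priority("high")

# ...but two entities with identical content remain distinct.
a = UserStory("Login", Priority("high"))
b = UserStory("Login", Priority("high"))
assert a.story_id != b.story_id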
Domain Events are facts about something that happened in the past. They’re immutable records. RequirementsCleared, UserStoriesCreated, CodeReviewApproved, TestsFailed — these are domain events. They’re named in the past tense because they record what has already occurred. In our LangGraph system, domain events are the primary mechanism by which agents trigger each other.
Bounded Contexts are the most powerful concept we’ll use. A bounded context is an explicit boundary within which a particular model applies. Within the boundary, terms have specific, unambiguous meanings. Across boundaries, you need translation.
Bounded Contexts and Agent Clusters
Each of our bounded contexts maps to a cluster of agents that share vocabulary and concern:
- The Requirement Context owns the language of business needs: briefs, stories, acceptance criteria, prioritization.
- The Design Context owns the language of technical architecture: components, interfaces, data models, tech stack decisions.
- The Implementation Context owns the language of code: functions, classes, modules, test coverage, linting results.
- The Quality Context owns the language of validation: test cases, test suites, pass/fail results, coverage thresholds.
- The Deployment Context owns the language of delivery: pipelines, environments, configuration, rollout strategy.
The Coordination Context is special — it doesn’t have its own domain language but operates across all others. The PM agent lives here, tracking state, identifying blockers, and escalating issues across context boundaries.
Anti-Corruption Layers
In DDD, when two bounded contexts need to communicate, you place an anti-corruption layer between them. This prevents one context’s model from polluting another’s. In agent terms, this is about translation.
When the Requirement Context hands off to the Design Context, the BA agent’s UserStory object gets translated into something the TA agent can work with: a list of FunctionalRequirement objects with specific technical implications. The Design Context doesn’t know or care about the original UserStory format — it works with FunctionalRequirement objects.
In code, this translation happens in a dedicated function that maps between domain models. We’ll see this in later parts when we implement the actual agent handoffs.
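As a preview, the translation function at the Requirement→Design boundary might look roughly like this (the FunctionalRequirement shape is an assumption for illustration, not the final contract):
from typing_extensions import TypedDict

class FunctionalRequirement(TypedDict):
    requirement_id: str
    description: str
    acceptance_criteria: list[str]
    priority_level: str

def story_to_functional_requirement(story: "UserStory") -> FunctionalRequirement:
    """Anti-corruption layer: Requirement Context -> Design Context.
    The Design Context never sees the as_a/i_want/so_that narrative."""
    return FunctionalRequirement(
        requirement_id=story["story_id"],
        description=f"{story['i_want']} ({story['so_that']})",
        acceptance_criteria=story["acceptance_criteria"],
        priority_level=story["priority"]["level"],
    )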
3. Our 5 Bounded Contexts
Here is the full map of our bounded contexts and how they relate:
Let me walk through each context in depth.
Requirement Context: Where It All Begins
The PO agent receives the raw brief from the human stakeholder. Its job is to interpret intent, ask clarifying questions if needed, and produce a ClarifiedRequirement object. This is the aggregate root for this context. Everything in the Requirement Context flows through it.
The BA agent consumes the ClarifiedRequirement and produces UserStory objects, each with acceptance criteria, priority, and estimated complexity. When the BA agent is done, it emits the UserStoriesCreated domain event.
The Requirement Context owns these terms: UserBrief, ClarifiedRequirement, UserStory, AcceptanceCriteria, Priority, StoryPoints, BusinessRule. None of these terms leak into the Design Context without going through an anti-corruption layer.
Design Context: Translating Business to Technical
The anti-corruption layer at the Requirement→Design boundary translates UserStory objects into FunctionalRequirement objects. This translation is intentional: the Design Context doesn’t care about business storytelling language; it cares about technical requirements.
The TA agent takes FunctionalRequirement objects and produces the TechnicalSpec aggregate: component diagrams, data models, API contracts, infrastructure requirements, and the ArchitectureDecision record (capturing the “why” behind tech choices — important for future reference).
The TL agent reviews the TechnicalSpec, may request revisions, and ultimately approves it. This triggers the DesignApproved domain event.
Implementation Context: Where Code Happens
The anti-corruption layer at Design→Implementation translates the TechnicalSpec into an ImplementationPlan — a prioritized list of ImplementationTask objects, each with a clear definition of done and technical notes.
The SSE agent executes tasks one at a time, producing CodeArtifact objects. Each artifact includes source files, test files, and a LintReport. The SSE agent runs tests internally — if tests fail, it iterates. After three failures, it escalates to the TL agent or PM agent rather than spinning forever.
When code is complete and self-tests pass, the SSE agent emits CodeArtifactReady.
Quality Context: Independent Validation
The QC agent doesn’t trust the SSE agent’s self-reported test results. It independently defines test cases (from the original user stories, via the anti-corruption layer) and validates the code artifacts against them. This separation is important: the QC agent is the adversarial voice in the system.
If quality gates pass, the QualityGatePassed event fires. If they fail, the TestsFailed event goes back to the Implementation Context.
Deployment Context: Delivery Pipeline
The DevOps agent receives QualityGatePassed and produces the CI/CD configuration, environment setup, and deployment runbook. It doesn’t create deployments autonomously — it prepares everything and then waits at a human checkpoint before triggering the actual deployment.
4. The Requirement Lifecycle State Machine
The most important piece of shared state in our system is the lifecycle of a requirement. Every requirement starts as a raw brief and ends as a deployed feature. Between those two points, there are nine distinct states and a set of transitions, guards, and possible rollbacks.
Here is the state machine encoded in Python, with full type annotations:
from enum import Enum
from typing import Optional

from typing_extensions import TypedDict
class RequirementState(str, Enum):
"""The nine states in the requirement lifecycle."""
RECEIVED = "received"
CLARIFIED = "clarified"
USER_STORIES_CREATED = "user_stories_created"
DESIGN_READY = "design_ready"
IN_DEVELOPMENT = "in_development"
IN_REVIEW = "in_review"
TEST_PENDING = "test_pending"
READY_TO_DEPLOY = "ready_to_deploy"
DEPLOYED = "deployed"
# Error states
BLOCKED = "blocked"
ESCALATED = "escalated"
class StateTransition(TypedDict):
"""Records a state transition with guard conditions."""
from_state: RequirementState
to_state: RequirementState
triggered_by: str # agent name
event_name: str # domain event that triggered it
guard: Optional[str] # condition that must be True
timestamp: str
metadata: dict
# Transition table — all valid transitions
VALID_TRANSITIONS: dict[RequirementState, list[RequirementState]] = {
RequirementState.RECEIVED: [
RequirementState.CLARIFIED,
RequirementState.BLOCKED, # if brief is too vague
],
RequirementState.CLARIFIED: [
RequirementState.USER_STORIES_CREATED,
RequirementState.RECEIVED, # rollback: needs more clarification
],
RequirementState.USER_STORIES_CREATED: [
RequirementState.DESIGN_READY, # after human approval
RequirementState.CLARIFIED, # if stories are rejected
],
RequirementState.DESIGN_READY: [
RequirementState.IN_DEVELOPMENT,
RequirementState.USER_STORIES_CREATED, # if design reveals gaps
],
RequirementState.IN_DEVELOPMENT: [
RequirementState.IN_REVIEW,
RequirementState.IN_DEVELOPMENT, # retry loop (max 3)
RequirementState.ESCALATED, # after 3 failures
],
RequirementState.IN_REVIEW: [
RequirementState.TEST_PENDING, # review passed
RequirementState.IN_DEVELOPMENT, # review failed
],
RequirementState.TEST_PENDING: [
RequirementState.READY_TO_DEPLOY, # all gates pass
RequirementState.IN_DEVELOPMENT, # tests failed, rework
],
RequirementState.READY_TO_DEPLOY: [
RequirementState.DEPLOYED, # after human approval
RequirementState.TEST_PENDING, # human rejects, retest
],
    RequirementState.DEPLOYED: [],  # terminal state
    # BLOCKED and ESCALATED carry no automated exits; a human resolves
    # them outside the transition table.
}
def validate_transition(
current: RequirementState,
target: RequirementState,
context: dict,
) -> tuple[bool, str]:
"""
Guard function: returns (is_valid, reason).
Called before every state transition.
"""
allowed = VALID_TRANSITIONS.get(current, [])
if target not in allowed:
return False, f"Transition {current} → {target} is not in transition table"
# Additional guards per transition
if current == RequirementState.USER_STORIES_CREATED and target == RequirementState.DESIGN_READY:
if not context.get("human_approved_stories"):
return False, "Human approval required before design phase"
if current == RequirementState.IN_DEVELOPMENT and target == RequirementState.IN_REVIEW:
if not context.get("self_tests_passing"):
return False, "SSE must pass self-tests before submitting for review"
if current == RequirementState.READY_TO_DEPLOY and target == RequirementState.DEPLOYED:
if not context.get("human_approved_deployment"):
return False, "Human approval required before production deployment"
return True, "ok"
The key insight here is that every transition is explicit and guarded. An agent cannot move a requirement from DESIGN_READY to DEPLOYED in one jump. It must go through every intermediate state. This prevents the system from skipping steps under time pressure, which is a real failure mode when LLM agents are optimizing for task completion.
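To see the guards in action, a quick sanity check (assuming the definitions above):
# An illegal jump is rejected by the transition table...
ok, reason = validate_transition(
    RequirementState.DESIGN_READY,
    RequirementState.DEPLOYED,
    context={},
)
assert not ok  # "... is not in transition table"

# ...and even a legal transition fails without the right context flags.
ok, reason = validate_transition(
    RequirementState.READY_TO_DEPLOY,
    RequirementState.DEPLOYED,
    context={},  # missing human_approved_deployment
)
assert not ok  # "Human approval required before production deployment"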
5. Agent Communication Architecture
With our bounded contexts defined and our state machine clear, we need to design how agents actually exchange information. There are three fundamental communication patterns, borrowed directly from microservices architecture: Commands (directed requests for a specific agent to do something, named in the imperative, like CreateUserStories), Events (broadcast facts about what has already happened, named in the past tense, like UserStoriesCreated), and Queries (read-only requests for information, paired with a query_response via a correlation ID).
Now let’s define these patterns in code:
from dataclasses import dataclass, field
from typing import Any, Optional, Literal
from datetime import datetime, timezone
import uuid
@dataclass
class AgentMessage:
"""
The universal message envelope used for all agent communication.
Whether it's a Command, Event, or Query — it uses this structure.
The 'message_type' field determines how it's routed.
"""
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
message_type: Literal["command", "event", "query", "query_response"] = "event"
event_name: str = "" # e.g., "UserStoriesCreated", "CreateUserStories"
source_agent: str = "" # who sent this
target_agent: Optional[str] = None # None = broadcast (events only)
correlation_id: Optional[str] = None # links query to its response
payload: dict = field(default_factory=dict)
metadata: dict = field(default_factory=dict)
timestamp: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
schema_version: str = "1.0"
def to_dict(self) -> dict:
return {
"message_id": self.message_id,
"message_type": self.message_type,
"event_name": self.event_name,
"source_agent": self.source_agent,
"target_agent": self.target_agent,
"correlation_id": self.correlation_id,
"payload": self.payload,
"metadata": self.metadata,
"timestamp": self.timestamp,
"schema_version": self.schema_version,
}
class DomainEvent(AgentMessage):
"""
Base class for all domain events.
Domain events record facts that have occurred.
They are named in the past tense.
They are immutable once created.
"""
    def __init__(
        self,
        event_name: str,
        source_agent: str,
        payload: dict,
        metadata: dict | None = None,
    ):
        super().__init__(
            message_type="event",
            event_name=event_name,
            source_agent=source_agent,
            target_agent=None,  # events are always broadcast
            payload=payload,
            metadata=metadata or {},
        )
        # Freeze only after every field is assigned. (Checking a field such
        # as message_id inside __setattr__ would break construction: the
        # dataclass __init__ assigns fields one at a time, and the guard
        # would trip as soon as the first field existed.)
        object.__setattr__(self, "_frozen", True)

    def __setattr__(self, name: str, value: Any) -> None:
        """Events are immutable after creation."""
        if getattr(self, "_frozen", False):
            raise AttributeError("DomainEvent is immutable")
        super().__setattr__(name, value)
# Concrete domain events
class UserStoriesCreated(DomainEvent):
def __init__(self, source_agent: str, stories: list[dict], requirement_id: str):
super().__init__(
event_name="UserStoriesCreated",
source_agent=source_agent,
payload={
"requirement_id": requirement_id,
"stories": stories,
"story_count": len(stories),
},
)
class CodeArtifactReady(DomainEvent):
def __init__(
self,
source_agent: str,
artifact_id: str,
story_ids: list[str],
test_coverage: float,
):
super().__init__(
event_name="CodeArtifactReady",
source_agent=source_agent,
payload={
"artifact_id": artifact_id,
"story_ids": story_ids,
"test_coverage": test_coverage,
"self_tests_passing": True,
},
)
class QualityGatePassed(DomainEvent):
def __init__(
self,
source_agent: str,
artifact_id: str,
quality_score: float,
test_results: dict,
):
super().__init__(
event_name="QualityGatePassed",
source_agent=source_agent,
payload={
"artifact_id": artifact_id,
"quality_score": quality_score,
"test_results": test_results,
},
)
class TestsFailed(DomainEvent):
def __init__(
self,
source_agent: str,
artifact_id: str,
failure_summary: str,
failed_test_ids: list[str],
):
super().__init__(
event_name="TestsFailed",
source_agent=source_agent,
payload={
"artifact_id": artifact_id,
"failure_summary": failure_summary,
"failed_test_ids": failed_test_ids,
},
)
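A quick usage sketch (the ReviewTechnicalSpec command name is an invented example; the full command catalog comes later in the series):
# An event: a broadcast fact, past tense, no target agent.
stories_event = UserStoriesCreated(
    source_agent="ba_agent",
    stories=[{"story_id": "US-1", "title": "User login"}],
    requirement_id="REQ-42",
)

# A directed command reuses the same envelope with message_type="command".
review_command = AgentMessage(
    message_type="command",
    event_name="ReviewTechnicalSpec",
    source_agent="pm_agent",
    target_agent="tl_agent",
    payload={"spec_id": "SPEC-7"},
)

# The immutability guard rejects mutation after construction.
try:
    stories_event.payload = {}
except AttributeError as err:
    print(err)  # DomainEvent is immutable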
The Outbox Pattern for Reliable Event Delivery
One of the real-world problems I encountered when building agents with LangGraph: what happens if an agent produces a domain event but the graph execution is interrupted before the event is processed? You end up with phantom work — state that shows the SSE agent completed code, but the QC agent never received the trigger.
The solution is borrowed from distributed systems: the Outbox Pattern. Instead of emitting events directly, agents write events to an “outbox” in the shared state. A separate component — the event dispatcher — reads from the outbox and delivers events to subscribers, then marks them as delivered.
from typing_extensions import TypedDict
from typing import Annotated
import operator
class OutboxEntry(TypedDict):
event: dict # serialized DomainEvent
delivered: bool
delivery_attempts: int
created_at: str
# In TeamState:
class EventOutbox(TypedDict):
pending: Annotated[list[OutboxEntry], operator.add]
delivered: list[OutboxEntry]
def dispatch_events(state: "TeamState") -> "TeamState":
    """
    Called at the start of each graph step.
    Delivers pending outbox events to the appropriate handlers.
    This ensures at-least-once delivery semantics.
    """
    pending = state["event_outbox"]["pending"]
    still_pending: list[OutboxEntry] = []
    newly_delivered: list[OutboxEntry] = []
    for entry in pending:
        if entry["delivered"]:
            # Already delivered (e.g., replayed from a checkpoint): archive it.
            newly_delivered.append(entry)
            continue
        try:
            # Route to subscriber nodes
            _route_event_to_subscribers(entry["event"], state)
            entry["delivered"] = True
            newly_delivered.append(entry)
        except Exception:
            # Delivery failed: keep the entry pending and retry next step.
            entry["delivery_attempts"] += 1
            still_pending.append(entry)
    return {
        **state,
        "event_outbox": {
            "pending": still_pending,
            "delivered": state["event_outbox"]["delivered"] + newly_delivered,
        },
    }
This pattern ensures that no domain event gets dropped, even if the graph execution is interrupted and restarted from a checkpoint.
6. The TeamState: Our Central Shared Object
Everything in the system converges on one data structure: TeamState. This is the LangGraph state that gets passed through every node, updated by every agent, and persisted between checkpoints. Designing it well is perhaps the most important architectural decision in the whole system.
Here is the complete definition:
from typing import Literal, Optional, Annotated
from typing_extensions import TypedDict
import operator
# ============================================================
# Value Objects
# ============================================================
class Priority(TypedDict):
level: Literal["critical", "high", "medium", "low"]
rationale: str
class QualityGates(TypedDict):
min_test_coverage: float # e.g., 0.80
max_cyclomatic_complexity: int # e.g., 10
lint_must_pass: bool
type_check_must_pass: bool
security_scan_must_pass: bool
class DeploymentStatus(TypedDict):
status: Literal["pending", "in_progress", "deployed", "failed", "rolled_back"]
environment: Literal["dev", "staging", "production"]
deployed_at: Optional[str]
deployed_by: str # agent or human identifier
deployment_url: Optional[str]
version: Optional[str]
# ============================================================
# Entities
# ============================================================
class UserStory(TypedDict):
story_id: str
title: str
as_a: str # "As a [role]"
i_want: str # "I want [feature]"
so_that: str # "So that [benefit]"
acceptance_criteria: list[str]
priority: Priority
story_points: int
status: Literal[
"draft", "approved", "in_progress", "done", "rejected"
]
created_by: str # agent_id
class ImplementationTask(TypedDict):
task_id: str
story_id: str # links back to user story
title: str
description: str
file_path: str # expected output file
dependencies: list[str] # task_ids that must complete first
estimated_minutes: int
status: Literal["pending", "in_progress", "done", "failed"]
assigned_to: str
class CodeArtifact(TypedDict):
artifact_id: str
task_id: str
language: str
file_path: str
content: str # actual source code
test_file_path: Optional[str]
test_content: Optional[str]
lint_passed: bool
type_check_passed: bool
created_at: str
commit_hash: Optional[str]
class TestCase(TypedDict):
test_id: str
story_id: str
title: str
test_type: Literal["unit", "integration", "e2e", "performance"]
preconditions: list[str]
steps: list[str]
expected_result: str
status: Literal["pending", "passed", "failed", "skipped"]
failure_reason: Optional[str]
class Blocker(TypedDict):
blocker_id: str
description: str
blocked_agent: str
raised_by: str
raised_at: str
resolved: bool
resolution: Optional[str]
class ArchitectureDecision(TypedDict):
adr_id: str # Architecture Decision Record
title: str
status: Literal["proposed", "accepted", "deprecated", "superseded"]
context: str
decision: str
consequences: list[str]
alternatives_considered: list[str]
decided_by: str
decided_at: str
class TechnicalSpec(TypedDict):
spec_id: str
title: str
components: list[dict] # component name, responsibility, tech
data_models: list[dict] # entity definitions
api_contracts: list[dict] # endpoint definitions
infrastructure: dict # cloud resources, config
tech_stack: dict # language, frameworks, databases
approved_by: Optional[str]
approved_at: Optional[str]
class CICDConfig(TypedDict):
config_id: str
pipeline_tool: str # "github_actions", "gitlab_ci", etc.
pipeline_yaml: str # actual CI/CD config content
environments: list[str]
test_commands: list[str]
deploy_commands: list[str]
rollback_procedure: str
generated_by: str
generated_at: str
class TestResults(TypedDict):
run_id: str
total_tests: int
passed: int
failed: int
skipped: int
coverage_percent: float
duration_seconds: float
failed_test_ids: list[str]
coverage_report: dict
run_at: str
class RequirementDoc(TypedDict):
doc_id: str
original_brief: str
clarified_objectives: list[str]
out_of_scope: list[str]
assumptions: list[str]
constraints: list[str]
success_metrics: list[str]
stakeholders: list[str]
clarified_by: str
clarified_at: str
# ============================================================
# The Complete TeamState
# ============================================================
class TeamState(TypedDict):
# ── Metadata ──────────────────────────────────────────────
task_id: str
created_at: str
updated_at: str
phase: Literal[
"requirement", "design", "implementation", "quality", "deployment", "done"
]
requirement_state: str # RequirementState enum value
schema_version: str # for migration support
# ── Requirement Domain ─────────────────────────────────────
raw_brief: str
clarified_requirements: Optional[RequirementDoc]
user_stories: list[UserStory]
stories_human_approved: bool
rejection_reason: Optional[str]
# ── Design Domain ─────────────────────────────────────────
technical_spec: Optional[TechnicalSpec]
architecture_decisions: list[ArchitectureDecision]
design_review_notes: list[str]
design_approved_by: Optional[str]
# ── Implementation Domain ──────────────────────────────────
implementation_plan: list[ImplementationTask]
code_artifacts: list[CodeArtifact]
implementation_attempts: int # tracks retry count (max 3)
current_task_id: Optional[str]
self_test_results: Optional[TestResults]
# ── Quality Domain ─────────────────────────────────────────
test_cases: list[TestCase]
quality_gates: QualityGates
quality_test_results: Optional[TestResults]
quality_score: Optional[float]
quality_gate_passed: bool
qc_notes: list[str]
# ── Deployment Domain ─────────────────────────────────────
cicd_config: Optional[CICDConfig]
deployment_status: DeploymentStatus
deployment_human_approved: bool
deployment_notes: list[str]
# ── Coordination ──────────────────────────────────────────
# Annotated[list, operator.add] = append-only (LangGraph reducer)
agent_messages: Annotated[list[AgentMessage], operator.add]
event_outbox: EventOutbox
current_agent: str
blockers: list[Blocker]
human_feedback: Optional[str]
human_checkpoint_pending: bool
checkpoint_context: Optional[dict] # what the human needs to review
# ── Audit trail ───────────────────────────────────────────
state_transitions: Annotated[list[StateTransition], operator.add]
error_log: Annotated[list[dict], operator.add]
A few design decisions worth calling out:
Why Annotated[list, operator.add] for some fields? LangGraph’s state management requires you to specify how to merge concurrent state updates. For agent_messages, state_transitions, and error_log, we want to accumulate entries — so we use the operator.add reducer. For fields like technical_spec or current_agent, we want the latest value to win, so we don’t annotate them.
Why explicit schema_version? As the system evolves, the TeamState schema will change. Having a schema_version field lets us write migration functions that upgrade old states to new schemas, which is essential for long-running workflows.
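A hypothetical sketch of such a migration (the 1.0 to 1.1 change here is invented purely for illustration):
def migrate_team_state(state: dict) -> dict:
    """Upgrade a persisted TeamState from an older schema version."""
    # Hypothetical: suppose schema 1.1 introduced the error_log field.
    if state.get("schema_version", "1.0") == "1.0":
        state.setdefault("error_log", [])
        state["schema_version"] = "1.1"
    return state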
Why implementation_attempts as an integer? This is the key guard against infinite retry loops. Every time the SSE agent’s implementation fails tests, this counter increments. At 3, the PM agent triggers a human checkpoint instead of allowing another retry.
7. Human-in-the-Loop Checkpoints
The most dangerous thing about an autonomous AI system is that it can be confidently wrong. A well-designed human-in-the-loop system doesn’t try to remove humans from the process — it places them strategically at points where their judgment adds the most value.
In our system, there are two mandatory human checkpoints:
- After UserStoriesCreated — The PO agent and BA agent have done their work. Before any design or code is written, a human approves the scope. This prevents wasted effort from misunderstood requirements.
- Before Deployment — The DevOps agent has prepared the CI/CD configuration and the QC agent has signed off. Before anything is deployed to production, a human gives the final go-ahead.
Here is how we implement these checkpoints in LangGraph:
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.types import interrupt
def stories_approval_checkpoint(state: TeamState) -> TeamState:
"""
Human checkpoint: review user stories before design begins.
LangGraph will pause here and wait for external input.
"""
human_feedback = interrupt({
"checkpoint_type": "stories_approval",
"message": "Please review the user stories and approve or reject.",
"user_stories": state["user_stories"],
"clarified_requirements": state["clarified_requirements"],
"instructions": "Set approved=True to continue, or provide rejection_reason.",
})
if human_feedback.get("approved"):
return {
**state,
"stories_human_approved": True,
"human_checkpoint_pending": False,
"human_feedback": None,
}
else:
return {
**state,
"stories_human_approved": False,
"human_checkpoint_pending": False,
"rejection_reason": human_feedback.get("reason", "No reason provided"),
"requirement_state": RequirementState.CLARIFIED, # rollback
}
def deployment_approval_checkpoint(state: TeamState) -> TeamState:
"""
Human checkpoint: final approval before production deployment.
"""
human_feedback = interrupt({
"checkpoint_type": "deployment_approval",
"message": "Ready to deploy. Please review and approve.",
"cicd_config": state["cicd_config"],
"quality_score": state["quality_score"],
"test_results": state["quality_test_results"],
"deployment_target": state["deployment_status"]["environment"],
})
if human_feedback.get("approved"):
return {
**state,
"deployment_human_approved": True,
"human_checkpoint_pending": False,
}
else:
return {
**state,
"deployment_human_approved": False,
"deployment_notes": state["deployment_notes"] + [
f"Deployment rejected by human: {human_feedback.get('reason')}"
],
}
def route_after_stories(state: TeamState) -> str:
    """Conditional edge: route based on human approval of stories."""
    # The checkpoint node has already recorded the human's decision and
    # cleared human_checkpoint_pending, so we branch on approval alone.
    if state.get("stories_human_approved"):
        return "ta_agent"  # proceed to design
    return "po_agent"  # back to clarification
def route_after_quality(state: TeamState) -> str:
    """Conditional edge: route after quality gates."""
    if state.get("quality_gate_passed"):
        return "devops_agent"  # proceed to deployment prep
    if state["implementation_attempts"] >= 3:
        return "pm_escalation"  # repeated failures: human escalation
    return "sse_agent"  # rework
# Build the graph
builder = StateGraph(TeamState)
# Add agent nodes
builder.add_node("po_agent", run_po_agent)
builder.add_node("ba_agent", run_ba_agent)
builder.add_node("stories_approval_checkpoint", stories_approval_checkpoint)
builder.add_node("ta_agent", run_ta_agent)
builder.add_node("tl_agent", run_tl_agent)
builder.add_node("sse_agent", run_sse_agent)
builder.add_node("qc_agent", run_qc_agent)
builder.add_node("devops_agent", run_devops_agent)
builder.add_node("deployment_approval_checkpoint", deployment_approval_checkpoint)
builder.add_node("pm_agent", run_pm_agent)
builder.add_node("pm_escalation", run_pm_escalation)
# Add edges
builder.set_entry_point("po_agent")
builder.add_edge("po_agent", "ba_agent")
builder.add_edge("ba_agent", "stories_approval_checkpoint")
builder.add_conditional_edges(
"stories_approval_checkpoint",
route_after_stories,
{
"ta_agent": "ta_agent",
"po_agent": "po_agent",
},
)
builder.add_edge("ta_agent", "tl_agent")
builder.add_edge("tl_agent", "sse_agent")
builder.add_edge("sse_agent", "qc_agent")
builder.add_conditional_edges(
"qc_agent",
route_after_quality,
{
"sse_agent": "sse_agent",
"devops_agent": "devops_agent",
"pm_escalation": "pm_escalation",
},
)
builder.add_edge("devops_agent", "deployment_approval_checkpoint")
builder.add_conditional_edges(
"deployment_approval_checkpoint",
lambda s: "deploy" if s.get("deployment_human_approved") else "qc_agent",
{"deploy": END, "qc_agent": "qc_agent"},
)
# Compile with checkpoint support
memory = SqliteSaver.from_conn_string("team_state.db")
graph = builder.compile(checkpointer=memory)
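Driving the compiled graph then looks roughly like this (the initial state is abbreviated; a real invocation seeds every required TeamState field):
from langgraph.types import Command

config = {"configurable": {"thread_id": "req-001"}}

# First run: executes until interrupt() pauses at the stories checkpoint.
graph.invoke({"raw_brief": "Build a TODO app with authentication"}, config=config)

# Later, after human review, resume the paused graph with the decision.
graph.invoke(Command(resume={"approved": True}), config=config)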
Async Approval via Webhook
The LangGraph interrupt() mechanism is synchronous — it blocks the graph thread. For production use, you need an async approval flow where a human reviews a dashboard and clicks a button. Here’s the pattern:
import uuid
from typing import Optional

from langgraph.types import Command
class HumanApprovalGateway:
"""
Manages async human approval for interrupted graph states.
In production: backed by a database + webhook or dashboard UI.
"""
def __init__(self, graph, checkpointer):
self.graph = graph
self.checkpointer = checkpointer
async def submit_for_approval(
self,
thread_id: str,
checkpoint_type: str,
review_data: dict,
) -> str:
"""
Saves an approval request and sends a notification.
Returns the approval_request_id.
"""
approval_id = str(uuid.uuid4())
# Save to DB, send Slack/email notification, etc.
await self._notify_approver(approval_id, checkpoint_type, review_data)
return approval_id
async def receive_approval(
self,
thread_id: str,
approved: bool,
reason: Optional[str] = None,
) -> None:
"""
Called by webhook when human approves or rejects.
Resumes the paused graph with the human's decision.
"""
config = {"configurable": {"thread_id": thread_id}}
# Resume the graph by providing the human input
async for event in self.graph.astream(
Command(
resume={
"approved": approved,
"reason": reason,
}
),
config=config,
):
print(f"Graph resumed: {event}")
async def _notify_approver(self, approval_id, checkpoint_type, data):
# Implement: Slack message, email, dashboard entry
pass
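Wiring the gateway to an endpoint is then straightforward. A hypothetical FastAPI sketch (the real webhook lands in Part 4’s api/webhooks.py):
from fastapi import FastAPI

app = FastAPI()
gateway = HumanApprovalGateway(graph, memory)

@app.post("/approvals/{thread_id}")
async def approve(thread_id: str, approved: bool, reason: str | None = None):
    # Called by the review dashboard; resumes the paused graph.
    await gateway.receive_approval(thread_id, approved, reason)
    return {"status": "resumed", "thread_id": thread_id}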
8. Error Handling and Recovery
Let’s be realistic: things will go wrong. The SSE agent will produce code that doesn’t pass tests. The TL agent will request revisions. The QC agent will find edge cases that require rethinking the implementation. We need the system to handle these failures gracefully, not spiral into confusion.
The 3-Attempt Rule
For the SSE agent specifically, we cap retries at 3 attempts. Here’s the logic:
import uuid
from datetime import datetime, timezone
def run_sse_agent(state: TeamState) -> TeamState:
"""
SSE agent node with retry tracking and escalation logic.
"""
attempts = state.get("implementation_attempts", 0)
if attempts >= 3:
# Don't even try — escalate
return {
**state,
"requirement_state": RequirementState.ESCALATED,
"blockers": state["blockers"] + [
Blocker(
blocker_id=str(uuid.uuid4()),
description=(
f"SSE agent failed to implement after {attempts} attempts. "
f"Last failure: {state.get('self_test_results', {}).get('failed_test_ids', [])}"
),
blocked_agent="sse_agent",
raised_by="sse_agent",
raised_at=datetime.now(timezone.utc).isoformat(),
resolved=False,
resolution=None,
)
],
}
# Attempt implementation
result = _do_implementation(state)
if result["self_test_results"]["failed"] > 0:
# Increment attempts, stay in development
return {
**result,
"implementation_attempts": attempts + 1,
"requirement_state": RequirementState.IN_DEVELOPMENT,
}
# Success
return {
**result,
"implementation_attempts": attempts + 1,
"requirement_state": RequirementState.IN_REVIEW,
"event_outbox": {
**state["event_outbox"],
"pending": state["event_outbox"]["pending"] + [
{
"event": CodeArtifactReady(
source_agent="sse_agent",
artifact_id=result["code_artifacts"][-1]["artifact_id"],
story_ids=[t["story_id"] for t in state["implementation_plan"]],
test_coverage=result["self_test_results"]["coverage_percent"],
).to_dict(),
"delivered": False,
"delivery_attempts": 0,
"created_at": datetime.now(timezone.utc).isoformat(),
}
],
},
}
State Rollback
When a state transition fails — say, the TL agent rejects code in review — we don’t just set requirement_state = IN_DEVELOPMENT. We also record the rollback in the audit trail and preserve the review notes so the SSE agent can learn from them:
def handle_review_failure(
state: TeamState,
review_notes: list[str],
reviewer: str,
) -> TeamState:
"""
Rolls back from IN_REVIEW to IN_DEVELOPMENT with full audit trail.
"""
rollback_event = StateTransition(
from_state=RequirementState.IN_REVIEW,
to_state=RequirementState.IN_DEVELOPMENT,
triggered_by=reviewer,
event_name="ReviewFailed",
guard="review_approved == False",
timestamp=datetime.now(timezone.utc).isoformat(),
metadata={"review_notes": review_notes},
)
return {
**state,
"requirement_state": RequirementState.IN_DEVELOPMENT,
"design_review_notes": state.get("design_review_notes", []) + review_notes,
"state_transitions": state.get("state_transitions", []) + [rollback_event],
"error_log": state.get("error_log", []) + [
{
"type": "review_failure",
"reviewer": reviewer,
"notes": review_notes,
"timestamp": rollback_event["timestamp"],
}
],
}
The Saga Pattern for Long-Running Workflows
Our entire requirement lifecycle is what distributed systems architects call a saga — a long-running transaction that spans multiple services (agents). Unlike a database transaction, you can’t simply roll back a saga. You have to execute compensating transactions for each step that already completed.
In our system:
- If deployment fails after code was already committed to the repository, the DevOps agent needs to run a rollback_deployment() compensating action.
- If the design is rejected after the TA agent already created artifacts, the Design Context needs to archive those artifacts before starting fresh.
- If a story is rejected after the BA agent generated it, the state transition records the rejection with a rejection_reason so the PO agent knows what to fix.
The key to saga management is: every forward action must have a corresponding compensating action defined. We’ll implement this as part of the BaseAgent class in Part 4.
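As a preview, that registration might look something like this (names are placeholders, not the final BaseAgent API):
from typing import Callable

# Maps each forward action to the compensating action that undoes it.
COMPENSATIONS: dict[str, Callable[["TeamState"], "TeamState"]] = {}

def compensated_by(forward_action: str):
    """Decorator: register a compensating action for a forward step."""
    def register(fn: Callable[["TeamState"], "TeamState"]):
        COMPENSATIONS[forward_action] = fn
        return fn
    return register

@compensated_by("deploy_to_production")
def rollback_deployment(state: "TeamState") -> "TeamState":
    # Revert the release and record the rollback in the audit trail.
    ...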
9. The Complete System Architecture
Let me zoom out and show the entire system as it will look when fully built:
10. The ConversationStore Interface
One additional architectural component worth defining now is the ConversationStore. While the TeamState holds the current workflow state, agents also need access to their conversation history — the back-and-forth with the LLM that produced their outputs. We store this separately to keep the TeamState lean.
import sqlite3
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
@dataclass
class ConversationEntry:
"""A single turn in an agent's conversation history."""
entry_id: str
agent_id: str
task_id: str
role: str # "system", "user", "assistant"
content: str
timestamp: str
token_count: Optional[int] = None
model: Optional[str] = None
class ConversationStore(ABC):
"""
Interface for persisting agent conversation histories.
Why separate from TeamState?
- Conversation histories can be large (thousands of tokens)
- They're needed for debugging and auditing, not for routing
- Different storage backends may be appropriate
(e.g., S3 for archives, Redis for active conversations)
"""
@abstractmethod
async def save_entry(self, entry: ConversationEntry) -> None:
"""Append a conversation entry."""
@abstractmethod
async def get_history(
self,
agent_id: str,
task_id: str,
limit: Optional[int] = None,
) -> list[ConversationEntry]:
"""Retrieve conversation history for an agent on a task."""
@abstractmethod
async def get_full_task_history(
self,
task_id: str,
) -> dict[str, list[ConversationEntry]]:
"""Get all agent conversations for an entire task."""
@abstractmethod
async def clear_history(self, agent_id: str, task_id: str) -> None:
"""Clear history (e.g., after task completion)."""
class SqliteConversationStore(ConversationStore):
"""
SQLite-backed conversation store for development.
In production, swap for Redis or PostgreSQL.
"""
def __init__(self, db_path: str = "conversations.db"):
self.db_path = db_path
self._init_db()
def _init_db(self) -> None:
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS conversations (
entry_id TEXT PRIMARY KEY,
agent_id TEXT NOT NULL,
task_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
timestamp TEXT NOT NULL,
token_count INTEGER,
model TEXT
)
""")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_agent_task ON conversations(agent_id, task_id)"
)
conn.commit()
conn.close()
async def save_entry(self, entry: ConversationEntry) -> None:
conn = sqlite3.connect(self.db_path)
conn.execute(
"""INSERT INTO conversations
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
entry.entry_id,
entry.agent_id,
entry.task_id,
entry.role,
entry.content,
entry.timestamp,
entry.token_count,
entry.model,
),
)
conn.commit()
conn.close()
async def get_history(
self,
agent_id: str,
task_id: str,
limit: Optional[int] = None,
) -> list[ConversationEntry]:
conn = sqlite3.connect(self.db_path)
query = """
SELECT * FROM conversations
WHERE agent_id=? AND task_id=?
ORDER BY timestamp ASC
"""
if limit:
query += f" LIMIT {limit}"
rows = conn.execute(query, (agent_id, task_id)).fetchall()
conn.close()
return [
ConversationEntry(
entry_id=row[0],
agent_id=row[1],
task_id=row[2],
role=row[3],
content=row[4],
timestamp=row[5],
token_count=row[6],
model=row[7],
)
for row in rows
]
    async def get_full_task_history(
        self,
        task_id: str,
    ) -> dict[str, list[ConversationEntry]]:
        # Query by task_id alone; delegating to get_history("") would
        # match no rows, since agent_id="" never equals a real agent id.
        conn = sqlite3.connect(self.db_path)
        rows = conn.execute(
            "SELECT * FROM conversations WHERE task_id=? ORDER BY timestamp ASC",
            (task_id,),
        ).fetchall()
        conn.close()
        result: dict[str, list[ConversationEntry]] = {}
        for row in rows:
            entry = ConversationEntry(
                entry_id=row[0], agent_id=row[1], task_id=row[2], role=row[3],
                content=row[4], timestamp=row[5], token_count=row[6], model=row[7],
            )
            result.setdefault(entry.agent_id, []).append(entry)
        return result
async def clear_history(self, agent_id: str, task_id: str) -> None:
conn = sqlite3.connect(self.db_path)
conn.execute(
"DELETE FROM conversations WHERE agent_id=? AND task_id=?",
(agent_id, task_id),
)
conn.commit()
conn.close()
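Usage is then uniform regardless of backend; for example:
import asyncio
import uuid
from datetime import datetime, timezone

async def demo() -> None:
    store = SqliteConversationStore("conversations.db")
    await store.save_entry(ConversationEntry(
        entry_id=str(uuid.uuid4()),
        agent_id="po_agent",
        task_id="task-001",
        role="assistant",
        content="Clarified objectives: ...",
        timestamp=datetime.now(timezone.utc).isoformat(),
    ))
    history = await store.get_history("po_agent", "task-001")
    print(f"{len(history)} entries for po_agent on task-001")

asyncio.run(demo())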
11. What We’ve Designed — And Why It Matters
Let me step back and summarize what we’ve built in this post from an architectural perspective.
We’ve defined five bounded contexts that map cleanly to agent clusters. Each context owns its domain model, and cross-context communication happens via anti-corruption layers. This means when the Design Context evolves — say, we add a new field to TechnicalSpec — it doesn’t break the Requirement Context or the Implementation Context.
We’ve designed a nine-state lifecycle with explicit transition guards. No agent can skip states. No agent can transition a requirement backward without triggering a compensating action. The audit trail captures every transition.
We’ve defined three communication patterns — Command, Event, Query — and specified which pattern is appropriate for each agent interaction. This prevents the chaos of agents sending arbitrary messages to each other. Every message has a type, a purpose, and a routing rule.
We’ve built a TeamState with 40+ fields organized by domain. Each field is typed, each list field has an appropriate LangGraph reducer, and the schema is versioned for future migration.
We’ve designed human-in-the-loop checkpoints that use LangGraph’s interrupt() mechanism, with an async approval gateway for production use.
And we’ve identified the saga pattern as the right mental model for the entire workflow, with compensating actions defined for each forward step.
Is this more architecture than most “AI agent” projects have? Yes, significantly. But this is exactly the point. If you’re building a system that will run autonomously on real codebases — pushing code to GitHub, running tests, generating CI/CD configs — you need this level of rigor. The cost of an architectural mistake here isn’t a slow API; it’s broken code in production.
A Word on Over-Engineering
I want to be honest about the other extreme: this architecture only makes sense if you’re building a system meant to handle many different projects over time. If you’re vibe-coding a one-off prototype for a weekend hackathon, you probably don’t need bounded contexts and the outbox pattern. Use LangGraph directly with a flat state dict and get the thing working.
The architecture I’m describing here is for the case where you’re building the AI team as a platform — something your team will use repeatedly, extend, and maintain. In that case, the investment in architecture pays off many times over.
12. Preview: What We Build in Part 4
In Part 4, we move from design to implementation. We’ll build:
The BaseAgent class — The abstract base that every agent inherits from. It handles:
- LLM client initialization (configurable model per agent)
- System prompt management and injection
- Conversation history management via ConversationStore
- Retry logic with exponential backoff
- Error capture and state recording
- Compensating action registration (saga pattern)
The project structure — The full directory layout:
ai-team/
├── agents/
│ ├── base.py # BaseAgent abstract class
│ ├── po_agent.py # Product Owner
│ ├── ba_agent.py # Business Analyst
│ ├── ta_agent.py # Tech Architect
│ ├── tl_agent.py # Tech Lead
│ ├── sse_agent.py # Senior Software Engineer
│ ├── qc_agent.py # QC Engineer
│ ├── devops_agent.py # DevOps Engineer
│ └── pm_agent.py # Project Manager
├── domain/
│ ├── state.py # TeamState TypedDict
│ ├── events.py # DomainEvent + all event types
│ ├── models.py # All domain entities/value objects
│ └── transitions.py # State machine + guards
├── infrastructure/
│ ├── conversation_store.py # ConversationStore + SQLite impl
│ ├── event_dispatcher.py # Outbox dispatch logic
│ └── human_gateway.py # HumanApprovalGateway
├── graph/
│ ├── builder.py # LangGraph StateGraph construction
│ ├── edges.py # All conditional edge functions
│ └── checkpoints.py # Human checkpoint nodes
├── tools/
│ ├── github_tools.py # GitHub API integration
│ ├── test_runner.py # pytest/jest execution
│ └── linter.py # ruff/mypy/eslint execution
├── api/
│ ├── server.py # FastAPI server
│ └── webhooks.py # Human approval webhook endpoint
├── tests/
│ └── ...
├── pyproject.toml
└── README.md
The first working agent — We’ll implement the PO agent end-to-end: system prompt, LLM call, output parsing, domain event emission, and integration with the LangGraph node. By the end of Part 4, you’ll be able to run a real requirement brief through the PO agent and see it produce a ClarifiedRequirement object.
Closing Thoughts
Architecture is the set of decisions that are hardest to change later. The bounded contexts we’ve defined here, the state machine we’ve specified, the communication patterns we’ve chosen — these are the load-bearing walls of our AI team. We should be thoughtful about them before we write the first line of agent code.
I’ve made these design choices based on hard lessons from building systems that started without architecture and paid the price. Agents without boundaries contradict each other. State without structure becomes impossible to debug. Events without patterns create tangled dependencies that no LLM can reason about.
The patterns here — DDD, sagas, outbox, CQRS applied to agent communication — aren’t novel inventions for AI. They’re the collected wisdom of software engineering applied to a new kind of component. The agents are new. The need for architecture is timeless.
In the next part, we turn this blueprint into working Python. See you there.
Thuan Luong is a Tech Lead based in Vietnam specializing in distributed systems and AI engineering. He leads the “Vibe Coding” series on building production-quality AI software teams with LangGraph.
Part 3 of 12 — Part 4: BaseAgent Class and Project Setup →