I remember the first time I tried to build a multi-agent system without any architecture. It was 2023, GPT-4 had just dropped, and I thought: “I’ll just chain a few agents together and let them talk to each other.” Three days later, I had a beautiful mess. Agents repeating work. Agents contradicting each other. An agent that kept apologizing to another agent for not understanding what it wanted. It was, as my team put it, “like herding cats — except the cats were writing production code.”

The lesson I took from that experience: AI agents are software components. And software components without architecture become legacy systems very, very quickly.

In Part 1, we introduced the concept of a full AI software team. In Part 2, we mapped out each agent’s role — the Product Owner, Business Analyst, Tech Architect, Tech Lead, Senior Software Engineer, QC Engineer, DevOps Engineer, and Project Manager. Now in Part 3, we need to answer the harder question: how do these agents actually work together as a coherent system?

The answer, I’ll argue, comes from a set of ideas the software engineering world has been refining for 20 years: Domain-Driven Design.


1. Why Architecture Matters for AI Teams

Let me describe two scenarios.

Scenario A — No Architecture: You have 8 agents. Each agent can call any other agent. Each agent has its own notion of what a “requirement” means. When the SSE agent finishes writing code, it broadcasts a message to everyone. The QC agent gets the message but isn’t sure if the code is ready for testing. The PM agent gets the same message and tries to update a status tracker it built internally. The TL agent gets it and starts a code review that conflicts with the QC agent’s test run. The PO agent, inexplicably, also gets notified and starts asking clarifying questions about business logic during active code review.

Scenario B — Designed System: Requirements flow through defined states. Each agent operates within its bounded context and communicates via well-defined domain events. When the SSE agent finishes code, it emits a CodeArtifactReady event. Only the QC agent and TL agent subscribe to this event. The PM agent learns about it through a state change, not a direct message. The PO agent is not in the loop until a human checkpoint gate is reached.

The difference between Scenario A and Scenario B is not about which LLM you’re using. It’s about architecture. It’s about having a vocabulary for what each part of the system does, and enforcing boundaries so that components don’t step on each other.

This is exactly what Domain-Driven Design gives us.

The Microservices Parallel

If you’ve built microservices, you’ve already faced this problem. When you decompose a monolith into services, you face questions like: what does this service own? How does it communicate? What happens when it fails? Who calls whom?

The answers that emerged from the microservices era — bounded contexts, domain events, the outbox pattern, saga orchestration — are exactly the patterns we need for multi-agent AI systems. The agents are the services. LangGraph is the orchestrator. The TeamState is the shared data model. Domain events are the triggers that move work between agents.

We’re not reinventing anything here. We’re applying proven patterns to a new execution environment.


2. Applying DDD to Multi-Agent Systems

Domain-Driven Design, at its core, is about aligning software structure with business domains. Eric Evans’ 2003 book introduced a vocabulary for doing this. Let me translate that vocabulary into agent terms.

The DDD-to-Agent Vocabulary

Ubiquitous Language in DDD means that developers, domain experts, and stakeholders all use the same terms. In our AI team, this means every agent and every piece of code uses the same vocabulary: a UserStory is a UserStory, not a ticket or a task or a requirement_item. When the BA agent creates user stories, when the SSE agent reads them, and when the QC agent validates against them — they all use the same UserStory data structure.

Aggregates are clusters of domain objects that are treated as a single unit. In our system, the Requirement aggregate contains a UserBrief, ClarifiedRequirements, and a list of UserStory objects. The CodeArtifact aggregate contains SourceFiles, TestResults, and DependencyManifest. Aggregates enforce consistency boundaries — you don’t update individual UserStory objects in isolation; you update them through the Requirement aggregate.

Entities are objects with a distinct identity that persists over time. Each UserStory has a story_id that stays constant even as its content, status, and acceptance criteria evolve. Each CodeArtifact has an artifact_id that lets us track it through review, revision, and deployment.

Value Objects are objects defined entirely by their attributes, with no identity. A Priority (High/Medium/Low), a StoryPoints value, a TestCoverage percentage — these are value objects. Two Priority.HIGH values are identical and interchangeable.
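The “no identity” property is easy to make concrete. Here is a minimal standalone sketch using a frozen dataclass (the article’s own TeamState later models Priority as a TypedDict; this version exists only to illustrate value-based equality):

```python
from dataclasses import dataclass
from enum import Enum


class PriorityLevel(str, Enum):
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


@dataclass(frozen=True)
class Priority:
    """Value object: defined entirely by its attributes, no identity."""
    level: PriorityLevel


# Two value objects with the same attributes are interchangeable.
a = Priority(PriorityLevel.HIGH)
b = Priority(PriorityLevel.HIGH)
assert a == b        # equal by value
assert a is not b    # distinct instances, yet fully interchangeable
```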

Domain Events are facts about something that happened in the past. They’re immutable records. RequirementsCleared, UserStoriesCreated, CodeReviewApproved, TestsFailed — these are domain events. They’re named in the past tense because they record what has already occurred. In our LangGraph system, domain events are the primary mechanism by which agents trigger each other.

Bounded Contexts are the most powerful concept we’ll use. A bounded context is an explicit boundary within which a particular model applies. Within the boundary, terms have specific, unambiguous meanings. Across boundaries, you need translation.

Bounded Contexts and Agent Clusters

Each of our bounded contexts maps to a cluster of agents that share vocabulary and concern:

  • The Requirement Context owns the language of business needs: briefs, stories, acceptance criteria, prioritization.
  • The Design Context owns the language of technical architecture: components, interfaces, data models, tech stack decisions.
  • The Implementation Context owns the language of code: functions, classes, modules, test coverage, linting results.
  • The Quality Context owns the language of validation: test cases, test suites, pass/fail results, coverage thresholds.
  • The Deployment Context owns the language of delivery: pipelines, environments, configuration, rollout strategy.

The Coordination Context is special — it doesn’t have its own domain language but operates across all others. The PM agent lives here, tracking state, identifying blockers, and escalating issues across context boundaries.
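The context-to-agent mapping above can be written down as a simple table. The agent node names below follow the roles from Part 2; the dict shape itself is illustrative, not a structure reused by the code later in this article:

```python
# Which agents live inside each bounded context.
BOUNDED_CONTEXTS: dict[str, list[str]] = {
    "requirement":    ["po_agent", "ba_agent"],
    "design":         ["ta_agent", "tl_agent"],
    "implementation": ["sse_agent"],
    "quality":        ["qc_agent"],
    "deployment":     ["devops_agent"],
}

# The coordination context spans all the others rather than owning
# a domain language of its own; only the PM agent lives here.
COORDINATION_AGENTS = ["pm_agent"]
```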

Anti-Corruption Layers

In DDD, when two bounded contexts need to communicate, you place an anti-corruption layer between them. This prevents one context’s model from polluting another’s. In agent terms, this is about translation.

When the Requirement Context hands off to the Design Context, the BA agent’s UserStory object gets translated into something the TA agent can work with: a list of FunctionalRequirement objects with specific technical implications. The Design Context doesn’t know or care about the original UserStory format — it works with FunctionalRequirement objects.

In code, this translation happens in a dedicated function that maps between domain models. We’ll see this in later parts when we implement the actual agent handoffs.
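To make the idea concrete before then, here is a minimal sketch of such a translation function. The FunctionalRequirement field names are assumptions for illustration, and the UserStory shape is abridged from the full definition in section 6:

```python
from typing_extensions import TypedDict


class UserStory(TypedDict):
    """Requirement-context shape (abridged)."""
    story_id: str
    i_want: str
    acceptance_criteria: list[str]


class FunctionalRequirement(TypedDict):
    """Design-context shape. Field names here are hypothetical."""
    requirement_id: str
    description: str
    verification_criteria: list[str]


def translate_story(story: UserStory) -> FunctionalRequirement:
    """Anti-corruption layer: maps the Requirement-context model into
    the Design-context model so neither side leaks into the other."""
    return FunctionalRequirement(
        requirement_id=f"FR-{story['story_id']}",
        description=story["i_want"],
        verification_criteria=list(story["acceptance_criteria"]),
    )
```

The Design Context only ever sees FunctionalRequirement objects; if the BA agent renames a UserStory field, only this one function changes.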


3. Our 5 Bounded Contexts

Here is the full map of our bounded contexts and how they relate:

Figure 1: Bounded Contexts Map. Five domain contexts, each owned by specific agents, communicating via domain events and anti-corruption layers. The PM coordination context (blue border) spans all others.

Let me walk through each context in depth.

Requirement Context: Where It All Begins

The PO agent receives the raw brief from the human stakeholder. Its job is to interpret intent, ask clarifying questions if needed, and produce a ClarifiedRequirement object. This is the aggregate root for this context. Everything in the Requirement Context flows through it.

The BA agent consumes the ClarifiedRequirement and produces UserStory objects, each with acceptance criteria, priority, and estimated complexity. When the BA agent is done, it emits the UserStoriesCreated domain event.

The Requirement Context owns these terms: UserBrief, ClarifiedRequirement, UserStory, AcceptanceCriteria, Priority, StoryPoints, BusinessRule. None of these terms leak into the Design Context without going through an anti-corruption layer.

Design Context: Translating Business to Technical

The anti-corruption layer at the Requirement→Design boundary translates UserStory objects into FunctionalRequirement objects. This translation is intentional: the Design Context doesn’t care about business storytelling language; it cares about technical requirements.

The TA agent takes FunctionalRequirement objects and produces the TechnicalSpec aggregate: component diagrams, data models, API contracts, infrastructure requirements, and the ArchitectureDecision record (capturing the “why” behind tech choices — important for future reference).

The TL agent reviews the TechnicalSpec, may request revisions, and ultimately approves it. This triggers the DesignApproved domain event.

Implementation Context: Where Code Happens

The anti-corruption layer at Design→Implementation translates the TechnicalSpec into an ImplementationPlan — a prioritized list of ImplementationTask objects, each with a clear definition of done and technical notes.

The SSE agent executes tasks one at a time, producing CodeArtifact objects. Each artifact includes source files, test files, and a LintReport. The SSE agent runs tests internally — if tests fail, it iterates. After three failures, it escalates to the TL agent or PM agent rather than spinning forever.

When code is complete and self-tests pass, the SSE agent emits CodeArtifactReady.
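The retry-and-escalate rule can be sketched as a small routing function. The node names here are illustrative placeholders, not the article’s actual graph nodes (those appear in section 7):

```python
MAX_ATTEMPTS = 3  # the escalation threshold described above


def next_step_after_self_tests(self_tests_passing: bool, attempts: int) -> str:
    """Decide where the SSE agent's work goes after an internal test run."""
    if self_tests_passing:
        return "emit_code_artifact_ready"   # hand off to review and QC
    if attempts >= MAX_ATTEMPTS:
        return "pm_escalation"              # stop spinning, escalate
    return "sse_agent"                      # iterate on the implementation
```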

Quality Context: Independent Validation

The QC agent doesn’t trust the SSE agent’s self-reported test results. It independently defines test cases (from the original user stories, via the anti-corruption layer) and validates the code artifacts against them. This separation is important: the QC agent is the adversarial voice in the system.

If quality gates pass, the QualityGatePassed event fires. If they fail, the TestsFailed event goes back to the Implementation Context.

Deployment Context: Delivery Pipeline

The DevOps agent receives QualityGatePassed and produces the CI/CD configuration, environment setup, and deployment runbook. It doesn’t create deployments autonomously — it prepares everything and then waits at a human checkpoint before triggering the actual deployment.


4. The Requirement Lifecycle State Machine

The most important piece of shared state in our system is the lifecycle of a requirement. Every requirement starts as a raw brief and ends as a deployed feature. Between those two points, there are nine distinct states and a set of transitions, guards, and possible rollbacks.

Figure 2: Requirement Lifecycle State Machine. Nine states, two human checkpoints (amber H badges), rollback paths on review failure and test failure, and a 3-attempt retry loop with PM escalation.

Here is the state machine encoded as Python TypedDicts with full type annotations:

from typing import Literal, Optional
from typing_extensions import TypedDict
from enum import Enum
from datetime import datetime


class RequirementState(str, Enum):
    """The nine states in the requirement lifecycle."""
    RECEIVED = "received"
    CLARIFIED = "clarified"
    USER_STORIES_CREATED = "user_stories_created"
    DESIGN_READY = "design_ready"
    IN_DEVELOPMENT = "in_development"
    IN_REVIEW = "in_review"
    TEST_PENDING = "test_pending"
    READY_TO_DEPLOY = "ready_to_deploy"
    DEPLOYED = "deployed"
    # Error states
    BLOCKED = "blocked"
    ESCALATED = "escalated"


class StateTransition(TypedDict):
    """Records a state transition with guard conditions."""
    from_state: RequirementState
    to_state: RequirementState
    triggered_by: str          # agent name
    event_name: str            # domain event that triggered it
    guard: Optional[str]       # condition that must be True
    timestamp: str
    metadata: dict


# Transition table — all valid transitions
VALID_TRANSITIONS: dict[RequirementState, list[RequirementState]] = {
    RequirementState.RECEIVED: [
        RequirementState.CLARIFIED,
        RequirementState.BLOCKED,  # if brief is too vague
    ],
    RequirementState.CLARIFIED: [
        RequirementState.USER_STORIES_CREATED,
        RequirementState.RECEIVED,  # rollback: needs more clarification
    ],
    RequirementState.USER_STORIES_CREATED: [
        RequirementState.DESIGN_READY,  # after human approval
        RequirementState.CLARIFIED,    # if stories are rejected
    ],
    RequirementState.DESIGN_READY: [
        RequirementState.IN_DEVELOPMENT,
        RequirementState.USER_STORIES_CREATED,  # if design reveals gaps
    ],
    RequirementState.IN_DEVELOPMENT: [
        RequirementState.IN_REVIEW,
        RequirementState.IN_DEVELOPMENT,  # retry loop (max 3)
        RequirementState.ESCALATED,       # after 3 failures
    ],
    RequirementState.IN_REVIEW: [
        RequirementState.TEST_PENDING,  # review passed
        RequirementState.IN_DEVELOPMENT,  # review failed
    ],
    RequirementState.TEST_PENDING: [
        RequirementState.READY_TO_DEPLOY,   # all gates pass
        RequirementState.IN_DEVELOPMENT,    # tests failed, rework
    ],
    RequirementState.READY_TO_DEPLOY: [
        RequirementState.DEPLOYED,  # after human approval
        RequirementState.TEST_PENDING,  # human rejects, retest
    ],
    RequirementState.DEPLOYED: [],  # terminal state
}


def validate_transition(
    current: RequirementState,
    target: RequirementState,
    context: dict,
) -> tuple[bool, str]:
    """
    Guard function: returns (is_valid, reason).
    Called before every state transition.
    """
    allowed = VALID_TRANSITIONS.get(current, [])
    if target not in allowed:
        return False, f"Transition {current.value} -> {target.value} is not in the transition table"

    # Additional guards per transition
    if current == RequirementState.USER_STORIES_CREATED and target == RequirementState.DESIGN_READY:
        if not context.get("human_approved_stories"):
            return False, "Human approval required before design phase"

    if current == RequirementState.IN_DEVELOPMENT and target == RequirementState.IN_REVIEW:
        if not context.get("self_tests_passing"):
            return False, "SSE must pass self-tests before submitting for review"

    if current == RequirementState.READY_TO_DEPLOY and target == RequirementState.DEPLOYED:
        if not context.get("human_approved_deployment"):
            return False, "Human approval required before production deployment"

    return True, "ok"

The key insight here is that every transition is explicit and guarded. An agent cannot move a requirement from DESIGN_READY to DEPLOYED in one jump. It must go through every intermediate state. This prevents the system from skipping steps under time pressure, which is a real failure mode when LLM agents are optimizing for task completion.
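To see the “no skipping” guarantee in action, here is a condensed, self-contained restatement of the guard (plain string keys instead of the enum, for brevity):

```python
# Two rows lifted from the transition table above.
VALID = {
    "design_ready": ["in_development", "user_stories_created"],
    "ready_to_deploy": ["deployed", "test_pending"],
}


def is_valid(current: str, target: str) -> bool:
    """A transition is legal only if it appears in the table."""
    return target in VALID.get(current, [])


assert not is_valid("design_ready", "deployed")   # no skipping states
assert is_valid("ready_to_deploy", "deployed")    # the only legal path in
```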


5. Agent Communication Architecture

With our bounded contexts defined and our state machine clear, we need to design how agents actually exchange information. There are three fundamental communication patterns, borrowed directly from microservices architecture:

Figure 3: Agent Communication Patterns. Three distinct patterns — Command, Event, Query — with a comparison table showing when to use each. Based on CQRS principles adapted for multi-agent systems.

Now let’s define these patterns in code:

from dataclasses import dataclass, field
from typing import Any, Optional, Literal
from datetime import datetime, timezone
import uuid


@dataclass
class AgentMessage:
    """
    The universal message envelope used for all agent communication.
    Whether it's a Command, Event, or Query — it uses this structure.
    The 'message_type' field determines how it's routed.
    """
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    message_type: Literal["command", "event", "query", "query_response"] = "event"
    event_name: str = ""           # e.g., "UserStoriesCreated", "CreateUserStories"
    source_agent: str = ""         # who sent this
    target_agent: Optional[str] = None  # None = broadcast (events only)
    correlation_id: Optional[str] = None  # links query to its response
    payload: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    schema_version: str = "1.0"

    def to_dict(self) -> dict:
        return {
            "message_id": self.message_id,
            "message_type": self.message_type,
            "event_name": self.event_name,
            "source_agent": self.source_agent,
            "target_agent": self.target_agent,
            "correlation_id": self.correlation_id,
            "payload": self.payload,
            "metadata": self.metadata,
            "timestamp": self.timestamp,
            "schema_version": self.schema_version,
        }


class DomainEvent(AgentMessage):
    """
    Base class for all domain events.
    Domain events record facts that have occurred.
    They are named in the past tense.
    They are immutable once created.
    """

    def __init__(
        self,
        event_name: str,
        source_agent: str,
        payload: dict,
        metadata: dict | None = None,
    ):
        super().__init__(
            message_type="event",
            event_name=event_name,
            source_agent=source_agent,
            target_agent=None,  # events are always broadcast
            payload=payload,
            metadata=metadata or {},
        )
        # Freeze only after the dataclass __init__ has assigned every
        # field; checking an individual field inside __setattr__ would
        # trip on the parent's own attribute assignments.
        object.__setattr__(self, "_frozen", True)

    def __setattr__(self, name: str, value: Any) -> None:
        """Events are immutable after creation."""
        if getattr(self, "_frozen", False):
            raise AttributeError("DomainEvent is immutable")
        super().__setattr__(name, value)


# Concrete domain events
class UserStoriesCreated(DomainEvent):
    def __init__(self, source_agent: str, stories: list[dict], requirement_id: str):
        super().__init__(
            event_name="UserStoriesCreated",
            source_agent=source_agent,
            payload={
                "requirement_id": requirement_id,
                "stories": stories,
                "story_count": len(stories),
            },
        )


class CodeArtifactReady(DomainEvent):
    def __init__(
        self,
        source_agent: str,
        artifact_id: str,
        story_ids: list[str],
        test_coverage: float,
    ):
        super().__init__(
            event_name="CodeArtifactReady",
            source_agent=source_agent,
            payload={
                "artifact_id": artifact_id,
                "story_ids": story_ids,
                "test_coverage": test_coverage,
                "self_tests_passing": True,
            },
        )


class QualityGatePassed(DomainEvent):
    def __init__(
        self,
        source_agent: str,
        artifact_id: str,
        quality_score: float,
        test_results: dict,
    ):
        super().__init__(
            event_name="QualityGatePassed",
            source_agent=source_agent,
            payload={
                "artifact_id": artifact_id,
                "quality_score": quality_score,
                "test_results": test_results,
            },
        )


class TestsFailed(DomainEvent):
    def __init__(
        self,
        source_agent: str,
        artifact_id: str,
        failure_summary: str,
        failed_test_ids: list[str],
    ):
        super().__init__(
            event_name="TestsFailed",
            source_agent=source_agent,
            payload={
                "artifact_id": artifact_id,
                "failure_summary": failure_summary,
                "failed_test_ids": failed_test_ids,
            },
        )

The Outbox Pattern for Reliable Event Delivery

One of the real-world problems I encountered when building agents with LangGraph: what happens if an agent produces a domain event but the graph execution is interrupted before the event is processed? You end up with phantom work — state that shows the SSE agent completed code, but the QC agent never received the trigger.

The solution is borrowed from distributed systems: the Outbox Pattern. Instead of emitting events directly, agents write events to an “outbox” in the shared state. A separate component — the event dispatcher — reads from the outbox and delivers events to subscribers, then marks them as delivered.

from typing_extensions import TypedDict


class OutboxEntry(TypedDict):
    event: dict          # serialized DomainEvent
    delivered: bool
    delivery_attempts: int
    created_at: str


# In TeamState. Note: LangGraph reducers apply to top-level state keys,
# so the outbox is replaced wholesale by dispatch_events rather than
# merged via an Annotated reducer on a nested field.
class EventOutbox(TypedDict):
    pending: list[OutboxEntry]
    delivered: list[OutboxEntry]


def dispatch_events(state: "TeamState") -> "TeamState":
    """
    Called at the start of each graph step.
    Delivers pending outbox events to the appropriate handlers.
    This ensures at-least-once delivery semantics.
    """
    pending = state["event_outbox"]["pending"]
    still_pending: list[OutboxEntry] = []
    delivered_now: list[OutboxEntry] = []

    for entry in pending:
        if entry["delivered"]:
            # Already delivered (e.g., replayed after a checkpoint
            # restore); move it out of the pending queue.
            delivered_now.append(entry)
            continue
        try:
            # Route to subscriber nodes
            _route_event_to_subscribers(entry["event"], state)
            entry["delivered"] = True
            delivered_now.append(entry)
        except Exception:
            entry["delivery_attempts"] += 1
            still_pending.append(entry)  # retry on the next graph step

    return {
        **state,
        "event_outbox": {
            "pending": still_pending,
            "delivered": state["event_outbox"]["delivered"] + delivered_now,
        },
    }

This pattern ensures that no domain event gets dropped, even if the graph execution is interrupted and restarted from a checkpoint.


6. The TeamState: Our Central Shared Object

Everything in the system converges on one data structure: TeamState. This is the LangGraph state that gets passed through every node, updated by every agent, and persisted between checkpoints. Designing it well is perhaps the most important architectural decision in the whole system.

Here is the complete definition:

from typing import Literal, Optional, Annotated
from typing_extensions import TypedDict
import operator


# ============================================================
# Value Objects
# ============================================================

class Priority(TypedDict):
    level: Literal["critical", "high", "medium", "low"]
    rationale: str


class QualityGates(TypedDict):
    min_test_coverage: float          # e.g., 0.80
    max_cyclomatic_complexity: int    # e.g., 10
    lint_must_pass: bool
    type_check_must_pass: bool
    security_scan_must_pass: bool


class DeploymentStatus(TypedDict):
    status: Literal["pending", "in_progress", "deployed", "failed", "rolled_back"]
    environment: Literal["dev", "staging", "production"]
    deployed_at: Optional[str]
    deployed_by: str                  # agent or human identifier
    deployment_url: Optional[str]
    version: Optional[str]


# ============================================================
# Entities
# ============================================================

class UserStory(TypedDict):
    story_id: str
    title: str
    as_a: str                         # "As a [role]"
    i_want: str                       # "I want [feature]"
    so_that: str                      # "So that [benefit]"
    acceptance_criteria: list[str]
    priority: Priority
    story_points: int
    status: Literal[
        "draft", "approved", "in_progress", "done", "rejected"
    ]
    created_by: str                   # agent_id


class ImplementationTask(TypedDict):
    task_id: str
    story_id: str                     # links back to user story
    title: str
    description: str
    file_path: str                    # expected output file
    dependencies: list[str]           # task_ids that must complete first
    estimated_minutes: int
    status: Literal["pending", "in_progress", "done", "failed"]
    assigned_to: str


class CodeArtifact(TypedDict):
    artifact_id: str
    task_id: str
    language: str
    file_path: str
    content: str                      # actual source code
    test_file_path: Optional[str]
    test_content: Optional[str]
    lint_passed: bool
    type_check_passed: bool
    created_at: str
    commit_hash: Optional[str]


class TestCase(TypedDict):
    test_id: str
    story_id: str
    title: str
    test_type: Literal["unit", "integration", "e2e", "performance"]
    preconditions: list[str]
    steps: list[str]
    expected_result: str
    status: Literal["pending", "passed", "failed", "skipped"]
    failure_reason: Optional[str]


class Blocker(TypedDict):
    blocker_id: str
    description: str
    blocked_agent: str
    raised_by: str
    raised_at: str
    resolved: bool
    resolution: Optional[str]


class ArchitectureDecision(TypedDict):
    adr_id: str                       # Architecture Decision Record
    title: str
    status: Literal["proposed", "accepted", "deprecated", "superseded"]
    context: str
    decision: str
    consequences: list[str]
    alternatives_considered: list[str]
    decided_by: str
    decided_at: str


class TechnicalSpec(TypedDict):
    spec_id: str
    title: str
    components: list[dict]            # component name, responsibility, tech
    data_models: list[dict]           # entity definitions
    api_contracts: list[dict]         # endpoint definitions
    infrastructure: dict              # cloud resources, config
    tech_stack: dict                  # language, frameworks, databases
    approved_by: Optional[str]
    approved_at: Optional[str]


class CICDConfig(TypedDict):
    config_id: str
    pipeline_tool: str                # "github_actions", "gitlab_ci", etc.
    pipeline_yaml: str                # actual CI/CD config content
    environments: list[str]
    test_commands: list[str]
    deploy_commands: list[str]
    rollback_procedure: str
    generated_by: str
    generated_at: str


class TestResults(TypedDict):
    run_id: str
    total_tests: int
    passed: int
    failed: int
    skipped: int
    coverage_percent: float
    duration_seconds: float
    failed_test_ids: list[str]
    coverage_report: dict
    run_at: str


class RequirementDoc(TypedDict):
    doc_id: str
    original_brief: str
    clarified_objectives: list[str]
    out_of_scope: list[str]
    assumptions: list[str]
    constraints: list[str]
    success_metrics: list[str]
    stakeholders: list[str]
    clarified_by: str
    clarified_at: str


# ============================================================
# The Complete TeamState
# ============================================================

class TeamState(TypedDict):
    # ── Metadata ──────────────────────────────────────────────
    task_id: str
    created_at: str
    updated_at: str
    phase: Literal[
        "requirement", "design", "implementation", "quality", "deployment", "done"
    ]
    requirement_state: str            # RequirementState enum value
    schema_version: str               # for migration support

    # ── Requirement Domain ─────────────────────────────────────
    raw_brief: str
    clarified_requirements: Optional[RequirementDoc]
    user_stories: list[UserStory]
    stories_human_approved: bool
    rejection_reason: Optional[str]

    # ── Design Domain ─────────────────────────────────────────
    technical_spec: Optional[TechnicalSpec]
    architecture_decisions: list[ArchitectureDecision]
    design_review_notes: list[str]
    design_approved_by: Optional[str]

    # ── Implementation Domain ──────────────────────────────────
    implementation_plan: list[ImplementationTask]
    code_artifacts: list[CodeArtifact]
    implementation_attempts: int      # tracks retry count (max 3)
    current_task_id: Optional[str]
    self_test_results: Optional[TestResults]

    # ── Quality Domain ─────────────────────────────────────────
    test_cases: list[TestCase]
    quality_gates: QualityGates
    quality_test_results: Optional[TestResults]
    quality_score: Optional[float]
    quality_gate_passed: bool
    qc_notes: list[str]

    # ── Deployment Domain ─────────────────────────────────────
    cicd_config: Optional[CICDConfig]
    deployment_status: DeploymentStatus
    deployment_human_approved: bool
    deployment_notes: list[str]

    # ── Coordination ──────────────────────────────────────────
    # Annotated[list, operator.add] = append-only (LangGraph reducer)
    agent_messages: Annotated[list[AgentMessage], operator.add]
    event_outbox: EventOutbox
    current_agent: str
    blockers: list[Blocker]
    human_feedback: Optional[str]
    human_checkpoint_pending: bool
    checkpoint_context: Optional[dict]   # what the human needs to review

    # ── Audit trail ───────────────────────────────────────────
    state_transitions: Annotated[list[StateTransition], operator.add]
    error_log: Annotated[list[dict], operator.add]

A few design decisions worth calling out:

Why Annotated[list, operator.add] for some fields? LangGraph’s state management requires you to specify how to merge concurrent state updates. For agent_messages, state_transitions, and error_log, we want to accumulate entries — so we use the operator.add reducer. For fields like technical_spec or current_agent, we want the latest value to win, so we don’t annotate them.
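Conceptually, the reducer is just list concatenation: when two updates touch the same channel, LangGraph combines them with the annotated function instead of letting the later one overwrite the earlier one. A standalone illustration of that merge:

```python
import operator

# Two updates to an append-only channel, e.g. agent_messages.
existing = [{"event_name": "UserStoriesCreated"}]
update = [{"event_name": "CodeArtifactReady"}]

# This is the combine step the operator.add reducer performs.
merged = operator.add(existing, update)
assert merged == [
    {"event_name": "UserStoriesCreated"},
    {"event_name": "CodeArtifactReady"},
]
```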

Why explicit schema_version? As the system evolves, the TeamState schema will change. Having a schema_version field lets us write migration functions that upgrade old states to new schemas, which is essential for long-running workflows.
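A sketch of what such a migration chain might look like, assuming one upgrade function per version step (the “1.1” change shown is hypothetical, purely for illustration):

```python
def migrate_1_0_to_1_1(state: dict) -> dict:
    """Hypothetical upgrade: version 1.1 adds an error_log field."""
    state = {**state, "schema_version": "1.1"}
    state.setdefault("error_log", [])
    return state


# One entry per known source version.
MIGRATIONS = {"1.0": migrate_1_0_to_1_1}


def upgrade_state(state: dict, target_version: str) -> dict:
    """Apply version upgrades in sequence until the target is reached."""
    while state["schema_version"] != target_version:
        migrate = MIGRATIONS[state["schema_version"]]
        state = migrate(state)
    return state
```

A checkpointed workflow that resumes weeks later can call upgrade_state before any agent touches the restored TeamState.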

Why implementation_attempts as an integer? This is the key guard against infinite retry loops. Every time the SSE agent’s implementation fails tests, this counter increments. At 3, the PM agent triggers a human checkpoint instead of allowing another retry.


7. Human-in-the-Loop Checkpoints

The most dangerous thing about an autonomous AI system is that it can be confidently wrong. A well-designed human-in-the-loop system doesn’t try to remove humans from the process — it places them strategically at points where their judgment adds the most value.

In our system, there are two mandatory human checkpoints:

  1. After UserStoriesCreated — The PO agent and BA agent have done their work. Before any design or code is written, a human approves the scope. This prevents wasted effort from misunderstood requirements.

  2. Before Deployment — The DevOps agent has prepared the CI/CD configuration and the QC agent has signed off. Before anything is deployed to production, a human gives the final go-ahead.

Here is how we implement these checkpoints in LangGraph:

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.types import interrupt


def stories_approval_checkpoint(state: TeamState) -> TeamState:
    """
    Human checkpoint: review user stories before design begins.
    LangGraph will pause here and wait for external input.
    """
    human_feedback = interrupt({
        "checkpoint_type": "stories_approval",
        "message": "Please review the user stories and approve or reject.",
        "user_stories": state["user_stories"],
        "clarified_requirements": state["clarified_requirements"],
        "instructions": "Set approved=True to continue, or provide rejection_reason.",
    })

    if human_feedback.get("approved"):
        return {
            **state,
            "stories_human_approved": True,
            "human_checkpoint_pending": False,
            "human_feedback": None,
        }
    else:
        return {
            **state,
            "stories_human_approved": False,
            "human_checkpoint_pending": False,
            "rejection_reason": human_feedback.get("reason", "No reason provided"),
            "requirement_state": RequirementState.CLARIFIED,  # rollback
        }


def deployment_approval_checkpoint(state: TeamState) -> TeamState:
    """
    Human checkpoint: final approval before production deployment.
    """
    human_feedback = interrupt({
        "checkpoint_type": "deployment_approval",
        "message": "Ready to deploy. Please review and approve.",
        "cicd_config": state["cicd_config"],
        "quality_score": state["quality_score"],
        "test_results": state["quality_test_results"],
        "deployment_target": state["deployment_status"]["environment"],
    })

    if human_feedback.get("approved"):
        return {
            **state,
            "deployment_human_approved": True,
            "human_checkpoint_pending": False,
        }
    else:
        return {
            **state,
            "deployment_human_approved": False,
            "deployment_notes": state["deployment_notes"] + [
                f"Deployment rejected by human: {human_feedback.get('reason')}"
            ],
        }


def route_after_stories(state: TeamState) -> str:
    """Conditional edge: route based on human approval of stories."""
    # interrupt() pauses execution inside the checkpoint node itself, so
    # by the time this edge runs, a decision has already been recorded.
    if state.get("stories_human_approved"):
        return "ta_agent"              # proceed to design
    return "po_agent"                  # back to clarification


def route_after_quality(state: TeamState) -> str:
    """Conditional edge: route after quality gates."""
    if not state.get("quality_gate_passed"):
        if state.get("implementation_attempts", 0) >= 3:
            return "pm_escalation"     # human escalation after 3 failed attempts
        return "sse_agent"             # rework
    return "devops_agent"              # proceed to deployment prep


# Build the graph
builder = StateGraph(TeamState)

# Add agent nodes
builder.add_node("po_agent", run_po_agent)
builder.add_node("ba_agent", run_ba_agent)
builder.add_node("stories_approval_checkpoint", stories_approval_checkpoint)
builder.add_node("ta_agent", run_ta_agent)
builder.add_node("tl_agent", run_tl_agent)
builder.add_node("sse_agent", run_sse_agent)
builder.add_node("qc_agent", run_qc_agent)
builder.add_node("devops_agent", run_devops_agent)
builder.add_node("deployment_approval_checkpoint", deployment_approval_checkpoint)
builder.add_node("pm_agent", run_pm_agent)   # PM edges are wired up in Part 4
builder.add_node("pm_escalation", run_pm_escalation)

# Add edges
builder.set_entry_point("po_agent")
builder.add_edge("po_agent", "ba_agent")
builder.add_edge("ba_agent", "stories_approval_checkpoint")
builder.add_conditional_edges(
    "stories_approval_checkpoint",
    route_after_stories,
    {
        "ta_agent": "ta_agent",
        "po_agent": "po_agent",
    },
)
builder.add_edge("ta_agent", "tl_agent")
builder.add_edge("tl_agent", "sse_agent")
builder.add_edge("sse_agent", "qc_agent")
builder.add_conditional_edges(
    "qc_agent",
    route_after_quality,
    {
        "sse_agent": "sse_agent",
        "devops_agent": "devops_agent",
        "pm_escalation": "pm_escalation",
    },
)
builder.add_edge("devops_agent", "deployment_approval_checkpoint")
builder.add_conditional_edges(
    "deployment_approval_checkpoint",
    lambda s: "deploy" if s.get("deployment_human_approved") else "qc_agent",
    {"deploy": END, "qc_agent": "qc_agent"},
)
builder.add_edge("pm_escalation", END)   # escalation ends this run; a human restarts it

# Compile with checkpoint support. Note: in recent langgraph-checkpoint-sqlite
# releases, SqliteSaver.from_conn_string is a context manager, so for a
# long-lived saver we construct it from a connection directly.
import sqlite3

memory = SqliteSaver(sqlite3.connect("team_state.db", check_same_thread=False))
graph = builder.compile(checkpointer=memory)

Async Approval via Webhook

The LangGraph interrupt() mechanism pauses the run: the graph persists its state through the checkpointer, and the invoking call returns. To continue, you resume the paused thread with Command(resume=...). For production use, you want an async approval flow where a human reviews a dashboard and clicks a button. Here's the pattern:

import asyncio
import uuid
from typing import Optional

from langgraph.types import Command  # needed to resume an interrupted graph


class HumanApprovalGateway:
    """
    Manages async human approval for interrupted graph states.
    In production: backed by a database + webhook or dashboard UI.
    """

    def __init__(self, graph, checkpointer):
        self.graph = graph
        self.checkpointer = checkpointer

    async def submit_for_approval(
        self,
        thread_id: str,
        checkpoint_type: str,
        review_data: dict,
    ) -> str:
        """
        Saves an approval request and sends a notification.
        Returns the approval_request_id.
        """
        approval_id = str(uuid.uuid4())
        # Save to DB, send Slack/email notification, etc.
        await self._notify_approver(approval_id, checkpoint_type, review_data)
        return approval_id

    async def receive_approval(
        self,
        thread_id: str,
        approved: bool,
        reason: Optional[str] = None,
    ) -> None:
        """
        Called by webhook when human approves or rejects.
        Resumes the paused graph with the human's decision.
        """
        config = {"configurable": {"thread_id": thread_id}}

        # Resume the graph by providing the human input
        async for event in self.graph.astream(
            Command(
                resume={
                    "approved": approved,
                    "reason": reason,
                }
            ),
            config=config,
        ):
            print(f"Graph resumed: {event}")

    async def _notify_approver(self, approval_id, checkpoint_type, data):
        # Implement: Slack message, email, dashboard entry
        pass

8. Error Handling and Recovery

Let’s be realistic: things will go wrong. The SSE agent will produce code that doesn’t pass tests. The TL agent will request revisions. The QC agent will find edge cases that require rethinking the implementation. We need the system to handle these failures gracefully, not spiral into confusion.

The 3-Attempt Rule

For the SSE agent specifically, we cap retries at 3 attempts. Here’s the logic:

import uuid
from datetime import datetime, timezone


def run_sse_agent(state: TeamState) -> TeamState:
    """
    SSE agent node with retry tracking and escalation logic.
    """
    attempts = state.get("implementation_attempts", 0)

    if attempts >= 3:
        # Don't even try — escalate
        return {
            **state,
            "requirement_state": RequirementState.ESCALATED,
            "blockers": state["blockers"] + [
                Blocker(
                    blocker_id=str(uuid.uuid4()),
                    description=(
                        f"SSE agent failed to implement after {attempts} attempts. "
                        f"Last failure: {state.get('self_test_results', {}).get('failed_test_ids', [])}"
                    ),
                    blocked_agent="sse_agent",
                    raised_by="sse_agent",
                    raised_at=datetime.now(timezone.utc).isoformat(),
                    resolved=False,
                    resolution=None,
                )
            ],
        }

    # Attempt implementation
    result = _do_implementation(state)

    if result["self_test_results"]["failed"] > 0:
        # Increment attempts, stay in development
        return {
            **result,
            "implementation_attempts": attempts + 1,
            "requirement_state": RequirementState.IN_DEVELOPMENT,
        }

    # Success
    return {
        **result,
        "implementation_attempts": attempts + 1,
        "requirement_state": RequirementState.IN_REVIEW,
        "event_outbox": {
            **state["event_outbox"],
            "pending": state["event_outbox"]["pending"] + [
                {
                    "event": CodeArtifactReady(
                        source_agent="sse_agent",
                        artifact_id=result["code_artifacts"][-1]["artifact_id"],
                        story_ids=[t["story_id"] for t in state["implementation_plan"]],
                        test_coverage=result["self_test_results"]["coverage_percent"],
                    ).to_dict(),
                    "delivered": False,
                    "delivery_attempts": 0,
                    "created_at": datetime.now(timezone.utc).isoformat(),
                }
            ],
        },
    }

State Rollback

When a state transition fails — say, the TL agent rejects code in review — we don’t just set requirement_state = IN_DEVELOPMENT. We also record the rollback in the audit trail and preserve the review notes so the SSE agent can learn from them:

def handle_review_failure(
    state: TeamState,
    review_notes: list[str],
    reviewer: str,
) -> TeamState:
    """
    Rolls back from IN_REVIEW to IN_DEVELOPMENT with full audit trail.
    """
    rollback_event = StateTransition(
        from_state=RequirementState.IN_REVIEW,
        to_state=RequirementState.IN_DEVELOPMENT,
        triggered_by=reviewer,
        event_name="ReviewFailed",
        guard="review_approved == False",
        timestamp=datetime.now(timezone.utc).isoformat(),
        metadata={"review_notes": review_notes},
    )

    return {
        **state,
        "requirement_state": RequirementState.IN_DEVELOPMENT,
        "design_review_notes": state.get("design_review_notes", []) + review_notes,
        "state_transitions": state.get("state_transitions", []) + [rollback_event],
        "error_log": state.get("error_log", []) + [
            {
                "type": "review_failure",
                "reviewer": reviewer,
                "notes": review_notes,
                "timestamp": rollback_event["timestamp"],
            }
        ],
    }

The Saga Pattern for Long-Running Workflows

Our entire requirement lifecycle is what distributed-systems architects call a saga: a long-running transaction that spans multiple services (here, agents). Unlike a database transaction, you can’t simply roll back a saga. You have to execute a compensating transaction for each step that has already completed.

In our system:

  • If deployment fails after code was already committed to the repository, the DevOps agent needs to run a rollback_deployment() compensating action.
  • If the design is rejected after the TA agent already created artifacts, the Design Context needs to archive those artifacts before starting fresh.
  • If a story is rejected after the BA agent generated it, the state transition records the rejection with a rejection_reason so the PO agent knows what to fix.

The key to saga management is: every forward action must have a corresponding compensating action defined. We’ll implement this as part of the BaseAgent class in Part 4.
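A minimal sketch of that rule (the class and method names here are illustrative, not the Part 4 BaseAgent API): every forward step registers its undo when it completes, and compensation replays the undos in reverse order.

```python
# Minimal saga sketch: each completed forward step registers its
# compensating action; on failure, we undo in reverse (LIFO) order.
# Class and method names are illustrative, not the Part 4 BaseAgent API.

class Saga:
    def __init__(self) -> None:
        self._completed: list[tuple] = []   # (step_name, compensate_fn)

    def run_step(self, name: str, action, compensate):
        """Run a forward action; remember how to undo it if a later step fails."""
        result = action()
        self._completed.append((name, compensate))
        return result

    def compensate_all(self) -> list[str]:
        """Undo every completed step, most recent first. Returns undo order."""
        undone = []
        while self._completed:
            name, compensate = self._completed.pop()
            compensate()
            undone.append(name)
        return undone
```

The LIFO ordering matters: if code was committed and then deployed, the deployment must be rolled back before the commit is reverted, never the other way around.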


9. The Complete System Architecture

Let me zoom out and show the entire system as it will look when fully built:

[Image: complete system architecture]
Figure 4: Complete System Architecture. LangGraph orchestrates all 8 agents. The state store (Redis/SQLite) persists checkpoints. The message bus delivers domain events via the outbox pattern. Two human checkpoints (amber H badges) pause execution for approval. External tools connect to individual agent nodes.

10. The ConversationStore Interface

One additional architectural component worth defining now is the ConversationStore. While the TeamState holds the current workflow state, agents also need access to their conversation history — the back-and-forth with the LLM that produced their outputs. We store this separately to keep the TeamState lean.

from abc import ABC, abstractmethod
from typing import Optional
from dataclasses import dataclass


@dataclass
class ConversationEntry:
    """A single turn in an agent's conversation history."""
    entry_id: str
    agent_id: str
    task_id: str
    role: str                         # "system", "user", "assistant"
    content: str
    timestamp: str
    token_count: Optional[int] = None
    model: Optional[str] = None


class ConversationStore(ABC):
    """
    Interface for persisting agent conversation histories.
    
    Why separate from TeamState? 
    - Conversation histories can be large (thousands of tokens)
    - They're needed for debugging and auditing, not for routing
    - Different storage backends may be appropriate
      (e.g., S3 for archives, Redis for active conversations)
    """

    @abstractmethod
    async def save_entry(self, entry: ConversationEntry) -> None:
        """Append a conversation entry."""

    @abstractmethod
    async def get_history(
        self,
        agent_id: str,
        task_id: str,
        limit: Optional[int] = None,
    ) -> list[ConversationEntry]:
        """Retrieve conversation history for an agent on a task."""

    @abstractmethod
    async def get_full_task_history(
        self,
        task_id: str,
    ) -> dict[str, list[ConversationEntry]]:
        """Get all agent conversations for an entire task."""

    @abstractmethod
    async def clear_history(self, agent_id: str, task_id: str) -> None:
        """Clear history (e.g., after task completion)."""


class SqliteConversationStore(ConversationStore):
    """
    SQLite-backed conversation store for development.
    In production, swap for Redis or PostgreSQL.
    """

    def __init__(self, db_path: str = "conversations.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self) -> None:
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS conversations (
                entry_id TEXT PRIMARY KEY,
                agent_id TEXT NOT NULL,
                task_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                token_count INTEGER,
                model TEXT
            )
        """)
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_agent_task ON conversations(agent_id, task_id)"
        )
        conn.commit()
        conn.close()

    async def save_entry(self, entry: ConversationEntry) -> None:
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            """INSERT INTO conversations 
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                entry.entry_id,
                entry.agent_id,
                entry.task_id,
                entry.role,
                entry.content,
                entry.timestamp,
                entry.token_count,
                entry.model,
            ),
        )
        conn.commit()
        conn.close()

    async def get_history(
        self,
        agent_id: str,
        task_id: str,
        limit: Optional[int] = None,
    ) -> list[ConversationEntry]:
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        query = """
            SELECT * FROM conversations 
            WHERE agent_id=? AND task_id=? 
            ORDER BY timestamp ASC
        """
        if limit:
            query += f" LIMIT {int(limit)}"   # cast guards against injection
        rows = conn.execute(query, (agent_id, task_id)).fetchall()
        conn.close()
        return [
            ConversationEntry(
                entry_id=row[0],
                agent_id=row[1],
                task_id=row[2],
                role=row[3],
                content=row[4],
                timestamp=row[5],
                token_count=row[6],
                model=row[7],
            )
            for row in rows
        ]

    async def get_full_task_history(
        self,
        task_id: str,
    ) -> dict[str, list[ConversationEntry]]:
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        rows = conn.execute(
            "SELECT * FROM conversations WHERE task_id=? ORDER BY timestamp ASC",
            (task_id,),
        ).fetchall()
        conn.close()
        result: dict[str, list[ConversationEntry]] = {}
        for row in rows:
            entry = ConversationEntry(*row)
            result.setdefault(entry.agent_id, []).append(entry)
        return result

    async def clear_history(self, agent_id: str, task_id: str) -> None:
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "DELETE FROM conversations WHERE agent_id=? AND task_id=?",
            (agent_id, task_id),
        )
        conn.commit()
        conn.close()

11. What We’ve Designed — And Why It Matters

Let me step back and summarize what we’ve built in this post from an architectural perspective.

We’ve defined five bounded contexts that map cleanly to agent clusters. Each context owns its domain model, and cross-context communication happens via anti-corruption layers. This means when the Design Context evolves — say, we add a new field to TechnicalSpec — it doesn’t break the Requirement Context or the Implementation Context.

We’ve designed a nine-state lifecycle with explicit transition guards. No agent can skip states. No agent can transition a requirement backward without triggering a compensating action. The audit trail captures every transition.
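The guard idea itself fits in a few lines. As a sketch (the state names echo RequirementState, but this table is abridged and illustrative): legal moves are enumerated explicitly, and anything else is rejected before the state ever changes.

```python
# Illustrative transition-guard table: allowed moves map to the domain
# event that triggers them; any other move raises. The table is abridged.

ALLOWED_TRANSITIONS = {
    ("CLARIFIED", "STORIES_CREATED"): "UserStoriesCreated",
    ("IN_DEVELOPMENT", "IN_REVIEW"): "CodeArtifactReady",
    ("IN_REVIEW", "IN_DEVELOPMENT"): "ReviewFailed",   # audited rollback path
}


def guard_transition(from_state: str, to_state: str) -> str:
    """Return the event name for a legal transition, or raise."""
    try:
        return ALLOWED_TRANSITIONS[(from_state, to_state)]
    except KeyError:
        raise ValueError(f"Illegal transition: {from_state} -> {to_state}")
```

Because the table is data rather than scattered if-statements, the audit trail and the guard logic can never drift apart: both read the same source of truth.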

We’ve defined three communication patterns — Command, Event, Query — and specified which pattern is appropriate for each agent interaction. This prevents the chaos of agents sending arbitrary messages to each other. Every message has a type, a purpose, and a routing rule.

We’ve built a TeamState with 40+ fields organized by domain. Each field is typed, each list field has an appropriate LangGraph reducer, and the schema is versioned for future migration.
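For the list fields, "appropriate reducer" means the Annotated metadata names the merge function LangGraph uses when two node updates touch the same key; concatenation via operator.add is the common choice. A tiny sketch (MiniState is a cut-down stand-in, not the real TeamState):

```python
# Sketch of LangGraph-style reducers on a cut-down state. The Annotated
# metadata names the function used to merge two updates to the same key;
# plain fields default to last-write-wins.
import operator
from typing import Annotated, TypedDict


class MiniState(TypedDict):
    error_log: Annotated[list, operator.add]   # updates are appended
    quality_score: float                       # plain field: last write wins


# The reducer itself is just a binary function over (existing, update):
merged = operator.add(
    [{"type": "review_failure"}],
    [{"type": "test_failure"}],
)
```

This is why parallel agents can both append to error_log without clobbering each other, while a field like quality_score is deliberately owned by a single writer.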

We’ve designed human-in-the-loop checkpoints that use LangGraph’s interrupt() mechanism, with an async approval gateway for production use.

And we’ve identified the saga pattern as the right mental model for the entire workflow, with compensating actions defined for each forward step.

Is this more architecture than most “AI agent” projects have? Yes, significantly. But this is exactly the point. If you’re building a system that will run autonomously on real codebases — pushing code to GitHub, running tests, generating CI/CD configs — you need this level of rigor. The cost of an architectural mistake here isn’t a slow API; it’s broken code in production.

A Word on Over-Engineering

I want to be honest about the other extreme: this architecture only makes sense if you’re building a system meant to handle many different projects over time. If you’re vibe-coding a one-off prototype for a weekend hackathon, you probably don’t need bounded contexts and the outbox pattern. Use LangGraph directly with a flat state dict and get the thing working.

The architecture I’m describing here is for the case where you’re building the AI team as a platform — something your team will use repeatedly, extend, and maintain. In that case, the investment in architecture pays off many times over.


12. Preview: What We Build in Part 4

In Part 4, we move from design to implementation. We’ll build:

The BaseAgent class — The abstract base that every agent inherits from. It handles:

  • LLM client initialization (configurable model per agent)
  • System prompt management and injection
  • Conversation history management via ConversationStore
  • Domain event emission via the outbox
  • Retry logic with exponential backoff
  • Error capture and state recording
  • Compensating action registration (saga pattern)

The project structure — The full directory layout:

ai-team/
├── agents/
│   ├── base.py              # BaseAgent abstract class
│   ├── po_agent.py          # Product Owner
│   ├── ba_agent.py          # Business Analyst
│   ├── ta_agent.py          # Tech Architect
│   ├── tl_agent.py          # Tech Lead
│   ├── sse_agent.py         # Senior Software Engineer
│   ├── qc_agent.py          # QC Engineer
│   ├── devops_agent.py      # DevOps Engineer
│   └── pm_agent.py          # Project Manager
├── domain/
│   ├── state.py             # TeamState TypedDict
│   ├── events.py            # DomainEvent + all event types
│   ├── models.py            # All domain entities/value objects
│   └── transitions.py       # State machine + guards
├── infrastructure/
│   ├── conversation_store.py  # ConversationStore + SQLite impl
│   ├── event_dispatcher.py    # Outbox dispatch logic
│   └── human_gateway.py       # HumanApprovalGateway
├── graph/
│   ├── builder.py           # LangGraph StateGraph construction
│   ├── edges.py             # All conditional edge functions
│   └── checkpoints.py       # Human checkpoint nodes
├── tools/
│   ├── github_tools.py      # GitHub API integration
│   ├── test_runner.py       # pytest/jest execution
│   └── linter.py            # ruff/mypy/eslint execution
├── api/
│   ├── server.py            # FastAPI server
│   └── webhooks.py          # Human approval webhook endpoint
├── tests/
│   └── ...
├── pyproject.toml
└── README.md

The first working agent — We’ll implement the PO agent end-to-end: system prompt, LLM call, output parsing, domain event emission, and integration with the LangGraph node. By the end of Part 4, you’ll be able to run a real requirement brief through the PO agent and see it produce a ClarifiedRequirement object.


Closing Thoughts

Architecture is the set of decisions that are hardest to change later. The bounded contexts we’ve defined here, the state machine we’ve specified, the communication patterns we’ve chosen — these are the load-bearing walls of our AI team. We should be thoughtful about them before we write the first line of agent code.

I’ve made these design choices based on hard lessons from building systems that started without architecture and paid the price. Agents without boundaries contradict each other. State without structure becomes impossible to debug. Events without patterns create tangled dependencies that no LLM can reason about.

The patterns here — DDD, sagas, outbox, CQRS applied to agent communication — aren’t novel inventions for AI. They’re the collected wisdom of software engineering applied to a new kind of component. The agents are new. The need for architecture is timeless.

In the next part, we turn this blueprint into working Python. See you there.


Thuan Luong is a Tech Lead based in Vietnam specializing in distributed systems and AI engineering. He leads the “Vibe Coding” series on building production-quality AI software teams with LangGraph.

Part 3 of 12 — Part 4: BaseAgent Class and Project Setup →
