I had all eight agents working individually. Each one tested in isolation. Each one producing the right output for carefully crafted input. The PO clarified requirements. The BA wrote stories. QC and TA ran in parallel. The SSE wrote code. The TL reviewed it. DevOps generated deployment configs. The PM tracked everything.

And then I wired them together for the first time, hit enter, and watched the whole thing crash in under four seconds.

The PO agent produced a RequirementDoc with a field name that the BA agent did not expect. The BA agent produced user stories, but the fan-out to QC and TA happened before the stories were fully written to state. The SSE agent started generating code, but the TL agent’s review router checked a state key that the SSE agent had not populated. Three different agents used three different conventions for marking their work as “done.”

Individual agents working is table stakes. The workflow is the product.

This is the article where we wire everything together. We will build the complete workflow.py — a single LangGraph StateGraph with all eight agents as nodes, conditional routing between them, parallel fan-out for QC and TA, human checkpoints, and error recovery. Then we will build a main.py CLI entry point and run it end-to-end with a real example: a URL shortener service, from client brief to deployment config, in 102 seconds.


1. The Complete Graph

Here is the full workflow as a LangGraph StateGraph. Every node is an agent. Every edge is either unconditional (always proceeds), conditional (depends on output), or parallel (fan-out to multiple nodes simultaneously).

(Diagram: Complete LangGraph workflow with all 8 agents, conditional edges, parallel fan-out, and human checkpoints)

The flow, in English:

  1. START feeds the client brief to the PO Agent (Alex).
  2. Alex produces a RequirementDoc. A human checkpoint pauses execution. The stakeholder reviews and either approves or requests revision. If revised, the PO runs again.
  3. On approval, the BA Agent (Jordan) decomposes the requirements into UserStory objects.
  4. The BA output fans out in parallel to the QC Agent (Sam) and TA Agent (Morgan). Both run concurrently. Their outputs merge before the next stage.
  5. The SSE Agent (Riley) receives user stories, test cases, and the technical spec. It writes code.
  6. A router checks the SSE output. If the code is incomplete or fails self-validation, the SSE runs again (up to 2 retries).
  7. The TL Agent (Casey) reviews the code. Another router: if Casey requests changes, the SSE revises. If approved, the pipeline proceeds.
  8. The DevOps Agent (Dana) generates deployment configs. The PM Agent (Pat) produces the final project summary. Both run, and the graph reaches END.

Let’s build it.


2. workflow.py — The StateGraph

# workflow.py
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

from state import TeamState
from agents.po import POAgent
from agents.ba import BAAgent
from agents.qc import QCAgent
from agents.ta import TAAgent
from agents.sse import SSEAgent
from agents.tl import TLAgent
from agents.devops import DevOpsAgent
from agents.pm import PMAgent


# ── Agent node functions ──────────────────────────────────────

def po_node(state: TeamState) -> dict:
    agent = POAgent()
    result = agent.run(state)
    return {
        "requirement_doc": result.requirement_doc,
        "po_questions": result.clarifying_questions,
        "current_agent": "po",
        "current_phase": "requirements",
    }


def ba_node(state: TeamState) -> dict:
    agent = BAAgent()
    result = agent.run(state)
    return {
        "user_stories": result.user_stories,
        "current_agent": "ba",
        "current_phase": "analysis",
    }


def qc_node(state: TeamState) -> dict:
    agent = QCAgent()
    result = agent.run(state)
    return {
        "test_suite": result.test_suite,
        "quality_gates": result.quality_gates,
        "current_agent": "qc",
    }


def ta_node(state: TeamState) -> dict:
    agent = TAAgent()
    result = agent.run(state)
    return {
        "technical_spec": result.technical_spec,
        "adrs": result.adrs,
        "current_agent": "ta",
    }


def sse_node(state: TeamState) -> dict:
    agent = SSEAgent()
    result = agent.run(state)
    retries = state.get("sse_retries", 0)
    return {
        "code_artifacts": result.code_artifacts,
        "test_results": result.test_results,
        "current_agent": "sse",
        "current_phase": "implementation",
        "sse_retries": retries + 1,
    }


def tl_node(state: TeamState) -> dict:
    agent = TLAgent()
    result = agent.run(state)
    return {
        "review_result": result.review_result,
        "review_comments": result.comments,
        "current_agent": "tl",
        "current_phase": "review",
    }


def devops_node(state: TeamState) -> dict:
    agent = DevOpsAgent()
    result = agent.run(state)
    return {
        "deployment_config": result.deployment_config,
        "infra_spec": result.infra_spec,
        "current_agent": "devops",
        "current_phase": "deployment",
    }


def pm_node(state: TeamState) -> dict:
    agent = PMAgent()
    result = agent.run(state)
    return {
        "project_summary": result.project_summary,
        "timeline": result.timeline,
        "current_agent": "pm",
        "current_phase": "complete",
    }

Each node function follows the same pattern: instantiate the agent, call run() with the current state, return a dictionary of state updates. LangGraph merges these into TeamState automatically. Every node is a pure function of its input state — if you need to debug why the SSE produced bad code, replay that single node with its exact state snapshot.
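
That replay is a one-liner once a checkpointer is in place (section 5): load the saved snapshot for the thread and call the node function directly. A minimal sketch, assuming the state you care about is the latest checkpoint on its thread:

# debug_replay.py: rerun a single node against a saved state snapshot
from workflow import build_workflow_with_checkpoints, sse_node


def replay_sse(thread_id: str) -> dict:
    compiled = build_workflow_with_checkpoints()
    config = {"configurable": {"thread_id": thread_id}}

    # Load the TeamState the checkpointer saved for this thread
    snapshot = compiled.get_state(config)

    # Node functions are plain functions of state, so call one directly
    return sse_node(snapshot.values)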


3. Router Functions

Routers are the conditional edges — the if statements of the graph. Each router inspects the current state and returns the name of the next node.

# ── Router functions ──────────────────────────────────────────

def route_after_po(state: TeamState) -> str:
    """After PO: check if human approved the requirements."""
    approval = state.get("human_approval", {})

    if approval.get("po_approved") is True:
        return "ba"

    # If requirements were rejected with feedback,
    # loop back to PO with the feedback in state
    if approval.get("po_approved") is False:
        return "po"

    # Default: waiting for human input (should not reach here
    # if interrupt_before is configured correctly)
    return "po"


def route_after_sse(state: TeamState) -> str:
    """After SSE: check if code passes self-validation."""
    test_results = state.get("test_results", {})
    retries = state.get("sse_retries", 0)

    # If tests pass, proceed to TL review
    if test_results.get("all_passed", False):
        return "tl"

    # If tests fail but retries remain, loop back
    if retries < 3:
        return "sse"

    # Max retries exhausted — proceed to TL anyway
    # TL will flag the failures in review
    return "tl"


def route_after_tl_review(state: TeamState) -> str:
    """After TL review: approve, request changes, or reject."""
    review = state.get("review_result", {})
    decision = review.get("decision", "reject")
    retries = state.get("sse_retries", 0)

    if decision == "approved":
        return "devops"

    if decision == "changes_requested" and retries < 3:
        return "sse"

    # If rejected or max retries, still proceed to DevOps
    # with whatever we have — PM will flag the quality gap
    return "devops"

Three routers. Three decision points. Let me explain the design thinking behind each.

route_after_po is the simplest. The PO agent produces requirements. A human reviews them. The human sets state["human_approval"]["po_approved"] to True or False. If False, the PO agent runs again with the feedback embedded in state. This loop can repeat as many times as the human needs. There is no retry limit on human feedback — unlike automated retries, human iteration is expected.

route_after_sse handles the case where the SSE agent’s self-reported test results indicate failure. The SSE agent runs its own tests as part of code generation. If tests fail, we give it up to two additional attempts (three total). This is cheap — each SSE retry costs about 8 seconds and $0.04 in API calls. If all three attempts produce failing tests, we send the code to the TL agent anyway. Better to have a TL review with known failures than to spin indefinitely.

route_after_tl_review is the most nuanced. Casey (the TL) can return three decisions: approved, changes_requested, or reject. On approved, work flows to DevOps. On changes_requested, the SSE agent revises — but only if the retry budget has not been exhausted. On reject (or exhausted retries), we still proceed. This might seem counterintuitive, but in practice, a hard rejection is rare. The PM agent will document the quality gap in the final report, and the human operator will see it.

The key insight: routers must always resolve to a valid next node. Every branch, including error branches, must point somewhere.
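
Because routers are plain functions of state, you can exercise every branch with hand-built dictionaries before wiring them into the graph. A minimal pytest-style sketch:

# test_routers.py: hit every branch of every router with synthetic state
from workflow import route_after_po, route_after_sse, route_after_tl_review


def test_po_router():
    assert route_after_po({"human_approval": {"po_approved": True}}) == "ba"
    assert route_after_po({"human_approval": {"po_approved": False}}) == "po"
    assert route_after_po({}) == "po"  # default branch still resolves to a node


def test_sse_router():
    failing = {"test_results": {"all_passed": False}, "sse_retries": 1}
    assert route_after_sse(failing) == "sse"  # retries remain
    failing["sse_retries"] = 3
    assert route_after_sse(failing) == "tl"   # budget exhausted, proceed anyway


def test_tl_router():
    approved = {"review_result": {"decision": "approved"}, "sse_retries": 1}
    changes = {"review_result": {"decision": "changes_requested"}, "sse_retries": 1}
    assert route_after_tl_review(approved) == "devops"
    assert route_after_tl_review(changes) == "sse"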


4. Building the Graph

# ── Graph assembly ────────────────────────────────────────────

def build_workflow() -> StateGraph:
    """Assemble the complete 8-agent workflow."""
    graph = StateGraph(TeamState)

    # Add all nodes
    graph.add_node("po", po_node)
    graph.add_node("ba", ba_node)
    graph.add_node("qc", qc_node)
    graph.add_node("ta", ta_node)
    graph.add_node("sse", sse_node)
    graph.add_node("tl", tl_node)
    graph.add_node("devops", devops_node)
    graph.add_node("pm", pm_node)

    # Entry point
    graph.add_edge(START, "po")

    # PO -> human checkpoint -> conditional
    graph.add_conditional_edges("po", route_after_po, ["ba", "po"])

    # BA -> parallel fan-out to QC and TA
    graph.add_edge("ba", "qc")
    graph.add_edge("ba", "ta")

    # QC and TA merge -> SSE
    graph.add_edge("qc", "sse")
    graph.add_edge("ta", "sse")

    # SSE -> conditional (retry or proceed to TL)
    graph.add_conditional_edges("sse", route_after_sse, ["tl", "sse"])

    # TL -> conditional (approve, revise, or proceed)
    graph.add_conditional_edges(
        "tl", route_after_tl_review, ["devops", "sse"]
    )

    # DevOps -> PM -> END
    graph.add_edge("devops", "pm")
    graph.add_edge("pm", END)

    return graph

Six lines deserve special attention.

Lines graph.add_edge("ba", "qc") and graph.add_edge("ba", "ta"): These two edges create the parallel fan-out. When the BA node completes, LangGraph sees two outgoing edges and executes both target nodes concurrently. There is no explicit “parallel” keyword — concurrency is a consequence of the graph topology. If a node has multiple outgoing unconditional edges, the targets run in parallel. This is one of LangGraph’s most elegant design decisions.

Lines graph.add_edge("qc", "sse") and graph.add_edge("ta", "sse"): These are the merge edges. The SSE node has two incoming edges. LangGraph will not execute it until both predecessors have completed. The state updates from QC and TA are both merged into TeamState before the SSE node sees it. This is the fan-in — the mirror of the fan-out.

The conditional edges: add_conditional_edges takes three arguments: the source node, the router function, and a list of possible target nodes. The list is important — LangGraph validates at compile time that every target in it is a real node. If you accidentally list "devop" (missing the ‘s’), you get a build error, not a runtime crash.


5. Human Checkpoints with interrupt_before

Raw automation is dangerous. A graph that runs from brief to deployment without any human oversight is a graph that will eventually deploy something catastrophic. We need checkpoints — moments where the system pauses and waits for human input.

LangGraph’s interrupt_before mechanism handles this:

def build_workflow_with_checkpoints():
    """Build and compile the workflow with human-in-the-loop checkpoints."""
    graph = build_workflow()

    # Compile with checkpointing and interrupts.
    # Note: compile() returns a CompiledStateGraph, not a raw StateGraph.
    checkpointer = MemorySaver()
    compiled = graph.compile(
        checkpointer=checkpointer,
        interrupt_before=["ba", "sse"],  # pause before these nodes
    )
    return compiled

interrupt_before=["ba", "sse"] means the graph will pause execution before the BA node runs (so the human can review the PO’s requirements) and before the SSE node runs (so the human can review the QC test cases and TA technical spec before implementation begins).

When the graph hits an interrupt, it saves its complete state to the checkpointer and returns control to the caller. The caller — our CLI — can display the current state, ask the human for approval, update the state with any feedback, and then resume:

# Resume after human approval
config = {"configurable": {"thread_id": thread_id}}

# First run — will pause before "ba"
result = compiled.invoke(initial_state, config)

# Human reviews PO output, approves
updated_state = {
    "human_approval": {"po_approved": True}
}
result = compiled.invoke(updated_state, config)

# Runs BA, QC+TA in parallel, then pauses before "sse"
# Human reviews test cases and tech spec
updated_state = {
    "human_approval": {"sse_approved": True}
}
result = compiled.invoke(updated_state, config)
# Runs SSE, TL review, DevOps, PM -> END

The thread_id is critical. Each unique thread_id gets its own state history. You can run multiple workflows concurrently with different thread IDs — they are completely isolated.

Why These Two Checkpoints?

I chose to interrupt before BA and before SSE because these are the two highest-leverage review points. Before BA: catching a wrong requirement here costs ~15 seconds of PO re-run. Catching it at TL review wastes ~90 seconds of QC + TA + SSE time. Before SSE: this is the last checkpoint before code generation. Once the SSE starts writing code, the review cycle is automated.

You could add more checkpoints, but each one adds latency. Two hits the sweet spot.
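
If you want to experiment with where the pauses go, it is easy to make the interrupt points a parameter rather than hard-coding them. A small sketch on top of build_workflow(), with build_workflow_with_checkpoints_at being an illustrative name:

def build_workflow_with_checkpoints_at(pause_before: list[str]):
    """Compile the workflow, pausing before an arbitrary set of nodes."""
    graph = build_workflow()
    return graph.compile(
        checkpointer=MemorySaver(),
        interrupt_before=pause_before,
    )


# For example, add a third pause before deployment:
# compiled = build_workflow_with_checkpoints_at(["ba", "sse", "devops"])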


6. main.py — The CLI Entry Point

#!/usr/bin/env python3
# main.py
"""CLI entry point for the AI software team workflow."""

import sys
import time
import json
import argparse
from uuid import uuid4

from workflow import build_workflow_with_checkpoints
from state import TeamState


def create_initial_state(brief: str) -> TeamState:
    """Create the initial state from a client brief."""
    return TeamState(
        client_brief=brief,
        current_phase="intake",
        current_agent="",
        human_approval={},
        sse_retries=0,
    )


def print_phase_header(phase: str, agent: str):
    """Print a formatted phase header."""
    width = 60
    print(f"\n{'=' * width}")
    print(f"  PHASE: {phase.upper()}")
    print(f"  AGENT: {agent}")
    print(f"{'=' * width}")


def print_state_summary(state: dict, keys: list[str]):
    """Print selected keys from state for review."""
    for key in keys:
        value = state.get(key)
        if value is None:
            continue
        if isinstance(value, dict):
            print(f"\n  {key}:")
            print(f"  {json.dumps(value, indent=4, default=str)[:500]}")
        elif isinstance(value, list):
            print(f"\n  {key}: ({len(value)} items)")
            for i, item in enumerate(value[:3]):
                summary = str(item)[:120]
                print(f"    [{i}] {summary}")
            if len(value) > 3:
                print(f"    ... and {len(value) - 3} more")
        else:
            print(f"\n  {key}: {str(value)[:200]}")


def wait_for_approval(checkpoint_name: str) -> tuple[bool, str]:
    """Prompt user for approval at a checkpoint."""
    print(f"\n  ** CHECKPOINT: {checkpoint_name} **")
    print("  Review the output above.")
    response = input("  Approve? [y/n/q] (y=approve, n=revise, q=quit): ")
    response = response.strip().lower()

    if response == "q":
        print("  Workflow cancelled by user.")
        sys.exit(0)
    elif response == "n":
        feedback = input("  Feedback for revision: ")
        return False, feedback
    else:
        return True, ""


def run_workflow(brief: str, auto_approve: bool = False):
    """Run the complete workflow from brief to deployment."""
    thread_id = str(uuid4())
    config = {"configurable": {"thread_id": thread_id}}

    compiled = build_workflow_with_checkpoints()
    state = create_initial_state(brief)

    print("\n  AI Software Team — Workflow Runner")
    print(f"  Thread: {thread_id}")
    print(f"  Brief: {brief[:80]}...")

    total_start = time.time()
    stage_times = {}

    # ── Phase 1: PO ──
    stage_start = time.time()
    print_phase_header("requirements", "PO (Alex)")
    result = compiled.invoke(state, config)
    stage_times["po"] = time.time() - stage_start

    print_state_summary(result, ["requirement_doc", "po_questions"])

    if not auto_approve:
        approved, feedback = wait_for_approval("Requirements Review")
        if not approved:
            result = compiled.invoke(
                {"human_approval": {"po_approved": False},
                 "revision_feedback": feedback},
                config,
            )
            stage_times["po_revision"] = time.time() - stage_start

    # Approve and continue
    stage_start = time.time()
    result = compiled.invoke(
        {"human_approval": {"po_approved": True}}, config
    )
    stage_times["ba_qc_ta"] = time.time() - stage_start

    # ── Phase 2: BA + QC/TA parallel ──
    print_phase_header("analysis + design", "BA (Jordan) + QC (Sam) + TA (Morgan)")
    print_state_summary(
        result,
        ["user_stories", "test_suite", "technical_spec"],
    )

    if not auto_approve:
        approved, feedback = wait_for_approval("Design Review")
        if not approved:
            print("  (Design revision not implemented in this demo)")

    # ── Phase 3: SSE + TL review loop ──
    stage_start = time.time()
    result = compiled.invoke(
        {"human_approval": {"sse_approved": True}}, config
    )
    stage_times["sse_tl_devops_pm"] = time.time() - stage_start

    print_phase_header("implementation + review", "SSE (Riley) + TL (Casey)")
    print_state_summary(
        result,
        ["code_artifacts", "review_result", "review_comments"],
    )

    print_phase_header("deployment + summary", "DevOps (Dana) + PM (Pat)")
    print_state_summary(
        result,
        ["deployment_config", "project_summary", "timeline"],
    )

    # ── Final timing report ──
    total_time = time.time() - total_start
    print(f"\n{'=' * 60}")
    print("  TIMING REPORT")
    print(f"{'=' * 60}")
    for stage, elapsed in stage_times.items():
        print(f"  {stage:.<30} {elapsed:6.1f}s")
    print(f"  {'TOTAL':.<30} {total_time:6.1f}s")
    print(f"{'=' * 60}\n")


def main():
    parser = argparse.ArgumentParser(
        description="Run the AI software team workflow"
    )
    parser.add_argument(
        "brief",
        nargs="?",
        help="Client brief (or reads from stdin)",
    )
    parser.add_argument(
        "--auto-approve",
        action="store_true",
        help="Skip human checkpoints (for testing)",
    )
    parser.add_argument(
        "--brief-file",
        type=str,
        help="Read brief from a file",
    )
    args = parser.parse_args()

    if args.brief_file:
        with open(args.brief_file) as f:
            brief = f.read()
    elif args.brief:
        brief = args.brief
    elif not sys.stdin.isatty():
        brief = sys.stdin.read()
    else:
        print("Usage: python main.py 'your brief here'")
        print("       python main.py --brief-file brief.txt")
        print("       echo 'brief' | python main.py")
        sys.exit(1)

    run_workflow(brief, auto_approve=args.auto_approve)


if __name__ == "__main__":
    main()

7. Streaming Output

In production you want to see progress as it happens. LangGraph supports streaming natively:

def run_workflow_streaming(brief: str, config: dict):
    """Stream workflow execution, printing events as they occur."""
    compiled = build_workflow_with_checkpoints()
    state = create_initial_state(brief)

    for event in compiled.stream(state, config, stream_mode="updates"):
        for node_name, updates in event.items():
            agent = updates.get("current_agent", "unknown")
            phase = updates.get("current_phase", "")

            print(f"  [{node_name}] phase={phase} agent={agent}")

            # Print key artifacts as they arrive
            if "requirement_doc" in updates:
                doc = updates["requirement_doc"]
                print(f"    -> RequirementDoc: {len(doc.get('features', []))} features")

            if "user_stories" in updates:
                stories = updates["user_stories"]
                print(f"    -> {len(stories)} user stories generated")

            if "test_suite" in updates:
                suite = updates["test_suite"]
                cases = suite.get("test_cases", [])
                print(f"    -> TestSuite: {len(cases)} test cases")

            if "code_artifacts" in updates:
                artifacts = updates["code_artifacts"]
                files = artifacts.get("files", [])
                total_lines = sum(f.get("lines", 0) for f in files)
                print(f"    -> {len(files)} files, {total_lines} lines of code")

            if "review_result" in updates:
                decision = updates["review_result"].get("decision")
                print(f"    -> TL decision: {decision}")

            if "deployment_config" in updates:
                print(f"    -> Deployment config generated")

stream_mode="updates" yields only the state deltas from each node. For the full state at any point, use stream_mode="values" instead. The streaming output looks like this:

  [po] phase=requirements agent=po
    -> RequirementDoc: 4 features
  [ba] phase=analysis agent=ba
    -> 8 user stories generated
  [qc] phase=analysis agent=qc
    -> TestSuite: 24 test cases
  [ta] phase=analysis agent=ta
    -> TechnicalSpec: 4 ADRs, 12 API contracts
  [sse] phase=implementation agent=sse
    -> 11 files, 847 lines of code
  [tl] phase=review agent=tl
    -> TL decision: approved
  [devops] phase=deployment agent=devops
    -> Deployment config generated
  [pm] phase=complete agent=pm

Notice that [qc] and [ta] events arrive close together — they run in parallel.
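
If you would rather see the whole accumulated state after each step than just the deltas, swap stream_mode="updates" for "values". A minimal sketch of that variant:

def run_workflow_streaming_values(brief: str, config: dict):
    """Stream the full TeamState after each node, rather than per-node deltas."""
    compiled = build_workflow_with_checkpoints()
    state = create_initial_state(brief)

    for snapshot in compiled.stream(state, config, stream_mode="values"):
        # Each event is the entire state as of this step
        stories = snapshot.get("user_stories") or []
        print(f"  phase={snapshot.get('current_phase')} "
              f"agent={snapshot.get('current_agent')} "
              f"stories={len(stories)}")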


8. Error Recovery via Checkpointing

Things break. API calls timeout. LLMs hallucinate malformed JSON. The question is not whether errors happen, but whether you can recover without losing work.

LangGraph’s checkpointer saves state after every node completes. If the SSE agent fails on its third retry, you do not lose the PO requirements, the BA stories, the QC test cases, or the TA technical spec. They are all persisted.

from langgraph.checkpoint.sqlite import SqliteSaver

def build_workflow_persistent():
    """Build workflow with persistent SQLite checkpointing."""
    graph = build_workflow()
    checkpointer = SqliteSaver.from_conn_string("checkpoints.db")

    compiled = graph.compile(
        checkpointer=checkpointer,
        interrupt_before=["ba", "sse"],
    )
    return compiled


def resume_from_failure(thread_id: str):
    """Resume a failed workflow from its last checkpoint."""
    compiled = build_workflow_persistent()
    config = {"configurable": {"thread_id": thread_id}}

    # Get the current state — this loads from the checkpoint
    state = compiled.get_state(config)

    print(f"  Resuming thread {thread_id}")
    print(f"  Last completed node: {state.values.get('current_agent')}")
    print(f"  Current phase: {state.values.get('current_phase')}")
    print(f"  Next node(s): {state.next}")

    # Resume execution from where it stopped
    result = compiled.invoke(None, config)
    return result

Call invoke(None, config) to resume — None means “continue from saved state without new input.” For production, replace MemorySaver with SqliteSaver (single-machine) or PostgresSaver (distributed). The interface is identical.
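
The checkpointer keeps the whole trail, not just the latest snapshot. You can walk it with get_state_history to see which node wrote what before deciding whether a resume is safe. A minimal sketch:

def inspect_checkpoints(thread_id: str, limit: int = 5):
    """Print the most recent checkpoints for a thread, newest first."""
    compiled = build_workflow_persistent()
    config = {"configurable": {"thread_id": thread_id}}

    for i, snapshot in enumerate(compiled.get_state_history(config)):
        if i >= limit:
            break
        print(f"  [{i}] next={snapshot.next} "
              f"agent={snapshot.values.get('current_agent')} "
              f"phase={snapshot.values.get('current_phase')}")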

Here is what recovery looks like in practice:

$ python main.py "Build a URL shortener" --auto-approve
  [po] completed in 14.2s
  [ba] completed in 18.7s
  [qc] completed in 11.3s
  [ta] completed in 12.1s
  [sse] FAILED: OpenAI API timeout after 30s
  Thread ID: a1b2c3d4-e5f6-7890-abcd-ef1234567890

# Five minutes later, after the API recovers:
$ python main.py --resume a1b2c3d4-e5f6-7890-abcd-ef1234567890
  Resuming from last checkpoint...
  Last completed: ta (parallel merge)
  Continuing from: sse
  [sse] completed in 22.4s
  [tl] completed in 9.8s
  [devops] completed in 7.2s
  [pm] completed in 6.1s
  TOTAL: 45.5s (resumed) + 56.3s (previous) = 101.8s

No work is lost. Only the failed node and its successors re-run.
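
One gap worth noting: the transcript above uses a --resume flag that the argparse setup in section 6 does not define. A minimal sketch of wiring it in, assuming resume_from_failure from section 8 is importable from workflow and that this lives in main.py next to run_workflow:

from workflow import resume_from_failure


def main():
    parser = argparse.ArgumentParser(
        description="Run or resume the AI software team workflow"
    )
    parser.add_argument("brief", nargs="?", help="Client brief (or reads from stdin)")
    parser.add_argument("--brief-file", type=str, help="Read brief from a file")
    parser.add_argument("--auto-approve", action="store_true", help="Skip human checkpoints")
    parser.add_argument(
        "--resume",
        metavar="THREAD_ID",
        help="Resume a previous run from its last checkpoint",
    )
    args = parser.parse_args()

    if args.resume:
        resume_from_failure(args.resume)
        return

    if args.brief_file:
        brief = open(args.brief_file).read()
    elif args.brief:
        brief = args.brief
    else:
        brief = sys.stdin.read()

    run_workflow(brief, auto_approve=args.auto_approve)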


9. E2E Walkthrough: URL Shortener

Let me show a complete run with real (abbreviated) output. The brief:

“Build a URL shortener service. Users paste a long URL and get a short link back. Track click counts. Provide an API for programmatic access. Simple dashboard to see stats.”

Stage 1: PO Agent (Alex) — 14.2s

{
  "features": [
    "URL shortening with custom alias support",
    "Click tracking with timestamp and referrer",
    "REST API with API key authentication",
    "Analytics dashboard with charts"
  ],
  "personas": [
    {"name": "Developer Dave", "goal": "Shorten URLs programmatically via API"},
    {"name": "Marketer Maria", "goal": "Track campaign link performance"}
  ],
  "out_of_scope": [
    "User registration / account management",
    "Custom domains",
    "QR code generation",
    "Link expiration policies"
  ],
  "success_metrics": [
    "< 100ms p95 redirect latency",
    "99.9% uptime for redirect service",
    "Dashboard loads in < 2s"
  ]
}

Alex correctly identified hidden complexity. By explicitly listing what is out of scope, Alex prevents the downstream agents from gold-plating.

Stage 2: BA Agent (Jordan) — 18.7s

US-001: Shorten a URL (story points: 3, must-have)
US-002: Redirect via short link (story points: 2, must-have)
US-003: Track click metadata (story points: 3, must-have)
US-004: Custom alias for short URL (story points: 2, should-have)
US-005: REST API with key auth (story points: 5, must-have)
US-006: View click analytics (story points: 3, should-have)
US-007: Dashboard with charts (story points: 5, should-have)
US-008: Rate limiting (story points: 2, must-have)

Eight stories, 25 story points. Jordan prioritized correctly: core shortening is must-have; dashboard is should-have. Rate limiting is must-have — a public URL shortener without it is a spam gateway.

Stage 3: QC + TA in Parallel — 12.1s (wall clock)

QC (Sam) produced 24 test cases. A sample:

TC-001: Shorten valid URL → returns 7-char short code (happy path)
TC-005: Shorten URL exceeding 2048 chars → returns 400 (edge case)
TC-009: Redirect with expired/nonexistent code → returns 404 (error)
TC-017: 1000 concurrent redirects → p95 < 100ms (performance)
TC-024: SQL injection in custom alias → sanitized, returns 400 (security)

TA (Morgan) produced the technical spec:

Stack: Python 3.12, FastAPI, PostgreSQL, Redis (cache layer)
Architecture: Monolith with clean separation (not microservices)
ADR-001: PostgreSQL over DynamoDB — need JOIN for analytics
ADR-002: Base62 encoding for short codes — URL-safe, compact
ADR-003: Redis cache for redirect lookups — sub-10ms p95
ADR-004: API key auth over OAuth — simpler for API-first product

QC and TA ran simultaneously. Wall-clock time: 12.1 seconds (the slower of the two). Sequential would have been 23.4 seconds — a 48% reduction.

Stage 4: SSE Agent (Riley) — 22.4s

Riley produced 11 files:

src/main.py          — FastAPI application entry point
src/models.py        — SQLAlchemy models (URL, Click, APIKey)
src/schemas.py       — Pydantic request/response schemas
src/shortener.py     — Core shortening logic (Base62)
src/redirect.py      — Redirect endpoint with click tracking
src/api/routes.py    — REST API routes
src/api/auth.py      — API key authentication middleware
src/cache.py         — Redis cache layer
src/analytics.py     — Click analytics queries
tests/test_shortener.py  — Unit tests for shortening logic
tests/test_api.py    — Integration tests for API endpoints

847 lines of code. 19 out of 24 test cases addressed directly in the test files. Self-reported test results: 14 passing, 5 failing (all performance tests — Riley correctly noted these require a running Redis instance to validate).

Stage 5: TL Agent (Casey) — 9.8s

{
  "decision": "approved",
  "score": 8.2,
  "comments": [
    "Good: Base62 implementation matches ADR-002 exactly",
    "Good: Redis cache has proper fallback to DB on miss",
    "Minor: analytics.py should use connection pooling",
    "Minor: Add rate limit headers to API responses (X-RateLimit-*)",
    "Note: 5 failing perf tests are expected without Redis — acceptable"
  ]
}

Casey approved on the first pass. Score of 8.2/10. Two minor suggestions that do not block deployment. No revision loop triggered.

Stage 6: DevOps (Dana) — 7.2s

# docker-compose.yml (abbreviated)
services:
  app:
    build: .
    ports: ["8000:8000"]
    environment:
      DATABASE_URL: postgresql://...
      REDIS_URL: redis://cache:6379
    depends_on: [db, cache]
  db:
    image: postgres:16-alpine
  cache:
    image: redis:7-alpine

Plus a Dockerfile, a .env.example, and a basic nginx.conf for reverse proxy.

Stage 7: PM (Pat) — 6.1s

PROJECT SUMMARY
===============
Brief: URL shortener service
Stories: 8 (25 story points)
Test cases: 24 (19 addressed in code)
Code: 11 files, 847 lines
Review: Approved (8.2/10)
Deployment: Docker Compose ready

TIMELINE
========
Total AI pipeline time: 102.3 seconds
Estimated human equivalent: 10-14 business days
Speedup factor: ~1000x (wall-clock)

QUALITY NOTES
=============
- 5 performance test cases require live Redis to validate
- 2 minor code suggestions from TL review (non-blocking)
- Rate limiting implemented but headers not yet added

Timing Breakdown

PO (Alex).............. 14.2s   (13.9%)
BA (Jordan)............ 18.7s   (18.3%)
QC (Sam)............... 11.3s   ─┐
TA (Morgan)............ 12.1s   ─┤ parallel: 12.1s (11.8%)
SSE (Riley)............ 22.4s   (21.9%)
TL (Casey).............  9.8s   ( 9.6%)
DevOps (Dana)..........  7.2s   ( 7.0%)
PM (Pat)...............  6.1s   ( 6.0%)
Human checkpoints...... 11.8s   (11.5%)
────────────────────────────────
TOTAL................. 102.3s

102 seconds. A competent human team working on this same URL shortener — requirements gathering, story writing, test case design, architecture decisions, implementation, code review, deployment setup, and project tracking — would spend 10 to 14 business days. I know because I have done it.

The AI team is not doing less work. It is producing the same artifacts a human team would. The difference is zero context-switching overhead and zero communication latency. The 1000x speedup replaces the sequential coordination overhead of a human team, not the human judgment. That is why the checkpoints exist — the judgment stays with you.


10. Performance Numbers

Here are real timing numbers from 10 runs of the URL shortener brief, using claude-sonnet-4-20250514 as the backing model:

Metric                    Min      Median   Max      p95
Total wall-clock time     89s      102s     134s     128s
PO stage                  11s      14s      19s      18s
BA stage                  15s      19s      24s      23s
QC+TA parallel            10s      12s      16s      15s
SSE stage                 18s      22s      31s      29s
TL stage                  7s       10s      14s      13s
DevOps stage              5s       7s       10s      9s
PM stage                  4s       6s       9s       8s
SSE retry rate            18% of runs
TL revision rate          12% of runs
API cost per run          $0.31    $0.42    $0.68    $0.61

The variance comes from LLM response time variability, SSE retries (18% of runs), and TL revision requests (12% of runs). The $0.42 median cost is less than a coffee — compared to $75-150 for a single senior developer hour.

The SSE agent is the bottleneck at 22 seconds median — code generation is the most token-intensive task. The parallel fan-out saves 11 seconds on average (23s sequential vs 12s parallel for QC+TA).


11. What We Built and What Comes Next

This article assembled the entire pipeline. We now have:

  • workflow.py — A LangGraph StateGraph with 8 agent nodes, 3 router functions, parallel fan-out for QC+TA, and conditional retry loops for SSE and TL
  • main.py — A CLI entry point supporting inline briefs, file input, stdin piping, auto-approval mode, and streaming output
  • Human checkpoints — interrupt_before pauses at two high-leverage points (before BA, before SSE) where human review has the highest ROI
  • Error recovery — Checkpointing via MemorySaver or SqliteSaver allows resuming failed runs without losing completed work
  • Performance baseline — 102 seconds median, $0.42 median cost, with clear identification of the SSE bottleneck

The workflow works. But it is a black box. When the TL agent rejects the SSE’s code, you can see the rejection in the state, but you cannot watch the reasoning happen in real time. When the QC and TA agents run in parallel, you know they finished, but you do not know what each one struggled with.

In Part 10, we build the observability layer — structured logging, LangSmith tracing, and a real-time web UI. The graph is built. Now we need to see inside it.


I am Thuan Luong, a Tech Lead based in Vietnam. This is Part 9 of a 12-part series on building an AI software team with LangGraph. The code in this series is a working system, not a thought experiment. If you are building something similar, I would like to hear about it.
