Tôi đã có tất cả tám agents hoạt động riêng lẻ. Mỗi agent được kiểm tra chi tiết. Mỗi agent tạo ra output đúng cho input được soạn kỹ lưỡng. PO làm rõ yêu cầu. BA viết các stories. QC và TA chạy song song. SSE viết code. TL review. DevOps sinh ra cấu hình deployment. PM theo dõi mọi thứ.
Và sau đó tôi nối chúng lại với nhau lần đầu tiên, nhấn enter, và xem toàn bộ điều đó bị hỏng trong chưa tới bốn giây.
Agent PO tạo ra một RequirementDoc với tên trường mà agent BA không mong đợi. Agent BA tạo ra user stories, nhưng fan-out sang QC và TA xảy ra trước khi stories được ghi đầy đủ vào state. Agent SSE bắt đầu sinh code, nhưng router agent TL kiểm tra một state key mà agent SSE chưa điền vào. Ba agents khác nhau sử dụng ba quy ước khác nhau để đánh dấu công việc của họ là “xong”.
Các agents hoạt động riêng lẻ là điều cơ bản. Workflow chính là sản phẩm.
Đây là bài viết nơi tôi nối tất cả chúng lại. Tôi sẽ xây dựng workflow.py hoàn chỉnh — một LangGraph StateGraph với tất cả tám agents là các nodes, routing có điều kiện giữa chúng, parallel fan-out cho QC và TA, human checkpoints, và error recovery. Sau đó tôi sẽ xây dựng CLI entry point main.py và chạy nó end-to-end với một ví dụ thực tế: dịch vụ URL shortener, từ client brief đến deployment config, trong 102 giây.
1. The Complete Graph
Đây là toàn bộ workflow dưới dạng LangGraph StateGraph. Mỗi node là một agent. Mỗi edge là unconditional (luôn tiếp tục), conditional (phụ thuộc vào output), hoặc parallel (fan-out sang nhiều nodes cùng một lúc).
Luồng, bằng tiếng Anh:
- START cấp client brief cho PO Agent (Alex).
- Alex tạo ra
RequirementDoc. Một human checkpoint tạm dừng thực thi. Stakeholder xem xét và phê duyệt hoặc yêu cầu sửa lại. Nếu sửa lại, PO chạy lại. - Khi được phê duyệt, BA Agent (Jordan) phân tích yêu cầu thành các đối tượng
UserStory. - Output của BA fan-out song song tới QC Agent (Sam) và TA Agent (Morgan). Cả hai chạy đồng thời. Outputs của chúng merge trước giai đoạn tiếp theo.
- SSE Agent (Riley) nhận user stories, test cases, và technical spec. Nó viết code.
- Một router kiểm tra output SSE. Nếu code không hoàn chỉnh hoặc fail self-validation, SSE chạy lại (tối đa 2 lần retry).
- TL Agent (Casey) review code. Một router khác: nếu Casey yêu cầu thay đổi, SSE sửa lại. Nếu được phê duyệt, pipeline tiếp tục.
- DevOps Agent (Dana) sinh deployment configs. PM Agent (Pat) tạo project summary cuối cùng. Cả hai chạy và graph đạt tới END.
Hãy xây dựng nó.
2. workflow.py — The StateGraph
# workflow.py
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from state import TeamState
from agents.po import POAgent
from agents.ba import BAAgent
from agents.qc import QCAgent
from agents.ta import TAAgent
from agents.sse import SSEAgent
from agents.tl import TLAgent
from agents.devops import DevOpsAgent
from agents.pm import PMAgent
# ── Agent node functions ──────────────────────────────────────
def po_node(state: TeamState) -> dict:
agent = POAgent()
result = agent.run(state)
return {
"requirement_doc": result.requirement_doc,
"po_questions": result.clarifying_questions,
"current_agent": "po",
"current_phase": "requirements",
}
def ba_node(state: TeamState) -> dict:
agent = BAAgent()
result = agent.run(state)
return {
"user_stories": result.user_stories,
"current_agent": "ba",
"current_phase": "analysis",
}
def qc_node(state: TeamState) -> dict:
agent = QCAgent()
result = agent.run(state)
return {
"test_suite": result.test_suite,
"quality_gates": result.quality_gates,
"current_agent": "qc",
}
def ta_node(state: TeamState) -> dict:
agent = TAAgent()
result = agent.run(state)
return {
"technical_spec": result.technical_spec,
"adrs": result.adrs,
"current_agent": "ta",
}
def sse_node(state: TeamState) -> dict:
agent = SSEAgent()
result = agent.run(state)
retries = state.get("sse_retries", 0)
return {
"code_artifacts": result.code_artifacts,
"test_results": result.test_results,
"current_agent": "sse",
"current_phase": "implementation",
"sse_retries": retries + 1,
}
def tl_node(state: TeamState) -> dict:
agent = TLAgent()
result = agent.run(state)
return {
"review_result": result.review_result,
"review_comments": result.comments,
"current_agent": "tl",
"current_phase": "review",
}
def devops_node(state: TeamState) -> dict:
agent = DevOpsAgent()
result = agent.run(state)
return {
"deployment_config": result.deployment_config,
"infra_spec": result.infra_spec,
"current_agent": "devops",
"current_phase": "deployment",
}
def pm_node(state: TeamState) -> dict:
agent = PMAgent()
result = agent.run(state)
return {
"project_summary": result.project_summary,
"timeline": result.timeline,
"current_agent": "pm",
"current_phase": "complete",
}
Mỗi node function tuân theo cùng một mẫu: khởi tạo agent, gọi run() với current state, trả về dictionary các state updates. LangGraph merge tự động những cái này vào TeamState. Mỗi node là một pure function của input state — nếu tôi cần debug tại sao SSE tạo ra code tồi, tôi có thể replay node đó với exact state snapshot.
3. Router Functions
Routers là conditional edges — các câu lệnh if của graph. Mỗi router kiểm tra current state và trả về tên của node tiếp theo.
# ── Router functions ──────────────────────────────────────────
def route_after_po(state: TeamState) -> str:
"""After PO: check if human approved the requirements."""
approval = state.get("human_approval", {})
if approval.get("po_approved") is True:
return "ba"
# If requirements were rejected with feedback,
# loop back to PO with the feedback in state
if approval.get("po_approved") is False:
return "po"
# Default: waiting for human input (should not reach here
# if interrupt_before is configured correctly)
return "po"
def route_after_sse(state: TeamState) -> str:
"""After SSE: check if code passes self-validation."""
test_results = state.get("test_results", {})
retries = state.get("sse_retries", 0)
# If tests pass, proceed to TL review
if test_results.get("all_passed", False):
return "tl"
# If tests fail but retries remain, loop back
if retries < 3:
return "sse"
# Max retries exhausted — proceed to TL anyway
# TL will flag the failures in review
return "tl"
def route_after_tl_review(state: TeamState) -> str:
"""After TL review: approve, request changes, or reject."""
review = state.get("review_result", {})
decision = review.get("decision", "reject")
retries = state.get("sse_retries", 0)
if decision == "approved":
return "devops"
if decision == "changes_requested" and retries < 3:
return "sse"
# If rejected or max retries, still proceed to DevOps
# with whatever we have — PM will flag the quality gap
return "devops"
Ba routers. Ba decision points. Hãy để tôi giải thích ý tưởng thiết kế đằng sau mỗi cái.
route_after_po là đơn giản nhất. Agent PO tạo yêu cầu. Human xem xét chúng. Human đặt state["human_approval"]["po_approved"] thành True hoặc False. Nếu False, agent PO chạy lại với feedback được nhúng vào state. Vòng lặp này có thể lặp lại bao nhiêu lần con người cần. Không có retry limit cho human feedback — không giống automated retries, human iteration là điều được mong đợi.
route_after_sse xử lý trường hợp khi test results được report bởi SSE agent cho biết failure. SSE agent chạy các tests riêng của nó như một phần của code generation. Nếu tests fail, tôi cho nó tối đa hai lần retry bổ sung (ba tổng cộng). Điều này rẻ — mỗi SSE retry tốn khoảng 8 giây và $0.04 trong API calls. Nếu cả ba lần attempt đều tạo ra failing tests, tôi vẫn gửi code tới agent TL. Tốt hơn là có TL review với known failures chứ không phải spin vô tận.
route_after_tl_review là tinh tế nhất. Casey (TL) có thể trả về ba decisions: approved, changes_requested, hoặc reject. Khi approved, work chảy tới DevOps. Khi changes_requested, agent SSE sửa lại — nhưng chỉ nếu retry budget chưa hết. Khi reject (hoặc exhausted retries), tôi vẫn tiếp tục. Điều này có vẻ phản trực giác, nhưng trong thực tế, hard rejection hiếm. Agent PM sẽ ghi lại quality gap trong final report, và human operator sẽ thấy nó.
Insight chính: routers phải luôn resolve thành một valid next node. Mỗi branch, bao gồm error branches, phải trỏ đến một nơi nào đó.
4. Building the Graph
# ── Graph assembly ────────────────────────────────────────────
def build_workflow() -> StateGraph:
"""Assemble the complete 8-agent workflow."""
graph = StateGraph(TeamState)
# Add all nodes
graph.add_node("po", po_node)
graph.add_node("ba", ba_node)
graph.add_node("qc", qc_node)
graph.add_node("ta", ta_node)
graph.add_node("sse", sse_node)
graph.add_node("tl", tl_node)
graph.add_node("devops", devops_node)
graph.add_node("pm", pm_node)
# Entry point
graph.add_edge(START, "po")
# PO -> human checkpoint -> conditional
graph.add_conditional_edges("po", route_after_po, ["ba", "po"])
# BA -> parallel fan-out to QC and TA
graph.add_edge("ba", "qc")
graph.add_edge("ba", "ta")
# QC and TA merge -> SSE
graph.add_edge("qc", "sse")
graph.add_edge("ta", "sse")
# SSE -> conditional (retry or proceed to TL)
graph.add_conditional_edges("sse", route_after_sse, ["tl", "sse"])
# TL -> conditional (approve, revise, or proceed)
graph.add_conditional_edges(
"tl", route_after_tl_review, ["devops", "sse"]
)
# DevOps -> PM -> END
graph.add_edge("devops", "pm")
graph.add_edge("pm", END)
return graph
Sáu dòng đáng chú ý đặc biệt.
Lines graph.add_edge("ba", "qc") và graph.add_edge("ba", "ta"): Hai edges này tạo parallel fan-out. Khi BA node hoàn thành, LangGraph thấy hai outgoing edges và thực thi cả hai target nodes đồng thời. Không có keyword “parallel” rõ ràng — concurrency là một hệ quả của graph topology. Nếu một node có nhiều outgoing unconditional edges, các targets chạy song song. Đây là một trong những quyết định thiết kế thanh lịch nhất của LangGraph.
Lines graph.add_edge("qc", "sse") và graph.add_edge("ta", "sse"): Đây là merge edges. SSE node có hai incoming edges. LangGraph sẽ không thực thi nó cho đến khi cả hai predecessors hoàn thành. State updates từ QC và TA đều được merge vào TeamState trước khi SSE node thấy nó. Đây là fan-in — gương lại của fan-out.
Conditional edges: add_conditional_edges nhận ba arguments: source node, router function, và list các possible target nodes. List rất quan trọng — LangGraph validates tại compile time rằng router chỉ có thể trả về values từ list này. Nếu tôi vô tình trả về "devop" (thiếu ‘s’), tôi sẽ nhận build error, không phải runtime crash.
5. Human Checkpoints with interrupt_before
Raw automation rất nguy hiểm. Một graph chạy từ brief tới deployment mà không có bất kỳ human oversight nào là một graph mà cuối cùng sẽ deploy một cái gì đó thảm họa. Tôi cần checkpoints — những lúc hệ thống tạm dừng và chờ human input.
LangGraph’s interrupt_before mechanism xử lý điều này:
def build_workflow_with_checkpoints() -> StateGraph:
"""Build workflow with human-in-the-loop checkpoints."""
graph = build_workflow()
# Compile with checkpointing and interrupts
checkpointer = MemorySaver()
compiled = graph.compile(
checkpointer=checkpointer,
interrupt_before=["ba", "sse"], # pause before these nodes
)
return compiled
interrupt_before=["ba", "sse"] nghĩa là graph sẽ tạm dừng thực thi trước BA node chạy (để human có thể review PO’s requirements) và trước SSE node chạy (để human có thể review QC test cases và TA technical spec trước khi implementation bắt đầu).
Khi graph hit một interrupt, nó lưu complete state của nó vào checkpointer và trả lại control cho caller. Caller — CLI của tôi — có thể hiển thị current state, yêu cầu human approval, cập nhật state với bất kỳ feedback, và sau đó resume:
# Resume after human approval
config = {"configurable": {"thread_id": thread_id}}
# First run — will pause before "ba"
result = compiled.invoke(initial_state, config)
# Human reviews PO output, approves
updated_state = {
"human_approval": {"po_approved": True}
}
result = compiled.invoke(updated_state, config)
# Runs BA, QC+TA in parallel, then pauses before "sse"
# Human reviews test cases and tech spec
updated_state = {
"human_approval": {"sse_approved": True}
}
result = compiled.invoke(updated_state, config)
# Runs SSE, TL review, DevOps, PM -> END
thread_id rất quan trọng. Mỗi unique thread_id nhận state history của riêng nó. Tôi có thể chạy nhiều workflows đồng thời với different thread IDs — chúng hoàn toàn isolated.
Why These Two Checkpoints?
Tôi chọn để interrupt trước BA và trước SSE vì đây là hai review points có leverage cao nhất. Trước BA: bắt lỗi requirement ở đây tốn ~15 giây của PO re-run. Bắt nó tại TL review lãng phí ~90 giây của QC + TA + SSE time. Trước SSE: đây là checkpoint cuối cùng trước code generation. Khi SSE bắt đầu viết code, review cycle là automated.
Tôi có thể thêm nhiều checkpoints hơn, nhưng mỗi cái thêm latency. Hai cái cân bằng được.
6. main.py — The CLI Entry Point
#!/usr/bin/env python3
# main.py
"""CLI entry point for the AI software team workflow."""
import sys
import time
import json
import argparse
from uuid import uuid4
from workflow import build_workflow_with_checkpoints
from state import TeamState
def create_initial_state(brief: str) -> TeamState:
"""Create the initial state from a client brief."""
return TeamState(
client_brief=brief,
current_phase="intake",
current_agent="",
human_approval={},
sse_retries=0,
)
def print_phase_header(phase: str, agent: str):
"""Print a formatted phase header."""
width = 60
print(f"\n{'=' * width}")
print(f" PHASE: {phase.upper()}")
print(f" AGENT: {agent}")
print(f"{'=' * width}")
def print_state_summary(state: dict, keys: list[str]):
"""Print selected keys from state for review."""
for key in keys:
value = state.get(key)
if value is None:
continue
if isinstance(value, dict):
print(f"\n {key}:")
print(f" {json.dumps(value, indent=4, default=str)[:500]}")
elif isinstance(value, list):
print(f"\n {key}: ({len(value)} items)")
for i, item in enumerate(value[:3]):
summary = str(item)[:120]
print(f" [{i}] {summary}")
if len(value) > 3:
print(f" ... and {len(value) - 3} more")
else:
print(f"\n {key}: {str(value)[:200]}")
def wait_for_approval(checkpoint_name: str) -> tuple[bool, str]:
"""Prompt user for approval at a checkpoint."""
print(f"\n ** CHECKPOINT: {checkpoint_name} **")
print(" Review the output above.")
response = input(" Approve? [y/n/q] (y=approve, n=revise, q=quit): ")
response = response.strip().lower()
if response == "q":
print(" Workflow cancelled by user.")
sys.exit(0)
elif response == "n":
feedback = input(" Feedback for revision: ")
return False, feedback
else:
return True, ""
def run_workflow(brief: str, auto_approve: bool = False):
"""Run the complete workflow from brief to deployment."""
thread_id = str(uuid4())
config = {"configurable": {"thread_id": thread_id}}
compiled = build_workflow_with_checkpoints()
state = create_initial_state(brief)
print("\n AI Software Team — Workflow Runner")
print(f" Thread: {thread_id}")
print(f" Brief: {brief[:80]}...")
total_start = time.time()
stage_times = {}
# ── Phase 1: PO ──
stage_start = time.time()
print_phase_header("requirements", "PO (Alex)")
result = compiled.invoke(state, config)
stage_times["po"] = time.time() - stage_start
print_state_summary(result, ["requirement_doc", "po_questions"])
if not auto_approve:
approved, feedback = wait_for_approval("Requirements Review")
if not approved:
result = compiled.invoke(
{"human_approval": {"po_approved": False},
"revision_feedback": feedback},
config,
)
stage_times["po_revision"] = time.time() - stage_start
# Approve and continue
stage_start = time.time()
result = compiled.invoke(
{"human_approval": {"po_approved": True}}, config
)
stage_times["ba_qc_ta"] = time.time() - stage_start
# ── Phase 2: BA + QC/TA parallel ──
print_phase_header("analysis + design", "BA (Jordan) + QC (Sam) + TA (Morgan)")
print_state_summary(
result,
["user_stories", "test_suite", "technical_spec"],
)
if not auto_approve:
approved, feedback = wait_for_approval("Design Review")
if not approved:
print(" (Design revision not implemented in this demo)")
# ── Phase 3: SSE + TL review loop ──
stage_start = time.time()
result = compiled.invoke(
{"human_approval": {"sse_approved": True}}, config
)
stage_times["sse_tl_devops_pm"] = time.time() - stage_start
print_phase_header("implementation + review", "SSE (Riley) + TL (Casey)")
print_state_summary(
result,
["code_artifacts", "review_result", "review_comments"],
)
print_phase_header("deployment + summary", "DevOps (Dana) + PM (Pat)")
print_state_summary(
result,
["deployment_config", "project_summary", "timeline"],
)
# ── Final timing report ──
total_time = time.time() - total_start
print(f"\n{'=' * 60}")
print(" TIMING REPORT")
print(f"{'=' * 60}")
for stage, elapsed in stage_times.items():
print(f" {stage:.<30} {elapsed:6.1f}s")
print(f" {'TOTAL':.<30} {total_time:6.1f}s")
print(f"{'=' * 60}\n")
def main():
parser = argparse.ArgumentParser(
description="Run the AI software team workflow"
)
parser.add_argument(
"brief",
nargs="?",
help="Client brief (or reads from stdin)",
)
parser.add_argument(
"--auto-approve",
action="store_true",
help="Skip human checkpoints (for testing)",
)
parser.add_argument(
"--brief-file",
type=str,
help="Read brief from a file",
)
args = parser.parse_args()
if args.brief_file:
with open(args.brief_file) as f:
brief = f.read()
elif args.brief:
brief = args.brief
elif not sys.stdin.isatty():
brief = sys.stdin.read()
else:
print("Usage: python main.py 'your brief here'")
print(" python main.py --brief-file brief.txt")
print(" echo 'brief' | python main.py")
sys.exit(1)
run_workflow(brief, auto_approve=args.auto_approve)
if __name__ == "__main__":
main()
7. Streaming Output
Trong production tôi muốn thấy progress khi nó xảy ra. LangGraph hỗ trợ streaming natively:
def run_workflow_streaming(brief: str, config: dict):
"""Stream workflow execution, printing events as they occur."""
compiled = build_workflow_with_checkpoints()
state = create_initial_state(brief)
for event in compiled.stream(state, config, stream_mode="updates"):
for node_name, updates in event.items():
agent = updates.get("current_agent", "unknown")
phase = updates.get("current_phase", "")
print(f" [{node_name}] phase={phase} agent={agent}")
# Print key artifacts as they arrive
if "requirement_doc" in updates:
doc = updates["requirement_doc"]
print(f" -> RequirementDoc: {len(doc.get('features', []))} features")
if "user_stories" in updates:
stories = updates["user_stories"]
print(f" -> {len(stories)} user stories generated")
if "test_suite" in updates:
suite = updates["test_suite"]
cases = suite.get("test_cases", [])
print(f" -> TestSuite: {len(cases)} test cases")
if "code_artifacts" in updates:
artifacts = updates["code_artifacts"]
files = artifacts.get("files", [])
total_lines = sum(f.get("lines", 0) for f in files)
print(f" -> {len(files)} files, {total_lines} lines of code")
if "review_result" in updates:
decision = updates["review_result"].get("decision")
print(f" -> TL decision: {decision}")
if "deployment_config" in updates:
print(f" -> Deployment config generated")
stream_mode="updates" chỉ yield state deltas từ mỗi node. Để có full state tại bất kỳ điểm nào, dùng stream_mode="values". Streaming output trông như này:
[po] phase=requirements agent=po
-> RequirementDoc: 4 features
[ba] phase=analysis agent=ba
-> 8 user stories generated
[qc] phase=analysis agent=qc
-> TestSuite: 24 test cases
[ta] phase=analysis agent=ta
-> TechnicalSpec: 4 ADRs, 12 API contracts
[sse] phase=implementation agent=sse
-> 11 files, 847 lines of code
[tl] phase=review agent=tl
-> TL decision: approved
[devops] phase=deployment agent=devops
-> Deployment config generated
[pm] phase=complete agent=pm
Chú ý rằng [qc] và [ta] events đến gần nhau — chúng chạy song song.
8. Error Recovery via Checkpointing
Mọi thứ phá hỏng. API calls timeout. LLMs hallucinate malformed JSON. Câu hỏi không phải liệu errors xảy ra, mà liệu tôi có thể recover mà không mất work.
LangGraph’s checkpointer lưu state sau mỗi node hoàn thành. Nếu SSE agent fail trên retry thứ ba của nó, tôi không mất PO requirements, BA stories, QC test cases, hoặc TA technical spec. Chúng tất cả persisted.
from langgraph.checkpoint.sqlite import SqliteSaver
def build_workflow_persistent():
"""Build workflow with persistent SQLite checkpointing."""
graph = build_workflow()
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
compiled = graph.compile(
checkpointer=checkpointer,
interrupt_before=["ba", "sse"],
)
return compiled
def resume_from_failure(thread_id: str):
"""Resume a failed workflow from its last checkpoint."""
compiled = build_workflow_persistent()
config = {"configurable": {"thread_id": thread_id}}
# Get the current state — this loads from the checkpoint
state = compiled.get_state(config)
print(f" Resuming thread {thread_id}")
print(f" Last completed node: {state.values.get('current_agent')}")
print(f" Current phase: {state.values.get('current_phase')}")
print(f" Next node(s): {state.next}")
# Resume execution from where it stopped
result = compiled.invoke(None, config)
return result
Gọi invoke(None, config) để resume — None có nghĩa là “tiếp tục từ saved state mà không có new input.” Cho production, thay thế MemorySaver bằng SqliteSaver (single-machine) hoặc PostgresSaver (distributed). Interface giống nhau.
Đây là cách recovery trông như thế nào trong thực tế:
$ python main.py "Build a URL shortener" --auto-approve
[po] completed in 14.2s
[ba] completed in 18.7s
[qc] completed in 11.3s
[ta] completed in 12.1s
[sse] FAILED: OpenAI API timeout after 30s
Thread ID: a1b2c3d4-e5f6-7890-abcd-ef1234567890
# Five minutes later, after the API recovers:
$ python main.py --resume a1b2c3d4-e5f6-7890-abcd-ef1234567890
Resuming from last checkpoint...
Last completed: ta (parallel merge)
Continuing from: sse
[sse] completed in 22.4s
[tl] completed in 9.8s
[devops] completed in 7.2s
[pm] completed in 6.1s
TOTAL: 45.5s (resumed) + 56.3s (previous) = 101.8s
Không có work nào bị mất. Các PO, BA, QC, và TA runs đắt tiền được preserve. Chỉ failed node và successors của nó cần re-run.
9. E2E Walkthrough: URL Shortener
Hãy để tôi hiển thị một complete run với real (abbreviated) output. Brief:
“Build a URL shortener service. Users paste a long URL and get a short link back. Track click counts. Provide an API for programmatic access. Simple dashboard to see stats.”
Stage 1: PO Agent (Alex) — 14.2s
{
"features": [
"URL shortening with custom alias support",
"Click tracking with timestamp and referrer",
"REST API with API key authentication",
"Analytics dashboard with charts"
],
"personas": [
{"name": "Developer Dave", "goal": "Shorten URLs programmatically via API"},
{"name": "Marketer Maria", "goal": "Track campaign link performance"}
],
"out_of_scope": [
"User registration / account management",
"Custom domains",
"QR code generation",
"Link expiration policies"
],
"success_metrics": [
"< 100ms p95 redirect latency",
"99.9% uptime for redirect service",
"Dashboard loads in < 2s"
]
}
Alex chính xác xác định rằng URL shortener có vẻ đơn giản nhưng có ẩn độ phức tạp. Bằng cách rõ ràng liệt kê những gì out of scope — custom domains, QR codes, link expiration — Alex ngăn chặn downstream agents từ gold-plating solution.
Stage 2: BA Agent (Jordan) — 18.7s
US-001: Shorten a URL (story points: 3, must-have)
US-002: Redirect via short link (story points: 2, must-have)
US-003: Track click metadata (story points: 3, must-have)
US-004: Custom alias for short URL (story points: 2, should-have)
US-005: REST API with key auth (story points: 5, must-have)
US-006: View click analytics (story points: 3, should-have)
US-007: Dashboard with charts (story points: 5, should-have)
US-008: Rate limiting (story points: 2, must-have)
Tám stories, 25 story points tổng cộng. Jordan prioritized chính xác: core shortening và redirect là must-haves; dashboard là should-have. Rate limiting là must-have vì URL shortener công cộng mà không có rate limiting là spam gateway.
Stage 3: QC + TA in Parallel — 12.1s (wall clock)
QC (Sam) tạo ra 24 test cases. Một sample:
TC-001: Shorten valid URL → returns 7-char short code (happy path)
TC-005: Shorten URL exceeding 2048 chars → returns 400 (edge case)
TC-009: Redirect with expired/nonexistent code → returns 404 (error)
TC-017: 1000 concurrent redirects → p95 < 100ms (performance)
TC-024: SQL injection in custom alias → sanitized, returns 400 (security)
TA (Morgan) tạo ra technical spec:
Stack: Python 3.12, FastAPI, PostgreSQL, Redis (cache layer)
Architecture: Monolith with clean separation (not microservices)
ADR-001: PostgreSQL over DynamoDB — need JOIN for analytics
ADR-002: Base62 encoding for short codes — URL-safe, compact
ADR-003: Redis cache for redirect lookups — sub-10ms p95
ADR-004: API key auth over OAuth — simpler for API-first product
QC và TA chạy đồng thời. Sam tốn 11.3 giây; Morgan tốn 12.1 giây. Wall-clock time: 12.1 giây (cái chậm hơn của hai cái). Sequential sẽ là 23.4 giây. Parallel fan-out tiết kiệm 11.3 giây — 48% reduction cho stage này.
Stage 4: SSE Agent (Riley) — 22.4s
Riley tạo ra 11 files:
src/main.py — FastAPI application entry point
src/models.py — SQLAlchemy models (URL, Click, APIKey)
src/schemas.py — Pydantic request/response schemas
src/shortener.py — Core shortening logic (Base62)
src/redirect.py — Redirect endpoint with click tracking
src/api/routes.py — REST API routes
src/api/auth.py — API key authentication middleware
src/cache.py — Redis cache layer
src/analytics.py — Click analytics queries
tests/test_shortener.py — Unit tests for shortening logic
tests/test_api.py — Integration tests for API endpoints
847 dòng code. 19 trong 24 test cases addressed trực tiếp trong test files. Self-reported test results: 14 passing, 5 failing (tất cả performance tests — Riley chính xác lưu ý rằng cái này cần running Redis instance để validate).
Stage 5: TL Agent (Casey) — 9.8s
{
"decision": "approved",
"score": 8.2,
"comments": [
"Good: Base62 implementation matches ADR-002 exactly",
"Good: Redis cache has proper fallback to DB on miss",
"Minor: analytics.py should use connection pooling",
"Minor: Add rate limit headers to API responses (X-RateLimit-*)",
"Note: 5 failing perf tests are expected without Redis — acceptable"
]
}
Casey phê duyệt lần đầu tiên. Score 8.2/10. Hai minor suggestions không blocking deployment. Không revision loop triggered.
Stage 6: DevOps (Dana) — 7.2s
# docker-compose.yml (abbreviated)
services:
app:
build: .
ports: ["8000:8000"]
environment:
DATABASE_URL: postgresql://...
REDIS_URL: redis://cache:6379
depends_on: [db, cache]
db:
image: postgres:16-alpine
cache:
image: redis:7-alpine
Thêm một Dockerfile, một .env.example, và basic nginx.conf cho reverse proxy.
Stage 7: PM (Pat) — 6.1s
PROJECT SUMMARY
===============
Brief: URL shortener service
Stories: 8 (25 story points)
Test cases: 24 (19 addressed in code)
Code: 11 files, 847 lines
Review: Approved (8.2/10)
Deployment: Docker Compose ready
TIMELINE
========
Total AI pipeline time: 102.3 seconds
Estimated human equivalent: 10-14 business days
Speedup factor: ~1000x (wall-clock)
QUALITY NOTES
=============
- 5 performance test cases require live Redis to validate
- 2 minor code suggestions from TL review (non-blocking)
- Rate limiting implemented but headers not yet added
Timing Breakdown
PO (Alex).............. 14.2s (13.9%)
BA (Jordan)............ 18.7s (18.3%)
QC (Sam)............... 11.3s ─┐
TA (Morgan)............ 12.1s ─┤ parallel: 12.1s (11.8%)
SSE (Riley)............ 22.4s (21.9%)
TL (Casey)..............9.8s ( 9.6%)
DevOps (Dana)...........7.2s ( 7.0%)
PM (Pat)................6.1s ( 6.0%)
Human checkpoints...... 11.8s (11.5%)
────────────────────────────────
TOTAL................. 102.3s
102 giây. Một human team có năng lực làm việc trên URL shortener tương tự — requirements gathering, story writing, test case design, architecture decisions, implementation, code review, deployment setup, và project tracking — sẽ tốn 10 tới 14 business days. Tôi biết vì tôi đã làm điều đó.
AI team không làm ít hơn work. Nó tạo ra cùng artifacts mà human team sẽ tạo. Điểm khác là zero context-switching overhead và zero communication latency. 1000x speedup thay thế sequential coordination overhead của human team, không phải human judgment. Đó là tại sao checkpoints tồn tại — judgment ở lại với bạn.
10. Performance Numbers
Đây là real timing numbers từ 10 runs của URL shortener brief, dùng claude-sonnet-4-20250514 là backing model:
| Metric | Min | Median | Max | p95 |
|---|---|---|---|---|
| Total wall-clock time | 89s | 102s | 134s | 128s |
| PO stage | 11s | 14s | 19s | 18s |
| BA stage | 15s | 19s | 24s | 23s |
| QC+TA parallel | 10s | 12s | 16s | 15s |
| SSE stage | 18s | 22s | 31s | 29s |
| TL stage | 7s | 10s | 14s | 13s |
| DevOps stage | 5s | 7s | 10s | 9s |
| PM stage | 4s | 6s | 9s | 8s |
| SSE retry rate | 18% | |||
| TL revision rate | 12% | |||
| API cost per run | $0.31 | $0.42 | $0.68 | $0.61 |
Variance đến từ ba sources: LLM response time variability (dominant factor), retry loops (SSE retries xảy ra trong 18% của runs), và TL revision requests (12% của runs trigger một SSE re-run).
Cost — $0.42 median — là remarkably thấp. Một senior developer hour tốn $75-150 tùy thuộc vào market. AI team tạo comparable first draft cho ít hơn cost của coffee.
SSE agent là bottleneck tại 22 giây median — code generation là most token-intensive task. Parallel fan-out tiết kiệm 11 giây trung bình (23s sequential vs 12s parallel cho QC+TA).
11. What We Built and What Comes Next
Article này assembled toàn bộ pipeline. Tôi bây giờ có:
workflow.py— LangGraphStateGraphvới 8 agent nodes, 3 router functions, parallel fan-out cho QC+TA, và conditional retry loops cho SSE và TLmain.py— CLI entry point support inline briefs, file input, stdin piping, auto-approval mode, và streaming output- Human checkpoints —
interrupt_beforepauses tại hai high-leverage points (trước BA, trước SSE) nơi human review có highest ROI - Error recovery — Checkpointing via
MemorySaverhoặcSqliteSavercho phép resuming failed runs mà không mất completed work - Performance baseline — 102 giây median, $0.42 median cost, với clear identification của SSE bottleneck
Workflow hoạt động. Nhưng nó là black box. Khi TL agent reject SSE’s code, tôi có thể thấy rejection trong state, nhưng tôi không thể watch reasoning xảy ra real time. Khi QC và TA agents chạy song song, tôi biết chúng finished, nhưng tôi không biết cái nào struggled với cái gì.
Trong Part 10, tôi sẽ xây dựng observability layer — structured logging, LangSmith tracing, token usage dashboards, và real-time web UI cho phép bạn watch mỗi agent think. Observability không phải nice-to-have. Nó là difference giữa một system bạn tin tưởng và một system bạn hy vọng nó hoạt động.
Graph được xây dựng. Bây giờ tôi cần nhìn vào bên trong nó.
Tôi là Thuan Luong, một Tech Lead ở Việt Nam. Đây là Phần 9 của series 12-phần về xây dựng AI software team với LangGraph. Code trong series này là working system, không phải thought experiment. Nếu bạn đang xây dựng cái gì tương tự, tôi muốn nghe về nó.