Ở Phần 7, agent SSE đã tạo ra code. Files, tests, một implementation đang hoạt động. Cám dỗ lúc này là ship nó. Code biên dịch. Tests pass ở local. LLM tự tin. Điều gì có thể sai?

Mọi thứ. Mọi thứ có thể sai.

Tôi từng nhìn thấy một junior developer đẩy một login endpoint lên production mà lưu passwords dạng plaintext vì “bcrypt chậm và tất cả tests đều pass.” Tests pass vì tests kiểm tra xem password có được lưu hay không, chứ không kiểm tra xem nó có được hash hay không. Không ai review code. Breach xảy ra mười một ngày sau đó.

Code review tồn tại vì người viết code là người tồi nhất để đánh giá nó. Họ quá gần gũi. Họ nhìn thấy điều mà họ dự định, không phải điều họ sản xuất. Điều này đúng với các developer con người. Nó càng đúng hơn với LLMs, những thứ hoàn toàn không có khái niệm “intention” — chúng tạo ra code thống kê hợp lý, không nhất thiết là code đúng.

Bài viết này xây dựng ba agents đóng vòng lặp của pipeline của tôi: Tech Lead người review code trước khi nó có thể tiến hành, DevOps agent người tạo deployment infrastructure, và PM agent người điều phối mọi thứ và biết khi nào gọi một con người. Cùng với năm agents từ Phần 5-7, những cái này hoàn thành danh sách tám agents.


1. The Tech Lead Agent

Jordan là Tech Lead của chúng tôi. Mười lăm năm kinh nghiệm production, điều này chủ yếu có nghĩa là mười lăm năm nhìn thấy những thứ break theo những cách mà không ai dự tính. Công việc của Jordan không phải viết code. Công việc của Jordan là ngăn code xấu tiếp cận production.

Jordan Review Cái Gì

Agent TL thực hiện một review cấu trúc theo bốn chiều:

Security — SQL injection, hardcoded credentials, missing input validation, insecure deserialization, exposed stack traces, missing auth checks, deprecated crypto.

Performance — N+1 queries, unbounded list operations, missing pagination, synchronous calls mà nên async, missing indexes, unclosed resources.

Code Quality — naming conventions, function length (trên 50 dòng được flag), dead code, duplication, missing error handling, architecture drift từ quyết định của TA.

Test Coverage — các test code của SSE có thực sự cover các test cases mà Sam define trong Phần 6 không? Implementing 18 của 24 test cases fail review không kể code quality.

Quyết Định: APPROVE hoặc REJECT

Jordan tạo ra đúng một trong hai verdicts. APPROVE có nghĩa là tiến hành tới DevOps. REJECT có nghĩa là quay lại SSE với feedback cụ thể. Không có “approve with comments.” Trong một automated pipeline, ambiguity là một bug.

TL Agent approval decision tree: SSE output flows to TL review, branches to APPROVE (DevOps) or REJECT (retry SSE or escalate)

2. TL Review Schema

from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field


class ReviewVerdict(str, Enum):
    APPROVE = "APPROVE"
    REJECT = "REJECT"


class Severity(str, Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


class ReviewCategory(str, Enum):
    SECURITY = "security"
    PERFORMANCE = "performance"
    CODE_QUALITY = "code_quality"
    TEST_COVERAGE = "test_coverage"


class ReviewFinding(BaseModel):
    finding_id: str = Field(
        description="Unique identifier, format FND-001",
        pattern=r"^FND-\d{3}$",
    )
    category: ReviewCategory
    severity: Severity
    file_path: str = Field(
        description="File where the issue was found",
    )
    line_range: Optional[str] = Field(
        default=None,
        description="Line range, e.g. '42-48'",
    )
    description: str = Field(
        description="What the issue is",
        min_length=20,
    )
    suggestion: str = Field(
        description="How to fix the issue — actionable, specific",
        min_length=20,
    )


class CodeReview(BaseModel):
    review_id: str = Field(
        description="Unique identifier, format CR-001",
    )
    verdict: ReviewVerdict
    findings: list[ReviewFinding] = Field(default_factory=list)
    summary: str = Field(
        description="2-3 sentence overall assessment",
        min_length=30,
    )
    test_coverage_assessment: str = Field(
        description="Assessment of whether SSE tests cover QC test cases",
    )
    security_passed: bool = Field(
        description="True if no critical/high security findings",
    )
    performance_passed: bool = Field(
        description="True if no critical/high performance findings",
    )
    approve_reason: Optional[str] = Field(
        default=None,
        description="If APPROVE: why the code is ready",
    )
    reject_reason: Optional[str] = Field(
        default=None,
        description="If REJECT: the primary blocker",
    )

Schema này enforce constraint rằng mọi finding đều bao gồm một suggestion cụ thể. “Cái này xấu” không phải là một useful review comment. “Thay thế str concatenation trên dòng 47 bằng parameterized query để ngăn SQL injection” là.

Booleans security_passedperformance_passed tồn tại cho conditional edge logic. Routing function không parse prose — nó đọc booleans.


3. TLAgent Implementation

from agents.base import BaseAgent
from state import TeamState
from schemas.tl import CodeReview, ReviewVerdict


class TLAgent(BaseAgent):
    TL_SYSTEM_PROMPT = """You are Jordan, a Tech Lead with 15 years of experience
    reviewing production code. You have seen every failure mode. You are thorough
    but fair — you do not reject code for style preferences, only for real issues.

    Your review covers four dimensions:
    1. SECURITY — injection, auth bypass, credential exposure, input validation
    2. PERFORMANCE — N+1 queries, unbounded operations, missing indexes, memory leaks
    3. CODE QUALITY — naming, dead code, duplication, error handling, architecture drift
    4. TEST COVERAGE — do the implemented tests cover the QC agent's test cases?

    Verdict rules:
    - Any CRITICAL security finding → REJECT
    - Any CRITICAL performance finding → REJECT
    - More than 3 HIGH findings across any category → REJECT
    - Test coverage below 90% of QC test cases → REJECT
    - Otherwise → APPROVE

    When you REJECT, your findings must include specific file paths, line ranges,
    and actionable suggestions. The SSE agent will read your findings and must be
    able to fix every issue without asking clarifying questions.

    When you APPROVE, briefly note what the code does well.

    Always output valid JSON matching the CodeReview schema.
    """

    def _prepare_prompt(self, state: TeamState) -> str:
        code_output = state.get("code_output", {})
        test_suite = state.get("test_suite")
        tech_spec = state.get("technical_spec")

        files_text = self._format_code_files(code_output)
        test_cases_text = self._format_test_cases(test_suite)
        adrs_text = self._format_adrs(tech_spec)

        return f"""Review the following code produced by the SSE agent.

CODE FILES:
{files_text}

QC TEST CASES (defined before code was written):
{test_cases_text}

ARCHITECTURE DECISIONS (from TA):
{adrs_text}

Review each file against the four dimensions: security, performance,
code quality, and test coverage. For test coverage, check whether the
SSE's test files actually implement the test cases defined by QC.

Produce a CodeReview JSON with your verdict and findings.
"""

    def _format_code_files(self, code_output: dict) -> str:
        lines = []
        files = code_output.get("files", [])
        for f in files:
            lines.append(f"--- {f['path']} ---")
            lines.append(f["content"])
            lines.append("")
        return "\n".join(lines) if lines else "No files provided."

    def _format_test_cases(self, test_suite) -> str:
        if not test_suite:
            return "No test suite provided."
        lines = []
        for tc in test_suite.test_cases:
            lines.append(f"{tc.test_id}: {tc.title} [{tc.priority.value}]")
        return "\n".join(lines)

    def _format_adrs(self, tech_spec) -> str:
        if not tech_spec:
            return "No technical spec provided."
        lines = []
        for adr in tech_spec.adrs:
            lines.append(f"{adr.adr_id}: {adr.title}{adr.decision[:80]}...")
        return "\n".join(lines)

    async def run(self, state: TeamState) -> TeamState:
        prompt = self._prepare_prompt(state)
        response = await self.llm.ainvoke(
            [
                {"role": "system", "content": self.TL_SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            response_format={"type": "json_object"},
        )
        review = CodeReview.model_validate_json(response.content)

        retry_count = state.get("tl_retry_count", 0)
        if review.verdict == ReviewVerdict.REJECT:
            retry_count += 1

        return {
            **state,
            "code_review": review,
            "tl_verdict": review.verdict.value,
            "tl_retry_count": retry_count,
            "tl_complete": True,
        }

Chú ý retry counter. Mỗi rejection tăng tl_retry_count. Routing function sử dụng cái này để quyết định xem SSE có nhận một nỗ lực khác hay liệu vấn đề có escalate tới một con người. Nếu không có counter này, một bug cứng đầu có thể gây ra một SSE-TL loop vô hạn.


4. The Conditional Edge: route_after_tl_review

Đây là routing function quan trọng nhất trong pipeline. Nó xác định điều gì xảy ra sau khi TL render một verdict.

from state import TeamState

MAX_TL_RETRIES = 2


def route_after_tl_review(state: TeamState) -> str:
    """
    Conditional edge after TL review.

    Returns the name of the next node:
    - "devops_agent"   → code approved, generate CI/CD
    - "sse_agent"      → code rejected, SSE gets another attempt
    - "human_escalation" → max retries exceeded, needs human
    """
    verdict = state.get("tl_verdict", "REJECT")
    retry_count = state.get("tl_retry_count", 0)

    if verdict == "APPROVE":
        return "devops_agent"

    # Rejected — can the SSE try again?
    if retry_count <= MAX_TL_RETRIES:
        return "sse_agent"

    # Out of retries — escalate
    return "human_escalation"

Ba exits, không có ambiguity. SSE nhận tối đa hai revision attempts. Sau đó, system dừng lại và yêu cầu help. Đây không phải là một failure của AI system — nó là một feature. Một system mà loop vô hạn tồi hơn một system thừa nhận nó stuck.

Wiring It Into LangGraph

from langgraph.graph import StateGraph
from state import TeamState

workflow = StateGraph(TeamState)

# ... (previous nodes from Parts 5-7) ...

workflow.add_node("tl_agent", tl_node)
workflow.add_node("devops_agent", devops_node)
workflow.add_node("pm_agent", pm_node)
workflow.add_node("human_escalation", escalation_node)

# SSE feeds into TL
workflow.add_edge("sse_agent", "tl_agent")

# TL has a conditional edge — three possible destinations
workflow.add_conditional_edges(
    "tl_agent",
    route_after_tl_review,
    {
        "devops_agent": "devops_agent",
        "sse_agent": "sse_agent",
        "human_escalation": "human_escalation",
    },
)

# DevOps feeds into PM
workflow.add_edge("devops_agent", "pm_agent")

Lệnh add_conditional_edges là LangGraph equivalent của một switch statement. Routing function kiểm tra state và return một string. Dictionary map những strings đó tới node names. LangGraph validate tại compile time mà mọi return value có một node tương ứng.


5. The DevOps Agent

Casey là DevOps engineer của chúng tôi. Casey không deploy code. Casey tạo ra deployment infrastructure — những files mà CI/CD systems tiêu thụ. Sự khác biệt quan trọng: Casey tạo ra artifacts, không side effects.

Given một approved codebase và technical spec, Casey tạo ra ba artifacts: một GitHub Actions workflow (.github/workflows/ci.yml) với lint, test, build, và deploy stages; một Dockerfile với multi-stage build; và một docker-compose.yml cho local development. Casey không invent infrastructure decisions. Nếu Morgan chọn PostgreSQL 16, Casey sử dụng postgres:16. TA quyết định; Casey implements.


6. DevOps Schema

from pydantic import BaseModel, Field


class GeneratedFile(BaseModel):
    path: str = Field(
        description="File path relative to project root",
    )
    content: str = Field(
        description="Full file content",
    )
    description: str = Field(
        description="What this file does and why",
    )


class DeploymentConfig(BaseModel):
    config_id: str = Field(
        description="Unique identifier, format DEPLOY-001",
    )
    generated_files: list[GeneratedFile] = Field(
        min_length=1,
        description="CI/CD, Dockerfile, docker-compose files",
    )
    environment_variables: list[str] = Field(
        default_factory=list,
        description="Required env vars (names only, not values)",
    )
    deployment_notes: str = Field(
        description="Instructions for running the generated configuration",
    )
    quality_gate_integration: str = Field(
        description="How QC quality gates are enforced in the pipeline",
    )

Field environment_variables capture names chỉ, không bao giờ values. Đây là một deliberate security constraint. DevOps agent biết application cần DATABASE_URLREDIS_URL, nhưng nó không bao giờ tạo ra actual connection strings hoặc credentials.


7. DevOpsAgent Implementation

from agents.base import BaseAgent
from state import TeamState
from schemas.devops import DeploymentConfig


class DevOpsAgent(BaseAgent):
    DEVOPS_SYSTEM_PROMPT = """You are Casey, a DevOps Engineer with 10 years of
    experience building CI/CD pipelines and containerized deployments.

    Your philosophy:
    - Infrastructure as code, always
    - Build once, deploy anywhere
    - Quality gates are non-negotiable — if the QC agent set a threshold, your
      pipeline enforces it
    - Secrets never appear in generated files — use environment variable references

    You generate three files:

    1. .github/workflows/ci.yml
       - Triggered on push to main and pull requests
       - Jobs: lint, test (with coverage), build Docker image, deploy (staging only)
       - Coverage threshold from QualityGates.min_unit_coverage
       - Fail the pipeline if coverage drops below threshold

    2. Dockerfile
       - Multi-stage build: builder stage + runtime stage
       - Pin base image versions
       - Non-root user in runtime stage
       - HEALTHCHECK instruction

    3. docker-compose.yml
       - Application service built from Dockerfile
       - Database service matching TA spec (Postgres version, etc.)
       - Cache service if TA specified one (Redis version, etc.)
       - Named volumes for data persistence
       - Health checks on all services

    Always output valid JSON matching the DeploymentConfig schema.
    """

    def _prepare_prompt(self, state: TeamState) -> str:
        tech_spec = state.get("technical_spec")
        quality_gates = state.get("test_suite", {})
        code_review = state.get("code_review")

        stack_text = self._format_stack(tech_spec)
        gates_text = self._format_gates(quality_gates)

        return f"""Generate deployment configuration for the approved codebase.

TECHNOLOGY STACK (from TA):
{stack_text}

QUALITY GATES (from QC):
{gates_text}

TL REVIEW STATUS: {code_review.verdict.value if code_review else 'N/A'}

Generate:
1. .github/workflows/ci.yml — full GitHub Actions workflow
2. Dockerfile — multi-stage build
3. docker-compose.yml — local development environment

Requirements:
- Pin ALL image versions (no :latest tags)
- Coverage threshold must match QualityGates.min_unit_coverage
- Database and cache versions must match TA specification
- All services must have health checks
- Use environment variables for secrets (DATABASE_URL, etc.)
- Include comments explaining non-obvious configuration choices

Return a DeploymentConfig JSON.
"""

    def _format_stack(self, tech_spec) -> str:
        if not tech_spec:
            return "No tech spec available."
        lines = []
        for comp in tech_spec.tech_stack:
            version = comp.version or "latest"
            lines.append(f"- {comp.name}: {comp.technology} {version}")
        return "\n".join(lines)

    def _format_gates(self, test_suite) -> str:
        if not test_suite or not hasattr(test_suite, "quality_gates"):
            return "No quality gates defined."
        gates = test_suite.quality_gates
        return (
            f"- Min unit coverage: {gates.min_unit_coverage}%\n"
            f"- Max p95 response: {gates.max_p95_response_ms}ms\n"
            f"- Max error rate: {gates.max_error_rate_percent}%\n"
            f"- Min test pass rate: {gates.min_test_pass_rate}%"
        )

    async def run(self, state: TeamState) -> TeamState:
        prompt = self._prepare_prompt(state)
        response = await self.llm.ainvoke(
            [
                {"role": "system", "content": self.DEVOPS_SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            response_format={"type": "json_object"},
        )
        config = DeploymentConfig.model_validate_json(response.content)

        return {
            **state,
            "deployment_config": config,
            "devops_complete": True,
        }

Dòng key trong bất kỳ generated workflow nào là pytest --cov-fail-under=85 — cái 85 đó đến trực tiếp từ QualityGates.min_unit_coverage. Phiên bản Postgres trong CI services block đến từ TA’s tech stack. Casey không invent. Casey dịch.


8. The PM Agent

Riley là Project Manager của chúng tôi. Riley không viết code, không review code, và không tạo infrastructure. Công việc của Riley là biết điều gì đang xảy ra, điều gì bị blocked, và điều gì cần human attention.

PM agent chạy sau khi DevOps complete và thực hiện ba functions: status reporting (những gì mỗi agent tạo ra, anomalies, pipeline health), blocker tracking (rejections, retries, unresolved findings), và escalation decisions (con người có cần intervene?).


9. PM Schema và Implementation

from enum import Enum
from pydantic import BaseModel, Field


class PipelineStatus(str, Enum):
    SUCCESS = "success"
    PARTIAL = "partial"
    BLOCKED = "blocked"
    FAILED = "failed"


class AgentStatus(BaseModel):
    agent_name: str
    completed: bool
    output_summary: str = Field(
        description="1-2 sentence summary of what this agent produced",
    )
    issues: list[str] = Field(default_factory=list)


class Blocker(BaseModel):
    blocker_id: str
    description: str
    blocking_agent: str
    suggested_resolution: str
    requires_human: bool = False


class StatusReport(BaseModel):
    report_id: str = Field(description="Format RPT-001")
    pipeline_status: PipelineStatus
    agent_statuses: list[AgentStatus]
    blockers: list[Blocker] = Field(default_factory=list)
    summary: str = Field(
        description="Executive summary of pipeline execution",
        min_length=50,
    )
    human_action_required: bool
    human_action_description: str = Field(
        default="",
        description="What the human needs to do, if anything",
    )
    total_retries: int = Field(default=0)
    total_findings: int = Field(default=0)
from agents.base import BaseAgent
from state import TeamState
from schemas.pm import StatusReport, PipelineStatus


class PMAgent(BaseAgent):
    PM_SYSTEM_PROMPT = """You are Riley, a Project Manager with 10 years of
    experience coordinating software delivery teams.

    Your job is to observe, summarize, and escalate — never to make technical
    decisions. You produce a StatusReport that tells stakeholders:

    1. What happened — which agents ran, what they produced
    2. What went wrong — any rejections, retries, or missing outputs
    3. What needs attention — blockers requiring human intervention
    4. Overall status — success, partial, blocked, or failed

    Status rules:
    - SUCCESS: all agents completed, TL approved, DevOps generated configs
    - PARTIAL: pipeline completed but with warnings or non-critical findings
    - BLOCKED: pipeline stopped due to unresolvable issue, needs human
    - FAILED: critical error — agent crashed, schema validation failed, etc.

    Be concise. Stakeholders read your reports at 7am with coffee. Respect
    their time.

    Always output valid JSON matching the StatusReport schema.
    """

    def _prepare_prompt(self, state: TeamState) -> str:
        return f"""Produce a status report for the current pipeline execution.

PIPELINE STATE:
- PO complete: {state.get('po_complete', False)}
- BA complete: {state.get('ba_complete', False)}
- QC complete: {state.get('qc_complete', False)}
- TA complete: {state.get('ta_complete', False)}
- SSE complete: {state.get('sse_complete', False)}
- TL verdict: {state.get('tl_verdict', 'pending')}
- TL retries: {state.get('tl_retry_count', 0)}
- DevOps complete: {state.get('devops_complete', False)}

TL REVIEW SUMMARY:
{self._format_review(state)}

USER STORIES COUNT: {len(state.get('user_stories', []))}
TEST CASES COUNT: {len(state.get('test_suite', {}).test_cases) if state.get('test_suite') else 0}
CODE FILES COUNT: {len(state.get('code_output', {}).get('files', []))}
DEPLOYMENT FILES COUNT: {len(state.get('deployment_config', {}).generated_files) if state.get('deployment_config') else 0}

Produce a StatusReport JSON.
"""

    def _format_review(self, state: TeamState) -> str:
        review = state.get("code_review")
        if not review:
            return "No code review performed."
        lines = [
            f"Verdict: {review.verdict.value}",
            f"Findings: {len(review.findings)}",
            f"Security passed: {review.security_passed}",
            f"Performance passed: {review.performance_passed}",
        ]
        for f in review.findings[:5]:
            lines.append(
                f"  {f.finding_id} [{f.severity.value}] {f.category.value}: "
                f"{f.description[:60]}..."
            )
        return "\n".join(lines)

    async def run(self, state: TeamState) -> TeamState:
        prompt = self._prepare_prompt(state)
        response = await self.llm.ainvoke(
            [
                {"role": "system", "content": self.PM_SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            response_format={"type": "json_object"},
        )
        report = StatusReport.model_validate_json(response.content)

        return {
            **state,
            "status_report": report,
            "pm_complete": True,
            "pipeline_status": report.pipeline_status.value,
        }

10. Human Escalation Node

Human escalation node không phải là một agent. Nó là một simple function mà halt pipeline khi SSE không thể resolve TL’s findings.

from state import TeamState


def escalation_node(state: TeamState) -> TeamState:
    review = state.get("code_review")
    retry_count = state.get("tl_retry_count", 0)

    findings_summary = []
    if review:
        for f in review.findings:
            findings_summary.append(
                f"[{f.severity.value}] {f.category.value} in "
                f"{f.file_path}: {f.description}"
            )

    escalation_message = (
        f"PIPELINE HALTED — Human review required.\n\n"
        f"SSE failed TL review {retry_count} time(s).\n"
        f"Unresolved findings:\n\n"
        + "\n".join(findings_summary)
    )

    return {
        **state,
        "pipeline_status": "blocked",
        "escalation_message": escalation_message,
        "human_intervention_required": True,
    }

Escalation node không tạo quyết định. Nó dừng, report, và chờ. Điều tồi tệ nhất một automated system có thể làm là tiếp tục operate khi nó nên dừng lại.


11. The Complete Agent Roster

Với TL, DevOps, và PM agents được xây dựng, chúng tôi bây giờ có tất cả tám agents trong pipeline. Dưới đây là complete roster:

#AgentNameModelRoleInputOutput
1POAlexGPT-4oClarify requirements, ask questions, define scopeRaw client briefRequirementDoc
2BAJamieGPT-4oBreak requirements into user storiesRequirementDocUserStory[]
3QCSamGPT-4oDefine test cases and quality gates before codeUserStory[]TestSuite
4TAMorganGPT-4oArchitecture decisions, tech stack, data modelsRequirementDoc, UserStory[]TechnicalSpec
5SSEChrisClaude 3.5 SonnetWrite implementation code and testsFull TeamStateCodeOutput
6TLJordanGPT-4oReview code for security, perf, qualityCodeOutput, TestSuite, TechnicalSpecCodeReview
7DevOpsCaseyGPT-4oGenerate CI/CD, Dockerfile, docker-composeTechnicalSpec, QualityGatesDeploymentConfig
8PMRileyGPT-4oMonitor pipeline, track blockers, status reportFull TeamStateStatusReport

SSE sử dụng Claude 3.5 Sonnet cho code generation; các agents khác sử dụng GPT-4o cho analytical và structured reasoning tasks. Swap models freely dựa trên benchmarks của chính tôi.


12. Updated TeamState

TeamState bây giờ mang output từ tất cả tám agents. Mỗi agent thêm các keys của nó ở những phần trước; complete type bây giờ bao gồm code_review (TL), deployment_config (DevOps), status_report (PM), cộng với pipeline-level fields như tl_retry_count, pipeline_status, escalation_message, và human_intervention_required. Mọi field là Optional hoặc có một default, vì vậy state là valid tại mọi điểm trong pipeline — từ initial user_brief tới final StatusReport.


13. The Complete Graph

from langgraph.graph import StateGraph, END
from state import TeamState

from nodes.po import po_node
from nodes.ba import ba_node
from nodes.qc import qc_node
from nodes.ta import ta_node
from nodes.merge import merge_qc_ta
from nodes.sse import sse_node
from nodes.tl import tl_node
from nodes.devops import devops_node
from nodes.pm import pm_node
from nodes.escalation import escalation_node
from routing import route_after_tl_review


def build_full_pipeline() -> StateGraph:
    workflow = StateGraph(TeamState)

    # --- Register all nodes ---
    workflow.add_node("po_agent", po_node)
    workflow.add_node("ba_agent", ba_node)
    workflow.add_node("qc_agent", qc_node)
    workflow.add_node("ta_agent", ta_node)
    workflow.add_node("parallel_merge", merge_qc_ta)
    workflow.add_node("sse_agent", sse_node)
    workflow.add_node("tl_agent", tl_node)
    workflow.add_node("devops_agent", devops_node)
    workflow.add_node("pm_agent", pm_node)
    workflow.add_node("human_escalation", escalation_node)

    # --- Sequential: PO → BA ---
    workflow.set_entry_point("po_agent")
    workflow.add_edge("po_agent", "ba_agent")

    # --- Parallel fan-out: BA → QC + TA ---
    workflow.add_edge("ba_agent", "qc_agent")
    workflow.add_edge("ba_agent", "ta_agent")

    # --- Merge: QC + TA → merge ---
    workflow.add_edge("qc_agent", "parallel_merge")
    workflow.add_edge("ta_agent", "parallel_merge")

    # --- Sequential: merge → SSE → TL ---
    workflow.add_edge("parallel_merge", "sse_agent")
    workflow.add_edge("sse_agent", "tl_agent")

    # --- Conditional: TL → DevOps | SSE (retry) | Human ---
    workflow.add_conditional_edges(
        "tl_agent",
        route_after_tl_review,
        {
            "devops_agent": "devops_agent",
            "sse_agent": "sse_agent",
            "human_escalation": "human_escalation",
        },
    )

    # --- DevOps → PM → END ---
    workflow.add_edge("devops_agent", "pm_agent")
    workflow.add_edge("pm_agent", END)

    # --- Escalation → END ---
    workflow.add_edge("human_escalation", END)

    return workflow.compile()

Hai mươi hai dòng graph definition cho một eight-agent pipeline với parallel execution, conditional routing, retry logic, và human escalation. Đây là sức mạnh của treating agent orchestration như một graph problem thay vì một chain-of-function-calls problem.


Cái Chúng Tôi Xây Dựng Ở Phần 8

Ba agents và complete pipeline:

  • TLAgent (Jordan) — review SSE output theo security, performance, code quality, và test coverage. Tạo ra một binary APPROVE/REJECT verdict với structured findings. Bất kỳ critical security hoặc performance issue nào trigger automatic rejection.
  • DevOpsAgent (Casey) — tạo ra GitHub Actions CI/CD workflow, Dockerfile, và docker-compose.yml từ TA’s technical spec. Quality gates từ QC agent được embed trực tiếp vào pipeline configuration.
  • PMAgent (Riley) — tạo ra một status report summarizing những gì mỗi agent đã làm, identifying blockers, và determining xem human intervention có cần hay không.
  • Conditional routing — hàm route_after_tl_review implements retry-or-escalate pattern với một configurable maximum retry count.
  • Human escalation — pipeline dừng cleanly khi automated resolution fail, với một structured handoff message.
  • Complete graph — tất cả tám agents wired together với parallel execution, conditional edges, và hai possible terminal states (success via PM, hoặc blocked via escalation).

Tám agents được xây dựng. Graph hoàn thành. Nhưng chúng tôi vẫn chưa địa chỉ phần hardest của bất kỳ multi-agent system nào: điều gì xảy ra khi agents cần talk tới outside world?


Tiếp Theo

Phần 9, chúng tôi tackle tool integration và external service connections. Các agents của chúng tôi hiện tại operate ở một closed loop — chúng đọc từ TeamState và viết tới TeamState. Real-world pipelines cần read files từ disk, gọi external APIs, query databases, và interact với version control. Chúng tôi sẽ xây dựng tool layer mà give agents controlled access tới outside world mà không compromise pipeline’s determinism.

Gặp lại tôi ở Phần 9.


Series Navigation

Xuất nội dung

Bình luận