Tôi vẫn nhớ lần đầu tiên cố xây dựng một multi-agent system mà không có bất kỳ kiến trúc nào. Năm 2023, GPT-4 vừa ra mắt, và tôi nghĩ: “Thôi cứ chain vài agent lại rồi để chúng nói chuyện với nhau.” Ba ngày sau, tôi có một mớ hỗn độn tuyệt đẹp. Các agent lặp lại công việc của nhau. Các agent mâu thuẫn nhau. Một agent cứ mãi xin lỗi agent khác vì không hiểu nó muốn gì. Đó là, theo cách đồng đội tôi mô tả, “như chăn mèo — chỉ khác là những con mèo đang viết production code.”

Bài học tôi rút ra từ trải nghiệm đó: AI agent là software component. Và software component không có kiến trúc sẽ trở thành legacy system rất, rất nhanh.

Trong Phần 1, tôi đã giới thiệu khái niệm về một AI software team hoàn chỉnh. Trong Phần 2, chúng ta đã phác thảo vai trò của từng agent — Product Owner, Business Analyst, Tech Architect, Tech Lead, Senior Software Engineer, QC Engineer, DevOps Engineer, và Project Manager. Giờ đến Phần 3, chúng ta cần trả lời câu hỏi khó hơn: làm thế nào để các agent này thực sự phối hợp với nhau như một hệ thống thống nhất?

Câu trả lời, tôi sẽ lập luận, đến từ một tập hợp ý tưởng mà thế giới software engineering đã tinh chỉnh suốt 20 năm qua: Domain-Driven Design.


1. Tại Sao Kiến Trúc Quan Trọng với AI Team

Để tôi mô tả hai kịch bản.

Kịch bản A — Không có kiến trúc: Bạn có 8 agent. Mỗi agent có thể gọi bất kỳ agent nào khác. Mỗi agent có định nghĩa riêng về “requirement” nghĩa là gì. Khi SSE agent viết xong code, nó broadcast một message cho tất cả mọi người. QC agent nhận được message nhưng không chắc code đã sẵn sàng để test chưa. PM agent nhận được message tương tự và cố cập nhật một status tracker nó tự xây dựng bên trong. TL agent nhận được và bắt đầu một code review mâu thuẫn với lần test chạy của QC agent. PO agent, không hiểu vì lý do gì, cũng được thông báo và bắt đầu hỏi thêm về business logic trong lúc đang code review.

Kịch bản B — Hệ thống có thiết kế: Requirement chảy qua các trạng thái được định nghĩa rõ ràng. Mỗi agent vận hành trong bounded context của mình và giao tiếp qua các domain event được định nghĩa tốt. Khi SSE agent hoàn thành code, nó emit event CodeArtifactReady. Chỉ QC agent và TL agent subscribe vào event này. PM agent biết về nó thông qua thay đổi trạng thái, không phải qua tin nhắn trực tiếp. PO agent không tham gia cho đến khi đạt đến human checkpoint gate.

Sự khác biệt giữa Kịch bản A và Kịch bản B không phải về LLM model bạn đang dùng. Đó là về kiến trúc. Đó là về việc có một từ vựng chung cho những gì mỗi phần của hệ thống làm, và enforce các ranh giới để các component không giẫm chân nhau.

Đây chính xác là những gì Domain-Driven Design mang lại cho chúng ta.

Sự Song Song với Microservices

Nếu bạn đã từng xây dựng microservices, bạn đã đối mặt với vấn đề này rồi. Khi bạn phân tách một monolith thành các service, bạn phải đối mặt với những câu hỏi như: service này sở hữu gì? Nó giao tiếp như thế nào? Điều gì xảy ra khi nó thất bại? Ai gọi ai?

Những câu trả lời xuất hiện từ kỷ nguyên microservices — bounded contexts, domain events, the outbox pattern, saga orchestration — chính xác là những pattern chúng ta cần cho multi-agent AI system. Các agent là các service. LangGraph là orchestrator. TeamState là shared data model. Domain event là các trigger di chuyển công việc giữa các agent.

Chúng ta không phát minh lại bất cứ điều gì ở đây. Chúng ta đang áp dụng các pattern đã được kiểm chứng vào một môi trường thực thi mới.


2. Áp Dụng DDD vào Multi-Agent Systems

Domain-Driven Design, về cốt lõi, là về việc căn chỉnh cấu trúc phần mềm với business domain. Cuốn sách năm 2003 của Eric Evans đã giới thiệu một từ vựng để làm điều này. Hãy để tôi dịch từ vựng đó sang ngôn ngữ agent.

Từ Vựng DDD-sang-Agent

Ubiquitous Language trong DDD có nghĩa là developer, domain expert, và stakeholder đều dùng cùng một thuật ngữ. Trong AI team của chúng ta, điều này có nghĩa là mọi agent và mọi đoạn code đều dùng cùng một từ vựng: một UserStory là một UserStory, không phải ticket, task, hay requirement_item. Khi BA agent tạo user story, khi SSE agent đọc chúng, và khi QC agent validate dựa trên chúng — tất cả đều dùng cùng cấu trúc dữ liệu UserStory.

Aggregate là tập hợp các domain object được xử lý như một đơn vị duy nhất. Trong hệ thống của chúng ta, aggregate Requirement chứa UserBrief, ClarifiedRequirements, và một danh sách UserStory. Aggregate CodeArtifact chứa SourceFiles, TestResults, và DependencyManifest. Aggregate enforce ranh giới nhất quán — bạn không cập nhật các object UserStory riêng lẻ; bạn cập nhật chúng thông qua aggregate Requirement.

Entity là object có danh tính riêng biệt tồn tại theo thời gian. Mỗi UserStorystory_id không đổi dù nội dung, trạng thái, và acceptance criteria của nó thay đổi. Mỗi CodeArtifactartifact_id giúp chúng ta theo dõi nó qua review, sửa đổi, và deployment.

Value Object là object được xác định hoàn toàn bởi các thuộc tính của nó, không có danh tính riêng. Priority (High/Medium/Low), giá trị StoryPoints, phần trăm TestCoverage — đây là value object. Hai giá trị Priority.HIGH là giống nhau và có thể thay thế cho nhau.

Domain Event là sự kiện về điều gì đó đã xảy ra trong quá khứ. Chúng là các bản ghi bất biến. RequirementsCleared, UserStoriesCreated, CodeReviewApproved, TestsFailed — đây là domain event. Chúng được đặt tên ở thì quá khứ vì chúng ghi lại những gì đã xảy ra. Trong hệ thống LangGraph của chúng ta, domain event là cơ chế chính mà các agent kích hoạt lẫn nhau.

Bounded Context là khái niệm mạnh nhất chúng ta sẽ sử dụng. Một bounded context là ranh giới rõ ràng trong đó một model cụ thể áp dụng. Bên trong ranh giới, các thuật ngữ có nghĩa cụ thể, không mơ hồ. Qua ranh giới, bạn cần dịch thuật.

Bounded Context và Cụm Agent

Mỗi bounded context của chúng ta ánh xạ tới một cụm agent chia sẻ từ vựng và mối quan tâm:

  • Requirement Context sở hữu ngôn ngữ của nhu cầu business: brief, story, acceptance criteria, prioritization.
  • Design Context sở hữu ngôn ngữ của kiến trúc kỹ thuật: component, interface, data model, quyết định tech stack.
  • Implementation Context sở hữu ngôn ngữ của code: function, class, module, test coverage, kết quả linting.
  • Quality Context sở hữu ngôn ngữ của validation: test case, test suite, kết quả pass/fail, coverage threshold.
  • Deployment Context sở hữu ngôn ngữ của delivery: pipeline, environment, configuration, rollout strategy.

Coordination Context là đặc biệt — nó không có domain language riêng nhưng hoạt động xuyên suốt tất cả các context khác. PM agent sống ở đây, theo dõi trạng thái, xác định blocker, và leo thang vấn đề qua các ranh giới context.

Anti-Corruption Layer

Trong DDD, khi hai bounded context cần giao tiếp, bạn đặt một anti-corruption layer giữa chúng. Điều này ngăn model của một context làm ô nhiễm context khác. Trong ngôn ngữ agent, đây là về việc dịch thuật.

Khi Requirement Context bàn giao cho Design Context, object UserStory của BA agent được dịch thành thứ TA agent có thể làm việc được: một danh sách object FunctionalRequirement với các hàm ý kỹ thuật cụ thể. Design Context không biết hay quan tâm đến format UserStory gốc — nó làm việc với object FunctionalRequirement.

Trong code, việc dịch thuật này xảy ra trong một function chuyên dụng ánh xạ giữa các domain model. Chúng ta sẽ thấy điều này ở các phần sau khi triển khai agent handoff thực tế.


3. 5 Bounded Context của Chúng Ta

Đây là bản đồ đầy đủ về các bounded context và cách chúng liên kết với nhau:

Diagram 1
Figure 1: Bounded Contexts Map. Five domain contexts, each owned by specific agents, communicating via domain events and anti-corruption layers. The PM coordination context (blue border) spans all others.

Let me walk through each context in depth.

Hãy để tôi đi sâu vào từng context.

Requirement Context: Nơi Mọi Thứ Bắt Đầu

PO agent nhận brief thô từ stakeholder. Nhiệm vụ của nó là diễn giải ý định, đặt câu hỏi làm rõ nếu cần, và tạo ra object ClarifiedRequirement. Đây là aggregate root cho context này. Mọi thứ trong Requirement Context đều chảy qua nó.

BA agent tiêu thụ ClarifiedRequirement và tạo ra các object UserStory, mỗi cái với acceptance criteria, priority, và estimated complexity. Khi BA agent hoàn thành, nó emit domain event UserStoriesCreated.

Requirement Context sở hữu các thuật ngữ này: UserBrief, ClarifiedRequirement, UserStory, AcceptanceCriteria, Priority, StoryPoints, BusinessRule. Không có thuật ngữ nào trong số này rò rỉ vào Design Context mà không đi qua anti-corruption layer.

Design Context: Dịch Business sang Technical

Anti-corruption layer tại ranh giới Requirement→Design dịch object UserStory thành object FunctionalRequirement. Việc dịch thuật này là có chủ đích: Design Context không quan tâm đến ngôn ngữ kể chuyện business; nó quan tâm đến yêu cầu kỹ thuật.

TA agent lấy object FunctionalRequirement và tạo ra aggregate TechnicalSpec: component diagram, data model, API contract, yêu cầu infrastructure, và bản ghi ArchitectureDecision (ghi lại lý do đằng sau các lựa chọn công nghệ — quan trọng để tham khảo sau).

TL agent review TechnicalSpec, có thể yêu cầu sửa đổi, và cuối cùng phê duyệt nó. Điều này trigger domain event DesignApproved.

Implementation Context: Nơi Code Xảy Ra

Anti-corruption layer tại Design→Implementation dịch TechnicalSpec thành ImplementationPlan — một danh sách ưu tiên các object ImplementationTask, mỗi cái có definition of done rõ ràng và technical notes.

SSE agent thực thi từng task một, tạo ra object CodeArtifact. Mỗi artifact bao gồm source file, test file, và LintReport. SSE agent tự chạy test — nếu test thất bại, nó lặp lại. Sau ba lần thất bại, nó leo thang lên TL agent hoặc PM agent thay vì quay vòng mãi mãi.

Khi code hoàn chỉnh và self-test pass, SSE agent emit CodeArtifactReady.

Quality Context: Validation Độc Lập

QC agent không tin tưởng kết quả test tự báo cáo của SSE agent. Nó độc lập định nghĩa test case (từ user story gốc, thông qua anti-corruption layer) và validate code artifact dựa trên chúng. Sự tách biệt này quan trọng: QC agent là tiếng nói đối lập trong hệ thống.

Nếu quality gate pass, event QualityGatePassed được fire. Nếu không, event TestsFailed quay lại Implementation Context.

Deployment Context: Delivery Pipeline

DevOps agent nhận QualityGatePassed và tạo ra CI/CD configuration, environment setup, và deployment runbook. Nó không tự tạo deployment — nó chuẩn bị mọi thứ và sau đó chờ tại human checkpoint trước khi kích hoạt deployment thực sự.


4. State Machine Vòng Đời Requirement

Phần quan trọng nhất của shared state trong hệ thống là vòng đời của một requirement. Mỗi requirement bắt đầu là một brief thô và kết thúc là một feature được deploy. Giữa hai điểm đó, có chín trạng thái riêng biệt và một tập hợp các transition, guard, và rollback có thể xảy ra.

Diagram 2
Figure 2: Requirement Lifecycle State Machine. Nine states, two human checkpoints (amber H badges), rollback paths on review failure and test failure, and a 3-attempt retry loop with PM escalation.

Here is the state machine encoded as Python TypedDicts with full type annotations:

Đây là state machine được mã hóa dưới dạng Python TypedDict với type annotation đầy đủ:

from typing import Literal, Optional
from typing_extensions import TypedDict
from enum import Enum
from datetime import datetime


class RequirementState(str, Enum):
    """The nine states in the requirement lifecycle."""
    RECEIVED = "received"
    CLARIFIED = "clarified"
    USER_STORIES_CREATED = "user_stories_created"
    DESIGN_READY = "design_ready"
    IN_DEVELOPMENT = "in_development"
    IN_REVIEW = "in_review"
    TEST_PENDING = "test_pending"
    READY_TO_DEPLOY = "ready_to_deploy"
    DEPLOYED = "deployed"
    # Error states
    BLOCKED = "blocked"
    ESCALATED = "escalated"


class StateTransition(TypedDict):
    """Records a state transition with guard conditions."""
    from_state: RequirementState
    to_state: RequirementState
    triggered_by: str          # agent name
    event_name: str            # domain event that triggered it
    guard: Optional[str]       # condition that must be True
    timestamp: str
    metadata: dict


# Transition table — all valid transitions
VALID_TRANSITIONS: dict[RequirementState, list[RequirementState]] = {
    RequirementState.RECEIVED: [
        RequirementState.CLARIFIED,
        RequirementState.BLOCKED,  # if brief is too vague
    ],
    RequirementState.CLARIFIED: [
        RequirementState.USER_STORIES_CREATED,
        RequirementState.RECEIVED,  # rollback: needs more clarification
    ],
    RequirementState.USER_STORIES_CREATED: [
        RequirementState.DESIGN_READY,  # after human approval
        RequirementState.CLARIFIED,    # if stories are rejected
    ],
    RequirementState.DESIGN_READY: [
        RequirementState.IN_DEVELOPMENT,
        RequirementState.USER_STORIES_CREATED,  # if design reveals gaps
    ],
    RequirementState.IN_DEVELOPMENT: [
        RequirementState.IN_REVIEW,
        RequirementState.IN_DEVELOPMENT,  # retry loop (max 3)
        RequirementState.ESCALATED,       # after 3 failures
    ],
    RequirementState.IN_REVIEW: [
        RequirementState.TEST_PENDING,  # review passed
        RequirementState.IN_DEVELOPMENT,  # review failed
    ],
    RequirementState.TEST_PENDING: [
        RequirementState.READY_TO_DEPLOY,   # all gates pass
        RequirementState.IN_DEVELOPMENT,    # tests failed, rework
    ],
    RequirementState.READY_TO_DEPLOY: [
        RequirementState.DEPLOYED,  # after human approval
        RequirementState.TEST_PENDING,  # human rejects, retest
    ],
    RequirementState.DEPLOYED: [],  # terminal state
}


def validate_transition(
    current: RequirementState,
    target: RequirementState,
    context: dict,
) -> tuple[bool, str]:
    """
    Guard function: returns (is_valid, reason).
    Called before every state transition.
    """
    allowed = VALID_TRANSITIONS.get(current, [])
    if target not in allowed:
        return False, f"Transition {current}{target} is not in transition table"

    # Additional guards per transition
    if current == RequirementState.USER_STORIES_CREATED and target == RequirementState.DESIGN_READY:
        if not context.get("human_approved_stories"):
            return False, "Human approval required before design phase"

    if current == RequirementState.IN_DEVELOPMENT and target == RequirementState.IN_REVIEW:
        if not context.get("self_tests_passing"):
            return False, "SSE must pass self-tests before submitting for review"

    if current == RequirementState.READY_TO_DEPLOY and target == RequirementState.DEPLOYED:
        if not context.get("human_approved_deployment"):
            return False, "Human approval required before production deployment"

    return True, "ok"

Điểm mấu chốt ở đây là mỗi transition đều tường minhcó guard. Một agent không thể chuyển requirement từ DESIGN_READY sang DEPLOYED trong một bước. Nó phải đi qua mọi trạng thái trung gian. Điều này ngăn hệ thống bỏ qua các bước dưới áp lực thời gian — đây là một failure mode thực tế khi LLM agent tối ưu hóa cho việc hoàn thành task.


5. Kiến Trúc Giao Tiếp Giữa Các Agent

Với các bounded context đã được định nghĩa và state machine đã rõ ràng, chúng ta cần thiết kế cách các agent thực sự trao đổi thông tin. Có ba communication pattern cơ bản, mượn trực tiếp từ kiến trúc microservices:

Diagram 3
Figure 3: Agent Communication Patterns. Three distinct patterns — Command, Event, Query — with a comparison table showing when to use each. Based on CQRS principles adapted for multi-agent systems.

Bây giờ hãy định nghĩa các pattern này trong code:

from dataclasses import dataclass, field
from typing import Any, Optional, Literal
from datetime import datetime, timezone
import uuid


@dataclass
class AgentMessage:
    """
    The universal message envelope used for all agent communication.
    Whether it's a Command, Event, or Query — it uses this structure.
    The 'message_type' field determines how it's routed.
    """
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    message_type: Literal["command", "event", "query", "query_response"] = "event"
    event_name: str = ""           # e.g., "UserStoriesCreated", "CreateUserStories"
    source_agent: str = ""         # who sent this
    target_agent: Optional[str] = None  # None = broadcast (events only)
    correlation_id: Optional[str] = None  # links query to its response
    payload: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    schema_version: str = "1.0"

    def to_dict(self) -> dict:
        return {
            "message_id": self.message_id,
            "message_type": self.message_type,
            "event_name": self.event_name,
            "source_agent": self.source_agent,
            "target_agent": self.target_agent,
            "correlation_id": self.correlation_id,
            "payload": self.payload,
            "metadata": self.metadata,
            "timestamp": self.timestamp,
            "schema_version": self.schema_version,
        }


class DomainEvent(AgentMessage):
    """
    Base class for all domain events.
    Domain events record facts that have occurred.
    They are named in the past tense.
    They are immutable once created.
    """

    def __init__(
        self,
        event_name: str,
        source_agent: str,
        payload: dict,
        metadata: dict | None = None,
    ):
        super().__init__(
            message_type="event",
            event_name=event_name,
            source_agent=source_agent,
            target_agent=None,  # events are always broadcast
            payload=payload,
            metadata=metadata or {},
        )

    def __setattr__(self, name: str, value: Any) -> None:
        """Events are immutable after creation."""
        if hasattr(self, "message_id"):  # already initialized
            raise AttributeError("DomainEvent is immutable")
        super().__setattr__(name, value)


# Concrete domain events
class UserStoriesCreated(DomainEvent):
    def __init__(self, source_agent: str, stories: list[dict], requirement_id: str):
        super().__init__(
            event_name="UserStoriesCreated",
            source_agent=source_agent,
            payload={
                "requirement_id": requirement_id,
                "stories": stories,
                "story_count": len(stories),
            },
        )


class CodeArtifactReady(DomainEvent):
    def __init__(
        self,
        source_agent: str,
        artifact_id: str,
        story_ids: list[str],
        test_coverage: float,
    ):
        super().__init__(
            event_name="CodeArtifactReady",
            source_agent=source_agent,
            payload={
                "artifact_id": artifact_id,
                "story_ids": story_ids,
                "test_coverage": test_coverage,
                "self_tests_passing": True,
            },
        )


class QualityGatePassed(DomainEvent):
    def __init__(
        self,
        source_agent: str,
        artifact_id: str,
        quality_score: float,
        test_results: dict,
    ):
        super().__init__(
            event_name="QualityGatePassed",
            source_agent=source_agent,
            payload={
                "artifact_id": artifact_id,
                "quality_score": quality_score,
                "test_results": test_results,
            },
        )


class TestsFailed(DomainEvent):
    def __init__(
        self,
        source_agent: str,
        artifact_id: str,
        failure_summary: str,
        failed_test_ids: list[str],
    ):
        super().__init__(
            event_name="TestsFailed",
            source_agent=source_agent,
            payload={
                "artifact_id": artifact_id,
                "failure_summary": failure_summary,
                "failed_test_ids": failed_test_ids,
            },
        )

Outbox Pattern cho Reliable Event Delivery

Một trong những vấn đề thực tế tôi gặp phải khi xây dựng agent với LangGraph: điều gì xảy ra nếu một agent tạo ra domain event nhưng việc thực thi graph bị gián đoạn trước khi event được xử lý? Bạn sẽ có phantom work — trạng thái cho thấy SSE agent đã hoàn thành code, nhưng QC agent không bao giờ nhận được trigger.

Giải pháp mượn từ distributed system: Outbox Pattern. Thay vì emit event trực tiếp, các agent ghi event vào “outbox” trong shared state. Một component riêng — event dispatcher — đọc từ outbox và deliver event đến subscriber, sau đó đánh dấu chúng là đã delivered.

from typing_extensions import TypedDict
from typing import Annotated
import operator


class OutboxEntry(TypedDict):
    event: dict          # serialized DomainEvent
    delivered: bool
    delivery_attempts: int
    created_at: str


# In TeamState:
class EventOutbox(TypedDict):
    pending: Annotated[list[OutboxEntry], operator.add]
    delivered: list[OutboxEntry]


def dispatch_events(state: "TeamState") -> "TeamState":
    """
    Called at the start of each graph step.
    Delivers pending outbox events to the appropriate handlers.
    This ensures at-least-once delivery semantics.
    """
    pending = state["event_outbox"]["pending"]
    still_pending = []

    for entry in pending:
        if not entry["delivered"]:
            event = entry["event"]
            # Route to subscriber nodes
            _route_event_to_subscribers(event, state)
            entry["delivered"] = True
        else:
            still_pending.append(entry)

    return {
        **state,
        "event_outbox": {
            "pending": still_pending,
            "delivered": state["event_outbox"]["delivered"] + [
                e for e in pending if e["delivered"]
            ],
        },
    }

Pattern này đảm bảo không có domain event nào bị mất, ngay cả khi việc thực thi graph bị gián đoạn và khởi động lại từ checkpoint.


6. TeamState: Object Chia Sẻ Trung Tâm

Mọi thứ trong hệ thống đều hội tụ vào một cấu trúc dữ liệu: TeamState. Đây là LangGraph state được truyền qua mọi node, được cập nhật bởi mọi agent, và được persist giữa các checkpoint. Thiết kế nó tốt có lẽ là quyết định kiến trúc quan trọng nhất trong toàn bộ hệ thống.

Đây là định nghĩa hoàn chỉnh:

from typing import Literal, Optional, Annotated
from typing_extensions import TypedDict
import operator


# ============================================================
# Value Objects
# ============================================================

class Priority(TypedDict):
    level: Literal["critical", "high", "medium", "low"]
    rationale: str


class QualityGates(TypedDict):
    min_test_coverage: float          # e.g., 0.80
    max_cyclomatic_complexity: int    # e.g., 10
    lint_must_pass: bool
    type_check_must_pass: bool
    security_scan_must_pass: bool


class DeploymentStatus(TypedDict):
    status: Literal["pending", "in_progress", "deployed", "failed", "rolled_back"]
    environment: Literal["dev", "staging", "production"]
    deployed_at: Optional[str]
    deployed_by: str                  # agent or human identifier
    deployment_url: Optional[str]
    version: Optional[str]


# ============================================================
# Entities
# ============================================================

class UserStory(TypedDict):
    story_id: str
    title: str
    as_a: str                         # "As a [role]"
    i_want: str                       # "I want [feature]"
    so_that: str                      # "So that [benefit]"
    acceptance_criteria: list[str]
    priority: Priority
    story_points: int
    status: Literal[
        "draft", "approved", "in_progress", "done", "rejected"
    ]
    created_by: str                   # agent_id


class ImplementationTask(TypedDict):
    task_id: str
    story_id: str                     # links back to user story
    title: str
    description: str
    file_path: str                    # expected output file
    dependencies: list[str]           # task_ids that must complete first
    estimated_minutes: int
    status: Literal["pending", "in_progress", "done", "failed"]
    assigned_to: str


class CodeArtifact(TypedDict):
    artifact_id: str
    task_id: str
    language: str
    file_path: str
    content: str                      # actual source code
    test_file_path: Optional[str]
    test_content: Optional[str]
    lint_passed: bool
    type_check_passed: bool
    created_at: str
    commit_hash: Optional[str]


class TestCase(TypedDict):
    test_id: str
    story_id: str
    title: str
    test_type: Literal["unit", "integration", "e2e", "performance"]
    preconditions: list[str]
    steps: list[str]
    expected_result: str
    status: Literal["pending", "passed", "failed", "skipped"]
    failure_reason: Optional[str]


class Blocker(TypedDict):
    blocker_id: str
    description: str
    blocked_agent: str
    raised_by: str
    raised_at: str
    resolved: bool
    resolution: Optional[str]


class ArchitectureDecision(TypedDict):
    adr_id: str                       # Architecture Decision Record
    title: str
    status: Literal["proposed", "accepted", "deprecated", "superseded"]
    context: str
    decision: str
    consequences: list[str]
    alternatives_considered: list[str]
    decided_by: str
    decided_at: str


class TechnicalSpec(TypedDict):
    spec_id: str
    title: str
    components: list[dict]            # component name, responsibility, tech
    data_models: list[dict]           # entity definitions
    api_contracts: list[dict]         # endpoint definitions
    infrastructure: dict              # cloud resources, config
    tech_stack: dict                  # language, frameworks, databases
    approved_by: Optional[str]
    approved_at: Optional[str]


class CICDConfig(TypedDict):
    config_id: str
    pipeline_tool: str                # "github_actions", "gitlab_ci", etc.
    pipeline_yaml: str                # actual CI/CD config content
    environments: list[str]
    test_commands: list[str]
    deploy_commands: list[str]
    rollback_procedure: str
    generated_by: str
    generated_at: str


class TestResults(TypedDict):
    run_id: str
    total_tests: int
    passed: int
    failed: int
    skipped: int
    coverage_percent: float
    duration_seconds: float
    failed_test_ids: list[str]
    coverage_report: dict
    run_at: str


class RequirementDoc(TypedDict):
    doc_id: str
    original_brief: str
    clarified_objectives: list[str]
    out_of_scope: list[str]
    assumptions: list[str]
    constraints: list[str]
    success_metrics: list[str]
    stakeholders: list[str]
    clarified_by: str
    clarified_at: str


# ============================================================
# The Complete TeamState
# ============================================================

class TeamState(TypedDict):
    # ── Metadata ──────────────────────────────────────────────
    task_id: str
    created_at: str
    updated_at: str
    phase: Literal[
        "requirement", "design", "implementation", "quality", "deployment", "done"
    ]
    requirement_state: str            # RequirementState enum value
    schema_version: str               # for migration support

    # ── Requirement Domain ─────────────────────────────────────
    raw_brief: str
    clarified_requirements: Optional[RequirementDoc]
    user_stories: list[UserStory]
    stories_human_approved: bool
    rejection_reason: Optional[str]

    # ── Design Domain ─────────────────────────────────────────
    technical_spec: Optional[TechnicalSpec]
    architecture_decisions: list[ArchitectureDecision]
    design_review_notes: list[str]
    design_approved_by: Optional[str]

    # ── Implementation Domain ──────────────────────────────────
    implementation_plan: list[ImplementationTask]
    code_artifacts: list[CodeArtifact]
    implementation_attempts: int      # tracks retry count (max 3)
    current_task_id: Optional[str]
    self_test_results: Optional[TestResults]

    # ── Quality Domain ─────────────────────────────────────────
    test_cases: list[TestCase]
    quality_gates: QualityGates
    quality_test_results: Optional[TestResults]
    quality_score: Optional[float]
    quality_gate_passed: bool
    qc_notes: list[str]

    # ── Deployment Domain ─────────────────────────────────────
    cicd_config: Optional[CICDConfig]
    deployment_status: DeploymentStatus
    deployment_human_approved: bool
    deployment_notes: list[str]

    # ── Coordination ──────────────────────────────────────────
    # Annotated[list, operator.add] = append-only (LangGraph reducer)
    agent_messages: Annotated[list[AgentMessage], operator.add]
    event_outbox: EventOutbox
    current_agent: str
    blockers: list[Blocker]
    human_feedback: Optional[str]
    human_checkpoint_pending: bool
    checkpoint_context: Optional[dict]   # what the human needs to review

    # ── Audit trail ───────────────────────────────────────────
    state_transitions: Annotated[list[StateTransition], operator.add]
    error_log: Annotated[list[dict], operator.add]

Một vài quyết định thiết kế đáng chú ý:

Tại sao dùng Annotated[list, operator.add] cho một số field? Quản lý state của LangGraph yêu cầu bạn chỉ định cách merge các cập nhật state đồng thời. Với agent_messages, state_transitions, và error_log, chúng ta muốn tích lũy các entry — nên dùng reducer operator.add. Với các field như technical_spec hay current_agent, chúng ta muốn giá trị mới nhất thắng, nên không annotate chúng.

Tại sao có schema_version tường minh? Khi hệ thống phát triển, schema TeamState sẽ thay đổi. Có field schema_version cho phép chúng ta viết các migration function nâng cấp state cũ lên schema mới — điều thiết yếu cho các workflow chạy dài.

Tại sao implementation_attempts là một số nguyên? Đây là guard chính chống lại vòng lặp retry vô hạn. Mỗi khi implementation của SSE agent thất bại khi test, counter này tăng lên. Ở mức 3, PM agent trigger human checkpoint thay vì cho phép retry thêm.


7. Human-in-the-Loop Checkpoints

Điều nguy hiểm nhất về một hệ thống AI tự trị là nó có thể sai một cách tự tin. Một hệ thống human-in-the-loop được thiết kế tốt không cố loại bỏ con người khỏi quy trình — nó đặt họ một cách chiến lược ở những điểm mà phán đoán của họ mang lại giá trị cao nhất.

Trong hệ thống của chúng ta, có hai human checkpoint bắt buộc:

  1. Sau khi UserStoriesCreated — PO agent và BA agent đã hoàn thành công việc. Trước khi bất kỳ thiết kế hay code nào được viết, một người phê duyệt phạm vi. Điều này ngăn lãng phí công sức từ việc hiểu nhầm requirement.

  2. Trước khi Deploy — DevOps agent đã chuẩn bị CI/CD configuration và QC agent đã ký duyệt. Trước khi bất cứ thứ gì được deploy lên production, một người cho phép cuối cùng.

Đây là cách triển khai các checkpoint này trong LangGraph:

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.types import interrupt


def stories_approval_checkpoint(state: TeamState) -> TeamState:
    """
    Human checkpoint: review user stories before design begins.
    LangGraph will pause here and wait for external input.
    """
    human_feedback = interrupt({
        "checkpoint_type": "stories_approval",
        "message": "Please review the user stories and approve or reject.",
        "user_stories": state["user_stories"],
        "clarified_requirements": state["clarified_requirements"],
        "instructions": "Set approved=True to continue, or provide rejection_reason.",
    })

    if human_feedback.get("approved"):
        return {
            **state,
            "stories_human_approved": True,
            "human_checkpoint_pending": False,
            "human_feedback": None,
        }
    else:
        return {
            **state,
            "stories_human_approved": False,
            "human_checkpoint_pending": False,
            "rejection_reason": human_feedback.get("reason", "No reason provided"),
            "requirement_state": RequirementState.CLARIFIED,  # rollback
        }


def deployment_approval_checkpoint(state: TeamState) -> TeamState:
    """
    Human checkpoint: final approval before production deployment.
    """
    human_feedback = interrupt({
        "checkpoint_type": "deployment_approval",
        "message": "Ready to deploy. Please review and approve.",
        "cicd_config": state["cicd_config"],
        "quality_score": state["quality_score"],
        "test_results": state["quality_test_results"],
        "deployment_target": state["deployment_status"]["environment"],
    })

    if human_feedback.get("approved"):
        return {
            **state,
            "deployment_human_approved": True,
            "human_checkpoint_pending": False,
        }
    else:
        return {
            **state,
            "deployment_human_approved": False,
            "deployment_notes": state["deployment_notes"] + [
                f"Deployment rejected by human: {human_feedback.get('reason')}"
            ],
        }


def route_after_stories(state: TeamState) -> str:
    """Conditional edge: route based on human approval of stories."""
    if state.get("human_checkpoint_pending"):
        return "stories_approval_checkpoint"
    if state.get("stories_human_approved"):
        return "ta_agent"               # proceed to design
    return "po_agent"                  # back to clarification


def route_after_quality(state: TeamState) -> str:
    """Conditional edge: route after quality gates."""
    if not state.get("quality_gate_passed"):
        return "sse_agent"             # rework
    if state["implementation_attempts"] >= 3:
        return "pm_escalation"        # human escalation
    return "devops_agent"             # proceed to deployment prep


# Build the graph
builder = StateGraph(TeamState)

# Add agent nodes
builder.add_node("po_agent", run_po_agent)
builder.add_node("ba_agent", run_ba_agent)
builder.add_node("stories_approval_checkpoint", stories_approval_checkpoint)
builder.add_node("ta_agent", run_ta_agent)
builder.add_node("tl_agent", run_tl_agent)
builder.add_node("sse_agent", run_sse_agent)
builder.add_node("qc_agent", run_qc_agent)
builder.add_node("devops_agent", run_devops_agent)
builder.add_node("deployment_approval_checkpoint", deployment_approval_checkpoint)
builder.add_node("pm_agent", run_pm_agent)
builder.add_node("pm_escalation", run_pm_escalation)

# Add edges
builder.set_entry_point("po_agent")
builder.add_edge("po_agent", "ba_agent")
builder.add_edge("ba_agent", "stories_approval_checkpoint")
builder.add_conditional_edges(
    "stories_approval_checkpoint",
    route_after_stories,
    {
        "ta_agent": "ta_agent",
        "po_agent": "po_agent",
    },
)
builder.add_edge("ta_agent", "tl_agent")
builder.add_edge("tl_agent", "sse_agent")
builder.add_edge("sse_agent", "qc_agent")
builder.add_conditional_edges(
    "qc_agent",
    route_after_quality,
    {
        "sse_agent": "sse_agent",
        "devops_agent": "devops_agent",
        "pm_escalation": "pm_escalation",
    },
)
builder.add_edge("devops_agent", "deployment_approval_checkpoint")
builder.add_conditional_edges(
    "deployment_approval_checkpoint",
    lambda s: "deploy" if s.get("deployment_human_approved") else "qc_agent",
    {"deploy": END, "qc_agent": "qc_agent"},
)

# Compile with checkpoint support
memory = SqliteSaver.from_conn_string("team_state.db")
graph = builder.compile(checkpointer=memory)

Async Approval qua Webhook

Cơ chế interrupt() của LangGraph là đồng bộ — nó block graph thread. Để dùng trong production, bạn cần một async approval flow nơi người dùng review dashboard và bấm nút. Đây là pattern:

import asyncio
from typing import Optional


class HumanApprovalGateway:
    """
    Manages async human approval for interrupted graph states.
    In production: backed by a database + webhook or dashboard UI.
    """

    def __init__(self, graph, checkpointer):
        self.graph = graph
        self.checkpointer = checkpointer

    async def submit_for_approval(
        self,
        thread_id: str,
        checkpoint_type: str,
        review_data: dict,
    ) -> str:
        """
        Saves an approval request and sends a notification.
        Returns the approval_request_id.
        """
        approval_id = str(uuid.uuid4())
        # Save to DB, send Slack/email notification, etc.
        await self._notify_approver(approval_id, checkpoint_type, review_data)
        return approval_id

    async def receive_approval(
        self,
        thread_id: str,
        approved: bool,
        reason: Optional[str] = None,
    ) -> None:
        """
        Called by webhook when human approves or rejects.
        Resumes the paused graph with the human's decision.
        """
        config = {"configurable": {"thread_id": thread_id}}

        # Resume the graph by providing the human input
        async for event in self.graph.astream(
            Command(
                resume={
                    "approved": approved,
                    "reason": reason,
                }
            ),
            config=config,
        ):
            print(f"Graph resumed: {event}")

    async def _notify_approver(self, approval_id, checkpoint_type, data):
        # Implement: Slack message, email, dashboard entry
        pass

8. Xử Lý Lỗi và Recovery

Hãy thực tế: mọi thứ sẽ xảy ra. SSE agent sẽ tạo ra code không pass test. TL agent sẽ yêu cầu sửa đổi. QC agent sẽ tìm thấy edge case đòi hỏi phải nghĩ lại implementation. Chúng ta cần hệ thống xử lý những thất bại này một cách graceful, không rơi vào hỗn loạn.

Quy Tắc 3 Lần Thử

Đặc biệt với SSE agent, chúng ta giới hạn retry ở 3 lần. Đây là logic:

from langgraph.graph import StateGraph


def run_sse_agent(state: TeamState) -> TeamState:
    """
    SSE agent node with retry tracking and escalation logic.
    """
    attempts = state.get("implementation_attempts", 0)

    if attempts >= 3:
        # Don't even try — escalate
        return {
            **state,
            "requirement_state": RequirementState.ESCALATED,
            "blockers": state["blockers"] + [
                Blocker(
                    blocker_id=str(uuid.uuid4()),
                    description=(
                        f"SSE agent failed to implement after {attempts} attempts. "
                        f"Last failure: {state.get('self_test_results', {}).get('failed_test_ids', [])}"
                    ),
                    blocked_agent="sse_agent",
                    raised_by="sse_agent",
                    raised_at=datetime.now(timezone.utc).isoformat(),
                    resolved=False,
                    resolution=None,
                )
            ],
        }

    # Attempt implementation
    result = _do_implementation(state)

    if result["self_test_results"]["failed"] > 0:
        # Increment attempts, stay in development
        return {
            **result,
            "implementation_attempts": attempts + 1,
            "requirement_state": RequirementState.IN_DEVELOPMENT,
        }

    # Success
    return {
        **result,
        "implementation_attempts": attempts + 1,
        "requirement_state": RequirementState.IN_REVIEW,
        "event_outbox": {
            **state["event_outbox"],
            "pending": state["event_outbox"]["pending"] + [
                {
                    "event": CodeArtifactReady(
                        source_agent="sse_agent",
                        artifact_id=result["code_artifacts"][-1]["artifact_id"],
                        story_ids=[t["story_id"] for t in state["implementation_plan"]],
                        test_coverage=result["self_test_results"]["coverage_percent"],
                    ).to_dict(),
                    "delivered": False,
                    "delivery_attempts": 0,
                    "created_at": datetime.now(timezone.utc).isoformat(),
                }
            ],
        },
    }

State Rollback

Khi một state transition thất bại — ví dụ, TL agent từ chối code trong review — chúng ta không chỉ đặt requirement_state = IN_DEVELOPMENT. Chúng ta còn ghi lại việc rollback trong audit trail và preserve review notes để SSE agent có thể học hỏi từ chúng:

def handle_review_failure(
    state: TeamState,
    review_notes: list[str],
    reviewer: str,
) -> TeamState:
    """
    Rolls back from IN_REVIEW to IN_DEVELOPMENT with full audit trail.
    """
    rollback_event = StateTransition(
        from_state=RequirementState.IN_REVIEW,
        to_state=RequirementState.IN_DEVELOPMENT,
        triggered_by=reviewer,
        event_name="ReviewFailed",
        guard="review_approved == False",
        timestamp=datetime.now(timezone.utc).isoformat(),
        metadata={"review_notes": review_notes},
    )

    return {
        **state,
        "requirement_state": RequirementState.IN_DEVELOPMENT,
        "design_review_notes": state.get("design_review_notes", []) + review_notes,
        "state_transitions": state.get("state_transitions", []) + [rollback_event],
        "error_log": state.get("error_log", []) + [
            {
                "type": "review_failure",
                "reviewer": reviewer,
                "notes": review_notes,
                "timestamp": rollback_event["timestamp"],
            }
        ],
    }

Saga Pattern cho Long-Running Workflow

Toàn bộ vòng đời requirement của chúng ta là những gì các kiến trúc sư distributed system gọi là saga — một long-running transaction trải dài qua nhiều service (agent). Không giống như database transaction, bạn không thể đơn giản rollback một saga. Bạn phải thực thi các compensating transaction cho mỗi bước đã hoàn thành.

Trong hệ thống của chúng ta:

  • Nếu deployment thất bại sau khi code đã được commit vào repository, DevOps agent cần chạy compensating action rollback_deployment().
  • Nếu design bị từ chối sau khi TA agent đã tạo artifact, Design Context cần lưu trữ các artifact đó trước khi bắt đầu lại từ đầu.
  • Nếu một story bị từ chối sau khi BA agent đã tạo ra nó, state transition ghi lại việc từ chối với rejection_reason để PO agent biết cần sửa gì.

Chìa khóa để quản lý saga là: mọi hành động tiến về phía trước phải có một compensating action tương ứng được định nghĩa. Chúng ta sẽ triển khai điều này như một phần của class BaseAgent trong Phần 4.


9. Kiến Trúc Hệ Thống Hoàn Chỉnh

Hãy để tôi thu lại và hiển thị toàn bộ hệ thống như nó sẽ trông khi được xây dựng đầy đủ:

Diagram 4
Figure 4: Complete System Architecture. LangGraph orchestrates all 8 agents. The state store (Redis/SQLite) persists checkpoints. The message bus delivers domain events via the outbox pattern. Two human checkpoints (amber H badges) pause execution for approval. External tools connect to individual agent nodes.

10. Interface ConversationStore

Một component kiến trúc bổ sung đáng định nghĩa ngay bây giờ là ConversationStore. Trong khi TeamState giữ trạng thái workflow hiện tại, các agent cũng cần truy cập lịch sử conversation của chúng — những trao đổi qua lại với LLM đã tạo ra output của chúng. Chúng ta lưu trữ điều này riêng biệt để giữ TeamState gọn nhẹ.

from abc import ABC, abstractmethod
from typing import Optional
from dataclasses import dataclass


@dataclass
class ConversationEntry:
    """A single turn in an agent's conversation history."""
    entry_id: str
    agent_id: str
    task_id: str
    role: str                         # "system", "user", "assistant"
    content: str
    timestamp: str
    token_count: Optional[int] = None
    model: Optional[str] = None


class ConversationStore(ABC):
    """
    Interface for persisting agent conversation histories.
    
    Why separate from TeamState? 
    - Conversation histories can be large (thousands of tokens)
    - They're needed for debugging and auditing, not for routing
    - Different storage backends may be appropriate
      (e.g., S3 for archives, Redis for active conversations)
    """

    @abstractmethod
    async def save_entry(self, entry: ConversationEntry) -> None:
        """Append a conversation entry."""

    @abstractmethod
    async def get_history(
        self,
        agent_id: str,
        task_id: str,
        limit: Optional[int] = None,
    ) -> list[ConversationEntry]:
        """Retrieve conversation history for an agent on a task."""

    @abstractmethod
    async def get_full_task_history(
        self,
        task_id: str,
    ) -> dict[str, list[ConversationEntry]]:
        """Get all agent conversations for an entire task."""

    @abstractmethod
    async def clear_history(self, agent_id: str, task_id: str) -> None:
        """Clear history (e.g., after task completion)."""


class SqliteConversationStore(ConversationStore):
    """
    SQLite-backed conversation store for development.
    In production, swap for Redis or PostgreSQL.
    """

    def __init__(self, db_path: str = "conversations.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self) -> None:
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS conversations (
                entry_id TEXT PRIMARY KEY,
                agent_id TEXT NOT NULL,
                task_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                token_count INTEGER,
                model TEXT
            )
        """)
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_agent_task ON conversations(agent_id, task_id)"
        )
        conn.commit()
        conn.close()

    async def save_entry(self, entry: ConversationEntry) -> None:
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            """INSERT INTO conversations 
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                entry.entry_id,
                entry.agent_id,
                entry.task_id,
                entry.role,
                entry.content,
                entry.timestamp,
                entry.token_count,
                entry.model,
            ),
        )
        conn.commit()
        conn.close()

    async def get_history(
        self,
        agent_id: str,
        task_id: str,
        limit: Optional[int] = None,
    ) -> list[ConversationEntry]:
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        query = """
            SELECT * FROM conversations 
            WHERE agent_id=? AND task_id=? 
            ORDER BY timestamp ASC
        """
        if limit:
            query += f" LIMIT {limit}"
        rows = conn.execute(query, (agent_id, task_id)).fetchall()
        conn.close()
        return [
            ConversationEntry(
                entry_id=row[0],
                agent_id=row[1],
                task_id=row[2],
                role=row[3],
                content=row[4],
                timestamp=row[5],
                token_count=row[6],
                model=row[7],
            )
            for row in rows
        ]

    async def get_full_task_history(
        self,
        task_id: str,
    ) -> dict[str, list[ConversationEntry]]:
        entries = await self.get_history("", task_id)  # simplified
        result: dict[str, list] = {}
        for entry in entries:
            result.setdefault(entry.agent_id, []).append(entry)
        return result

    async def clear_history(self, agent_id: str, task_id: str) -> None:
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "DELETE FROM conversations WHERE agent_id=? AND task_id=?",
            (agent_id, task_id),
        )
        conn.commit()
        conn.close()

11. Những Gì Chúng Ta Đã Thiết Kế — Và Tại Sao Nó Quan Trọng

Hãy để tôi nhìn lại và tóm tắt những gì chúng ta đã xây dựng trong bài này từ góc độ kiến trúc.

Chúng ta đã định nghĩa năm bounded context ánh xạ gọn gàng tới các cụm agent. Mỗi context sở hữu domain model của mình, và giao tiếp cross-context xảy ra qua anti-corruption layer. Điều này có nghĩa là khi Design Context phát triển — ví dụ, chúng ta thêm field mới vào TechnicalSpec — nó không phá vỡ Requirement Context hay Implementation Context.

Chúng ta đã thiết kế một vòng đời chín trạng thái với transition guard tường minh. Không agent nào có thể bỏ qua trạng thái. Không agent nào có thể chuyển requirement ngược lại mà không trigger compensating action. Audit trail ghi lại mọi transition.

Chúng ta đã định nghĩa ba communication pattern — Command, Event, Query — và chỉ định pattern nào phù hợp cho từng tương tác agent. Điều này ngăn sự hỗn loạn khi các agent gửi message tùy tiện cho nhau. Mọi message đều có type, mục đích, và routing rule.

Chúng ta đã xây dựng TeamState với 40+ field được tổ chức theo domain. Mỗi field được type, mỗi list field có LangGraph reducer phù hợp, và schema được version để migration trong tương lai.

Chúng ta đã thiết kế human-in-the-loop checkpoint dùng cơ chế interrupt() của LangGraph, với async approval gateway cho production.

Và chúng ta đã xác định saga pattern là mental model đúng đắn cho toàn bộ workflow, với compensating action được định nghĩa cho mỗi bước tiến về phía trước.

Liệu đây có nhiều kiến trúc hơn hầu hết project “AI agent” không? Có, đáng kể. Nhưng đây chính xác là điểm mấu chốt. Nếu bạn đang xây dựng một hệ thống sẽ chạy tự động trên codebase thực — push code lên GitHub, chạy test, tạo CI/CD config — bạn cần mức độ nghiêm ngặt này. Chi phí của một sai lầm kiến trúc ở đây không phải là API chậm; đó là broken code trong production.

Một Lời Về Over-Engineering

Tôi muốn thành thật về cực đoan còn lại: kiến trúc này chỉ có ý nghĩa nếu bạn đang xây dựng một hệ thống dự định xử lý nhiều project khác nhau theo thời gian. Nếu bạn đang vibe-coding một prototype one-off cho weekend hackathon, bạn có thể không cần bounded context và outbox pattern. Hãy dùng LangGraph trực tiếp với flat state dict và làm cho thứ đó chạy được.

Kiến trúc tôi mô tả ở đây dành cho trường hợp bạn đang xây dựng AI team như một platform — thứ gì đó team bạn sẽ dùng lặp lại, mở rộng, và bảo trì. Trong trường hợp đó, đầu tư vào kiến trúc trả lại nhiều lần.


12. Preview: Những Gì Chúng Ta Xây Dựng trong Phần 4

Trong Phần 4, chúng ta chuyển từ thiết kế sang triển khai. Chúng ta sẽ xây dựng:

Class BaseAgent — Abstract base mà mọi agent kế thừa. Nó xử lý:

  • LLM client initialization (model có thể cấu hình theo agent)
  • System prompt management và injection
  • Conversation history management qua ConversationStore
  • Domain event emission qua outbox
  • Retry logic với exponential backoff
  • Error capture và state recording
  • Compensating action registration (saga pattern)

Cấu trúc project — Toàn bộ directory layout:

ai-team/
├── agents/
│   ├── base.py              # BaseAgent abstract class
│   ├── po_agent.py          # Product Owner
│   ├── ba_agent.py          # Business Analyst
│   ├── ta_agent.py          # Tech Architect
│   ├── tl_agent.py          # Tech Lead
│   ├── sse_agent.py         # Senior Software Engineer
│   ├── qc_agent.py          # QC Engineer
│   ├── devops_agent.py      # DevOps Engineer
│   └── pm_agent.py          # Project Manager
├── domain/
│   ├── state.py             # TeamState TypedDict
│   ├── events.py            # DomainEvent + all event types
│   ├── models.py            # All domain entities/value objects
│   └── transitions.py       # State machine + guards
├── infrastructure/
│   ├── conversation_store.py  # ConversationStore + SQLite impl
│   ├── event_dispatcher.py    # Outbox dispatch logic
│   └── human_gateway.py       # HumanApprovalGateway
├── graph/
│   ├── builder.py           # LangGraph StateGraph construction
│   ├── edges.py             # All conditional edge functions
│   └── checkpoints.py       # Human checkpoint nodes
├── tools/
│   ├── github_tools.py      # GitHub API integration
│   ├── test_runner.py       # pytest/jest execution
│   └── linter.py            # ruff/mypy/eslint execution
├── api/
│   ├── server.py            # FastAPI server
│   └── webhooks.py          # Human approval webhook endpoint
├── tests/
│   └── ...
├── pyproject.toml
└── README.md
Xuất nội dung

Bình luận