Tôi vẫn nhớ lần đầu tiên cố xây dựng một multi-agent system mà không có bất kỳ kiến trúc nào. Năm 2023, GPT-4 vừa ra mắt, và tôi nghĩ: “Thôi cứ chain vài agent lại rồi để chúng nói chuyện với nhau.” Ba ngày sau, tôi có một mớ hỗn độn tuyệt đẹp. Các agent lặp lại công việc của nhau. Các agent mâu thuẫn nhau. Một agent cứ mãi xin lỗi agent khác vì không hiểu nó muốn gì. Đó là, theo cách đồng đội tôi mô tả, “như chăn mèo — chỉ khác là những con mèo đang viết production code.”
Bài học tôi rút ra từ trải nghiệm đó: AI agent là software component. Và software component không có kiến trúc sẽ trở thành legacy system rất, rất nhanh.
Trong Phần 1, tôi đã giới thiệu khái niệm về một AI software team hoàn chỉnh. Trong Phần 2, chúng ta đã phác thảo vai trò của từng agent — Product Owner, Business Analyst, Tech Architect, Tech Lead, Senior Software Engineer, QC Engineer, DevOps Engineer, và Project Manager. Giờ đến Phần 3, chúng ta cần trả lời câu hỏi khó hơn: làm thế nào để các agent này thực sự phối hợp với nhau như một hệ thống thống nhất?
Câu trả lời, tôi sẽ lập luận, đến từ một tập hợp ý tưởng mà thế giới software engineering đã tinh chỉnh suốt 20 năm qua: Domain-Driven Design.
1. Tại Sao Kiến Trúc Quan Trọng với AI Team
Để tôi mô tả hai kịch bản.
Kịch bản A — Không có kiến trúc: Bạn có 8 agent. Mỗi agent có thể gọi bất kỳ agent nào khác. Mỗi agent có định nghĩa riêng về “requirement” nghĩa là gì. Khi SSE agent viết xong code, nó broadcast một message cho tất cả mọi người. QC agent nhận được message nhưng không chắc code đã sẵn sàng để test chưa. PM agent nhận được message tương tự và cố cập nhật một status tracker nó tự xây dựng bên trong. TL agent nhận được và bắt đầu một code review mâu thuẫn với lần test chạy của QC agent. PO agent, không hiểu vì lý do gì, cũng được thông báo và bắt đầu hỏi thêm về business logic trong lúc đang code review.
Kịch bản B — Hệ thống có thiết kế: Requirement chảy qua các trạng thái được định nghĩa rõ ràng. Mỗi agent vận hành trong bounded context của mình và giao tiếp qua các domain event được định nghĩa tốt. Khi SSE agent hoàn thành code, nó emit event CodeArtifactReady. Chỉ QC agent và TL agent subscribe vào event này. PM agent biết về nó thông qua thay đổi trạng thái, không phải qua tin nhắn trực tiếp. PO agent không tham gia cho đến khi đạt đến human checkpoint gate.
Sự khác biệt giữa Kịch bản A và Kịch bản B không phải về LLM model bạn đang dùng. Đó là về kiến trúc. Đó là về việc có một từ vựng chung cho những gì mỗi phần của hệ thống làm, và enforce các ranh giới để các component không giẫm chân nhau.
Đây chính xác là những gì Domain-Driven Design mang lại cho chúng ta.
Sự Song Song với Microservices
Nếu bạn đã từng xây dựng microservices, bạn đã đối mặt với vấn đề này rồi. Khi bạn phân tách một monolith thành các service, bạn phải đối mặt với những câu hỏi như: service này sở hữu gì? Nó giao tiếp như thế nào? Điều gì xảy ra khi nó thất bại? Ai gọi ai?
Những câu trả lời xuất hiện từ kỷ nguyên microservices — bounded contexts, domain events, the outbox pattern, saga orchestration — chính xác là những pattern chúng ta cần cho multi-agent AI system. Các agent là các service. LangGraph là orchestrator. TeamState là shared data model. Domain event là các trigger di chuyển công việc giữa các agent.
Chúng ta không phát minh lại bất cứ điều gì ở đây. Chúng ta đang áp dụng các pattern đã được kiểm chứng vào một môi trường thực thi mới.
2. Áp Dụng DDD vào Multi-Agent Systems
Domain-Driven Design, về cốt lõi, là về việc căn chỉnh cấu trúc phần mềm với business domain. Cuốn sách năm 2003 của Eric Evans đã giới thiệu một từ vựng để làm điều này. Hãy để tôi dịch từ vựng đó sang ngôn ngữ agent.
Từ Vựng DDD-sang-Agent
Ubiquitous Language trong DDD có nghĩa là developer, domain expert, và stakeholder đều dùng cùng một thuật ngữ. Trong AI team của chúng ta, điều này có nghĩa là mọi agent và mọi đoạn code đều dùng cùng một từ vựng: một UserStory là một UserStory, không phải ticket, task, hay requirement_item. Khi BA agent tạo user story, khi SSE agent đọc chúng, và khi QC agent validate dựa trên chúng — tất cả đều dùng cùng cấu trúc dữ liệu UserStory.
Aggregate là tập hợp các domain object được xử lý như một đơn vị duy nhất. Trong hệ thống của chúng ta, aggregate Requirement chứa UserBrief, ClarifiedRequirements, và một danh sách UserStory. Aggregate CodeArtifact chứa SourceFiles, TestResults, và DependencyManifest. Aggregate enforce ranh giới nhất quán — bạn không cập nhật các object UserStory riêng lẻ; bạn cập nhật chúng thông qua aggregate Requirement.
Entity là object có danh tính riêng biệt tồn tại theo thời gian. Mỗi UserStory có story_id không đổi dù nội dung, trạng thái, và acceptance criteria của nó thay đổi. Mỗi CodeArtifact có artifact_id giúp chúng ta theo dõi nó qua review, sửa đổi, và deployment.
Value Object là object được xác định hoàn toàn bởi các thuộc tính của nó, không có danh tính riêng. Priority (High/Medium/Low), giá trị StoryPoints, phần trăm TestCoverage — đây là value object. Hai giá trị Priority.HIGH là giống nhau và có thể thay thế cho nhau.
Domain Event là sự kiện về điều gì đó đã xảy ra trong quá khứ. Chúng là các bản ghi bất biến. RequirementsCleared, UserStoriesCreated, CodeReviewApproved, TestsFailed — đây là domain event. Chúng được đặt tên ở thì quá khứ vì chúng ghi lại những gì đã xảy ra. Trong hệ thống LangGraph của chúng ta, domain event là cơ chế chính mà các agent kích hoạt lẫn nhau.
Bounded Context là khái niệm mạnh nhất chúng ta sẽ sử dụng. Một bounded context là ranh giới rõ ràng trong đó một model cụ thể áp dụng. Bên trong ranh giới, các thuật ngữ có nghĩa cụ thể, không mơ hồ. Qua ranh giới, bạn cần dịch thuật.
Bounded Context và Cụm Agent
Mỗi bounded context của chúng ta ánh xạ tới một cụm agent chia sẻ từ vựng và mối quan tâm:
- Requirement Context sở hữu ngôn ngữ của nhu cầu business: brief, story, acceptance criteria, prioritization.
- Design Context sở hữu ngôn ngữ của kiến trúc kỹ thuật: component, interface, data model, quyết định tech stack.
- Implementation Context sở hữu ngôn ngữ của code: function, class, module, test coverage, kết quả linting.
- Quality Context sở hữu ngôn ngữ của validation: test case, test suite, kết quả pass/fail, coverage threshold.
- Deployment Context sở hữu ngôn ngữ của delivery: pipeline, environment, configuration, rollout strategy.
Coordination Context là đặc biệt — nó không có domain language riêng nhưng hoạt động xuyên suốt tất cả các context khác. PM agent sống ở đây, theo dõi trạng thái, xác định blocker, và leo thang vấn đề qua các ranh giới context.
Anti-Corruption Layer
Trong DDD, khi hai bounded context cần giao tiếp, bạn đặt một anti-corruption layer giữa chúng. Điều này ngăn model của một context làm ô nhiễm context khác. Trong ngôn ngữ agent, đây là về việc dịch thuật.
Khi Requirement Context bàn giao cho Design Context, object UserStory của BA agent được dịch thành thứ TA agent có thể làm việc được: một danh sách object FunctionalRequirement với các hàm ý kỹ thuật cụ thể. Design Context không biết hay quan tâm đến format UserStory gốc — nó làm việc với object FunctionalRequirement.
Trong code, việc dịch thuật này xảy ra trong một function chuyên dụng ánh xạ giữa các domain model. Chúng ta sẽ thấy điều này ở các phần sau khi triển khai agent handoff thực tế.
3. 5 Bounded Context của Chúng Ta
Đây là bản đồ đầy đủ về các bounded context và cách chúng liên kết với nhau:
Let me walk through each context in depth.
Hãy để tôi đi sâu vào từng context.
Requirement Context: Nơi Mọi Thứ Bắt Đầu
PO agent nhận brief thô từ stakeholder. Nhiệm vụ của nó là diễn giải ý định, đặt câu hỏi làm rõ nếu cần, và tạo ra object ClarifiedRequirement. Đây là aggregate root cho context này. Mọi thứ trong Requirement Context đều chảy qua nó.
BA agent tiêu thụ ClarifiedRequirement và tạo ra các object UserStory, mỗi cái với acceptance criteria, priority, và estimated complexity. Khi BA agent hoàn thành, nó emit domain event UserStoriesCreated.
Requirement Context sở hữu các thuật ngữ này: UserBrief, ClarifiedRequirement, UserStory, AcceptanceCriteria, Priority, StoryPoints, BusinessRule. Không có thuật ngữ nào trong số này rò rỉ vào Design Context mà không đi qua anti-corruption layer.
Design Context: Dịch Business sang Technical
Anti-corruption layer tại ranh giới Requirement→Design dịch object UserStory thành object FunctionalRequirement. Việc dịch thuật này là có chủ đích: Design Context không quan tâm đến ngôn ngữ kể chuyện business; nó quan tâm đến yêu cầu kỹ thuật.
TA agent lấy object FunctionalRequirement và tạo ra aggregate TechnicalSpec: component diagram, data model, API contract, yêu cầu infrastructure, và bản ghi ArchitectureDecision (ghi lại lý do đằng sau các lựa chọn công nghệ — quan trọng để tham khảo sau).
TL agent review TechnicalSpec, có thể yêu cầu sửa đổi, và cuối cùng phê duyệt nó. Điều này trigger domain event DesignApproved.
Implementation Context: Nơi Code Xảy Ra
Anti-corruption layer tại Design→Implementation dịch TechnicalSpec thành ImplementationPlan — một danh sách ưu tiên các object ImplementationTask, mỗi cái có definition of done rõ ràng và technical notes.
SSE agent thực thi từng task một, tạo ra object CodeArtifact. Mỗi artifact bao gồm source file, test file, và LintReport. SSE agent tự chạy test — nếu test thất bại, nó lặp lại. Sau ba lần thất bại, nó leo thang lên TL agent hoặc PM agent thay vì quay vòng mãi mãi.
Khi code hoàn chỉnh và self-test pass, SSE agent emit CodeArtifactReady.
Quality Context: Validation Độc Lập
QC agent không tin tưởng kết quả test tự báo cáo của SSE agent. Nó độc lập định nghĩa test case (từ user story gốc, thông qua anti-corruption layer) và validate code artifact dựa trên chúng. Sự tách biệt này quan trọng: QC agent là tiếng nói đối lập trong hệ thống.
Nếu quality gate pass, event QualityGatePassed được fire. Nếu không, event TestsFailed quay lại Implementation Context.
Deployment Context: Delivery Pipeline
DevOps agent nhận QualityGatePassed và tạo ra CI/CD configuration, environment setup, và deployment runbook. Nó không tự tạo deployment — nó chuẩn bị mọi thứ và sau đó chờ tại human checkpoint trước khi kích hoạt deployment thực sự.
4. State Machine Vòng Đời Requirement
Phần quan trọng nhất của shared state trong hệ thống là vòng đời của một requirement. Mỗi requirement bắt đầu là một brief thô và kết thúc là một feature được deploy. Giữa hai điểm đó, có chín trạng thái riêng biệt và một tập hợp các transition, guard, và rollback có thể xảy ra.
Here is the state machine encoded as Python TypedDicts with full type annotations:
Đây là state machine được mã hóa dưới dạng Python TypedDict với type annotation đầy đủ:
from typing import Literal, Optional
from typing_extensions import TypedDict
from enum import Enum
from datetime import datetime
class RequirementState(str, Enum):
"""The nine states in the requirement lifecycle."""
RECEIVED = "received"
CLARIFIED = "clarified"
USER_STORIES_CREATED = "user_stories_created"
DESIGN_READY = "design_ready"
IN_DEVELOPMENT = "in_development"
IN_REVIEW = "in_review"
TEST_PENDING = "test_pending"
READY_TO_DEPLOY = "ready_to_deploy"
DEPLOYED = "deployed"
# Error states
BLOCKED = "blocked"
ESCALATED = "escalated"
class StateTransition(TypedDict):
"""Records a state transition with guard conditions."""
from_state: RequirementState
to_state: RequirementState
triggered_by: str # agent name
event_name: str # domain event that triggered it
guard: Optional[str] # condition that must be True
timestamp: str
metadata: dict
# Transition table — all valid transitions
VALID_TRANSITIONS: dict[RequirementState, list[RequirementState]] = {
RequirementState.RECEIVED: [
RequirementState.CLARIFIED,
RequirementState.BLOCKED, # if brief is too vague
],
RequirementState.CLARIFIED: [
RequirementState.USER_STORIES_CREATED,
RequirementState.RECEIVED, # rollback: needs more clarification
],
RequirementState.USER_STORIES_CREATED: [
RequirementState.DESIGN_READY, # after human approval
RequirementState.CLARIFIED, # if stories are rejected
],
RequirementState.DESIGN_READY: [
RequirementState.IN_DEVELOPMENT,
RequirementState.USER_STORIES_CREATED, # if design reveals gaps
],
RequirementState.IN_DEVELOPMENT: [
RequirementState.IN_REVIEW,
RequirementState.IN_DEVELOPMENT, # retry loop (max 3)
RequirementState.ESCALATED, # after 3 failures
],
RequirementState.IN_REVIEW: [
RequirementState.TEST_PENDING, # review passed
RequirementState.IN_DEVELOPMENT, # review failed
],
RequirementState.TEST_PENDING: [
RequirementState.READY_TO_DEPLOY, # all gates pass
RequirementState.IN_DEVELOPMENT, # tests failed, rework
],
RequirementState.READY_TO_DEPLOY: [
RequirementState.DEPLOYED, # after human approval
RequirementState.TEST_PENDING, # human rejects, retest
],
RequirementState.DEPLOYED: [], # terminal state
}
def validate_transition(
current: RequirementState,
target: RequirementState,
context: dict,
) -> tuple[bool, str]:
"""
Guard function: returns (is_valid, reason).
Called before every state transition.
"""
allowed = VALID_TRANSITIONS.get(current, [])
if target not in allowed:
return False, f"Transition {current} → {target} is not in transition table"
# Additional guards per transition
if current == RequirementState.USER_STORIES_CREATED and target == RequirementState.DESIGN_READY:
if not context.get("human_approved_stories"):
return False, "Human approval required before design phase"
if current == RequirementState.IN_DEVELOPMENT and target == RequirementState.IN_REVIEW:
if not context.get("self_tests_passing"):
return False, "SSE must pass self-tests before submitting for review"
if current == RequirementState.READY_TO_DEPLOY and target == RequirementState.DEPLOYED:
if not context.get("human_approved_deployment"):
return False, "Human approval required before production deployment"
return True, "ok"
Điểm mấu chốt ở đây là mỗi transition đều tường minh và có guard. Một agent không thể chuyển requirement từ DESIGN_READY sang DEPLOYED trong một bước. Nó phải đi qua mọi trạng thái trung gian. Điều này ngăn hệ thống bỏ qua các bước dưới áp lực thời gian — đây là một failure mode thực tế khi LLM agent tối ưu hóa cho việc hoàn thành task.
5. Kiến Trúc Giao Tiếp Giữa Các Agent
Với các bounded context đã được định nghĩa và state machine đã rõ ràng, chúng ta cần thiết kế cách các agent thực sự trao đổi thông tin. Có ba communication pattern cơ bản, mượn trực tiếp từ kiến trúc microservices:
Bây giờ hãy định nghĩa các pattern này trong code:
from dataclasses import dataclass, field
from typing import Any, Optional, Literal
from datetime import datetime, timezone
import uuid
@dataclass
class AgentMessage:
"""
The universal message envelope used for all agent communication.
Whether it's a Command, Event, or Query — it uses this structure.
The 'message_type' field determines how it's routed.
"""
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
message_type: Literal["command", "event", "query", "query_response"] = "event"
event_name: str = "" # e.g., "UserStoriesCreated", "CreateUserStories"
source_agent: str = "" # who sent this
target_agent: Optional[str] = None # None = broadcast (events only)
correlation_id: Optional[str] = None # links query to its response
payload: dict = field(default_factory=dict)
metadata: dict = field(default_factory=dict)
timestamp: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
schema_version: str = "1.0"
def to_dict(self) -> dict:
return {
"message_id": self.message_id,
"message_type": self.message_type,
"event_name": self.event_name,
"source_agent": self.source_agent,
"target_agent": self.target_agent,
"correlation_id": self.correlation_id,
"payload": self.payload,
"metadata": self.metadata,
"timestamp": self.timestamp,
"schema_version": self.schema_version,
}
class DomainEvent(AgentMessage):
"""
Base class for all domain events.
Domain events record facts that have occurred.
They are named in the past tense.
They are immutable once created.
"""
def __init__(
self,
event_name: str,
source_agent: str,
payload: dict,
metadata: dict | None = None,
):
super().__init__(
message_type="event",
event_name=event_name,
source_agent=source_agent,
target_agent=None, # events are always broadcast
payload=payload,
metadata=metadata or {},
)
def __setattr__(self, name: str, value: Any) -> None:
"""Events are immutable after creation."""
if hasattr(self, "message_id"): # already initialized
raise AttributeError("DomainEvent is immutable")
super().__setattr__(name, value)
# Concrete domain events
class UserStoriesCreated(DomainEvent):
def __init__(self, source_agent: str, stories: list[dict], requirement_id: str):
super().__init__(
event_name="UserStoriesCreated",
source_agent=source_agent,
payload={
"requirement_id": requirement_id,
"stories": stories,
"story_count": len(stories),
},
)
class CodeArtifactReady(DomainEvent):
def __init__(
self,
source_agent: str,
artifact_id: str,
story_ids: list[str],
test_coverage: float,
):
super().__init__(
event_name="CodeArtifactReady",
source_agent=source_agent,
payload={
"artifact_id": artifact_id,
"story_ids": story_ids,
"test_coverage": test_coverage,
"self_tests_passing": True,
},
)
class QualityGatePassed(DomainEvent):
def __init__(
self,
source_agent: str,
artifact_id: str,
quality_score: float,
test_results: dict,
):
super().__init__(
event_name="QualityGatePassed",
source_agent=source_agent,
payload={
"artifact_id": artifact_id,
"quality_score": quality_score,
"test_results": test_results,
},
)
class TestsFailed(DomainEvent):
def __init__(
self,
source_agent: str,
artifact_id: str,
failure_summary: str,
failed_test_ids: list[str],
):
super().__init__(
event_name="TestsFailed",
source_agent=source_agent,
payload={
"artifact_id": artifact_id,
"failure_summary": failure_summary,
"failed_test_ids": failed_test_ids,
},
)
Outbox Pattern cho Reliable Event Delivery
Một trong những vấn đề thực tế tôi gặp phải khi xây dựng agent với LangGraph: điều gì xảy ra nếu một agent tạo ra domain event nhưng việc thực thi graph bị gián đoạn trước khi event được xử lý? Bạn sẽ có phantom work — trạng thái cho thấy SSE agent đã hoàn thành code, nhưng QC agent không bao giờ nhận được trigger.
Giải pháp mượn từ distributed system: Outbox Pattern. Thay vì emit event trực tiếp, các agent ghi event vào “outbox” trong shared state. Một component riêng — event dispatcher — đọc từ outbox và deliver event đến subscriber, sau đó đánh dấu chúng là đã delivered.
from typing_extensions import TypedDict
from typing import Annotated
import operator
class OutboxEntry(TypedDict):
event: dict # serialized DomainEvent
delivered: bool
delivery_attempts: int
created_at: str
# In TeamState:
class EventOutbox(TypedDict):
pending: Annotated[list[OutboxEntry], operator.add]
delivered: list[OutboxEntry]
def dispatch_events(state: "TeamState") -> "TeamState":
"""
Called at the start of each graph step.
Delivers pending outbox events to the appropriate handlers.
This ensures at-least-once delivery semantics.
"""
pending = state["event_outbox"]["pending"]
still_pending = []
for entry in pending:
if not entry["delivered"]:
event = entry["event"]
# Route to subscriber nodes
_route_event_to_subscribers(event, state)
entry["delivered"] = True
else:
still_pending.append(entry)
return {
**state,
"event_outbox": {
"pending": still_pending,
"delivered": state["event_outbox"]["delivered"] + [
e for e in pending if e["delivered"]
],
},
}
Pattern này đảm bảo không có domain event nào bị mất, ngay cả khi việc thực thi graph bị gián đoạn và khởi động lại từ checkpoint.
6. TeamState: Object Chia Sẻ Trung Tâm
Mọi thứ trong hệ thống đều hội tụ vào một cấu trúc dữ liệu: TeamState. Đây là LangGraph state được truyền qua mọi node, được cập nhật bởi mọi agent, và được persist giữa các checkpoint. Thiết kế nó tốt có lẽ là quyết định kiến trúc quan trọng nhất trong toàn bộ hệ thống.
Đây là định nghĩa hoàn chỉnh:
from typing import Literal, Optional, Annotated
from typing_extensions import TypedDict
import operator
# ============================================================
# Value Objects
# ============================================================
class Priority(TypedDict):
level: Literal["critical", "high", "medium", "low"]
rationale: str
class QualityGates(TypedDict):
min_test_coverage: float # e.g., 0.80
max_cyclomatic_complexity: int # e.g., 10
lint_must_pass: bool
type_check_must_pass: bool
security_scan_must_pass: bool
class DeploymentStatus(TypedDict):
status: Literal["pending", "in_progress", "deployed", "failed", "rolled_back"]
environment: Literal["dev", "staging", "production"]
deployed_at: Optional[str]
deployed_by: str # agent or human identifier
deployment_url: Optional[str]
version: Optional[str]
# ============================================================
# Entities
# ============================================================
class UserStory(TypedDict):
story_id: str
title: str
as_a: str # "As a [role]"
i_want: str # "I want [feature]"
so_that: str # "So that [benefit]"
acceptance_criteria: list[str]
priority: Priority
story_points: int
status: Literal[
"draft", "approved", "in_progress", "done", "rejected"
]
created_by: str # agent_id
class ImplementationTask(TypedDict):
task_id: str
story_id: str # links back to user story
title: str
description: str
file_path: str # expected output file
dependencies: list[str] # task_ids that must complete first
estimated_minutes: int
status: Literal["pending", "in_progress", "done", "failed"]
assigned_to: str
class CodeArtifact(TypedDict):
artifact_id: str
task_id: str
language: str
file_path: str
content: str # actual source code
test_file_path: Optional[str]
test_content: Optional[str]
lint_passed: bool
type_check_passed: bool
created_at: str
commit_hash: Optional[str]
class TestCase(TypedDict):
test_id: str
story_id: str
title: str
test_type: Literal["unit", "integration", "e2e", "performance"]
preconditions: list[str]
steps: list[str]
expected_result: str
status: Literal["pending", "passed", "failed", "skipped"]
failure_reason: Optional[str]
class Blocker(TypedDict):
blocker_id: str
description: str
blocked_agent: str
raised_by: str
raised_at: str
resolved: bool
resolution: Optional[str]
class ArchitectureDecision(TypedDict):
adr_id: str # Architecture Decision Record
title: str
status: Literal["proposed", "accepted", "deprecated", "superseded"]
context: str
decision: str
consequences: list[str]
alternatives_considered: list[str]
decided_by: str
decided_at: str
class TechnicalSpec(TypedDict):
spec_id: str
title: str
components: list[dict] # component name, responsibility, tech
data_models: list[dict] # entity definitions
api_contracts: list[dict] # endpoint definitions
infrastructure: dict # cloud resources, config
tech_stack: dict # language, frameworks, databases
approved_by: Optional[str]
approved_at: Optional[str]
class CICDConfig(TypedDict):
config_id: str
pipeline_tool: str # "github_actions", "gitlab_ci", etc.
pipeline_yaml: str # actual CI/CD config content
environments: list[str]
test_commands: list[str]
deploy_commands: list[str]
rollback_procedure: str
generated_by: str
generated_at: str
class TestResults(TypedDict):
run_id: str
total_tests: int
passed: int
failed: int
skipped: int
coverage_percent: float
duration_seconds: float
failed_test_ids: list[str]
coverage_report: dict
run_at: str
class RequirementDoc(TypedDict):
doc_id: str
original_brief: str
clarified_objectives: list[str]
out_of_scope: list[str]
assumptions: list[str]
constraints: list[str]
success_metrics: list[str]
stakeholders: list[str]
clarified_by: str
clarified_at: str
# ============================================================
# The Complete TeamState
# ============================================================
class TeamState(TypedDict):
# ── Metadata ──────────────────────────────────────────────
task_id: str
created_at: str
updated_at: str
phase: Literal[
"requirement", "design", "implementation", "quality", "deployment", "done"
]
requirement_state: str # RequirementState enum value
schema_version: str # for migration support
# ── Requirement Domain ─────────────────────────────────────
raw_brief: str
clarified_requirements: Optional[RequirementDoc]
user_stories: list[UserStory]
stories_human_approved: bool
rejection_reason: Optional[str]
# ── Design Domain ─────────────────────────────────────────
technical_spec: Optional[TechnicalSpec]
architecture_decisions: list[ArchitectureDecision]
design_review_notes: list[str]
design_approved_by: Optional[str]
# ── Implementation Domain ──────────────────────────────────
implementation_plan: list[ImplementationTask]
code_artifacts: list[CodeArtifact]
implementation_attempts: int # tracks retry count (max 3)
current_task_id: Optional[str]
self_test_results: Optional[TestResults]
# ── Quality Domain ─────────────────────────────────────────
test_cases: list[TestCase]
quality_gates: QualityGates
quality_test_results: Optional[TestResults]
quality_score: Optional[float]
quality_gate_passed: bool
qc_notes: list[str]
# ── Deployment Domain ─────────────────────────────────────
cicd_config: Optional[CICDConfig]
deployment_status: DeploymentStatus
deployment_human_approved: bool
deployment_notes: list[str]
# ── Coordination ──────────────────────────────────────────
# Annotated[list, operator.add] = append-only (LangGraph reducer)
agent_messages: Annotated[list[AgentMessage], operator.add]
event_outbox: EventOutbox
current_agent: str
blockers: list[Blocker]
human_feedback: Optional[str]
human_checkpoint_pending: bool
checkpoint_context: Optional[dict] # what the human needs to review
# ── Audit trail ───────────────────────────────────────────
state_transitions: Annotated[list[StateTransition], operator.add]
error_log: Annotated[list[dict], operator.add]
Một vài quyết định thiết kế đáng chú ý:
Tại sao dùng Annotated[list, operator.add] cho một số field? Quản lý state của LangGraph yêu cầu bạn chỉ định cách merge các cập nhật state đồng thời. Với agent_messages, state_transitions, và error_log, chúng ta muốn tích lũy các entry — nên dùng reducer operator.add. Với các field như technical_spec hay current_agent, chúng ta muốn giá trị mới nhất thắng, nên không annotate chúng.
Tại sao có schema_version tường minh? Khi hệ thống phát triển, schema TeamState sẽ thay đổi. Có field schema_version cho phép chúng ta viết các migration function nâng cấp state cũ lên schema mới — điều thiết yếu cho các workflow chạy dài.
Tại sao implementation_attempts là một số nguyên? Đây là guard chính chống lại vòng lặp retry vô hạn. Mỗi khi implementation của SSE agent thất bại khi test, counter này tăng lên. Ở mức 3, PM agent trigger human checkpoint thay vì cho phép retry thêm.
7. Human-in-the-Loop Checkpoints
Điều nguy hiểm nhất về một hệ thống AI tự trị là nó có thể sai một cách tự tin. Một hệ thống human-in-the-loop được thiết kế tốt không cố loại bỏ con người khỏi quy trình — nó đặt họ một cách chiến lược ở những điểm mà phán đoán của họ mang lại giá trị cao nhất.
Trong hệ thống của chúng ta, có hai human checkpoint bắt buộc:
-
Sau khi UserStoriesCreated — PO agent và BA agent đã hoàn thành công việc. Trước khi bất kỳ thiết kế hay code nào được viết, một người phê duyệt phạm vi. Điều này ngăn lãng phí công sức từ việc hiểu nhầm requirement.
-
Trước khi Deploy — DevOps agent đã chuẩn bị CI/CD configuration và QC agent đã ký duyệt. Trước khi bất cứ thứ gì được deploy lên production, một người cho phép cuối cùng.
Đây là cách triển khai các checkpoint này trong LangGraph:
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.types import interrupt
def stories_approval_checkpoint(state: TeamState) -> TeamState:
"""
Human checkpoint: review user stories before design begins.
LangGraph will pause here and wait for external input.
"""
human_feedback = interrupt({
"checkpoint_type": "stories_approval",
"message": "Please review the user stories and approve or reject.",
"user_stories": state["user_stories"],
"clarified_requirements": state["clarified_requirements"],
"instructions": "Set approved=True to continue, or provide rejection_reason.",
})
if human_feedback.get("approved"):
return {
**state,
"stories_human_approved": True,
"human_checkpoint_pending": False,
"human_feedback": None,
}
else:
return {
**state,
"stories_human_approved": False,
"human_checkpoint_pending": False,
"rejection_reason": human_feedback.get("reason", "No reason provided"),
"requirement_state": RequirementState.CLARIFIED, # rollback
}
def deployment_approval_checkpoint(state: TeamState) -> TeamState:
"""
Human checkpoint: final approval before production deployment.
"""
human_feedback = interrupt({
"checkpoint_type": "deployment_approval",
"message": "Ready to deploy. Please review and approve.",
"cicd_config": state["cicd_config"],
"quality_score": state["quality_score"],
"test_results": state["quality_test_results"],
"deployment_target": state["deployment_status"]["environment"],
})
if human_feedback.get("approved"):
return {
**state,
"deployment_human_approved": True,
"human_checkpoint_pending": False,
}
else:
return {
**state,
"deployment_human_approved": False,
"deployment_notes": state["deployment_notes"] + [
f"Deployment rejected by human: {human_feedback.get('reason')}"
],
}
def route_after_stories(state: TeamState) -> str:
"""Conditional edge: route based on human approval of stories."""
if state.get("human_checkpoint_pending"):
return "stories_approval_checkpoint"
if state.get("stories_human_approved"):
return "ta_agent" # proceed to design
return "po_agent" # back to clarification
def route_after_quality(state: TeamState) -> str:
"""Conditional edge: route after quality gates."""
if not state.get("quality_gate_passed"):
return "sse_agent" # rework
if state["implementation_attempts"] >= 3:
return "pm_escalation" # human escalation
return "devops_agent" # proceed to deployment prep
# Build the graph
builder = StateGraph(TeamState)
# Add agent nodes
builder.add_node("po_agent", run_po_agent)
builder.add_node("ba_agent", run_ba_agent)
builder.add_node("stories_approval_checkpoint", stories_approval_checkpoint)
builder.add_node("ta_agent", run_ta_agent)
builder.add_node("tl_agent", run_tl_agent)
builder.add_node("sse_agent", run_sse_agent)
builder.add_node("qc_agent", run_qc_agent)
builder.add_node("devops_agent", run_devops_agent)
builder.add_node("deployment_approval_checkpoint", deployment_approval_checkpoint)
builder.add_node("pm_agent", run_pm_agent)
builder.add_node("pm_escalation", run_pm_escalation)
# Add edges
builder.set_entry_point("po_agent")
builder.add_edge("po_agent", "ba_agent")
builder.add_edge("ba_agent", "stories_approval_checkpoint")
builder.add_conditional_edges(
"stories_approval_checkpoint",
route_after_stories,
{
"ta_agent": "ta_agent",
"po_agent": "po_agent",
},
)
builder.add_edge("ta_agent", "tl_agent")
builder.add_edge("tl_agent", "sse_agent")
builder.add_edge("sse_agent", "qc_agent")
builder.add_conditional_edges(
"qc_agent",
route_after_quality,
{
"sse_agent": "sse_agent",
"devops_agent": "devops_agent",
"pm_escalation": "pm_escalation",
},
)
builder.add_edge("devops_agent", "deployment_approval_checkpoint")
builder.add_conditional_edges(
"deployment_approval_checkpoint",
lambda s: "deploy" if s.get("deployment_human_approved") else "qc_agent",
{"deploy": END, "qc_agent": "qc_agent"},
)
# Compile with checkpoint support
memory = SqliteSaver.from_conn_string("team_state.db")
graph = builder.compile(checkpointer=memory)
Async Approval qua Webhook
Cơ chế interrupt() của LangGraph là đồng bộ — nó block graph thread. Để dùng trong production, bạn cần một async approval flow nơi người dùng review dashboard và bấm nút. Đây là pattern:
import asyncio
from typing import Optional
class HumanApprovalGateway:
"""
Manages async human approval for interrupted graph states.
In production: backed by a database + webhook or dashboard UI.
"""
def __init__(self, graph, checkpointer):
self.graph = graph
self.checkpointer = checkpointer
async def submit_for_approval(
self,
thread_id: str,
checkpoint_type: str,
review_data: dict,
) -> str:
"""
Saves an approval request and sends a notification.
Returns the approval_request_id.
"""
approval_id = str(uuid.uuid4())
# Save to DB, send Slack/email notification, etc.
await self._notify_approver(approval_id, checkpoint_type, review_data)
return approval_id
async def receive_approval(
self,
thread_id: str,
approved: bool,
reason: Optional[str] = None,
) -> None:
"""
Called by webhook when human approves or rejects.
Resumes the paused graph with the human's decision.
"""
config = {"configurable": {"thread_id": thread_id}}
# Resume the graph by providing the human input
async for event in self.graph.astream(
Command(
resume={
"approved": approved,
"reason": reason,
}
),
config=config,
):
print(f"Graph resumed: {event}")
async def _notify_approver(self, approval_id, checkpoint_type, data):
# Implement: Slack message, email, dashboard entry
pass
8. Xử Lý Lỗi và Recovery
Hãy thực tế: mọi thứ sẽ xảy ra. SSE agent sẽ tạo ra code không pass test. TL agent sẽ yêu cầu sửa đổi. QC agent sẽ tìm thấy edge case đòi hỏi phải nghĩ lại implementation. Chúng ta cần hệ thống xử lý những thất bại này một cách graceful, không rơi vào hỗn loạn.
Quy Tắc 3 Lần Thử
Đặc biệt với SSE agent, chúng ta giới hạn retry ở 3 lần. Đây là logic:
from langgraph.graph import StateGraph
def run_sse_agent(state: TeamState) -> TeamState:
"""
SSE agent node with retry tracking and escalation logic.
"""
attempts = state.get("implementation_attempts", 0)
if attempts >= 3:
# Don't even try — escalate
return {
**state,
"requirement_state": RequirementState.ESCALATED,
"blockers": state["blockers"] + [
Blocker(
blocker_id=str(uuid.uuid4()),
description=(
f"SSE agent failed to implement after {attempts} attempts. "
f"Last failure: {state.get('self_test_results', {}).get('failed_test_ids', [])}"
),
blocked_agent="sse_agent",
raised_by="sse_agent",
raised_at=datetime.now(timezone.utc).isoformat(),
resolved=False,
resolution=None,
)
],
}
# Attempt implementation
result = _do_implementation(state)
if result["self_test_results"]["failed"] > 0:
# Increment attempts, stay in development
return {
**result,
"implementation_attempts": attempts + 1,
"requirement_state": RequirementState.IN_DEVELOPMENT,
}
# Success
return {
**result,
"implementation_attempts": attempts + 1,
"requirement_state": RequirementState.IN_REVIEW,
"event_outbox": {
**state["event_outbox"],
"pending": state["event_outbox"]["pending"] + [
{
"event": CodeArtifactReady(
source_agent="sse_agent",
artifact_id=result["code_artifacts"][-1]["artifact_id"],
story_ids=[t["story_id"] for t in state["implementation_plan"]],
test_coverage=result["self_test_results"]["coverage_percent"],
).to_dict(),
"delivered": False,
"delivery_attempts": 0,
"created_at": datetime.now(timezone.utc).isoformat(),
}
],
},
}
State Rollback
Khi một state transition thất bại — ví dụ, TL agent từ chối code trong review — chúng ta không chỉ đặt requirement_state = IN_DEVELOPMENT. Chúng ta còn ghi lại việc rollback trong audit trail và preserve review notes để SSE agent có thể học hỏi từ chúng:
def handle_review_failure(
state: TeamState,
review_notes: list[str],
reviewer: str,
) -> TeamState:
"""
Rolls back from IN_REVIEW to IN_DEVELOPMENT with full audit trail.
"""
rollback_event = StateTransition(
from_state=RequirementState.IN_REVIEW,
to_state=RequirementState.IN_DEVELOPMENT,
triggered_by=reviewer,
event_name="ReviewFailed",
guard="review_approved == False",
timestamp=datetime.now(timezone.utc).isoformat(),
metadata={"review_notes": review_notes},
)
return {
**state,
"requirement_state": RequirementState.IN_DEVELOPMENT,
"design_review_notes": state.get("design_review_notes", []) + review_notes,
"state_transitions": state.get("state_transitions", []) + [rollback_event],
"error_log": state.get("error_log", []) + [
{
"type": "review_failure",
"reviewer": reviewer,
"notes": review_notes,
"timestamp": rollback_event["timestamp"],
}
],
}
Saga Pattern cho Long-Running Workflow
Toàn bộ vòng đời requirement của chúng ta là những gì các kiến trúc sư distributed system gọi là saga — một long-running transaction trải dài qua nhiều service (agent). Không giống như database transaction, bạn không thể đơn giản rollback một saga. Bạn phải thực thi các compensating transaction cho mỗi bước đã hoàn thành.
Trong hệ thống của chúng ta:
- Nếu deployment thất bại sau khi code đã được commit vào repository, DevOps agent cần chạy compensating action
rollback_deployment(). - Nếu design bị từ chối sau khi TA agent đã tạo artifact, Design Context cần lưu trữ các artifact đó trước khi bắt đầu lại từ đầu.
- Nếu một story bị từ chối sau khi BA agent đã tạo ra nó, state transition ghi lại việc từ chối với
rejection_reasonđể PO agent biết cần sửa gì.
Chìa khóa để quản lý saga là: mọi hành động tiến về phía trước phải có một compensating action tương ứng được định nghĩa. Chúng ta sẽ triển khai điều này như một phần của class BaseAgent trong Phần 4.
9. Kiến Trúc Hệ Thống Hoàn Chỉnh
Hãy để tôi thu lại và hiển thị toàn bộ hệ thống như nó sẽ trông khi được xây dựng đầy đủ:
10. Interface ConversationStore
Một component kiến trúc bổ sung đáng định nghĩa ngay bây giờ là ConversationStore. Trong khi TeamState giữ trạng thái workflow hiện tại, các agent cũng cần truy cập lịch sử conversation của chúng — những trao đổi qua lại với LLM đã tạo ra output của chúng. Chúng ta lưu trữ điều này riêng biệt để giữ TeamState gọn nhẹ.
from abc import ABC, abstractmethod
from typing import Optional
from dataclasses import dataclass
@dataclass
class ConversationEntry:
"""A single turn in an agent's conversation history."""
entry_id: str
agent_id: str
task_id: str
role: str # "system", "user", "assistant"
content: str
timestamp: str
token_count: Optional[int] = None
model: Optional[str] = None
class ConversationStore(ABC):
"""
Interface for persisting agent conversation histories.
Why separate from TeamState?
- Conversation histories can be large (thousands of tokens)
- They're needed for debugging and auditing, not for routing
- Different storage backends may be appropriate
(e.g., S3 for archives, Redis for active conversations)
"""
@abstractmethod
async def save_entry(self, entry: ConversationEntry) -> None:
"""Append a conversation entry."""
@abstractmethod
async def get_history(
self,
agent_id: str,
task_id: str,
limit: Optional[int] = None,
) -> list[ConversationEntry]:
"""Retrieve conversation history for an agent on a task."""
@abstractmethod
async def get_full_task_history(
self,
task_id: str,
) -> dict[str, list[ConversationEntry]]:
"""Get all agent conversations for an entire task."""
@abstractmethod
async def clear_history(self, agent_id: str, task_id: str) -> None:
"""Clear history (e.g., after task completion)."""
class SqliteConversationStore(ConversationStore):
"""
SQLite-backed conversation store for development.
In production, swap for Redis or PostgreSQL.
"""
def __init__(self, db_path: str = "conversations.db"):
self.db_path = db_path
self._init_db()
def _init_db(self) -> None:
import sqlite3
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS conversations (
entry_id TEXT PRIMARY KEY,
agent_id TEXT NOT NULL,
task_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
timestamp TEXT NOT NULL,
token_count INTEGER,
model TEXT
)
""")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_agent_task ON conversations(agent_id, task_id)"
)
conn.commit()
conn.close()
async def save_entry(self, entry: ConversationEntry) -> None:
import sqlite3
conn = sqlite3.connect(self.db_path)
conn.execute(
"""INSERT INTO conversations
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
entry.entry_id,
entry.agent_id,
entry.task_id,
entry.role,
entry.content,
entry.timestamp,
entry.token_count,
entry.model,
),
)
conn.commit()
conn.close()
async def get_history(
self,
agent_id: str,
task_id: str,
limit: Optional[int] = None,
) -> list[ConversationEntry]:
import sqlite3
conn = sqlite3.connect(self.db_path)
query = """
SELECT * FROM conversations
WHERE agent_id=? AND task_id=?
ORDER BY timestamp ASC
"""
if limit:
query += f" LIMIT {limit}"
rows = conn.execute(query, (agent_id, task_id)).fetchall()
conn.close()
return [
ConversationEntry(
entry_id=row[0],
agent_id=row[1],
task_id=row[2],
role=row[3],
content=row[4],
timestamp=row[5],
token_count=row[6],
model=row[7],
)
for row in rows
]
async def get_full_task_history(
self,
task_id: str,
) -> dict[str, list[ConversationEntry]]:
entries = await self.get_history("", task_id) # simplified
result: dict[str, list] = {}
for entry in entries:
result.setdefault(entry.agent_id, []).append(entry)
return result
async def clear_history(self, agent_id: str, task_id: str) -> None:
import sqlite3
conn = sqlite3.connect(self.db_path)
conn.execute(
"DELETE FROM conversations WHERE agent_id=? AND task_id=?",
(agent_id, task_id),
)
conn.commit()
conn.close()
11. Những Gì Chúng Ta Đã Thiết Kế — Và Tại Sao Nó Quan Trọng
Hãy để tôi nhìn lại và tóm tắt những gì chúng ta đã xây dựng trong bài này từ góc độ kiến trúc.
Chúng ta đã định nghĩa năm bounded context ánh xạ gọn gàng tới các cụm agent. Mỗi context sở hữu domain model của mình, và giao tiếp cross-context xảy ra qua anti-corruption layer. Điều này có nghĩa là khi Design Context phát triển — ví dụ, chúng ta thêm field mới vào TechnicalSpec — nó không phá vỡ Requirement Context hay Implementation Context.
Chúng ta đã thiết kế một vòng đời chín trạng thái với transition guard tường minh. Không agent nào có thể bỏ qua trạng thái. Không agent nào có thể chuyển requirement ngược lại mà không trigger compensating action. Audit trail ghi lại mọi transition.
Chúng ta đã định nghĩa ba communication pattern — Command, Event, Query — và chỉ định pattern nào phù hợp cho từng tương tác agent. Điều này ngăn sự hỗn loạn khi các agent gửi message tùy tiện cho nhau. Mọi message đều có type, mục đích, và routing rule.
Chúng ta đã xây dựng TeamState với 40+ field được tổ chức theo domain. Mỗi field được type, mỗi list field có LangGraph reducer phù hợp, và schema được version để migration trong tương lai.
Chúng ta đã thiết kế human-in-the-loop checkpoint dùng cơ chế interrupt() của LangGraph, với async approval gateway cho production.
Và chúng ta đã xác định saga pattern là mental model đúng đắn cho toàn bộ workflow, với compensating action được định nghĩa cho mỗi bước tiến về phía trước.
Liệu đây có nhiều kiến trúc hơn hầu hết project “AI agent” không? Có, đáng kể. Nhưng đây chính xác là điểm mấu chốt. Nếu bạn đang xây dựng một hệ thống sẽ chạy tự động trên codebase thực — push code lên GitHub, chạy test, tạo CI/CD config — bạn cần mức độ nghiêm ngặt này. Chi phí của một sai lầm kiến trúc ở đây không phải là API chậm; đó là broken code trong production.
Một Lời Về Over-Engineering
Tôi muốn thành thật về cực đoan còn lại: kiến trúc này chỉ có ý nghĩa nếu bạn đang xây dựng một hệ thống dự định xử lý nhiều project khác nhau theo thời gian. Nếu bạn đang vibe-coding một prototype one-off cho weekend hackathon, bạn có thể không cần bounded context và outbox pattern. Hãy dùng LangGraph trực tiếp với flat state dict và làm cho thứ đó chạy được.
Kiến trúc tôi mô tả ở đây dành cho trường hợp bạn đang xây dựng AI team như một platform — thứ gì đó team bạn sẽ dùng lặp lại, mở rộng, và bảo trì. Trong trường hợp đó, đầu tư vào kiến trúc trả lại nhiều lần.
12. Preview: Những Gì Chúng Ta Xây Dựng trong Phần 4
Trong Phần 4, chúng ta chuyển từ thiết kế sang triển khai. Chúng ta sẽ xây dựng:
Class BaseAgent — Abstract base mà mọi agent kế thừa. Nó xử lý:
- LLM client initialization (model có thể cấu hình theo agent)
- System prompt management và injection
- Conversation history management qua
ConversationStore - Domain event emission qua outbox
- Retry logic với exponential backoff
- Error capture và state recording
- Compensating action registration (saga pattern)
Cấu trúc project — Toàn bộ directory layout:
ai-team/
├── agents/
│ ├── base.py # BaseAgent abstract class
│ ├── po_agent.py # Product Owner
│ ├── ba_agent.py # Business Analyst
│ ├── ta_agent.py # Tech Architect
│ ├── tl_agent.py # Tech Lead
│ ├── sse_agent.py # Senior Software Engineer
│ ├── qc_agent.py # QC Engineer
│ ├── devops_agent.py # DevOps Engineer
│ └── pm_agent.py # Project Manager
├── domain/
│ ├── state.py # TeamState TypedDict
│ ├── events.py # DomainEvent + all event types
│ ├── models.py # All domain entities/value objects
│ └── transitions.py # State machine + guards
├── infrastructure/
│ ├── conversation_store.py # ConversationStore + SQLite impl
│ ├── event_dispatcher.py # Outbox dispatch logic
│ └── human_gateway.py # HumanApprovalGateway
├── graph/
│ ├── builder.py # LangGraph StateGraph construction
│ ├── edges.py # All conditional edge functions
│ └── checkpoints.py # Human checkpoint nodes
├── tools/
│ ├── github_tools.py # GitHub API integration
│ ├── test_runner.py # pytest/jest execution
│ └── linter.py # ruff/mypy/eslint execution
├── api/
│ ├── server.py # FastAPI server
│ └── webhooks.py # Human approval webhook endpoint
├── tests/
│ └── ...
├── pyproject.toml
└── README.md