Mỗi codebase đều có một thời điểm mà kiến trúc hoặc là vững chắc hoặc bị sụp đổ. Đối với các hệ thống multi-agent, thời điểm đó không phải là khi tôi xây dựng agent lồng ghép viết code hay agent liên lạc với khách hàng. Đó là khi agent thứ hai cần gì đó từ agent đầu tiên và không có cách nào sạch sẽ để lấy nó.
Tôi đã chứng kiến điều này ba lần. Ai đó xây dựng agent PO. Nó hoạt động tuyệt đẹp. Họ xây dựng agent BA. Cũng hoạt động tuyệt đẹp. Họ kết nối chúng lại với nhau và mọi thứ sụp đổ vì agent PO trả về một string, agent BA mong đợi một dict, không có hợp đồng state chia sẻ, không có retry logic, không có memory, và không có cách nào để trace xem điều gì đã xảy ra khi mọi thứ bị lỗi.
Trong Part 1, tôi đã trình bày tầm nhìn. Trong Part 2, tôi chọn LangGraph làm framework. Trong Part 3, tôi thiết kế kiến trúc — bounded contexts, domain events, state machines, communication patterns. Bây giờ trong Part 4, tôi viết code khiến tất cả đó trở thành hiện thực.
Đây là nền tảng. Mỗi agent trong Parts 5 đến 10 sẽ kế thừa từ class BaseAgent tôi xây dựng ở đây. Mỗi phần state sẽ chảy qua TeamState tôi định nghĩa ở đây. Mỗi tool sẽ được đăng ký trong ToolRegistry tôi tạo ở đây. Nếu tôi làm sai, mọi thứ phía sau sẽ bị gãy. Nếu tôi làm đúng, việc thêm các agent mới sẽ trở thành cơ học.
Hãy làm cho nó đúng.
1. Cấu Trúc Dự Án
Trước khi viết bất kỳ code nào, đây là toàn bộ layout thư mục. Tôi hiển thị điều này ở phía trước vì theo kinh nghiệm của tôi, hiểu được những thứ sống ở đâu là nửa trận chiến với bất kỳ codebase mới nào.
ai-software-team/
├── agents/
│ ├── __init__.py
│ ├── base.py # BaseAgent ABC
│ ├── po_agent.py # Part 5
│ ├── ba_agent.py # Part 5
│ ├── ta_agent.py # Part 6
│ ├── tl_agent.py # Part 7
│ ├── sse_agent.py # Part 8
│ ├── qc_agent.py # Part 9
│ ├── devops_agent.py # Part 9
│ └── pm_agent.py # Part 10
├── domain/
│ ├── __init__.py
│ ├── models.py # Value objects, entities
│ ├── state.py # TeamState TypedDict
│ └── events.py # Domain events (from Part 3)
├── infra/
│ ├── __init__.py
│ ├── llm.py # LLM client wrapper
│ ├── memory.py # SQLiteMemory
│ └── tools.py # ToolRegistry
├── config/
│ └── agents.yaml # Agent configuration
├── tests/
│ ├── __init__.py
│ ├── test_base_agent.py
│ ├── test_state.py
│ ├── test_memory.py
│ └── test_tool_registry.py
├── main.py # Entry point
├── requirements.txt
└── .env.example
Mỗi thư mục có một trách nhiệm duy nhất. agents/ chứa các agent implementation. domain/ chứa business logic và data structures. infra/ chứa các infrastructure concerns — LLM clients, persistence, tool management. config/ chứa YAML configuration. tests/ chứa các test. Không có sự mơ hồ nào về nơi mọi thứ nằm.
2. Requirements và Environment
# requirements.txt
langgraph==0.4.1
langchain-core==0.3.40
langchain-anthropic==0.3.12
anthropic>=0.42.0
pydantic>=2.9.0
pyyaml>=6.0.2
python-dotenv>=1.0.1
rich>=13.9.0
pytest>=8.3.0
pytest-asyncio>=0.24.0
Và tệp environment:
# .env.example
ANTHROPIC_API_KEY=sk-ant-...
DEFAULT_MODEL=claude-sonnet-4-20250514
LOG_LEVEL=INFO
MEMORY_DB_PATH=./data/memory.db
Tôi sử dụng claude-sonnet-4-20250514 làm mô hình mặc định cho hầu hết các agents. Nó đủ nhanh cho công việc lặp đi lặp lại và đủ thông minh cho structured output. Đối với các agent Tech Architect và Tech Lead, cần suy luận sâu hơn, tôi chuyển đổi sang claude-opus-4-6. Điều này được cấu hình cho từng agent trong agents.yaml, không phải hardcoded.
3. Agent Configuration: agents.yaml
Một trong những sai lầm sớm nhất tôi mắc phải là hardcoding cấu hình agent bên trong mỗi class agent. Tên mô hình, cài đặt nhiệt độ, số lần retry, các fragment system prompt — tất cả đều bị chôn trong các tệp Python. Thay đổi bất cứ điều gì đều yêu cầu chỉnh sửa code, điều đó có nghĩa là phải test lại, điều đó có nghĩa là không thay đổi bất cứ điều gì trừ khi hoàn toàn cần thiết.
Cách khắc phục rất đơn giản: đặt tất cả cấu hình agent vào tệp YAML. Các class agent đọc từ tệp này khi khởi tạo.
# config/agents.yaml
project:
name: "AI Software Team"
version: "1.0.0"
default_model: "claude-sonnet-4-20250514"
max_retries: 3
retry_delay_seconds: 2
agents:
po_agent:
name: "Alex"
role: "Senior Product Owner"
model: "claude-sonnet-4-20250514"
temperature: 0.3
max_tokens: 4096
description: >
Transforms raw client briefs into structured requirement
documents. Asks clarifying questions, defines scope, identifies
personas, and sets success metrics.
tools:
- "search_web"
- "create_document"
permissions:
can_write_code: false
can_deploy: false
can_approve_stories: true
can_approve_architecture: false
ba_agent:
name: "Jordan"
role: "Senior Business Analyst"
model: "claude-sonnet-4-20250514"
temperature: 0.2
max_tokens: 8192
description: >
Decomposes requirement documents into user stories with
acceptance criteria, priority, and story point estimates.
tools:
- "create_document"
permissions:
can_write_code: false
can_deploy: false
can_approve_stories: false
can_approve_architecture: false
ta_agent:
name: "Sam"
role: "Technical Architect"
model: "claude-opus-4-6"
temperature: 0.2
max_tokens: 8192
description: >
Designs system architecture from user stories. Produces
component diagrams, data models, API contracts, and
architecture decision records.
tools:
- "create_document"
- "search_web"
permissions:
can_write_code: false
can_deploy: false
can_approve_stories: false
can_approve_architecture: true
tl_agent:
name: "Morgan"
role: "Tech Lead"
model: "claude-opus-4-6"
temperature: 0.1
max_tokens: 4096
description: >
Reviews architecture and code. Approves or rejects with
specific feedback. Ensures consistency and quality standards.
tools:
- "read_file"
- "create_document"
permissions:
can_write_code: false
can_deploy: false
can_approve_stories: true
can_approve_architecture: true
sse_agent:
name: "Riley"
role: "Senior Software Engineer"
model: "claude-sonnet-4-20250514"
temperature: 0.1
max_tokens: 16384
description: >
Implements code from technical specs and implementation plans.
Writes source files, tests, handles self-testing and iteration.
tools:
- "write_file"
- "read_file"
- "run_tests"
- "run_linter"
permissions:
can_write_code: true
can_deploy: false
can_approve_stories: false
can_approve_architecture: false
qc_agent:
name: "Casey"
role: "QC Engineer"
model: "claude-sonnet-4-20250514"
temperature: 0.1
max_tokens: 8192
description: >
Independently validates code against acceptance criteria.
Writes and executes test cases. Reports quality gate results.
tools:
- "read_file"
- "run_tests"
- "create_document"
permissions:
can_write_code: false
can_deploy: false
can_approve_stories: false
can_approve_architecture: false
devops_agent:
name: "Drew"
role: "DevOps Engineer"
model: "claude-sonnet-4-20250514"
temperature: 0.1
max_tokens: 4096
description: >
Creates CI/CD configuration, deployment scripts, environment
setup, and rollback procedures.
tools:
- "write_file"
- "read_file"
- "create_document"
permissions:
can_write_code: true
can_deploy: true
can_approve_stories: false
can_approve_architecture: false
pm_agent:
name: "Taylor"
role: "Project Manager"
model: "claude-sonnet-4-20250514"
temperature: 0.3
max_tokens: 4096
description: >
Tracks progress, identifies blockers, manages state
transitions, escalates issues. Coordinates across all
bounded contexts.
tools:
- "create_document"
permissions:
can_write_code: false
can_deploy: false
can_approve_stories: false
can_approve_architecture: false
Tôi có một vài lựa chọn thiết kế để gọi ra.
Nhiệt độ thay đổi theo vai trò. Các agent SSE và QC sử dụng 0.1 vì tạo code và xác thực test cần tính deterministic. Agent PO sử dụng 0.3 vì làm rõ requirement có lợi từ một số cách giải thích sáng tạo. Không ai vượt quá 0.3 — đây là một hệ thống sản xuất, không phải một phiên brainstorming.
Max tokens thay đổi theo kích thước output. Agent SSE nhận được 16384 token vì nó sản xuất các tệp source đầy đủ. Agent PM nhận được 4096 vì output của nó là những tóm tắt trạng thái ngắn gọn.
Tools được whitelist, không blacklist. Mỗi agent chỉ nhận được các tool mà nó cần. Agent PO không thể viết file. Agent SSE không thể deploy. Đây không chỉ là documentation — nó được enforce tại runtime bởi ToolRegistry.
Đây là loader:
# config/loader.py
from pathlib import Path
from typing import Any
import yaml
def load_config(config_path: str = "config/agents.yaml") -> dict[str, Any]:
"""Load and validate agent configuration from YAML."""
path = Path(config_path)
if not path.exists():
raise FileNotFoundError(f"Config not found: {config_path}")
with open(path) as f:
config = yaml.safe_load(f)
_validate_config(config)
return config
def get_agent_config(
config: dict[str, Any], agent_id: str
) -> dict[str, Any]:
"""Extract config for a specific agent, merged with defaults."""
agents = config.get("agents", {})
if agent_id not in agents:
raise KeyError(f"Agent '{agent_id}' not found in config")
agent_cfg = agents[agent_id]
project_cfg = config.get("project", {})
return {
"agent_id": agent_id,
"name": agent_cfg["name"],
"role": agent_cfg["role"],
"model": agent_cfg.get("model", project_cfg.get("default_model")),
"temperature": agent_cfg.get("temperature", 0.2),
"max_tokens": agent_cfg.get("max_tokens", 4096),
"description": agent_cfg.get("description", ""),
"tools": agent_cfg.get("tools", []),
"permissions": agent_cfg.get("permissions", {}),
"max_retries": project_cfg.get("max_retries", 3),
"retry_delay": project_cfg.get("retry_delay_seconds", 2),
}
def _validate_config(config: dict[str, Any]) -> None:
"""Basic validation of required config fields."""
required_agents = [
"po_agent", "ba_agent", "ta_agent", "tl_agent",
"sse_agent", "qc_agent", "devops_agent", "pm_agent",
]
agents = config.get("agents", {})
missing = [a for a in required_agents if a not in agents]
if missing:
raise ValueError(f"Missing agent configs: {missing}")
for agent_id, agent_cfg in agents.items():
for field in ("name", "role"):
if field not in agent_cfg:
raise ValueError(
f"Agent '{agent_id}' missing required field: {field}"
)
4. TeamState: Xương Sống Chia Sẻ
Tôi đã định nghĩa TeamState theo khái niệm trong Part 3. Bây giờ hãy viết phiên bản sản xuất. Đây là data structure quan trọng nhất trong toàn bộ hệ thống — mỗi agent đọc từ nó, mỗi agent viết vào nó, và LangGraph quản lý vòng đời của nó.
# domain/state.py
from typing import Literal, Optional, Annotated, Any
from typing_extensions import TypedDict
import operator
# ── Value Objects ─────────────────────────────────────────
class Priority(TypedDict):
level: Literal["critical", "high", "medium", "low"]
rationale: str
class QualityGates(TypedDict):
min_test_coverage: float
max_cyclomatic_complexity: int
lint_must_pass: bool
type_check_must_pass: bool
security_scan_must_pass: bool
# ── Entities ──────────────────────────────────────────────
class UserStory(TypedDict):
story_id: str
title: str
as_a: str
i_want: str
so_that: str
acceptance_criteria: list[str]
priority: Priority
story_points: int
status: Literal["draft", "approved", "in_progress", "done", "rejected"]
created_by: str
class CodeArtifact(TypedDict):
artifact_id: str
task_id: str
language: str
file_path: str
content: str
test_file_path: Optional[str]
test_content: Optional[str]
lint_passed: bool
type_check_passed: bool
created_at: str
commit_hash: Optional[str]
class TestCase(TypedDict):
test_id: str
story_id: str
title: str
test_type: Literal["unit", "integration", "e2e", "performance"]
preconditions: list[str]
steps: list[str]
expected_result: str
status: Literal["pending", "passed", "failed", "skipped"]
failure_reason: Optional[str]
class TestResults(TypedDict):
run_id: str
total_tests: int
passed: int
failed: int
skipped: int
coverage_percent: float
duration_seconds: float
failed_test_ids: list[str]
run_at: str
class Blocker(TypedDict):
blocker_id: str
description: str
blocked_agent: str
raised_by: str
raised_at: str
resolved: bool
resolution: Optional[str]
# ── The Complete TeamState ────────────────────────────────
class TeamState(TypedDict):
# Metadata
task_id: str
created_at: str
updated_at: str
phase: Literal[
"requirement", "design", "implementation",
"quality", "deployment", "done",
]
requirement_state: str
schema_version: str
# Requirement domain
raw_brief: str
clarified_requirements: Optional[dict]
po_clarifying_questions: Optional[str]
clarification_answers: Optional[str]
po_research_notes: Optional[str]
user_stories: list[UserStory]
stories_human_approved: bool
rejection_reason: Optional[str]
# Design domain
technical_spec: Optional[dict]
architecture_decisions: list[dict]
design_review_notes: list[str]
design_approved_by: Optional[str]
# Implementation domain
implementation_plan: list[dict]
code_artifacts: list[CodeArtifact]
implementation_attempts: int
current_task_id: Optional[str]
self_test_results: Optional[TestResults]
# Quality domain
test_cases: list[TestCase]
quality_gates: QualityGates
quality_test_results: Optional[TestResults]
quality_score: Optional[float]
quality_gate_passed: bool
qc_notes: list[str]
# Deployment domain
cicd_config: Optional[dict]
deployment_status: Optional[dict]
deployment_human_approved: bool
deployment_notes: list[str]
# Coordination
agent_messages: Annotated[list[dict], operator.add]
event_outbox: list[dict]
current_agent: str
blockers: list[Blocker]
human_feedback: Optional[str]
human_checkpoint_pending: bool
# Audit trail
state_transitions: Annotated[list[dict], operator.add]
error_log: Annotated[list[dict], operator.add]
def create_initial_state(
task_id: str,
raw_brief: str,
) -> TeamState:
"""
Factory function for creating a fresh TeamState.
Every field gets a sensible default. This is the only place
where TeamState objects should be created from scratch.
"""
from datetime import datetime, timezone
now = datetime.now(timezone.utc).isoformat()
return TeamState(
task_id=task_id,
created_at=now,
updated_at=now,
phase="requirement",
requirement_state="received",
schema_version="1.0.0",
# Requirement
raw_brief=raw_brief,
clarified_requirements=None,
po_clarifying_questions=None,
clarification_answers=None,
po_research_notes=None,
user_stories=[],
stories_human_approved=False,
rejection_reason=None,
# Design
technical_spec=None,
architecture_decisions=[],
design_review_notes=[],
design_approved_by=None,
# Implementation
implementation_plan=[],
code_artifacts=[],
implementation_attempts=0,
current_task_id=None,
self_test_results=None,
# Quality
test_cases=[],
quality_gates=QualityGates(
min_test_coverage=0.80,
max_cyclomatic_complexity=10,
lint_must_pass=True,
type_check_must_pass=True,
security_scan_must_pass=False,
),
quality_test_results=None,
quality_score=None,
quality_gate_passed=False,
qc_notes=[],
# Deployment
cicd_config=None,
deployment_status=None,
deployment_human_approved=False,
deployment_notes=[],
# Coordination
agent_messages=[],
event_outbox=[],
current_agent="",
blockers=[],
human_feedback=None,
human_checkpoint_pending=False,
# Audit
state_transitions=[],
error_log=[],
)
Factory function create_initial_state rất quan trọng. Nếu không có nó, tôi sẽ kết thúc bằng việc kiểm tra if "user_stories" in state ở khắp nơi vì không có đảm bảo rằng key tồn tại. Với factory, mỗi key luôn hiện diện. Giá trị mặc định là rõ ràng. Không có bất ngờ.
5. BaseAgent: Abstract Base Class
Đây là lõi của toàn bộ hệ thống. Mỗi agent kế thừa từ BaseAgent. Nó xử lý các LLM call, conversation history, retry logic, event emission, và state updates. Các agent riêng lẻ chỉ cần implement ba method: _prepare_prompt, _parse_output, và tùy chọn _get_system_prompt.
# agents/base.py
import json
import time
import logging
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any, Optional
from anthropic import Anthropic
from config.loader import load_config, get_agent_config
from domain.state import TeamState
from infra.memory import SQLiteMemory
from infra.tools import ToolRegistry
logger = logging.getLogger(__name__)
class BaseAgent(ABC):
"""
Abstract base class for all agents in the AI software team.
Subclasses MUST implement:
_prepare_prompt(state) -> str
_parse_output(response, state) -> TeamState
Subclasses MAY override:
_get_system_prompt() -> str
_on_retry(attempt, error, state) -> TeamState
"""
AGENT_ID: str = "" # Override in subclass
AGENT_NAME: str = "" # Override in subclass
AGENT_ROLE: str = "" # Override in subclass
SYSTEM_PROMPT: str = "" # Override in subclass
def __init__(
self,
config_path: str = "config/agents.yaml",
memory: Optional[SQLiteMemory] = None,
tool_registry: Optional[ToolRegistry] = None,
):
full_config = load_config(config_path)
self.config = get_agent_config(full_config, self.AGENT_ID)
self.model = self.config["model"]
self.temperature = self.config["temperature"]
self.max_tokens = self.config["max_tokens"]
self.max_retries = self.config["max_retries"]
self.retry_delay = self.config["retry_delay"]
self.client = Anthropic()
self.memory = memory or SQLiteMemory()
self.tool_registry = tool_registry or ToolRegistry()
logger.info(
"Initialized %s (%s) with model=%s",
self.AGENT_NAME, self.AGENT_ID, self.model,
)
# ── Public API ────────────────────────────────────────
def run(self, state: TeamState) -> TeamState:
"""
Execute the agent. This is the method LangGraph calls.
Flow:
1. Update current_agent in state
2. Prepare the prompt
3. Call the LLM (with retries)
4. Parse the output into state updates
5. Record the interaction in memory
6. Return updated state
"""
state = {
**state,
"current_agent": self.AGENT_ID,
"updated_at": datetime.now(timezone.utc).isoformat(),
}
prompt = self._prepare_prompt(state)
system = self._get_system_prompt()
# Load conversation history for context
history = self.memory.get_history(
state["task_id"], self.AGENT_ID
)
response = self._call_llm_with_retry(
system=system,
prompt=prompt,
history=history,
state=state,
)
# Persist this interaction
self.memory.save_interaction(
task_id=state["task_id"],
agent_id=self.AGENT_ID,
prompt=prompt,
response=response,
)
# Parse response into state updates
updated_state = self._parse_output(response, state)
# Record state transition
transition = {
"agent_id": self.AGENT_ID,
"agent_name": self.AGENT_NAME,
"timestamp": datetime.now(timezone.utc).isoformat(),
"phase": state["phase"],
}
updated_state["state_transitions"] = [transition]
return updated_state
# ── Abstract Methods ──────────────────────────────────
@abstractmethod
def _prepare_prompt(self, state: TeamState) -> str:
"""
Build the user prompt from current state.
Each agent decides what state fields matter for its work.
"""
...
@abstractmethod
def _parse_output(
self, response: str, state: TeamState
) -> TeamState:
"""
Parse the LLM response and return updated state.
Must return a FULL state dict (or partial for LangGraph
reducer merge).
"""
...
# ── Overridable Methods ───────────────────────────────
def _get_system_prompt(self) -> str:
"""Return the system prompt. Override for dynamic prompts."""
return self.SYSTEM_PROMPT
def _on_retry(
self,
attempt: int,
error: Exception,
state: TeamState,
) -> None:
"""Hook called before each retry. Override for custom logic."""
logger.warning(
"%s retry %d/%d: %s",
self.AGENT_ID, attempt, self.max_retries, str(error),
)
# ── LLM Interaction ──────────────────────────────────
def _call_llm_with_retry(
self,
system: str,
prompt: str,
history: list[dict],
state: TeamState,
) -> str:
"""
Call the LLM with exponential backoff retry.
Returns the text response.
Raises after max_retries exhausted.
"""
messages = self._build_messages(history, prompt)
for attempt in range(1, self.max_retries + 1):
try:
response = self.client.messages.create(
model=self.model,
max_tokens=self.max_tokens,
temperature=self.temperature,
system=system,
messages=messages,
)
text = response.content[0].text
logger.info(
"%s got response (%d chars, %d input + %d output tokens)",
self.AGENT_ID,
len(text),
response.usage.input_tokens,
response.usage.output_tokens,
)
return text
except Exception as e:
self._on_retry(attempt, e, state)
if attempt == self.max_retries:
self._log_error(state, str(e))
raise
time.sleep(self.retry_delay * attempt) # linear backoff
raise RuntimeError("Unreachable")
def _build_messages(
self, history: list[dict], current_prompt: str
) -> list[dict]:
"""
Build the messages array for the API call.
Includes relevant history for continuity, capped at last 10
interactions to manage context window.
"""
messages = []
# Add recent history (last 10 exchanges)
for entry in history[-10:]:
messages.append({"role": "user", "content": entry["prompt"]})
messages.append({
"role": "assistant", "content": entry["response"],
})
# Add current prompt
messages.append({"role": "user", "content": current_prompt})
return messages
# ── Helpers ───────────────────────────────────────────
def _emit_event(
self,
state: TeamState,
event_name: str,
payload: dict,
) -> TeamState:
"""Add a domain event to the outbox."""
event = {
"event_name": event_name,
"source_agent": self.AGENT_ID,
"payload": payload,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
return {
**state,
"event_outbox": state.get("event_outbox", []) + [event],
}
def _log_error(self, state: TeamState, error_msg: str) -> None:
"""Log an error to the state's error_log."""
logger.error("%s error: %s", self.AGENT_ID, error_msg)
def _parse_json_from_response(self, response: str) -> dict:
"""
Extract JSON from an LLM response that might contain
markdown fences or preamble text.
"""
text = response.strip()
# Try direct parse first
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Try extracting from code fences
if "```json" in text:
start = text.index("```json") + 7
end = text.index("```", start)
return json.loads(text[start:end].strip())
if "```" in text:
start = text.index("```") + 3
end = text.index("```", start)
return json.loads(text[start:end].strip())
# Try finding first { to last }
first_brace = text.find("{")
last_brace = text.rfind("}")
if first_brace != -1 and last_brace != -1:
return json.loads(text[first_brace:last_brace + 1])
raise ValueError(
f"Could not extract JSON from response: {text[:200]}..."
)
Tại Sao Những Lựa Chọn Thiết Kế Này
run() không phải abstract. Execution flow — prepare prompt, call LLM, parse output, save to memory — là giống nhau cho mỗi agent. Những gì thay đổi là cái gì prompt để xây dựng và cách thế nào để parse output. Đây là Template Method pattern, và nó tồn tại chính xác cho loại vấn đề này.
_parse_json_from_response xử lý messy LLM output. Không quan trọng bạn chỉ dẫn một LLM một cách chắc chắn bao nhiêu để trả về “chỉ JSON, không preamble,” nó đôi khi sẽ thêm “Đây là JSON:” trước output. Hoặc wrap nó trong code fences. Method này xử lý tất cả những trường hợp đó để các agent riêng lẻ không phải làm.
History được capped ở 10 interactions. Đây là một giới hạn thực tế. Với claude-sonnet-4-20250514, context window đủ lớn để giữ nhiều hơn, nhưng mỗi interaction bao gồm một full prompt và response. Mười exchanges cung cấp cho agent đủ memory để duy trì sự liên tục mà không bị chìm trong context không liên quan.
Linear backoff, không exponential. Đối với API rate limits, exponential backoff có ý nghĩa. Đối với use case của tôi — nơi failures thường là transient network issues hoặc brief API overloads — linear backoff với một small multiplier đủ responsive. Nếu tôi consistently hitting rate limits, vấn đề là concurrency model của tôi, không phải retry strategy của tôi.
6. SQLiteMemory: Persistent Conversation Store
Agents cần memory. Không chỉ trong một single LangGraph execution, mà qua các executions. Khi agent SSE bị lỗi và graph restart từ một checkpoint, agent nên biết nó đã thử cái gì trước đó để nó không repeat cùng một sai lầm.
SQLite là tool đúng ở đây. Nó là serverless, zero-configuration, nhanh cho access patterns của tôi, và ships với Python. Tôi luôn có thể upgrade lên PostgreSQL sau nếu cần — interface là abstract đủ để swap.
# infra/memory.py
import sqlite3
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
class SQLiteMemory:
"""
Persistent memory store for agent interactions and domain events.
Stores:
- Conversation history (prompt/response pairs per agent per task)
- Domain events (for replay and audit)
- Agent artifacts (structured outputs)
"""
def __init__(self, db_path: str = "data/memory.db"):
self.db_path = db_path
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row
self._create_tables()
def _create_tables(self) -> None:
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS interactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
prompt TEXT NOT NULL,
response TEXT NOT NULL,
token_count_in INTEGER DEFAULT 0,
token_count_out INTEGER DEFAULT 0,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS domain_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
event_name TEXT NOT NULL,
source_agent TEXT NOT NULL,
payload TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS artifacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
artifact_type TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_interactions_task
ON interactions(task_id, agent_id);
CREATE INDEX IF NOT EXISTS idx_events_task
ON domain_events(task_id);
CREATE INDEX IF NOT EXISTS idx_artifacts_task
ON artifacts(task_id, agent_id);
""")
self.conn.commit()
def save_interaction(
self,
task_id: str,
agent_id: str,
prompt: str,
response: str,
token_count_in: int = 0,
token_count_out: int = 0,
) -> None:
"""Save a prompt/response pair."""
self.conn.execute(
"""INSERT INTO interactions
(task_id, agent_id, prompt, response,
token_count_in, token_count_out, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
task_id, agent_id, prompt, response,
token_count_in, token_count_out,
datetime.now(timezone.utc).isoformat(),
),
)
self.conn.commit()
def get_history(
self,
task_id: str,
agent_id: str,
limit: int = 20,
) -> list[dict]:
"""Get conversation history for an agent on a task."""
rows = self.conn.execute(
"""SELECT prompt, response, created_at
FROM interactions
WHERE task_id = ? AND agent_id = ?
ORDER BY id DESC LIMIT ?""",
(task_id, agent_id, limit),
).fetchall()
return [dict(r) for r in reversed(rows)]
def save_event(
self,
task_id: str,
event_name: str,
source_agent: str,
payload: dict,
) -> None:
"""Persist a domain event."""
self.conn.execute(
"""INSERT INTO domain_events
(task_id, event_name, source_agent, payload, created_at)
VALUES (?, ?, ?, ?, ?)""",
(
task_id, event_name, source_agent,
json.dumps(payload),
datetime.now(timezone.utc).isoformat(),
),
)
self.conn.commit()
def get_events(
self,
task_id: str,
event_name: Optional[str] = None,
) -> list[dict]:
"""Get domain events, optionally filtered by name."""
if event_name:
rows = self.conn.execute(
"""SELECT * FROM domain_events
WHERE task_id = ? AND event_name = ?
ORDER BY id""",
(task_id, event_name),
).fetchall()
else:
rows = self.conn.execute(
"""SELECT * FROM domain_events
WHERE task_id = ? ORDER BY id""",
(task_id,),
).fetchall()
return [
{**dict(r), "payload": json.loads(r["payload"])}
for r in rows
]
def save_artifact(
self,
task_id: str,
agent_id: str,
artifact_type: str,
content: dict,
) -> None:
"""Save a structured artifact (requirement doc, spec, etc.)."""
self.conn.execute(
"""INSERT INTO artifacts
(task_id, agent_id, artifact_type, content, created_at)
VALUES (?, ?, ?, ?, ?)""",
(
task_id, agent_id, artifact_type,
json.dumps(content),
datetime.now(timezone.utc).isoformat(),
),
)
self.conn.commit()
def close(self) -> None:
self.conn.close()
Schema cố ý đơn giản. Ba bảng: interactions cho conversation history, domain_events cho event log, và artifacts cho structured outputs. Mỗi cái được indexed bởi task_id vì đó là primary access pattern — “hiển thị cho tôi mọi thứ đã xảy ra cho task này.”
7. ToolRegistry: Per-Agent Tool Isolation
Một trong những nguyên tắc từ Part 3 là agents chỉ nên có quyền truy cập vào các tools họ cần. Agent PO không nên có thể viết files. Agent SSE không nên có thể deploy. Điều này được enforce bởi ToolRegistry.
# infra/tools.py
import logging
from typing import Any, Callable
logger = logging.getLogger(__name__)
# Type alias for tool functions
ToolFn = Callable[..., str]
class ToolRegistry:
"""
Central registry for all tools available to agents.
Tools are registered globally, then filtered per-agent based
on the agent's config. An agent can only access tools that
are listed in its `tools` config.
"""
def __init__(self):
self._tools: dict[str, dict[str, Any]] = {}
def register(
self,
name: str,
fn: ToolFn,
description: str,
parameters: dict,
) -> None:
"""Register a tool globally."""
self._tools[name] = {
"name": name,
"fn": fn,
"description": description,
"parameters": parameters,
}
logger.info("Registered tool: %s", name)
def get_tools_for(
self, allowed_tool_names: list[str]
) -> list[dict[str, Any]]:
"""
Return tool definitions filtered to only what the agent
is allowed to use. Unknown tool names are logged and skipped.
"""
tools = []
for name in allowed_tool_names:
if name in self._tools:
tools.append(self._tools[name])
else:
logger.warning("Tool '%s' not found in registry", name)
return tools
def execute(self, name: str, **kwargs) -> str:
"""Execute a registered tool by name."""
if name not in self._tools:
raise KeyError(f"Tool '{name}' not registered")
return self._tools[name]["fn"](**kwargs)
def list_tools(self) -> list[str]:
"""List all registered tool names."""
return list(self._tools.keys())
def create_default_registry() -> ToolRegistry:
"""
Create a ToolRegistry with all standard tools registered.
Called once at startup.
"""
registry = ToolRegistry()
registry.register(
name="search_web",
fn=_search_web,
description="Search the web for information about a topic.",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query",
},
},
"required": ["query"],
},
)
registry.register(
name="create_document",
fn=_create_document,
description="Create a document and save it to the project.",
parameters={
"type": "object",
"properties": {
"title": {"type": "string"},
"content": {"type": "string"},
},
"required": ["title", "content"],
},
)
registry.register(
name="write_file",
fn=_write_file,
description="Write content to a file path in the project.",
parameters={
"type": "object",
"properties": {
"file_path": {"type": "string"},
"content": {"type": "string"},
},
"required": ["file_path", "content"],
},
)
registry.register(
name="read_file",
fn=_read_file,
description="Read the content of a file.",
parameters={
"type": "object",
"properties": {
"file_path": {"type": "string"},
},
"required": ["file_path"],
},
)
registry.register(
name="run_tests",
fn=_run_tests,
description="Run the test suite and return results.",
parameters={
"type": "object",
"properties": {
"test_path": {
"type": "string",
"description": "Path to test file or directory",
},
},
"required": ["test_path"],
},
)
registry.register(
name="run_linter",
fn=_run_linter,
description="Run the linter on source files.",
parameters={
"type": "object",
"properties": {
"file_path": {"type": "string"},
},
"required": ["file_path"],
},
)
return registry
# ── Tool Implementations ──────────────────────────────────
# These are stubs. Real implementations come in later parts.
def _search_web(query: str) -> str:
"""Stub: web search. Replace with real API in production."""
return f"[search results for: {query}]"
def _create_document(title: str, content: str) -> str:
"""Stub: document creation."""
from pathlib import Path
path = Path(f"output/docs/{title.replace(' ', '_').lower()}.md")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)
return f"Document saved: {path}"
def _write_file(file_path: str, content: str) -> str:
"""Stub: file writing."""
from pathlib import Path
path = Path(f"output/{file_path}")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)
return f"File written: {path}"
def _read_file(file_path: str) -> str:
"""Stub: file reading."""
from pathlib import Path
path = Path(f"output/{file_path}")
if not path.exists():
return f"File not found: {path}"
return path.read_text()
def _run_tests(test_path: str) -> str:
"""Stub: test runner."""
return '{"total": 0, "passed": 0, "failed": 0, "skipped": 0}'
def _run_linter(file_path: str) -> str:
"""Stub: linter."""
return '{"errors": 0, "warnings": 0, "passed": true}'
Các tool implementations ở đây là stubs. Điều đó là cố ý. Trong Part 8, khi tôi xây dựng agent SSE, tôi sẽ thay thế _write_file và _run_tests bằng các implementations thực tế tạo các files thực tế và chạy pytest. Hiện tại, infrastructure là những gì quan trọng — khả năng để register tools, filter chúng per agent, và execute chúng qua một uniform interface.
8. LLM Client Wrapper
Một thin wrapper quanh Anthropic client xử lý environment setup và cung cấp một consistent interface.
# infra/llm.py
import os
import logging
from typing import Optional
from anthropic import Anthropic
from dotenv import load_dotenv
logger = logging.getLogger(__name__)
load_dotenv()
def get_client() -> Anthropic:
"""Get a configured Anthropic client."""
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise EnvironmentError(
"ANTHROPIC_API_KEY not set. Copy .env.example to .env"
)
return Anthropic(api_key=api_key)
def call_llm(
client: Anthropic,
model: str,
system: str,
messages: list[dict],
max_tokens: int = 4096,
temperature: float = 0.2,
tools: Optional[list[dict]] = None,
) -> dict:
"""
Unified LLM call with optional tool use.
Returns {"text": str, "tool_calls": list, "usage": dict}.
"""
kwargs = {
"model": model,
"max_tokens": max_tokens,
"temperature": temperature,
"system": system,
"messages": messages,
}
if tools:
kwargs["tools"] = tools
response = client.messages.create(**kwargs)
text_parts = []
tool_calls = []
for block in response.content:
if block.type == "text":
text_parts.append(block.text)
elif block.type == "tool_use":
tool_calls.append({
"id": block.id,
"name": block.name,
"input": block.input,
})
return {
"text": "\n".join(text_parts),
"tool_calls": tool_calls,
"usage": {
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
},
"stop_reason": response.stop_reason,
}
9. The Entry Point: main.py
# main.py
import uuid
import logging
from datetime import datetime, timezone
from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel
from config.loader import load_config
from domain.state import create_initial_state
from infra.memory import SQLiteMemory
from infra.tools import create_default_registry
load_dotenv()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logger = logging.getLogger(__name__)
console = Console()
def main():
console.print(
Panel(
"[bold blue]AI Software Team[/bold blue]\n"
"Multi-agent development pipeline",
border_style="blue",
)
)
# Load configuration
config = load_config()
console.print(
f"[green]Loaded config:[/green] "
f"{len(config['agents'])} agents configured"
)
# Initialize shared infrastructure
memory = SQLiteMemory()
tool_registry = create_default_registry()
console.print(
f"[green]Tools registered:[/green] "
f"{', '.join(tool_registry.list_tools())}"
)
# Get user brief
console.print("\n[bold]Enter your project brief[/bold] "
"(or 'demo' for a sample brief):\n")
brief = input("> ").strip()
if brief.lower() == "demo":
brief = (
"Build a task management API with user authentication, "
"CRUD operations for tasks and projects, team collaboration "
"features, and a simple dashboard. Use Python/FastAPI with "
"PostgreSQL. Should be deployable to AWS."
)
console.print(f"\n[dim]Using demo brief:[/dim] {brief}\n")
# Create initial state
task_id = str(uuid.uuid4())[:8]
state = create_initial_state(task_id=task_id, raw_brief=brief)
console.print(f"[green]Task created:[/green] {task_id}")
# In Parts 5-10, we build the LangGraph graph and run it.
# For now, demonstrate agent initialization.
from agents.base import BaseAgent
console.print("\n[bold]Infrastructure ready.[/bold]")
console.print(f" Memory DB: {memory.db_path}")
console.print(f" Tools: {tool_registry.list_tools()}")
console.print(f" State keys: {len(state)} fields initialized")
console.print(
f" Phase: {state['phase']} | "
f"Requirement state: {state['requirement_state']}"
)
console.print(
"\n[dim]Agent implementations come in Parts 5-10. "
"Run tests with: pytest tests/ -v[/dim]"
)
memory.close()
if __name__ == "__main__":
main()
Đây là một scaffold. Real graph construction xảy ra trong các parts sau khi tôi có concrete agents. Nhưng entry point đã functional — nó tải config, khởi tạo infrastructure, tạo state, và sẵn sàng nhận một brief.
10. Testing the Foundation
Tests cho infrastructure code không phải là tùy chọn. Các thành phần này được gọi bởi mỗi agent. Một bug ở đây lan truyền ở khắp nơi.
# tests/test_base_agent.py
import pytest
from unittest.mock import MagicMock, patch
from domain.state import create_initial_state, TeamState
from infra.memory import SQLiteMemory
from infra.tools import ToolRegistry, create_default_registry
class TestCreateInitialState:
def test_creates_state_with_all_keys(self):
state = create_initial_state(
task_id="test-001",
raw_brief="Build a todo app",
)
assert state["task_id"] == "test-001"
assert state["raw_brief"] == "Build a todo app"
assert state["phase"] == "requirement"
assert state["requirement_state"] == "received"
assert state["user_stories"] == []
assert state["implementation_attempts"] == 0
assert state["quality_gate_passed"] is False
assert state["schema_version"] == "1.0.0"
def test_default_quality_gates(self):
state = create_initial_state("t1", "brief")
gates = state["quality_gates"]
assert gates["min_test_coverage"] == 0.80
assert gates["lint_must_pass"] is True
def test_audit_trail_starts_empty(self):
state = create_initial_state("t1", "brief")
assert state["state_transitions"] == []
assert state["error_log"] == []
class TestSQLiteMemory:
@pytest.fixture
def memory(self, tmp_path):
db_path = str(tmp_path / "test.db")
mem = SQLiteMemory(db_path=db_path)
yield mem
mem.close()
def test_save_and_get_interaction(self, memory):
memory.save_interaction(
task_id="t1",
agent_id="po_agent",
prompt="What is the project?",
response="It is a todo app.",
)
history = memory.get_history("t1", "po_agent")
assert len(history) == 1
assert history[0]["prompt"] == "What is the project?"
assert history[0]["response"] == "It is a todo app."
def test_history_ordered_chronologically(self, memory):
for i in range(5):
memory.save_interaction(
task_id="t1",
agent_id="po_agent",
prompt=f"prompt-{i}",
response=f"response-{i}",
)
history = memory.get_history("t1", "po_agent")
assert len(history) == 5
assert history[0]["prompt"] == "prompt-0"
assert history[4]["prompt"] == "prompt-4"
def test_history_filtered_by_agent(self, memory):
memory.save_interaction("t1", "po_agent", "p1", "r1")
memory.save_interaction("t1", "ba_agent", "p2", "r2")
po_history = memory.get_history("t1", "po_agent")
ba_history = memory.get_history("t1", "ba_agent")
assert len(po_history) == 1
assert len(ba_history) == 1
def test_save_and_get_event(self, memory):
memory.save_event(
task_id="t1",
event_name="UserStoriesCreated",
source_agent="ba_agent",
payload={"story_count": 5},
)
events = memory.get_events("t1")
assert len(events) == 1
assert events[0]["event_name"] == "UserStoriesCreated"
assert events[0]["payload"]["story_count"] == 5
def test_filter_events_by_name(self, memory):
memory.save_event("t1", "EventA", "agent1", {})
memory.save_event("t1", "EventB", "agent2", {})
memory.save_event("t1", "EventA", "agent1", {"n": 2})
events = memory.get_events("t1", event_name="EventA")
assert len(events) == 2
class TestToolRegistry:
def test_register_and_list(self):
reg = ToolRegistry()
reg.register("tool_a", lambda: "a", "desc", {})
assert "tool_a" in reg.list_tools()
def test_get_tools_filters_by_allowed(self):
reg = ToolRegistry()
reg.register("tool_a", lambda: "a", "A", {})
reg.register("tool_b", lambda: "b", "B", {})
reg.register("tool_c", lambda: "c", "C", {})
tools = reg.get_tools_for(["tool_a", "tool_c"])
names = [t["name"] for t in tools]
assert names == ["tool_a", "tool_c"]
def test_execute_calls_function(self):
reg = ToolRegistry()
reg.register(
"adder", lambda x, y: str(x + y), "Add", {}
)
result = reg.execute("adder", x=2, y=3)
assert result == "5"
def test_execute_unknown_tool_raises(self):
reg = ToolRegistry()
with pytest.raises(KeyError):
reg.execute("nonexistent")
def test_default_registry_has_standard_tools(self):
reg = create_default_registry()
tools = reg.list_tools()
assert "search_web" in tools
assert "write_file" in tools
assert "read_file" in tools
assert "run_tests" in tools
assert "run_linter" in tools
assert "create_document" in tools
class TestConfigLoader:
def test_load_config(self, tmp_path):
config_content = """
project:
name: "Test"
default_model: "claude-sonnet-4-20250514"
max_retries: 2
retry_delay_seconds: 1
agents:
po_agent:
name: "Alex"
role: "PO"
ba_agent:
name: "Jordan"
role: "BA"
ta_agent:
name: "Sam"
role: "TA"
tl_agent:
name: "Morgan"
role: "TL"
sse_agent:
name: "Riley"
role: "SSE"
qc_agent:
name: "Casey"
role: "QC"
devops_agent:
name: "Drew"
role: "DevOps"
pm_agent:
name: "Taylor"
role: "PM"
"""
config_path = tmp_path / "agents.yaml"
config_path.write_text(config_content)
from config.loader import load_config, get_agent_config
config = load_config(str(config_path))
assert len(config["agents"]) == 8
po = get_agent_config(config, "po_agent")
assert po["name"] == "Alex"
assert po["model"] == "claude-sonnet-4-20250514"
assert po["max_retries"] == 2
Chạy chúng:
pytest tests/ -v
Những test này xác minh ba điều: state creation hoạt động và có các defaults hợp lý, memory persists và retrieves một cách chính xác, và tool registry filters tools per agent. Nếu bất kỳ cái nào bị lỗi, chúng tôi dừng lại và fix trước khi viết một single agent.
11. Putting It All Together
Hãy để tôi trace full flow để hiển thị cách các pieces kết nối. Đây là sequence xảy ra khi agent PO của Part 5 chạy:
main.pytảiagents.yamlvà tạo mộtToolRegistryvàSQLiteMemory.- Một user cung cấp một brief.
create_initial_state()xây dựngTeamStatevớiphase="requirement"vàrequirement_state="received". - LangGraph gọi node agent PO, gọi
po_agent.run(state). BaseAgent.run()đặtcurrent_agent, gọi_prepare_prompt(state)(PO-specific logic), sau đó_call_llm_with_retry().- Response LLM quay trở lại.
_parse_output()(PO-specific) trích xuất structured data và trả về updated state. - Interaction được lưu vào
SQLiteMemory. Một state transition được ghi chép. - LangGraph nhận updated state và route đến node tiếp theo.
Mỗi agent theo exact flow này. Những thứ duy nhất thay đổi là _prepare_prompt và _parse_output. Đó là sức mạnh của một well-designed base class.
12. Những Gì Tôi Xây Dựng và Những Gì Sắp Tới
Trong part này, tôi xây dựng năm thành phần nền tảng:
| Component | File | Purpose |
|---|---|---|
agents.yaml | config/agents.yaml | Configuration cho tất cả 8 agents |
BaseAgent | agents/base.py | Abstract base với LLM calls, retry, memory |
TeamState | domain/state.py | Shared state TypedDict với factory |
SQLiteMemory | infra/memory.py | Persistent conversation và event store |
ToolRegistry | infra/tools.py | Per-agent tool isolation |
Năm thành phần này không tạo ra bất kỳ visible output nào. Không có requirement documents, không có user stories, không có code. Chúng là pure infrastructure. Và chúng là lý do tại sao sáu parts tiếp theo có thể move nhanh — vì mỗi agent kế thừa từ BaseAgent, đọc từ TeamState, lưu trữ history trong SQLiteMemory, và truy cập tools thông qua ToolRegistry. Pattern được set. Contract được rõ ràng.
Trong Part 5, tôi xây dựng hai domain-specific agents đầu tiên: Product Owner (Alex) và Business Analyst (Jordan). Họ sẽ lấy một raw client brief và transform nó thành structured requirement documents và user stories. Đây là nơi hệ thống bắt đầu tạo ra các real artifacts.
Nền tảng được đặt. Thời gian để xây dựng trên nó.