Những agents của bạn có một vấn đề mà có thể bạn chưa nhận ra. Chúng là cá vàng.

Mỗi lần pipeline chạy, agent SSE viết code như thể nó chưa bao giờ viết code trước đây. Agent QC phát minh ra chiến lược kiểm thử từ đầu. Agent TA đưa ra quyết định kiến trúc mà không nhớ rằng nó đã đưa ra chính xác cùng một quyết định vào tuần trước — và quyết định đó hoá ra là sai.

Đây không phải là giới hạn của mô hình. GPT-4, Claude, Gemini — chúng đều có khả năng học từ context. Giới hạn là kiến trúc. Chúng tôi xây dựng một pipeline mà state sống trong TeamState, chảy qua đồ thị, và biến mất khi chạy kết thúc. Các agents có working memory nhưng không có long-term memory. Chúng có trí thông minh nhưng không có kinh nghiệm.

Trong các nhóm con người, kinh nghiệm là thứ tách biệt một lập trình viên junior khỏi một cái cao cấp. Không phải khả năng thô — kinh nghiệm. Lập trình viên cao cấp đã chứng kiến một trăm triển khai thất bại, một nghìn nhận xét đánh giá code, mười nghìn trường hợp biên. Họ không giải quyết vấn đề từ nguyên tắc cơ bản mỗi lần. Họ so khớp khuôn mẫu với lịch sử của họ, và họ trở nên nhanh hơn và chính xác hơn với mỗi dự án.

Bài viết này cung cấp cho agents của bạn cùng một khả năng. Chúng tôi sẽ xây dựng ba loại memory, một hệ thống reflection học từ mọi tác vụ hoàn thành, một thư viện skills tích lũy các khuôn mẫu được chứng minh, và một giao thức chia sẻ kiến thức giữa các agents cho phép bài học của một agent mang lợi ích cho toàn bộ nhóm.

Vào cuối, các agents của bạn sẽ trở nên tốt hơn một cách đo lường được với mỗi lần chạy.


1. Ba Loại Agent Memory

Khoa học nhận thức chia tách human memory thành ba danh mục ánh xạ một cách đáng chú ý tốt đến các hệ thống agent.

Working memory là những gì bạn đang suy nghĩ ngay bây giờ. Nó nhanh, có hạn chế về dung lượng, và tạm thời. Trong hệ thống của chúng tôi, đây là TeamState — dữ liệu chảy qua pipeline LangGraph trong một lần chạy. Messages, current task context, iteration counts, QC feedback. Nó tồn tại trong thời gian thực thi một pipeline và biến mất khi chạy hoàn tất.

Episodic memory là bản ghi của các trải nghiệm quá khứ của bạn. “Lần trước tôi sử dụng MongoDB cho một dự án có các truy vấn relational nặng, nó là một thảm họa.” Các episodic memories là cụ thể, ngữ cảnh, và có thể tìm kiếm theo tương tự. Trong hệ thống của chúng tôi, đây sẽ là một vector store ChromaDB nơi agents viết reflections sau khi hoàn thành các tác vụ. Khi một tác vụ tương tự đến trong tương lai, agent truy lục các trải nghiệm quá khứ có liên quan và sử dụng chúng để đưa ra quyết định tốt hơn.

Semantic memory là kiến thức chung của bạn — những sự kiện, skills, và khuôn mẫu trừu tượng hóa từ các trải nghiệm cụ thể. “Dữ liệu relational thuộc về PostgreSQL. Dữ liệu tài liệu thuộc về MongoDB.” Các semantic memories không được liên kết với các sự kiện cụ thể; chúng là những sự thật chưng cất. Trong hệ thống của chúng tôi, đây sẽ là một thư viện skills — bản ghi có cấu trúc của các khuôn mẫu được chứng minh với các điểm số confidence tăng khi bằng chứng tích lũy.

Three memory types: Working Memory (TeamState, ephemeral), Episodic Memory (ChromaDB, past experiences), Semantic Memory (Skills Library, learned abilities)

Dòng giữa ba loại này là có hướng. Working memory tạo ra nguyên liệu thô trong một lần chạy. Sau khi chạy, một bước reflection chưng cất nguyên liệu thô đó thành episodic memories. Theo thời gian, các khuôn mẫu định kỳ trong episodic memory được nâng cấp lên semantic memory — thư viện skills. Mỗi lớp chậm hơn để viết nhưng kéo dài hơn và có giá trị hơn.


2. Kho Lưu Trữ Memory: ChromaDB cho Episodic Memory

ChromaDB là một vector database mã nguồn mở lưu trữ các documents cùng với embeddings của chúng. Bạn cung cấp cho nó text, nó vectorhóa nó, và sau này bạn có thể truy vấn “tìm tôi các documents tương tự với text này.” Đó là chính xác những gì episodic memory cần — similarity-based retrieval trên một corpus ngày càng phát triển của các trải nghiệm quá khứ.

Đầu tiên, mô hình dữ liệu:

# memory/models.py
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum

class MemoryType(str, Enum):
    REFLECTION = "reflection"
    ERROR_PATTERN = "error_pattern"
    DECISION_OUTCOME = "decision_outcome"
    REVIEW_FEEDBACK = "review_feedback"

class AgentMemory(BaseModel):
    """A single episodic memory entry."""
    memory_id: str = Field(default_factory=lambda: str(uuid4()))
    agent_role: str                         # "sse", "qc", "ta", etc.
    memory_type: MemoryType
    task_id: str                            # which pipeline run produced this
    content: str                            # the actual memory text
    context: dict = Field(default_factory=dict)  # structured metadata
    created_at: datetime = Field(default_factory=datetime.utcnow)
    relevance_score: float = 0.0            # updated on retrieval

    @property
    def search_text(self) -> str:
        """Combined text used for embedding and similarity search."""
        return f"[{self.agent_role}] [{self.memory_type.value}] {self.content}"

Bây giờ kho lưu trữ chính nó:

# memory/store.py
import chromadb
from chromadb.config import Settings

class AgentMemoryStore:
    """Persistent episodic memory backed by ChromaDB."""

    def __init__(self, persist_dir: str = "./data/memory"):
        self.client = chromadb.PersistentClient(
            path=persist_dir,
            settings=Settings(anonymized_telemetry=False),
        )
        self.collection = self.client.get_or_create_collection(
            name="agent_memories",
            metadata={"hnsw:space": "cosine"},
        )

    def store(self, memory: AgentMemory) -> str:
        """Store a memory. Returns the memory ID."""
        self.collection.add(
            ids=[memory.memory_id],
            documents=[memory.search_text],
            metadatas=[{
                "agent_role": memory.agent_role,
                "memory_type": memory.memory_type.value,
                "task_id": memory.task_id,
                "created_at": memory.created_at.isoformat(),
            }],
        )
        return memory.memory_id

    def recall(
        self,
        query: str,
        agent_role: str | None = None,
        memory_type: MemoryType | None = None,
        n_results: int = 5,
    ) -> list[dict]:
        """Retrieve memories similar to the query."""
        where_filter = {}
        if agent_role:
            where_filter["agent_role"] = agent_role
        if memory_type:
            where_filter["memory_type"] = memory_type.value

        results = self.collection.query(
            query_texts=[query],
            n_results=n_results,
            where=where_filter if where_filter else None,
        )
        return [
            {
                "content": results["documents"][0][i],
                "distance": results["distances"][0][i],
                "metadata": results["metadatas"][0][i],
            }
            for i in range(len(results["documents"][0]))
        ]

    def forget_before(self, cutoff: datetime) -> int:
        """Remove memories older than cutoff. Returns count removed."""
        all_data = self.collection.get(
            where={"created_at": {"$lt": cutoff.isoformat()}},
        )
        if all_data["ids"]:
            self.collection.delete(ids=all_data["ids"])
        return len(all_data["ids"])

Phương thức recall lấy một truy vấn ngôn ngữ tự nhiên và trả lại các memories tương tự nhất. Khi agent SSE sắp triển khai một FastAPI endpoint, nó truy vấn “implementing REST API endpoint with authentication” và nhận lại các memories từ các lần chạy trước nơi nó đã làm điều gì đó tương tự — bao gồm những gì đã sai.


3. Task Reflection: Học Sau Mỗi Lần Chạy

Memory mà không có reflection chỉ là logging. Một log nói “test failed with exit code 1.” Một memory nói “the test failed because I forgot to mock the database connection, và đây là lần thứ ba tôi đã mắc lỗi đó với integration tests.”

Reflection xảy ra sau khi một tác vụ hoàn thành. Mỗi agent kiểm tra những gì nó đã tạo ra, những phản hồi nó nhận được, và kết quả là gì. Sau đó nó viết một structured reflection vào episodic memory.

# memory/reflection.py
from memory.models import AgentMemory, MemoryType
from memory.store import AgentMemoryStore
from langchain_anthropic import ChatAnthropic
from pydantic import BaseModel

class TaskReflection(BaseModel):
    """Structured reflection output from an agent."""
    what_went_well: str
    what_went_wrong: str
    key_decision: str
    decision_outcome: str           # "positive", "negative", "neutral"
    pattern_noticed: str | None = None
    improvement_for_next_time: str

REFLECTION_PROMPT = """You are {agent_role}, reflecting on a completed task.

## Task Summary
{task_summary}

## Your Output
{agent_output}

## Feedback Received
{feedback}

## Final Outcome
{outcome}

Reflect on this task honestly. What went well? What went wrong?
What was the key decision you made, and how did it turn out?
Do you notice any recurring pattern? What would you do differently next time?

Respond as JSON matching the TaskReflection schema."""


class ReflectionEngine:
    """Generates and stores post-task reflections."""

    def __init__(self, memory_store: AgentMemoryStore):
        self.memory = memory_store
        self.llm = ChatAnthropic(
            model="claude-sonnet-4-20250514",
            temperature=0.3,
        )

    async def reflect(
        self,
        agent_role: str,
        task_id: str,
        task_summary: str,
        agent_output: str,
        feedback: str,
        outcome: str,
    ) -> TaskReflection:
        """Generate a reflection and store it as episodic memory."""
        prompt = REFLECTION_PROMPT.format(
            agent_role=agent_role,
            task_summary=task_summary,
            agent_output=agent_output[:2000],  # truncate to fit context
            feedback=feedback,
            outcome=outcome,
        )

        response = await self.llm.ainvoke([{"role": "user", "content": prompt}])
        reflection = TaskReflection.model_validate_json(response.content)

        # Store the main reflection
        self.memory.store(AgentMemory(
            agent_role=agent_role,
            memory_type=MemoryType.REFLECTION,
            task_id=task_id,
            content=(
                f"Task: {task_summary}\n"
                f"Well: {reflection.what_went_well}\n"
                f"Wrong: {reflection.what_went_wrong}\n"
                f"Improve: {reflection.improvement_for_next_time}"
            ),
            context={
                "decision": reflection.key_decision,
                "decision_outcome": reflection.decision_outcome,
            },
        ))

        # If a pattern was noticed, store it separately
        if reflection.pattern_noticed:
            self.memory.store(AgentMemory(
                agent_role=agent_role,
                memory_type=MemoryType.ERROR_PATTERN,
                task_id=task_id,
                content=reflection.pattern_noticed,
            ))

        return reflection

Bước reflection được kết nối với pipeline sau cập nhật trạng thái cuối cùng của agent PM. Khi pipeline đạt đến giai đoạn "done", mọi agent đã tham gia đều có cơ hội để suy ngẫm:

# graph/reflection_node.py

async def reflection_node(state: TeamState) -> dict:
    """Post-pipeline reflection node. Runs after PM marks task complete."""
    engine = ReflectionEngine(memory_store=get_memory_store())

    agents_to_reflect = ["po", "ba", "ta", "qc", "sse", "tl"]

    for role in agents_to_reflect:
        output_key = f"{role}_output"
        if output_key not in state or not state[output_key]:
            continue

        await engine.reflect(
            agent_role=role,
            task_id=state["task_id"],
            task_summary=state.get("clarified_requirement", ""),
            agent_output=str(state[output_key])[:2000],
            feedback=state.get("qc_feedback", "No feedback"),
            outcome=state.get("phase", "unknown"),
        )

    return {"reflection_complete": True}

4. Thư Viện Skills: Semantic Memory

Episodic memories là những trải nghiệm thô. Chúng hữu ích cho việc recall, nhưng chúng nhiễu. Thư viện skills là lớp được tuyển chọn, có cấu trúc phía trên chúng — các khuôn mẫu được chứng minh đã được xác thực trên nhiều trải nghiệm.

# memory/skills.py
from pydantic import BaseModel, Field
from datetime import datetime
from uuid import uuid4

class Skill(BaseModel):
    """A learned skill with confidence tracking."""
    skill_id: str = Field(default_factory=lambda: str(uuid4()))
    name: str                               # "fastapi_auth_endpoint"
    description: str                        # what this skill does
    agent_role: str                         # which agent owns this
    category: str                           # "api_design", "testing", etc.
    template: str                           # the actual pattern/template
    confidence: float = 0.5                 # 0.0 to 1.0
    times_used: int = 0
    times_succeeded: int = 0
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    source_task_ids: list[str] = Field(default_factory=list)
    approved: bool = False                  # requires human approval

    @property
    def success_rate(self) -> float:
        if self.times_used == 0:
            return 0.0
        return self.times_succeeded / self.times_used

    def record_usage(self, succeeded: bool, task_id: str):
        """Update stats after the skill is used."""
        self.times_used += 1
        if succeeded:
            self.times_succeeded += 1
        self.source_task_ids.append(task_id)
        # Bayesian-style confidence update
        self.confidence = (
            (self.confidence * (self.times_used - 1) + (1.0 if succeeded else 0.0))
            / self.times_used
        )
        self.updated_at = datetime.utcnow()


class SkillsLibrary:
    """Persistent store for learned skills."""

    def __init__(self, db_path: str = "./data/skills.json"):
        self.db_path = db_path
        self.skills: dict[str, Skill] = {}
        self._load()

    def _load(self):
        import json; from pathlib import Path
        path = Path(self.db_path)
        if path.exists():
            data = json.loads(path.read_text())
            self.skills = {k: Skill.model_validate(v) for k, v in data.items()}

    def _save(self):
        import json; from pathlib import Path
        data = {k: v.model_dump(mode="json") for k, v in self.skills.items()}
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        Path(self.db_path).write_text(json.dumps(data, indent=2))

    def add_skill(self, skill: Skill) -> str:
        """Add a new skill. Returns skill ID."""
        self.skills[skill.skill_id] = skill
        self._save()
        return skill.skill_id

    def find_skills(
        self,
        agent_role: str | None = None,
        category: str | None = None,
        min_confidence: float = 0.0,
        approved_only: bool = True,
    ) -> list[Skill]:
        """Find skills matching criteria, sorted by confidence."""
        results = []
        for skill in self.skills.values():
            if agent_role and skill.agent_role != agent_role:
                continue
            if category and skill.category != category:
                continue
            if skill.confidence < min_confidence:
                continue
            if approved_only and not skill.approved:
                continue
            results.append(skill)
        return sorted(results, key=lambda s: s.confidence, reverse=True)

    def record_usage(self, skill_id: str, succeeded: bool, task_id: str):
        """Record that a skill was used and whether it succeeded."""
        if skill_id in self.skills:
            self.skills[skill_id].record_usage(succeeded, task_id)
            self._save()

    def promote_from_reflections(
        self,
        memory_store: AgentMemoryStore,
        agent_role: str,
        pattern_query: str,
        min_occurrences: int = 3,
    ) -> Skill | None:
        """
        Check if a pattern appears frequently enough in episodic memory
        to be promoted to a skill.
        """
        memories = memory_store.recall(
            query=pattern_query,
            agent_role=agent_role,
            memory_type=MemoryType.ERROR_PATTERN,
            n_results=10,
        )

        # Filter by similarity threshold
        relevant = [m for m in memories if m["distance"] < 0.3]

        if len(relevant) >= min_occurrences:
            skill = Skill(
                name=f"auto_{agent_role}_{len(self.skills)}",
                description=f"Pattern detected from {len(relevant)} similar experiences",
                agent_role=agent_role,
                category="auto_detected",
                template=pattern_query,
                confidence=0.5,
                approved=False,  # needs human review
                source_task_ids=[
                    m["metadata"]["task_id"] for m in relevant
                ],
            )
            self.add_skill(skill)
            return skill
        return None

Skills bắt đầu ở confidence 0.5. Mỗi sử dụng thành công đẩy confidence lên; mỗi lỗi đẩy nó xuống. Trên đủ các lần chạy, những skills đáng tin cậy nổi lên phía trên và những cái không đáng tin cậy chìm xuống. Một agent tìm kiếm hướng dẫn nhận được highest-confidence relevant skill trước tiên.

Cờ approved ngăn chặn hệ thống học các thói quen xấu. Các skills được tự động phát hiện bắt đầu không được phê duyệt. Một con người xem xét chúng trước khi chúng vào quay hoạt động. Một khuôn mẫu xuất hiện ba lần có thể tồn tại vì agent đã mắc cùng một lỗi ba lần, không phải vì nó tìm thấy một cách tiếp cận tốt.


5. Tiêm Memory Vào Agent Prompts

Memory tồn tại nhưng không bao giờ được tham khảo là vô dụng. Phần cuối cùng là bước truy lục — trước khi mỗi agent chạy, chúng tôi truy vấn cả episodic memory và thư viện skills, và tiêm context liên quan vào prompt của agent.

# memory/injection.py

class MemoryInjector:
    """Retrieves and formats memories for agent prompt injection."""

    def __init__(
        self,
        memory_store: AgentMemoryStore,
        skills_library: SkillsLibrary,
    ):
        self.memory = memory_store
        self.skills = skills_library

    def build_memory_context(
        self,
        agent_role: str,
        task_description: str,
        max_memories: int = 3,
        max_skills: int = 2,
    ) -> str:
        """Build a memory context block for prompt injection."""
        sections = []

        # 1. Retrieve relevant episodic memories
        memories = self.memory.recall(
            query=task_description,
            agent_role=agent_role,
            n_results=max_memories,
        )

        if memories:
            sections.append("## Relevant Past Experiences")
            for i, mem in enumerate(memories, 1):
                sections.append(f"{i}. {mem['content']}")

        # 2. Retrieve relevant skills
        skills = self.skills.find_skills(
            agent_role=agent_role,
            min_confidence=0.6,
            approved_only=True,
        )[:max_skills]

        if skills:
            sections.append("\n## Proven Skills Available")
            for skill in skills:
                sections.append(
                    f"- **{skill.name}** (confidence: {skill.confidence:.0%}): "
                    f"{skill.description}\n"
                    f"  Template: {skill.template[:300]}"
                )

        if not sections:
            return ""

        return (
            "\n---\n"
            "# Memory Context (from past runs)\n"
            "Use these past experiences and skills to inform your work. "
            "Do not follow them blindly — adapt to the current task.\n\n"
            + "\n".join(sections)
            + "\n---\n"
        )


def inject_memory_into_prompt(
    base_prompt: str,
    agent_role: str,
    task_description: str,
    injector: MemoryInjector,
) -> str:
    """Append memory context to an agent's base prompt."""
    memory_context = injector.build_memory_context(
        agent_role=agent_role,
        task_description=task_description,
    )
    if memory_context:
        return base_prompt + "\n" + memory_context
    return base_prompt

Sự tiêm xảy ra trong hàm agent node, ngay trước lệnh gọi LLM:

# Example: SSE agent node with memory injection

async def sse_node(state: TeamState) -> dict:
    injector = MemoryInjector(
        memory_store=get_memory_store(),
        skills_library=get_skills_library(),
    )

    base_prompt = SSE_SYSTEM_PROMPT  # the normal prompt from Part 7
    enhanced_prompt = inject_memory_into_prompt(
        base_prompt=base_prompt,
        agent_role="sse",
        task_description=state.get("clarified_requirement", ""),
        injector=injector,
    )

    # Now use enhanced_prompt instead of base_prompt
    llm = ChatAnthropic(model="claude-sonnet-4-20250514")
    response = await llm.ainvoke([
        {"role": "system", "content": enhanced_prompt},
        {"role": "user", "content": build_sse_task(state)},
    ])

    return {"sse_output": response.content}

Prompt mà agent SSE bây giờ thấy bao gồm không chỉ định nghĩa vai trò của nó và tác vụ hiện tại, mà còn “lần trước bạn xây dựng một API tương tự, bạn đã quên xác thực input trên PATCH endpoint” và “proven skill: always add request body validation middleware (confidence: 87%).“


6. Inter-Agent Knowledge Sharing

Agents học cá nhân, nhưng một số bài học áp dụng trên toàn bộ nhóm. Khi agent QC phát hiện rằng “endpoints mà không có rate limiting luôn luôn thất bại trong load testing,” kiến thức đó nên có sẵn cho agent TA khi nó thiết kế kiến trúc và cho agent SSE khi nó viết code.

Cơ chế chia sẻ sử dụng một mô hình proposal-and-approval:

# memory/sharing.py
from pydantic import BaseModel
from enum import Enum

class ShareStatus(str, Enum):
    PROPOSED = "proposed"
    APPROVED = "approved"
    REJECTED = "rejected"

class KnowledgeShare(BaseModel):
    """A knowledge item proposed for cross-agent sharing."""
    share_id: str
    source_agent: str                # who discovered this
    target_agents: list[str]         # who should receive it
    knowledge: str                   # the insight
    evidence_task_ids: list[str]     # supporting evidence
    status: ShareStatus = ShareStatus.PROPOSED
    rejection_reason: str | None = None


class KnowledgeBroker:
    """Manages cross-agent knowledge sharing with approval gates."""

    def __init__(self, skills_library: SkillsLibrary):
        self.skills = skills_library
        self.pending: list[KnowledgeShare] = []

    def propose(
        self,
        source_agent: str,
        target_agents: list[str],
        knowledge: str,
        evidence_task_ids: list[str],
    ) -> KnowledgeShare:
        """Propose knowledge for sharing. Requires approval."""
        share = KnowledgeShare(
            share_id=str(uuid4()),
            source_agent=source_agent,
            target_agents=target_agents,
            knowledge=knowledge,
            evidence_task_ids=evidence_task_ids,
        )
        self.pending.append(share)
        return share

    def approve(self, share_id: str) -> bool:
        """Approve a knowledge share and create skills for targets."""
        share = next(
            (s for s in self.pending if s.share_id == share_id), None
        )
        if not share:
            return False

        share.status = ShareStatus.APPROVED

        for target in share.target_agents:
            self.skills.add_skill(Skill(
                name=f"shared_from_{share.source_agent}",
                description=share.knowledge,
                agent_role=target,
                category="shared_knowledge",
                template=share.knowledge,
                confidence=0.6,  # starts with moderate confidence
                approved=True,   # already approved via sharing
                source_task_ids=share.evidence_task_ids,
            ))
        return True

    def reject(self, share_id: str, reason: str) -> bool:
        """Reject a knowledge share."""
        share = next(
            (s for s in self.pending if s.share_id == share_id), None
        )
        if not share:
            return False
        share.status = ShareStatus.REJECTED
        share.rejection_reason = reason
        return True

    def get_pending(self) -> list[KnowledgeShare]:
        """List all pending knowledge shares for human review."""
        return [s for s in self.pending if s.status == ShareStatus.PROPOSED]

Sau bước reflection, các agents có thể đề xuất chia sẻ kiến thức một cách tự động:

# In reflection_node, after reflecting:

async def propose_cross_agent_shares(
    reflection: TaskReflection,
    agent_role: str,
    task_id: str,
    broker: KnowledgeBroker,
):
    """If a reflection contains a broadly useful pattern, propose sharing."""
    if not reflection.pattern_noticed:
        return

    # Map agent roles to likely beneficiaries
    share_targets = {
        "qc": ["sse", "ta"],      # QC findings help SSE and TA
        "tl": ["sse", "qc"],      # TL review patterns help SSE and QC
        "sse": ["ta", "qc"],      # SSE impl patterns help TA and QC
        "ta": ["sse", "devops"],   # TA arch decisions help SSE and DevOps
    }

    targets = share_targets.get(agent_role, [])
    if targets:
        broker.propose(
            source_agent=agent_role,
            target_agents=targets,
            knowledge=reflection.pattern_noticed,
            evidence_task_ids=[task_id],
        )

Con người xem lại các chia sẻ chưa hoàn thành trong dashboard. “Agent QC nhận thấy rằng tất cả các APIs mà không có input validation thất bại trong các test cases. Chia sẻ với SSE và TA?” Phê duyệt, và cả hai agents đều nhận được một skill mới trong thư viện của họ.


7. Learning từ External Sources

Agents không nên chỉ học từ kinh nghiệm của chính họ. Một ResearchTool cho phép agents truy vấn tài liệu bên ngoài và các hướng dẫn best-practice trong quá trình thực thi.

# tools/research.py
from langchain_core.tools import tool
from langchain_community.utilities import GoogleSearchAPIWrapper

class ResearchTool:
    """Lets agents search for external best practices."""

    def __init__(self):
        self.search = GoogleSearchAPIWrapper(k=3)

    @tool
    async def research_best_practice(self, query: str) -> str:
        """Search for best practices on a topic. Use when encountering
        unfamiliar problems or verifying your approach."""
        results = self.search.results(query, num_results=3)
        formatted = [
            f"**{r['title']}**\n{r['snippet']}\nSource: {r['link']}"
            for r in results
        ]
        return "\n\n---\n\n".join(formatted) if formatted else "No results found."

Các agents TA và SSE nhận được công cụ này. Ràng buộc chính yếu: kết quả research là tạm thời. Chúng thông tin cho lần chạy hiện tại nhưng không nhập vào kho lưu trữ memory. Chỉ reflections và approved skills tồn tại. Điều này ngăn chặn hệ thống từ việc ghi nhớ các câu trả lời Stack Overflow ngẫu nhiên. Agent sử dụng research để đưa ra quyết định, kết quả được suy ngẫm, và chỉ các khuôn mẫu được chứng minh mới tốt nghiệp thành long-term memory.


8. Kết Nối Tất Cả Vào Graph

Đây là cách hệ thống memory hoàn chỉnh tích hợp với pipeline LangGraph từ các phần trước:

# graph/builder.py (updated with memory nodes)
from langgraph.graph import StateGraph, END

def build_team_graph() -> StateGraph:
    graph = StateGraph(TeamState)

    # ── Existing agent nodes (from Parts 5-9) ──
    graph.add_node("po", po_node)
    graph.add_node("ba", ba_node)
    graph.add_node("ta", ta_node)
    graph.add_node("qc", qc_node)
    graph.add_node("sse", sse_node)          # now uses memory injection
    graph.add_node("tl", tl_node)
    graph.add_node("devops", devops_node)
    graph.add_node("pm", pm_node)

    # ── New memory nodes ──
    graph.add_node("reflect", reflection_node)
    graph.add_node("skill_check", skill_promotion_node)

    # ── Existing edges (abbreviated) ──
    graph.set_entry_point("po")
    graph.add_edge("po", "ba")
    graph.add_edge("ba", "ta")
    graph.add_edge("ba", "qc")               # parallel fan-out
    graph.add_edge("ta", "sse")
    graph.add_edge("qc", "sse")
    graph.add_edge("sse", "tl")
    graph.add_conditional_edges("tl", tl_router)  # pass -> devops, fail -> sse
    graph.add_edge("devops", "pm")

    # ── Memory edges (post-pipeline) ──
    graph.add_edge("pm", "reflect")           # reflect after PM wraps up
    graph.add_edge("reflect", "skill_check")  # check for skill promotions
    graph.add_edge("skill_check", END)

    return graph.compile()


async def skill_promotion_node(state: TeamState) -> dict:
    """Check if any episodic patterns should be promoted to skills."""
    memory_store = get_memory_store()
    skills_lib = get_skills_library()

    # Check common pattern categories
    patterns_to_check = [
        ("sse", "input validation on API endpoints"),
        ("sse", "error handling in async functions"),
        ("qc", "edge cases in authentication flows"),
        ("ta", "database selection for relational data"),
    ]

    promoted = []
    for agent_role, pattern in patterns_to_check:
        skill = skills_lib.promote_from_reflections(
            memory_store=memory_store,
            agent_role=agent_role,
            pattern_query=pattern,
        )
        if skill:
            promoted.append(skill.name)

    return {"promoted_skills": promoted}

Pipeline bây giờ có một phần đuôi: sau khi PM khai báo tác vụ hoàn tất, nút reflection chạy, mọi agent viết các memories của nó, và nút skill promotion kiểm tra xem liệu có bất kỳ khuôn mẫu nào đã xảy ra đủ thường xuyên để trở thành skills hay không.


9. Đo Lường Sự Cải Thiện

“Các agents đang trở nên tốt hơn” không phải là một khẳng định hữu ích mà không có các chỉ số. Đây là cách chúng tôi theo dõi xem liệu hệ thống memory thực sự hoạt động hay không.

# metrics/improvement.py
from dataclasses import dataclass, field

@dataclass
class ImprovementMetrics:
    """Track agent improvement across pipeline runs."""
    task_id: str
    run_number: int

    # Quality metrics
    qc_pass_on_first_try: bool = False
    tl_review_pass_on_first: bool = False
    iteration_count: int = 0           # QC/TL rejection cycles

    # Memory metrics
    memories_recalled: int = 0          # how many memories were injected
    skills_applied: int = 0             # how many skills were used
    new_reflections: int = 0            # reflections written this run

    # Efficiency metrics
    total_llm_calls: int = 0
    total_tokens_used: int = 0
    pipeline_duration_seconds: float = 0.0


def compute_improvement_trend(
    metrics_history: list[ImprovementMetrics],
) -> dict:
    """Compute improvement trends over the last N runs."""
    if len(metrics_history) < 2:
        return {"status": "insufficient_data"}

    recent = metrics_history[-10:]  # last 10 runs

    first_half = recent[: len(recent) // 2]
    second_half = recent[len(recent) // 2 :]

    def avg(items, attr):
        vals = [getattr(i, attr) for i in items]
        return sum(vals) / len(vals) if vals else 0

    return {
        "first_try_pass_rate": {
            "early": avg(first_half, "qc_pass_on_first_try"),
            "recent": avg(second_half, "qc_pass_on_first_try"),
        },
        "avg_iterations": {
            "early": avg(first_half, "iteration_count"),
            "recent": avg(second_half, "iteration_count"),
        },
        "avg_tokens": {
            "early": avg(first_half, "total_tokens_used"),
            "recent": avg(second_half, "total_tokens_used"),
        },
        "memories_used": {
            "early": avg(first_half, "memories_recalled"),
            "recent": avg(second_half, "memories_recalled"),
        },
    }

Các chỉ số bạn muốn thấy xu hướng theo hướng đúng:

  1. First-try pass rate nên tăng. Nếu agents đang học từ những thất bại quá khứ, chúng nên tạo ra công việc vượt qua kiểm tra QC và TL lần đầu tiên thường xuyên hơn.
  2. Iteration count nên giảm. Ít chu kỳ rejection có nghĩa là các agents đang tạo ra output chất lượng cao hơn từ đầu.
  3. Token usage nên ổn định hoặc giảm. Better skills có nghĩa là ít fumbling, đó là ít tokens retry lãng phí.
  4. Memories recalled nên tăng sớm và sau đó ghi nhận. Ban đầu không có memories. Sau 10-20 lần chạy, cửa hàng được điền vào. Sau 50 lần chạy, các memories đúng đang được truy lục một cách nhất quán.

Theo dõi những thứ này qua các lần chạy và vẽ chúng. Nếu các đường phẳng, hệ thống memory của bạn không hoạt động. Nếu first-try pass rate đang tăng và iteration count đang giảm, các agents của bạn thực sự đang học.


10. Những Xem Xét Thực Tế

Memory Hygiene

Không phải tất cả các memories đều đáng giữ. Một reflection nói “mọi thứ đều tốt, không có vấn đề gì” thêm nhiễu mà không có tín hiệu. Lọc reflections trước khi lưu trữ:

def is_meaningful_reflection(reflection: TaskReflection) -> bool:
    """Only store reflections that contain actionable information."""
    if reflection.what_went_wrong == "Nothing" and not reflection.pattern_noticed:
        return False
    if len(reflection.improvement_for_next_time) < 20:
        return False  # too vague to be useful
    return True

Memory Decay

Các memories cũ trở nên ít phù hợp hơn. Một reflection về một khuôn mẫu Python 3.10 ít hữu ích hơn khi dự án đã chuyển sang 3.12. Thực hiện dọn dẹp định kỳ:

# Run monthly
from datetime import timedelta

def cleanup_old_memories(store: AgentMemoryStore, max_age_days: int = 90):
    cutoff = datetime.utcnow() - timedelta(days=max_age_days)
    removed = store.forget_before(cutoff)
    print(f"Removed {removed} memories older than {max_age_days} days")

Cost Control

Mỗi truy lục memory là một lệnh gọi embedding. Mỗi reflection là một lệnh gọi LLM. Cho một nhóm 6 agents suy ngẫm trên mỗi lần chạy, đó là 6 lệnh gọi LLM bổ sung trên mỗi thực thi pipeline. Sử dụng một mô hình nhỏ hơn, rẻ hơn cho reflections (Claude Haiku hoặc GPT-4o-mini) và batch embedding calls khi có thể.

The Cold Start Problem

Trên lần chạy đầu tiên, không có memories và không có skills. Hệ thống hoạt động giống hệt với phiên bản non-memory từ các phần trước. Điều này tốt. Hệ thống memory là một cải thiện phụ, không phải một phụ thuộc. Pipeline hoạt động mà không có memories; nó hoạt động tốt hơn với chúng.

Để tăng tốc độ cold start, bạn có thể hạt giống thư viện skills với các best practices đã biết trước lần chạy đầu tiên:

def seed_initial_skills(library: SkillsLibrary):
    """Pre-load common skills to avoid cold start."""
    library.add_skill(Skill(
        name="api_input_validation",
        description="Always validate request bodies with Pydantic models",
        agent_role="sse",
        category="api_design",
        template="Use Pydantic BaseModel for all request/response schemas",
        confidence=0.8,
        approved=True,
    ))
    library.add_skill(Skill(
        name="test_isolation",
        description="Each test case must be independent and idempotent",
        agent_role="qc",
        category="testing",
        template="Use fixtures for setup/teardown. Never depend on test order.",
        confidence=0.9,
        approved=True,
    ))

Điều Chúng Tôi Xây Dựng

Bài viết này thêm ba lớp vào hệ thống agent:

  1. Episodic memory thông qua ChromaDB — agents lưu trữ reflections sau mỗi tác vụ và truy lục các trải nghiệm tương tự trước các tác vụ trong tương lai.
  2. Semantic memory thông qua Skills Library — các khuôn mẫu định kỳ được nâng cấp lên skills với các điểm số confidence và các cánh cổng phê duyệt con người.
  3. Knowledge sharing thông qua KnowledgeBroker — bài học của một agent có thể được chia sẻ với phần còn lại của nhóm thông qua quy trình proposal-and-approval.

Kết quả là một hệ thống thực sự cải thiện theo thời gian. Không phải thông qua fine-tuning mô hình hoặc prompt engineering, mà thông qua kinh nghiệm tích lũy — cơ chế tương tự làm cho các nhóm con người trở nên tốt hơn với thực hành.

Trong Phần 11, chúng tôi sẽ giải quyết câu chuyện triển khai: containerizing toàn bộ hệ thống multi-agent, thiết lập cơ sở hạ tầng orchestration, và chạy pipeline trong production với observability thích hợp. Các agents rất thông minh. Bây giờ chúng tôi cần chúng đáng tin cậy.

Xuất nội dung

Bình luận