Lần đầu tiên tôi chạy toàn bộ pipeline từ đầu đến cuối, tôi nhìn vào terminal cuộn JSON trong mười hai phút. PO thì xong rồi. BA có lẽ cũng xong. QC và TA đang chạy song song, hoặc có thể một cái đã xong và cái kia vẫn còn chạy. SSE có thể đang viết code hoặc bị mắc trong một vòng lặp retry. Tôi không biết gì cả. Hệ thống là một hộp đen với con trỏ nhấp nháy.
Tôi đã thêm các lệnh print(). Sau đó tôi thêm timestamps. Tiếp theo tôi thêm output có màu sắc bằng Rich. Cuối cùng tôi từ bỏ và xây dựng một dashboard.
Dashboard đó đã thay đổi cách tôi làm việc với hệ thống nhiều hơn bất kỳ cải tiến agent nào khác. Không phải vì nó làm cho các agent thông minh hơn — nó không làm vậy. Nhưng vì khả năng nhìn thấy thay đổi hành vi. Khi bạn có thể thấy rằng agent SSE đã tiêu tốn token trong bốn phút cho một hàm mà nó chỉ nên mất ba mươi giây, bạn sẽ can thiệp. Khi bạn có thể thấy rằng 60% ngân sách của bạn đã được tiêu tốn bởi agent TA vì một prompt bị ràng buộc kém, bạn sẽ sửa prompt. Khi bạn có thể thấy một checkpoint đang chờ phê duyệt cùng với toàn bộ ngữ cảnh của những gì agent đã tạo ra, bạn sẽ đưa ra những quyết định phê duyệt tốt hơn.
Đây không phải một dashboard vô dụng. Đây là một bảng điều khiển vận hành.
Trong bài viết này, chúng tôi xây dựng toàn bộ: mô hình sự kiện, kho lưu trữ bền vững, theo dõi chi phí, bảng Kanban, phát trực tiếp token, và phê duyệt checkpoint của con người. Tất cả trong Streamlit, tất cả dưới 400 dòng Python.
1. Tại Sao Bạn Cần Một Dashboard
Hãy để tôi cụ thể về những vấn đề mà dashboard giải quyết.
Vấn đề 1: Trạng thái bị ẩn. Pipeline của bạn có tám agent. Mỗi agent có một chu kỳ đời: chờ đợi, đang chạy, chờ đầu vào, hoàn thành, thất bại. Không có dashboard, cách duy nhất để biết trạng thái của agent là đọc logs. Logs là tuần tự. Thực thi agent thì không. Bạn kết thúc bằng cách sử dụng grep để tìm timestamps và tái tạo trạng thái máy từ output văn bản. Điều này không mở rộng được quá hai agent.
Vấn đề 2: Chi phí không được theo dõi. Mọi lệnh gọi LLM đều tốn tiền. Claude Sonnet, GPT-4o, bất kỳ cái gì bạn đang sử dụng — bộ đếm đang chạy. Vấn đề không phải AI đắt (nó có giá rất rẻ trên mỗi tác vụ). Vấn đề là bạn không có khả năng hiển thị về nơi tiền đi. Là 80% ngân sách của bạn được tiêu tốn bởi agent SSE viết code, hay bởi agent PO đặt các câu hỏi làm rõ trong một vòng lặp retry? Không có theo dõi chi phí trên mỗi agent, bạn không thể tối ưu hóa.
**Vấn đề 3: Sự cố checkpoint. ** Trong Phần 8, chúng tôi đã xây dựng các checkpoint của con người — những cổng nơi hệ thống tạm dừng và chờ phê duyệt của con người trước khi tiếp tục. Trong một terminal, điều này có nghĩa là hệ thống in output và chờ bạn gõ “approve” hoặc “reject.” Vấn đề là ngữ cảnh. Để đưa ra quyết định phê duyệt tốt, bạn cần xem những gì agent đã tạo ra, những gì nó được yêu cầu tạo ra, và cách nó phù hợp với tiến trình chung. Một terminal không cho bạn bất kỳ ngữ cảnh nào mà không cuộn.
Vấn đề 4: Không có chế độ xem cấp dự án. Các log agent riêng lẻ cho bạn biết một agent đang làm gì. Chúng không cho bạn biết dự án đứng ở đâu. Một bảng Kanban — backlog, đang tiến hành, hoàn thành — cho bạn chế độ xem đó ngay lập tức.
2. Mô Hình AgentEvent
Mọi thông tin mà dashboard hiển thị đều đến từ một nguồn duy nhất: các sự kiện. Khi một agent bắt đầu, nó phát ra một sự kiện. Khi nó hoàn thành, sự kiện. Khi token phát trực tiếp, sự kiện. Khi đạt được checkpoint, sự kiện. Dashboard chỉ là một lớp chế độ xem trên một luồng sự kiện.
Chúng tôi đã có các domain event từ Phần 3. Bây giờ chúng tôi chính thức hóa chúng thành một mô hình mà dashboard có thể truy vấn.
# dashboard/models.py
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any
class EventType(str, Enum):
AGENT_STARTED = "agent_started"
AGENT_COMPLETED = "agent_completed"
AGENT_FAILED = "agent_failed"
TOKENS_USED = "tokens_used"
CHECKPOINT_REACHED = "checkpoint_reached"
CHECKPOINT_RESOLVED = "checkpoint_resolved"
TASK_MOVED = "task_moved"
TOKEN_STREAM = "token_stream"
class TaskStatus(str, Enum):
BACKLOG = "backlog"
IN_PROGRESS = "in_progress"
DONE = "done"
BLOCKED = "blocked"
@dataclass
class AgentEvent:
event_type: EventType
agent_name: str
agent_role: str
timestamp: datetime = field(default_factory=datetime.utcnow)
data: dict[str, Any] = field(default_factory=dict)
event_id: str = field(default_factory=lambda: __import__('uuid').uuid4().hex)
def to_dict(self) -> dict:
return {
"event_id": self.event_id,
"event_type": self.event_type.value,
"agent_name": self.agent_name,
"agent_role": self.agent_role,
"timestamp": self.timestamp.isoformat(),
"data": self.data,
}
Quyết định thiết kế chính: các sự kiện là bất biến và chỉ nối thêm. Dashboard không bao giờ sửa đổi các sự kiện. Nó đọc chúng, tập hợp chúng, và hiển thị các chế độ xem. Điều này có nghĩa là chúng tôi có thể phát lại các sự kiện để tái tạo bất kỳ chế độ xem thời điểm nào của hệ thống — hữu ích cho gỡ lỗi và phân tích hậu kỳ.
3. EventStore: Lưu Trữ SQLite
Chúng tôi cần một nơi nào đó để lưu trữ các sự kiện. SQLite là lựa chọn đúng ở đây: không cần cấu hình, dựa trên tệp, xử lý các đọc đồng thời đủ tốt cho một dashboard với người dùng đơn, và phần mở rộng json xử lý trường data của chúng tôi một cách tự nhiên.
# dashboard/event_store.py
import json
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional
from .models import AgentEvent, EventType, TaskStatus
class EventStore:
def __init__(self, db_path: str = "events.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
with self._conn() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS events (
event_id TEXT PRIMARY KEY,
event_type TEXT NOT NULL,
agent_name TEXT NOT NULL,
agent_role TEXT NOT NULL,
timestamp TEXT NOT NULL,
data TEXT NOT NULL DEFAULT '{}'
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_events_type
ON events(event_type)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_events_agent
ON events(agent_name)
""")
def _conn(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def append(self, event: AgentEvent):
with self._conn() as conn:
conn.execute(
"""INSERT OR IGNORE INTO events
(event_id, event_type, agent_name, agent_role, timestamp, data)
VALUES (?, ?, ?, ?, ?, ?)""",
(
event.event_id,
event.event_type.value,
event.agent_name,
event.agent_role,
event.timestamp.isoformat(),
json.dumps(event.data),
),
)
def get_events(
self,
event_type: Optional[EventType] = None,
agent_name: Optional[str] = None,
since: Optional[datetime] = None,
limit: int = 500,
) -> list[dict]:
query = "SELECT * FROM events WHERE 1=1"
params: list = []
if event_type:
query += " AND event_type = ?"
params.append(event_type.value)
if agent_name:
query += " AND agent_name = ?"
params.append(agent_name)
if since:
query += " AND timestamp > ?"
params.append(since.isoformat())
query += " ORDER BY timestamp DESC LIMIT ?"
params.append(limit)
with self._conn() as conn:
rows = conn.execute(query, params).fetchall()
return [
{**dict(row), "data": json.loads(row["data"])} for row in rows
]
def get_latest_by_agent(self) -> dict[str, dict]:
"""Return the most recent event for each agent."""
with self._conn() as conn:
rows = conn.execute("""
SELECT e.* FROM events e
INNER JOIN (
SELECT agent_name, MAX(timestamp) as max_ts
FROM events
GROUP BY agent_name
) latest ON e.agent_name = latest.agent_name
AND e.timestamp = latest.max_ts
""").fetchall()
return {
row["agent_name"]: {**dict(row), "data": json.loads(row["data"])}
for row in rows
}
def get_pending_checkpoints(self) -> list[dict]:
"""Return checkpoints that have not been resolved."""
with self._conn() as conn:
reached = conn.execute("""
SELECT * FROM events
WHERE event_type = 'checkpoint_reached'
ORDER BY timestamp DESC
""").fetchall()
resolved_ids = {
row["data"]
for row in conn.execute("""
SELECT data FROM events
WHERE event_type = 'checkpoint_resolved'
""").fetchall()
}
results = []
for row in reached:
data = json.loads(row["data"])
checkpoint_id = data.get("checkpoint_id", "")
if checkpoint_id not in {
json.loads(r).get("checkpoint_id") for r in resolved_ids
}:
results.append({**dict(row), "data": data})
return results
EventStore cố ý đơn giản. Không ORM, không framework migrations, không connection pooling. Đây là một dashboard cho một công cụ phát triển cục bộ, không phải một SaaS sản xuất. Nếu bạn cần mở rộng điều này, hãy thay thế SQLite bằng PostgreSQL và thêm connection pooling — giao diện vẫn giữ nguyên.
4. CostTracker: Biết Tiền Của Bạn Đi Đâu
Đây là lớp tôi ước tôi đã xây dựng vào ngày đầu tiên. CostTracker đọc các sự kiện sử dụng token từ kho lưu trữ và tính toán chi phí bằng cách sử dụng bảng giá cho mỗi mô hình.
# dashboard/cost_tracker.py
from dataclasses import dataclass
from .event_store import EventStore
from .models import EventType
# Prices per 1M tokens (USD) as of early 2026
MODEL_PRICING: dict[str, dict[str, float]] = {
"claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
"claude-haiku-3.5": {"input": 0.80, "output": 4.00},
"claude-opus-4-20250918": {"input": 15.00, "output": 75.00},
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"deepseek-v3": {"input": 0.27, "output": 1.10},
}
@dataclass
class AgentCost:
agent_name: str
agent_role: str
input_tokens: int = 0
output_tokens: int = 0
total_cost: float = 0.0
call_count: int = 0
class CostTracker:
def __init__(self, store: EventStore, budget_limit: float = 5.00):
self.store = store
self.budget_limit = budget_limit
def _calc_cost(
self, model: str, input_tokens: int, output_tokens: int
) -> float:
pricing = MODEL_PRICING.get(model, {"input": 3.0, "output": 15.0})
input_cost = (input_tokens / 1_000_000) * pricing["input"]
output_cost = (output_tokens / 1_000_000) * pricing["output"]
return input_cost + output_cost
def get_costs_by_agent(self) -> list[AgentCost]:
events = self.store.get_events(event_type=EventType.TOKENS_USED)
agents: dict[str, AgentCost] = {}
for event in events:
name = event["agent_name"]
if name not in agents:
agents[name] = AgentCost(
agent_name=name, agent_role=event["agent_role"]
)
ac = agents[name]
data = event["data"]
inp = data.get("input_tokens", 0)
out = data.get("output_tokens", 0)
model = data.get("model", "claude-sonnet-4-20250514")
ac.input_tokens += inp
ac.output_tokens += out
ac.total_cost += self._calc_cost(model, inp, out)
ac.call_count += 1
return sorted(agents.values(), key=lambda a: a.total_cost, reverse=True)
def get_total_cost(self) -> float:
return sum(a.total_cost for a in self.get_costs_by_agent())
def get_budget_pct(self) -> float:
total = self.get_total_cost()
return min((total / self.budget_limit) * 100, 100.0)
def is_over_budget(self) -> bool:
return self.get_total_cost() >= self.budget_limit
Có một vài điều cần lưu ý. Thứ nhất, bảng giá là một từ điển đơn giản. Giá mô hình thay đổi thường xuyên — bạn sẽ cập nhật cái này. Thứ hai, giá rơi lại mặc định giả định các tỷ lệ Claude Sonnet. Nếu một sự kiện đến với một mô hình không xác định, bạn sẽ nhận được một ước tính hợp lý thay vì một lỗi. Thứ ba, budget_limit mặc định là 5 đô la. Đối với một lần chạy pipeline đầy đủ với các mô hình lớp Sonnet, một dự án điển hình chi phí 2-4 đô la. Nếu bạn đang sử dụng Opus, nhân với 5x.
Phát Sự Kiện Chi Phí Từ Các Agent Của Bạn
Để cấp dữ liệu cho CostTracker, các agent của bạn cần phát ra các sự kiện TOKENS_USED sau mỗi lệnh gọi LLM. Dưới đây là cách tích hợp này với BaseAgent từ Phần 4:
# Inside BaseAgent._call_llm()
async def _call_llm(self, messages, **kwargs):
response = await self.client.messages.create(
model=self.model,
messages=messages,
**kwargs
)
# Emit token usage event
self.emit_event(AgentEvent(
event_type=EventType.TOKENS_USED,
agent_name=self.name,
agent_role=self.role,
data={
"model": self.model,
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
}
))
return response
5. Streamlit Dashboard: Ứng Dụng Hoàn Chỉnh
Bây giờ là sự kiện chính. Dashboard là một ứng dụng Streamlit duy nhất với bố cục ba cột: trạng thái agent hoạt động ở bên trái, bảng Kanban ở giữa, bộ theo dõi chi phí ở bên phải.
# dashboard/app.py
import streamlit as st
import time
from datetime import datetime, timedelta
from .event_store import EventStore
from .cost_tracker import CostTracker
from .models import EventType, TaskStatus, AgentEvent
# ── Page config ────────────────────────────────────────────────
st.set_page_config(
page_title="AI Team Dashboard",
page_icon="⚙",
layout="wide",
initial_sidebar_state="collapsed",
)
# ── Shared state ───────────────────────────────────────────────
store = EventStore()
tracker = CostTracker(store)
AGENT_ROLES = {
"Alex": "Product Owner",
"Jamie": "Business Analyst",
"Sam": "QC Engineer",
"Casey": "Tech Architect",
"Jordan": "Sr. SW Engineer",
"Riley": "Tech Lead",
"Morgan": "DevOps Engineer",
"Taylor": "Project Manager",
}
def get_agent_status(agent_name: str, latest: dict) -> str:
if agent_name not in latest:
return "idle"
event = latest[agent_name]
etype = event["event_type"]
if etype == EventType.AGENT_STARTED.value:
return "running"
elif etype == EventType.AGENT_COMPLETED.value:
return "done"
elif etype == EventType.AGENT_FAILED.value:
return "failed"
elif etype == EventType.CHECKPOINT_REACHED.value:
return "checkpoint"
return "idle"
STATUS_COLORS = {
"idle": "#64748b",
"running": "#10b981",
"done": "#6366f1",
"failed": "#ef4444",
"checkpoint": "#f59e0b",
}
# ── Custom CSS ─────────────────────────────────────────────────
st.markdown("""
<style>
.stApp { background-color: #0f172a; }
.kanban-card {
background: #1e293b;
border-radius: 8px;
padding: 10px 12px;
margin-bottom: 8px;
border-left: 3px solid;
}
.kanban-card .title {
color: #e2e8f0;
font-size: 13px;
font-weight: 600;
margin-bottom: 4px;
}
.kanban-card .meta {
color: #64748b;
font-size: 11px;
}
.kanban-col-header {
color: #94a3b8;
font-size: 12px;
font-weight: 700;
text-transform: uppercase;
letter-spacing: 1px;
padding-bottom: 8px;
border-bottom: 2px solid #334155;
margin-bottom: 12px;
}
.cost-bar {
background: #1e293b;
border-radius: 4px;
height: 20px;
margin: 4px 0;
position: relative;
}
.cost-bar-fill {
height: 100%;
border-radius: 4px;
transition: width 0.3s ease;
}
.metric-box {
background: #1e293b;
border-radius: 8px;
padding: 16px;
border: 1px solid #334155;
}
.stream-box {
background: #0a0f1a;
border: 1px solid #334155;
border-radius: 6px;
padding: 12px;
font-family: monospace;
font-size: 12px;
color: #6ee7b7;
max-height: 200px;
overflow-y: auto;
}
</style>
""", unsafe_allow_html=True)
# ── Layout ─────────────────────────────────────────────────────
st.markdown("### AI Team Dashboard")
col_agent, col_kanban, col_cost = st.columns([1, 2, 1])
latest_events = store.get_latest_by_agent()
# ── Column 1: Active Agent ─────────────────────────────────────
with col_agent:
st.markdown("**Active Agent**")
# Find the currently running agent
active_agent = None
for name, event in latest_events.items():
if event["event_type"] == EventType.AGENT_STARTED.value:
active_agent = name
break
if active_agent:
role = AGENT_ROLES.get(active_agent, "Unknown")
st.markdown(f"""
<div class="metric-box">
<span style="color:#a5b4fc;font-size:18px;font-weight:700">
{active_agent}
</span>
<br/>
<span style="color:#94a3b8;font-size:13px">{role}</span>
<br/><br/>
<span style="background:#10b981;color:#0f172a;padding:2px 10px;
border-radius:4px;font-size:11px;font-weight:600">
RUNNING
</span>
</div>
""", unsafe_allow_html=True)
# Live token stream
st.markdown("**Live Output**")
stream_events = store.get_events(
event_type=EventType.TOKEN_STREAM,
agent_name=active_agent,
limit=20,
)
stream_text = "\n".join(
e["data"].get("chunk", "") for e in reversed(stream_events)
)
st.markdown(
f'<div class="stream-box">{stream_text or "Waiting..."}</div>',
unsafe_allow_html=True,
)
# Elapsed time
started_events = store.get_events(
event_type=EventType.AGENT_STARTED,
agent_name=active_agent,
limit=1,
)
if started_events:
started_at = datetime.fromisoformat(started_events[0]["timestamp"])
elapsed = datetime.utcnow() - started_at
mins = int(elapsed.total_seconds() // 60)
secs = int(elapsed.total_seconds() % 60)
st.markdown(f"""
<div class="metric-box" style="margin-top:12px">
<span style="color:#94a3b8;font-size:11px">Elapsed</span><br/>
<span style="color:#e2e8f0;font-family:monospace;font-size:20px">
{mins:02d}:{secs:02d}
</span>
</div>
""", unsafe_allow_html=True)
else:
st.info("No agent currently running.")
# ── Checkpoint approval UI ──────────────────────────────────
st.markdown("---")
st.markdown("**Checkpoints**")
pending = store.get_pending_checkpoints()
if pending:
for cp in pending:
data = cp["data"]
cp_id = data.get("checkpoint_id", "unknown")
agent = cp["agent_name"]
summary = data.get("summary", "Review required")
st.warning(f"Checkpoint: {agent} -- {summary}")
c1, c2 = st.columns(2)
with c1:
if st.button(
"Approve", key=f"approve_{cp_id}", type="primary"
):
store.append(AgentEvent(
event_type=EventType.CHECKPOINT_RESOLVED,
agent_name=agent,
agent_role=AGENT_ROLES.get(agent, ""),
data={
"checkpoint_id": cp_id,
"decision": "approved",
},
))
st.rerun()
with c2:
if st.button("Reject", key=f"reject_{cp_id}"):
feedback = st.text_input(
"Feedback", key=f"feedback_{cp_id}"
)
store.append(AgentEvent(
event_type=EventType.CHECKPOINT_RESOLVED,
agent_name=agent,
agent_role=AGENT_ROLES.get(agent, ""),
data={
"checkpoint_id": cp_id,
"decision": "rejected",
"feedback": feedback,
},
))
st.rerun()
else:
st.markdown(
'<span style="color:#64748b;font-size:12px">'
"No pending checkpoints</span>",
unsafe_allow_html=True,
)
# ── Column 2: Kanban Board ─────────────────────────────────────
with col_kanban:
st.markdown("**Kanban Board**")
def get_tasks() -> dict[str, list[dict]]:
"""Build task lists from events."""
board: dict[str, list[dict]] = {
"backlog": [],
"in_progress": [],
"done": [],
}
for name, role in AGENT_ROLES.items():
status = get_agent_status(name, latest_events)
task = {
"agent": name,
"role": role,
"status": status,
"description": _task_description(name, latest_events),
}
if status == "done":
board["done"].append(task)
elif status in ("running", "checkpoint"):
board["in_progress"].append(task)
else:
board["backlog"].append(task)
return board
def _task_description(name: str, latest: dict) -> str:
if name not in latest:
return "Waiting"
data = latest[name].get("data", {})
return data.get("task_summary", AGENT_ROLES.get(name, ""))
def render_card(task: dict, border_color: str) -> str:
return f"""
<div class="kanban-card" style="border-left-color:{border_color}">
<div class="title">{task['role']}</div>
<div class="meta">{task['description']}</div>
<div class="meta" style="margin-top:4px;color:{border_color}">
{task['agent']}
</div>
</div>
"""
board = get_tasks()
k1, k2, k3 = st.columns(3)
with k1:
st.markdown(
'<div class="kanban-col-header">Backlog</div>',
unsafe_allow_html=True,
)
for task in board["backlog"]:
st.markdown(
render_card(task, "#64748b"), unsafe_allow_html=True
)
if not board["backlog"]:
st.markdown(
'<span style="color:#475569;font-size:11px">Empty</span>',
unsafe_allow_html=True,
)
with k2:
st.markdown(
'<div class="kanban-col-header" style="border-color:#10b981">'
"In Progress</div>",
unsafe_allow_html=True,
)
for task in board["in_progress"]:
st.markdown(
render_card(task, "#10b981"), unsafe_allow_html=True
)
if not board["in_progress"]:
st.markdown(
'<span style="color:#475569;font-size:11px">Empty</span>',
unsafe_allow_html=True,
)
with k3:
st.markdown(
'<div class="kanban-col-header" style="border-color:#6366f1">'
"Done</div>",
unsafe_allow_html=True,
)
for task in board["done"]:
st.markdown(
render_card(task, "#6366f1"), unsafe_allow_html=True
)
if not board["done"]:
st.markdown(
'<span style="color:#475569;font-size:11px">Empty</span>',
unsafe_allow_html=True,
)
# ── Column 3: Cost Tracker ─────────────────────────────────────
with col_cost:
st.markdown("**Cost Tracker**")
total_cost = tracker.get_total_cost()
budget_pct = tracker.get_budget_pct()
budget_color = "#10b981" if budget_pct < 75 else (
"#f59e0b" if budget_pct < 90 else "#ef4444"
)
# Total spend
st.markdown(f"""
<div class="metric-box">
<span style="color:#94a3b8;font-size:11px">Total Spend</span><br/>
<span style="color:#fbbf24;font-family:monospace;font-size:28px;
font-weight:700">${total_cost:.2f}</span>
<span style="color:#64748b;font-size:12px">
/ ${tracker.budget_limit:.2f}
</span>
</div>
""", unsafe_allow_html=True)
# Budget bar
st.markdown(f"""
<div style="margin:12px 0">
<div class="cost-bar">
<div class="cost-bar-fill"
style="width:{budget_pct}%;background:{budget_color}">
</div>
</div>
<span style="color:#94a3b8;font-size:11px">
{budget_pct:.1f}% of budget
</span>
</div>
""", unsafe_allow_html=True)
if tracker.is_over_budget():
st.error("Budget limit reached. Pipeline paused.")
# Per-agent costs
st.markdown("**Cost by Agent**")
agent_costs = tracker.get_costs_by_agent()
max_cost = max((a.total_cost for a in agent_costs), default=1.0) or 1.0
for ac in agent_costs:
bar_width = (ac.total_cost / max_cost) * 100
st.markdown(f"""
<div style="margin:6px 0">
<div style="display:flex;justify-content:space-between">
<span style="color:#cbd5e1;font-size:12px">
{ac.agent_name} ({ac.agent_role})
</span>
<span style="color:#94a3b8;font-family:monospace;font-size:11px">
${ac.total_cost:.3f}
</span>
</div>
<div class="cost-bar">
<div class="cost-bar-fill"
style="width:{bar_width}%;background:#6366f1">
</div>
</div>
<span style="color:#475569;font-size:10px">
{ac.input_tokens:,} in / {ac.output_tokens:,} out
-- {ac.call_count} calls
</span>
</div>
""", unsafe_allow_html=True)
if not agent_costs:
st.markdown(
'<span style="color:#475569;font-size:12px">'
"No cost data yet</span>",
unsafe_allow_html=True,
)
# Token summary
total_input = sum(a.input_tokens for a in agent_costs)
total_output = sum(a.output_tokens for a in agent_costs)
st.markdown(f"""
<div class="metric-box" style="margin-top:16px">
<span style="color:#94a3b8;font-size:11px">Token Usage</span><br/>
<span style="color:#64748b;font-family:monospace;font-size:12px">
Input: {total_input:>8,}<br/>
Output: {total_output:>8,}<br/>
</span>
<span style="color:#e2e8f0;font-family:monospace;font-size:12px;
font-weight:600">
Total: {total_input + total_output:>8,}
</span>
</div>
""", unsafe_allow_html=True)
# ── Auto-refresh ───────────────────────────────────────────────
REFRESH_INTERVAL = 3 # seconds
time.sleep(REFRESH_INTERVAL)
st.rerun()
Đó là toàn bộ ứng dụng dashboard. Hãy để tôi đi qua các quyết định thiết kế.
6. Quyết Định Thiết Kế
Auto-refresh: Vòng lặp time.sleep(3) + st.rerun() là cơ chế thời gian thực đơn giản nhất. Streamlit chạy lại toàn bộ tập lệnh trên mọi tương tác dù sao — cái này chỉ thêm polling. Để phát trực tiếp WebSocket thực sự, hãy sử dụng st.write_stream(), nhưng polling đơn giản hơn và đủ cho một công cụ cục bộ.
HTML thô trên các thành phần Streamlit: Các thẻ Kanban và các thanh chi phí là HTML thô. Các thành phần tích hợp của Streamlit (st.metric, st.progress) có kiểu dáng hạn chế. HTML thô cung cấp kiểm soát toàn bộ trên chủ đề tối, đường viền thẻ và bố cục nhỏ gọn. Sự đánh đổi: các phần tử HTML thô không tương tác như các widget Streamlit, nhưng đối với các phần tử hiển thị, điều đó rất tốt.
Tách dựa trên sự kiện: Dashboard nhập zero mã agent. Nó chỉ đọc các sự kiện từ SQLite. Điều này có nghĩa là bạn có thể chạy dashboard mà không có pipeline (xem xét các lần chạy quá khứ), thay thế các triển khai agent một cách tự do, phát lại các sự kiện để gỡ lỗi hoặc sao chép tệp SQLite sang một máy khác.
Luồng checkpoint: Nhấp vào Phê duyệt ghi một sự kiện CHECKPOINT_RESOLVED với decision: "approved". Pipeline thăm dò kho sự kiện cho các sự kiện giải quyết. Từ chối bao gồm một đầu vào văn bản cho phản hồi, được lưu trữ trong sự kiện và chuyển lại cho agent để sửa đổi — cùng một mô hình con người trong vòng lặp từ Phần 8, nhưng với giao diện thích hợp.
7. Đấu Dây Pipeline Với Dashboard
Các agent của bạn cần phát ra các sự kiện vào những thời điểm thích hợp. Dưới đây là mô hình tích hợp cho BaseAgent:
# agents/base_agent.py (additions)
class BaseAgent:
def __init__(self, name, role, event_store: EventStore, **kwargs):
# ... existing init ...
self.event_store = event_store
async def run(self, state):
self.event_store.append(AgentEvent(
event_type=EventType.AGENT_STARTED,
agent_name=self.name,
agent_role=self.role,
data={"task_summary": self._get_task_summary(state)},
))
try:
result = await self._execute(state)
self.event_store.append(AgentEvent(
event_type=EventType.AGENT_COMPLETED,
agent_name=self.name,
agent_role=self.role,
data={"task_summary": "Completed"},
))
return result
except Exception as e:
self.event_store.append(AgentEvent(
event_type=EventType.AGENT_FAILED,
agent_name=self.name,
agent_role=self.role,
data={"error": str(e)},
))
raise
Để phát trực tiếp token — hiển thị output của LLM khi nó tạo ra — bạn phát ra các sự kiện TOKEN_STREAM từ callback phát trực tiếp:
async def _stream_llm(self, messages, **kwargs):
async with self.client.messages.stream(
model=self.model, messages=messages, **kwargs
) as stream:
full_response = ""
async for chunk in stream.text_stream:
full_response += chunk
self.event_store.append(AgentEvent(
event_type=EventType.TOKEN_STREAM,
agent_name=self.name,
agent_role=self.role,
data={"chunk": chunk, "full_text": full_response[-200:]},
))
return full_response
Lưu ý trường full_text chỉ giữ 200 ký tự cuối cùng. Điều này ngăn chặn cơ sở dữ liệu SQLite phát triển không bị ràng buộc trong các lần chạy tạo dài.
8. Chạy Dashboard
Cài Đặt
pip install streamlit
Đó là sự phụ thuộc bổ sung duy nhất. Dashboard sử dụng SQLite (tích hợp sẵn trong Python), dataclasses (thư viện tiêu chuẩn) và Streamlit.
Cấu Trúc Thư Mục
project/
dashboard/
__init__.py
app.py
models.py
event_store.py
cost_tracker.py
agents/
base_agent.py
...
events.db # created automatically
Khởi Chạy
# Terminal 1: Run the pipeline
python -m agents.run_pipeline --brief "Build a task manager app"
# Terminal 2: Launch the dashboard
streamlit run dashboard/app.py --server.port 8501
Dashboard mở tại http://localhost:8501. Nó tự động làm mới mỗi ba giây. Bạn sẽ thấy các agent xuất hiện trên bảng Kanban khi chúng bắt đầu và hoàn thành. Chi phí tích lũy theo thời gian thực. Các checkpoint xuất hiện với các nút phê duyệt khi đạt được.
Chạy Với Dữ Liệu Demo
Để kiểm tra mà không cần pipeline trực tiếp, hãy seed kho sự kiện bằng các sự kiện mẫu:
# scripts/seed_demo_events.py
from dashboard.event_store import EventStore
from dashboard.models import AgentEvent, EventType
from datetime import datetime, timedelta
store = EventStore()
now = datetime.utcnow()
# Completed agents
for name, role, mins, summary in [
("Alex", "Product Owner", 10, "Requirements doc complete"),
("Jamie", "Business Analyst", 8, "12 user stories defined"),
("Sam", "QC Engineer", 5, "38 test cases written"),
("Casey", "Tech Architect", 5, "Architecture spec finalized"),
]:
store.append(AgentEvent(
event_type=EventType.AGENT_COMPLETED,
agent_name=name, agent_role=role,
timestamp=now - timedelta(minutes=mins),
data={"task_summary": summary},
))
# Currently running agent
store.append(AgentEvent(
event_type=EventType.AGENT_STARTED,
agent_name="Jordan", agent_role="Sr. SW Engineer",
timestamp=now - timedelta(minutes=2),
data={"task_summary": "Implementing auth module"},
))
# Token usage for cost tracking
for name, role, inp, out in [
("Alex", "Product Owner", 8200, 3400),
("Jamie", "Business Analyst", 12500, 5800),
("Jordan", "Sr. SW Engineer", 18400, 8200),
]:
store.append(AgentEvent(
event_type=EventType.TOKENS_USED,
agent_name=name, agent_role=role,
data={"model": "claude-sonnet-4-20250514",
"input_tokens": inp, "output_tokens": out},
))
# Pending checkpoint
store.append(AgentEvent(
event_type=EventType.CHECKPOINT_REACHED,
agent_name="Jordan", agent_role="Sr. SW Engineer",
data={"checkpoint_id": "cp-sse-001",
"summary": "Auth module ready for review"},
))
print(f"Seeded events into {store.db_path}")
python scripts/seed_demo_events.py && streamlit run dashboard/app.py
9. Điều Bạn Thấy Khi Nó Chạy
Một lần chạy pipeline điển hình diễn ra như thế này trên dashboard: Alex (PO) xuất hiện trong “Đang Tiến Hành,” phát trực tiếp tài liệu yêu cầu, đạt một checkpoint. Bạn phê duyệt. Alex chuyển sang “Xong.” Jamie (BA) tiếp quản, tạo các user story, hoàn thành. Sau đó Sam (QC) và Casey (TA) xuất hiện trong “Đang Tiến Hành” cùng lúc — quạt fan ra từ Phần 6. Cả hai hoàn thành trong vài giây lẫn nhau. Jordan (SSE) bắt đầu giai đoạn dài nhất, phát trực tiếp mã thực tế. Bộ theo dõi chi phí cho thấy agent SSE tiêu tốn 60% ngân sách. Riley (TL) xem xét, Morgan (DevOps) cấu hình CI/CD. Mười hai phút, tám thẻ trong “Xong,” tổng chi phí $2.47. Sự khác biệt giữa xem quá trình này diễn ra trên dashboard so với nhìn vào output terminal là sự khác biệt giữa lái xe với kính chắn gió và lái xe với mắt bị nhắm.
10. Mở Rộng Dashboard
Dashboard cơ sở bao gồm những điều cơ bản. Dưới đây là ba tiện ích mở rộng đáng xây dựng:
Chế độ xem timeline. Thêm một timeline kiểu Gantt cho thấy khi nào mỗi agent bắt đầu và hoàn thành. Điều này giúp bạn xác định các điểm nghẽn. Nếu agent SSE mất 4 phút nhưng QC mất 30 giây, bạn biết nơi cần tối ưu hóa. Streamlit không có biểu đồ Gantt gốc, nhưng st.plotly_chart với hình nhân vật timeline của Plotly hoạt động tốt.
Tab nhật ký sự kiện. Thêm một tab thứ hai với danh sách cuộn tất cả các sự kiện, mới nhất trước. Đây là chế độ xem gỡ lỗi của bạn. Khi có gì đó không ổn, nhật ký sự kiện cho bạn biết chính xác điều gì xảy ra, theo thứ tự nào, với dữ liệu nào.
Phép chiếu chi phí. Dựa trên quỹ đạo chi phí của lần chạy hiện tại, dự báo tổng chi phí nếu các agent còn lại tiêu tốn token ở tỷ lệ trung bình. Hiển thị cảnh báo nếu chi phí dự báo vượt quá ngân sách. Đây là toán học đơn giản: projected = current_cost + (remaining_agents * avg_cost_per_agent).
Sắp Tới
Dashboard cung cấp cho bạn mắt trên hệ thống. Nhưng mắt chỉ là chưa đủ — bạn cũng cần biết liệu hệ thống có khỏe mạnh trên các lần chạy, liệu các agent có suy giảm theo thời gian, và cách triển khai toàn bộ điều này.
Trong Phần 12, chúng tôi mang lại tất cả: triển khai, kiểm tra từ đầu đến cuối, và bài học từ việc chạy hệ thống này trong sản xuất. Đó là bài viết cuối cùng trong loạt bài.
Những Điều Rút Ra Chính
-
Các sự kiện là nền tảng. Mọi tính năng dashboard — Kanban, chi phí, checkpoint, phát trực tiếp — đều xuất phát từ một luồng đơn của các đối tượng
AgentEvent. Thiết kế các sự kiện của bạn tốt và dashboard tự viết. -
Khả năng nhìn thấy chi phí thay đổi hành vi. Khi bạn có thể thấy rằng một agent tiêu tốn 60% ngân sách của bạn, bạn sửa prompt. Không có khả năng nhìn thấy, bạn sẽ không bao giờ biết.
-
Checkpoint cần ngữ cảnh. Một lời nhắc terminal yêu cầu “approve? y/n” là chưa đủ. Dashboard cho thấy những gì đã được tạo ra, ai đã tạo ra nó, và nơi nó phù hợp trong pipeline. Ngữ cảnh tốt hơn dẫn đến các quyết định của con người tốt hơn.
-
Sự tách biệt quan trọng. Dashboard nhập zero mã agent. Nó đọc các sự kiện từ SQLite. Điều này có nghĩa là bạn có thể chạy, phát lại và gỡ lỗi độc lập với pipeline.
-
Bắt đầu đơn giản. Ba cột, thẻ HTML thô, một vòng lặp polling. Không WebSocket, không React frontend, không GraphQL API. Bạn có thể thêm độ phức tạp sau nếu cần. Bạn có lẽ sẽ không.