The first time I ran the full pipeline end-to-end, I stared at a terminal scrolling JSON for twelve minutes. I had no idea which agents were done, which were running, or whether the SSE was writing code or stuck in a retry loop. The system was a black box with a blinking cursor.

I added print() statements. Then timestamps. Then Rich colored output. Then I gave up and built a dashboard.

That dashboard changed how I work with the system more than any single agent improvement. Visibility changes behavior. When you can see the SSE consuming tokens for four minutes on a function that should take thirty seconds, you intervene. When you see 60% of your budget going to a poorly constrained prompt, you fix it. When a checkpoint shows the full context of what an agent produced, you make better approval decisions.

In this article, we build the entire thing: event model, persistent store, cost tracking, Kanban board, live token streaming, and human checkpoint approval. All in Streamlit.


1. Why You Need a Dashboard

Let me be specific about the problems a dashboard solves.

Invisible state. Eight agents, each with its own lifecycle. Without a dashboard, you reconstruct state from grep-ing log timestamps. That does not scale past two agents.

Untracked costs. The meter is running on every LLM call. Without per-agent cost tracking, you cannot tell if 80% of your budget goes to the SSE writing code or the PO stuck in a retry loop.

Checkpoint friction. Terminal-based approval (“approve? y/n”) lacks context. You need to see what was produced and where it fits in the pipeline to make good decisions.

No project-level view. Logs show what one agent is doing. A Kanban board shows where the project stands.

Dashboard wireframe: three-column layout with active agent panel, Kanban board, and cost tracker

2. The AgentEvent Model

Every piece of information the dashboard displays comes from a single source: events. When an agent starts, it emits an event. When it finishes, event. When tokens stream in, event. When a checkpoint is reached, event. The dashboard is just a view layer over a stream of events.

We already have domain events from Part 3. Now we formalize them into a model the dashboard can query.

# dashboard/models.py
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any


class EventType(str, Enum):
    AGENT_STARTED = "agent_started"
    AGENT_COMPLETED = "agent_completed"
    AGENT_FAILED = "agent_failed"
    TOKENS_USED = "tokens_used"
    CHECKPOINT_REACHED = "checkpoint_reached"
    CHECKPOINT_RESOLVED = "checkpoint_resolved"
    TASK_MOVED = "task_moved"
    TOKEN_STREAM = "token_stream"


class TaskStatus(str, Enum):
    BACKLOG = "backlog"
    IN_PROGRESS = "in_progress"
    DONE = "done"
    BLOCKED = "blocked"


@dataclass
class AgentEvent:
    event_type: EventType
    agent_name: str
    agent_role: str
    timestamp: datetime = field(default_factory=datetime.utcnow)
    data: dict[str, Any] = field(default_factory=dict)
    event_id: str = field(default_factory=lambda: __import__('uuid').uuid4().hex)

    def to_dict(self) -> dict:
        return {
            "event_id": self.event_id,
            "event_type": self.event_type.value,
            "agent_name": self.agent_name,
            "agent_role": self.agent_role,
            "timestamp": self.timestamp.isoformat(),
            "data": self.data,
        }

The key design decision: events are immutable and append-only. The dashboard never modifies events. It reads them, aggregates them, and renders views. This means we can replay events to reconstruct any point-in-time view of the system — useful for debugging and post-mortem analysis.


3. EventStore: SQLite Persistence

We need somewhere to store events. SQLite is the right choice here: zero configuration, file-based, handles concurrent reads well enough for a single-user dashboard, and the json extension handles our data field natively.

# dashboard/event_store.py
import json
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional

from .models import AgentEvent, EventType, TaskStatus


class EventStore:
    def __init__(self, db_path: str = "events.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with self._conn() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    event_id TEXT PRIMARY KEY,
                    event_type TEXT NOT NULL,
                    agent_name TEXT NOT NULL,
                    agent_role TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    data TEXT NOT NULL DEFAULT '{}'
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_events_type
                ON events(event_type)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_events_agent
                ON events(agent_name)
            """)

    def _conn(self) -> sqlite3.Connection:
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def append(self, event: AgentEvent):
        with self._conn() as conn:
            conn.execute(
                """INSERT OR IGNORE INTO events
                   (event_id, event_type, agent_name, agent_role, timestamp, data)
                   VALUES (?, ?, ?, ?, ?, ?)""",
                (
                    event.event_id,
                    event.event_type.value,
                    event.agent_name,
                    event.agent_role,
                    event.timestamp.isoformat(),
                    json.dumps(event.data),
                ),
            )

    def get_events(self, event_type=None, agent_name=None,
                   since=None, limit=500) -> list[dict]:
        query, params = "SELECT * FROM events WHERE 1=1", []
        if event_type:
            query += " AND event_type = ?"; params.append(event_type.value)
        if agent_name:
            query += " AND agent_name = ?"; params.append(agent_name)
        if since:
            query += " AND timestamp > ?"; params.append(since.isoformat())
        query += " ORDER BY timestamp DESC LIMIT ?"; params.append(limit)
        with self._conn() as conn:
            rows = conn.execute(query, params).fetchall()
        return [{**dict(r), "data": json.loads(r["data"])} for r in rows]

    def get_latest_by_agent(self) -> dict[str, dict]:
        with self._conn() as conn:
            rows = conn.execute("""
                SELECT e.* FROM events e INNER JOIN (
                    SELECT agent_name, MAX(timestamp) as max_ts
                    FROM events GROUP BY agent_name
                ) latest ON e.agent_name = latest.agent_name
                    AND e.timestamp = latest.max_ts
            """).fetchall()
        return {r["agent_name"]: {**dict(r), "data": json.loads(r["data"])}
                for r in rows}

    def get_pending_checkpoints(self) -> list[dict]:
        with self._conn() as conn:
            reached = conn.execute(
                "SELECT * FROM events WHERE event_type='checkpoint_reached'"
                " ORDER BY timestamp DESC").fetchall()
            resolved = {json.loads(r["data"]).get("checkpoint_id")
                        for r in conn.execute(
                "SELECT data FROM events"
                " WHERE event_type='checkpoint_resolved'").fetchall()}
        return [{**dict(r), "data": json.loads(r["data"])} for r in reached
                if json.loads(r["data"]).get("checkpoint_id") not in resolved]

The EventStore is intentionally simple. No ORM, no migrations, no connection pooling. For production scale, swap SQLite for PostgreSQL — the interface stays the same.


4. CostTracker: Know Where Your Money Goes

This is the class I wish I had built on day one. The CostTracker reads token usage events from the store and calculates costs using the pricing table for each model.

# dashboard/cost_tracker.py
from dataclasses import dataclass
from .event_store import EventStore
from .models import EventType


# Prices per 1M tokens (USD) as of early 2026
MODEL_PRICING: dict[str, dict[str, float]] = {
    "claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
    "claude-haiku-3.5": {"input": 0.80, "output": 4.00},
    "claude-opus-4-20250918": {"input": 15.00, "output": 75.00},
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "deepseek-v3": {"input": 0.27, "output": 1.10},
}


@dataclass
class AgentCost:
    agent_name: str
    agent_role: str
    input_tokens: int = 0
    output_tokens: int = 0
    total_cost: float = 0.0
    call_count: int = 0


class CostTracker:
    def __init__(self, store: EventStore, budget_limit: float = 5.00):
        self.store = store
        self.budget_limit = budget_limit

    def _calc_cost(
        self, model: str, input_tokens: int, output_tokens: int
    ) -> float:
        pricing = MODEL_PRICING.get(model, {"input": 3.0, "output": 15.0})
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (output_tokens / 1_000_000) * pricing["output"]
        return input_cost + output_cost

    def get_costs_by_agent(self) -> list[AgentCost]:
        events = self.store.get_events(event_type=EventType.TOKENS_USED)
        agents: dict[str, AgentCost] = {}
        for event in events:
            name = event["agent_name"]
            if name not in agents:
                agents[name] = AgentCost(
                    agent_name=name, agent_role=event["agent_role"]
                )
            ac = agents[name]
            data = event["data"]
            inp = data.get("input_tokens", 0)
            out = data.get("output_tokens", 0)
            model = data.get("model", "claude-sonnet-4-20250514")
            ac.input_tokens += inp
            ac.output_tokens += out
            ac.total_cost += self._calc_cost(model, inp, out)
            ac.call_count += 1
        return sorted(agents.values(), key=lambda a: a.total_cost, reverse=True)

    def get_total_cost(self) -> float:
        return sum(a.total_cost for a in self.get_costs_by_agent())

    def get_budget_pct(self) -> float:
        total = self.get_total_cost()
        return min((total / self.budget_limit) * 100, 100.0)

    def is_over_budget(self) -> bool:
        return self.get_total_cost() >= self.budget_limit

The pricing table is a simple dictionary — update it when model pricing changes. Unknown models fall back to Sonnet rates. The default budget_limit of $5 covers a typical full pipeline run with Sonnet-class models ($2-4). For Opus, multiply by 5x.

Emitting Cost Events from Your Agents

To feed the CostTracker, your agents need to emit TOKENS_USED events after each LLM call. Here is how that integrates with the BaseAgent from Part 4:

# Inside BaseAgent._call_llm()
async def _call_llm(self, messages, **kwargs):
    response = await self.client.messages.create(
        model=self.model,
        messages=messages,
        **kwargs
    )
    # Emit token usage event
    self.emit_event(AgentEvent(
        event_type=EventType.TOKENS_USED,
        agent_name=self.name,
        agent_role=self.role,
        data={
            "model": self.model,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
        }
    ))
    return response

5. Streamlit Dashboard: The Full Application

Now the main event. The dashboard is a single Streamlit application with a three-column layout: active agent status on the left, Kanban board in the center, cost tracker on the right.

# dashboard/app.py
import streamlit as st
import time
from datetime import datetime, timedelta

from .event_store import EventStore
from .cost_tracker import CostTracker
from .models import EventType, TaskStatus, AgentEvent


# ── Page config ────────────────────────────────────────────────
st.set_page_config(
    page_title="AI Team Dashboard",
    page_icon="⚙",
    layout="wide",
    initial_sidebar_state="collapsed",
)

# ── Shared state ───────────────────────────────────────────────
store = EventStore()
tracker = CostTracker(store)

AGENT_ROLES = {
    "Alex": "Product Owner",
    "Jamie": "Business Analyst",
    "Sam": "QC Engineer",
    "Casey": "Tech Architect",
    "Jordan": "Sr. SW Engineer",
    "Riley": "Tech Lead",
    "Morgan": "DevOps Engineer",
    "Taylor": "Project Manager",
}


def get_agent_status(agent_name: str, latest: dict) -> str:
    if agent_name not in latest:
        return "idle"
    event = latest[agent_name]
    etype = event["event_type"]
    if etype == EventType.AGENT_STARTED.value:
        return "running"
    elif etype == EventType.AGENT_COMPLETED.value:
        return "done"
    elif etype == EventType.AGENT_FAILED.value:
        return "failed"
    elif etype == EventType.CHECKPOINT_REACHED.value:
        return "checkpoint"
    return "idle"


STATUS_COLORS = {
    "idle": "#64748b",
    "running": "#10b981",
    "done": "#6366f1",
    "failed": "#ef4444",
    "checkpoint": "#f59e0b",
}


# ── Custom CSS (dark theme) ─────────────────────────────────────
st.markdown("""
<style>
    .stApp { background-color: #0f172a; }
    .kanban-card { background:#1e293b; border-radius:8px; padding:10px 12px;
                   margin-bottom:8px; border-left:3px solid; }
    .kanban-card .title { color:#e2e8f0; font-size:13px; font-weight:600; }
    .kanban-card .meta { color:#64748b; font-size:11px; }
    .kanban-col-header { color:#94a3b8; font-size:12px; font-weight:700;
        text-transform:uppercase; letter-spacing:1px; padding-bottom:8px;
        border-bottom:2px solid #334155; margin-bottom:12px; }
    .cost-bar { background:#1e293b; border-radius:4px; height:20px; margin:4px 0; }
    .cost-bar-fill { height:100%; border-radius:4px; }
    .metric-box { background:#1e293b; border-radius:8px; padding:16px;
                  border:1px solid #334155; }
    .stream-box { background:#0a0f1a; border:1px solid #334155; border-radius:6px;
        padding:12px; font-family:monospace; font-size:12px; color:#6ee7b7;
        max-height:200px; overflow-y:auto; }
</style>
""", unsafe_allow_html=True)


# ── Layout ─────────────────────────────────────────────────────
st.markdown("### AI Team Dashboard")

col_agent, col_kanban, col_cost = st.columns([1, 2, 1])

latest_events = store.get_latest_by_agent()

# ── Column 1: Active Agent ─────────────────────────────────────
with col_agent:
    st.markdown("**Active Agent**")

    # Find the currently running agent
    active_agent = None
    for name, event in latest_events.items():
        if event["event_type"] == EventType.AGENT_STARTED.value:
            active_agent = name
            break

    if active_agent:
        role = AGENT_ROLES.get(active_agent, "Unknown")
        st.markdown(f"""
        <div class="metric-box">
            <span style="color:#a5b4fc;font-size:18px;font-weight:700">
                {active_agent}
            </span>
            <br/>
            <span style="color:#94a3b8;font-size:13px">{role}</span>
            <br/><br/>
            <span style="background:#10b981;color:#0f172a;padding:2px 10px;
                   border-radius:4px;font-size:11px;font-weight:600">
                RUNNING
            </span>
        </div>
        """, unsafe_allow_html=True)

        # Live token stream
        st.markdown("**Live Output**")
        stream_events = store.get_events(
            event_type=EventType.TOKEN_STREAM,
            agent_name=active_agent,
            limit=20,
        )
        stream_text = "\n".join(
            e["data"].get("chunk", "") for e in reversed(stream_events)
        )
        st.markdown(
            f'<div class="stream-box">{stream_text or "Waiting..."}</div>',
            unsafe_allow_html=True,
        )

        # Elapsed time
        started_events = store.get_events(
            event_type=EventType.AGENT_STARTED,
            agent_name=active_agent,
            limit=1,
        )
        if started_events:
            started_at = datetime.fromisoformat(started_events[0]["timestamp"])
            elapsed = datetime.utcnow() - started_at
            mins = int(elapsed.total_seconds() // 60)
            secs = int(elapsed.total_seconds() % 60)
            st.markdown(f"""
            <div class="metric-box" style="margin-top:12px">
                <span style="color:#94a3b8;font-size:11px">Elapsed</span><br/>
                <span style="color:#e2e8f0;font-family:monospace;font-size:20px">
                    {mins:02d}:{secs:02d}
                </span>
            </div>
            """, unsafe_allow_html=True)
    else:
        st.info("No agent currently running.")

    # ── Checkpoint approval UI ──────────────────────────────────
    st.markdown("---")
    st.markdown("**Checkpoints**")
    pending = store.get_pending_checkpoints()
    for cp in pending:
        data = cp["data"]
        cp_id = data.get("checkpoint_id", "unknown")
        agent = cp["agent_name"]
        st.warning(f"Checkpoint: {agent} -- {data.get('summary', 'Review')}")
        c1, c2 = st.columns(2)
        with c1:
            if st.button("Approve", key=f"approve_{cp_id}", type="primary"):
                store.append(AgentEvent(
                    event_type=EventType.CHECKPOINT_RESOLVED,
                    agent_name=agent, agent_role=AGENT_ROLES.get(agent, ""),
                    data={"checkpoint_id": cp_id, "decision": "approved"},
                ))
                st.rerun()
        with c2:
            if st.button("Reject", key=f"reject_{cp_id}"):
                feedback = st.text_input("Feedback", key=f"fb_{cp_id}")
                store.append(AgentEvent(
                    event_type=EventType.CHECKPOINT_RESOLVED,
                    agent_name=agent, agent_role=AGENT_ROLES.get(agent, ""),
                    data={"checkpoint_id": cp_id, "decision": "rejected",
                          "feedback": feedback},
                ))
                st.rerun()
    if not pending:
        st.caption("No pending checkpoints")


# ── Column 2: Kanban Board ─────────────────────────────────────
with col_kanban:
    st.markdown("**Kanban Board**")

    def get_tasks():
        board = {"backlog": [], "in_progress": [], "done": []}
        for name, role in AGENT_ROLES.items():
            status = get_agent_status(name, latest_events)
            desc = latest_events.get(name, {}).get("data", {}).get(
                "task_summary", role)
            task = {"agent": name, "role": role, "description": desc}
            if status == "done": board["done"].append(task)
            elif status in ("running", "checkpoint"): board["in_progress"].append(task)
            else: board["backlog"].append(task)
        return board

    def render_card(task, color):
        return (f'<div class="kanban-card" style="border-left-color:{color}">'
                f'<div class="title">{task["role"]}</div>'
                f'<div class="meta">{task["description"]}</div>'
                f'<div class="meta" style="color:{color}">{task["agent"]}</div>'
                f'</div>')

    board = get_tasks()
    k1, k2, k3 = st.columns(3)
    col_config = [
        (k1, "Backlog", "backlog", "#64748b"),
        (k2, "In Progress", "in_progress", "#10b981"),
        (k3, "Done", "done", "#6366f1"),
    ]
    for col, label, key, color in col_config:
        with col:
            st.markdown(f'<div class="kanban-col-header" '
                        f'style="border-color:{color}">{label}</div>',
                        unsafe_allow_html=True)
            for task in board[key]:
                st.markdown(render_card(task, color), unsafe_allow_html=True)
            if not board[key]:
                st.caption("Empty")


# ── Column 3: Cost Tracker ─────────────────────────────────────
with col_cost:
    st.markdown("**Cost Tracker**")

    total_cost = tracker.get_total_cost()
    budget_pct = tracker.get_budget_pct()
    bcolor = "#10b981" if budget_pct < 75 else (
        "#f59e0b" if budget_pct < 90 else "#ef4444")

    st.markdown(f"""
    <div class="metric-box">
        <span style="color:#94a3b8;font-size:11px">Total Spend</span><br/>
        <span style="color:#fbbf24;font-family:monospace;font-size:28px;
               font-weight:700">${total_cost:.2f}</span>
        <span style="color:#64748b"> / ${tracker.budget_limit:.2f}</span>
    </div>
    <div style="margin:12px 0"><div class="cost-bar">
        <div class="cost-bar-fill" style="width:{budget_pct}%;background:{bcolor}">
        </div></div>
        <span style="color:#94a3b8;font-size:11px">{budget_pct:.1f}% of budget</span>
    </div>
    """, unsafe_allow_html=True)
    if tracker.is_over_budget():
        st.error("Budget limit reached.")

    # Per-agent costs
    st.markdown("**Cost by Agent**")
    agent_costs = tracker.get_costs_by_agent()
    max_cost = max((a.total_cost for a in agent_costs), default=1.0) or 1.0
    for ac in agent_costs:
        w = (ac.total_cost / max_cost) * 100
        st.markdown(f"""<div style="margin:6px 0">
            <div style="display:flex;justify-content:space-between">
                <span style="color:#cbd5e1;font-size:12px">{ac.agent_name}</span>
                <span style="color:#94a3b8;font-family:monospace;font-size:11px">
                    ${ac.total_cost:.3f}</span></div>
            <div class="cost-bar"><div class="cost-bar-fill"
                 style="width:{w}%;background:#6366f1"></div></div>
            <span style="color:#475569;font-size:10px">
                {ac.input_tokens:,} in / {ac.output_tokens:,} out</span>
        </div>""", unsafe_allow_html=True)
    if not agent_costs:
        st.caption("No cost data yet")

    # Token summary
    ti = sum(a.input_tokens for a in agent_costs)
    to = sum(a.output_tokens for a in agent_costs)
    st.markdown(f"""<div class="metric-box" style="margin-top:16px">
        <span style="color:#94a3b8;font-size:11px">Tokens</span><br/>
        <span style="color:#64748b;font-family:monospace;font-size:12px">
            In: {ti:,} | Out: {to:,}</span><br/>
        <span style="color:#e2e8f0;font-family:monospace;font-weight:600">
            Total: {ti+to:,}</span>
    </div>""", unsafe_allow_html=True)


# ── Auto-refresh ───────────────────────────────────────────────
REFRESH_INTERVAL = 3  # seconds
time.sleep(REFRESH_INTERVAL)
st.rerun()

That is the complete dashboard application.


6. Design Decisions

Auto-refresh: The time.sleep(3) + st.rerun() loop is the simplest real-time mechanism. Streamlit reruns the full script on every interaction anyway — this just adds polling. For true WebSocket streaming, use st.write_stream(), but polling is simpler and sufficient for a local tool.

Raw HTML over Streamlit components: The Kanban cards and cost bars are raw HTML. Streamlit’s built-in components (st.metric, st.progress) have limited styling. Raw HTML gives full control over the dark theme, card borders, and compact layout. The trade-off: raw HTML elements are not interactive like Streamlit widgets, but for display-only elements, that is fine.

Event-driven separation: The dashboard imports zero agent code. It only reads events from SQLite. This means you can run the dashboard without the pipeline (review past runs), swap agent implementations freely, replay events for debugging, or copy the SQLite file to another machine.

Checkpoint flow: Clicking Approve writes a CHECKPOINT_RESOLVED event with decision: "approved". The pipeline polls the event store for resolution events. Reject includes a text input for feedback, which is stored in the event and passed back to the agent for revision — the same human-in-the-loop pattern from Part 8, but with a proper UI.


7. Wiring the Pipeline to the Dashboard

Your agents need to emit events at the right moments. Here is the integration pattern for the BaseAgent:

# agents/base_agent.py (additions)
class BaseAgent:
    def __init__(self, name, role, event_store: EventStore, **kwargs):
        # ... existing init ...
        self.event_store = event_store

    async def run(self, state):
        self.event_store.append(AgentEvent(
            event_type=EventType.AGENT_STARTED,
            agent_name=self.name,
            agent_role=self.role,
            data={"task_summary": self._get_task_summary(state)},
        ))
        try:
            result = await self._execute(state)
            self.event_store.append(AgentEvent(
                event_type=EventType.AGENT_COMPLETED,
                agent_name=self.name,
                agent_role=self.role,
                data={"task_summary": "Completed"},
            ))
            return result
        except Exception as e:
            self.event_store.append(AgentEvent(
                event_type=EventType.AGENT_FAILED,
                agent_name=self.name,
                agent_role=self.role,
                data={"error": str(e)},
            ))
            raise

For token streaming — showing the LLM’s output as it generates — you emit TOKEN_STREAM events from the streaming callback:

async def _stream_llm(self, messages, **kwargs):
    async with self.client.messages.stream(
        model=self.model, messages=messages, **kwargs
    ) as stream:
        full_response = ""
        async for chunk in stream.text_stream:
            full_response += chunk
            self.event_store.append(AgentEvent(
                event_type=EventType.TOKEN_STREAM,
                agent_name=self.name,
                agent_role=self.role,
                data={"chunk": chunk, "full_text": full_response[-200:]},
            ))
    return full_response

Note the full_text field keeps only the last 200 characters, preventing unbounded database growth.


8. Running the Dashboard

pip install streamlit  # only additional dependency
project/
  dashboard/
    __init__.py
    app.py
    models.py
    event_store.py
    cost_tracker.py
  agents/
    base_agent.py
    ...
  events.db          # created automatically

Launch

# Terminal 1: Run the pipeline
python -m agents.run_pipeline --brief "Build a task manager app"

# Terminal 2: Launch the dashboard
streamlit run dashboard/app.py --server.port 8501

The dashboard opens at http://localhost:8501. It auto-refreshes every three seconds. You will see agents appear on the Kanban board as they start and complete. Costs accumulate in real time. Checkpoints appear with approval buttons when reached.

Running With Demo Data

For testing without a live pipeline, seed the event store with sample events:

# scripts/seed_demo_events.py
from dashboard.event_store import EventStore
from dashboard.models import AgentEvent, EventType
from datetime import datetime, timedelta

store = EventStore()
now = datetime.utcnow()

# Completed agents
for name, role, mins, summary in [
    ("Alex", "Product Owner", 10, "Requirements doc complete"),
    ("Jamie", "Business Analyst", 8, "12 user stories defined"),
    ("Sam", "QC Engineer", 5, "38 test cases written"),
    ("Casey", "Tech Architect", 5, "Architecture spec finalized"),
]:
    store.append(AgentEvent(
        event_type=EventType.AGENT_COMPLETED,
        agent_name=name, agent_role=role,
        timestamp=now - timedelta(minutes=mins),
        data={"task_summary": summary},
    ))

# Currently running agent
store.append(AgentEvent(
    event_type=EventType.AGENT_STARTED,
    agent_name="Jordan", agent_role="Sr. SW Engineer",
    timestamp=now - timedelta(minutes=2),
    data={"task_summary": "Implementing auth module"},
))

# Token usage for cost tracking
for name, role, inp, out in [
    ("Alex", "Product Owner", 8200, 3400),
    ("Jamie", "Business Analyst", 12500, 5800),
    ("Jordan", "Sr. SW Engineer", 18400, 8200),
]:
    store.append(AgentEvent(
        event_type=EventType.TOKENS_USED,
        agent_name=name, agent_role=role,
        data={"model": "claude-sonnet-4-20250514",
              "input_tokens": inp, "output_tokens": out},
    ))

# Pending checkpoint
store.append(AgentEvent(
    event_type=EventType.CHECKPOINT_REACHED,
    agent_name="Jordan", agent_role="Sr. SW Engineer",
    data={"checkpoint_id": "cp-sse-001",
          "summary": "Auth module ready for review"},
))
print(f"Seeded events into {store.db_path}")
python scripts/seed_demo_events.py && streamlit run dashboard/app.py

9. What You See When It Runs

A typical pipeline run unfolds like this on the dashboard: Alex (PO) appears in “In Progress,” streams the requirements doc, hits a checkpoint. You approve. Alex moves to “Done.” Jamie (BA) takes over, generates user stories, completes. Then Sam (QC) and Casey (TA) appear in “In Progress” simultaneously — the parallel fan-out from Part 6. Both complete within seconds of each other. Jordan (SSE) starts the longest phase, streaming actual code. The cost tracker shows the SSE agent consuming 60% of the budget. Riley (TL) reviews, Morgan (DevOps) configures CI/CD. Twelve minutes, eight cards in “Done,” total cost $2.47. The difference between watching this unfold on a dashboard versus staring at terminal output is the difference between driving with a windshield and driving with your eyes closed.


10. Extensions Worth Building

Three additions that pay for themselves quickly: a timeline view using st.plotly_chart with Plotly’s timeline figure to visualize agent durations and identify bottlenecks; an event log tab with a scrollable list of all events for debugging; and cost projection that estimates total spend based on current_cost + (remaining_agents * avg_cost_per_agent) and warns if it exceeds budget.


What Comes Next

The dashboard gives you eyes on the system. In Part 12, we bring it all together: deployment, end-to-end testing, and lessons learned from running this system in production.


Key Takeaways

  1. Events are the foundation. Every dashboard feature — Kanban, costs, checkpoints, streaming — derives from a single stream of AgentEvent objects. Design your events well and the dashboard writes itself.

  2. Cost visibility changes behavior. When you can see that one agent consumes 60% of your budget, you fix the prompt. Without visibility, you would never know.

  3. Checkpoints need context. A terminal prompt asking “approve? y/n” is not enough. The dashboard shows what was produced, who produced it, and where it fits in the pipeline. Better context leads to better human decisions.

  4. Separation matters. The dashboard imports zero agent code. It reads events from SQLite. This means you can run, replay, and debug independently of the pipeline.

  5. Start simple. Three columns, raw HTML cards, a polling loop. No WebSockets, no React frontend, no GraphQL API. You can add complexity later if you need it. You probably will not.

Export for reading

Comments