Introduction

In the main architecture guide, we established the 5-layer architecture for scalable multi-agent systems. This deep dive focuses on Layer 2 — the Orchestrator (Brain) Agent — the most critical component in the entire system.

The Brain agent is not a data executor. It is a planner and synthesiser. It receives natural-language questions, interprets business intent, plans query strategies, delegates to worker agents, validates results, and synthesises final answers.

Get this component wrong, and every downstream agent inherits the confusion. Get it right, and the system scales gracefully from 10 to 10,000 queries per day.


Table of Contents

  1. Why the Orchestrator Matters
  2. LangGraph Supervisor vs. Custom Orchestrator
  3. State Schema Design
  4. The Command API for Dynamic Routing
  5. Prompt Engineering for the Brain
  6. Context Injection Architecture
  7. Routing Logic Patterns
  8. Model Selection Strategy
  9. Error Handling in the Orchestrator
  10. Testing the Brain Agent
  11. Production Checklist

1. Why the Orchestrator Matters

In a multi-agent system, the Orchestrator is the single point of intelligence. Every architectural decision flows from this component:

Decision | Impact
---------|-------
Which model to use for the Brain | Cost per query, reasoning quality
How the Brain routes to workers | Latency, correctness, cost
What context the Brain receives | Answer quality, hallucination rate
How the Brain validates results | Trust, reliability, error rate
When the Brain escalates to humans | Safety, compliance, user trust

Key insight: The Brain is invoked 1-2 times per query (planning + synthesis), while workers are invoked 3-8 times. The Brain should therefore use the best available model (Claude Sonnet) while workers use cheaper models (Haiku/DeepSeek). Even with more worker calls, the Brain typically accounts for 5-10x the per-query spend of all worker calls combined.
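
That split can be checked with back-of-envelope arithmetic. The per-call prices below are illustrative assumptions chosen to be consistent with the pricing table in section 8.1, not quoted rates:

```python
# Back-of-envelope cost split between Brain and worker calls.
# Per-call prices are illustrative assumptions, not quoted rates.
BRAIN_COST_PER_CALL = 0.025   # USD per call, strong model (plan + synthesise)
WORKER_COST_PER_CALL = 0.002  # USD per call, cheap model

def query_cost(brain_calls: int, worker_calls: int) -> float:
    """Total model cost for a single query."""
    return brain_calls * BRAIN_COST_PER_CALL + worker_calls * WORKER_COST_PER_CALL

# A typical query: 2 Brain calls (plan + synthesise), 5 worker calls
typical = query_cost(2, 5)
```

With these assumptions the Brain's two calls cost USD 0.05 against USD 0.01 for five worker calls, which is where the 5-10x figure comes from.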


2. LangGraph Supervisor vs. Custom Orchestrator

LangGraph offers two approaches for building the Orchestrator:

2.1 Built-in Supervisor (create_supervisor)

The create_supervisor function handles orchestration logic automatically:

from langgraph_supervisor import create_supervisor
from langgraph.prebuilt import create_react_agent
from langchain_aws import ChatBedrock

# Create the Brain model
brain = ChatBedrock(
    model_id="anthropic.claude-sonnet-4-5-20250514-v1:0",
    region_name="us-east-1",
    model_kwargs=dict(max_tokens=4096, temperature=0)
)

# Create worker agents (haiku_model: a cheaper ChatBedrock instance,
# constructed like `brain` above with a Haiku model_id)
sql_agent = create_react_agent(
    model=haiku_model,
    tools=[execute_athena_query],
    name="sql_writer",
    prompt="You are an Athena SQL specialist..."
)

formatter_agent = create_react_agent(
    model=haiku_model,
    tools=[format_output],
    name="formatter",
    prompt="You convert data to requested formats..."
)

# Create the supervisor (Brain)
workflow = create_supervisor(
    agents=[sql_agent, formatter_agent],
    model=brain,
    prompt=BRAIN_SYSTEM_PROMPT,
)

app = workflow.compile()

When to use: Prototyping, standard routing patterns, teams new to LangGraph.

2.2 Custom Orchestrator (StateGraph + Command)

For production systems, build a custom orchestrator using StateGraph and the Command API:

from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from typing import TypedDict, Annotated, Literal
from langgraph.graph import add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    plan: str
    current_step: int
    total_steps: int
    worker_results: list
    confidence: float
    needs_human_review: bool

def brain_plan(state: AgentState) -> Command[Literal["sql_writer", "weight_applier", "__end__"]]:
    """Brain analyses the question and creates an execution plan."""
    result = brain_model.invoke(
        # Prepend the system prompt; invoke() takes a message list, not a system kwarg
        [("system", BRAIN_SYSTEM_PROMPT), *state["messages"]]
    )

    # Parse the plan from the Brain's response text
    plan = parse_execution_plan(result.content)
    first_worker = plan.steps[0].agent

    return Command(
        update=dict(
            messages=[result],  # the add_messages reducer appends this AIMessage
            plan=plan.to_json(),
            current_step=0,
            total_steps=len(plan.steps),
        ),
        goto=first_worker,
    )

def brain_synthesise(state: AgentState) -> Command[Literal["formatter", "brain_plan", "__end__"]]:
    """Brain reviews worker results and decides next action."""
    results = state["worker_results"]
    confidence = evaluate_confidence(results)

    if confidence < 0.7:
        # Low confidence: retry with a refined plan
        return Command(
            update=dict(
                confidence=confidence,
                retry_count=state["retry_count"] + 1,  # feeds the circuit breaker
            ),
            goto="brain_plan",
        )

    if confidence < 0.9:
        # Medium confidence: flag for human review
        return Command(
            update=dict(
                confidence=confidence,
                needs_human_review=True,
            ),
            goto="formatter",
        )

    # High confidence: proceed to formatting
    return Command(
        update=dict(confidence=confidence),
        goto="formatter",
    )

# Build the graph
graph = StateGraph(AgentState)
graph.add_node("brain_plan", brain_plan)
graph.add_node("brain_synthesise", brain_synthesise)
graph.add_node("sql_writer", sql_writer_node)
graph.add_node("weight_applier", weight_applier_node)
graph.add_node("formatter", formatter_node)

graph.add_edge(START, "brain_plan")
graph.add_edge("sql_writer", "brain_synthesise")
graph.add_edge("weight_applier", "brain_synthesise")
graph.add_edge("formatter", END)

app = graph.compile(checkpointer=dynamodb_checkpointer)

When to use: Production systems, custom routing logic, confidence-based branching, human-in-the-loop.

2.3 Decision Matrix

Feature | create_supervisor | Custom StateGraph
--------|-------------------|------------------
Setup time | 30 minutes | 2-4 hours
Routing flexibility | Tool-call based | Full programmatic control
State schema | Fixed | Custom TypedDict
Confidence routing | Not built-in | Full control
Human-in-the-loop | Basic | Advanced (interrupt API)
Testing | Black box | Unit-testable nodes
Production readiness | Good for simple flows | Required for complex flows

Recommendation: Start with create_supervisor for POC. Migrate to custom StateGraph when you need confidence routing, custom state, or human-in-the-loop.


3. State Schema Design

The state schema is the API contract between all agents. Design it first, before writing any agent logic.

3.1 Core State Fields

from typing import TypedDict, Annotated, Optional
from langgraph.graph import add_messages
from datetime import datetime

class AgentState(TypedDict):
    # Communication
    messages: Annotated[list, add_messages]

    # Planning
    plan: Optional[str]               # JSON execution plan
    current_step: int                  # Which step we're on
    total_steps: int                   # Total planned steps

    # Results
    worker_results: list               # Accumulated worker outputs
    final_answer: Optional[str]        # Synthesised answer

    # Quality Control
    confidence: float                  # 0.0 to 1.0
    needs_human_review: bool           # Flag for HITL
    retry_count: int                   # Circuit breaker

    # Metadata
    query_id: str                      # Unique query identifier
    started_at: str                    # ISO timestamp
    model_costs: list                  # Token usage tracking

3.2 State Design Principles

Principle | Implementation
----------|---------------
Immutable updates | Never mutate state directly; always return new state via Command
Reducer functions | Use add_messages for message lists to prevent overwrites
Flat structure | Avoid nested dicts (they don’t merge well across nodes)
Type safety | Use TypedDict with explicit types for every field
Cost tracking | Include model_costs to track token usage per node
Idempotency | Include query_id so retries don’t create duplicate work
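
The reducer-function principle extends beyond add_messages: any accumulating field can declare its own merge function. A minimal, framework-free sketch of what the Annotated metadata means, using operator.add so worker_results appends across nodes instead of being overwritten:

```python
import operator
from typing import Annotated, TypedDict

class WorkerState(TypedDict):
    # The Annotated metadata tells the graph HOW to merge updates to this key:
    # operator.add concatenates lists, so each node APPENDS rather than overwrites.
    worker_results: Annotated[list, operator.add]

def apply_update(reducer, current: list, update: list) -> list:
    """Conceptually what the framework does when a node returns a state update."""
    return reducer(current, update)

merged = apply_update(operator.add, [{"step": 0}], [{"step": 1}])
```

Without the reducer, the second node's update would replace the first node's results; with it, both survive in order.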

3.3 Anti-Patterns

  • Storing raw LLM responses: Store parsed, structured data instead
  • Using a single “context” string field: Use typed fields for each piece of context
  • Forgetting retry_count: Without it, failed nodes retry forever
  • Skipping timestamps: You cannot debug latency issues without timing data

4. The Command API for Dynamic Routing

The Command object is the key to flexible orchestration. It lets you update state AND route to the next node in a single return:

4.1 Basic Routing

import json

def brain_node(state: AgentState) -> Command[Literal["sql_writer", "formatter", "__end__"]]:
    """Route based on the execution plan."""
    plan = json.loads(state["plan"])
    current = state["current_step"]

    if current >= state["total_steps"]:
        return Command(goto="__end__")

    next_worker = plan["steps"][current]["agent"]
    return Command(
        update=dict(current_step=current + 1),
        goto=next_worker,
    )

4.2 Confidence-Based Routing

def synthesise_node(state: AgentState) -> Command[Literal["formatter", "brain_plan", "human_review"]]:
    """Route based on result confidence."""
    confidence = state["confidence"]

    if confidence > 0.89:
        return Command(goto="formatter")
    elif confidence > 0.69:
        return Command(
            update=dict(needs_human_review=True),
            goto="human_review",
        )
    else:
        # Retry with refined plan (max 2 retries)
        if state["retry_count"] <= 1:
            return Command(
                update=dict(retry_count=state["retry_count"] + 1),
                goto="brain_plan",
            )
        else:
            # Give up gracefully
            return Command(
                update=dict(needs_human_review=True),
                goto="human_review",
            )

4.3 Parallel Fan-Out with Send

For tasks that can be parallelised (e.g., querying multiple data sources):

from langgraph.types import Send

def brain_parallel(state: AgentState) -> list[Send]:
    """Fan out to multiple workers simultaneously."""
    plan = json.loads(state["plan"])
    parallel_steps = plan.get("parallel", [])

    sends = []
    for step in parallel_steps:
        sends.append(
            Send(
                step["agent"],
                dict(
                    messages=state["messages"],
                    task=step["task"],
                    query_id=state["query_id"],
                )
            )
        )
    return sends

5. Prompt Engineering for the Brain

The Brain prompt is the most important piece of text in your entire system. It determines routing quality, cost efficiency, and answer accuracy.

5.1 Prompt Structure

BRAIN_SYSTEM_PROMPT = """You are the orchestrator of a data analytics system.
Your role is to PLAN and SYNTHESISE, never to execute queries directly.

## Your Capabilities
You have access to these worker agents:
- sql_writer: Translates query specs to Athena SQL (fast, cheap)
- weight_applier: Applies rim weighting to raw results (required for all metrics)
- formatter: Converts data to JSON/markdown/narrative

## Data Dictionary
[DATA_DICTIONARY]

## Business Rules
[BUSINESS_RULES]

## Decision Framework
For every user question:
1. CLASSIFY the question type (metric, comparison, trend, anomaly)
2. PLAN the execution steps (which workers, in what order)
3. SPECIFY structured instructions for each worker
4. VALIDATE results against expected schema
5. SYNTHESISE a final answer with confidence level

## Output Format
Always respond with a JSON plan:
- question_type: metric | comparison | trend | anomaly
- confidence_required: 0.7 | 0.8 | 0.9
- steps: list of worker calls with structured inputs
- expected_output: description of what the final answer looks like

## Critical Rules
- NEVER write SQL directly. Always delegate to sql_writer.
- EVERY metric output MUST pass through weight_applier.
- If confidence is below 0.7, FLAG for human review.
- Maximum 5 worker calls per query (cost control).
- Always include cost estimate in your plan.
"""

5.2 Context Injection vs. Fine-Tuning

Approach | Cost | Flexibility | When to Use
---------|------|-------------|------------
System prompt injection | USD 0 upfront | Change instantly | Schema, business rules
RAG retrieval | USD 0.01/query | Dynamic context | Large knowledge bases
Few-shot examples | USD 0 upfront | Version-controlled | Complex output formats
Fine-tuning | USD 500-5000 | Requires retraining | Rare — only for style/tone

Our recommendation: Use system prompt injection for everything that fits in the context window. Use RAG only when context exceeds 100K tokens. Never fine-tune unless you have exhausted prompt engineering.

5.3 Prompt Versioning

prompts/
  brain/
    v1.0.0.txt          # Initial production prompt
    v1.1.0.txt          # Added rim weighting rules
    v1.2.0.txt          # Improved routing logic
    CHANGELOG.md         # What changed and why
  workers/
    sql_writer/
      v1.0.0.txt
    weight_applier/
      v1.0.0.txt

Store prompts in git alongside code. Use semantic versioning. Deploy prompt updates through the same CI/CD pipeline as code changes.
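
Loading the right prompt at runtime then reduces to picking the highest semantic version from that directory. A small sketch of the version comparison (pure string logic; the v<major>.<minor>.<patch>.txt naming matches the tree above):

```python
def latest_version(filenames: list[str]) -> str:
    """Return the highest v<major>.<minor>.<patch>.txt name by semantic version."""
    def key(name: str) -> tuple[int, ...]:
        # "v1.10.0.txt" -> (1, 10, 0); numeric compare avoids "v1.10" sorting below "v1.2"
        return tuple(int(part) for part in name.removesuffix(".txt").lstrip("v").split("."))
    return max(filenames, key=key)
```

The tuple comparison matters: a plain lexicographic sort would rank v1.2.0 above v1.10.0.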


6. Context Injection Architecture

The Brain’s intelligence comes from runtime context injection — loading schema knowledge, business rules, and examples at invocation time.

6.1 Three-Layer Context Stack

Layer | Content | Size | Update Frequency
------|---------|------|-----------------
Static | Data dictionary, column descriptions | 2-5K tokens | Weekly
Dynamic | Current rim weighting coefficients | 500 tokens | Daily
Query-specific | User question, conversation history | 1-3K tokens | Per query

6.2 Implementation Pattern

def build_brain_context(user_question: str, session_id: str) -> str:
    """Assemble the full context for the Brain agent."""

    # Layer 1: Static context (cached)
    data_dict = load_from_s3("prompts/data-dictionary.txt")
    business_rules = load_from_s3("prompts/business-rules.txt")

    # Layer 2: Dynamic context (refreshed daily)
    rim_weights = get_current_rim_weights()

    # Layer 3: Query-specific context
    conversation = get_conversation_history(session_id)

    # Assemble the prompt: fill the placeholders, then append the dynamic
    # and query-specific layers so nothing loaded above is dropped
    prompt = BRAIN_SYSTEM_PROMPT
    prompt = prompt.replace("[DATA_DICTIONARY]", data_dict)
    prompt = prompt.replace("[BUSINESS_RULES]", business_rules)
    prompt += f"\n\n## Current Rim Weights\n{rim_weights}"
    prompt += f"\n\n## Conversation So Far\n{conversation}"
    prompt += f"\n\n## User Question\n{user_question}"

    return prompt

6.3 Token Budget Management

Claude Sonnet offers a 200K context window, but every input token adds cost and latency, so budget deliberately:

Component | Tokens | Percentage
----------|--------|-----------
System prompt (base) | 1,500 | 15%
Data dictionary | 3,000 | 30%
Business rules | 1,000 | 10%
Rim weights | 500 | 5%
Conversation history | 2,000 | 20%
Output budget | 2,000 | 20%
Total per query | 10,000 | 100%

At Claude Sonnet pricing (USD 3/1M input tokens), this is approximately USD 0.03 per Brain invocation.
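
The table's arithmetic can be sanity-checked directly. As the text does, this sketch prices the whole budget at the USD 3 per 1M input-token rate (output tokens are in reality billed at a higher rate, so treat this as a lower-bound estimate):

```python
# Token budget from the table above, priced entirely at the input rate
# (USD 3 per 1M tokens), matching the text's approximation.
BUDGET = {
    "system_prompt": 1_500,
    "data_dictionary": 3_000,
    "business_rules": 1_000,
    "rim_weights": 500,
    "conversation_history": 2_000,
    "output": 2_000,
}

total_tokens = sum(BUDGET.values())
cost_per_query = total_tokens * 3 / 1_000_000  # USD
```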


7. Routing Logic Patterns

7.1 Intent-Based Routing

The Brain classifies the user’s intent and routes to the appropriate worker pipeline:

Intent | Pipeline | Example
-------|----------|--------
Simple metric | sql_writer -> weight_applier -> formatter | “What is the reach for BBC One?”
Comparison | sql_writer (x2 parallel) -> weight_applier -> formatter | “Compare ITV vs Channel 4 reach”
Trend analysis | sql_writer (time series) -> weight_applier -> formatter | “Show BBC One reach trend over 12 months”
Anomaly detection | sql_writer -> weight_applier -> anomaly_detector -> formatter | “Flag any unusual viewing patterns”
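
The routing table above can be expressed as a plain lookup the Brain's planner draws from. A sketch with the pipeline names taken from the table (the dict itself is an illustration, not the production routing logic):

```python
# Intent -> ordered worker pipeline, mirroring the routing table above.
PIPELINES: dict[str, list[str]] = {
    "metric": ["sql_writer", "weight_applier", "formatter"],
    # two sql_writer entries stand in for the x2 parallel Send fan-out
    "comparison": ["sql_writer", "sql_writer", "weight_applier", "formatter"],
    "trend": ["sql_writer", "weight_applier", "formatter"],
    "anomaly": ["sql_writer", "weight_applier", "anomaly_detector", "formatter"],
}

def pipeline_for(intent: str) -> list[str]:
    """Look up the worker pipeline for a classified intent."""
    if intent not in PIPELINES:
        raise ValueError(f"unknown intent: {intent}")
    return PIPELINES[intent]
```

Keeping the mapping declarative makes the "every metric passes through weight_applier" rule auditable at a glance.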

7.2 Multi-Step Planning

For complex queries, the Brain creates a multi-step execution plan:

# Example: "Compare BBC One vs ITV reach by age group, weighted"
plan = dict(
    question_type="comparison",
    confidence_required=0.8,
    steps=[
        dict(agent="sql_writer", task="Query BBC One reach by age_group"),
        dict(agent="sql_writer", task="Query ITV reach by age_group"),
        dict(agent="weight_applier", task="Apply rim weights to both result sets"),
        dict(agent="formatter", task="Create comparison table with weighted metrics"),
    ],
    estimated_cost="USD 0.05",
    estimated_latency="3-5 seconds",
)

7.3 Circuit Breaker Pattern

Prevent infinite loops and runaway costs:

MAX_WORKER_CALLS = 5
MAX_RETRIES = 2
MAX_COST_PER_QUERY = 0.50  # USD

def check_circuit_breaker(state: AgentState) -> bool:
    """Return True if we should stop execution."""
    total_calls = len(state["worker_results"])
    total_cost = sum(c["cost"] for c in state["model_costs"])

    if total_calls >= MAX_WORKER_CALLS:
        return True  # Too many worker calls
    if state["retry_count"] >= MAX_RETRIES:
        return True  # Too many retries
    if total_cost >= MAX_COST_PER_QUERY:
        return True  # Cost limit reached
    return False

8. Model Selection Strategy

8.1 The Brain/Worker Cost Split

Role | Model | Cost (Input/Output per 1M) | Invocations per Query | Cost per Query
-----|-------|----------------------------|-----------------------|---------------
Brain (Plan) | Claude Sonnet 4 | USD 3 / USD 15 | 1 | ~USD 0.03
Brain (Synthesise) | Claude Sonnet 4 | USD 3 / USD 15 | 1 | ~USD 0.02
SQL Writer | Claude Haiku 3.5 | USD 0.80 / USD 4 | 1-3 | ~USD 0.005
Weight Applier | Claude Haiku 3.5 | USD 0.80 / USD 4 | 1 | ~USD 0.003
Formatter | Claude Haiku 3.5 | USD 0.80 / USD 4 | 1 | ~USD 0.002
Total | | | 5-7 | ~USD 0.06

8.2 Prompt Caching (90% Savings)

Amazon Bedrock’s prompt caching is a game-changer for the Brain:

  • The system prompt + data dictionary (~5K tokens) is cached across invocations
  • Cache hit rate: 80-95% for production workloads
  • Cost reduction: 90% on cached tokens (USD 0.30/1M instead of USD 3/1M)
  • With caching, Brain cost drops from ~USD 0.03 to ~USD 0.005 per query
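
The claimed drop from ~USD 0.03 to ~USD 0.005 works out if most of the 10K-token input is stable enough to cache. The 9K/1K split below is an assumption chosen to illustrate that case, not a measured figure:

```python
# Illustrative caching arithmetic: assumes 9K of the 10K input tokens
# (system prompt, data dictionary, rules) hit the cache and 1K do not.
CACHED_TOKENS, UNCACHED_TOKENS = 9_000, 1_000
INPUT_RATE = 3 / 1_000_000      # USD per token, uncached
CACHED_RATE = 0.30 / 1_000_000  # USD per token, cached (90% cheaper)

without_cache = (CACHED_TOKENS + UNCACHED_TOKENS) * INPUT_RATE
with_cache = CACHED_TOKENS * CACHED_RATE + UNCACHED_TOKENS * INPUT_RATE
```

Under these assumptions the Brain's input cost falls from USD 0.030 to roughly USD 0.0057 per invocation, in line with the bullet above.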

8.3 Cross-Region Inference

Enable cross-region inference for free high availability:

  • No code changes required — just enable in Bedrock settings
  • Automatic failover across regions
  • Zero additional cost
  • Recommended: us-east-1 (primary), us-west-2 (failover)

9. Error Handling in the Orchestrator

9.1 Error Categories

Error Type | Example | Response
-----------|---------|---------
Worker timeout | SQL query takes over 30 seconds | Retry with simplified query
Low confidence | Weight applier reports convergence failure | Flag for human review
Schema mismatch | Worker returns unexpected format | Parse error, retry
Cost overrun | Query exceeds USD 0.50 budget | Stop execution, return partial results
Model error | Bedrock throttling or 5xx | Exponential backoff, failover to another region

9.2 Graceful Degradation

from langgraph.types import RetryPolicy

# Configure retry policies per node
retry_policy = RetryPolicy(
    max_attempts=3,
    initial_interval=1.0,    # seconds
    backoff_factor=2.0,      # exponential
    max_interval=10.0,       # max wait
)

# Apply to the graph
graph.add_node("sql_writer", sql_writer_node, retry=retry_policy)

When all retries are exhausted, the Brain should:

  1. Return the best partial result available
  2. Clearly indicate what failed and why
  3. Suggest the user try a simpler query
  4. Log the full error context for debugging
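
Those four steps can be collapsed into a single fallback builder. A sketch, with field names chosen to align with the state schema in section 3 (the names and messages are illustrative assumptions, not a fixed API):

```python
import logging

logger = logging.getLogger("orchestrator")

def degrade_gracefully(state: dict, error: Exception) -> dict:
    """Apply steps 1-4 above once all retries are exhausted."""
    partial = state.get("worker_results", [])
    logger.error("query failed after retries: %s", error)      # 4. log full context
    return {
        "partial_results": partial,                            # 1. best partial result
        "failure_reason": str(error),                          # 2. what failed and why
        "suggestion": "Try a simpler or narrower question.",   # 3. user guidance
        "needs_human_review": True,
        "final_answer": (
            "Partial results only; the full query could not be completed."
            if partial
            else "The query failed before any worker completed."
        ),
    }
```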

10. Testing the Brain Agent

10.1 Unit Testing Individual Nodes

from langchain_core.messages import HumanMessage

def test_brain_routes_to_sql_writer():
    """Test that metric questions route to sql_writer."""
    state = AgentState(
        messages=[HumanMessage(content="What is BBC One reach?")],
        plan="",
        current_step=0,
        total_steps=0,
        worker_results=[],
        confidence=0.0,
        needs_human_review=False,
        retry_count=0,
        query_id="test-001",
        started_at="2026-03-16T00:00:00Z",
        model_costs=[],
    )

    result = brain_plan(state)
    assert result.goto == "sql_writer"
    assert "plan" in result.update

10.2 Integration Testing with LangSmith

Use LangSmith datasets for systematic evaluation:

Test Category | Examples | Pass Criteria
--------------|----------|--------------
Routing accuracy | 50 questions with known routes | 95% correct routing
Cost compliance | 20 complex queries | No query exceeds USD 0.50
Confidence calibration | 30 mixed-quality results | Confidence matches actual accuracy
Human escalation | 10 ambiguous queries | All flagged correctly
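
A minimal form of the routing-accuracy check, with the labelled dataset inlined rather than pulled from LangSmith; route_question is a stand-in name for whatever invokes brain_plan and reads the resulting Command.goto:

```python
# Tiny labelled dataset: (question, expected first worker). In practice this
# would be the 50-question LangSmith dataset from the table above.
ROUTING_DATASET = [
    ("What is BBC One reach?", "sql_writer"),
    ("Format the last result as markdown", "formatter"),
]

def routing_accuracy(route_question, dataset) -> float:
    """Fraction of questions routed to the expected first worker."""
    correct = sum(
        1 for question, expected in dataset if route_question(question) == expected
    )
    return correct / len(dataset)
```

The 95% pass criterion from the table then becomes a one-line assertion in the test suite.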

10.3 Regression Testing in CI

# .github/workflows/agent-ci.yml
name: Agent Quality Gates
on:
  pull_request:

jobs:
  eval:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run LangSmith Evaluations
        run: python -m pytest tests/eval/ --langsmith-dataset=routing-accuracy
      - name: Check Cost Budgets
        run: python scripts/check_cost_budgets.py
      - name: Prompt Diff Report
        run: python scripts/prompt_diff.py --base main --head HEAD
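
The check_cost_budgets.py script referenced above is not shown elsewhere in this series; its core check might be as small as this sketch, reusing the USD 0.50 limit from section 7.3 (the report shape is an assumption):

```python
MAX_COST_PER_QUERY = 0.50  # USD, the circuit-breaker limit from section 7.3

def over_budget(runs: list[dict]) -> list[str]:
    """Return one failure line per evaluated query that exceeded its budget."""
    return [
        f"{run['query_id']}: USD {run['cost']:.2f} exceeds USD {MAX_COST_PER_QUERY:.2f}"
        for run in runs
        if run["cost"] > MAX_COST_PER_QUERY
    ]

# A CI wrapper would load the eval report, print the failures, and exit non-zero.
```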

11. Production Checklist

Before deploying the Orchestrator to production:

Architecture

  • State schema defined with TypedDict and explicit types
  • Command API used for all routing decisions
  • Circuit breaker implemented (max calls, max retries, max cost)
  • Graceful degradation for all error types

Prompts

  • Brain prompt versioned in git
  • Data dictionary injected at runtime (not hardcoded)
  • Context budget calculated and within limits
  • Few-shot examples included for complex output formats

Cost

  • Prompt caching enabled on Bedrock
  • Brain uses Sonnet, workers use Haiku (5-10x savings)
  • Per-query cost limit enforced
  • Cost tracking in state for monitoring

Testing

  • Routing accuracy above 95% on test dataset
  • Confidence calibration validated
  • Human escalation tested end-to-end
  • Regression tests in CI pipeline

Observability

  • LangSmith tracing enabled for all invocations
  • CloudWatch metrics: latency, cost, routing distribution
  • Alerting on anomalous cost or error rates
  • Dashboard for query volume and success rates

Next in the Series

  • Part 2: Worker Agents and Tool Design — SQL Writer, Weight Applier, Formatter implementation
  • Part 3: State Management and Data Layer — DynamoDB checkpointing, S3 state, Athena integration
  • Part 4: Security Architecture — IAM, prompt injection defence, audit logging
  • Part 5: CI/CD Pipeline for Agents — LangSmith eval, prompt versioning, canary deploys
  • Part 6: Scaling Patterns and Production — Step Functions, auto-scaling, cost optimisation
