In Part 5, we built the personas — the interviewer who asks probing questions, the coach who gives feedback, the evaluator who scores objectively. But here’s the uncomfortable truth I ran into about three weeks into the project: my beautifully crafted personas were confidently saying wrong things.

The interviewer asked a senior DevOps candidate about “deploying containers” and didn’t follow up with questions about orchestration, service mesh, or infrastructure-as-code. The evaluator scored a candidate’s answer about Kubernetes RBAC as “basic” when it was actually textbook-perfect. The coach gave generic advice about “being more specific” when the candidate had just given a near-perfect answer.

The problem wasn’t the personas. The problem was that GPT-4o’s general training data doesn’t know my company’s specific rubrics, the exact skills we care about for each role, or the depth we expect at each seniority level. The LLM is a generalist. My interview system needed to be a specialist.

That’s what RAG — Retrieval-Augmented Generation — solves. Instead of hoping the model happens to know your specific evaluation criteria, you retrieve exactly what it needs, exactly when it needs it, and inject it into the context. This part covers how to build that retrieval system fast enough to work in a real-time voice conversation.

Why Voice Agents Need RAG

Generic LLM knowledge fails in structured interviews for three concrete reasons.

Role-specificity. A “senior backend engineer” at a fintech startup means something very different than at a FAANG company or a healthcare SaaS. The depth of distributed systems knowledge, the tolerance for monoliths, the expectation of regulatory familiarity — all of these are company and role specific. No base model can know your specific hiring bar.

Rubric consistency. When you’re using AI to score candidates, you need every candidate evaluated against the same rubric. If the LLM improvises the scoring criteria based on what feels right, you get inconsistent results that won’t survive an audit and can introduce bias. The rubric needs to come from a source of truth, not from the model’s vibes.

Technical currency. The voice agent needs to know about technologies that may postdate its training cutoff. If you’re hiring for a role that uses a framework that was released 18 months ago, the model might have limited or outdated knowledge about it. RAG lets you inject current documentation and context.

The architecture I landed on has three layers:

  1. A vector database for semantic search across rubrics, job descriptions, and reference answers
  2. Redis for session state — conversation history, scores accumulated so far, candidate profile data
  3. A hybrid search layer that combines vector similarity with keyword matching for technical terms

All of this needs to happen in under 50ms from the moment the candidate stops speaking to the moment the LLM starts generating its response. Let me show you how to get there.

Vector Database Selection

I evaluated four options seriously before picking one.

| Database | Latency (p99) | Cost (1M vectors) | Highlights | Weaknesses |
| --- | --- | --- | --- | --- |
| Pinecone Serverless | ~15ms | ~$0.10/month | Managed, fast cold start, no infra | Vendor lock-in, limited filtering |
| Weaviate Cloud | ~20ms | ~$25/month | Hybrid search built-in, open source | More complex setup |
| pgvector | ~5ms (local) | Postgres cost | Simple, SQL-native, no extra infra | Scales poorly past ~1M vectors |
| Qdrant | ~10ms | Self-hosted | Great filtering, open source | Need to manage infrastructure |

For a voice interview system at the scale I was building (thousands of candidates per month, dozens of job roles), Pinecone Serverless won. The managed service means zero ops overhead, the serverless tier means I’m not paying for idle capacity, and 15ms p99 latency leaves plenty of headroom in my 50ms budget.

If you’re self-hosting or have a smaller scale, pgvector on your existing Postgres is genuinely good enough and removes an external dependency. For anything requiring complex filtering (show me rubrics tagged “backend” AND “senior” AND “distributed-systems”), Qdrant’s payload filtering is excellent.

Here’s how I initialize the Pinecone client and set up the index:

import os

import pinecone
from pinecone import ServerlessSpec

pc = pinecone.Pinecone(api_key=os.environ["PINECONE_API_KEY"])

# Create index if it doesn't exist
if "interview-knowledge" not in pc.list_indexes().names():
    pc.create_index(
        name="interview-knowledge",
        dimension=1536,  # text-embedding-3-small dimensions
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )

index = pc.Index("interview-knowledge")

Embedding Models

The embedding model determines the quality of your semantic search. There are three realistic options:

OpenAI text-embedding-3-small is what I use in production. 1536 dimensions, $0.02 per million tokens, and the quality is excellent for English technical content. The “small” variant was released alongside a “large” variant (3072 dimensions), but in my testing the quality difference for interview content wasn’t worth the doubled storage and the extra cost and latency.

Cohere embed-v3 is a strong alternative with built-in support for specifying the embed type (search_query vs search_document), which actually helps retrieval quality. If you’re embedding documents vs queries, Cohere’s API makes that explicit. I found it about 10% more expensive than OpenAI’s small model with similar quality.

Voyage AI is specialized for code-heavy content and technical documentation. If you’re doing a lot of retrieval on code samples, architecture diagrams described in text, or technical reference docs, Voyage’s code-aware models are worth evaluating.

For dimensionality: higher dimensions generally mean better quality but more storage and slower retrieval. For interview content (rubrics, job descriptions, Q&A pairs), 1536 dimensions is the sweet spot. I haven’t seen meaningful quality gains going to 3072 for this use case.

Here’s a simple embedding utility:

from openai import AsyncOpenAI
from typing import List

client = AsyncOpenAI()

async def embed_text(text: str) -> List[float]:
    """Embed a single text string."""
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

async def embed_batch(texts: List[str]) -> List[List[float]]:
    """Embed multiple texts in a single API call."""
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [item.embedding for item in response.data]

Chunking Strategies for Interview Content

This is where most RAG implementations go wrong. Generic chunking — split every 512 tokens with 50-token overlap — works okay for long documents. It works badly for structured content like rubrics and job descriptions, where the semantic unit is a competency or a requirement, not a fixed token count.
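
For reference, that generic splitter is only a few lines. This is a sketch over pre-tokenized input (a real implementation counts model tokens, not whitespace-split words), shown mainly to make the contrast with the structure-aware chunkers below concrete:

```python
from typing import List

def fixed_size_chunks(
    tokens: List[str],
    size: int = 512,
    overlap: int = 50,
) -> List[List[str]]:
    """The generic baseline: fixed windows with overlap, blind to structure."""
    step = size - overlap
    return [tokens[i:i + size] for i in range(0, len(tokens), step)]
```

Note how the window boundaries fall wherever the token count dictates, happily splitting a competency's scoring levels across two chunks.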

I developed three different chunking strategies for the three main content types.

Rubric Chunking (by competency)

Interview rubrics are structured around competencies. Each competency is a self-contained evaluation unit with a name, description, and scoring criteria at each level. Chunking at the competency boundary keeps each chunk semantically coherent.

import json
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class RubricChunk:
    role_id: str
    role_name: str
    competency_name: str
    competency_description: str
    level_descriptors: Dict[str, str]  # "1": "...", "2": "...", etc.
    weight: float  # importance of this competency, 0-1
    chunk_id: str

def chunk_rubric(rubric: Dict[str, Any]) -> List[RubricChunk]:
    """
    Chunk a rubric by competency. Each competency becomes one chunk.

    Expected rubric format:
    {
        "role_id": "swe-senior",
        "role_name": "Senior Software Engineer",
        "competencies": [
            {
                "name": "System Design",
                "description": "Ability to design scalable, maintainable systems",
                "weight": 0.3,
                "levels": {
                    "1": "Can describe basic client-server architecture",
                    "2": "Understands CAP theorem, designs for availability vs consistency",
                    "3": "Designs distributed systems with appropriate trade-offs",
                    "4": "Architects for scale, owns technical vision"
                }
            },
            ...
        ]
    }
    """
    chunks = []
    role_id = rubric["role_id"]
    role_name = rubric["role_name"]

    for i, competency in enumerate(rubric["competencies"]):
        chunk_id = f"{role_id}__competency__{competency['name'].lower().replace(' ', '_')}"

        chunk = RubricChunk(
            role_id=role_id,
            role_name=role_name,
            competency_name=competency["name"],
            competency_description=competency["description"],
            level_descriptors=competency["levels"],
            weight=competency.get("weight", 1.0 / len(rubric["competencies"])),
            chunk_id=chunk_id
        )
        chunks.append(chunk)

    return chunks

def rubric_chunk_to_text(chunk: RubricChunk) -> str:
    """
    Convert a rubric chunk to a text string for embedding.
    The text representation matters for retrieval quality.
    """
    lines = [
        f"Role: {chunk.role_name}",
        f"Competency: {chunk.competency_name}",
        f"Description: {chunk.competency_description}",
        "",
        "Scoring levels:"
    ]

    for level, descriptor in sorted(chunk.level_descriptors.items()):
        lines.append(f"  Level {level}: {descriptor}")

    return "\n".join(lines)

Job Description Chunking (by requirement)

Job descriptions are chunked by requirement section: responsibilities, required qualifications, preferred qualifications, and cultural expectations. Each section is further split by bullet point if it’s long.

@dataclass
class JDChunk:
    job_id: str
    job_title: str
    section: str  # "responsibilities", "required", "preferred", "culture"
    content: str
    chunk_id: str

def chunk_job_description(jd: Dict[str, Any]) -> List[JDChunk]:
    """
    Chunk a job description by section, then by requirement within section.
    """
    chunks = []
    job_id = jd["job_id"]
    job_title = jd["title"]

    sections = {
        "responsibilities": jd.get("responsibilities", []),
        "required": jd.get("required_qualifications", []),
        "preferred": jd.get("preferred_qualifications", []),
        "culture": jd.get("cultural_expectations", [])
    }

    for section_name, items in sections.items():
        if not items:
            continue

        # Group items into chunks of ~3-5 items to keep chunks focused
        group_size = 4
        for group_idx in range(0, len(items), group_size):
            group = items[group_idx:group_idx + group_size]
            content = f"Job: {job_title}\nSection: {section_name}\n\n"
            content += "\n".join(f"- {item}" for item in group)

            chunk_id = f"{job_id}__{section_name}__{group_idx // group_size}"
            chunks.append(JDChunk(
                job_id=job_id,
                job_title=job_title,
                section=section_name,
                content=content,
                chunk_id=chunk_id
            ))

    return chunks

Technical Documentation Chunking (by concept)

For technical reference docs — API documentation, architecture decision records, domain knowledge bases — I use a concept-aware chunker that respects heading boundaries.

import re

def chunk_technical_doc(
    content: str,
    doc_id: str,
    doc_title: str,
    max_chunk_tokens: int = 400
) -> List[Dict[str, Any]]:
    """
    Chunk technical documentation by heading boundaries.
    Falls back to paragraph-level chunking for long sections.
    """
    # Split by markdown headings (## or ###)
    heading_pattern = re.compile(r'^(#{1,3})\s+(.+)$', re.MULTILINE)

    sections = []
    last_pos = 0
    current_heading = doc_title

    for match in heading_pattern.finditer(content):
        # Save previous section
        section_content = content[last_pos:match.start()].strip()
        if section_content:
            sections.append({
                "heading": current_heading,
                "content": section_content
            })

        current_heading = match.group(2)
        last_pos = match.end()

    # Don't forget the last section
    remaining = content[last_pos:].strip()
    if remaining:
        sections.append({
            "heading": current_heading,
            "content": remaining
        })

    # Convert sections to chunks, splitting long ones
    chunks = []
    for section in sections:
        full_text = f"{section['heading']}\n\n{section['content']}"

        # Rough token estimate: 1 token ≈ 4 chars
        if len(full_text) / 4 <= max_chunk_tokens:
            chunks.append({
                "chunk_id": f"{doc_id}__{section['heading'].lower().replace(' ', '_')}",
                "doc_id": doc_id,
                "doc_title": doc_title,
                "heading": section["heading"],
                "content": full_text
            })
        else:
            # Split long sections by paragraph
            paragraphs = section["content"].split("\n\n")
            for para_idx, para in enumerate(paragraphs):
                if para.strip():
                    chunk_text = f"{section['heading']}\n\n{para}"
                    chunks.append({
                        "chunk_id": f"{doc_id}__{section['heading'].lower().replace(' ', '_')}__{para_idx}",
                        "doc_id": doc_id,
                        "doc_title": doc_title,
                        "heading": section["heading"],
                        "content": chunk_text
                    })

    return chunks

Indexing the Chunks

Once chunked, everything goes into Pinecone with metadata that allows filtered retrieval:

async def index_rubric_chunks(
    chunks: List[RubricChunk],
    index: pinecone.Index
):
    """Index rubric chunks into Pinecone with metadata."""
    texts = [rubric_chunk_to_text(c) for c in chunks]
    embeddings = await embed_batch(texts)

    vectors = []
    for chunk, embedding in zip(chunks, embeddings):
        vectors.append({
            "id": chunk.chunk_id,
            "values": embedding,
            "metadata": {
                "type": "rubric",
                "role_id": chunk.role_id,
                "role_name": chunk.role_name,
                "competency_name": chunk.competency_name,
                "weight": chunk.weight,
                "text": rubric_chunk_to_text(chunk)  # Store text for retrieval
            }
        })

    # Upsert in batches of 100
    batch_size = 100
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        index.upsert(vectors=batch)

    print(f"Indexed {len(vectors)} rubric chunks")

Real-Time RAG During Conversation

The 50ms retrieval budget isn’t arbitrary. In a voice conversation, the pipeline looks like this:

  1. Audio arrives and STT processes it — ~100-200ms
  2. We detect end-of-turn and trigger retrieval — this is the 50ms window
  3. Retrieval results augment the LLM context — context building ~20ms
  4. LLM generates first token — ~100-300ms depending on model and prompt length
  5. TTS begins converting the first sentence — ~100ms

Total time to first audio byte: roughly 400-700ms. That’s acceptable. If retrieval takes 200ms, we’re at 600-900ms which starts to feel sluggish in a conversation context.

There are three techniques I use to stay under 50ms:

Pre-warming. At the start of each interview session, I pre-fetch and cache all the rubric chunks for the target role. These don’t change during the interview, so there’s no reason to retrieve them on every turn.

Query prediction. While the candidate is still speaking (during STT), I start predicting what the next retrieval query is likely to be based on the conversation history. This is speculative — I abort if the prediction is wrong — but it shaves 10-15ms off the critical path.
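
The speculative pattern looks roughly like this. It is a minimal sketch with the embedding call and transcript source passed in as plain async callables; the real version races against STT finalization rather than a simple awaitable:

```python
import asyncio
from typing import Awaitable, Callable, List

async def embed_with_prediction(
    predicted_query: str,
    get_final_query: Callable[[], Awaitable[str]],
    embed_fn: Callable[[str], Awaitable[List[float]]],
) -> List[float]:
    """Speculatively embed a predicted query while STT is still finalizing.

    If the prediction matches the final transcript, reuse the in-flight
    embedding; otherwise cancel it and embed the real query.
    """
    speculative = asyncio.create_task(embed_fn(predicted_query))
    final_query = await get_final_query()
    if final_query == predicted_query:
        return await speculative  # prediction was right: reuse the work
    speculative.cancel()  # prediction was wrong: abort the wasted call
    try:
        await speculative
    except asyncio.CancelledError:
        pass
    return await embed_fn(final_query)
```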

ANN tuning. Approximate Nearest Neighbor search (what Pinecone uses under the hood) trades recall for speed. The main lever you control directly is top_k: for real-time retrieval I use top_k=5 instead of top_k=10, because in my testing the 6th-10th results rarely improve LLM output quality enough to justify the extra latency and context tokens.

Here’s the retrieval pipeline:

import asyncio
import time
from typing import Optional, List, Dict, Any

class InterviewRAGPipeline:
    def __init__(
        self,
        pinecone_index: pinecone.Index,
        redis_client,
        session_id: str,
        role_id: str
    ):
        self.index = pinecone_index
        self.redis = redis_client
        self.session_id = session_id
        self.role_id = role_id
        self._rubric_cache: Optional[List[Dict]] = None

    async def warm_cache(self):
        """Pre-fetch rubric chunks for this role at session start."""
        start = time.perf_counter()

        # Fetch all rubric chunks for this role
        results = self.index.query(
            vector=[0.0] * 1536,  # Dummy vector — we're using filter only
            top_k=50,
            filter={"type": "rubric", "role_id": self.role_id},
            include_metadata=True
        )

        self._rubric_cache = [
            match.metadata for match in results.matches
        ]

        elapsed = (time.perf_counter() - start) * 1000
        print(f"Rubric cache warmed: {len(self._rubric_cache)} chunks in {elapsed:.1f}ms")

    async def retrieve_for_turn(
        self,
        candidate_utterance: str,
        conversation_context: str,
        retrieval_types: List[str] = ["rubric", "jd", "reference"]
    ) -> Dict[str, Any]:
        """
        Retrieve relevant context for the current conversation turn.
        Target: < 50ms total.
        """
        start = time.perf_counter()

        # Build retrieval query from utterance + recent context
        query = f"{candidate_utterance}\n\nConversation context: {conversation_context[-500:]}"

        # Run embedding and vector search concurrently
        query_embedding, cached_result = await asyncio.gather(
            embed_text(query),
            self._check_query_cache(query)
        )

        if cached_result:
            elapsed = (time.perf_counter() - start) * 1000
            print(f"Cache hit — retrieval in {elapsed:.1f}ms")
            return cached_result

        # Vector search with type filter. Pinecone filters follow a MongoDB-style
        # syntax; combine the clauses explicitly under a single $and.
        filter_condition = {
            "$and": [
                {"type": {"$in": retrieval_types}},
                {"role_id": {"$in": [self.role_id, "global"]}}
            ]
        }

        results = self.index.query(
            vector=query_embedding,
            top_k=5,
            filter=filter_condition,
            include_metadata=True
        )

        retrieved = {
            "rubric_chunks": [],
            "jd_chunks": [],
            "reference_chunks": []
        }

        for match in results.matches:
            chunk_type = match.metadata.get("type", "reference")
            if chunk_type == "rubric":
                retrieved["rubric_chunks"].append(match.metadata["text"])
            elif chunk_type == "jd":
                retrieved["jd_chunks"].append(match.metadata["content"])
            else:
                retrieved["reference_chunks"].append(match.metadata["content"])

        # Cache this result for 5 minutes
        await self._cache_query_result(query, retrieved, ttl=300)

        elapsed = (time.perf_counter() - start) * 1000
        print(f"Vector retrieval in {elapsed:.1f}ms")

        return retrieved

    def _cache_key(self, query: str) -> str:
        # Python's built-in hash() is salted per process, so it can't be used
        # for cache keys that must survive restarts or span multiple workers.
        import hashlib
        return "rag_cache:" + hashlib.sha256(query.encode("utf-8")).hexdigest()

    async def _check_query_cache(self, query: str) -> Optional[Dict]:
        """Check Redis for a cached retrieval result."""
        cached = await self.redis.get(self._cache_key(query))
        if cached:
            return json.loads(cached)
        return None

    async def _cache_query_result(
        self, query: str, result: Dict, ttl: int = 300
    ):
        """Cache a retrieval result in Redis."""
        await self.redis.setex(self._cache_key(query), ttl, json.dumps(result))

Redis for Session State

Every interview session accumulates state that needs to persist across turns and be accessible in under 1ms: the conversation history, scores accumulated so far, timing data, and candidate profile. Redis is the right tool for this — it’s in-memory, has rich data structures, and the latency profile is consistent.

I use four Redis data structures per session:

import redis.asyncio as aioredis
import json
from datetime import datetime

class InterviewSessionStore:
    """
    Redis schema for a single interview session.

    Keys:
    - session:{id}:meta       → Hash: role_id, candidate_id, start_time, status
    - session:{id}:history    → List: conversation turns (JSON)
    - session:{id}:scores     → Hash: competency_name → running score
    - session:{id}:timing     → Hash: question timings, pause durations, total_time
    """

    def __init__(self, redis_client: aioredis.Redis, session_id: str):
        self.redis = redis_client
        self.session_id = session_id
        self.key_prefix = f"session:{session_id}"

    async def initialize(
        self,
        role_id: str,
        candidate_id: str,
        candidate_level: str
    ):
        """Initialize a new session."""
        await self.redis.hset(
            f"{self.key_prefix}:meta",
            mapping={
                "role_id": role_id,
                "candidate_id": candidate_id,
                "candidate_level": candidate_level,
                "start_time": datetime.utcnow().isoformat(),
                "status": "active",
                "turn_count": 0
            }
        )
        # Set TTL of 4 hours — interviews shouldn't last longer
        await self.redis.expire(f"{self.key_prefix}:meta", 14400)

    async def add_turn(
        self,
        role: str,  # "interviewer" | "candidate"
        content: str,
        metadata: Dict[str, Any] = None
    ):
        """Append a conversation turn to history."""
        turn = {
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat(),
            **(metadata or {})
        }

        # Push to list, keep last 100 turns
        key = f"{self.key_prefix}:history"
        await self.redis.rpush(key, json.dumps(turn))

        # Trim to last 100 turns to prevent unbounded growth
        await self.redis.ltrim(key, -100, -1)

        # Keep the history key on the same 4-hour TTL as the session meta,
        # so abandoned sessions don't leak memory
        await self.redis.expire(key, 14400)

        # Increment turn counter
        await self.redis.hincrby(f"{self.key_prefix}:meta", "turn_count", 1)

    async def get_recent_history(self, n_turns: int = 10) -> List[Dict]:
        """Get the N most recent conversation turns."""
        key = f"{self.key_prefix}:history"
        raw_turns = await self.redis.lrange(key, -n_turns, -1)
        return [json.loads(t) for t in raw_turns]

    async def update_score(
        self,
        competency: str,
        new_observation: float,
        weight: float = 1.0
    ):
        """
        Update running score for a competency using exponential moving average.
        This handles multiple observations of the same competency gracefully.
        """
        score_key = f"{self.key_prefix}:scores"
        current = await self.redis.hget(score_key, competency)

        if current is None:
            # First observation
            new_score = new_observation
        else:
            current_data = json.loads(current)
            # EMA with alpha=0.4 — recent observations weighted more
            alpha = 0.4
            new_score = alpha * new_observation + (1 - alpha) * current_data["score"]

        await self.redis.hset(
            score_key,
            competency,
            json.dumps({
                "score": new_score,
                "weight": weight,
                "observation_count": 1 + (
                    json.loads(current)["observation_count"] if current else 0
                )
            })
        )

    async def get_all_scores(self) -> Dict[str, Dict]:
        """Get all accumulated scores for the session."""
        score_key = f"{self.key_prefix}:scores"
        raw = await self.redis.hgetall(score_key)
        return {k: json.loads(v) for k, v in raw.items()}

    async def record_timing(self, event: str, duration_ms: float):
        """Record timing data for analytics."""
        timing_key = f"{self.key_prefix}:timing"
        await self.redis.hset(timing_key, event, duration_ms)
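
One thing the store leaves out is rolling the per-competency scores up into a single number for the final report. A weighted average over the get_all_scores() payload is the obvious move; this is a minimal sketch (the helper name and placement are mine, not part of the store):

```python
from typing import Dict

def overall_score(scores: Dict[str, Dict]) -> float:
    """Weighted average over per-competency entries shaped like
    {"score": float, "weight": float, "observation_count": int},
    i.e. the values returned by InterviewSessionStore.get_all_scores()."""
    total_weight = sum(s["weight"] for s in scores.values())
    if total_weight == 0:
        return 0.0
    return sum(s["score"] * s["weight"] for s in scores.values()) / total_weight
```

Normalizing by the summed weights means the result stays correct even if a competency was never observed and its weight is missing from the hash.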

Hybrid Search

Here’s a problem I didn’t anticipate: pure vector search sometimes misses exact technical terms. If a candidate mentions “SOLID principles” and I’m searching for “system design best practices”, the vector similarity might be 0.7, good enough to retrieve the chunk. But if a candidate mentions “ACID transactions” and I’m searching for “database reliability”, the similarity might only be 0.5, and the right chunk gets ranked below less relevant results.

Technical interviews are full of exact terms that have specific meanings: Kubernetes, SOLID, CAP theorem, CQRS, event sourcing, two-phase commit. These terms are precise, and candidates expect the interviewer to understand exactly what they mean when they use them.

The solution is hybrid search — combining vector similarity with BM25 keyword matching. Weaviate has this built in; for Pinecone I implement it as a two-stage retrieval:

from rank_bm25 import BM25Okapi
import numpy as np

class HybridSearcher:
    def __init__(
        self,
        pinecone_index: pinecone.Index,
        all_chunks: List[Dict[str, Any]]
    ):
        self.index = pinecone_index
        self.chunks = all_chunks

        # Build BM25 index over all chunks
        tokenized = [
            chunk["content"].lower().split()
            for chunk in all_chunks
        ]
        self.bm25 = BM25Okapi(tokenized)
        self.chunk_ids = [chunk["chunk_id"] for chunk in all_chunks]

    async def search(
        self,
        query: str,
        top_k: int = 5,
        alpha: float = 0.6  # 0.6 vector, 0.4 BM25
    ) -> List[Dict[str, Any]]:
        """
        Hybrid search combining vector similarity and BM25.
        alpha=1.0 → pure vector, alpha=0.0 → pure BM25
        """

        # Vector search
        query_embedding = await embed_text(query)
        vector_results = self.index.query(
            vector=query_embedding,
            top_k=top_k * 3,  # Get more candidates for re-ranking
            include_metadata=True
        )

        # Build vector score dict
        vector_scores = {
            match.id: match.score
            for match in vector_results.matches
        }

        # BM25 search
        tokenized_query = query.lower().split()
        bm25_scores_raw = self.bm25.get_scores(tokenized_query)

        # Normalize BM25 scores to [0, 1]
        bm25_max = bm25_scores_raw.max()
        if bm25_max > 0:
            bm25_scores_normalized = bm25_scores_raw / bm25_max
        else:
            bm25_scores_normalized = bm25_scores_raw

        bm25_scores = {
            self.chunk_ids[i]: float(bm25_scores_normalized[i])
            for i in range(len(self.chunk_ids))
        }

        # Combine scores
        all_chunk_ids = set(vector_scores.keys()) | set(bm25_scores.keys())
        combined_scores = {}

        for chunk_id in all_chunk_ids:
            v_score = vector_scores.get(chunk_id, 0.0)
            b_score = bm25_scores.get(chunk_id, 0.0)
            combined_scores[chunk_id] = alpha * v_score + (1 - alpha) * b_score

        # Sort and return top-k
        ranked = sorted(
            combined_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]

        # Fetch full chunk data for top results
        results = []
        chunk_lookup = {c["chunk_id"]: c for c in self.chunks}

        for chunk_id, score in ranked:
            if chunk_id in chunk_lookup:
                results.append({
                    "chunk": chunk_lookup[chunk_id],
                    "score": score
                })

        return results

In practice I use alpha=0.7 (favoring vector) for general context retrieval and alpha=0.4 (favoring BM25) when the query contains obvious technical terms. I detect technical terms with a simple keyword list — if the query contains any known technical terms, I shift toward BM25.
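
The term detection really is nothing fancier than a set lookup. A minimal sketch (the term list here is an illustrative stub, not my production list):

```python
from typing import Set

# Illustrative stub; the production list is much longer.
TECHNICAL_TERMS: Set[str] = {
    "kubernetes", "solid", "acid", "cqrs", "cap",
    "raft", "paxos", "grpc", "kafka",
}

def choose_alpha(query: str, default: float = 0.7, technical: float = 0.4) -> float:
    """Favor BM25 (lower alpha) when the query contains exact technical terms."""
    tokens = {t.strip(".,?!:;\"'").lower() for t in query.split()}
    return technical if tokens & TECHNICAL_TERMS else default
```

The chosen value then feeds straight into HybridSearcher.search as its alpha argument.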

Knowledge Base Management

The RAG system is only as good as what’s in it, and that means building an admin interface for managing rubrics, question banks, and interview templates. This is often an afterthought in RAG implementations but it becomes critical when you’re onboarding new roles or updating evaluation criteria.

The key operations the admin interface needs to support:

Upload and parse rubrics. Accept YAML or JSON rubric files, validate the schema, chunk them, and index them. Show a preview of what chunks would be created before committing.

Version control for templates. Interview templates change — you refine the scoring criteria, add new competencies, deprecate old questions. The system needs to track which version of a rubric was used for each interview, because you might need to re-evaluate historical sessions against a new rubric.

Preview retrieval quality. Before deploying a new rubric to production, you want to test: “given this candidate utterance, what would the system retrieve?” This is a developer-facing feature but it saves enormous debugging time.

class KnowledgeBaseManager:
    def __init__(
        self,
        pinecone_index: pinecone.Index,
        postgres_db  # For version control and metadata
    ):
        self.index = pinecone_index
        self.db = postgres_db

    async def upload_rubric(
        self,
        rubric: Dict[str, Any],
        uploaded_by: str,
        replace_existing: bool = False
    ) -> Dict[str, Any]:
        """Upload and index a rubric, with version tracking."""

        role_id = rubric["role_id"]

        # Check for existing version
        existing = await self.db.fetchrow(
            "SELECT version FROM rubric_versions WHERE role_id = $1 ORDER BY version DESC LIMIT 1",
            role_id
        )

        version = (existing["version"] + 1) if existing else 1

        # Chunk the rubric
        chunks = chunk_rubric(rubric)

        if replace_existing:
            # Delete old vectors for this role.
            # Note: metadata-filtered deletes aren't supported on serverless
            # indexes; there you'd list the IDs (e.g. by prefix) and delete
            # them explicitly.
            self.index.delete(
                filter={"type": "rubric", "role_id": role_id}
            )

        # Index new chunks
        await index_rubric_chunks(chunks, self.index)

        # Record version in postgres
        await self.db.execute(
            """
            INSERT INTO rubric_versions
                (role_id, version, rubric_data, chunk_count, uploaded_by, created_at)
            VALUES ($1, $2, $3, $4, $5, NOW())
            """,
            role_id, version, json.dumps(rubric), len(chunks), uploaded_by
        )

        return {
            "role_id": role_id,
            "version": version,
            "chunks_indexed": len(chunks),
            "status": "success"
        }

    async def preview_retrieval(
        self,
        query: str,
        role_id: str,
        top_k: int = 5
    ) -> List[Dict[str, Any]]:
        """Preview what would be retrieved for a given query."""
        query_embedding = await embed_text(query)

        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            filter={"role_id": role_id},
            include_metadata=True
        )

        return [
            {
                "score": match.score,
                "type": match.metadata.get("type"),
                "competency": match.metadata.get("competency_name"),
                "preview": match.metadata.get("text", "")[:300]
            }
            for match in results.matches
        ]
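
`preview_retrieval` is also easy to turn into an automated spot-check. The helper below is my own addition (the probe list and the 0.7 threshold are arbitrary choices, not part of the system above): it flags probe queries whose best match scores poorly, which usually means the rubric chunks don't cover that topic.

```python
async def find_retrieval_gaps(
    manager,
    role_id: str,
    probes: list[str],
    min_score: float = 0.7,
) -> list[str]:
    """Return probe queries whose top match falls below min_score —
    a sign the indexed rubric doesn't cover that topic well."""
    gaps = []
    for query in probes:
        matches = await manager.preview_retrieval(query, role_id=role_id, top_k=3)
        if not matches or matches[0]["score"] < min_score:
            gaps.append(query)
    return gaps
```

Running this against a handful of questions from the role's template before an interview goes live catches rubric-coverage gaps while they're still cheap to fix.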

Context Window Optimization

Every token you put in the context costs money and adds latency. A typical GPT-4o call with 4000 tokens of context costs ~$0.01 and takes ~200ms. A call with 12000 tokens costs ~$0.03 and takes ~400ms. Over thousands of interviews, that difference compounds.
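
To make the compounding concrete, here's the arithmetic (the per-1K-token price is an illustrative assumption, not a quoted rate card):

```python
def context_cost(turns: int, tokens_per_call: int, price_per_1k: float = 0.0025) -> float:
    """Input-token cost of one interview's worth of LLM calls (assumed pricing)."""
    return turns * tokens_per_call / 1000 * price_per_1k

lean = context_cost(turns=20, tokens_per_call=4_000)    # ~$0.20 per interview
heavy = context_cost(turns=20, tokens_per_call=12_000)  # ~$0.60 per interview
extra_per_1k_interviews = (heavy - lean) * 1000         # ~$400 extra per 1,000 interviews
```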

The key insight is that not all retrieved context is equal. I structure the LLM context in layers:

Static system prompt (~800 tokens): Role description, core interviewing instructions, output format requirements. This never changes during the session.

Rubric context (~400 tokens): The top 3 most relevant competency chunks for the current question. Retrieved fresh each turn.

Session state (~300 tokens): Current question number, competencies covered, scores so far. A structured summary, not raw history.

Conversation history (~600 tokens): Last 5 turns verbatim. Older turns are summarized.

Retrieved context (~400 tokens): JD requirements and reference material relevant to the current turn.

Total: ~2500 tokens per call. That’s my target. Going much higher than this starts to hurt both latency and cost meaningfully.
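
A minimal sketch of that layered assembly (the 4-chars-per-token estimate and the `##` layer headers are my own stand-ins; production code would count tokens with a real tokenizer and pull each layer from the retrieval and session code shown elsewhere):

```python
def estimate_tokens(text: str) -> int:
    """Crude ~4 chars/token heuristic — good enough for budget enforcement."""
    return len(text) // 4

def build_llm_context(layers: list[tuple[str, str, int]]) -> str:
    """Assemble (name, text, token_budget) layers, trimming any layer that
    overruns its budget so the total stays near the 2500-token target."""
    parts = []
    for name, text, budget in layers:
        if estimate_tokens(text) > budget:
            text = text[: budget * 4]  # chars ≈ tokens * 4
        parts.append(f"## {name}\n{text}")
    return "\n\n".join(parts)
```

Called once per turn with the five layers above, e.g. `build_llm_context([("System", system_prompt, 800), ("Rubric", rubric_chunks, 400), ...])`, this enforces the budget mechanically instead of hoping each layer stays small.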

For managing conversation history in long interviews, I use a sliding window with summarization:

async def build_conversation_context(
    session_store: InterviewSessionStore,
    max_verbatim_turns: int = 5,
    summary_model: str = "gpt-4o-mini"
) -> str:
    """
    Build conversation context with sliding window + summarization.
    Keeps last N turns verbatim, summarizes older turns.
    """

    # Get all history
    all_history = await session_store.get_recent_history(n_turns=50)

    if len(all_history) <= max_verbatim_turns:
        # Short interview — return everything verbatim
        turns_text = "\n".join([
            f"{t['role'].upper()}: {t['content']}"
            for t in all_history
        ])
        return f"Conversation history:\n{turns_text}"

    # Split into old (to summarize) and recent (verbatim)
    old_turns = all_history[:-max_verbatim_turns]
    recent_turns = all_history[-max_verbatim_turns:]

    # Summarize old turns
    old_text = "\n".join([
        f"{t['role'].upper()}: {t['content']}"
        for t in old_turns
    ])

    summary_response = await client.chat.completions.create(
        model=summary_model,
        messages=[{
            "role": "user",
            "content": f"""Summarize this interview conversation in 3-4 sentences.
Focus on: topics covered, candidate strengths/weaknesses observed,
competencies assessed so far.

Conversation:
{old_text}"""
        }],
        max_tokens=200
    )

    summary = summary_response.choices[0].message.content

    # Build recent history text
    recent_text = "\n".join([
        f"{t['role'].upper()}: {t['content']}"
        for t in recent_turns
    ])

    return f"""Earlier in the interview (summary):
{summary}

Recent conversation:
{recent_text}"""
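
One thing to watch: as written, the summarizer re-runs on every turn once the history exceeds the verbatim window, which adds a gpt-4o-mini call per turn. A small cache (my addition — names and the refresh interval are illustrative) lets you refresh the summary only every few turns:

```python
class SummaryCache:
    """Reuse the last rolling summary until `refresh_every` more turns have
    aged out of the verbatim window, trading slight staleness for fewer
    summarization calls."""

    def __init__(self, refresh_every: int = 5):
        self.refresh_every = refresh_every
        self._summarized_upto = 0
        self._summary = ""

    def needs_refresh(self, n_old_turns: int) -> bool:
        if n_old_turns == 0:
            return False          # nothing has aged out yet
        if not self._summary:
            return True           # first turns just aged out of the window
        return n_old_turns - self._summarized_upto >= self.refresh_every

    def update(self, n_old_turns: int, summary: str) -> None:
        self._summarized_upto = n_old_turns
        self._summary = summary

    @property
    def summary(self) -> str:
        return self._summary
```

Inside `build_conversation_context` you would check `needs_refresh(len(old_turns))` before making the summarization call, and otherwise reuse `cache.summary`.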

Interview Template System

The final piece is a structured question bank that lets interviewers define not just questions, but question trees — where the follow-up depends on the candidate’s answer.

from dataclasses import dataclass
from typing import Dict, List, Optional

@dataclass
class Question:
    question_id: str
    text: str
    competency: str
    difficulty: int  # 1-4, maps to role levels
    follow_ups: Dict[str, List[str]]  # "good_answer": [...], "weak_answer": [...]
    expected_signals: List[str]  # What a good answer should contain
    red_flags: List[str]  # What would be concerning to hear

@dataclass
class InterviewTemplate:
    template_id: str
    role_id: str
    candidate_level: int  # 1-4
    opening_question: str
    question_bank: List[Question]
    required_competencies: List[str]  # Must be covered
    time_budget_minutes: int

def select_next_question(
    template: InterviewTemplate,
    covered_competencies: List[str],
    time_remaining_minutes: int,
    candidate_performance: Dict[str, float]  # competency → score so far
) -> Optional[Question]:
    """
    Dynamically select the next question based on:
    - What competencies haven't been covered yet
    - Time remaining
    - Candidate performance so far (probe weak areas more)
    """

    uncovered_required = [
        c for c in template.required_competencies
        if c not in covered_competencies
    ]

    # If we're running short on time, prioritize required competencies
    if time_remaining_minutes < 5 and uncovered_required:
        # Find a question for the first uncovered required competency
        target_competency = uncovered_required[0]
        candidates = [
            q for q in template.question_bank
            if q.competency == target_competency
        ]
        return candidates[0] if candidates else None

    # Otherwise, probe weak areas: the lowest-scoring competency that has
    # partial scores but hasn't been marked fully covered yet
    if candidate_performance:
        weakest_competency = min(
            candidate_performance.items(),
            key=lambda x: x[1]
        )[0]

        weak_questions = [
            q for q in template.question_bank
            if q.competency == weakest_competency
            and q.competency not in covered_competencies
        ]

        if weak_questions:
            return weak_questions[0]

    # Fall back to uncovered required competencies, then optional ones
    uncovered = [
        q for q in template.question_bank
        if q.competency not in covered_competencies
    ]

    return uncovered[0] if uncovered else None

This dynamic selection means the interview adapts in real-time. If a candidate demonstrates strong system design skills early, the system spends less time on that competency and probes more on areas that haven’t been assessed. If a candidate is clearly struggling with a topic, the system can adjust the difficulty of follow-up questions.
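
The `follow_ups` branches on `Question` can be wired up with a couple of small helpers (a sketch — in the real system, the evaluator persona from Part 5 would supply the `"good_answer"`/`"weak_answer"` label, and the helper names here are my own):

```python
from typing import Dict, List, Optional

def pick_follow_up(follow_ups: Dict[str, List[str]], answer_quality: str) -> Optional[str]:
    """answer_quality is a key into follow_ups, e.g. 'good_answer' or 'weak_answer'."""
    options = follow_ups.get(answer_quality, [])
    return options[0] if options else None

def adjust_difficulty(current: int, answer_quality: str) -> int:
    """Step difficulty up after a strong answer, down after a weak one (1-4 scale)."""
    if answer_quality == "good_answer":
        return min(4, current + 1)
    if answer_quality == "weak_answer":
        return max(1, current - 1)
    return current
```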

Putting It Together

By the time we’re done with Part 6, the voice agent has:

  • A vector database with role-specific rubrics, job descriptions, and reference knowledge indexed by competency and requirement
  • Sub-50ms retrieval using pre-warming, query caching, and tuned ANN search parameters
  • Hybrid search that catches exact technical terms that pure vector search might miss
  • Redis session state tracking conversation history, accumulated scores, and timing data
  • Context window management that keeps each LLM call under 2500 tokens
  • A dynamic question selection system that adapts to candidate performance in real-time

The difference in interview quality is immediately noticeable. The agent now follows up on “I’ve worked with microservices” with questions specific to the competency level we’re hiring for, scores answers against the actual rubric rather than generic best practices, and adapts the interview flow based on what it has learned so far.

In Part 7, we move to the client side — building the web and mobile interfaces that connect candidates to the voice agent. We’ll cover WebRTC from the browser, handling connection interruptions gracefully, building the candidate-facing UI with real-time transcript display, and adapting the experience for mobile devices where audio behavior is different from desktop.


This is Part 6 of a 12-part series: The Voice AI Interview Playbook.

Series outline:

  1. Why Real-Time Voice Changes Everything — The landscape, the vision, and the reference architecture (Part 1)
  2. Cascaded vs. Speech-to-Speech — Choosing your pipeline architecture (Part 2)
  3. LiveKit vs. Pipecat vs. Direct — Picking your framework (Part 3)
  4. STT, LLM, and TTS That Actually Work — Building the voice pipeline (Part 4)
  5. Multi-Role Agents — Interviewer, coach, and evaluator personas (Part 5)
  6. Knowledge Base and RAG — Making your voice agent an expert (this post)
  7. Web and Mobile Clients — Cross-platform voice experiences (Part 7)
  8. Video Interview Integration — Multimodal analysis with Gemini Live (Part 8)
  9. Recording, Transcription, and Compliance — GDPR, HIPAA, and getting it right (Part 9)
  10. Scaling to Thousands — Architecture for concurrent voice sessions (Part 10)
  11. Cost Optimization — From $0.14/min to $0.03/min (Part 11)
  12. Multi-Provider Support — OpenAI Realtime, Bedrock Nova, Grok, and the adapter pattern (Part 12)