In Part 5, we built the personas — the interviewer who asks probing questions, the coach who gives feedback, the evaluator who scores objectively. But here’s the uncomfortable truth I ran into about three weeks into the project: my beautifully crafted personas were confidently saying wrong things.
The interviewer asked a senior DevOps candidate about “deploying containers” and didn’t follow up with questions about orchestration, service mesh, or infrastructure-as-code. The evaluator scored a candidate’s answer about Kubernetes RBAC as “basic” when it was actually textbook-perfect. The coach gave generic advice about “being more specific” when the candidate had just given a near-perfect answer.
The problem wasn’t the personas. The problem was that GPT-4o’s general training data doesn’t know my company’s specific rubrics, the exact skills we care about for each role, or the depth we expect at each seniority level. The LLM is a generalist. My interview system needed to be a specialist.
That’s what RAG — Retrieval-Augmented Generation — solves. Instead of hoping the model happens to know your specific evaluation criteria, you retrieve exactly what it needs, exactly when it needs it, and inject it into the context. This part covers how to build that retrieval system fast enough to work in a real-time voice conversation.
Why Voice Agents Need RAG
Generic LLM knowledge fails in structured interviews for three concrete reasons.
Role-specificity. A “senior backend engineer” at a fintech startup means something very different than at a FAANG company or a healthcare SaaS. The depth of distributed systems knowledge, the tolerance for monoliths, the expectation of regulatory familiarity — all of these are company and role specific. No base model can know your specific hiring bar.
Rubric consistency. When you’re using AI to score candidates, you need every candidate evaluated against the same rubric. If the LLM improvises the scoring criteria based on what feels right, you get inconsistent results that won’t survive an audit and can introduce bias. The rubric needs to come from a source of truth, not from the model’s vibes.
Technical currency. The voice agent needs to know about technologies that may postdate its training cutoff. If you’re hiring for a role that uses a framework that was released 18 months ago, the model might have limited or outdated knowledge about it. RAG lets you inject current documentation and context.
The architecture I landed on has three layers:
- A vector database for semantic search across rubrics, job descriptions, and reference answers
- Redis for session state — conversation history, scores accumulated so far, candidate profile data
- A hybrid search layer that combines vector similarity with keyword matching for technical terms
All of this needs to happen in under 50ms from the moment the candidate stops speaking to the moment the LLM starts generating its response. Let me show you how to get there.
Vector Database Selection
I evaluated four options seriously before picking one.
| Database | Latency (p99) | Cost (1M vectors) | Highlights | Weaknesses |
|---|---|---|---|---|
| Pinecone Serverless | ~15ms | ~$0.10/month | Managed, fast cold start, no infra | Vendor lock-in, limited filtering |
| Weaviate Cloud | ~20ms | ~$25/month | Hybrid search built-in, open source | More complex setup |
| pgvector | ~5ms (local) | Postgres cost | Simple, SQL-native, no extra infra | Scales poorly past ~1M vectors |
| Qdrant | ~10ms | Self-hosted | Great filtering, open source | Need to manage infrastructure |
For a voice interview system at the scale I was building (thousands of candidates per month, dozens of job roles), Pinecone Serverless won. The managed service means zero ops overhead, the serverless tier means I’m not paying for idle capacity, and 15ms p99 latency leaves plenty of headroom in my 50ms budget.
If you’re self-hosting or have a smaller scale, pgvector on your existing Postgres is genuinely good enough and removes an external dependency. For anything requiring complex filtering (show me rubrics tagged “backend” AND “senior” AND “distributed-systems”), Qdrant’s payload filtering is excellent.
Here’s how I initialize the Pinecone client and set up the index:
import os

import pinecone
from pinecone import ServerlessSpec

pc = pinecone.Pinecone(api_key=os.environ["PINECONE_API_KEY"])

# Create index if it doesn't exist
if "interview-knowledge" not in pc.list_indexes().names():
    pc.create_index(
        name="interview-knowledge",
        dimension=1536,  # text-embedding-3-small dimensions
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )

index = pc.Index("interview-knowledge")
Embedding Models
The embedding model determines the quality of your semantic search. There are three realistic options:
OpenAI text-embedding-3-small is what I use in production. 1536 dimensions, $0.02 per million tokens, and the quality is excellent for English technical content. The “small” variant was released alongside a “large” variant (3072 dimensions) but in my testing the quality difference for interview content wasn’t worth the 2x cost and latency increase.
Cohere embed-v3 is a strong alternative with built-in support for specifying the embed type (search_query vs search_document), which actually helps retrieval quality. If you’re embedding documents vs queries, Cohere’s API makes that explicit. I found it about 10% more expensive than OpenAI’s small model with similar quality.
Voyage AI is specialized for code-heavy content and technical documentation. If you’re doing a lot of retrieval on code samples, architecture diagrams described in text, or technical reference docs, Voyage’s code-aware models are worth evaluating.
For dimensionality: higher dimensions generally mean better quality but more storage and slower retrieval. For interview content (rubrics, job descriptions, Q&A pairs), 1536 dimensions is the sweet spot. I haven’t seen meaningful quality gains going to 3072 for this use case.
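If you want to experiment with lower dimensionality without re-embedding everything, the v3 OpenAI embeddings can be shortened by keeping only the first k components and re-normalizing (the embeddings API also exposes a `dimensions` request parameter that does this server-side). A minimal client-side sketch:

```python
import math
from typing import List

def truncate_embedding(vec: List[float], dims: int) -> List[float]:
    """Keep the first `dims` components of an embedding and re-normalize,
    so cosine similarity over the shortened vectors still behaves sensibly."""
    head = vec[:dims]
    norm = math.sqrt(sum(x * x for x in head))
    return [x / norm for x in head] if norm > 0 else head
```

This is handy for offline experiments comparing retrieval quality at 256 vs 512 vs 1536 dimensions against the same stored vectors.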
Here’s a simple embedding utility:
from openai import AsyncOpenAI
from typing import List

client = AsyncOpenAI()

async def embed_text(text: str) -> List[float]:
    """Embed a single text string."""
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

async def embed_batch(texts: List[str]) -> List[List[float]]:
    """Embed multiple texts in a single API call."""
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [item.embedding for item in response.data]
Chunking Strategies for Interview Content
This is where most RAG implementations go wrong. Generic chunking — split every 512 tokens with 50-token overlap — works okay for long documents. It works badly for structured content like rubrics and job descriptions, where the semantic unit is a competency or a requirement, not a fixed token count.
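For contrast, the generic baseline looks something like this: a fixed window with overlap, indifferent to where a competency or requirement begins and ends (a sketch over pre-tokenized input):

```python
from typing import List

def fixed_size_chunks(
    tokens: List[str], size: int = 512, overlap: int = 50
) -> List[List[str]]:
    """Baseline chunker: fixed-size windows with overlap, blind to structure."""
    if not tokens:
        return []
    step = size - overlap
    # Stop once the previous window has reached the end of the input
    return [tokens[i:i + size] for i in range(0, max(len(tokens) - overlap, 1), step)]
```

Run this over a rubric and you routinely get chunks that end mid-competency, with the level descriptors for "System Design" split across two vectors. The structure-aware chunkers below avoid exactly that.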
I developed three different chunking strategies for the three main content types.
Rubric Chunking (by competency)
Interview rubrics are structured around competencies. Each competency is a self-contained evaluation unit with a name, description, and scoring criteria at each level. Chunking at the competency boundary keeps each chunk semantically coherent.
import json
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class RubricChunk:
    role_id: str
    role_name: str
    competency_name: str
    competency_description: str
    level_descriptors: Dict[str, str]  # "1": "...", "2": "...", etc.
    weight: float  # importance of this competency, 0-1
    chunk_id: str

def chunk_rubric(rubric: Dict[str, Any]) -> List[RubricChunk]:
    """
    Chunk a rubric by competency. Each competency becomes one chunk.

    Expected rubric format:
    {
        "role_id": "swe-senior",
        "role_name": "Senior Software Engineer",
        "competencies": [
            {
                "name": "System Design",
                "description": "Ability to design scalable, maintainable systems",
                "weight": 0.3,
                "levels": {
                    "1": "Can describe basic client-server architecture",
                    "2": "Understands CAP theorem, designs for availability vs consistency",
                    "3": "Designs distributed systems with appropriate trade-offs",
                    "4": "Architects for scale, owns technical vision"
                }
            },
            ...
        ]
    }
    """
    chunks = []
    role_id = rubric["role_id"]
    role_name = rubric["role_name"]
    for competency in rubric["competencies"]:
        chunk_id = f"{role_id}__competency__{competency['name'].lower().replace(' ', '_')}"
        chunk = RubricChunk(
            role_id=role_id,
            role_name=role_name,
            competency_name=competency["name"],
            competency_description=competency["description"],
            level_descriptors=competency["levels"],
            weight=competency.get("weight", 1.0 / len(rubric["competencies"])),
            chunk_id=chunk_id
        )
        chunks.append(chunk)
    return chunks

def rubric_chunk_to_text(chunk: RubricChunk) -> str:
    """
    Convert a rubric chunk to a text string for embedding.
    The text representation matters for retrieval quality.
    """
    lines = [
        f"Role: {chunk.role_name}",
        f"Competency: {chunk.competency_name}",
        f"Description: {chunk.competency_description}",
        "",
        "Scoring levels:"
    ]
    for level, descriptor in sorted(chunk.level_descriptors.items()):
        lines.append(f"  Level {level}: {descriptor}")
    return "\n".join(lines)
Job Description Chunking (by requirement)
Job descriptions are chunked by requirement section: responsibilities, required qualifications, preferred qualifications, and cultural expectations. Each section is further split by bullet point if it’s long.
@dataclass
class JDChunk:
    job_id: str
    job_title: str
    section: str  # "responsibilities", "required", "preferred", "culture"
    content: str
    chunk_id: str

def chunk_job_description(jd: Dict[str, Any]) -> List[JDChunk]:
    """
    Chunk a job description by section, then by requirement within section.
    """
    chunks = []
    job_id = jd["job_id"]
    job_title = jd["title"]
    sections = {
        "responsibilities": jd.get("responsibilities", []),
        "required": jd.get("required_qualifications", []),
        "preferred": jd.get("preferred_qualifications", []),
        "culture": jd.get("cultural_expectations", [])
    }
    for section_name, items in sections.items():
        if not items:
            continue
        # Group items into chunks of ~3-5 items to keep chunks focused
        group_size = 4
        for group_idx in range(0, len(items), group_size):
            group = items[group_idx:group_idx + group_size]
            content = f"Job: {job_title}\nSection: {section_name}\n\n"
            content += "\n".join(f"- {item}" for item in group)
            chunks.append(JDChunk(
                job_id=job_id,
                job_title=job_title,
                section=section_name,
                content=content,
                chunk_id=f"{job_id}__{section_name}__{group_idx // group_size}"
            ))
    return chunks
Technical Documentation Chunking (by concept)
For technical reference docs — API documentation, architecture decision records, domain knowledge bases — I use a concept-aware chunker that respects heading boundaries.
import re

def chunk_technical_doc(
    content: str,
    doc_id: str,
    doc_title: str,
    max_chunk_tokens: int = 400
) -> List[Dict[str, Any]]:
    """
    Chunk technical documentation by heading boundaries.
    Falls back to paragraph-level chunking for long sections.
    """
    # Split by markdown headings (#, ##, or ###)
    heading_pattern = re.compile(r'^(#{1,3})\s+(.+)$', re.MULTILINE)
    sections = []
    last_pos = 0
    current_heading = doc_title
    for match in heading_pattern.finditer(content):
        # Save previous section
        section_content = content[last_pos:match.start()].strip()
        if section_content:
            sections.append({
                "heading": current_heading,
                "content": section_content
            })
        current_heading = match.group(2)
        last_pos = match.end()
    # Don't forget the last section
    remaining = content[last_pos:].strip()
    if remaining:
        sections.append({
            "heading": current_heading,
            "content": remaining
        })

    # Convert sections to chunks, splitting long ones
    chunks = []
    for section in sections:
        full_text = f"{section['heading']}\n\n{section['content']}"
        base_id = f"{doc_id}__{section['heading'].lower().replace(' ', '_')}"
        # Rough token estimate: 1 token ≈ 4 chars
        if len(full_text) / 4 <= max_chunk_tokens:
            chunks.append({
                "chunk_id": base_id,
                "doc_id": doc_id,
                "doc_title": doc_title,
                "heading": section["heading"],
                "content": full_text
            })
        else:
            # Split long sections by paragraph, repeating the heading in each chunk
            for para_idx, para in enumerate(section["content"].split("\n\n")):
                if para.strip():
                    chunks.append({
                        "chunk_id": f"{base_id}__{para_idx}",
                        "doc_id": doc_id,
                        "doc_title": doc_title,
                        "heading": section["heading"],
                        "content": f"{section['heading']}\n\n{para}"
                    })
    return chunks
Indexing the Chunks
Once chunked, everything goes into Pinecone with metadata that allows filtered retrieval:
async def index_rubric_chunks(
    chunks: List[RubricChunk],
    index: pinecone.Index
):
    """Index rubric chunks into Pinecone with metadata."""
    texts = [rubric_chunk_to_text(c) for c in chunks]
    embeddings = await embed_batch(texts)

    vectors = []
    for chunk, embedding, text in zip(chunks, embeddings, texts):
        vectors.append({
            "id": chunk.chunk_id,
            "values": embedding,
            "metadata": {
                "type": "rubric",
                "role_id": chunk.role_id,
                "role_name": chunk.role_name,
                "competency_name": chunk.competency_name,
                "weight": chunk.weight,
                "text": text  # Store text for retrieval
            }
        })

    # Upsert in batches of 100
    batch_size = 100
    for i in range(0, len(vectors), batch_size):
        index.upsert(vectors=vectors[i:i + batch_size])

    print(f"Indexed {len(vectors)} rubric chunks")
Real-Time RAG During Conversation
The 50ms retrieval budget isn’t arbitrary. In a voice conversation, the pipeline looks like this:
- Audio arrives and STT processes it — ~100-200ms
- We detect end-of-turn and trigger retrieval — this is the 50ms window
- Retrieval results augment the LLM context — context building ~20ms
- LLM generates first token — ~100-300ms depending on model and prompt length
- TTS begins converting the first sentence — ~100ms
Total time to first audio byte: roughly 400-700ms. That’s acceptable. If retrieval takes 200ms, we’re at 600-900ms which starts to feel sluggish in a conversation context.
There are three techniques I use to stay under 50ms:
Pre-warming. At the start of each interview session, I pre-fetch and cache all the rubric chunks for the target role. These don’t change during the interview, so there’s no reason to retrieve them on every turn.
Query prediction. While the candidate is still speaking (during STT), I start predicting what the next retrieval query is likely to be based on the conversation history. This is speculative — I abort if the prediction is wrong — but it shaves 10-15ms off the critical path.
ANN tuning. Approximate Nearest Neighbor search (what Pinecone uses under the hood) trades recall for speed. The main knob you control from the client is top_k; self-hosted engines also expose index-level parameters such as nprobe (IVF) or ef_search (HNSW). For real-time retrieval I use top_k=5 instead of top_k=10 — in my testing, the 6th-10th results rarely improve LLM output quality enough to justify the extra latency.
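Query prediction is the least obvious of the three, so here is a minimal sketch. The helper names are hypothetical (`retrieve` stands in for whatever your vector search call is): it starts a retrieval task for the predicted query while STT is still finishing, then keeps the in-flight result only if the final transcript matches the guess.

```python
import asyncio

async def speculative_retrieve(predicted_query, actual_query_future, retrieve):
    """Start retrieval for a predicted query while the candidate is still
    speaking; keep the result only if the prediction turns out to be right."""
    task = asyncio.create_task(retrieve(predicted_query))
    actual = await actual_query_future  # resolves when STT finalizes the turn
    if actual == predicted_query:
        return await task      # prediction was right: result is already in flight
    task.cancel()              # wrong guess: abort the speculative work
    return await retrieve(actual)
```

In the happy path the retrieval latency overlaps with the tail of transcription; in the unhappy path you pay one extra cancelled request, which is why this only helps when predictions are usually right.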
Here’s the retrieval pipeline:
import asyncio
import hashlib
import json
import time
from typing import Optional, List, Dict, Any

class InterviewRAGPipeline:
    def __init__(
        self,
        pinecone_index: pinecone.Index,
        redis_client,
        session_id: str,
        role_id: str
    ):
        self.index = pinecone_index
        self.redis = redis_client
        self.session_id = session_id
        self.role_id = role_id
        self._rubric_cache: Optional[List[Dict]] = None

    async def warm_cache(self):
        """Pre-fetch rubric chunks for this role at session start."""
        start = time.perf_counter()
        # Fetch all rubric chunks for this role. Pinecone requires a query
        # vector, so we pass a dummy one and rely entirely on the filter.
        results = self.index.query(
            vector=[0.0] * 1536,
            top_k=50,
            filter={"type": "rubric", "role_id": self.role_id},
            include_metadata=True
        )
        self._rubric_cache = [match.metadata for match in results.matches]
        elapsed = (time.perf_counter() - start) * 1000
        print(f"Rubric cache warmed: {len(self._rubric_cache)} chunks in {elapsed:.1f}ms")

    async def retrieve_for_turn(
        self,
        candidate_utterance: str,
        conversation_context: str,
        retrieval_types: List[str] = ("rubric", "jd", "reference")
    ) -> Dict[str, Any]:
        """
        Retrieve relevant context for the current conversation turn.
        Target: < 50ms total.
        """
        start = time.perf_counter()

        # Build retrieval query from utterance + recent context
        query = f"{candidate_utterance}\n\nConversation context: {conversation_context[-500:]}"

        # Run embedding and cache lookup concurrently
        query_embedding, cached_result = await asyncio.gather(
            embed_text(query),
            self._check_query_cache(query)
        )
        if cached_result:
            elapsed = (time.perf_counter() - start) * 1000
            print(f"Cache hit — retrieval in {elapsed:.1f}ms")
            return cached_result

        # Vector search: any of the requested types, scoped to this role
        filter_condition = {
            "$and": [
                {"type": {"$in": list(retrieval_types)}},
                {"role_id": {"$in": [self.role_id, "global"]}}
            ]
        }
        results = self.index.query(
            vector=query_embedding,
            top_k=5,
            filter=filter_condition,
            include_metadata=True
        )

        retrieved = {
            "rubric_chunks": [],
            "jd_chunks": [],
            "reference_chunks": []
        }
        for match in results.matches:
            chunk_type = match.metadata.get("type", "reference")
            if chunk_type == "rubric":
                retrieved["rubric_chunks"].append(match.metadata["text"])
            elif chunk_type == "jd":
                retrieved["jd_chunks"].append(match.metadata["content"])
            else:
                retrieved["reference_chunks"].append(match.metadata["content"])

        # Cache this result for 5 minutes
        await self._cache_query_result(query, retrieved, ttl=300)

        elapsed = (time.perf_counter() - start) * 1000
        print(f"Vector retrieval in {elapsed:.1f}ms")
        return retrieved

    def _cache_key(self, query: str) -> str:
        # Python's built-in hash() is salted per process, so use a stable digest
        return f"rag_cache:{hashlib.sha256(query.encode()).hexdigest()}"

    async def _check_query_cache(self, query: str) -> Optional[Dict]:
        """Check Redis for a cached retrieval result."""
        cached = await self.redis.get(self._cache_key(query))
        return json.loads(cached) if cached else None

    async def _cache_query_result(self, query: str, result: Dict, ttl: int = 300):
        """Cache a retrieval result in Redis."""
        await self.redis.setex(self._cache_key(query), ttl, json.dumps(result))
Redis for Session State
Every interview session accumulates state that needs to persist across turns and be accessible in under 1ms: the conversation history, scores accumulated so far, timing data, and candidate profile. Redis is the right tool for this — it’s in-memory, has rich data structures, and the latency profile is consistent.
I use four Redis data structures per session:
import redis.asyncio as aioredis
import json
from datetime import datetime
from typing import Any, Dict, List

class InterviewSessionStore:
    """
    Redis schema for a single interview session.

    Keys:
    - session:{id}:meta    → Hash: role_id, candidate_id, start_time, status
    - session:{id}:history → List: conversation turns (JSON)
    - session:{id}:scores  → Hash: competency_name → running score
    - session:{id}:timing  → Hash: question timings, pause durations, total_time
    """

    SESSION_TTL = 14400  # 4 hours — interviews shouldn't last longer

    def __init__(self, redis_client: aioredis.Redis, session_id: str):
        self.redis = redis_client
        self.session_id = session_id
        self.key_prefix = f"session:{session_id}"

    async def initialize(
        self,
        role_id: str,
        candidate_id: str,
        candidate_level: str
    ):
        """Initialize a new session."""
        await self.redis.hset(
            f"{self.key_prefix}:meta",
            mapping={
                "role_id": role_id,
                "candidate_id": candidate_id,
                "candidate_level": candidate_level,
                "start_time": datetime.utcnow().isoformat(),
                "status": "active",
                "turn_count": 0
            }
        )
        await self.redis.expire(f"{self.key_prefix}:meta", self.SESSION_TTL)

    async def add_turn(
        self,
        role: str,  # "interviewer" | "candidate"
        content: str,
        metadata: Dict[str, Any] = None
    ):
        """Append a conversation turn to history."""
        turn = {
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat(),
            **(metadata or {})
        }
        key = f"{self.key_prefix}:history"
        await self.redis.rpush(key, json.dumps(turn))
        # Trim to last 100 turns to prevent unbounded growth
        await self.redis.ltrim(key, -100, -1)
        # Expire alongside the session metadata so no key leaks
        await self.redis.expire(key, self.SESSION_TTL)
        # Increment turn counter
        await self.redis.hincrby(f"{self.key_prefix}:meta", "turn_count", 1)

    async def get_recent_history(self, n_turns: int = 10) -> List[Dict]:
        """Get the N most recent conversation turns."""
        key = f"{self.key_prefix}:history"
        raw_turns = await self.redis.lrange(key, -n_turns, -1)
        return [json.loads(t) for t in raw_turns]

    async def update_score(
        self,
        competency: str,
        new_observation: float,
        weight: float = 1.0
    ):
        """
        Update the running score for a competency using an exponential moving average.
        This handles multiple observations of the same competency gracefully.
        """
        score_key = f"{self.key_prefix}:scores"
        current = await self.redis.hget(score_key, competency)
        if current is None:
            # First observation
            new_score = new_observation
            observation_count = 1
        else:
            current_data = json.loads(current)
            # EMA with alpha=0.4 — recent observations weighted more
            alpha = 0.4
            new_score = alpha * new_observation + (1 - alpha) * current_data["score"]
            observation_count = current_data["observation_count"] + 1
        await self.redis.hset(
            score_key,
            competency,
            json.dumps({
                "score": new_score,
                "weight": weight,
                "observation_count": observation_count
            })
        )
        await self.redis.expire(score_key, self.SESSION_TTL)

    async def get_all_scores(self) -> Dict[str, Dict]:
        """Get all accumulated scores for the session."""
        raw = await self.redis.hgetall(f"{self.key_prefix}:scores")
        return {k: json.loads(v) for k, v in raw.items()}

    async def record_timing(self, event: str, duration_ms: float):
        """Record timing data for analytics."""
        timing_key = f"{self.key_prefix}:timing"
        await self.redis.hset(timing_key, event, duration_ms)
        await self.redis.expire(timing_key, self.SESSION_TTL)
Hybrid Search
Here’s a problem I didn’t anticipate: pure vector search sometimes misses exact technical terms. A query like “system design best practices” may land close enough to a chunk mentioning “SOLID principles” (say, 0.7 similarity) to retrieve it. But a query like “database reliability” might score only 0.5 against a chunk about “ACID transactions” — and that chunk gets ranked below less relevant results even though it’s exactly what the evaluator needs.
Technical interviews are full of exact terms that have specific meanings: Kubernetes, SOLID, CAP theorem, CQRS, event sourcing, two-phase commit. These terms are precise, and candidates expect the interviewer to understand exactly what they mean when they use them.
The solution is hybrid search — combining vector similarity with BM25 keyword matching. Weaviate has this built in; for Pinecone I implement it as a two-stage retrieval:
from rank_bm25 import BM25Okapi
import numpy as np

class HybridSearcher:
    def __init__(
        self,
        pinecone_index: pinecone.Index,
        all_chunks: List[Dict[str, Any]]
    ):
        self.index = pinecone_index
        self.chunks = all_chunks
        # Build BM25 index over all chunks
        tokenized = [
            chunk["content"].lower().split()
            for chunk in all_chunks
        ]
        self.bm25 = BM25Okapi(tokenized)
        self.chunk_ids = [chunk["chunk_id"] for chunk in all_chunks]

    async def search(
        self,
        query: str,
        top_k: int = 5,
        alpha: float = 0.6  # 0.6 vector, 0.4 BM25
    ) -> List[Dict[str, Any]]:
        """
        Hybrid search combining vector similarity and BM25.
        alpha=1.0 → pure vector, alpha=0.0 → pure BM25
        """
        # Vector search
        query_embedding = await embed_text(query)
        vector_results = self.index.query(
            vector=query_embedding,
            top_k=top_k * 3,  # Get more candidates for re-ranking
            include_metadata=True
        )
        vector_scores = {
            match.id: match.score
            for match in vector_results.matches
        }

        # BM25 search
        tokenized_query = query.lower().split()
        bm25_scores_raw = self.bm25.get_scores(tokenized_query)
        # Normalize BM25 scores to [0, 1]
        bm25_max = bm25_scores_raw.max()
        if bm25_max > 0:
            bm25_scores_normalized = bm25_scores_raw / bm25_max
        else:
            bm25_scores_normalized = bm25_scores_raw
        bm25_scores = {
            self.chunk_ids[i]: float(bm25_scores_normalized[i])
            for i in range(len(self.chunk_ids))
        }

        # Combine scores
        all_chunk_ids = set(vector_scores) | set(bm25_scores)
        combined_scores = {
            chunk_id: alpha * vector_scores.get(chunk_id, 0.0)
                      + (1 - alpha) * bm25_scores.get(chunk_id, 0.0)
            for chunk_id in all_chunk_ids
        }

        # Sort and return top-k
        ranked = sorted(
            combined_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]

        # Fetch full chunk data for top results
        chunk_lookup = {c["chunk_id"]: c for c in self.chunks}
        return [
            {"chunk": chunk_lookup[chunk_id], "score": score}
            for chunk_id, score in ranked
            if chunk_id in chunk_lookup
        ]
In practice I use alpha=0.7 (favoring vector) for general context retrieval and alpha=0.4 (favoring BM25) when the query contains obvious technical terms. I detect technical terms with a simple keyword list — if the query contains any known technical terms, I shift toward BM25.
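That alpha-switching heuristic is just a set lookup. A minimal sketch, with an assumed (and deliberately tiny) technical-term list:

```python
# Illustrative term list — in production this comes from the question bank
TECH_TERMS = {"kubernetes", "solid", "acid", "cqrs", "cap", "grpc", "bm25"}

def choose_alpha(query: str, default: float = 0.7, keyword_alpha: float = 0.4) -> float:
    """Shift weight toward BM25 when the query contains exact technical terms."""
    tokens = {t.strip(".,?!()").lower() for t in query.split()}
    return keyword_alpha if tokens & TECH_TERMS else default
```

The result feeds straight into `HybridSearcher.search(query, alpha=choose_alpha(query))`. A multi-word term like "CAP theorem" needs phrase handling this sketch doesn't attempt.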
Knowledge Base Management
The RAG system is only as good as what’s in it, and that means building an admin interface for managing rubrics, question banks, and interview templates. This is often an afterthought in RAG implementations but it becomes critical when you’re onboarding new roles or updating evaluation criteria.
The key operations the admin interface needs to support:
Upload and parse rubrics. Accept YAML or JSON rubric files, validate the schema, chunk them, and index them. Show a preview of what chunks would be created before committing.
Version control for templates. Interview templates change — you refine the scoring criteria, add new competencies, deprecate old questions. The system needs to track which version of a rubric was used for each interview, because you might need to re-evaluate historical sessions against a new rubric.
Preview retrieval quality. Before deploying a new rubric to production, you want to test: “given this candidate utterance, what would the system retrieve?” This is a developer-facing feature but it saves enormous debugging time.
class KnowledgeBaseManager:
    def __init__(
        self,
        pinecone_index: pinecone.Index,
        postgres_db  # For version control and metadata
    ):
        self.index = pinecone_index
        self.db = postgres_db

    async def upload_rubric(
        self,
        rubric: Dict[str, Any],
        uploaded_by: str,
        replace_existing: bool = False
    ) -> Dict[str, Any]:
        """Upload and index a rubric, with version tracking."""
        role_id = rubric["role_id"]

        # Check for existing version
        existing = await self.db.fetchrow(
            "SELECT version FROM rubric_versions WHERE role_id = $1 ORDER BY version DESC LIMIT 1",
            role_id
        )
        version = (existing["version"] + 1) if existing else 1

        # Chunk the rubric
        chunks = chunk_rubric(rubric)

        if replace_existing:
            # Delete old vectors for this role
            self.index.delete(
                filter={"type": "rubric", "role_id": role_id}
            )

        # Index new chunks
        await index_rubric_chunks(chunks, self.index)

        # Record version in postgres
        await self.db.execute(
            """
            INSERT INTO rubric_versions
                (role_id, version, rubric_data, chunk_count, uploaded_by, created_at)
            VALUES ($1, $2, $3, $4, $5, NOW())
            """,
            role_id, version, json.dumps(rubric), len(chunks), uploaded_by
        )

        return {
            "role_id": role_id,
            "version": version,
            "chunks_indexed": len(chunks),
            "status": "success"
        }

    async def preview_retrieval(
        self,
        query: str,
        role_id: str,
        top_k: int = 5
    ) -> List[Dict[str, Any]]:
        """Preview what would be retrieved for a given query."""
        query_embedding = await embed_text(query)
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            filter={"role_id": role_id},
            include_metadata=True
        )
        return [
            {
                "score": match.score,
                "type": match.metadata.get("type"),
                "competency": match.metadata.get("competency_name"),
                "preview": match.metadata.get("text", "")[:300]
            }
            for match in results.matches
        ]
Context Window Optimization
Every token you put in the context costs money and adds latency. A typical GPT-4o call with 4000 tokens of context costs ~$0.01 and takes ~200ms. A call with 12000 tokens costs ~$0.03 and takes ~400ms. Over thousands of interviews, that difference compounds.
The key insight is that not all retrieved context is equal. I structure the LLM context in layers:
Static system prompt (~800 tokens): Role description, core interviewing instructions, output format requirements. This never changes during the session.
Rubric context (~400 tokens): The top 3 most relevant competency chunks for the current question. Retrieved fresh each turn.
Session state (~300 tokens): Current question number, competencies covered, scores so far. A structured summary, not raw history.
Conversation history (~600 tokens): Last 5 turns verbatim. Older turns are summarized.
Retrieved context (~400 tokens): JD requirements and reference material relevant to the current turn.
Total: ~2500 tokens per call. That’s my target. Going much higher than this starts to hurt both latency and cost meaningfully.
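The layering itself can be enforced mechanically. A sketch of a budget-aware assembler, using the same rough 1-token-≈-4-chars estimate as the chunker (the layer labels and budgets are illustrative, not part of any API):

```python
from typing import List, Tuple

def assemble_context(layers: List[Tuple[str, str, int]]) -> str:
    """Assemble the layered prompt, trimming each layer to its token budget.
    `layers` is a list of (label, text, budget_in_tokens) tuples."""
    parts = []
    for label, text, budget_tokens in layers:
        max_chars = budget_tokens * 4  # rough estimate: 1 token ≈ 4 chars
        if len(text) > max_chars:
            text = text[:max_chars] + " …[truncated]"
        parts.append(f"## {label}\n{text}")
    return "\n\n".join(parts)
```

Truncating a layer mid-sentence is crude; in practice you would drop whole chunks from the layer instead, but the per-layer budget is the point: no single layer can blow the 2500-token target.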
For managing conversation history in long interviews, I use a sliding window with summarization:
async def build_conversation_context(
    session_store: InterviewSessionStore,
    max_verbatim_turns: int = 5,
    summary_model: str = "gpt-4o-mini"
) -> str:
    """
    Build conversation context with sliding window + summarization.
    Keeps the last N turns verbatim, summarizes older turns.
    """
    # Get all history
    all_history = await session_store.get_recent_history(n_turns=50)

    if len(all_history) <= max_verbatim_turns:
        # Short interview — return everything verbatim
        turns_text = "\n".join(
            f"{t['role'].upper()}: {t['content']}"
            for t in all_history
        )
        return f"Conversation history:\n{turns_text}"

    # Split into old (to summarize) and recent (verbatim)
    old_turns = all_history[:-max_verbatim_turns]
    recent_turns = all_history[-max_verbatim_turns:]

    # Summarize old turns
    old_text = "\n".join(
        f"{t['role'].upper()}: {t['content']}"
        for t in old_turns
    )
    summary_response = await client.chat.completions.create(
        model=summary_model,
        messages=[{
            "role": "user",
            "content": (
                "Summarize this interview conversation in 3-4 sentences. "
                "Focus on: topics covered, candidate strengths/weaknesses observed, "
                "competencies assessed so far.\n\n"
                f"Conversation:\n{old_text}"
            )
        }],
        max_tokens=200
    )
    summary = summary_response.choices[0].message.content

    # Build recent history text
    recent_text = "\n".join(
        f"{t['role'].upper()}: {t['content']}"
        for t in recent_turns
    )

    return (
        f"Earlier in the interview (summary):\n{summary}\n\n"
        f"Recent conversation:\n{recent_text}"
    )
Interview Template System
The final piece is a structured question bank that lets interviewers define not just questions, but question trees — where the follow-up depends on the candidate’s answer.
@dataclass
class Question:
    question_id: str
    text: str
    competency: str
    difficulty: int  # 1-4, maps to role levels
    follow_ups: Dict[str, List[str]]  # "good_answer": [...], "weak_answer": [...]
    expected_signals: List[str]  # What a good answer should contain
    red_flags: List[str]  # What would be concerning to hear

@dataclass
class InterviewTemplate:
    template_id: str
    role_id: str
    candidate_level: int  # 1-4
    opening_question: str
    question_bank: List[Question]
    required_competencies: List[str]  # Must be covered
    time_budget_minutes: int

def select_next_question(
    template: InterviewTemplate,
    covered_competencies: List[str],
    asked_question_ids: List[str],
    time_remaining_minutes: int,
    candidate_performance: Dict[str, float]  # competency → score so far
) -> Optional[Question]:
    """
    Dynamically select the next question based on:
    - What competencies haven't been covered yet
    - Time remaining
    - Candidate performance so far (probe weak areas more)

    Tracking asked question IDs (not just competencies) lets us revisit a
    weak competency with a fresh question instead of repeating ourselves.
    """
    uncovered_required = [
        c for c in template.required_competencies
        if c not in covered_competencies
    ]

    # If we're running short on time, prioritize required competencies
    if time_remaining_minutes < 5 and uncovered_required:
        # Find a question for the first uncovered required competency
        target_competency = uncovered_required[0]
        candidates = [
            q for q in template.question_bank
            if q.competency == target_competency
            and q.question_id not in asked_question_ids
        ]
        return candidates[0] if candidates else None

    # Otherwise, probe weak areas — find the lowest-scoring competency.
    # A weak competency has been covered already, so we deliberately revisit it.
    if candidate_performance:
        weakest_competency = min(
            candidate_performance.items(),
            key=lambda x: x[1]
        )[0]
        weak_questions = [
            q for q in template.question_bank
            if q.competency == weakest_competency
            and q.question_id not in asked_question_ids
        ]
        if weak_questions:
            return weak_questions[0]

    # Fall back to any unasked question for a not-yet-covered competency
    uncovered = [
        q for q in template.question_bank
        if q.competency not in covered_competencies
        and q.question_id not in asked_question_ids
    ]
    return uncovered[0] if uncovered else None
This dynamic selection means the interview adapts in real-time. If a candidate demonstrates strong system design skills early, the system spends less time on that competency and probes more on areas that haven’t been assessed. If a candidate is clearly struggling with a topic, the system can adjust the difficulty of follow-up questions.
Putting It Together
By the time we’re done with Part 6, the voice agent has:
- A vector database with role-specific rubrics, job descriptions, and reference knowledge indexed by competency and requirement
- Sub-50ms retrieval using pre-warming, query caching, and tuned ANN search parameters
- Hybrid search that catches exact technical terms that pure vector search might miss
- Redis session state tracking conversation history, accumulated scores, and timing data
- Context window management that keeps each LLM call under 2500 tokens
- A dynamic question selection system that adapts to candidate performance in real-time
The difference in interview quality is immediately noticeable. The agent now follows up on “I’ve worked with microservices” with questions specific to the competency level we’re hiring for, scores answers against the actual rubric rather than generic best practices, and adapts the interview flow based on what it has learned so far.
In Part 7, we move to the client side — building the web and mobile interfaces that connect candidates to the voice agent. We’ll cover WebRTC from the browser, handling connection interruptions gracefully, building the candidate-facing UI with real-time transcript display, and adapting the experience for mobile devices where audio behavior is different from desktop.
This is Part 6 of a 12-part series: The Voice AI Interview Playbook.
Series outline:
- Why Real-Time Voice Changes Everything — The landscape, the vision, and the reference architecture (Part 1)
- Cascaded vs. Speech-to-Speech — Choosing your pipeline architecture (Part 2)
- LiveKit vs. Pipecat vs. Direct — Picking your framework (Part 3)
- STT, LLM, and TTS That Actually Work — Building the voice pipeline (Part 4)
- Multi-Role Agents — Interviewer, coach, and evaluator personas (Part 5)
- Knowledge Base and RAG — Making your voice agent an expert (this post)
- Web and Mobile Clients — Cross-platform voice experiences (Part 7)
- Video Interview Integration — Multimodal analysis with Gemini Live (Part 8)
- Recording, Transcription, and Compliance — GDPR, HIPAA, and getting it right (Part 9)
- Scaling to Thousands — Architecture for concurrent voice sessions (Part 10)
- Cost Optimization — From $0.14/min to $0.03/min (Part 11)
- Multi-Provider Support — OpenAI Realtime, Bedrock Nova, Grok, and the adapter pattern (Part 12)