The conversation ends. The respondent hangs up. The AI agent disconnects from the LiveKit room. And then the real work begins.

A 30-minute research interview produces a raw audio recording — typically 15-25MB of Ogg/Opus at 48kHz. That recording is useless to a researcher in its raw form. They cannot search it, compare it to other sessions, or extract themes from it. What they need is structured data: timestamped transcripts, per-utterance sentiment, extracted themes, topic coverage maps, and cross-session comparison grids.

We built a fully automatic pipeline that transforms a raw recording into queryable research data in 3-7 minutes, with zero human intervention. Three stages: transcription, enrichment, analysis. Each stage has its own queue, its own failure handling, and its own performance characteristics. This post walks through all of it — including the transcript batching trick that cut our database write load by 80%.

Why Post-Processing Matters More in Research

In most voice AI applications — customer service, appointment booking, sales calls — post-processing means “save the transcript and maybe generate a summary.” The transcript is a record of what happened. Nice to have, not mission-critical.

Research is different. The post-processed data is the product. Researchers are not building a chatbot — they are building a dataset. Every session contributes to a qualitative corpus that needs to support:

  • Cross-session theme comparison. “How many respondents mentioned charging infrastructure as a barrier? What did they say exactly?”
  • Respondent grids. A matrix view: respondent x topic, with sentiment and key quotes per cell.
  • Per-phase analysis. “What themes emerged during the probing phase that were not mentioned during exploration?”
  • Longitudinal tracking. “How did attitudes shift between wave 1 (January) and wave 2 (March)?”
  • Verbatim search. “Find every instance where a respondent used the word ‘trust’ in the context of brand perception.”

This is not text search over transcripts. It requires structured metadata — sentiment scores, topic tags, phase labels, speaker identification — attached to every segment of the conversation. Building that metadata automatically is what the pipeline does.

Stage 1: Transcription

The first stage takes the raw audio recording and produces a timestamped, speaker-labeled transcript. We use OpenAI’s Whisper API — specifically the whisper-1 model via the transcription endpoint with timestamp_granularities=["segment"] for segment-level timestamps.

Why Whisper over other ASR options? Three reasons. First, accuracy: Whisper consistently scores highest on our research audio, which includes diverse accents, overlapping speech, and domain-specific vocabulary. Second, language coverage: research studies often run in multiple languages, and Whisper handles 57+ languages without needing separate models. Third, cost: at $0.006/minute, a 30-minute interview costs $0.18 to transcribe — trivial compared to the S2S model cost during the live session.
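For concreteness, here is a sketch of the Stage 1 call using the official openai Python SDK. The client call is shown as a comment because it needs an API key; normalize_segments is an illustrative helper of ours, not part of the SDK — it flattens the verbose_json response into the segment shape the rest of the pipeline consumes.

```python
def normalize_segments(verbose_json: dict) -> list[dict]:
    """Flatten Whisper's verbose_json payload into pipeline segments.

    Expected input (subset of the real response):
        {"segments": [{"start": 0.0, "end": 2.4, "text": " Hello."}, ...]}
    """
    out = []
    for seg in verbose_json.get("segments", []):
        out.append({
            "start_time": float(seg["start"]),
            "end_time": float(seg["end"]),
            "text": seg.get("text", "").strip(),
        })
    return out


# The transcription call itself (requires OPENAI_API_KEY):
#
# client = openai.OpenAI()
# with open("recording.ogg", "rb") as f:
#     resp = client.audio.transcriptions.create(
#         model="whisper-1",
#         file=f,
#         response_format="verbose_json",
#         timestamp_granularities=["segment"],
#     )
# segments = normalize_segments(resp.model_dump())
```

Note that `timestamp_granularities` requires `response_format="verbose_json"` — the default JSON response does not include per-segment timestamps.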

The Hallucination Problem

ASR hallucination is a real issue that most tutorials ignore. When there is silence in the audio — pauses, dead air, the respondent thinking — Whisper sometimes generates phantom text. Common hallucinations include “Thank you for watching,” “Please subscribe,” “Music playing,” and repetitions of the last spoken phrase. In a research transcript, these phantom entries corrupt the data.

We filter hallucinations with a combination of heuristics:

import re
from dataclasses import dataclass


@dataclass
class TranscriptSegment:
    start_time: float
    end_time: float
    text: str
    speaker: str = ""  # Assigned in post-processing
    confidence: float = 1.0


# Patterns that indicate ASR hallucination during silence
HALLUCINATION_PATTERNS = [
    re.compile(r"^(thank you for watching|thanks for watching)", re.IGNORECASE),
    re.compile(r"^(please subscribe|like and subscribe)", re.IGNORECASE),
    re.compile(r"^(music|music playing|\[music\])", re.IGNORECASE),
    re.compile(r"^\.{3,}$"),  # Just ellipses
    re.compile(r"^\s*$"),  # Whitespace only
]

# Minimum segment duration to be considered real speech
MIN_SEGMENT_DURATION_SEC = 0.3

# Maximum repetition ratio (catches looping hallucinations)
MAX_REPEAT_RATIO = 0.7


def filter_hallucinations(segments: list[TranscriptSegment]) -> list[TranscriptSegment]:
    """Remove ASR hallucinations from transcript segments."""
    filtered = []
    for seg in segments:
        text = seg.text.strip()

        # Skip empty segments
        if not text:
            continue

        # Skip known hallucination patterns
        if any(p.match(text) for p in HALLUCINATION_PATTERNS):
            continue

        # Skip segments shorter than minimum duration
        if (seg.end_time - seg.start_time) < MIN_SEGMENT_DURATION_SEC:
            continue

        # Detect repetition loops: "the the the the the"
        words = text.lower().split()
        if len(words) > 3:
            unique_ratio = len(set(words)) / len(words)
            if unique_ratio < (1 - MAX_REPEAT_RATIO):
                continue

        filtered.append(seg)

    return filtered

The repetition detector is the most important filter. When Whisper hallucinates during extended silence, it often loops a single phrase — “I think, I think, I think, I think” — generating 10-15 identical segments. The unique word ratio catches this pattern without filtering legitimate repetition in natural speech: a respondent saying “it’s very, very important” has a unique ratio of 0.75, well above the 0.3 cutoff, while a looped “I think, I think, I think, I think” sits at 0.25 and gets dropped.

Speaker Diarization

S2S recordings have two speakers: the AI agent and the respondent. Traditional diarization systems (like pyannote or AWS Transcribe) cluster speakers by voice characteristics. That works, but there is a simpler approach for AI interviews: content-based classification.

The AI agent’s responses follow predictable patterns — they ask questions, they paraphrase, they use phrases from the system prompt. The respondent’s utterances are answers — they contain personal experiences, opinions, and information the AI could not have generated. We run a lightweight LLM classification pass over the transcript segments, feeding each segment with its surrounding context and asking “Is this the AI interviewer or the human respondent?” Accuracy is above 97% on our test set, and the few misclassifications are always short segments (“Mm-hmm,” “Yes”) where the label matters less.
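A minimal sketch of how such a classification pass might assemble its prompts. The exact wording, the two-segment context window, and build_speaker_prompt are illustrative assumptions, not our production prompt:

```python
CONTEXT_WINDOW = 2  # segments of surrounding context on each side


def build_speaker_prompt(texts: list[str], idx: int) -> str:
    """Ask the LLM to label one segment, given nearby segments for context."""
    lo = max(0, idx - CONTEXT_WINDOW)
    hi = min(len(texts), idx + CONTEXT_WINDOW + 1)
    lines = []
    for i in range(lo, hi):
        marker = ">>>" if i == idx else "   "  # mark the segment to classify
        lines.append(f"{marker} {texts[i]}")
    return (
        "The conversation below is between an AI interviewer and a human "
        "respondent. Classify the segment marked with >>> as 'ai' or "
        "'respondent'. Answer with one word.\n\n" + "\n".join(lines)
    )
```

Because each prompt carries its neighbors, a short ambiguous segment (“Mm-hmm”) can still be labeled from the turn-taking pattern around it.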

Stage 2: Enrichment

This is the most computationally expensive stage. For each transcript segment, we extract:

  • Sentiment (positive / neutral / negative / mixed, plus a -1.0 to 1.0 score)
  • Keywords (noun phrases and named entities relevant to the study)
  • Themes (mapped to the study’s theme codebook, e.g., “brand-trust”, “price-sensitivity”, “sustainability-concerns”)
  • Phase label (which interview phase this segment belongs to, based on timestamps)

All metadata is stored as JSONB in PostgreSQL with GIN indexes for fast querying.

CREATE TABLE transcript_segments (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id UUID NOT NULL REFERENCES sessions(id),
    speaker TEXT NOT NULL,          -- 'ai' or 'respondent'
    text TEXT NOT NULL,
    start_time FLOAT NOT NULL,
    end_time FLOAT NOT NULL,
    phase_name TEXT,
    metadata JSONB NOT NULL DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT now()
);

-- GIN index for fast JSONB querying (themes, keywords, sentiment)
CREATE INDEX idx_segments_metadata ON transcript_segments USING GIN (metadata);

-- Composite index for cross-session queries
CREATE INDEX idx_segments_session_phase ON transcript_segments (session_id, phase_name);

-- Text search index for verbatim search
CREATE INDEX idx_segments_text_search ON transcript_segments USING GIN (to_tsvector('english', text));

The GIN index on the metadata column is what makes cross-session queries fast. A query like “find all segments where metadata->'themes' contains ‘brand-trust’ and metadata->'sentiment_score' < -0.3” uses the index for the theme containment check and applies the score comparison as a filter over the matching rows, instead of scanning every row.
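Written out against the schema above, that query might look like the following sketch (the `@>` containment test is what the GIN index accelerates; the score comparison is an ordinary filter over the matches):

```sql
-- All strongly negative segments tagged with the 'brand-trust' theme
SELECT session_id, text, metadata->>'sentiment_score' AS score
FROM transcript_segments
WHERE metadata @> '{"themes": ["brand-trust"]}'
  AND (metadata->>'sentiment_score')::float < -0.3;
```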

The Bottleneck: LLM Calls Per Segment

A 30-minute interview produces 150-300 transcript segments. If you make one LLM API call per segment for enrichment, that is 150-300 sequential API calls. At ~500ms per call, that is 75-150 seconds just for the API round trips — not including processing time. This was our initial implementation, and it made Stage 2 the bottleneck by far.

The fix is two-fold: parallel workers and batch segments per API call.

import asyncio
import json
from typing import Any

# Concurrency limit for LLM API calls
MAX_CONCURRENT_ENRICHMENTS = 5

# Number of segments to batch per LLM call
BATCH_SIZE = 15


async def enrich_segments(
    segments: list[TranscriptSegment],
    study_themes: list[str],
    llm_client: Any,
) -> list[dict]:
    """Enrich transcript segments with sentiment, keywords, and themes.

    Batches segments and processes in parallel for throughput.
    """
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_ENRICHMENTS)
    results: list[dict] = [{} for _ in segments]  # Independent dicts, one per segment

    # Create batches
    batches = []
    for i in range(0, len(segments), BATCH_SIZE):
        batch = segments[i:i + BATCH_SIZE]
        batches.append((i, batch))

    async def process_batch(start_idx: int, batch: list[TranscriptSegment]):
        async with semaphore:
            prompt = _build_enrichment_prompt(batch, study_themes)
            response = await llm_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": ENRICHMENT_SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.1,  # Low temperature for consistent extraction
            )
            enrichments = json.loads(response.choices[0].message.content)

            for j, enrichment in enumerate(enrichments.get("segments", [])):
                # Trust the model-reported index when present, falling back
                # to position; guard against out-of-range indices
                idx = enrichment.get("index", j)
                if isinstance(idx, int) and 0 <= idx < len(batch):
                    results[start_idx + idx] = enrichment

    # Run all batches concurrently (bounded by semaphore)
    await asyncio.gather(*[
        process_batch(start_idx, batch)
        for start_idx, batch in batches
    ])

    return results


def _build_enrichment_prompt(
    batch: list[TranscriptSegment],
    study_themes: list[str],
) -> str:
    """Build the enrichment prompt for a batch of segments."""
    segments_text = ""
    for i, seg in enumerate(batch):
        segments_text += (
            f"[{i}] ({seg.speaker}, {seg.start_time:.1f}s-{seg.end_time:.1f}s): "
            f"{seg.text}\n"
        )

    themes_list = ", ".join(study_themes)

    return (
        f"Analyze the following transcript segments from a research interview.\n"
        f"Study themes: {themes_list}\n\n"
        f"For each segment, extract:\n"
        f"- sentiment: positive/neutral/negative/mixed\n"
        f"- sentiment_score: float from -1.0 to 1.0\n"
        f"- keywords: list of relevant noun phrases\n"
        f"- themes: list from the study themes that apply\n\n"
        f"Segments:\n{segments_text}\n\n"
        f"Return JSON: {{\"segments\": [{{\"index\": 0, \"sentiment\": ..., "
        f"\"sentiment_score\": ..., \"keywords\": [...], \"themes\": [...]}}]}}"
    )

Batching 15 segments per API call reduces 200 calls to ~14 calls. Running 5 in parallel brings the wall-clock time from 100 seconds to ~2 seconds for the API calls themselves. The total Stage 2 time dropped from 4-6 minutes to 1-4 minutes. We use gpt-4o-mini for enrichment because the task is extraction, not generation — it does not need the reasoning capability of a full-size model, and it is 15x cheaper.

One caveat: batch size matters. Below 10 segments, you lose efficiency. Above 20, the LLM starts missing segments or confusing their indices. 15 is the sweet spot we found empirically across English, Spanish, and Portuguese transcripts.
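One mitigation worth adding is a defensive realignment pass after each batched call. This is a sketch (align_enrichments is an illustrative name): it trusts the index field the prompt asks the model to return, and leaves an empty enrichment for any segment the model skipped rather than letting a gap shift every subsequent segment's metadata.

```python
def align_enrichments(raw: list[dict], batch_len: int) -> list[dict]:
    """Realign model output to batch positions using the reported index.

    Skipped segments become empty dicts; out-of-range or missing indices
    fall back to the item's position in the response.
    """
    aligned: list[dict] = [{} for _ in range(batch_len)]
    for pos, enrichment in enumerate(raw):
        idx = enrichment.get("index", pos)
        if isinstance(idx, int) and 0 <= idx < batch_len:
            aligned[idx] = enrichment
    return aligned
```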

Stage 3: Analysis

The final stage aggregates the segment-level enrichments into session-level insights. This is the output researchers actually look at.

Theme consolidation. Count how many segments map to each theme. Calculate the sentiment distribution per theme. Rank themes by frequency and by strength of sentiment.

Respondent grid row. For each dimension in the study’s grid (e.g., “brand perception”, “purchase intent”, “barrier identification”), extract the respondent’s position based on their enriched segments. This row gets added to a cross-session comparison grid.

Section insights. Per-phase summaries: what was most discussed in each phase, what themes emerged, what was the overall sentiment trajectory.

from collections import Counter, defaultdict
from typing import Any


async def generate_session_analysis(
    segments: list[TranscriptSegment],
    enrichments: list[dict],
    study_themes: list[str],
    llm_client: Any,
) -> dict:
    """Generate session-level analysis from enriched segments."""

    # Theme frequency and sentiment distribution
    theme_stats: dict[str, dict] = defaultdict(lambda: {
        "count": 0,
        "sentiments": Counter(),
        "avg_score": 0.0,
        "scores": [],
    })

    for seg, enrichment in zip(segments, enrichments):
        if seg.speaker != "respondent":
            continue  # Only analyze respondent utterances
        for theme in enrichment.get("themes", []):
            stats = theme_stats[theme]
            stats["count"] += 1
            stats["sentiments"][enrichment.get("sentiment", "neutral")] += 1
            score = enrichment.get("sentiment_score", 0.0)
            stats["scores"].append(score)

    # Calculate averages
    for theme, stats in theme_stats.items():
        scores = stats.pop("scores")
        stats["avg_score"] = sum(scores) / len(scores) if scores else 0.0
        stats["sentiments"] = dict(stats["sentiments"])

    # Phase-level summaries
    phase_segments: dict[str, list[str]] = defaultdict(list)
    for seg, enrichment in zip(segments, enrichments):
        if seg.speaker == "respondent" and seg.phase_name:
            phase_segments[seg.phase_name].append(seg.text)

    # Generate narrative summary via LLM
    summary_prompt = _build_summary_prompt(theme_stats, phase_segments)
    summary_response = await llm_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a qualitative research analyst."},
            {"role": "user", "content": summary_prompt},
        ],
        temperature=0.3,
    )

    return {
        "theme_stats": dict(theme_stats),
        "phase_summaries": dict(phase_segments),  # Raw per-phase respondent texts
        "narrative_summary": summary_response.choices[0].message.content,
        "respondent_segment_count": sum(
            1 for s in segments if s.speaker == "respondent"
        ),
        "total_duration_seconds": (
            segments[-1].end_time - segments[0].start_time if segments else 0
        ),
    }

Stage 3 is the fastest stage — one or two LLM calls for the narrative summary, plus in-memory aggregation over the enrichments. It completes in 30-60 seconds consistently.

Transcript Batching: The 80% DB Load Reduction

This one is not about the post-interview pipeline — it is about the live session. But it was the single biggest operational improvement we made, so it belongs here.

During a live S2S interview, we capture a running transcript by periodically transcribing the audio buffer. Each utterance generates a database write: insert the transcript segment with its timestamp, speaker label, and session ID.

The math gets scary fast. At 200 concurrent sessions, with respondents and AI each producing 1-2 utterances per second during active conversation, you get:

200 sessions x 2-3 utterances/sec = 400-600 DB writes/sec

PostgreSQL can handle this, but it is wasteful. Each write is a separate transaction — connection acquisition, query parsing, index update, WAL write, connection release. The per-transaction overhead dominates when the payload is a single row.

The fix: buffer transcript entries in memory and flush in batches.

import asyncio
from typing import Optional
from datetime import datetime


class TranscriptBatchWriter:
    """Batches transcript entries and flushes periodically to reduce DB load."""

    def __init__(
        self,
        db_pool,
        batch_size: int = 50,
        flush_interval_sec: float = 2.0,
    ):
        self.db_pool = db_pool
        self.batch_size = batch_size
        self.flush_interval_sec = flush_interval_sec
        self._buffer: list[dict] = []
        self._lock = asyncio.Lock()
        self._flush_task: Optional[asyncio.Task] = None

    async def start(self):
        self._flush_task = asyncio.create_task(self._periodic_flush())

    async def stop(self):
        """Cancel the periodic task and flush whatever is still buffered."""
        if self._flush_task:
            self._flush_task.cancel()
        async with self._lock:
            await self._flush()

    async def add(self, session_id: str, speaker: str, text: str,
                  start_time: float, end_time: float):
        async with self._lock:
            self._buffer.append({
                "session_id": session_id,
                "speaker": speaker,
                "text": text,
                "start_time": start_time,
                "end_time": end_time,
                "created_at": datetime.utcnow(),
            })
            if len(self._buffer) >= self.batch_size:
                await self._flush()

    async def _periodic_flush(self):
        """Flush buffer every N seconds, even if batch size not reached."""
        while True:
            await asyncio.sleep(self.flush_interval_sec)
            async with self._lock:
                if self._buffer:
                    await self._flush()

    async def _flush(self):
        """Write all buffered entries in a single transaction."""
        if not self._buffer:
            return
        entries = self._buffer.copy()
        self._buffer.clear()

        async with self.db_pool.acquire() as conn:
            await conn.executemany(
                """INSERT INTO live_transcripts
                   (session_id, speaker, text, start_time, end_time, created_at)
                   VALUES ($1, $2, $3, $4, $5, $6)""",
                [(e["session_id"], e["speaker"], e["text"],
                  e["start_time"], e["end_time"], e["created_at"])
                 for e in entries],
            )

The result: instead of 400-600 individual writes per second, we get 8-12 batch writes per second. Each batch inserts 50 rows in a single transaction. The database connection pool dropped from 80% utilization to under 15%. Query latency for reads (dashboard queries, live transcript polling) improved by 3x because the DB was no longer saturated with writes.

Two configuration knobs matter: batch_size and flush_interval_sec. Set the batch too large and you risk losing entries if the process crashes (entries still in the buffer are lost). Set the flush interval too high and live transcript updates lag. We found 50 entries / 2 seconds to be the right balance — at worst you lose 2 seconds of transcript on a crash, which the post-interview pipeline reconstructs from the recording anyway.

The Job Queue Architecture

The three pipeline stages run as separate workers connected by a job queue. We use BullMQ (Redis-backed) for the queue infrastructure, but the pattern works with any job queue — Celery, AWS SQS, Google Cloud Tasks.

# Pseudocode: queue setup and stage triggers
# Actual implementation uses BullMQ via a TypeScript worker service

QUEUE_CONFIG = {
    "transcription": {
        "concurrency": 3,        # 3 parallel transcription jobs
        "retry_attempts": 3,
        "retry_backoff": [5, 15, 40],  # seconds: exponential-ish
        "timeout_sec": 300,       # 5 min max per job
    },
    "enrichment": {
        "concurrency": 5,        # 5 parallel enrichment jobs
        "retry_attempts": 3,
        "retry_backoff": [5, 15, 40],
        "timeout_sec": 600,       # 10 min max (large interviews)
    },
    "analysis": {
        "concurrency": 10,       # Lightweight, can run more
        "retry_attempts": 2,
        "retry_backoff": [5, 20],
        "timeout_sec": 120,       # 2 min max
    },
}


async def on_session_ended(session_id: str, recording_url: str):
    """Triggered when a voice session ends. Starts the pipeline."""
    await queue_add("transcription", {
        "session_id": session_id,
        "recording_url": recording_url,
        "stage": 1,
    })


async def on_transcription_complete(job_data: dict):
    """Stage 1 done — trigger Stage 2."""
    await queue_add("enrichment", {
        "session_id": job_data["session_id"],
        "transcript_id": job_data["transcript_id"],
        "stage": 2,
    })


async def on_enrichment_complete(job_data: dict):
    """Stage 2 done — trigger Stage 3."""
    await queue_add("analysis", {
        "session_id": job_data["session_id"],
        "transcript_id": job_data["transcript_id"],
        "stage": 3,
    })


async def on_job_failed(queue_name: str, job_data: dict, error: str):
    """Handle failed jobs. After max retries, move to dead letter queue."""
    config = QUEUE_CONFIG[queue_name]
    attempt = job_data.get("attempt", 0) + 1

    if attempt <= config["retry_attempts"]:
        backoff = config["retry_backoff"][attempt - 1]
        job_data["attempt"] = attempt
        await queue_add(queue_name, job_data, delay_sec=backoff)
    else:
        await queue_add("dead_letter", {
            **job_data,
            "failed_queue": queue_name,
            "error": error,
            "final_attempt": attempt,
        })
        await notify_ops(f"Pipeline job failed permanently: {queue_name}/{job_data['session_id']}")

Each stage has independent concurrency controls. Transcription is capped at 3 because Whisper API has rate limits. Enrichment runs 5 concurrent workers because the LLM calls are the bottleneck. Analysis runs 10 because it is mostly in-memory aggregation.

The retry strategy is exponential backoff: 5 seconds, 15 seconds, 40 seconds. Most transient failures — API rate limits, network timeouts — resolve within the first retry. If a job fails all three attempts, it goes to the dead letter queue for manual investigation. In practice, the dead letter queue gets 1-2 jobs per week across thousands of sessions, almost always due to corrupted audio recordings.

Stage chaining is event-driven: Stage 1 completion triggers a Stage 2 job, Stage 2 completion triggers Stage 3. There is no orchestrator polling for completion — the queue handles it. This keeps the architecture simple and means a failure in Stage 2 does not block other sessions from completing Stage 1.
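The chaining pattern can be modeled in-process with plain asyncio queues. This is a sketch of the pattern only, not our BullMQ implementation: each stage worker consumes its own queue and enqueues the next stage on completion, so no orchestrator ever polls for state.

```python
import asyncio


async def run_pipeline(session_ids: list[str]) -> list[str]:
    """Toy model of event-driven stage chaining with one worker per stage."""
    transcription = asyncio.Queue()
    enrichment = asyncio.Queue()
    analysis = asyncio.Queue()
    done: list[str] = []

    async def worker(inbox, outbox):
        while True:
            sid = await inbox.get()
            # ... the stage's real work (transcribe/enrich/analyze) goes here ...
            if outbox is not None:
                await outbox.put(sid)  # completion event triggers the next stage
            else:
                done.append(sid)       # final stage: session fully processed
            inbox.task_done()

    workers = [
        asyncio.create_task(worker(transcription, enrichment)),
        asyncio.create_task(worker(enrichment, analysis)),
        asyncio.create_task(worker(analysis, None)),
    ]
    for sid in session_ids:
        await transcription.put(sid)
    # join() returns once every enqueued item is task_done, which only
    # happens after the item was handed to the next stage's queue
    await transcription.join()
    await enrichment.join()
    await analysis.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return done
```

The same property the post describes falls out naturally: a slow or failing enrichment worker backs up only its own queue, while transcription keeps draining.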

Performance Numbers

Across ~2,000 research sessions in production, here are the median pipeline durations:

Stage                    Median Duration   P95 Duration   Notes
Stage 1: Transcription   1.5 min           2.5 min        Scales with recording length
Stage 2: Enrichment      2.0 min           4.0 min        Scales with segment count
Stage 3: Analysis        0.5 min           1.0 min        Mostly constant
Total pipeline           3.5 min           6.5 min        End-to-end

The total pipeline time — from session end to fully enriched, queryable data — is 3-7 minutes. Researchers see partial results (transcript) within 2 minutes and complete results within 7. For context, manual qualitative analysis of a single 30-minute interview takes a trained researcher 2-4 hours. The pipeline does it in minutes, at a fraction of the cost.

Cost per session for the full pipeline: approximately $0.35 — $0.18 for Whisper transcription, $0.12 for GPT-4o-mini enrichment calls, $0.05 for the analysis LLM call. At scale, this is less than 5% of the live session cost (which is dominated by the S2S model at $0.06-0.24/minute depending on provider).
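The arithmetic as a throwaway sketch (the rates are the per-unit prices quoted in this post and will drift over time; pipeline_cost is an illustrative helper, not production code):

```python
WHISPER_PER_MIN = 0.006  # $/min, the Whisper rate quoted above


def pipeline_cost(minutes: float, enrichment_usd: float = 0.12,
                  analysis_usd: float = 0.05) -> float:
    """Per-session pipeline cost: transcription scales with recording
    length; enrichment and analysis are roughly flat per session."""
    return round(minutes * WHISPER_PER_MIN + enrichment_usd + analysis_usd, 2)


# pipeline_cost(30) -> 0.35
```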

Looking Ahead

The pipeline produces structured data. But who pays for all of this — the live S2S session, the transcription, the enrichment, the analysis? And how do you track costs at the per-session level so you can set budgets and catch runaway spending? In Part 5, we will break down the real cost structure and the per-minute tracking system that made cost-aware engineering possible.


This is Part 4 of an 8-part series: Production Voice AI for Research at Scale.

Series outline:

  1. The Architecture Nobody Warns You About — Server-side agents, metadata transport, provider selection (Part 1)
  2. Zombie Agents, Pre-Warming, and the 5 Bugs That Cost Us Weeks — Production pain points and fixes (Part 2)
  3. Multi-Phase State Machines — Research protocol as code, LLM-driven transitions (Part 3)
  4. From Recording to Insight — The automatic post-interview pipeline (Part 4)
  5. The Real Cost — Per-minute tracking, budgets, self-hosting math (Part 5)
  6. What Breaks at 200 Concurrent Sessions — Scaling bottlenecks and operational metrics (Part 6)
  7. Multi-Language Voice AI — Language detection, provider routing, locale-aware VAD, i18n prompts (Part 7)
  8. Deployment and Go-Live — Docker, Kubernetes, CI/CD, zero-downtime deploys, monitoring (Part 8)