In Part 11, we optimized costs — breaking down every component of per-minute voice AI spend and showing the three tipping points where optimization effort pays off. We referenced “switching to Grok for cost” and “activating failover” as if they were simple switches you flip. They are, if you build the architecture correctly.
This final post in the series is about building that architecture.
Multi-provider support is the last piece of the puzzle because it is the one that ties everything else together. It is what lets you swap providers without rewriting interview logic. It is what keeps your platform running when a provider has an outage. It is what makes the cost optimization decisions from Part 11 reversible and safe, instead of irreversible infrastructure bets.
If you only build one abstraction layer in your voice AI platform, make it this one.
Why Multi-Provider Support Matters
Teams that bet everything on a single provider discover the problem the hard way — usually during a high-stakes hiring event when that provider has an outage.
But reliability is only one of four reasons to build multi-provider support:
1. Redundancy Against Outages
Every voice AI provider has incidents. OpenAI Realtime has had documented availability issues. Bedrock Nova Sonic is newer and still ironing out edge cases. Grok is scaling rapidly and occasionally shows growing pains. Gemini Live had a brief outage during its first month of general availability.
A platform that depends on a single provider will go down when that provider goes down. At $0.05-0.14/min with enterprise clients expecting SLAs, that is not acceptable.
2. Cost Routing
As we showed in Part 11, provider costs vary from $0.03/min (optimized Gemini Live) to $0.08/min (full OpenAI Realtime with full context). Routing different interview types to different providers based on cost profile is only possible if your architecture supports it. Without an adapter layer, provider selection is a deploy-time decision, not a runtime one.
3. Capability Matching
Different providers have genuine capability differences:
- Gemini Live: Only provider with native multimodal (audio + video simultaneously). Use it for video interviews
- Bedrock Nova Sonic: Best multi-language support (100+ languages with native accents). Use it for international candidates
- OpenAI Realtime: Most mature function calling. Use it where structured rubric scoring during the session is required
- Grok: Lowest cost, real-time web search access. Use it for high-volume screening rounds
With a multi-provider architecture, you can route based on the interview’s requirements, not just availability.
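One way to make these rules mechanical is a small capability matrix. The flags below restate the list above; the language counts are illustrative assumptions, not published figures:

```python
# Hypothetical capability matrix for routing. Flags restate the list above;
# the language counts are illustrative assumptions.
PROVIDER_CAPABILITIES = {
    "gemini_live":        {"video": True,  "languages": 40,  "web_search": False},
    "bedrock_nova_sonic": {"video": False, "languages": 100, "web_search": False},
    "openai_realtime":    {"video": False, "languages": 50,  "web_search": False},
    "grok_voice":         {"video": False, "languages": 30,  "web_search": True},
}

def providers_matching(requires_video: bool = False,
                       min_languages: int = 0,
                       needs_web_search: bool = False) -> list[str]:
    """Return the providers that satisfy a set of hard interview requirements."""
    return [
        name for name, caps in PROVIDER_CAPABILITIES.items()
        if (caps["video"] or not requires_video)
        and caps["languages"] >= min_languages
        and (caps["web_search"] or not needs_web_search)
    ]
```

Hard requirements filter first; soft preferences like cost break ties among whatever survives.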
4. Regulatory and Compliance Requirements
Enterprise clients in certain industries or regions have requirements about where their data is processed. AWS customers who need everything to stay in their VPC with SOC 2 audit trails will insist on Bedrock. European companies with strict data residency requirements may be limited to providers with EU data processing agreements. A provider adapter lets you serve these clients without maintaining separate codebases.
The Provider Adapter Pattern
The adapter pattern is the correct abstraction here. You define a common interface that every voice AI provider must implement, and your interview logic talks to that interface — never to a specific provider’s API directly.
The Abstract Interface
# voice_provider_base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import AsyncIterator
@dataclass
class AudioChunk:
data: bytes
sample_rate: int
channels: int
timestamp_ms: float
@dataclass
class FunctionCall:
name: str
arguments: dict
call_id: str
@dataclass
class ProviderEvent:
event_type: str # 'audio', 'function_call', 'transcript', 'turn_end', 'error'
payload: AudioChunk | FunctionCall | str | Exception | None
class VoiceProviderAdapter(ABC):
"""
Abstract base for all voice AI providers.
All provider-specific logic lives in the concrete implementation.
Interview orchestration logic never touches provider APIs directly.
"""
@abstractmethod
async def connect(
self,
system_prompt: str,
voice_config: dict,
available_functions: list[dict]
) -> None:
"""
Establish a persistent connection to the provider.
Should handle authentication and session initialization.
"""
...
@abstractmethod
async def send_audio(self, chunk: AudioChunk) -> None:
"""
Send a chunk of candidate audio to the provider.
Called continuously during candidate speech.
"""
...
@abstractmethod
async def receive_events(self) -> AsyncIterator[ProviderEvent]:
"""
Async generator yielding events from the provider.
Yields: AI audio chunks, function calls, transcript updates, turn ends.
"""
...
@abstractmethod
async def call_function_result(
self,
call_id: str,
result: dict
) -> None:
"""
Return the result of a function call back to the provider.
"""
...
@abstractmethod
async def update_context(self, message: dict) -> None:
"""
Inject a message into the conversation context.
Used for system updates, section transitions, etc.
"""
...
@abstractmethod
async def interrupt(self) -> None:
"""
Signal that the candidate has started speaking.
Stops current AI audio output (barge-in handling).
"""
...
@abstractmethod
async def disconnect(self) -> None:
"""
Clean up the provider connection.
"""
...
@property
@abstractmethod
def provider_name(self) -> str:
"""Human-readable provider name for logging and metrics."""
...
@property
@abstractmethod
def supports_video(self) -> bool:
"""Whether this provider can process video input alongside audio."""
...
This interface is the contract. Every concrete adapter must implement all of these methods. The interview orchestration layer only ever calls methods on this interface.
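To make the contract concrete, here is a minimal sketch of an orchestration loop that consumes events purely through this interface. The `ProviderEvent` stand-in is repeated so the sketch runs standalone; the handler wiring is hypothetical:

```python
import asyncio
from dataclasses import dataclass

# Stand-in for the ProviderEvent dataclass defined above, repeated here so
# the sketch runs on its own.
@dataclass
class ProviderEvent:
    event_type: str
    payload: object

async def pump_events(adapter, handlers: dict) -> str:
    """Dispatch provider events to handlers until the turn ends or errors out.

    `adapter` is anything exposing the interface's receive_events() async
    generator; the orchestration layer never learns which provider is behind it.
    """
    async for event in adapter.receive_events():
        handler = handlers.get(event.event_type)
        if handler is not None:
            await handler(event.payload)
        if event.event_type in ("turn_end", "error"):
            return event.event_type
    return "closed"
```

Because the loop only touches `receive_events()` and the event types, swapping the concrete adapter never requires touching interview logic.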
OpenAI Realtime Adapter
OpenAI Realtime supports two connection modes: WebRTC for browser-side agents and WebSocket for server-side agents. The adapter handles both, but in our architecture (server-side agent workers connecting to LiveKit), WebSocket is the relevant path.
# openai_realtime_adapter.py
import asyncio
import json
import websockets
from voice_provider_base import (
VoiceProviderAdapter, AudioChunk, FunctionCall, ProviderEvent
)
import base64
import os
class OpenAIRealtimeAdapter(VoiceProviderAdapter):
OPENAI_WS_URL = "wss://api.openai.com/v1/realtime"
def __init__(self, api_key: str, model: str = "gpt-4o-realtime-preview"):
self.api_key = api_key
self.model = model
self.ws: websockets.WebSocketClientProtocol | None = None
self._event_queue: asyncio.Queue = asyncio.Queue()
self._session_id: str | None = None
async def connect(
self,
system_prompt: str,
voice_config: dict,
available_functions: list[dict]
) -> None:
url = f"{self.OPENAI_WS_URL}?model={self.model}"
headers = {
"Authorization": f"Bearer {self.api_key}",
"OpenAI-Beta": "realtime=v1"
}
self.ws = await websockets.connect(url, additional_headers=headers)
# Configure the session
await self.ws.send(json.dumps({
"type": "session.update",
"session": {
"modalities": ["text", "audio"],
"instructions": system_prompt,
"voice": voice_config.get("voice_id", "alloy"),
"input_audio_format": "pcm16",
"output_audio_format": "pcm16",
"input_audio_transcription": {"model": "whisper-1"},
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
"prefix_padding_ms": 300,
"silence_duration_ms": 500
},
"tools": available_functions,
"tool_choice": "auto",
"temperature": 0.8,
"max_response_output_tokens": 4096
}
}))
# Start background receiver (hold a reference so the task is not garbage collected)
self._receive_task = asyncio.create_task(self._receive_loop())
async def _receive_loop(self):
"""Background task that pumps WebSocket messages to the event queue."""
try:
async for message in self.ws:
event = json.loads(message)
await self._process_event(event)
except websockets.ConnectionClosed:
await self._event_queue.put(ProviderEvent(
event_type='error',
payload=ConnectionError("OpenAI Realtime connection closed")
))
async def _process_event(self, event: dict):
event_type = event.get("type", "")
if event_type == "response.audio.delta":
audio_data = base64.b64decode(event["delta"])
await self._event_queue.put(ProviderEvent(
event_type='audio',
payload=AudioChunk(
data=audio_data,
sample_rate=24000,
channels=1,
timestamp_ms=0.0
)
))
elif event_type == "response.function_call_arguments.done":
await self._event_queue.put(ProviderEvent(
event_type='function_call',
payload=FunctionCall(
name=event["name"],
arguments=json.loads(event["arguments"]),
call_id=event["call_id"]
)
))
elif event_type == "response.done":
await self._event_queue.put(ProviderEvent(
event_type='turn_end',
payload=None
))
async def send_audio(self, chunk: AudioChunk) -> None:
if not self.ws:
raise RuntimeError("Not connected")
audio_b64 = base64.b64encode(chunk.data).decode()
await self.ws.send(json.dumps({
"type": "input_audio_buffer.append",
"audio": audio_b64
}))
async def receive_events(self):
while True:
event = await self._event_queue.get()
yield event
if event.event_type == 'error':
break
async def call_function_result(self, call_id: str, result: dict) -> None:
await self.ws.send(json.dumps({
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps(result)
}
}))
await self.ws.send(json.dumps({"type": "response.create"}))
async def update_context(self, message: dict) -> None:
await self.ws.send(json.dumps({
"type": "conversation.item.create",
"item": message
}))
async def interrupt(self) -> None:
await self.ws.send(json.dumps({"type": "response.cancel"}))
async def disconnect(self) -> None:
if self.ws:
await self.ws.close()
@property
def provider_name(self) -> str:
return "openai_realtime"
@property
def supports_video(self) -> bool:
return False
Bedrock Nova Sonic Adapter
Nova Sonic uses HTTP/2 bidirectional streaming rather than a WebSocket, which requires a different connection approach. (The AWS SDK for JavaScript v3 exposes this through BedrockRuntimeClient; the Python adapter below assumes an equivalent bidirectional-streaming client, so treat it as a sketch of the event flow.)
# bedrock_nova_adapter.py
import asyncio
import json
from voice_provider_base import (
VoiceProviderAdapter, AudioChunk, FunctionCall, ProviderEvent
)
import boto3
from botocore.config import Config
import base64
class BedrockNovaSonicAdapter(VoiceProviderAdapter):
MODEL_ID = "amazon.nova-sonic-v1:0"
def __init__(self, region: str = "us-east-1", profile: str | None = None):
session = boto3.Session(profile_name=profile)
self.bedrock = session.client(
'bedrock-runtime',
region_name=region,
config=Config(
read_timeout=300,
connect_timeout=30
)
)
self._event_queue: asyncio.Queue = asyncio.Queue()
self._stream = None
self._stream_input_queue: asyncio.Queue = asyncio.Queue()
async def connect(
self,
system_prompt: str,
voice_config: dict,
available_functions: list[dict]
) -> None:
# Nova Sonic uses InvokeModelWithBidirectionalStream
self._stream = self.bedrock.invoke_model_with_bidirectional_stream(
modelId=self.MODEL_ID
)
# Send initial session start event
init_event = {
"event": {
"sessionStart": {
"inferenceConfiguration": {
"maxTokens": 1024,
"topP": 0.9,
"temperature": 0.7
}
}
}
}
await self._send_to_stream(init_event)
# Send system prompt as promptStart
prompt_event = {
"event": {
"promptStart": {
"promptName": "interview-session",
"textOutputConfiguration": {"mediaType": "text/plain"},
"audioOutputConfiguration": {
"mediaType": "audio/lpcm",
"sampleRateHertz": 24000,
"sampleSizeBits": 16,
"channelCount": 1,
"voiceId": voice_config.get("voice_id", "matthew"),
"encoding": "base64"
},
"toolUseOutputConfiguration": {"enabled": True},
"toolConfiguration": {
"tools": [
{"toolSpec": t} for t in available_functions
]
}
}
}
}
await self._send_to_stream(prompt_event)
# Send system prompt content
system_event = {
"event": {
"contentBlockStart": {
"promptName": "interview-session",
"contentBlockIndex": 0,
"role": "SYSTEM"
}
}
}
await self._send_to_stream(system_event)
await self._send_to_stream({
"event": {
"textInput": {
"promptName": "interview-session",
"contentBlockIndex": 0,
"content": system_prompt
}
}
})
# Start background tasks
asyncio.create_task(self._receive_loop())
asyncio.create_task(self._send_loop())
async def _send_loop(self):
    """Background task that drains the input queue into the stream."""
    while True:
        item = await self._stream_input_queue.get()
        if item is None:  # Sentinel from disconnect()
            break
        await self._stream.input_stream.send(item)
async def _send_to_stream(self, event: dict):
await self._stream_input_queue.put({"chunk": {"bytes": json.dumps(event).encode()}})
async def _receive_loop(self):
"""Process response stream from Nova Sonic."""
try:
async for event in self._stream.body:
chunk = event.get("chunk", {})
if not chunk:
continue
payload = json.loads(chunk.get("bytes", b"{}"))
event_data = payload.get("event", {})
if "audioOutput" in event_data:
audio_b64 = event_data["audioOutput"]["content"]
audio_bytes = base64.b64decode(audio_b64)
await self._event_queue.put(ProviderEvent(
event_type='audio',
payload=AudioChunk(
data=audio_bytes,
sample_rate=24000,
channels=1,
timestamp_ms=0.0
)
))
elif "toolUse" in event_data:
tool = event_data["toolUse"]
await self._event_queue.put(ProviderEvent(
event_type='function_call',
payload=FunctionCall(
name=tool["toolName"],
arguments=json.loads(tool["content"]),
call_id=tool["toolUseId"]
)
))
elif "contentBlockStop" in event_data:
await self._event_queue.put(ProviderEvent(
event_type='turn_end',
payload=None
))
except Exception as e:
await self._event_queue.put(ProviderEvent(
event_type='error',
payload=e
))
async def send_audio(self, chunk: AudioChunk) -> None:
audio_b64 = base64.b64encode(chunk.data).decode()
event = {
"event": {
"audioInput": {
"promptName": "interview-session",
"contentBlockIndex": 1,
"content": audio_b64
}
}
}
await self._send_to_stream(event)
async def receive_events(self):
while True:
event = await self._event_queue.get()
yield event
if event.event_type == 'error':
break
async def call_function_result(self, call_id: str, result: dict) -> None:
event = {
"event": {
"toolResult": {
"promptName": "interview-session",
"contentBlockIndex": 2,
"toolUseId": call_id,
"content": json.dumps(result)
}
}
}
await self._send_to_stream(event)
async def update_context(self, message: dict) -> None:
# Nova Sonic injects context as a text input block
await self._send_to_stream({
"event": {
"textInput": {
"promptName": "interview-session",
"contentBlockIndex": 99, # High index to append
"content": f"[System: {message.get('content', '')}]"
}
}
})
async def interrupt(self) -> None:
await self._send_to_stream({
"event": {
"audioInputEnd": {
"promptName": "interview-session"
}
}
})
async def disconnect(self) -> None:
await self._stream_input_queue.put(None)
@property
def provider_name(self) -> str:
return "bedrock_nova_sonic"
@property
def supports_video(self) -> bool:
return False
Grok Voice Agent Adapter
Grok’s best feature from an engineering perspective: it is OpenAI Realtime API compatible. The Grok adapter is almost identical to the OpenAI Realtime adapter with three differences: the WebSocket URL, the authentication header format, and the model name. This is intentional on xAI’s part — they explicitly designed for migration from OpenAI.
# grok_voice_adapter.py
import asyncio
from openai_realtime_adapter import OpenAIRealtimeAdapter
class GrokVoiceAdapter(OpenAIRealtimeAdapter):
"""
Grok Voice Agent adapter.
Extends OpenAI Realtime adapter — API is intentionally compatible.
Only the endpoint URL and auth header differ.
"""
GROK_WS_URL = "wss://api.x.ai/v1/realtime"
def __init__(self, api_key: str, model: str = "grok-2-voice"):
    # Reuse the parent initializer; only the default model name differs
    super().__init__(api_key=api_key, model=model)
async def connect(
self,
system_prompt: str,
voice_config: dict,
available_functions: list[dict]
) -> None:
import websockets
import json
url = f"{self.GROK_WS_URL}?model={self.model}"
headers = {
"Authorization": f"Bearer {self.api_key}",
# Grok's beta header name differs from OpenAI's "OpenAI-Beta"
"X-Beta": "realtime=v1"
}
self.ws = await websockets.connect(url, additional_headers=headers)
# Session config is identical to OpenAI Realtime
await self.ws.send(json.dumps({
"type": "session.update",
"session": {
"modalities": ["text", "audio"],
"instructions": system_prompt,
"voice": voice_config.get("voice_id", "ember"),
"input_audio_format": "pcm16",
"output_audio_format": "pcm16",
"input_audio_transcription": {"model": "whisper-1"},
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
"prefix_padding_ms": 300,
"silence_duration_ms": 500
},
"tools": available_functions,
"tool_choice": "auto"
}
}))
asyncio.create_task(self._receive_loop())
@property
def provider_name(self) -> str:
return "grok_voice"
# All other methods inherited from OpenAIRealtimeAdapter unchanged
This is exactly how adapter patterns should work: the shared behavior lives in the base class, and the Grok adapter only overrides what actually differs. A migration from OpenAI Realtime to Grok is six lines of code change.
Grok’s Real-Time Search Capability
One capability worth calling out separately: Grok has real-time web search access during voice conversations. For interview scenarios, this means the AI can answer “what does your company do?” or fact-check claims about a candidate’s published work without a custom RAG pipeline.
# Register the web search tool with Grok
grok_functions = [
{
"name": "search_web",
"description": "Search the web for current information about a topic",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"context": {"type": "string", "description": "Why this search is needed"}
},
"required": ["query"]
}
}
# ... other interview functions
]
Grok handles the web search internally — you do not need to implement the tool result callback for built-in tools.
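The orchestration layer can capture that distinction with a small predicate; the mapping below is a hypothetical example, not an official tool list:

```python
# Provider-built-in tools handle their own execution; our orchestration layer
# must only execute and return results for tools it registered itself.
# This mapping is a hypothetical example, not an official tool list.
BUILTIN_TOOLS: dict[str, set[str]] = {
    "grok_voice": {"search_web"},
}

def needs_result_callback(provider: str, tool_name: str) -> bool:
    """True when our code must run the tool and call call_function_result()."""
    return tool_name not in BUILTIN_TOOLS.get(provider, set())
```

A `function_call` event for a built-in tool is then logged and skipped rather than dispatched.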
Provider Routing Strategies
With multiple adapters available, you need logic to decide which provider handles each session. The routing strategies range from simple to sophisticated:
# provider_router.py
from enum import Enum
from dataclasses import dataclass
from typing import Type
import random
class RoutingStrategy(Enum):
ROUND_ROBIN = "round_robin"
LATENCY_BASED = "latency_based"
COST_BASED = "cost_based"
CAPABILITY_BASED = "capability_based"
@dataclass
class InterviewRequirements:
requires_video: bool
language: str
interview_type: str # 'screening', 'technical', 'executive'
candidate_region: str
preferred_cost_tier: str # 'economy', 'standard', 'premium'
class ProviderRouter:
# Cost per minute (flat rate or estimate)
PROVIDER_COSTS = {
"grok_voice": 0.05,
"openai_realtime": 0.065,
"gemini_live": 0.045,
"bedrock_nova_sonic": 0.055
}
def select_provider(
self,
requirements: InterviewRequirements,
available_providers: list[str],
circuit_breaker_status: dict[str, bool]
) -> str:
# Filter to healthy providers only
healthy = [p for p in available_providers if not circuit_breaker_status.get(p, False)]
if not healthy:
raise RuntimeError("No healthy providers available")
# Capability-based: video interviews must use Gemini Live
if requirements.requires_video and "gemini_live" in healthy:
return "gemini_live"
# Compliance-based: AWS region requirements → Bedrock
if requirements.candidate_region in ["us-gov-east-1", "us-gov-west-1"]:
if "bedrock_nova_sonic" in healthy:
return "bedrock_nova_sonic"
# Non-English interviews → Bedrock Nova (100+ languages)
if requirements.language not in ["en", "en-US", "en-GB", "en-AU"]:
if "bedrock_nova_sonic" in healthy:
return "bedrock_nova_sonic"
# Cost-based routing for economy tier
if requirements.preferred_cost_tier == "economy":
return min(healthy, key=lambda p: self.PROVIDER_COSTS.get(p, 999))
# Standard tier: Grok for screening, OpenAI for technical
if requirements.interview_type == "screening":
return "grok_voice" if "grok_voice" in healthy else healthy[0]
if requirements.interview_type in ["technical", "executive"]:
return "openai_realtime" if "openai_realtime" in healthy else healthy[0]
# Default: lowest cost healthy provider
return min(healthy, key=lambda p: self.PROVIDER_COSTS.get(p, 999))
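The payoff of cost-based routing is easy to quantify. A back-of-envelope sketch using the PROVIDER_COSTS rates above (the monthly minute volumes are illustrative assumptions):

```python
# Per-minute rates from the PROVIDER_COSTS table above; monthly minute
# volumes are illustrative assumptions.
COST_PER_MIN = {"grok_voice": 0.05, "openai_realtime": 0.065}

def monthly_cost(minutes_by_provider: dict[str, float]) -> float:
    """Sum per-provider spend for one month of routed traffic."""
    return sum(COST_PER_MIN[p] * m for p, m in minutes_by_provider.items())

# 5,000 screening minutes routed to Grok, 2,000 technical minutes to OpenAI
routed = monthly_cost({"grok_voice": 5000, "openai_realtime": 2000})
# The same 7,000 minutes sent exclusively to OpenAI Realtime
single_provider = monthly_cost({"openai_realtime": 7000})
```

At these rates, routing saves $75/month per 7,000 minutes; the savings scale linearly with volume.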
The Circuit Breaker Pattern
A circuit breaker monitors provider health and automatically routes traffic away from a failing provider. Without it, a partial provider outage turns into cascading failures as every new session attempts and fails to connect to the degraded provider.
Here is a production circuit breaker implementation with sliding window error tracking:
# circuit_breaker.py
import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from collections import deque
import structlog
logger = structlog.get_logger()
class CircuitState(Enum):
CLOSED = "closed" # Normal operation — requests pass through
OPEN = "open" # Failing — requests blocked
HALF_OPEN = "half_open" # Testing recovery — limited requests allowed
@dataclass
class CircuitBreakerConfig:
# Open circuit if error rate exceeds this threshold
error_rate_threshold: float = 0.5 # 50% error rate
# Minimum requests before evaluating error rate
min_requests_threshold: int = 10
# How long to keep the circuit open before testing recovery
recovery_timeout_seconds: float = 60.0
# How many successful test requests before closing the circuit
half_open_success_threshold: int = 3
# Sliding window size for error rate calculation
window_size: int = 20
class ProviderCircuitBreaker:
def __init__(self, provider_name: str, config: CircuitBreakerConfig):
self.provider_name = provider_name
self.config = config
self.state = CircuitState.CLOSED
self._results: deque[bool] = deque(maxlen=config.window_size)
self._last_failure_time: float = 0.0
self._half_open_successes: int = 0
self._lock = asyncio.Lock()
# Prometheus metrics
from prometheus_client import Gauge
self.state_metric = Gauge(
f'circuit_breaker_state_{provider_name}',
f'Circuit breaker state for {provider_name} (0=closed, 1=half_open, 2=open)'
)
async def call(self, func, *args, **kwargs):
"""Execute a provider call through the circuit breaker."""
async with self._lock:
if self.state == CircuitState.OPEN:
elapsed = time.time() - self._last_failure_time
if elapsed >= self.config.recovery_timeout_seconds:
self.state = CircuitState.HALF_OPEN
self._half_open_successes = 0
self.state_metric.set(1)
logger.info("circuit_breaker_half_open", provider=self.provider_name)
else:
raise CircuitOpenError(
f"Provider {self.provider_name} circuit is open. "
f"Retry in {self.config.recovery_timeout_seconds - elapsed:.0f}s"
)
try:
result = await func(*args, **kwargs)
await self._on_success()
return result
except Exception as e:
await self._on_failure(e)
raise
async def _on_success(self):
async with self._lock:
self._results.append(True)
if self.state == CircuitState.HALF_OPEN:
self._half_open_successes += 1
if self._half_open_successes >= self.config.half_open_success_threshold:
self.state = CircuitState.CLOSED
self._results.clear()
logger.info("circuit_breaker_closed", provider=self.provider_name)
self.state_metric.set(0)
async def _on_failure(self, error: Exception):
async with self._lock:
self._results.append(False)
self._last_failure_time = time.time()
if self.state == CircuitState.HALF_OPEN:
# Any failure in half-open state re-opens the circuit
self.state = CircuitState.OPEN
logger.warning("circuit_breaker_reopened", provider=self.provider_name)
self.state_metric.set(2)
return
if len(self._results) >= self.config.min_requests_threshold:
error_count = self._results.count(False)
error_rate = error_count / len(self._results)
if error_rate >= self.config.error_rate_threshold:
self.state = CircuitState.OPEN
logger.error(
"circuit_breaker_opened",
provider=self.provider_name,
error_rate=error_rate,
error=str(error)
)
self.state_metric.set(2)
def is_open(self) -> bool:
return self.state == CircuitState.OPEN
class CircuitOpenError(Exception):
pass
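The trip condition is easy to exercise in isolation. This mirrors the sliding-window error-rate check in `_on_failure`:

```python
from collections import deque

def should_open(results: deque, error_rate_threshold: float = 0.5,
                min_requests: int = 10) -> bool:
    """Mirror of the trip condition in _on_failure: open the circuit once the
    window holds enough samples and the error rate crosses the threshold."""
    if len(results) < min_requests:
        return False
    return results.count(False) / len(results) >= error_rate_threshold
```

The `min_requests` guard matters: without it, a single failed request at startup would trip the breaker with a 100% "error rate" over one sample.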
The circuit breaker gets wired into the provider routing layer:
# provider_manager.py
import os

class VoiceProviderManager:
def __init__(self):
self.adapters = {
"openai_realtime": OpenAIRealtimeAdapter(os.environ["OPENAI_API_KEY"]),
"grok_voice": GrokVoiceAdapter(os.environ["GROK_API_KEY"]),
"bedrock_nova_sonic": BedrockNovaSonicAdapter(region="us-east-1"),
"gemini_live": GeminiLiveAdapter(os.environ["GEMINI_API_KEY"])
}
self.circuit_breakers = {
name: ProviderCircuitBreaker(name, CircuitBreakerConfig())
for name in self.adapters
}
self.router = ProviderRouter()
async def get_adapter(
    self,
    requirements: InterviewRequirements
) -> tuple[VoiceProviderAdapter, ProviderCircuitBreaker]:
cb_status = {name: cb.is_open() for name, cb in self.circuit_breakers.items()}
provider_name = self.router.select_provider(
requirements,
list(self.adapters.keys()),
cb_status
)
return self.adapters[provider_name], self.circuit_breakers[provider_name]
async def connect_with_fallback(
self,
requirements: InterviewRequirements,
system_prompt: str,
voice_config: dict,
available_functions: list[dict]
) -> tuple[VoiceProviderAdapter, str]:
"""
Connect to the best available provider, falling back on circuit breaker trips.
"""
tried_providers = set()
while len(tried_providers) < len(self.adapters):
adapter, cb = await self.get_adapter(requirements)
if adapter.provider_name in tried_providers:
raise RuntimeError("All providers exhausted or circuit-open")
tried_providers.add(adapter.provider_name)
try:
await cb.call(
adapter.connect,
system_prompt,
voice_config,
available_functions
)
return adapter, adapter.provider_name
except Exception as e:  # includes CircuitOpenError raised by the breaker
logger.warning(
"provider_connect_failed",
provider=adapter.provider_name,
error=str(e),
will_fallback=True
)
continue
raise RuntimeError("Failed to connect to any voice AI provider")
Context Transfer During Failover
When a provider fails mid-session and you need to transfer to a backup provider, the hardest problem is carrying conversation context across. The candidate has already been talking for 15 minutes. The new provider knows nothing about what was discussed.
Two approaches:
Option 1: Full History Replay (Avoid This)
Pass the complete conversation transcript to the new provider as context. Simple to implement, but slow: a 15-minute transcript might run 5,000-10,000 tokens, and the new provider spends 5+ seconds processing that context before it can respond. From the candidate's perspective, that is 5+ seconds of dead silence, which is unacceptable.
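As a sanity check on the 5,000-10,000 token figure, a back-of-envelope estimate (the words-per-minute and tokens-per-word rates are loose assumptions):

```python
def transcript_tokens(minutes: float, combined_words_per_min: float = 300.0,
                      tokens_per_word: float = 1.33) -> int:
    """Rough size of a dense two-party transcript; rates are loose assumptions."""
    return int(minutes * combined_words_per_min * tokens_per_word)
```

At 300 combined words per minute, a 15-minute interview lands near 6,000 tokens, squarely in that range.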
Option 2: Summarization (Use This)
Generate a concise handoff summary (200-400 words) using GPT-4o mini in parallel with the failover connection attempt. The summary takes 1-2 seconds to generate; the connection takes 1-2 seconds to establish. Because the two run concurrently, the total delay is set by whichever finishes last, still only a couple of seconds.
# failover_handler.py
import asyncio
from openai import AsyncOpenAI

openai_client = AsyncOpenAI()  # used by generate_handoff_summary below
async def handle_provider_failover(
failed_provider: VoiceProviderAdapter,
session_state: dict,
provider_manager: VoiceProviderManager,
requirements: InterviewRequirements
) -> VoiceProviderAdapter:
"""
Handle mid-session provider failure.
Transfer context to backup provider with minimal interruption.
"""
logger.warning(
"provider_failover_initiated",
failed_provider=failed_provider.provider_name,
session_id=session_state["session_id"],
elapsed_minutes=session_state.get("elapsed_minutes", 0)
)
# Race: generate summary AND establish new connection simultaneously
summary_task = asyncio.create_task(
generate_handoff_summary(session_state["conversation_history"])
)
backup_adapter_task = asyncio.create_task(
provider_manager.connect_with_fallback(
requirements,
session_state["system_prompt"],
session_state["voice_config"],
session_state["available_functions"]
)
)
# Wait for both to complete
summary, (backup_adapter, backup_name) = await asyncio.gather(
summary_task,
backup_adapter_task
)
# Inject summary as system context in the new session
await backup_adapter.update_context({
"role": "system",
"content": f"""[CONTEXT TRANSFER - SESSION RESUMED]
You are continuing an interview that was briefly interrupted due to a technical issue.
Interview progress so far:
{summary}
The candidate has been informed there was a brief connection issue.
Continue the interview naturally from where it was paused.
Current section: {session_state['current_section']}
Remaining sections: {session_state['remaining_sections']}"""
})
logger.info(
"provider_failover_complete",
failed_provider=failed_provider.provider_name,
backup_provider=backup_name
)
return backup_adapter
async def generate_handoff_summary(conversation_history: list[dict]) -> str:
"""Generate a concise handoff summary for context transfer."""
history_text = "\n".join([
f"{turn['role'].upper()}: {turn['content']}"
for turn in conversation_history[-30:] # Last 30 turns max
])
response = await openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": f"""Summarize this interview conversation in 250 words for context transfer.
Include: topics covered, candidate's key answers and strengths/weaknesses noted,
current interview section, and what was just being discussed when interrupted.
Conversation:
{history_text}"""
}],
max_tokens=400
)
return response.choices[0].message.content
The candidate hears: “I apologize, we had a brief connection issue. Let’s continue where we left off.” There is a 3-5 second gap, and then the interview resumes. From the candidate’s perspective it reads as a brief network glitch; the provider switch itself is invisible.
Testing Multi-Provider
The adapter pattern makes testing significantly easier because you can test interview logic with a mock provider:
# test_provider_adapter.py
import asyncio
import os
import pytest
from voice_provider_base import VoiceProviderAdapter, AudioChunk, ProviderEvent
class MockVoiceProvider(VoiceProviderAdapter):
"""Mock provider for testing interview orchestration logic."""
def __init__(self, responses: list[str]):
self.responses = responses
self.response_index = 0
self.sent_audio_chunks: list[AudioChunk] = []
self.function_results: list[tuple] = []
async def connect(self, system_prompt, voice_config, available_functions):
self.system_prompt = system_prompt
self.available_functions = available_functions
async def send_audio(self, chunk: AudioChunk) -> None:
self.sent_audio_chunks.append(chunk)
async def receive_events(self):
if self.response_index < len(self.responses):
response_text = self.responses[self.response_index]
self.response_index += 1
# Simulate audio output as text bytes (for testing)
yield ProviderEvent(
event_type='audio',
payload=AudioChunk(
data=response_text.encode(),
sample_rate=24000,
channels=1,
timestamp_ms=0.0
)
)
yield ProviderEvent(event_type='turn_end', payload=None)
async def call_function_result(self, call_id, result):
self.function_results.append((call_id, result))
async def update_context(self, message):
pass
async def interrupt(self):
pass
async def disconnect(self):
pass
@property
def provider_name(self) -> str:
return "mock"
@property
def supports_video(self) -> bool:
return False
# Integration test per provider
@pytest.mark.integration
@pytest.mark.asyncio
async def test_openai_realtime_connect():
    """Test that the OpenAI Realtime adapter establishes a session correctly."""
adapter = OpenAIRealtimeAdapter(api_key=os.environ["OPENAI_API_KEY_TEST"])
try:
await adapter.connect(
system_prompt="You are a test interviewer.",
voice_config={"voice_id": "alloy"},
available_functions=[]
)
# Send 1 second of silence to verify the audio pipeline
silence = bytes(24000 * 2)  # 24,000 samples/s x 2 bytes = 1s of 24kHz 16-bit mono
await adapter.send_audio(AudioChunk(silence, 24000, 1, 0.0))
# Expect turn_end or error within 10 seconds
async with asyncio.timeout(10):
async for event in adapter.receive_events():
if event.event_type in ('turn_end', 'error'):
break
assert True # Connection and basic audio round-trip succeeded
finally:
await adapter.disconnect()
Run integration tests against each provider’s sandbox environment in your CI pipeline. This catches provider-side breaking changes before they hit production.
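One practical wrinkle: CI runners rarely hold every provider's sandbox credentials. A small helper lets the pipeline run whichever integration targets it can (the environment variable names are assumptions matching the test above):

```python
import os

# Env var names holding each provider's sandbox credentials; these names are
# assumptions, adjust them to your secrets layout.
SANDBOX_KEYS = {
    "openai_realtime": "OPENAI_API_KEY_TEST",
    "grok_voice": "GROK_API_KEY_TEST",
    "bedrock_nova_sonic": "AWS_PROFILE_TEST",
    "gemini_live": "GEMINI_API_KEY_TEST",
}

def runnable_integration_targets(env=os.environ) -> list[str]:
    """Providers whose sandbox credentials are present in the environment."""
    return [p for p, var in SANDBOX_KEYS.items() if env.get(var)]
```

Tests for providers missing from the returned list are skipped rather than failed, so a laptop without keys still gets a green unit-test run.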
Provider Scorecard Dashboard
Beyond metrics in code, build a provider scorecard dashboard that gives your team and leadership visibility into real-time provider health:
Provider Health Dashboard — Updated every 60 seconds
════════════════════════════════════════════════════════════════════
Provider Status TTFA (p95) Error Rate Cost/min Sessions
──────────────────────────────────────────────────────────────────────────────
OpenAI Realtime HEALTHY 420ms 0.2% $0.065 847
Grok Voice HEALTHY 380ms 0.1% $0.050 1,203
Bedrock Nova HEALTHY 580ms 0.4% $0.055 312
Gemini Live HEALTHY 350ms 0.3% $0.048 156
Circuit Breakers: All CLOSED
Last failover: 3 days ago (OpenAI → Grok, 14:32 UTC)
Monthly cost projection (current rate):
OpenAI Realtime: $4,127
Grok Voice: $3,609
Bedrock Nova: $1,025
Gemini Live: $449
────────────────
Total: $9,210
Cost vs single-provider (OpenAI only):
Current multi-provider: $9,210/month
OpenAI-only equivalent: $15,680/month
Savings: $6,470/month (41%)
The cost savings visible in this dashboard are the direct result of the routing logic described above — different interview types going to the most cost-appropriate provider for that requirement.
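The projection and savings lines in the dashboard are straightforward arithmetic over per-provider minute volumes. A minimal sketch of that math — the per-minute rates come from the dashboard above, but the function names and the minute volumes in the example are illustrative assumptions:

```python
# $/min rates from the dashboard above
RATES = {"openai": 0.065, "grok": 0.050, "bedrock": 0.055, "gemini": 0.048}

def monthly_projection(minutes_by_provider, rates=RATES):
    """Project monthly spend per provider and in total."""
    per_provider = {p: round(m * rates[p], 2) for p, m in minutes_by_provider.items()}
    return per_provider, round(sum(per_provider.values()), 2)

def savings_vs_single(minutes_by_provider, single_rate, rates=RATES):
    """Compare multi-provider routing against sending all traffic to one
    provider at single_rate. Returns (dollar savings, percent savings)."""
    _, multi = monthly_projection(minutes_by_provider, rates)
    single = round(sum(minutes_by_provider.values()) * single_rate, 2)
    return round(single - multi, 2), round((single - multi) / single * 100, 1)

# Illustrative volumes (assumed, not taken from the dashboard)
minutes = {"openai": 1000, "grok": 2000}
print(monthly_projection(minutes))        # per-provider and total spend
print(savings_vs_single(minutes, 0.065))  # savings vs routing everything to OpenAI
```

The same functions power both the "Monthly cost projection" and "Cost vs single-provider" panels: one pass over the routing table's minute counts.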
Series Conclusion: What We Built
Twelve posts. Twelve layers of a complete voice AI interview platform.
We started with the fundamental constraint: humans notice latency above 300ms, and every architectural decision flows from that. We chose speech-to-speech providers (Gemini Live, OpenAI Realtime, Grok, Bedrock Nova Sonic) that collapse the cascaded pipeline’s latency overhead. We built on LiveKit for media transport, giving us the SFU layer that handles WebRTC complexity. We designed three distinct AI personas — interviewer, coach, evaluator — each with their own system prompt engineering and knowledge context.
We built the knowledge base and RAG pipeline that makes the AI an expert on the job description and company. We handled web and mobile clients with their platform-specific audio constraints. We added video interview capability where Gemini Live processes screen and camera alongside audio. We implemented the recording and compliance layer that satisfies GDPR, HIPAA, and enterprise audit requirements.
We scaled the infrastructure from 10 to 10,000 concurrent sessions with LiveKit SFU mesh, stateless agent workers, Kubernetes custom-metrics scaling, and regional deployment. We optimized costs from $3.47 to under $1.00 per interview with provider switching, context management, caching, and spot instance batch processing. And in this final post, we wrapped it all in the multi-provider adapter pattern that makes every other optimization possible — provider redundancy, cost routing, capability matching, and circuit breakers that keep the system running when individual providers degrade.
The architecture is production-ready. Every pattern in this series came from running voice interviews at scale and discovering what breaks under real conditions with real candidates.
What comes next is up to you. The adapter pattern you built in this post means adding new providers — or new modalities — is a matter of writing a new concrete adapter class. The infrastructure from Part 10 scales to whatever hiring volumes you face. The cost optimization framework from Part 11 gives you a roadmap as your usage grows.
Build well. Record responsibly. And may your TTFA stay below 500ms.
This is Part 12 of a 12-part series: The Voice AI Interview Playbook.
Series outline:
- Why Real-Time Voice Changes Everything — The landscape, the vision, and the reference architecture (Part 1)
- Cascaded vs. Speech-to-Speech — Choosing your pipeline architecture (Part 2)
- LiveKit vs. Pipecat vs. Direct — Picking your framework (Part 3)
- STT, LLM, and TTS That Actually Work — Building the voice pipeline (Part 4)
- Multi-Role Agents — Interviewer, coach, and evaluator personas (Part 5)
- Knowledge Base and RAG — Making your voice agent an expert (Part 6)
- Web and Mobile Clients — Cross-platform voice experiences (Part 7)
- Video Interview Integration — Multimodal analysis with Gemini Live (Part 8)
- Recording, Transcription, and Compliance — GDPR, HIPAA, and getting it right (Part 9)
- Scaling to Thousands — Architecture for concurrent voice sessions (Part 10)
- Cost Optimization — From $0.14/min to $0.03/min (Part 11)
- Multi-Provider Support — OpenAI Realtime, Bedrock Nova, Grok, and the adapter pattern (this post)