Manual Activity Detection: Second turn not processed - am I missing something?

I’m implementing a multi-turn audio conversation with the Gemini Live API using manual activity detection, but only the first turn is processed. Subsequent turns get no response from the API at all, and I’m wondering whether I’m missing a required step or configuration.

Configuration:

from google import genai
from google.genai import types

client = genai.Client()
model = "gemini-2.5-flash-native-audio-preview-12-2025"

config = {
    "response_modalities": ["AUDIO"],
    "input_audio_transcription": {},
    "output_audio_transcription": {},
    "realtime_input_config": {
        "automatic_activity_detection": {"disabled": True}
    }
}

async with client.aio.live.connect(model=model, config=config) as session:
    # Send turns...

My Implementation Pattern:

# Turn 1
await session.send_realtime_input(activity_start={})
# Send audio in chunks
for chunk in audio_chunks:
    await session.send_realtime_input(
        audio=types.Blob(data=chunk, mime_type="audio/pcm;rate=16000")
    )
# Add silence padding
await session.send_realtime_input(
    audio=types.Blob(data=silence, mime_type="audio/pcm;rate=16000")
)
await session.send_realtime_input(activity_end={})

# Wait for turn_complete
# (properly received and processed)

# Wait 2+ seconds for session to stabilize

# Turn 2 - Same pattern
await session.send_realtime_input(activity_start={})
# Send audio chunks...
await session.send_realtime_input(activity_end={})

# NO RESPONSES RECEIVED - timeout after 30 seconds

What I Observe:

Turn 1 (Works perfectly):

  • Sends: activity_start → 117 audio chunks → silence → activity_end

  • Receives: 14 server messages including input transcriptions, model output, and turn_complete

  • Duration: ~5 seconds

Turn 2 (Fails completely):

  • Sends: activity_start → 88 audio chunks → silence → activity_end (identical process)

  • Receives: Zero server messages (no transcription, no model output, no turn_complete)

  • Result: Timeout after 30 seconds

Detailed Logs:

TURN 1:
09:23:22 | >> [SEND] activity_start
09:23:24 | << [RECV #1] [HEARD]: Hello
09:23:24 | >> [SEND] Sent 117 audio chunks (119040 bytes)
09:23:25 | >> [SEND] activity_end
09:23:26 | << [RECV #11] [TUTOR]: Paris.
09:23:27 | << [RECV #14] ✓ TURN_COMPLETE

[2 second pause]

TURN 2:
09:23:30 | >> [SEND] activity_start
09:23:31 | >> [SEND] Sent 88 audio chunks (89856 bytes)
09:23:31 | >> [SEND] activity_end
09:24:02 | TIMEOUT - No responses after 30 seconds
           Total responses still at #14 (no new messages)

What I’ve Tried:

  1. Waiting for turn_complete event before sending next turn

  2. Adding delays from 0.5s to 2s between turns

  3. Adding 0.3s stabilization after activity_start before sending audio

  4. Sending silence padding (100ms) before activity_end

  5. Adding 0.2s delay after audio chunks complete

  6. Verifying audio format is correct (16-bit PCM, 16kHz)

  7. Ensuring connection remains open between turns
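
For item 6, the format check is just arithmetic on the raw byte counts (assuming 16-bit mono PCM at 16 kHz), and the byte counts in the logs line up with the spoken phrases:

```python
def pcm_duration_seconds(data: bytes, rate: int = 16000,
                         sample_width: int = 2, channels: int = 1) -> float:
    """Duration of raw PCM audio in seconds: bytes / (rate * width * channels)."""
    return len(data) / (rate * sample_width * channels)

# Byte counts from the logs above:
print(pcm_duration_seconds(b"\x00" * 119040))  # Turn 1 -> 3.72 s
print(pcm_duration_seconds(b"\x00" * 89856))   # Turn 2 -> 2.808 s
```

Both durations are plausible for the phrases being sent, so the audio itself doesn’t look truncated or malformed.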

Questions:

  1. Am I missing a required signal or message between turns when using manual activity detection?

  2. Do I need to send a session reset or re-initialization message after turn_complete?

  3. Is there a state I need to clear before the session will accept a new activity_start?

  4. Should I be using send_client_content instead of send_realtime_input for subsequent turns?

  5. Is manual activity detection mode intended only for single-turn interactions?
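
For question 4, the fallback I have in mind is `send_client_content`. Here is a sketch of what I believe the wire-level `clientContent` message looks like for an audio turn — the field names are my reading of the BidiGenerateContent protocol, not something I’ve confirmed against the docs:

```python
import base64

def build_client_content_message(pcm_audio: bytes) -> dict:
    """Sketch of the clientContent message I believe the SDK's
    send_client_content(...) produces for a single audio turn.
    Field names are assumptions based on the BidiGenerateContent protocol."""
    return {
        "clientContent": {
            "turns": [{
                "role": "user",
                "parts": [{
                    "inlineData": {
                        "mimeType": "audio/pcm;rate=16000",
                        "data": base64.b64encode(pcm_audio).decode("ascii"),
                    }
                }],
            }],
            # Explicit end-of-turn, replacing the activity_end signal
            # used on the realtime path.
            "turnComplete": True,
        }
    }
```

If I’ve read the SDK signature correctly, the equivalent call would be `await session.send_client_content(turns=types.Content(role="user", parts=[types.Part(inline_data=types.Blob(data=pcm_audio, mime_type="audio/pcm;rate=16000"))]), turn_complete=True)` — but I haven’t verified whether this path behaves any differently across turns.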

The documentation doesn’t mention any special requirements for multi-turn conversations with manual activity detection, so I’m wondering if there’s an undocumented step I’m missing or if this is a limitation of the current API.

Full reproducible code pasted below for reference. Any guidance would be greatly appreciated!

Environment:

  • Python SDK: google-genai (latest)

  • Model: gemini-2.5-flash-native-audio-preview-12-2025

  • Audio format: 16-bit PCM, 16kHz, mono

import asyncio
import os
import io
import logging
from gtts import gTTS
from pydub import AudioSegment
from google import genai
from google.genai import types

# --- CONFIG ---
API_KEY = os.getenv("gemini_api_key") 
MODEL = "gemini-2.5-flash-native-audio-preview-12-2025"

logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(message)s', datefmt='%H:%M:%S')
logger = logging.getLogger("manual_vad_final")

client = genai.Client(api_key=API_KEY)

def generate_audio(text: str) -> bytes:
    """Generate PCM audio from text using gTTS"""
    mp3 = io.BytesIO()
    gTTS(text).write_to_fp(mp3)
    mp3.seek(0)
    return AudioSegment.from_file(mp3, format="mp3") \
        .set_frame_rate(16000).set_channels(1).set_sample_width(2).raw_data

async def run_manual_vad_test():
    """
    Most robust implementation of manual VAD for multi-turn conversation.
    This is our best attempt before reporting the bug to Google.
    """
    
    turns_data = [
        "Hello tutor, what is the capital of France?",
        "And what is the population of Paris?",
        "Thanks, goodbye!"
    ]

    # Manual VAD configuration
    config = {
        "response_modalities": ["AUDIO"],
        "system_instruction": {"parts": [{"text": "You are a concise tutor. Give brief answers."}]},
        "input_audio_transcription": {},
        "output_audio_transcription": {},
        "realtime_input_config": {
            "automatic_activity_detection": {
                "disabled": True  # MANUAL mode
            }
        }
    }

    async with client.aio.live.connect(model=MODEL, config=config) as session:
        logger.info("=" * 70)
        logger.info("Connected to Gemini Live - MANUAL ACTIVITY DETECTION MODE")
        logger.info("=" * 70)
        logger.info("")

        turn_complete_event = asyncio.Event()
        # Single-element lists act as mutable cells shared with the nested coroutines
        current_turn = [0]
        response_count = [0]
        last_turn_complete_time = [None]
        
        async def receiver():
            """Monitor all responses from Gemini"""
            try:
                async for response in session.receive():
                    response_count[0] += 1
                    
                    sc = getattr(response, "server_content", None)
                    if sc:
                        # Check for turn_complete
                        if getattr(sc, "turn_complete", False):
                            logger.info(f"<< [RECV #{response_count[0]}] ✓ TURN_COMPLETE (Turn {current_turn[0]})")
                            last_turn_complete_time[0] = asyncio.get_running_loop().time()
                            turn_complete_event.set()
                        
                        # Log output transcription
                        if getattr(sc, "output_transcription", None):
                            txt = sc.output_transcription.text
                            if txt:
                                logger.info(f"<< [RECV #{response_count[0]}] [TUTOR]: {txt}")
                        
                        # Log input transcription (what Gemini heard)
                        if getattr(sc, "input_transcription", None):
                            txt = sc.input_transcription.text
                            if txt:
                                logger.info(f"<< [RECV #{response_count[0]}] [HEARD]: {txt}")
                        
                        # Check for interruption
                        if getattr(sc, "interrupted", False):
                            logger.warning(f"<< [RECV #{response_count[0]}] ⚠ INTERRUPTED")
                        
                        # Log model turn if present
                        if getattr(sc, "model_turn", None):
                            logger.info(f"<< [RECV #{response_count[0]}] Model turn received")
                    
                    # Log any other response types
                    if hasattr(response, 'text') and response.text:
                        logger.info(f"<< [RECV #{response_count[0]}] TEXT: {response.text}")
                        
            except asyncio.CancelledError:
                logger.info("Receiver task cancelled")
            except Exception as e:
                logger.error(f"Receiver error: {e}")
                import traceback
                traceback.print_exc()

        async def send_turn_with_manual_vad(turn_num: int, text: str):
            """Send a single turn with manual activity detection"""
            current_turn[0] = turn_num
            
            logger.info("")
            logger.info("=" * 70)
            logger.info(f"TURN {turn_num} START")
            logger.info("=" * 70)
            logger.info(f"[USER] Speaking: '{text}'")
            
            # Wait for previous turn to fully complete (if not first turn)
            if turn_num > 1:
                if last_turn_complete_time[0]:
                    elapsed = asyncio.get_running_loop().time() - last_turn_complete_time[0]
                    wait_time = max(0, 2.0 - elapsed)
                    if wait_time > 0:
                        logger.info(f"[SYNC] Waiting {wait_time:.2f}s for session to stabilize...")
                        await asyncio.sleep(wait_time)
                
                logger.info(f"[SYNC] Previous turn complete, proceeding with Turn {turn_num}")
            
            # Generate audio
            pcm_audio = generate_audio(text)
            logger.info(f"[AUDIO] Generated {len(pcm_audio)} bytes of PCM audio")
            
            # STEP 1: Send activity_start
            logger.info(f">> [SEND] activity_start")
            await session.send_realtime_input(activity_start={})
            await asyncio.sleep(0.3)  # Let server register activity_start
            
            # STEP 2: Send audio in chunks
            chunk_size = 1024
            chunks_sent = 0
            for i in range(0, len(pcm_audio), chunk_size):
                chunk = pcm_audio[i:i+chunk_size]
                await session.send_realtime_input(
                    audio=types.Blob(data=chunk, mime_type="audio/pcm;rate=16000")
                )
                chunks_sent += 1
                await asyncio.sleep(0.01)  # Simulate streaming
            
            logger.info(f">> [SEND] Sent {chunks_sent} audio chunks ({len(pcm_audio)} bytes)")
            await asyncio.sleep(0.2)  # Let audio settle
            
            # STEP 3: Send padding silence (helps with audio processing)
            silence_padding = b"\x00" * 3200  # 100ms of silence at 16kHz
            logger.info(f">> [SEND] Sending silence padding")
            await session.send_realtime_input(
                audio=types.Blob(data=silence_padding, mime_type="audio/pcm;rate=16000")
            )
            await asyncio.sleep(0.3)
            
            # STEP 4: Send activity_end
            logger.info(f">> [SEND] activity_end")
            await session.send_realtime_input(activity_end={})
            await asyncio.sleep(0.2)
            
            logger.info(f"[TURN {turn_num}] Audio fully sent, waiting for turn_complete...")
            
            # STEP 5: Wait for turn_complete with timeout
            try:
                await asyncio.wait_for(turn_complete_event.wait(), timeout=30.0)
                turn_complete_event.clear()
                logger.info(f"[TURN {turn_num}] ✓ Turn complete confirmed!")
            except asyncio.TimeoutError:
                logger.error(f"[TURN {turn_num}] ✗ TIMEOUT - No turn_complete received after 30s")
                logger.error(f"[TURN {turn_num}] Total responses received: {response_count[0]}")
                raise

        async def sender():
            """Send all turns sequentially"""
            try:
                for i, text in enumerate(turns_data, start=1):
                    await send_turn_with_manual_vad(i, text)
                    
                    # Pause between turns
                    if i < len(turns_data):
                        logger.info(f"[PAUSE] Waiting before next turn...")
                        await asyncio.sleep(2.0)
                
                logger.info("")
                logger.info("=" * 70)
                logger.info("ALL TURNS COMPLETED SUCCESSFULLY!")
                logger.info("=" * 70)
                
            except Exception as e:
                logger.error(f"Sender failed: {e}")
                import traceback
                traceback.print_exc()

        # Start both tasks
        receiver_task = asyncio.create_task(receiver())
        sender_task = asyncio.create_task(sender())

        # Wait for sender to complete
        await sender_task
        
        # Give receiver a moment to catch final messages
        await asyncio.sleep(2)
        
        # Clean shutdown
        receiver_task.cancel()
        try:
            await receiver_task
        except asyncio.CancelledError:
            pass

    logger.info("")
    logger.info("Session closed - Test complete")
    logger.info(f"Total responses received: {response_count[0]}")

if __name__ == "__main__":
    try:
        asyncio.run(run_manual_vad_test())
    except KeyboardInterrupt:
        logger.info("Test interrupted by user")
    except Exception as e:
        logger.error(f"Test failed with error: {e}")
        import traceback
        traceback.print_exc()