Real-time Gemini 2.0 audio: how do I change the voice?

Using the real-time Gemini 2.0 API, how do I change the audio voice?

import asyncio
import base64
import contextlib
import json
import os
import wave
import logging
import pygame
from websockets import connect

# --- Configuration ---
MODEL = 'models/gemini-2.0-flash-exp'
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
# Fail at import time: the endpoint URI below cannot be built without the key.
if not GOOGLE_API_KEY:
    raise EnvironmentError("GOOGLE_API_KEY environment variable is not set.")
HOST = 'generativelanguage.googleapis.com'
# The API key is part of the query string, so URI itself must be treated as a
# secret (do not print or log it).
URI = f'wss://{HOST}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={GOOGLE_API_KEY}'

# --- Constants ---
WAVE_CHANNELS = 1          # channel count written to the WAV header
WAVE_RATE = 24000          # frame rate written to the WAV header
WAVE_SAMPLE_WIDTH = 2      # bytes per sample written to the WAV header
AUDIO_FILE_PREFIX = "audio_"
MAX_LOG_CHUNK_SIZE = 200   # max characters of a server message echoed to the log

# --- Logging Setup ---
logger = logging.getLogger('Bidi')
logger.setLevel(logging.DEBUG)
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# basicConfig puts the root logger at DEBUG, which also enables the websockets
# library's handshake debug output. That output includes the request path and
# therefore the API key embedded in URI, so cap that library at INFO.
logging.getLogger('websockets').setLevel(logging.INFO)

# --- Audio Utils ---
@contextlib.contextmanager
def wave_file(filename, channels=WAVE_CHANNELS, rate=WAVE_RATE, sample_width=WAVE_SAMPLE_WIDTH):
    """Yield a WAV writer for `filename` and close it when the caller is done.

    The channel count, sample width and frame rate are applied to the writer
    before it is handed out. A `wave.Error` raised while the writer is in use
    (including inside the caller's `with` body) is logged and re-raised.
    """
    try:
        with contextlib.closing(wave.open(filename, "wb")) as writer:
            writer.setnchannels(channels)
            writer.setsampwidth(sample_width)
            writer.setframerate(rate)
            yield writer
    except wave.Error as err:
        logger.error(f"Error opening wave file '{filename}': {err}")
        raise

async def audio_playback_task(file_name, stop_event):
    """Play `file_name` through pygame's music channel until done or stopped.

    Args:
        file_name: Path of the audio file to load and play.
        stop_event: `asyncio.Event`; once set, playback is halted and the
            coroutine returns.

    pygame and other errors are logged rather than raised, so awaiting this
    coroutine does not propagate playback failures.
    """
    logger.debug(f"Starting playback: {file_name}")
    try:
        pygame.mixer.music.load(file_name)
        pygame.mixer.music.play()
        while pygame.mixer.music.get_busy():
            if stop_event.is_set():
                # Leaving the loop alone does not silence the mixer; without
                # an explicit stop the audio keeps playing after "interrupt".
                pygame.mixer.music.stop()
                break
            await asyncio.sleep(0.1)
    except pygame.error as e:
        logger.error(f"Pygame error during playback: {e}")
    except Exception as e:
        logger.error(f"Unexpected error during playback: {e}")
    finally:
        logger.debug(f"Playback complete: {file_name}")

class AudioLoop:
    """Interactive text-in / audio-out chat session over the Gemini Bidi WebSocket API.

    Each turn reads a line from stdin, sends it to the model, writes the PCM
    audio of the reply to ``<AUDIO_FILE_PREFIX><n>.wav`` and plays that file
    with pygame. Playback runs as a background task so the prompt stays
    responsive and the ``i`` command can interrupt the audio.

    Args:
        tools: Tool declarations forwarded in the setup message (default: none).
        system_instructions: Optional text sent once, as a user turn, right
            after the setup handshake.
        voice_name: Optional prebuilt voice name (for example ``"Kore"`` or
            ``"Puck"``). When given, the setup message carries a
            ``generation_config`` whose ``speech_config`` selects that voice;
            when omitted no generation config is sent.
    """

    def __init__(self, tools=None, system_instructions=None, voice_name=None):
        """Store the session options and initialise the pygame mixer."""
        self.tools = tools if tools else []
        self.system_instructions = system_instructions
        self.voice_name = voice_name
        self.ws = None
        self.index = 0  # sequence number of the next audio file
        self.stop_audio = asyncio.Event()
        self._playback = None  # background playback task, if any
        pygame.mixer.init()
        logger.debug("AudioLoop Initialized")

    async def run(self):
        """Connect, perform setup, then alternate send/receive until the user quits.

        Connection and protocol errors are logged, not raised. The mixer is
        always shut down on exit.
        """
        print("Type 'q' to quit, 'i' to interrupt audio")
        # NOTE(review): `extra_headers` is the keyword of the legacy websockets
        # client; newer releases call it `additional_headers` - confirm against
        # the installed version.
        extra_headers = {'Content-Type': 'application/json'}
        try:
            async with connect(URI, open_timeout=10, extra_headers=extra_headers) as ws:
                self.ws = ws
                await self._setup()
                while True:
                    if not await self._send():
                        break
                    await self._recv()
        except Exception as e:
            logger.error(f"Exception in main run: {e}")
        finally:
            try:
                await self._stop_playback()
            finally:
                logger.debug("AudioLoop Finished")
                pygame.mixer.quit()

    async def _setup(self):
        """Send the setup message (model, tools, optional voice) and await the reply."""
        logger.debug("Setting up WebSocket connection")
        setup = {
            "model": MODEL,
            "tools": self.tools,
        }
        if self.voice_name:
            # Voice selection is part of the session setup; it cannot be
            # changed per message.
            setup["generation_config"] = {
                "response_modalities": ["AUDIO"],
                "speech_config": {
                    "voice_config": {
                        "prebuilt_voice_config": {"voice_name": self.voice_name}
                    }
                },
            }

        await self.ws.send(json.dumps({'setup': setup}))
        raw_response = await self.ws.recv()
        # json.loads accepts both bytes and str, so text frames work too.
        setup_response = json.loads(raw_response)
        logger.debug(f'Connection established: {setup_response}')

        if self.system_instructions:
            instruction_data = {
                "client_content": {
                    "turns": [{
                        "role": "user",
                        "parts": [{"text": self.system_instructions}]
                    }]
                }
            }
            await self.ws.send(json.dumps(instruction_data))
            logger.debug('System instructions sent')

    async def _send(self):
        """Prompt until there is a message to send, then send it.

        ``i`` stops the current playback and prompts again: nothing was sent
        to the server, so the caller must not go on to wait for a reply.

        Returns:
            True when a message was sent (a reply should be received next),
            False when the user quit, stdin closed, or sending failed.
        """
        logger.debug('Preparing to send message')
        while True:
            try:
                # input() blocks, so run it in a thread to keep the event loop
                # (and background playback polling) alive.
                text = await asyncio.to_thread(input, "message > ")
            except EOFError:
                logger.debug("Input stream closed")
                return False

            command = text.strip().lower()
            if command == 'q':
                logger.debug("User initiated quit command")
                return False
            if command == 'i':
                logger.debug("User interrupted audio playback")
                await self._stop_playback()
                continue
            if not command:
                continue  # nothing to send
            break

        msg = {
            "client_content": {
                "turns": [{
                    "role": "user",
                    "parts": [{"text": text}]
                }],
                'turn_complete': True
            }
        }
        try:
            await self.ws.send(json.dumps(msg))
            logger.debug('Message sent successfully')
            return True
        except Exception as e:
            logger.error(f"Error sending message: {e}")
            return False

    async def _recv(self):
        """Receive one model turn, save its audio to a WAV file and start playback.

        Reading stops at ``turnComplete``, at a message without
        ``serverContent``, or at the first message that cannot be processed.
        Playback is started in the background and only if audio was received.
        """
        file_name = f"{AUDIO_FILE_PREFIX}{self.index}.wav"
        self.index += 1
        bytes_written = 0

        try:
            with wave_file(file_name) as wav:
                logger.debug('Receiving data from server')
                async for raw_response in self.ws:
                    try:
                        response = json.loads(raw_response)
                        logger.debug(f'Received chunk: {str(response)[:MAX_LOG_CHUNK_SIZE]}')

                        server_content = response.get('serverContent')
                        if not server_content:
                            logger.error(f'Unhandled message (no serverContent): {response}')
                            break

                        for pcm_data in self._extract_pcm(server_content):
                            logger.debug(f"Received PCM data (size: {len(pcm_data)} bytes)")
                            wav.writeframes(pcm_data)
                            bytes_written += len(pcm_data)

                        if server_content.get('turnComplete'):
                            logger.debug('Server turn complete')
                            break
                    except json.JSONDecodeError:
                        logger.error(f"Error decoding server message. Raw: {raw_response}")
                        break
                    except Exception as e:
                        logger.error(f"Error processing message: {e}, Raw: {raw_response}")
                        break

            if not bytes_written:
                logger.debug('No audio received; skipping playback')
                return

            # Replace any playback still running from the previous turn.
            await self._stop_playback()
            self.stop_audio.clear()
            self._playback = asyncio.create_task(
                audio_playback_task(file_name, self.stop_audio)
            )
        except Exception as e:
            logger.error(f"Error in recv: {e}")

    async def _stop_playback(self):
        """Stop the background playback task, if any, and wait for it to finish."""
        task, self._playback = self._playback, None
        if task is None:
            return
        self.stop_audio.set()
        try:
            pygame.mixer.music.stop()
        except pygame.error as e:
            logger.error(f"Pygame error while stopping playback: {e}")
        await task

    @staticmethod
    def _extract_pcm(server_content):
        """Return the decoded inline audio chunks of one ``serverContent`` payload.

        Parts without ``inlineData``/``data`` are ignored; a payload without a
        model turn yields an empty list.
        """
        model_turn = server_content.get('modelTurn') or {}
        chunks = []
        for part in model_turn.get('parts') or []:
            inline_data = part.get('inlineData')
            if inline_data and 'data' in inline_data:
                chunks.append(base64.b64decode(inline_data['data']))
        return chunks



# --- Run the Example ---
async def main():
    """Create an AudioLoop with a `google_search` tool entry and a system prompt, then run it.

    Any exception raised while building or running the loop is logged.
    """
    loop_options = {
        "tools": [{'google_search': {}}],
        "system_instructions": "You are a  helpfull assistant",
    }
    try:
        session = AudioLoop(**loop_options)
        await session.run()
    except Exception as err:
        logger.error(f"Error in main: {err}")


# Start the interactive session only when run as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())

This is a simple text-to-audio assistant built on the real-time API. Does anyone know how to change the audio voice?

1 Like

Hi,

I haven’t tried it myself; however, the generationConfig has a new property, speechConfig, to configure and control voice output. Maybe it helps.

In case anyone needs it, here is the corrected code:

import asyncio
import base64
import json
import numpy as np
import os
import websockets
import wave
import contextlib
import pygame
from IPython.display import display, Markdown

# ANSI color codes
# Escape sequences wrapped around console messages; RESET ends the coloring.
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
BLUE = "\033[94m"
RESET = "\033[0m"

# Voice names shown to the user; one of these is used as the prebuilt voice
# name in the session's speech configuration.
voices = {"Puck", "Charon", "Kore", "Fenrir", "Aoede"}

# --- Configuration ---
MODEL = 'models/gemini-2.0-flash-exp'
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
# Fail at import time: the endpoint URI below embeds the key.
if not GOOGLE_API_KEY:
    raise EnvironmentError("GOOGLE_API_KEY environment variable is not set.")
HOST = 'generativelanguage.googleapis.com'
# WebSocket endpoint; the API key is passed in the query string.
URI = f'wss://{HOST}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={GOOGLE_API_KEY}'

# Audio parameters
WAVE_CHANNELS = 1  # Mono audio
WAVE_RATE = 24000  # Frame rate given to the WAV writer
WAVE_SAMPLE_WIDTH = 2  # Bytes per sample given to the WAV writer


@contextlib.contextmanager
def wave_file(filename, channels=WAVE_CHANNELS, rate=WAVE_RATE, sample_width=WAVE_SAMPLE_WIDTH):
    """Open `filename` as a WAV writer, configure it, and yield it.

    The writer is closed when the caller's `with` block ends. A `wave.Error`
    raised while it is open is reported on the console and re-raised.
    """
    try:
        writer = wave.open(filename, "wb")
        try:
            writer.setnchannels(channels)
            writer.setsampwidth(sample_width)
            writer.setframerate(rate)
            yield writer
        finally:
            writer.close()
    except wave.Error as err:
        print(f"{RED}Error opening wave file '{filename}': {err}{RESET}")
        raise


async def audio_playback_task(file_name, stop_event):
    """Play `file_name` through pygame's music channel until done or stopped.

    Args:
        file_name: Path of the audio file to load and play.
        stop_event: `asyncio.Event`; once set, playback is halted and the
            coroutine returns.

    pygame and other errors are printed rather than raised.
    """
    print(f"{BLUE}Starting playback: {file_name}{RESET}")
    try:
        pygame.mixer.music.load(file_name)
        pygame.mixer.music.play()
        while pygame.mixer.music.get_busy():
            if stop_event.is_set():
                # Exiting the loop does not silence the mixer by itself; stop
                # it explicitly so "stopped" really means no more audio.
                pygame.mixer.music.stop()
                break
            await asyncio.sleep(0.1)
    except pygame.error as e:
        print(f"{RED}Pygame error during playback: {e}{RESET}")
    except Exception as e:
        print(f"{RED}Unexpected error during playback: {e}{RESET}")
    finally:
        print(f"{BLUE}Playback complete: {file_name}{RESET}")


async def generate_audio(ws, text_input: str, voice_name="Kore") -> None:
    """Send one text turn, save the audio reply to ``output.wav`` and play it.

    Relies on the server to maintain the session history.

    Args:
        ws: Open WebSocket connection on which setup has already been done.
        text_input: The user's message for this turn.
        voice_name: Not used by this function; the voice is fixed by the
            speech configuration sent in the session setup message. Kept so
            existing callers keep working.

    Raises:
        Whatever the WebSocket, JSON/base64 decoding or WAV writing raises;
        playback errors are printed instead.
    """
    pygame.mixer.init()  # Initialize pygame mixer
    try:
        msg = {
            "client_content": {
                "turns": [{"role": "user", "parts": [{"text": text_input}]}],
                "turn_complete": True,
            }
        }
        await ws.send(json.dumps(msg))

        pcm_chunks = []
        async for raw_response in ws:
            # json.loads accepts bytes and str, so both frame types work.
            response = json.loads(raw_response)
            server_content = response.get("serverContent")
            if server_content is None:
                break

            model_turn = server_content.get("modelTurn") or {}
            for part in model_turn.get("parts") or []:
                if "inlineData" in part and "data" in part["inlineData"]:
                    pcm_chunks.append(base64.b64decode(part["inlineData"]["data"]))

            if server_content.get("turnComplete"):
                break

        if not pcm_chunks:
            print(f"{YELLOW}No audio returned{RESET}")
            return

        display(Markdown(f"{YELLOW}**Response >**{RESET}"))
        file_name = 'output.wav'
        with wave_file(file_name) as wf:
            # The chunks are already raw PCM bytes; write them as received.
            wf.writeframes(b"".join(pcm_chunks))
        stop_event = asyncio.Event()
        try:
            await audio_playback_task(file_name, stop_event)
        except Exception as e:
            print(f"{RED}Error during audio playback: {e}{RESET}")
    finally:
        # Always release the mixer, including when sending/receiving fails.
        pygame.mixer.quit()


async def main():
    """Open a session with the chosen voice and answer prompts until 'exit'.

    The voice is selected once, in the ``speech_config`` of the setup message;
    edit ``default_voice`` to use a different one. The loop also ends when the
    connection is closed.
    """
    # Sorted so the listing is stable (voices is a set).
    print(f"{GREEN}Available voices: {', '.join(sorted(voices))}{RESET}")
    default_voice = "Kore"
    print(f"{GREEN}Default voice is set to: {default_voice}, you can change it in the code{RESET}")

    config = {
        "response_modalities": ["AUDIO"],
        "speech_config": {
            "voice_config": {
                "prebuilt_voice_config": {
                    "voice_name": default_voice  # Set voice
                }
            }
        }
    }

    async with websockets.connect(URI) as ws:
        # Session setup: model plus generation config (which carries the voice).
        setup_message = {
            "setup": {
                "model": MODEL,
                "generation_config": config,
            }
        }
        await ws.send(json.dumps(setup_message))

        raw_response = await ws.recv(decode=False)
        setup_response = json.loads(raw_response)
        print(f"{GREEN}Connected: {setup_response}{RESET}")

        prompt = f"{YELLOW}Enter your text (or type 'exit' to quit): {RESET}"
        while True:
            # input() blocks; running it in a thread keeps the event loop free
            # while the user is typing.
            text_prompt = await asyncio.to_thread(input, prompt)
            if text_prompt.lower() == "exit":
                break

            try:
                await generate_audio(ws, text_prompt, default_voice)
            except websockets.ConnectionClosed as e:
                # Every further turn would fail too; leave the loop.
                print(f"{RED}An error occurred: {e}{RESET}")
                break
            except Exception as e:
                print(f"{RED}An error occurred: {e}{RESET}")


# Start the interactive session only when run as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
4 Likes

OK, now: any ideas how to make these voices SING, or change how they speak?