Hello,
According to the documentation, the TTS models support streaming output:
However, after playing around with this feature in both the Python and Java SDKs, I only ever receive a single chunk containing the complete audio, and only after a long wait, even for a 90-second speech (single-speaker TTS).
Is there anything to watch out for that is not mentioned in the guide? Has anyone had success streaming smaller chunks of audio?
The documentation is also quite vague, as it does not show the full interaction with the client. I was assuming you need to use the generate_content_stream
method like you would for streaming text responses (maybe a wrong assumption?).
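For reference, this is roughly the text-streaming pattern I based that assumption on (a minimal sketch; the model name is only a placeholder and I don't know whether the same call is intended for TTS):

from google import genai

client = genai.Client(api_key="YOUR_API_KEY")

# With text output, chunks arrive incrementally as expected.
for chunk in client.models.generate_content_stream(
    model="gemini-2.0-flash",  # placeholder model name
    contents="Tell me a short story.",
):
    print(chunk.text, end="", flush=True)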
Thank you all!
Hey @Martin_Hiller, thanks for the info. It definitely feels like better docs or full examples would help, especially around streaming TTS properly. Will follow up with the team on this.
@GUNAND_MAYANGLAMBAM any update on this? I still have no idea how to get the audio output in streaming mode. I found the streaming text output example (cookbook/quickstarts/Streaming.ipynb at main · google-gemini/cookbook · GitHub), which uses the generate_content_stream API, but it does not work for the Gemini TTS model.
Any update? We are at the same point: we’d love to build around this and test it, but without knowing how to stream the output, we can’t use it.
Any updates on this? When can we expect a resolution? What is the priority of this?
@GUNAND_MAYANGLAMBAM
Here are both the raw REST implementation and the SDK implementation; neither streams. Both return the whole audio in a single chunk at once.
With the SDK:
# Imports used by this method (module level):
# from typing import AsyncGenerator
# from google import genai
# from google.genai import types as google_types

async def generate_audio(self, text: str) -> AsyncGenerator[bytes, None]:
    """
    Streams audio bytes generated by Google Gemini TTS for the given text.
    Yields: bytes chunks of audio data.
    """
    try:
        client = genai.Client(api_key=self.api_key)
        contents = [
            google_types.Content(
                role="user",
                parts=[
                    google_types.Part.from_text(text=text),
                ],
            ),
        ]
        generate_content_config = google_types.GenerateContentConfig(
            temperature=self.temperature,
            response_modalities=["AUDIO"],
            speech_config=google_types.SpeechConfig(
                voice_config=google_types.VoiceConfig(
                    prebuilt_voice_config=google_types.PrebuiltVoiceConfig(
                        voice_name=self.voice_name
                    )
                )
            ),
        )
        async for chunk in await client.aio.models.generate_content_stream(
            model=self.model,
            contents=contents,
            config=generate_content_config,
        ):
            # Defensive: check for audio data in the chunk
            if (
                chunk.candidates is None
                or chunk.candidates[0].content is None
                or chunk.candidates[0].content.parts is None
            ):
                continue
            part = chunk.candidates[0].content.parts[0]
            if part.inline_data and part.inline_data.data:
                # part.inline_data.data is raw audio bytes
                yield part.inline_data.data
            # Optionally, you could yield text if present (for debugging)
            # elif chunk.text:
            #     print(chunk.text)
    except Exception as e:
        print(f"Error in generating audio: {e}")
        raise
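For context, this is the harness I use to consume the generator (a sketch; my_tts is a hypothetical instance of the class above, and the 24 kHz, 16-bit mono PCM parameters are my assumption about the output format):

import asyncio
import wave

async def save_tts(tts, text: str, path: str = "out.wav") -> None:
    # Collect the yielded chunks; with real streaming there should be several,
    # but I only ever see one large chunk arrive at the very end.
    pcm = bytearray()
    async for chunk in tts.generate_audio(text):
        print(f"received chunk: {len(chunk)} bytes")
        pcm.extend(chunk)

    # Assuming raw 16-bit mono PCM at 24 kHz.
    with wave.open(path, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(24000)
        wf.writeframes(bytes(pcm))

# asyncio.run(save_tts(my_tts, "Hello from Gemini TTS"))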
Without the SDK (raw REST, SSE):
# Imports used by this method (module level):
# from typing import AsyncGenerator
# import aiohttp
# import base64
# import json

async def generate_audio_sse(self, text: str) -> AsyncGenerator[bytes, None]:
    """
    Streams audio bytes generated by Google Gemini TTS for the given text.
    Uses the REST API instead of the SDK (avoids the chunk-too-big issue).
    """
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{self.model}:streamGenerateContent?alt=sse"
    headers = {
        "x-goog-api-key": self.api_key,
        "Content-Type": "application/json",
    }
    body = {
        "contents": [{
            "parts": [{"text": text}]
        }],
        "generationConfig": {
            "responseModalities": ["AUDIO"],
            "speechConfig": {
                "voiceConfig": {
                    "prebuiltVoiceConfig": {
                        "voiceName": self.voice_name
                    }
                }
            }
        }
    }
    async with aiohttp.ClientSession(read_bufsize=25 * 1024 * 1024) as session:
        async with session.post(url, headers=headers, json=body) as resp:
            if resp.status != 200:
                err = await resp.text()
                raise RuntimeError(f"TTS request failed {resp.status}: {err}")
            # Process the SSE response line by line
            async for line in resp.content:
                # Each line is a bytes object, e.g. b'data: {...}\r\n';
                # extract the JSON payload after the "data: " prefix.
                try:
                    line_str = line.decode("utf-8").strip()
                    if not line_str.startswith("data: "):
                        continue
                    json_str = line_str[6:]  # Remove "data: "
                    event = json.loads(json_str)
                    candidates = event.get("candidates", [])
                    if not candidates:
                        continue
                    content = candidates[0].get("content", {})
                    parts = content.get("parts", [])
                    if not parts:
                        continue
                    inline = parts[0].get("inlineData", {})
                    audio_b64 = inline.get("data")
                    if audio_b64:
                        yield base64.b64decode(audio_b64)
                except Exception as e:
                    print(f"Error parsing SSE chunk: {e}")
                    continue
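And this is the quick check I use to see whether chunks actually arrive incrementally (sketch; my_tts is again a hypothetical instance of the class above):

import asyncio
import time

async def check_streaming(tts, text: str) -> None:
    # Print size and arrival time of each chunk the generator yields.
    start = time.monotonic()
    count = 0
    async for chunk in tts.generate_audio_sse(text):
        count += 1
        print(f"chunk {count}: {len(chunk)} bytes after {time.monotonic() - start:.1f}s")
    print(f"total chunks: {count}")  # for me this is always 1, after a long wait

# asyncio.run(check_streaming(my_tts, "A roughly 90 second script ..."))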