Hello,
I’m Trying to update my current configuration for a room vocal assistant from using Gemini 2.5 Flash to Gemini 3.1 Flash.
When I try to update the system in order to use Gemini 3.1 Flash, the server doesn’t reply anything back,
This is the script for the 3.1 version that I’m using to debug the connection:
import os
import sys
import time
import json
import socket
import subprocess
import threading
import signal
import cv2
import requests
import asyncio
import struct
import math
import re
import warnings
import builtins
import wave
from dotenv import load_dotenv
FORZA IPv4 PER EVITARE HANG DEL WEBSOCKET SU RASPBERRY PIETRO VPN
old_getaddrinfo = socket.getaddrinfo
def new_getaddrinfo(*args, **kwargs):
res = old_getaddrinfo(*args, **kwargs)
return [r for r in res if r[0] == socket.AF_INET]
socket.getaddrinfo = new_getaddrinfo
import pvporcupine
from pvrecorder import PvRecorder
SDK Google GenAI
from google import genai
from google.genai import types
— DEBUG SDK —
print(f"\n[DEBUG SDK] Classi Live disponibili: {[name for name in dir(types) if ‘Live’ in name]}\n")
Moduli per il server Web Bluetooth
from flask import Flask, jsonify
import logging
Nascondiamo gli avvisi per mantenere il terminale pulito
warnings.filterwarnings(“ignore”, category=DeprecationWarning)
Carica le chiavi API dal file .env
load_dotenv()
======== CONFIGURAZIONE CHIAVI ==================================
PICOVOICE_ACCESS_KEY = os.environ.get(“PICOVOICE_ACCESS_KEY”)
GEMINI_API_KEY = os.environ.get(“GEMINI_API_KEY”)
TEAMS_WEBHOOK_URL = os.environ.get(“TEAMS_WEBHOOK_URL”)
if not all([PICOVOICE_ACCESS_KEY, GEMINI_API_KEY]):
print(“ERRORE: Assicurati di aver inserito le chiavi PICOVOICE e GEMINI nel file .env”)
exit(1)
======== CONFIGURAZIONE GEMINI 3.1 (MARZO 2026) =================
Usiamo il MODEL_ID completo con prefisso come verificato dal curl
MODEL_ID = “models/gemini-3.1-flash-live-preview”
client = genai.Client(
api_key=GEMINI_API_KEY,
http_options={‘api_version’: ‘v1alpha’}
)
======== CONFIGURAZIONE SISTEMA E HARDWARE ======================
QSYS_CORE_IP = “192.168.100.138”
QSYS_RTSP_CAMERA_URL = “rtsp://192.168.100.168/preview”
WAKE_WORD_PATH = “ok-viola.ppn”
MIC_DEVICE_INDEX = 1
==============================================================
MAGIA DEL TERMINALE LIVE SU Q-SYS (Custom Controls “TextAI”)
==============================================================
MAX_LOG_LINES = 8
qsys_log_history =
def _send_to_qsys_bg(display_text):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1.0)
s.connect((QSYS_CORE_IP, 1710))
rpc_msg = {
“jsonrpc”: “2.0”,
“method”: “Component.Set”,
“params”: {
“Name”: “TextAI”,
“Controls”: [{“Name”: “text.1”, “Value”: display_text}]
},
“id”: 9999
}
s.sendall(json.dumps(rpc_msg).encode() + b’\x00’)
s.close()
except: pass
def qsys_print(*args, **kwargs):
msg = " ".join(str(arg) for arg in args)
builtins.print(msg, **kwargs)
if not msg.strip() or msg == “\n”: return
global qsys_log_history
qsys_log_history.append(msg)
if len(qsys_log_history) > MAX_LOG_LINES: qsys_log_history.pop(0)
display_text = "\r\n".join(qsys_log_history)
threading.Thread(target=_send_to_qsys_bg, args=(display_text,), daemon=True).start()
print = qsys_print
==============================================================
COMUNICAZIONE QRC CON Q-SYS
==============================================================
current_process = None
is_media_playing_flag = False
def send_qrc_component_position(component_name, control_name, position_val):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(2.0)
s.connect((QSYS_CORE_IP, 1710))
msg = {
“jsonrpc”: “2.0”, “method”: “Component.Set”,
“params”: {“Name”: component_name, “Controls”: [{“Name”: control_name, “Position”: position_val}]}, “id”: 1
}
s.sendall(json.dumps(msg).encode() + b’\x00’)
s.close()
return True
except: return False
def get_qrc_component_position(component_name, control_name):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(2.0)
s.connect((QSYS_CORE_IP, 1710))
msg = {
“jsonrpc”: “2.0”, “method”: “Component.Get”,
“params”: {“Name”: component_name, “Controls”: [{“Name”: control_name}]}, “id”: 1234
}
s.sendall(json.dumps(msg).encode() + b’\x00’)
data = b""
start_time = time.time()
while time.time() - start_time < 2.0:
chunk = s.recv(2048)
if not chunk: break
data += chunk
if b’\x00’ in data:
messages = data.split(b’\x00’)
for m in messages:
if not m: continue
try:
response = json.loads(m.decode())
if response.get(“id”) == 1234:
s.close()
if “error” in response: return 0.5
return float(response[“result”][“Controls”][0][“Position”])
except: pass
data = messages[-1]
s.close()
return 0.5
except: return 0.5
def send_qrc_named_control(control_name, position_val):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(2.0)
s.connect((QSYS_CORE_IP, 1710))
msg = {
“jsonrpc”: “2.0”, “method”: “Control.Set”,
“params”: {“Name”: control_name, “Position”: float(position_val)}, “id”: 1
}
s.sendall(json.dumps(msg).encode() + b’\x00’)
s.close()
return True
except: return False
def send_qrc_named_control_value(control_name, value_str):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(2.0)
s.connect((QSYS_CORE_IP, 1710))
msg = {
“jsonrpc”: “2.0”, “method”: “Control.Set”,
“params”: {“Name”: control_name, “Value”: str(value_str)}, “id”: 1
}
s.sendall(json.dumps(msg).encode() + b’\x00’)
s.close()
return True
except: return False
def get_qrc_named_control(control_name):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(2.0)
s.connect((QSYS_CORE_IP, 1710))
msg = {
“jsonrpc”: “2.0”, “method”: “Control.Get”,
“params”: [control_name], “id”: 1235
}
s.sendall(json.dumps(msg).encode() + b’\x00’)
data = b""
start_time = time.time()
while time.time() - start_time < 2.0:
chunk = s.recv(2048)
if not chunk: break
data += chunk
if b’\x00’ in data:
messages = data.split(b’\x00’)
for m in messages:
if not m: continue
try:
response = json.loads(m.decode())
if response.get(“id”) == 1235:
s.close()
if “error” in response: return None
return response[“result”][0].get(“Value”)
except: pass
data = messages[-1]
s.close()
return None
except: return None
==============================================================
FUNZIONI CONTROLLO LED E CHIUSURA
==============================================================
def set_ai_led(is_active: bool):
val = 1.0 if is_active else 0.0
send_qrc_named_control(“LED_AI”, val)
force_quit_session = False
is_ai_session_active = False
def end_ai_session() → str:
global force_quit_session
force_quit_session = True
return “Imposto la chiusura della sessione vocale. Saluta l’utente.”
==============================================================
STRUMENTI PER GEMINI (Momentaneamente non passati al config)
==============================================================
def switch_qsys_video_source(action: str, source_name: str = “USBC”) → str:
“”“Cambia la sorgente video principale sul Q-SYS. Sorgenti valide: PC, USBC, RASPBERRY, CAMERA, SCREENSAVER.”“”
source_map = {
“PC”: “AV 1”,
“HDMI”: “AV 1”,
“USBC”: “AV 2”,
“USB-C”: “AV 2”,
“RASPBERRY”: “HDMI 1”,
“CAMERA”: “Mediacast In 1”,
“SCREENSAVER”: “Graphic 1”
}
reverse_map = {
"AV 1": "HDMI del PC",
"AV 2": "USB-C del PC",
"HDMI 1": "Raspberry",
"Mediacast In 1": "Camera della sala",
"Graphic 1": "Screensaver"
}
ctrl_name = "Selettore_NV32"
if action == 'get':
current_val = get_qrc_named_control(ctrl_name)
if current_val in reverse_map:
return f"La sorgente video attuale è: {reverse_map[current_val]}."
elif current_val:
return f"La sorgente video attuale è impostata su: {current_val}."
else:
return "Non riesco a leggere la sorgente video attuale dal Core."
elif action == 'set':
target = source_name.upper().replace(" ", "")
qsys_value = None
for key in source_map.keys():
if key in target:
qsys_value = source_map[key]
break
if not qsys_value:
qsys_value = "AV 2"
success = send_qrc_named_control_value(ctrl_name, qsys_value)
if success:
return f"Sorgente cambiata con successo in {reverse_map.get(qsys_value, qsys_value)}."
return "Errore nel cambio sorgente."
def change_volume(action: str, level: float = 10.0) → str:
“”“Modifica il volume della sala Q-SYS. Azioni: get, set, up, down. Level da 0 a 100.”“”
current_pos = get_qrc_component_position(“VolumeSala”, “gain”)
current_pct = current_pos * 100.0
if action == 'get': return f"Il volume è al {int(current_pct)}%."
if action == 'set': new_pct = level
elif action == 'up': new_pct = current_pct + level
elif action == 'down': new_pct = current_pct - level
else: new_pct = current_pct
new_pct = max(0.0, min(100.0, new_pct))
send_qrc_component_position("VolumeSala", "gain", new_pct / 100.0)
return f"Volume impostato al {int(new_pct)}%."
TOOL_MAP = {
“switch_qsys_video_source”: switch_qsys_video_source,
“change_volume”: change_volume,
“end_ai_session”: end_ai_session
}
==============================================================
LOGICA LIVE API (AUDIO NATIVO WEBSOCKET)
==============================================================
def play_beep():
filename = “/tmp/beep_loud.wav”
if not os.path.exists(filename):
sample_rate = 16000
with wave.open(filename, ‘wb’) as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(sample_rate)
frames = [struct.pack(‘<h’, int(32000.0 * math.sin(2.0 * math.pi * 880.0 * i / sample_rate))) for i in range(int(sample_rate * 0.15))]
wf.writeframes(b’'.join(frames))
subprocess.run([“aplay”, “-D”, “plughw:2,0”, “-q”, filename], stderr=subprocess.DEVNULL)
system_prompt = “”"
Sei ‘Viola’, l’Intelligenza Artificiale avanzata di una sala riunioni Prase.
Rispondi in modo vocale conciso, naturale e amichevole. Non usare elenchi lunghi.
“”"
async def live_api_session(recorder):
ai_is_speaking = False
last_activity_time = time.time()
global is_ai_session_active, force_quit_session
is_ai_session_active = True
force_quit_session = False
set_ai_led(True)
print(f"\n[LIVE 3.1] 🚀 AVVIO PROTOCOLLO DI NEGOZIAZIONE...")
playback_queue = asyncio.Queue()
aplay_process = subprocess.Popen(
['aplay', '-D', 'plughw:2,0', '-f', 'S16_LE', '-c', '1', '-r', '48000', '-q'],
stdin=subprocess.PIPE, stderr=subprocess.DEVNULL
)
# CONFIGURAZIONE MINIMALE TESTATA PER MARZO 2026
# Non usiamo classi, passiamo un dizionario "flat"
config = {
"response_modalities": ["AUDIO"],
"system_instruction": {"parts": [{"text": system_prompt}]}
}
try:
# Nota: usiamo esattamente l'ID del curl
async with client.aio.live.connect(model=MODEL_ID, config=config) as session:
print("[LIVE 3.1] 🔗 WebSocket Stabilito. Attendo Handshake...")
setup_event = asyncio.Event()
async def recv_loop():
nonlocal last_activity_time
try:
# Usiamo session.receive() che è un generatore
async for response in session.receive():
# --- QUESTO PRINT DEVE APPARIRE ---
print(f"\n[DEBUG] Messaggio ricevuto dal server: {type(response)}")
if getattr(response, 'setup_complete', None):
print("✅✅✅ HANDSHAKE COMPLETATO! VIOLA È VIVA.")
setup_event.set()
# Mandiamo un testo per "bucare" il silenzio
try:
await session.send(input=types.LiveClientRealtimeInput(text="Ciao"))
except Exception: pass
continue
if getattr(response, 'server_content', None) and response.server_content.model_turn:
for part in response.server_content.model_turn.parts:
if getattr(part, 'inline_data', None):
playback_queue.put_nowait(part.inline_data.data)
last_activity_time = time.time()
except Exception as e:
print(f"❌ Errore ricezione: {e}")
async def mic_loop():
await setup_event.wait()
print("[LIVE 3.1] 🎤 Microfono in streaming HD...")
while not force_quit_session:
pcm = await asyncio.to_thread(recorder.read)
pcm_bytes = struct.pack(f"<{len(pcm)}h", *pcm)
try:
# Invia audio come Blob binario
await session.send(input=types.LiveClientRealtimeInput(
audio=types.Blob(data=pcm_bytes, mime_type="audio/pcm;rate=16000")
))
except: break
await asyncio.sleep(0.01)
async def playback_worker():
while not force_quit_session:
try:
chunk = await asyncio.wait_for(playback_queue.get(), timeout=0.5)
if aplay_process.poll() is None:
aplay_process.stdin.write(chunk)
aplay_process.stdin.flush()
except: pass
# Watchdog di emergenza: se non riceve setup_complete entro 10s, chiude
async def setup_watchdog():
await asyncio.sleep(10)
if not setup_event.is_set():
print("\n[ERRORE] Il server non ha risposto al setup. Possibile blocco VPN/Regionale.")
global force_quit_session
force_quit_session = True
await asyncio.gather(recv_loop(), mic_loop(), playback_worker(), setup_watchdog())
except Exception as e:
print(f"\n[ERRORE CRITICO] {e}")
finally:
is_ai_session_active = False
if 'aplay_process' in locals(): aplay_process.terminate()
play_beep()
set_ai_led(False)
print("\n[LIVE 3.1] 🛑 Connessione chiusa. Ritorno in Standby.")
==============================================================
SERVER WEB FLASK PER CONTROLLO BLUETOOTH
==============================================================
app = Flask(name)
log = logging.getLogger(‘werkzeug’)
log.setLevel(logging.ERROR)
@app.route(‘/bluetooth/pair’, methods=[‘GET’, ‘POST’])
def bt_pair():
subprocess.run(“bluetoothctl system-alias ‘Bluetooth Sala 1’”, shell=True)
subprocess.run(“bluetoothctl discoverable on”, shell=True)
subprocess.run(“bluetoothctl pairable on”, shell=True)
return “Pairing attivo”, 200
@app.route(‘/bluetooth/reset’, methods=[‘GET’, ‘POST’])
def bt_reset():
subprocess.run(“bluetoothctl discoverable off”, shell=True)
subprocess.run(“for dev in $(bluetoothctl devices | awk ‘{print $2}’); do bluetoothctl remove $dev; done”, shell=True)
return “Bluetooth azzerato”, 200
@app.route(‘/bluetooth/playpause’, methods=[‘GET’, ‘POST’])
def bt_playpause():
subprocess.run(“playerctl play-pause”, shell=True)
return “OK”, 200
@app.route(‘/bluetooth/next’, methods=[‘GET’, ‘POST’])
def bt_next():
subprocess.run(“playerctl next”, shell=True)
return “OK”, 200
@app.route(‘/bluetooth/prev’, methods=[‘GET’, ‘POST’])
def bt_prev():
subprocess.run(“playerctl previous”, shell=True)
return “OK”, 200
@app.route(‘/bluetooth/force_volume’, methods=[‘GET’, ‘POST’])
def bt_force_volume():
subprocess.run(“playerctl -a volume 1.0”, shell=True)
return “Volume 100%”, 200
@app.route(‘/bluetooth/status’, methods=[‘GET’])
def bt_status():
result = subprocess.run(“bluetoothctl info | grep ‘Connected: yes’”, shell=True, capture_output=True, text=True)
if “Connected: yes” in result.stdout:
name_result = subprocess.run(“bluetoothctl info | grep ‘Name:’”, shell=True, capture_output=True, text=True)
device_name = name_result.stdout.replace(“Name:”, “”).strip()
return jsonify({“connected”: True, “device_name”: device_name})
else:
return jsonify({“connected”: False, “device_name”: “”})
def run_flask_server():
app.run(host=‘0.0.0.0’, port=5000)
==============================================================
MAIN LOOP DELL’APPLICAZIONE
==============================================================
def main():
print(“=======================================”)
print(" Q-SYS AI ASSISTANT INIZIALIZZATO “)
print(”=======================================")
threading.Thread(target=run_flask_server, daemon=True).start()
def tsc_listener():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 3000))
server.listen(5)
while True:
client, addr = server.accept()
msg = client.recv(1024).decode().strip()
if msg == "PLAY_PRASE_TUTORIAL": play_video("tutorial_sala")
client.close()
threading.Thread(target=tsc_listener, daemon=True).start()
try:
if os.path.exists(WAKE_WORD_PATH):
print(f"[PORCUPINE] Carico wake word custom: {WAKE_WORD_PATH}")
porcupine = pvporcupine.create(access_key=PICOVOICE_ACCESS_KEY, keyword_paths=[WAKE_WORD_PATH], model_path="porcupine_params_it.pv")
else:
print("[PORCUPINE] File custom non trovato, carico wake word di default...")
porcupine = pvporcupine.create(access_key=PICOVOICE_ACCESS_KEY, keywords=['porcupine'])
except Exception as e:
print(f"[PORCUPINE] Errore avvio Wake Word. ({e})")
return
recorder = PvRecorder(device_index=MIC_DEVICE_INDEX, frame_length=porcupine.frame_length)
recorder.start()
print("[SYSTEM] Wakeword detector in ascolto infinito...")
try:
while True:
pcm = recorder.read()
keyword_index = porcupine.process(pcm)
if keyword_index >= 0:
print("\n[WAKE_WORD] Wake Word Scattata! Avvio connessione Live...")
asyncio.run(live_api_session(recorder))
print("\n[SYSTEM] Wakeword detector di nuovo in ascolto...")
except KeyboardInterrupt:
print("\nUscita richiesta da terminale.")
finally:
recorder.delete()
porcupine.delete()
if name == “main”:
main()
And this is the log I’m getting from the latest configuration:
=======================================
Q-SYS AI ASSISTANT INIZIALIZZATO
[PORCUPINE] Carico wake word custom: ok-viola.ppn
- Serving Flask app ‘mainnew31’
- Debug mode: off
[SYSTEM] Wakeword detector in ascolto infinito…
[WAKE_WORD] Wake Word Scattata! Avvio connessione Live…
[LIVE 3.1]
AVVIO PROTOCOLLO DI NEGOZIAZIONE…
[LIVE 3.1]
WebSocket Stabilito. Attendo Handshake…
[ERRORE] Il server non ha risposto al setup. Possibile blocco VPN/Regionale.
I got no reply even using VPNs logated into the US.
Checking with a curl command the status of my API key, the Gemini 3.1 seems available.
Are there any territorial limitations in place? (I’m located in Italy)
Am I doing something wrong?
Thank you for the support