Hey Harry, I think the real solution has been under our noses the whole time. I just ran a bunch of tests, and the Files API does work with setting an fps. I tested this by downloading some videos where text flashes really quickly and asking Gemini, in an A/B test, to list all the words in the video. The API call where I set an fps of 5 picks up many more of the words than the 1 fps setting. I don't know why the docs are contradictory about this, but the Files API definitely works with setting fps now. And the video I uploaded was larger than 20 MB. Here's my code below if you want to try it:
#!/usr/bin/env python3
"""
rsvp_fps_simple_gui.py
GUI picker → Files API upload → exactly two calls (1 fps vs high fps)
Returns a JSON array of words for each. No retries, no continuations, no segmentation.

pip install -U google-genai python-dotenv
"""
import os, sys, time, json, mimetypes, datetime, re, textwrap
from pathlib import Path
from dataclasses import dataclass
from dotenv import load_dotenv
# Google GenAI SDK
from google import genai
from google.genai import types
# GUI (stdlib)
import tkinter as tk
from tkinter import filedialog, simpledialog, messagebox
# ----------- tunables -----------
# Model ID pre-filled in the "Model" dialog shown by main().
MODEL_DEFAULT = "gemini-2.5-pro"
# Single-call cap; if the response truncates, that's intentional for this test.
MAX_OUTPUT_TOKENS = 16384
# Minimum cleaned word length to keep; set to 2 to drop single-letter flashes.
WORD_MIN_LEN = 1
# True → lowercase all words before they are returned.
NORMALIZE_CASE = False
# ----------- helpers -----------
def human_secs(s: int) -> str:
    """Format a number of seconds as a zero-padded ``MM:SS`` string.

    The value is truncated to an integer first. Minutes are not wrapped
    into hours, so 3600 seconds renders as ``"60:00"``.
    """
    minutes, seconds = divmod(int(s), 60)
    return f"{minutes:02d}:{seconds:02d}"
def get_video_mime(path: str) -> str:
    """Guess the MIME type of *path* from its file name.

    Falls back to ``"application/octet-stream"`` when the standard
    ``mimetypes`` table has no entry for the extension.
    """
    guessed, _encoding = mimetypes.guess_type(path)
    return guessed or "application/octet-stream"
def now_stamp() -> str:
    """Return the current local time as ``YYYYMMDD_HHMMSS`` (used in output folder names)."""
    return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def normalize_state(state_obj) -> str:
    """Turn a file-state value into a plain string.

    ``None`` becomes ``""``. An object with a string ``name`` attribute
    (for example an enum member) yields that name. Anything else is
    passed through ``str()``.
    """
    if state_obj is None:
        return ""
    name = getattr(state_obj, "name", None)
    return name if isinstance(name, str) else str(state_obj)
def get_file_uri(file_obj) -> "str | None":
    """Return the URI of an uploaded file object, or ``None`` if it has none.

    Different SDK versions expose the value as either ``uri`` or
    ``file_uri``; ``uri`` is tried first.
    """
    return getattr(file_obj, "uri", None) or getattr(file_obj, "file_uri", None)
def sanitize_words(seq, *, min_len=None, normalize_case=None):
    """Clean a sequence of model-returned tokens into a list of words.

    Non-string items are skipped. Each string is stripped of surrounding
    whitespace and of leading/trailing non-word characters; internal
    apostrophes and hyphens are kept. Words shorter than the minimum
    length are dropped.

    Args:
        seq: Iterable of candidate tokens; ``None`` is treated as empty.
        min_len: Minimum cleaned length to keep. Defaults to the
            module-level ``WORD_MIN_LEN`` when ``None``.
        normalize_case: Lowercase each word when true. Defaults to the
            module-level ``NORMALIZE_CASE`` when ``None``.

    Returns:
        A new list of cleaned words in their original order.
    """
    if min_len is None:
        min_len = WORD_MIN_LEN
    if normalize_case is None:
        normalize_case = NORMALIZE_CASE

    # Strip leading/trailing punctuation only; compiled once, outside the loop.
    edge_punct = re.compile(r"^[^\w]+|[^\w]+$")

    out = []
    for w in seq or ():
        if not isinstance(w, str):
            continue
        w2 = edge_punct.sub("", w.strip())
        if normalize_case:
            w2 = w2.lower()
        if len(w2) >= min_len:
            out.append(w2)
    return out
@dataclass
class RunResult:
    """Outcome of one generate_content call at a given sampling rate."""

    # Frames-per-second value that was sent with the request.
    fps: float
    # Cleaned list of words extracted from the model response.
    words: list[str]
    # The response's usage_metadata object, or None if it was absent.
    usage: object
    # Wall-clock duration of the API call, in seconds.
    latency_s: float
    # Response text before JSON extraction and word cleaning.
    raw_text: str
def print_usage(label: str, usage, clip_len_s: int):
    """Print token-usage statistics for one run to stdout.

    Args:
        label: Tag shown in the header line (e.g. ``"1 fps"``).
        usage: Usage-metadata object from the response; when falsy, a
            placeholder line is printed instead.
        clip_len_s: Clip duration in seconds, used to derive video
            tokens per second when it is positive.
    """
    print(f"\n[{label}] usage metadata")
    if not usage:
        print(" (no usage metadata returned)")
        return
    try:
        print(f" total_token_count: {usage.total_token_count}")
        print(f" prompt_token_count: {usage.prompt_token_count} | candidates_token_count: {usage.candidates_token_count}")
        v = a = t = 0
        if getattr(usage, "prompt_tokens_details", None):
            for m in usage.prompt_tokens_details:
                mod = getattr(m, "modality", "")
                mod = getattr(mod, "value", mod)  # enum → str if needed
                if mod == "VIDEO":
                    v += m.token_count
                elif mod == "AUDIO":
                    a += m.token_count
                elif mod == "TEXT":
                    t += m.token_count
        print(f" modality tokens: video={v} audio={a} text={t}")
        if clip_len_s > 0 and v:
            print(f" video_tokens_per_second ≈ {v/clip_len_s:.1f}")
    except Exception as e:
        # Diagnostics are best-effort: an unexpected metadata shape must not
        # abort the A/B comparison, so report it and carry on.
        print(" (could not parse usage metadata)", e)
def build_rsvp_prompt(start_s: int, end_s: int) -> str:
    """Build the instruction text asking the model to list on-screen words.

    The clip boundaries are rendered as ``MM:SS`` via ``human_secs`` and
    embedded in the prompt. The returned text has no common indentation
    and no leading or trailing whitespace.
    """
    return textwrap.dedent(f"""
        You are reading a rapid-serial-visual-presentation (RSVP) video.
        Between {human_secs(start_s)} and {human_secs(end_s)}, output EVERY distinct word shown on-screen in EXACT order.
        Rules:
        - Return ONLY a JSON array of strings (no keys, no prose).
        - Include each word once at its first complete visibility; ignore duplicates across frames.
        - Exclude punctuation-only tokens. Keep words with apostrophes/hyphens.
        - Do not hallucinate; if unreadable, skip it.
        - Do not add extra words that are not on the screen.
    """).strip()
def extract_json_array(text: str):
    """Parse a JSON array out of *text*, tolerating surrounding noise.

    The whole text is tried first. If it is valid JSON, the value is
    returned when it is a list and ``[]`` otherwise. If it is not valid
    JSON, the widest ``[...]`` span is tried, then the last bracket pair
    that contains no nested brackets.

    Returns:
        The decoded list, or ``[]`` when nothing parses or *text* is not
        a string.
    """
    if not isinstance(text, str):
        return []

    try:
        obj = json.loads(text)
    except (ValueError, RecursionError):
        pass
    else:
        return obj if isinstance(obj, list) else []

    # Widest span first (handles nested arrays), then the last flat array
    # for text such as "[1] and [2]" where the widest span is not valid JSON.
    candidates = re.findall(r"\[[\s\S]*\]", text)
    candidates += re.findall(r"\[[^\[\]]*\]", text)[-1:]
    for chunk in candidates:
        try:
            obj = json.loads(chunk)
        except (ValueError, RecursionError):
            continue
        if isinstance(obj, list):
            return obj
    return []
def call_once_words(client, file_uri: str, mime: str, start_s: int, end_s: int, fps: float, model: str) -> RunResult:
    """Run a single generate_content call on a clip and return its words.

    The uploaded file is referenced by URI, with video metadata carrying
    the clip offsets and the requested sampling rate. The model is asked
    for a JSON array of strings at temperature 0. No retries are made.

    Args:
        client: GenAI client exposing ``models.generate_content``.
        file_uri: URI of the already-uploaded video file.
        mime: MIME type sent alongside the file reference.
        start_s: Clip start offset in seconds.
        end_s: Clip end offset in seconds.
        fps: Frames per second requested for video sampling.
        model: Model ID to call.

    Returns:
        A ``RunResult`` holding the cleaned words, usage metadata, call
        latency, and the raw response text.
    """
    video_part = types.Part(
        file_data=types.FileData(file_uri=file_uri, mime_type=mime),
        video_metadata=types.VideoMetadata(
            start_offset=f"{start_s}s",
            end_offset=f"{end_s}s",
            fps=fps,
        ),
    )
    cfg = types.GenerateContentConfig(
        temperature=0,
        max_output_tokens=MAX_OUTPUT_TOKENS,
        response_mime_type="application/json",
        response_schema={"type": "ARRAY", "items": {"type": "STRING"}},
    )
    prompt_part = types.Part(text=build_rsvp_prompt(start_s, end_s))

    # perf_counter is monotonic, so the latency cannot be skewed by
    # wall-clock adjustments during the call.
    t0 = time.perf_counter()
    resp = client.models.generate_content(
        model=model,
        contents=types.Content(parts=[video_part, prompt_part]),
        config=cfg,
    )
    dt = time.perf_counter() - t0

    # Prefer resp.text; fall back to stitching candidates if needed
    raw = getattr(resp, "text", None)
    if not (isinstance(raw, str) and raw.strip()):
        chunks = []
        for cand in getattr(resp, "candidates", None) or []:
            content = getattr(cand, "content", None)
            parts = getattr(content, "parts", None) if content else None
            for p in parts or []:
                txt = getattr(p, "text", None)
                if isinstance(txt, str) and txt.strip():
                    chunks.append(txt)
        # An empty response is treated as an empty JSON array.
        raw = "\n".join(chunks).strip() or "[]"

    arr = extract_json_array(raw)
    words = sanitize_words(arr)
    usage = getattr(resp, "usage_metadata", None)
    return RunResult(fps=fps, words=words, usage=usage, latency_s=dt, raw_text=raw)
# ----------- main (GUI) -----------
def _fps_label(fps: float) -> str:
    """Return a compact label for an fps value (12.0 → "12", 1.5 → "1.5", 0.5 → "0.5")."""
    return f"{fps:g}"


def _wait_until_active(client, file_obj, timeout_s: float = 600.0, poll_s: float = 1.0):
    """Poll the Files API until the file is ACTIVE and return the fresh file object.

    Raises:
        RuntimeError: the file reached the FAILED state.
        TimeoutError: the file was not ACTIVE within *timeout_s* seconds.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        state = normalize_state(getattr(file_obj, "state", None)).upper()
        if state == "ACTIVE":
            return file_obj
        if state == "FAILED":
            raise RuntimeError("File processing failed on the server (state=FAILED).")
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"File did not become ACTIVE within {timeout_s:.0f}s (last state: {state or 'unknown'})."
            )
        time.sleep(poll_s)
        file_obj = client.files.get(name=file_obj.name)
        print(" waiting… state:", normalize_state(getattr(file_obj, "state", None)))


def main():
    """Interactive A/B test: same clip, same model, 1 fps versus a chosen higher fps.

    Prompts for a video file and parameters through Tk dialogs, uploads
    the file through the Files API, waits for it to become ACTIVE, runs
    one call per fps setting, prints usage statistics, and writes each
    word list to a JSON file in a timestamped folder under the current
    working directory. Exits the process via ``sys.exit`` on cancel or
    on error.
    """
    # Hidden Tk root
    root = tk.Tk()
    root.withdraw()
    root.update()

    load_dotenv()
    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
    if not api_key:
        messagebox.showerror("Missing API key", "Set GEMINI_API_KEY or GOOGLE_API_KEY in your environment or .env")
        sys.exit(1)

    # Pick file
    path = filedialog.askopenfilename(
        title="Choose an RSVP/fast-reading video file",
        filetypes=[("Video files", "*.mp4 *.mov *.mkv *.webm *.avi *.m4v *.mpg *.mpeg"),
                   ("All files", "*.*")]
    )
    if not path:
        messagebox.showinfo("Canceled", "No file selected.")
        sys.exit(0)
    if not os.path.exists(path):
        messagebox.showerror("Error", f"File not found:\n{path}")
        sys.exit(1)

    # Params (a dialog returning None means the user pressed Cancel)
    try:
        start_s = simpledialog.askinteger("Start (seconds)", "Start offset (seconds):", initialvalue=10, minvalue=0)
        if start_s is None:
            raise KeyboardInterrupt
        end_s = simpledialog.askinteger("End (seconds)", "End offset (seconds):", initialvalue=40, minvalue=1)
        if end_s is None:
            raise KeyboardInterrupt
        if end_s <= start_s:
            messagebox.showerror("Invalid range", "End must be greater than start.")
            sys.exit(1)
        high_fps = simpledialog.askfloat("High FPS", "High FPS to test:", initialvalue=12.0, minvalue=0.5, maxvalue=24.0)
        if high_fps is None:
            raise KeyboardInterrupt
        model = simpledialog.askstring("Model", "Model ID:", initialvalue=MODEL_DEFAULT)
        if not model:
            raise KeyboardInterrupt
    except KeyboardInterrupt:
        messagebox.showinfo("Canceled", "Canceled by user.")
        sys.exit(0)

    clip_len_s = end_s - start_s
    mime = get_video_mime(path)
    # Label used in filenames and summaries. int(high_fps) would turn 1.5
    # into "1" (overwriting the 1 fps output) and 0.5 into "0".
    fps_label = _fps_label(high_fps)

    # Confirm
    summary = (f"File: {os.path.basename(path)}\n"
               f"MIME: {mime}\n"
               f"Clip: {human_secs(start_s)}–{human_secs(end_s)} ({clip_len_s}s)\n"
               f"Model: {model}\n"
               f"High FPS: {high_fps}\n\nProceed to upload & analyze?")
    if not messagebox.askyesno("Confirm", summary):
        sys.exit(0)

    client = genai.Client(api_key=api_key)

    # Upload via Files API (minimal; still need to wait for ACTIVE)
    print("\n=== Files API + FPS test (RSVP, simple) ===")
    print(f"Uploading: {path} (mime={mime})")
    up_t0 = time.time()
    try:
        f = client.files.upload(file=path, mime_type=mime)  # some SDKs accept mime_type
    except TypeError:
        f = client.files.upload(file=path)  # fallback
    print(" upload state:", normalize_state(getattr(f, "state", None)), "| name:", getattr(f, "name", None))

    # Wait for ACTIVE (required by Files API). Bounded, and aborts on FAILED,
    # so a rejected upload cannot hang the script forever.
    try:
        f = _wait_until_active(client, f)
    except (RuntimeError, TimeoutError) as err:
        messagebox.showerror("Error", str(err))
        sys.exit(1)
    up_dt = time.time() - up_t0

    file_uri = get_file_uri(f)
    if not file_uri:
        messagebox.showerror("Error", "Could not obtain file URI from upload response.")
        sys.exit(1)
    print(f"\nFile ACTIVE. uri={file_uri}")
    print(f"Upload + activation time: {up_dt:.1f}s")
    print(f"Testing RSVP words on {human_secs(start_s)}–{human_secs(end_s)} | model {model}")

    out_dir = Path.cwd() / f"rsvp_fps_simple_{now_stamp()}"
    out_dir.mkdir(parents=True, exist_ok=True)
    words_1fps_path = out_dir / "words_1fps.json"
    words_high_path = out_dir / f"words_{fps_label}fps.json"
    if words_high_path == words_1fps_path:
        # High fps of exactly 1: keep both runs instead of overwriting run A.
        words_high_path = out_dir / "words_1fps_high.json"

    # ---- Run A: 1 fps (single call) ----
    print("\n--- RUN A: 1 fps ---")
    res1 = call_once_words(client, file_uri, mime, start_s, end_s, 1.0, model)
    print(f"Latency: {res1.latency_s:.2f}s | words={len(res1.words)}")
    print_usage("1 fps", res1.usage, clip_len_s)
    words_1fps_path.write_text(json.dumps(res1.words, ensure_ascii=False, indent=2), encoding="utf-8")

    # ---- Run B: high fps (single call) ----
    print(f"\n--- RUN B: {high_fps} fps ---")
    res2 = call_once_words(client, file_uri, mime, start_s, end_s, high_fps, model)
    print(f"Latency: {res2.latency_s:.2f}s | words={len(res2.words)}")
    print_usage(f"{high_fps} fps", res2.usage, clip_len_s)
    words_high_path.write_text(json.dumps(res2.words, ensure_ascii=False, indent=2), encoding="utf-8")

    # ---- Compare video token deltas (single-call apples-to-apples) ----
    def video_tokens(u):
        """Return the token count of the first VIDEO entry in the prompt details, or None."""
        if not u or not getattr(u, "prompt_tokens_details", None):
            return None
        for m in u.prompt_tokens_details:
            mod = getattr(m, "modality", "")
            mod = getattr(mod, "value", mod)
            if mod == "VIDEO":
                return m.token_count
        return None

    v1 = video_tokens(res1.usage)
    v2 = video_tokens(res2.usage)
    ratio_msg = "N/A"
    if v1 and v2 and v1 > 0:
        ratio_msg = f"{(v2 / v1):.2f}×"

    print("\n=== Summary ===")
    print(f"Clip length: {clip_len_s}s | Model: {model}")
    print(f"Word counts: 1fps={len(res1.words)} vs {fps_label}fps={len(res2.words)}")
    print(f"Video tokens: 1fps={v1} vs {fps_label}fps={v2} (ratio={ratio_msg})")
    print(f"Outputs saved to: {out_dir.resolve()}")
    messagebox.showinfo(
        "Done",
        f"Words: 1fps={len(res1.words)} vs {fps_label}fps={len(res2.words)}\n"
        f"Video tokens: 1fps={v1} vs {fps_label}fps={v2} (ratio={ratio_msg})\n"
        f"Saved:\n- {words_1fps_path}\n- {words_high_path}"
    )
# Run the GUI flow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()