Video processing - Best approaches towards analyzing large videos?

Hello fellow developers,

As of today, you can use high FPS for videos under 20 MB. For larger videos, the recommended approach is to split them into a sequence of shorter clips (each under 20 MB) and process those at a higher FPS for better analysis. That is the high-level approach.

I’m looking to gather practical insights on how you’re handling your use-cases. I’m particularly interested in:

  • Strategies for splitting videos (fixed time vs. scene detection).

  • Techniques for preserving context across chunks (e.g., overlapping, prompt chaining).

  • Tools you’re using for the pre-processing pipeline.

  • Performance of Gemini models on your use case.

Could you share your use case, the main challenges you faced (like losing temporal context), and the approach that ultimately worked for you?

Your valuable insights will directly help us understand user needs and guide the development of future product enhancements.

1 Like

Hello,

I’ve spent all of the last few days trying to solve this problem, and there are a bunch of possible solutions. The data I work with right now is long-form sports streams, so of course I want a higher FPS for good understanding. Locally, I use ffmpeg to downsample to a lower resolution and keep only the n frames per second that I want Gemini to use. This lets me fit much more of a video into the 20 MB limit without any real loss.
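Roughly, the ffmpeg step looks like this; a sketch only, where the FPS, output height, and CRF values are placeholders you'd tune per stream:

```python
import subprocess

def build_ffmpeg_cmd(src: str, dst: str, fps: int = 5, height: int = 480, crf: int = 28) -> list[str]:
    """Build an ffmpeg command that drops the frame rate and resolution
    so more footage fits under the ~20 MB inline limit.
    All default values here are illustrative, not recommendations."""
    return [
        "ffmpeg", "-y", "-i", src,
        # keep only `fps` frames per second and scale to the target height
        "-vf", f"fps={fps},scale=-2:{height}",
        "-c:v", "libx264", "-crf", str(crf), "-preset", "veryfast",
        "-an",  # drop audio if you only need visual understanding
        dst,
    ]

cmd = build_ffmpeg_cmd("match.mp4", "match_5fps.mp4")
# subprocess.run(cmd, check=True)  # uncomment to actually transcode
```

The `-2` in the scale filter keeps the width divisible by two, which libx264 requires.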

For deployment, it would be much easier if I could just use the Files API and specify an FPS there. I wish this were possible; please make it so. In the web-app environment I’m trying to develop, ffmpeg-wasm is really slow, and I might be dealing with really high-quality data (with a 4K stream, ~20 MB covers < 30 s). I don’t want to use another API just to downsize or get to the right FPS.

I use overlapping chunks. The overlap length is a fairly arbitrary choice for me; I just make it at least as long as an average play or event (~20 s). I also run one normal 1 fps pass over the full video and ask for global context: player numbers, camera positions, the overall flow of the game, and anything else that might be useful to the individual chunks.
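The chunking itself is simple. A minimal sketch, where the 120 s chunk length is a made-up example and only the ~20 s overlap reflects what I actually do:

```python
def chunk_spans(duration_s: float, chunk_s: float = 120.0, overlap_s: float = 20.0) -> list[tuple[float, float]]:
    """Fixed-length chunks whose start times step by (chunk_s - overlap_s),
    so each chunk shares `overlap_s` seconds with the previous one."""
    step = chunk_s - overlap_s
    spans = []
    start = 0.0
    while start < duration_s:
        spans.append((start, min(start + chunk_s, duration_s)))
        start += step
    return spans

spans = chunk_spans(300)  # a 5-minute video
# → [(0.0, 120.0), (100.0, 220.0), (200.0, 300.0)]
```

Each chunk then gets cut out with ffmpeg and sent alongside the global-context summary from the 1 fps pass.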

Not being able to specify a higher FPS with the Files API is a real hindrance, and I don’t have a good solution at this point. But otherwise Gemini works great and has really good understanding of what is happening moment to moment. Please add this functionality soon! Thanks!

3 Likes

Thanks Krish for making the thread!

I am in a similar field to Andrew: I am using Gemini’s video understanding to analyse Padel matches. The quality is very good; however, at 1 fps the results are not good enough, as the game simply moves too fast, similar to your sports streams.

I have tried the approach of downsizing everything to under 20 MB and sending the files over to Gemini. However, this got very cluttered, specifically because of the overlapping: my app tracks stats like “longest rallies”, and if the videos overlap, the model loses context of the “start” and “stop”. The implementation simply became too messy trying to give Gemini 100x 20 MB files, and it caused me other problems related to, as you say, using ffmpeg to downsize, etc.

I then tried using ffmpeg to 1) adjust the FPS myself so the file size is not so large, and 2) stretch the video duration by the FPS multiple I chose. For example, to get 5 fps on a 1-minute video, I make the video 5 minutes long, so the Files API’s 1 fps sampling processes 5 frames per real-time second, and I update my prompt to explain that each video second is 1/5 of a real-time second. From my testing I did not get better results, though I will keep trying this approach, as I find this implementation ‘cleaner’ than sending Gemini 100x 20 MB video files.
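The bookkeeping for this trick is just a division. A minimal sketch, using the 5x factor from my example above (the helper names are made up):

```python
def stretch_factor(target_fps: int) -> int:
    """To make the API's 1 fps sampling see `target_fps` frames per real
    second, slow the video down by that factor
    (e.g. ffmpeg -vf "setpts=5*PTS" for a 5x slowdown)."""
    return target_fps

def to_real_seconds(model_ts_s: float, factor: int) -> float:
    """Map a timestamp Gemini reports on the stretched video back to
    real-match time."""
    return model_ts_s / factor

# A rally the model places at 150 s in the 5x-stretched video
# actually happened at 30 s of real play.
real_ts = to_real_seconds(150.0, stretch_factor(5))
```

Every timestamp the model returns has to pass through this mapping before it is shown to the user, which is the main source of prompt confusion I am still fighting.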

I agree with Andrew above. I really hope Google adds functionality soon where we can simply upload a larger video and then select our FPS, similar to what is already possible with videos under 20 MB.

1 Like

I have an update: this is what has worked well for me, better than the other two approaches I mentioned earlier in this thread.

What I do is pass the video to Gemini twice, but with different start offsets, meaning the model now sees more at its 1 FPS. I then merge the results together and have a ‘QA agent’ do a final check, answering ‘YES’ or ‘NO’ on whether what I wanted was achieved.

Here’s an example:
My video:
├── Pass 1: Extract from 0.0s → Upload → Analyze
└── Pass 2: Extract from 0.8s → Upload → Analyze
↓ (both happen in parallel)
Merge results → QA Review → Final output

1 Like

Honestly that seems pretty strange. How are you merging them together? Seems like it would really reduce quality of understanding to have the video at 1fps then again at 1fps with an offset in context.

Like relevant frames wouldn’t be next to each other necessarily.

How are you able to QA? What is the metric?

1 Like

Hey Harry, I found another solution. It turns out you can use Vertex instead of the generic API, and when you upload a file to the project, you can specify the URI and the FPS that you want, and this works without any preprocessing beforehand.
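If it helps, the part of the request that carries the FPS looks roughly like this as a plain dict mirroring the generateContent REST body. This is a sketch only: the bucket path and FPS value are made up, and I haven’t verified the field names against every SDK version:

```python
def video_part_payload(gcs_uri: str, fps: float, mime_type: str = "video/mp4") -> dict:
    """Shape of a content part that references a Cloud Storage video and
    attaches a frame-sampling hint via videoMetadata (illustrative only)."""
    return {
        "fileData": {"fileUri": gcs_uri, "mimeType": mime_type},
        "videoMetadata": {"fps": fps},
    }

# Hypothetical bucket and FPS choice:
part = video_part_payload("gs://my-bucket/match.mp4", fps=5)
```

The file stays in your bucket, so cleanup and lifecycle rules are on you, which is part of the extra complexity I mentioned.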

I mean, this introduces some other problems, like files taking up space for longer than you want, having to use the Google Cloud console, adding more complexity, etc.

1 Like

Hey Harry, I think the real solution has been under our noses the whole time. I just ran a bunch of tests, and the Files API does work with setting an FPS. I tested this by downloading some videos where text flashes really quickly and, in an A/B test, asked Gemini to list all the words in the video. The API call where I set an FPS of 5 catches far more of the words than the 1 fps setting. Idk why the docs are contradictory about this, but the Files API definitely works with setting FPS now, and the video I uploaded was more than 20 MB. Here’s my code below if you want to try it:

```python
#!/usr/bin/env python3
"""
rsvp_fps_simple_gui.py

GUI picker -> Files API upload -> exactly two calls (1 fps vs high fps).
Returns a JSON array of words for each. No retries, no continuations, no segmentation.

pip install -U google-genai python-dotenv
"""

import os, sys, time, json, mimetypes, datetime, re, textwrap
from pathlib import Path
from dataclasses import dataclass
from dotenv import load_dotenv

# Google GenAI SDK
from google import genai
from google.genai import types

# GUI (stdlib)
import tkinter as tk
from tkinter import filedialog, simpledialog, messagebox

# ----------- tunables -----------

MODEL_DEFAULT = "gemini-2.5-pro"
MAX_OUTPUT_TOKENS = 16384  # single-call cap; if it truncates, that's intentional for this test
WORD_MIN_LEN = 1           # set to 2 to drop single-letter flashes
NORMALIZE_CASE = False     # True -> lowercase all words

# ----------- helpers -----------

def human_secs(s: int) -> str:
    m, sec = divmod(int(s), 60)
    return f"{m:02d}:{sec:02d}"

def get_video_mime(path: str) -> str:
    mt, _ = mimetypes.guess_type(path)
    return mt or "application/octet-stream"

def now_stamp() -> str:
    return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

def normalize_state(state_obj) -> str:
    if state_obj is None:
        return ""
    s = getattr(state_obj, "name", None)
    return s if isinstance(s, str) else str(state_obj)

def get_file_uri(file_obj) -> str:
    # Different SDK versions expose "uri" or "file_uri"
    return getattr(file_obj, "uri", None) or getattr(file_obj, "file_uri", None)

def sanitize_words(seq):
    out = []
    for w in seq:
        if not isinstance(w, str):
            continue
        w2 = w.strip()
        # strip leading/trailing punctuation, keep internal ' and -
        w2 = re.sub(r"^[^\w]+|[^\w]+$", "", w2)
        if NORMALIZE_CASE:
            w2 = w2.lower()
        if len(w2) >= WORD_MIN_LEN:
            out.append(w2)
    return out

@dataclass
class RunResult:
    fps: float
    words: list[str]
    usage: object
    latency_s: float
    raw_text: str

def print_usage(label: str, usage, clip_len_s: int):
    print(f"\n[{label}] usage metadata")
    if not usage:
        print("  (no usage metadata returned)")
        return
    try:
        print(f"  total_token_count: {usage.total_token_count}")
        print(f"  prompt_token_count: {usage.prompt_token_count} | candidates_token_count: {usage.candidates_token_count}")
        v = a = t = 0
        if getattr(usage, "prompt_tokens_details", None):
            for m in usage.prompt_tokens_details:
                mod = getattr(m, "modality", "")
                mod = getattr(mod, "value", mod)  # enum -> str if needed
                if mod == "VIDEO":
                    v += m.token_count
                elif mod == "AUDIO":
                    a += m.token_count
                elif mod == "TEXT":
                    t += m.token_count
        print(f"  modality tokens: video={v} audio={a} text={t}")
        if clip_len_s > 0 and v:
            print(f"  video_tokens_per_second ≈ {v/clip_len_s:.1f}")
    except Exception as e:
        print("  (could not parse usage metadata)", e)

def build_rsvp_prompt(start_s: int, end_s: int) -> str:
    return textwrap.dedent(f"""
        You are reading a rapid-serial-visual-presentation (RSVP) video.

        Between {human_secs(start_s)} and {human_secs(end_s)}, output EVERY distinct word shown on-screen in EXACT order.

        Rules:
        - Return ONLY a JSON array of strings (no keys, no prose).
        - Include each word once at its first complete visibility; ignore duplicates across frames.
        - Exclude punctuation-only tokens. Keep words with apostrophes/hyphens.
        - Do not hallucinate; if unreadable, skip it.
        - Do not add extra words that are not on the screen.
    """).strip()

def extract_json_array(text: str):
    # Try direct parse; else last [...] in the text
    try:
        obj = json.loads(text)
        return obj if isinstance(obj, list) else []
    except Exception:
        m = re.findall(r"\[[\s\S]*\]", text)
        if not m:
            return []
        try:
            obj = json.loads(m[-1])
            return obj if isinstance(obj, list) else []
        except Exception:
            return []

def call_once_words(client, file_uri: str, mime: str, start_s: int, end_s: int, fps: float, model: str) -> RunResult:
    video_part = types.Part(
        file_data=types.FileData(file_uri=file_uri, mime_type=mime),
        video_metadata=types.VideoMetadata(
            start_offset=f"{start_s}s",
            end_offset=f"{end_s}s",
            fps=fps,
        ),
    )
    cfg = types.GenerateContentConfig(
        temperature=0,
        max_output_tokens=MAX_OUTPUT_TOKENS,
        response_mime_type="application/json",
        response_schema={"type": "ARRAY", "items": {"type": "STRING"}},
    )
    prompt_part = types.Part(text=build_rsvp_prompt(start_s, end_s))

    t0 = time.time()
    resp = client.models.generate_content(
        model=model,
        contents=types.Content(parts=[video_part, prompt_part]),
        config=cfg,
    )
    dt = time.time() - t0

    # Prefer resp.text; fall back to stitching candidates if needed
    raw = getattr(resp, "text", None)
    if not (isinstance(raw, str) and raw.strip()):
        raw = ""
        if getattr(resp, "candidates", None):
            for cand in resp.candidates:
                content = getattr(cand, "content", None)
                parts = getattr(content, "parts", None) if content else None
                if not parts:
                    continue
                for p in parts:
                    txt = getattr(p, "text", None)
                    if isinstance(txt, str) and txt.strip():
                        raw += (txt + "\n")
        raw = raw.strip() or "[]"

    arr = extract_json_array(raw)
    words = sanitize_words(arr)
    usage = getattr(resp, "usage_metadata", None)
    return RunResult(fps=fps, words=words, usage=usage, latency_s=dt, raw_text=raw)

# ----------- main (GUI) -----------

def main():
    # Hidden Tk root
    root = tk.Tk(); root.withdraw(); root.update()

    load_dotenv()
    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
    if not api_key:
        messagebox.showerror("Missing API key", "Set GEMINI_API_KEY or GOOGLE_API_KEY in your environment or .env")
        sys.exit(1)

    # Pick file
    path = filedialog.askopenfilename(
        title="Choose an RSVP/fast-reading video file",
        filetypes=[("Video files", "*.mp4 *.mov *.mkv *.webm *.avi *.m4v *.mpg *.mpeg"),
                   ("All files", "*.*")]
    )
    if not path:
        messagebox.showinfo("Canceled", "No file selected."); sys.exit(0)
    if not os.path.exists(path):
        messagebox.showerror("Error", f"File not found:\n{path}"); sys.exit(1)

    # Params
    try:
        start_s = simpledialog.askinteger("Start (seconds)", "Start offset (seconds):", initialvalue=10, minvalue=0)
        if start_s is None: raise KeyboardInterrupt
        end_s = simpledialog.askinteger("End (seconds)", "End offset (seconds):", initialvalue=40, minvalue=1)
        if end_s is None: raise KeyboardInterrupt
        if end_s <= start_s:
            messagebox.showerror("Invalid range", "End must be greater than start."); sys.exit(1)
        high_fps = simpledialog.askfloat("High FPS", "High FPS to test:", initialvalue=12.0, minvalue=0.5, maxvalue=24.0)
        if high_fps is None: raise KeyboardInterrupt
        model = simpledialog.askstring("Model", "Model ID:", initialvalue=MODEL_DEFAULT)
        if not model: raise KeyboardInterrupt
    except KeyboardInterrupt:
        messagebox.showinfo("Canceled", "Canceled by user."); sys.exit(0)

    clip_len_s = end_s - start_s
    mime = get_video_mime(path)

    # Confirm
    summary = (f"File: {os.path.basename(path)}\n"
               f"MIME: {mime}\n"
               f"Clip: {human_secs(start_s)}–{human_secs(end_s)} ({clip_len_s}s)\n"
               f"Model: {model}\n"
               f"High FPS: {high_fps}\n\nProceed to upload & analyze?")
    if not messagebox.askyesno("Confirm", summary): sys.exit(0)

    client = genai.Client(api_key=api_key)

    # Upload via Files API (minimal; still need to wait for ACTIVE)
    print("\n=== Files API + FPS test (RSVP, simple) ===")
    print(f"Uploading: {path} (mime={mime})")
    up_t0 = time.time()
    try:
        f = client.files.upload(file=path, mime_type=mime)  # some SDK versions accept mime_type
    except TypeError:
        f = client.files.upload(file=path)  # fallback
    print("  upload state:", normalize_state(getattr(f, "state", None)), "| name:", getattr(f, "name", None))

    # Wait for ACTIVE (required by Files API; minimal loop)
    while True:
        state = normalize_state(getattr(f, "state", None))
        if state.upper() == "ACTIVE":
            break
        time.sleep(1.0)
        f = client.files.get(name=f.name)
        print("  waiting… state:", normalize_state(getattr(f, "state", None)))
    up_dt = time.time() - up_t0

    file_uri = get_file_uri(f)
    if not file_uri:
        messagebox.showerror("Error", "Could not obtain file URI from upload response.")
        sys.exit(1)

    print(f"\nFile ACTIVE. uri={file_uri}")
    print(f"Upload + activation time: {up_dt:.1f}s")
    print(f"Testing RSVP words on {human_secs(start_s)}–{human_secs(end_s)} | model {model}")

    out_dir = Path.cwd() / f"rsvp_fps_simple_{now_stamp()}"
    out_dir.mkdir(parents=True, exist_ok=True)

    # ---- Run A: 1 fps (single call) ----
    print("\n--- RUN A: 1 fps ---")
    res1 = call_once_words(client, file_uri, mime, start_s, end_s, 1.0, model)
    print(f"Latency: {res1.latency_s:.2f}s | words={len(res1.words)}")
    print_usage("1 fps", res1.usage, clip_len_s)
    (out_dir / "words_1fps.json").write_text(json.dumps(res1.words, ensure_ascii=False, indent=2), encoding="utf-8")

    # ---- Run B: high fps (single call) ----
    print(f"\n--- RUN B: {high_fps} fps ---")
    res2 = call_once_words(client, file_uri, mime, start_s, end_s, high_fps, model)
    print(f"Latency: {res2.latency_s:.2f}s | words={len(res2.words)}")
    print_usage(f"{high_fps} fps", res2.usage, clip_len_s)
    (out_dir / f"words_{int(high_fps)}fps.json").write_text(json.dumps(res2.words, ensure_ascii=False, indent=2), encoding="utf-8")

    # ---- Compare video token deltas (single-call apples-to-apples) ----
    def video_tokens(u):
        if not u or not getattr(u, "prompt_tokens_details", None):
            return None
        for m in u.prompt_tokens_details:
            mod = getattr(m, "modality", "")
            mod = getattr(mod, "value", mod)
            if mod == "VIDEO":
                return m.token_count
        return None

    v1 = video_tokens(res1.usage); v2 = video_tokens(res2.usage)
    ratio_msg = "N/A"
    if v1 and v2 and v1 > 0:
        ratio_msg = f"{(v2 / v1):.2f}×"

    print("\n=== Summary ===")
    print(f"Clip length: {clip_len_s}s | Model: {model}")
    print(f"Word counts: 1fps={len(res1.words)} vs {int(high_fps)}fps={len(res2.words)}")
    print(f"Video tokens: 1fps={v1} vs {int(high_fps)}fps={v2} (ratio={ratio_msg})")
    print(f"Outputs saved to: {out_dir.resolve()}")

    messagebox.showinfo(
        "Done",
        f"Words: 1fps={len(res1.words)} vs {int(high_fps)}fps={len(res2.words)}\n"
        f"Video tokens: 1fps={v1} vs {int(high_fps)}fps={v2} (ratio={ratio_msg})\n"
        f"Saved:\n- {out_dir/'words_1fps.json'}\n- {out_dir/f'words_{int(high_fps)}fps.json'}"
    )

if __name__ == "__main__":
    main()
```



1 Like