Hi everyone.
I have used the File Search API for my documents and it is working fine. How should I proceed in the case where I need to add metadata too? I saw details about custom metadata in the documentation.
My scenario: I have a few text files, each with a corresponding source URL stored in a JSON file. I want this URL to be given as the source/citation when generating a response.
def process_and_upload_files(store_name, uploaded_db):
    """Upload every .txt file in FOLDER_PATH and index it in a File Search store.

    For each ``<name>.txt``, an optional sibling ``<name>.json`` supplies custom
    metadata attached at import time. Files whose SHA-256 hash matches the entry
    in ``uploaded_db`` are skipped; each successful import updates ``uploaded_db``
    on disk via ``save_uploaded_db``.

    Args:
        store_name: Resource name of the target File Search store.
        uploaded_db: Mapping of base file name -> SHA-256 hex digest of the last
            successfully indexed version. Mutated in place.
    """
    if not os.path.exists(FOLDER_PATH):
        print(f" Folder {FOLDER_PATH} does not exist.")
        return

    print(f" Scanning folder: {FOLDER_PATH}")
    files_processed_count = 0

    for file in os.listdir(FOLDER_PATH):
        if not file.endswith(".txt"):
            continue

        base_id = os.path.splitext(file)[0]
        text_path = os.path.join(FOLDER_PATH, file)
        meta_path = os.path.join(FOLDER_PATH, base_id + ".json")

        file_hash = file_sha256(text_path)
        if uploaded_db.get(base_id) == file_hash:
            print(f"⏭ Skipping {base_id} (unchanged)")
            continue

        print(f"\n⬆ Uploading {base_id}...")

        # -------- Load Metadata --------
        custom_meta = []
        if os.path.exists(meta_path):
            try:
                with open(meta_path, "r", encoding="utf-8") as f:
                    meta_json = json.load(f)
                # Map filepathstorypic -> source_url so the retriever can
                # surface the original URL as a citation.
                if "filepathstorypic" in meta_json:
                    meta_json["source_url"] = meta_json["filepathstorypic"]
                custom_meta = convert_metadata_to_list(meta_json)
            except Exception as e:
                # Best-effort: a malformed metadata file should not block
                # uploading the text content itself.
                print(f"⚠ Metadata error for {base_id}: {e}")

        # -------- Step 1: Upload file --------
        try:
            upload_op = client.files.upload(
                file=text_path,
                config={"display_name": base_id},
            )
            # Poll until the File API finishes processing the upload.
            while upload_op.state.name == "PROCESSING":
                time.sleep(1)
                upload_op = client.files.get(name=upload_op.name)
            if upload_op.state.name == "FAILED":
                print(f" File upload failed: {base_id}")
                continue
        except Exception as e:
            print(f" Upload error: {e}")
            continue

        # -------- Step 2: Import into File Search --------
        try:
            print(" Indexing...", end="", flush=True)
            import_op = client.file_search_stores.import_file(
                file_search_store_name=store_name,
                file_name=upload_op.name,
                custom_metadata=custom_meta,
            )
            # Importing is a long-running operation; poll until done.
            while not import_op.done:
                time.sleep(2)
                print(".", end="", flush=True)
                import_op = client.operations.get(name=import_op.name)
            # A finished operation may still have failed server-side (e.g.
            # rejected custom metadata). Check the error field so a failed
            # import is not recorded as a success in uploaded_db.
            if getattr(import_op, "error", None):
                print(f"\n Indexing failed: {import_op.error}")
                continue
            print(" Done!")
            uploaded_db[base_id] = file_hash
            save_uploaded_db(uploaded_db)
            files_processed_count += 1
        except Exception as e:
            print(f"\n Indexing failed: {e}")

    if files_processed_count == 0:
        print("\n No new or changed files.")
    else:
        print(f"\n✔ Uploaded {files_processed_count} files.")
def file_sha256(path):
    """Return the hex SHA-256 digest of the file at *path*, read in 8 KiB chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for block in iter(lambda: handle.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
def load_uploaded_db():
    """Load the upload-tracking database from disk.

    Returns an empty dict when the database file has not been created yet.
    """
    try:
        with open(UPLOADED_FILES_DB, "r") as db_file:
            return json.load(db_file)
    except FileNotFoundError:
        return {}
def save_uploaded_db(db):
    """Persist the upload-tracking dict to UPLOADED_FILES_DB as indented JSON."""
    serialized = json.dumps(db, indent=4)
    with open(UPLOADED_FILES_DB, "w") as db_file:
        db_file.write(serialized)
# ------------------------------
# Metadata conversion
# ------------------------------
def convert_metadata_to_list(metadata_json):
    """Convert a flat metadata dict into the File Search custom-metadata list.

    Each key is stripped down to alphanumerics and underscores. Numeric values
    are emitted under ``numeric_value``; everything else (including booleans)
    is stringified under ``string_value``.

    Args:
        metadata_json: Flat mapping of metadata key -> value.

    Returns:
        List of ``{"key": ..., "numeric_value"|"string_value": ...}`` dicts.
    """
    meta_list = []
    for key, value in metadata_json.items():
        clean_key = "".join(c for c in key if c.isalnum() or c == "_")
        entry = {"key": clean_key}
        # bool is a subclass of int, so exclude it explicitly: True/False
        # should be sent as strings, not as the numbers 1/0.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            entry["numeric_value"] = value
        else:
            entry["string_value"] = str(value)
        meta_list.append(entry)
    return meta_list
I fail to upload the file, as indexing fails. The inputs are two files with the same name but different extensions: one is .txt and the other is .json. The .json file contains the metadata and the .txt file contains the content.