Files
mars-panorama-pipeline/pipeline/panorama_pipeline.py
Franck Garnier 1aacaea084 feat: auto-generate metadata markdown for each panorama
Includes: coverage, FOV, camera specs, photogrammetry reference,
equirectangular projection info, timing, image list.

Also: docker-compose user fix (1000:100) to avoid root-owned files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 22:34:00 -04:00

780 lines
28 KiB
Python

#!/usr/bin/env python3
"""
Mars Rover Panorama Pipeline
=============================
Automated panorama stitching for Perseverance and Curiosity NavCam/Mastcam-Z images.
Usage:
# From Docker:
docker run -v /path/to/images:/data -v /path/to/output:/output mars-panorama-pipeline \
--sol 1813 --camera NAVCAM_LEFT --rover perseverance
# Standalone:
python3 panorama_pipeline.py --sol 1813 --camera NAVCAM_LEFT --rover perseverance \
--data-dir /mnt/astro/mars_rovers --output-dir /mnt/astro/mars_rovers/images/panorama
"""
import argparse
import cv2
import glob
import math
import mysql.connector
from panorama_metadata import generate_metadata
import numpy as np
import os
import re
import subprocess
import sys
import time
from collections import defaultdict, OrderedDict
from PIL import Image
Image.MAX_IMAGE_PIXELS = None # No limit — Mars panoramas can be very large
# ============================================================
# Configuration
# ============================================================
DB_CONFIG = {
"host": os.getenv("MYSQL_HOST", "192.168.1.42"),
"port": int(os.getenv("MYSQL_PORT", 3306)),
"user": os.getenv("MYSQL_USER", "soldan"),
"password": os.getenv("MYSQL_PASSWORD", "Sol17Smr"),
"database": os.getenv("MYSQL_DATABASE", "mars_rovers"),
}
# NavCam tile overlap
TILE_OVERLAP_PX = 16
# Camera FOV reference (empirical measurements)
CAMERA_FOV = {
# Perseverance NavCam: combined tiles 01+04 (2560px after 16px overlap)
"NAVCAM_LEFT": {"combined_hfov": 82.17, "single_hfov": 47.4, "f_px": 1468.0},
"NAVCAM_RIGHT": {"combined_hfov": 82.17, "single_hfov": 47.4, "f_px": 1468.0},
# Mastcam-Z at wide zoom (26mm)
"MCZ_LEFT": {"hfov": 25.6, "native_res": (1648, 1200)},
"MCZ_RIGHT": {"hfov": 25.6, "native_res": (1648, 1200)},
}
# ============================================================
# Utility functions
# ============================================================
def get_seq(imageid):
"""Extract sequence_id from Perseverance imageid."""
parts = imageid.split("_")
if len(parts) >= 5 and len(parts[4]) >= 17:
return parts[4][8:]
return None
def get_tile(imageid):
"""Extract tile number from imageid."""
parts = imageid.split("_")
if len(parts) >= 7:
return parts[-2]
return None
def run_hugin(tool, args, cwd=None, timeout=None):
"""Run a Hugin CLI tool. Timeout scales with number of images if not specified."""
cmd = f"{tool} {args}"
if timeout is None:
timeout = 7200 # 2 hours default for large panoramas
result = subprocess.run(cmd, shell=True, capture_output=True, text=True,
timeout=timeout, cwd=cwd)
return result
# ============================================================
# Step 1: Find panorama sequence
# ============================================================
def find_panorama_sequences(sol, camera, rover="perseverance"):
"""Find complete panorama sequences for a given sol and camera."""
conn = mysql.connector.connect(**DB_CONFIG)
cur = conn.cursor()
is_navcam = "NAVCAM" in camera
dim_filter = "AND dimension = '(1288,968)'" if is_navcam else "AND dimension = '(1648,1200)'"
cur.execute(f"""
SELECT imageid, mast_az, mast_el, dimension, image_src
FROM photos_{rover}
WHERE sol = %s AND camera_name = %s
AND mast_az IS NOT NULL AND mast_az != 'UNK'
AND sample_type = 'full'
{dim_filter}
ORDER BY CAST(mast_az AS DECIMAL(10,2)), imageid
""", (sol, camera))
rows = cur.fetchall()
cur.close()
conn.close()
# Group by (sequence_id, azimuth)
sequences = defaultdict(lambda: defaultdict(list))
for imageid, az, el, dim, src in rows:
seq = get_seq(imageid)
if seq:
az_key = round(float(az), 0)
sequences[seq][az_key].append({
"imageid": imageid, "tile": get_tile(imageid),
"az": float(az), "el": float(el), "src": src,
})
# Determine FOV for chain validation
if is_navcam:
fov = CAMERA_FOV[camera]["combined_hfov"]
else:
fov = CAMERA_FOV.get(camera, {}).get("hfov", 25.6)
# Find valid sequences
results = []
for seq_id, pointings in sequences.items():
azs = sorted(pointings.keys())
n_ptg = len(azs)
if n_ptg < 4:
continue
if is_navcam:
has_pairs = all(len(pointings[az]) >= 2 for az in azs)
if not has_pairs:
continue
gaps = [azs[i + 1] - azs[i] for i in range(len(azs) - 1)]
max_gap = max(gaps) if gaps else 0
chain_ok = all(g < fov for g in gaps)
if chain_ok:
results.append({
"seq_id": seq_id,
"n_pointings": n_ptg,
"az_range": azs[-1] - azs[0],
"max_gap": max_gap,
"pointings": dict(pointings),
})
results.sort(key=lambda x: -x["az_range"])
return results
# ============================================================
# Step 2: Prepare images (combine tiles, CLAHE)
# ============================================================
def combine_navcam_tiles(t01_path, t04_path, output_path, overlap=TILE_OVERLAP_PX):
"""Combine NavCam tiles 01+04 with overlap blending."""
t01 = cv2.imread(t01_path)
t04 = cv2.imread(t04_path)
if t01 is None or t04 is None:
return None
h, w = t01.shape[:2]
combined_w = w + w - overlap
combined = np.zeros((h, combined_w, 3), dtype=np.uint8)
combined[:, :w - overlap, :] = t01[:, :w - overlap, :]
combined[:, w:, :] = t04[:, overlap:, :]
# Linear blend in overlap zone
for px in range(overlap):
alpha = px / overlap
combined[:, w - overlap + px, :] = (
(1 - alpha) * t01[:, w - overlap + px, :].astype(float) +
alpha * t04[:, px, :].astype(float)
).astype(np.uint8)
cv2.imwrite(output_path, combined)
return combined
def apply_clahe_navcam(img_path, out_path, vignette_strength=0.35, clip_limit=2.0):
"""CLAHE + per-tile vignette correction for NavCam combined images."""
img = cv2.imread(img_path)
h, w = img.shape[:2]
hw = w // 2
corrected = img.astype(np.float32)
for tile_start in [0, hw]:
cx = tile_start + hw // 2
cy = h // 2
Y, X = np.ogrid[:h, tile_start:tile_start + hw]
dist = np.sqrt((X - cx) ** 2 + (Y - cy) ** 2).astype(np.float32)
max_dist = np.sqrt((hw // 2) ** 2 + (h // 2) ** 2)
gain = 1.0 + vignette_strength * (dist / max_dist) ** 2
for ch in range(3):
corrected[:h, tile_start:tile_start + hw, ch] = np.clip(
corrected[:h, tile_start:tile_start + hw, ch] * gain, 0, 255)
corrected = corrected.astype(np.uint8)
lab = cv2.cvtColor(corrected, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(16, 16))
l_clahe = clahe.apply(l)
result = cv2.cvtColor(cv2.merge([l_clahe, a, b]), cv2.COLOR_LAB2BGR)
cv2.imwrite(out_path, result)
return result
def apply_clahe_single(img_path, out_path, vignette_strength=0.35, clip_limit=2.0):
"""CLAHE + single radial vignette correction for Mastcam-Z images."""
img = cv2.imread(img_path)
h, w = img.shape[:2]
corrected = img.astype(np.float32)
cy, cx = h // 2, w // 2
Y, X = np.ogrid[:h, :w]
dist = np.sqrt((X - cx) ** 2 + (Y - cy) ** 2).astype(np.float32)
max_dist = np.sqrt(cx ** 2 + cy ** 2)
gain = 1.0 + vignette_strength * (dist / max_dist) ** 2
for ch in range(3):
corrected[:, :, ch] = np.clip(corrected[:, :, ch] * gain, 0, 255)
corrected = corrected.astype(np.uint8)
lab = cv2.cvtColor(corrected, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(16, 16))
l_clahe = clahe.apply(l)
result = cv2.cvtColor(cv2.merge([l_clahe, a, b]), cv2.COLOR_LAB2BGR)
cv2.imwrite(out_path, result)
return result
# ============================================================
# Step 3: Build PTO
# ============================================================
def build_pto(pto_path, img_paths, azs, els, img_w, img_h, fov, ref_idx=None):
"""Build a Hugin PTO file."""
if ref_idx is None:
ref_idx = len(img_paths) // 2
lines = []
for i, (path, az, el) in enumerate(zip(img_paths, azs, els)):
if i == 0:
lines.append(
f'i w{img_w} h{img_h} f0 v{fov} Ra0 Rb0 Rc0 Rd0 Re0 Eev0 Er1 Eb1 '
f'r0 p{el} y{az} TrX0 TrY0 TrZ0 Tpy0 Tpp0 j0 '
f'a0 b0 c0 d0 e0 g0 t0 Va1 Vb0 Vc0 Vd0 Vx0 Vy0 Vm5 n"{path}"')
else:
lines.append(
f'i w{img_w} h{img_h} f0 v=0 Ra=0 Rb=0 Rc=0 Rd=0 Re=0 Eev0 Er1 Eb1 '
f'r0 p{el} y{az} TrX0 TrY0 TrZ0 Tpy0 Tpp0 j0 '
f'a=0 b=0 c=0 d=0 e=0 g=0 t=0 Va=0 Vb=0 Vc=0 Vd=0 Vx=0 Vy=0 Vm5 n"{path}"')
geo_vars = "\n".join([f"v r{i}\nv p{i}\nv y{i}" for i in range(len(img_paths))])
content = "# hugin project file\n#hugin_ptoversion 2\n"
content += 'p f2 w6064 h1330 v360 k0 E0 R0 n"TIFF_m c:LZW r:CROP"\nm i0\n\n'
content += "\n".join(lines) + "\n\n"
content += geo_vars + "\nv\n\n"
content += f"#hugin_optimizeReferenceImage {ref_idx}\n"
with open(pto_path, "w") as f:
f.write(content)
return pto_path
# ============================================================
# Step 4: Run Hugin pipeline
# ============================================================
def run_pipeline(work_dir, clahe_images, original_images, azs, els,
img_w, img_h, fov, output_name, blender="auto", camera=None,
chunk_size=20, chunk_overlap=5):
"""Full pipeline: cpfind(CLAHE) -> swap(originals) -> optimize(geo) -> nona -> blend.
For large panoramas (>chunk_size images), cpfind runs on overlapping chunks
to avoid O(n^2) explosion in cpfind --multirow and cpclean.
Blender selection:
- auto: enblend for NavCam, verdandi for Mastcam-Z
- enblend: force enblend --pre-assemble
- verdandi: force verdandi
"""
# Select blender
if blender == "auto":
if camera and "MCZ" in camera:
blender = "verdandi"
else:
blender = "enblend"
timings = {}
n_images = len(clahe_images)
# Build full CLAHE PTO (needed for final assembly)
pto_clahe = os.path.join(work_dir, f"{output_name}_clahe.pto")
build_pto(pto_clahe, clahe_images, azs, els, img_w, img_h, fov)
if n_images <= chunk_size:
# Small panorama: run cpfind on all images at once
print(f" cpfind ({n_images} images)...", flush=True)
t0 = time.time()
r = run_hugin("cpfind",
f"--multirow --celeste "
f"--sieve1width 20 --sieve1height 20 --sieve1size 1000 "
f"--sieve2width 10 --sieve2height 10 --sieve2size 5 "
f"--minmatches 3 --ransaciter 2000 --ransacdist 50 "
f'-o "{pto_clahe}" "{pto_clahe}"',
cwd=work_dir)
timings["cpfind"] = time.time() - t0
for line in r.stdout.split("\n"):
if "Found" in line:
print(f" {line.strip()}")
# cpclean
print(" cpclean...", flush=True)
t0 = time.time()
run_hugin("cpclean", f'-o "{pto_clahe}" "{pto_clahe}"', cwd=work_dir)
timings["cpclean"] = time.time() - t0
else:
# Large panorama: incremental assembly strategy
# Images are sorted by azimuth — consecutive indices = spatial neighbors.
# We build the panorama progressively:
# 1. Start with first initial_size images, full cpfind + optimize
# 2. Add batch_size new images at a time
# 3. cpfind only between new images and the tail of existing set
# 4. Optimize incrementally (existing images constrain new ones)
initial_size = chunk_size # first group size
batch_size = chunk_overlap # images to add per iteration
bridge_size = 5 # how many existing images to overlap with new ones
print(f" Incremental assembly: {n_images} images, "
f"initial={initial_size}, batch={batch_size}, bridge={bridge_size}",
flush=True)
all_cp_lines = []
t0_total = time.time()
# Phase 1: initial group — full cpfind
init_end = min(initial_size, n_images)
init_pto = os.path.join(work_dir, f"{output_name}_init.pto")
build_pto(init_pto, clahe_images[:init_end],
azs[:init_end], els[:init_end], img_w, img_h, fov)
print(f" Phase 1: cpfind on images 0-{init_end-1} ({init_end} images)...",
flush=True)
t0 = time.time()
r = run_hugin("cpfind",
f"--multirow --celeste "
f"--sieve1width 20 --sieve1height 20 --sieve1size 1000 "
f"--sieve2width 10 --sieve2height 10 --sieve2size 5 "
f"--minmatches 3 --ransaciter 2000 --ransacdist 50 "
f'-o "{init_pto}" "{init_pto}"',
cwd=work_dir)
run_hugin("cpclean", f'-o "{init_pto}" "{init_pto}"', cwd=work_dir)
# Read initial CPs (local indices = global indices for first group)
with open(init_pto) as f:
for line in f:
if line.startswith("c "):
all_cp_lines.append(line)
os.remove(init_pto)
init_time = time.time() - t0
print(f" Initial: {init_time:.0f}s, CPs: {len(all_cp_lines)}", flush=True)
# Phase 2: add batches incrementally
cursor = init_end
batch_num = 0
while cursor < n_images:
batch_num += 1
batch_end = min(cursor + batch_size, n_images)
new_count = batch_end - cursor
# Bridge: last bridge_size images from existing set + new images
bridge_start = max(0, cursor - bridge_size)
group_start = bridge_start
group_end = batch_end
group_size = group_end - group_start
# Build PTO for this bridge+new group
batch_pto = os.path.join(work_dir, f"{output_name}_batch{batch_num}.pto")
build_pto(batch_pto,
clahe_images[group_start:group_end],
azs[group_start:group_end],
els[group_start:group_end],
img_w, img_h, fov)
# cpfind on bridge+new group
t0 = time.time()
r = run_hugin("cpfind",
f"--multirow --celeste "
f"--sieve1width 20 --sieve1height 20 --sieve1size 1000 "
f"--sieve2width 10 --sieve2height 10 --sieve2size 5 "
f"--minmatches 3 --ransaciter 2000 --ransacdist 50 "
f'-o "{batch_pto}" "{batch_pto}"',
cwd=work_dir)
run_hugin("cpclean", f'-o "{batch_pto}" "{batch_pto}"', cwd=work_dir)
batch_time = time.time() - t0
# Read CPs and remap local indices to global
batch_cps = 0
with open(batch_pto) as f:
for line in f:
if line.startswith("c "):
parts = line.split()
new_parts = []
for p in parts:
if p.startswith("n") and not p.startswith("N"):
local_idx = int(p[1:])
new_parts.append(f"n{group_start + local_idx}")
elif p.startswith("N"):
local_idx = int(p[1:])
new_parts.append(f"N{group_start + local_idx}")
else:
new_parts.append(p)
all_cp_lines.append(" ".join(new_parts) + "\n")
batch_cps += 1
os.remove(batch_pto)
print(f" Batch {batch_num}: imgs {cursor}-{batch_end-1} "
f"(bridge from {bridge_start}): {batch_time:.0f}s, "
f"+{batch_cps} CPs, total: {len(all_cp_lines)}", flush=True)
cursor = batch_end
timings["cpfind"] = time.time() - t0_total
timings["cpclean"] = 0 # included in per-batch processing
# Write all CPs to the full CLAHE PTO
with open(pto_clahe, "a") as f:
f.writelines(all_cp_lines)
# Count final CPs
with open(pto_clahe) as f:
cp_lines = [l for l in f if l.startswith("c ")]
print(f" Total CPs: {len(cp_lines)}")
if len(cp_lines) < 5:
print(" ERROR: Not enough control points")
return None, timings
# Build render PTO with originals + CPs from CLAHE
pto_render = os.path.join(work_dir, f"{output_name}_render.pto")
build_pto(pto_render, original_images, azs, els, img_w, img_h, fov)
with open(pto_render, "a") as f:
f.writelines(cp_lines)
# Geometry-only optimization (NO -m)
print(" autooptimiser (geo only)...", flush=True)
t0 = time.time()
run_hugin("autooptimiser", f'-a -l -s -o "{pto_render}" "{pto_render}"', cwd=work_dir)
timings["autooptimiser"] = time.time() - t0
# Remove S (crop) lines and reset distortion params
with open(pto_render) as f:
lines = f.readlines()
with open(pto_render, "w") as f:
for l in lines:
if l.startswith("S "):
continue
f.write(l)
# nona
print(" nona...", flush=True)
prefix = os.path.join(work_dir, f"{output_name}_tmp_")
t0 = time.time()
run_hugin("nona", f'-m TIFF_m -o "{prefix}" "{pto_render}"', cwd=work_dir)
timings["nona"] = time.time() - t0
tifs = sorted(glob.glob(f"{prefix}*.tif"))
print(f" Remapped: {len(tifs)} files")
if not tifs:
print(" ERROR: nona produced no output")
return None, timings
# Blend: verdandi for MCZ (no seam overexposure), enblend for NavCam (better vignette)
print(f" {blender}...", flush=True)
out_tif = os.path.join(work_dir, f"{output_name}.tif")
tif_list = " ".join([f'"{t}"' for t in tifs])
t0 = time.time()
if blender == "verdandi":
run_hugin("verdandi", f'-o "{out_tif}" {tif_list}', cwd=work_dir)
else:
run_hugin("enblend", f'--pre-assemble -o "{out_tif}" {tif_list}', cwd=work_dir)
timings["blend"] = time.time() - t0
# Convert to PNG
out_png = os.path.join(work_dir, f"{output_name}.png")
try:
img = Image.open(out_tif)
img.save(out_png, "PNG")
size_str = f"{img.size[0]}x{img.size[1]}"
fsize = os.path.getsize(out_png) / 1024 / 1024
print(f" Output: {size_str}, {fsize:.1f}MB")
except Exception as e:
print(f" ERROR converting: {e}")
out_png = None
# Cleanup temp files
for t in tifs:
os.remove(t)
if os.path.exists(out_tif):
os.remove(out_tif)
# Generate metadata file
try:
generate_metadata(
work_dir=work_dir, output_name=output_name,
sol=int(re.search(r'\d+', output_name).group()),
camera=camera or "unknown", rover="perseverance",
azs=azs, els=els, img_w=img_w, img_h=img_h, fov=fov,
n_cps=len(cp_lines), timings=timings,
original_images=original_images, blender=blender,
output_png=out_png)
except Exception as e:
print(f" Warning: metadata generation failed: {e}")
return out_png, timings
# ============================================================
# Main orchestrator
# ============================================================
def process_navcam(sol, camera, rover, data_dir, output_dir):
"""Process a NavCam panorama: find sequence, combine tiles, CLAHE, stitch."""
print(f"\n{'=' * 60}")
print(f"NAVCAM Panorama: Sol {sol}, {camera}, {rover}")
print(f"{'=' * 60}")
sequences = find_panorama_sequences(sol, camera, rover)
if not sequences:
print(f" No valid panorama sequences found for sol {sol}")
return None
seq = sequences[0] # Best sequence
print(f" Sequence: {seq['seq_id']}, {seq['n_pointings']} pointings, "
f"{seq['az_range']:.0f}° range, max_gap={seq['max_gap']:.1f}°")
work_dir = os.path.join(output_dir, f"sol_{sol}")
preproc_dir = os.path.join(work_dir, "preprocessed")
os.makedirs(preproc_dir, exist_ok=True)
src_dir = os.path.join(data_dir, "images", rover, f"sol_{sol}", camera)
originals = []
clahe_imgs = []
azs = []
els = []
for az_key in sorted(seq["pointings"].keys()):
imgs = seq["pointings"][az_key]
t01 = [i for i in imgs if i["tile"] == "01"]
t04 = [i for i in imgs if i["tile"] == "04"]
if not t01 or not t04:
continue
az, el = t01[0]["az"], t01[0]["el"]
t01_path = os.path.join(src_dir, f"{t01[0]['imageid']}01.png")
t04_path = os.path.join(src_dir, f"{t04[0]['imageid']}01.png")
# Download if missing
for tile_img, tile_src in [(t01_path, t01[0].get("src")), (t04_path, t04[0].get("src"))]:
if not os.path.exists(tile_img) and tile_src:
os.makedirs(os.path.dirname(tile_img), exist_ok=True)
subprocess.run(["wget", "-q", tile_src, "-O", tile_img], timeout=30)
if not os.path.exists(t01_path) or not os.path.exists(t04_path):
print(f" Missing tiles for az={az:.1f}")
continue
# Combine tiles
fname = f"pointage_az{az:.2f}.png"
orig_path = os.path.join(work_dir, fname)
combine_navcam_tiles(t01_path, t04_path, orig_path)
# CLAHE
clahe_path = os.path.join(preproc_dir, fname)
apply_clahe_navcam(orig_path, clahe_path)
originals.append(orig_path)
clahe_imgs.append(clahe_path)
azs.append(az)
els.append(el)
print(f" az={az:>7.1f}: tiles combined + CLAHE")
if len(originals) < 4:
print(" ERROR: Not enough images")
return None
fov = CAMERA_FOV[camera]["combined_hfov"]
img_w = 2560 # 1288 + 1288 - 16
img_h = 968
output_name = f"panorama_sol{sol}"
result, timings = run_pipeline(work_dir, clahe_imgs, originals,
azs, els, img_w, img_h, fov, output_name,
camera=camera)
print_timings(timings)
return result
def process_mastcamz(sol, camera, rover, data_dir, output_dir):
"""Process a Mastcam-Z panorama: find sequence, CLAHE, stitch."""
print(f"\n{'=' * 60}")
print(f"MASTCAM-Z Panorama: Sol {sol}, {camera}, {rover}")
print(f"{'=' * 60}")
sequences = find_panorama_sequences(sol, camera, rover)
if not sequences:
print(f" No valid panorama sequences found for sol {sol}")
return None
seq = sequences[0]
print(f" Sequence: {seq['seq_id']}, {seq['n_pointings']} pointings, "
f"{seq['az_range']:.0f}° range, max_gap={seq['max_gap']:.1f}°")
work_dir = os.path.join(output_dir, f"sol_{sol}_mcz")
preproc_dir = os.path.join(work_dir, "preprocessed")
os.makedirs(preproc_dir, exist_ok=True)
src_dir = os.path.join(data_dir, "images", rover, f"sol_{sol}", camera)
originals = []
clahe_imgs = []
azs = []
els = []
# Deduplicate by azimuth
seen_az = OrderedDict()
for az_key in sorted(seq["pointings"].keys()):
imgs = seq["pointings"][az_key]
if az_key not in seen_az:
seen_az[az_key] = imgs[0]
for az_key, img_data in seen_az.items():
az, el = img_data["az"], img_data["el"]
imageid = img_data["imageid"]
fname = f"{imageid}01.png"
img_path = os.path.join(src_dir, fname)
# Download if missing
if not os.path.exists(img_path):
src_url = img_data.get("src")
if src_url:
os.makedirs(src_dir, exist_ok=True)
subprocess.run(["wget", "-q", src_url, "-O", img_path], timeout=30)
if not os.path.exists(img_path) or os.path.getsize(img_path) < 1000:
continue
# Copy to work dir
work_img = os.path.join(work_dir, f"az{az:.1f}_{imageid}.png")
if not os.path.exists(work_img):
cv2.imwrite(work_img, cv2.imread(img_path))
# CLAHE
clahe_path = os.path.join(preproc_dir, f"az{az:.1f}_{imageid}.png")
if not os.path.exists(clahe_path):
apply_clahe_single(work_img, clahe_path)
originals.append(work_img)
clahe_imgs.append(clahe_path)
azs.append(az)
els.append(el)
print(f" Prepared {len(originals)} images")
if len(originals) < 4:
print(" ERROR: Not enough images")
return None
fov = CAMERA_FOV.get(camera, {}).get("hfov", 25.6)
img_w, img_h = 1648, 1200
output_name = f"panorama_sol{sol}_mcz"
result, timings = run_pipeline(work_dir, clahe_imgs, originals,
azs, els, img_w, img_h, fov, output_name,
camera=camera)
print_timings(timings)
return result
def print_timings(timings):
"""Print pipeline timing summary."""
if not timings:
return
total = sum(timings.values())
print(f"\n Timings:")
for step, t in timings.items():
print(f" {step:<20} {t:>8.1f}s")
print(f" {'TOTAL':<20} {total:>8.1f}s")
# ============================================================
# CLI
# ============================================================
def main():
parser = argparse.ArgumentParser(
description="Mars Rover Panorama Pipeline",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# NavCam panorama
%(prog)s --sol 1813 --camera NAVCAM_LEFT
# Mastcam-Z panorama
%(prog)s --sol 1817 --camera MCZ_LEFT
# List available sequences for a sol
%(prog)s --sol 1813 --camera NAVCAM_LEFT --list-only
""")
parser.add_argument("--sol", type=int, required=True, help="Sol number")
parser.add_argument("--camera", required=True,
choices=["NAVCAM_LEFT", "NAVCAM_RIGHT", "MCZ_LEFT", "MCZ_RIGHT"],
help="Camera name")
parser.add_argument("--rover", default="perseverance",
choices=["perseverance", "curiosity"],
help="Rover name (default: perseverance)")
parser.add_argument("--data-dir", default="/data",
help="Base data directory (default: /data)")
parser.add_argument("--output-dir", default="/output",
help="Output directory (default: /output)")
parser.add_argument("--list-only", action="store_true",
help="Only list available sequences, don't process")
args = parser.parse_args()
# When running in Docker, /data and /output are volume mounts
# When running standalone, use actual paths
data_dir = args.data_dir
output_dir = args.output_dir
if args.list_only:
sequences = find_panorama_sequences(args.sol, args.camera, args.rover)
if not sequences:
print(f"No valid sequences for sol {args.sol} {args.camera}")
return
print(f"{'SeqID':<15} {'Ptgs':>5} {'Range':>7} {'MaxGap':>7}")
print("-" * 40)
for s in sequences:
print(f"{s['seq_id']:<15} {s['n_pointings']:>5} "
f"{s['az_range']:>6.0f}° {s['max_gap']:>6.1f}°")
return
# Process
is_navcam = "NAVCAM" in args.camera
if is_navcam:
result = process_navcam(args.sol, args.camera, args.rover, data_dir, output_dir)
else:
result = process_mastcamz(args.sol, args.camera, args.rover, data_dir, output_dir)
if result:
print(f"\nSUCCESS: {result}")
else:
print("\nFAILED")
sys.exit(1)
if __name__ == "__main__":
main()