Files
mars-panorama-pipeline/pipeline/panorama_pipeline.py
Franck Garnier acd22d41dc feat: incremental assembly for large panoramas
Replace parallel chunk strategy with sequential incremental assembly:
1. Start with first 20 images, full cpfind + optimize
2. Add 5 new images at a time with 5-image bridge to existing set
3. cpfind only on bridge+new group (small, fast)
4. Remap CPs to global indices and accumulate

This ensures spatial coherence: each new batch is constrained by
the already-stable panorama, preventing orientation flips that
occurred with the parallel chunk approach.

Images are sorted by azimuth, so consecutive indices = spatial
neighbors (guaranteed contiguity).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 19:28:38 -04:00

766 lines
27 KiB
Python

#!/usr/bin/env python3
"""
Mars Rover Panorama Pipeline
=============================
Automated panorama stitching for Perseverance and Curiosity NavCam/Mastcam-Z images.
Usage:
# From Docker:
docker run -v /path/to/images:/data -v /path/to/output:/output mars-panorama-pipeline \
--sol 1813 --camera NAVCAM_LEFT --rover perseverance
# Standalone:
python3 panorama_pipeline.py --sol 1813 --camera NAVCAM_LEFT --rover perseverance \
--data-dir /mnt/astro/mars_rovers --output-dir /mnt/astro/mars_rovers/images/panorama
"""
import argparse
import cv2
import glob
import math
import mysql.connector
import numpy as np
import os
import re
import subprocess
import sys
import time
from collections import defaultdict, OrderedDict
from PIL import Image
Image.MAX_IMAGE_PIXELS = 500000000
# ============================================================
# Configuration
# ============================================================
DB_CONFIG = {
"host": os.getenv("MYSQL_HOST", "192.168.1.42"),
"port": int(os.getenv("MYSQL_PORT", 3306)),
"user": os.getenv("MYSQL_USER", "soldan"),
"password": os.getenv("MYSQL_PASSWORD", "Sol17Smr"),
"database": os.getenv("MYSQL_DATABASE", "mars_rovers"),
}
# NavCam tile overlap
TILE_OVERLAP_PX = 16
# Camera FOV reference (empirical measurements)
CAMERA_FOV = {
# Perseverance NavCam: combined tiles 01+04 (2560px after 16px overlap)
"NAVCAM_LEFT": {"combined_hfov": 82.17, "single_hfov": 47.4, "f_px": 1468.0},
"NAVCAM_RIGHT": {"combined_hfov": 82.17, "single_hfov": 47.4, "f_px": 1468.0},
# Mastcam-Z at wide zoom (26mm)
"MCZ_LEFT": {"hfov": 25.6, "native_res": (1648, 1200)},
"MCZ_RIGHT": {"hfov": 25.6, "native_res": (1648, 1200)},
}
# ============================================================
# Utility functions
# ============================================================
def get_seq(imageid):
"""Extract sequence_id from Perseverance imageid."""
parts = imageid.split("_")
if len(parts) >= 5 and len(parts[4]) >= 17:
return parts[4][8:]
return None
def get_tile(imageid):
"""Extract tile number from imageid."""
parts = imageid.split("_")
if len(parts) >= 7:
return parts[-2]
return None
def run_hugin(tool, args, cwd=None, timeout=None):
"""Run a Hugin CLI tool. Timeout scales with number of images if not specified."""
cmd = f"{tool} {args}"
if timeout is None:
timeout = 7200 # 2 hours default for large panoramas
result = subprocess.run(cmd, shell=True, capture_output=True, text=True,
timeout=timeout, cwd=cwd)
return result
# ============================================================
# Step 1: Find panorama sequence
# ============================================================
def find_panorama_sequences(sol, camera, rover="perseverance"):
"""Find complete panorama sequences for a given sol and camera."""
conn = mysql.connector.connect(**DB_CONFIG)
cur = conn.cursor()
is_navcam = "NAVCAM" in camera
dim_filter = "AND dimension = '(1288,968)'" if is_navcam else "AND dimension = '(1648,1200)'"
cur.execute(f"""
SELECT imageid, mast_az, mast_el, dimension, image_src
FROM photos_{rover}
WHERE sol = %s AND camera_name = %s
AND mast_az IS NOT NULL AND mast_az != 'UNK'
AND sample_type = 'full'
{dim_filter}
ORDER BY CAST(mast_az AS DECIMAL(10,2)), imageid
""", (sol, camera))
rows = cur.fetchall()
cur.close()
conn.close()
# Group by (sequence_id, azimuth)
sequences = defaultdict(lambda: defaultdict(list))
for imageid, az, el, dim, src in rows:
seq = get_seq(imageid)
if seq:
az_key = round(float(az), 0)
sequences[seq][az_key].append({
"imageid": imageid, "tile": get_tile(imageid),
"az": float(az), "el": float(el), "src": src,
})
# Determine FOV for chain validation
if is_navcam:
fov = CAMERA_FOV[camera]["combined_hfov"]
else:
fov = CAMERA_FOV.get(camera, {}).get("hfov", 25.6)
# Find valid sequences
results = []
for seq_id, pointings in sequences.items():
azs = sorted(pointings.keys())
n_ptg = len(azs)
if n_ptg < 4:
continue
if is_navcam:
has_pairs = all(len(pointings[az]) >= 2 for az in azs)
if not has_pairs:
continue
gaps = [azs[i + 1] - azs[i] for i in range(len(azs) - 1)]
max_gap = max(gaps) if gaps else 0
chain_ok = all(g < fov for g in gaps)
if chain_ok:
results.append({
"seq_id": seq_id,
"n_pointings": n_ptg,
"az_range": azs[-1] - azs[0],
"max_gap": max_gap,
"pointings": dict(pointings),
})
results.sort(key=lambda x: -x["az_range"])
return results
# ============================================================
# Step 2: Prepare images (combine tiles, CLAHE)
# ============================================================
def combine_navcam_tiles(t01_path, t04_path, output_path, overlap=TILE_OVERLAP_PX):
"""Combine NavCam tiles 01+04 with overlap blending."""
t01 = cv2.imread(t01_path)
t04 = cv2.imread(t04_path)
if t01 is None or t04 is None:
return None
h, w = t01.shape[:2]
combined_w = w + w - overlap
combined = np.zeros((h, combined_w, 3), dtype=np.uint8)
combined[:, :w - overlap, :] = t01[:, :w - overlap, :]
combined[:, w:, :] = t04[:, overlap:, :]
# Linear blend in overlap zone
for px in range(overlap):
alpha = px / overlap
combined[:, w - overlap + px, :] = (
(1 - alpha) * t01[:, w - overlap + px, :].astype(float) +
alpha * t04[:, px, :].astype(float)
).astype(np.uint8)
cv2.imwrite(output_path, combined)
return combined
def apply_clahe_navcam(img_path, out_path, vignette_strength=0.35, clip_limit=2.0):
"""CLAHE + per-tile vignette correction for NavCam combined images."""
img = cv2.imread(img_path)
h, w = img.shape[:2]
hw = w // 2
corrected = img.astype(np.float32)
for tile_start in [0, hw]:
cx = tile_start + hw // 2
cy = h // 2
Y, X = np.ogrid[:h, tile_start:tile_start + hw]
dist = np.sqrt((X - cx) ** 2 + (Y - cy) ** 2).astype(np.float32)
max_dist = np.sqrt((hw // 2) ** 2 + (h // 2) ** 2)
gain = 1.0 + vignette_strength * (dist / max_dist) ** 2
for ch in range(3):
corrected[:h, tile_start:tile_start + hw, ch] = np.clip(
corrected[:h, tile_start:tile_start + hw, ch] * gain, 0, 255)
corrected = corrected.astype(np.uint8)
lab = cv2.cvtColor(corrected, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(16, 16))
l_clahe = clahe.apply(l)
result = cv2.cvtColor(cv2.merge([l_clahe, a, b]), cv2.COLOR_LAB2BGR)
cv2.imwrite(out_path, result)
return result
def apply_clahe_single(img_path, out_path, vignette_strength=0.35, clip_limit=2.0):
"""CLAHE + single radial vignette correction for Mastcam-Z images."""
img = cv2.imread(img_path)
h, w = img.shape[:2]
corrected = img.astype(np.float32)
cy, cx = h // 2, w // 2
Y, X = np.ogrid[:h, :w]
dist = np.sqrt((X - cx) ** 2 + (Y - cy) ** 2).astype(np.float32)
max_dist = np.sqrt(cx ** 2 + cy ** 2)
gain = 1.0 + vignette_strength * (dist / max_dist) ** 2
for ch in range(3):
corrected[:, :, ch] = np.clip(corrected[:, :, ch] * gain, 0, 255)
corrected = corrected.astype(np.uint8)
lab = cv2.cvtColor(corrected, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(16, 16))
l_clahe = clahe.apply(l)
result = cv2.cvtColor(cv2.merge([l_clahe, a, b]), cv2.COLOR_LAB2BGR)
cv2.imwrite(out_path, result)
return result
# ============================================================
# Step 3: Build PTO
# ============================================================
def build_pto(pto_path, img_paths, azs, els, img_w, img_h, fov, ref_idx=None):
"""Build a Hugin PTO file."""
if ref_idx is None:
ref_idx = len(img_paths) // 2
lines = []
for i, (path, az, el) in enumerate(zip(img_paths, azs, els)):
if i == 0:
lines.append(
f'i w{img_w} h{img_h} f0 v{fov} Ra0 Rb0 Rc0 Rd0 Re0 Eev0 Er1 Eb1 '
f'r0 p{el} y{az} TrX0 TrY0 TrZ0 Tpy0 Tpp0 j0 '
f'a0 b0 c0 d0 e0 g0 t0 Va1 Vb0 Vc0 Vd0 Vx0 Vy0 Vm5 n"{path}"')
else:
lines.append(
f'i w{img_w} h{img_h} f0 v=0 Ra=0 Rb=0 Rc=0 Rd=0 Re=0 Eev0 Er1 Eb1 '
f'r0 p{el} y{az} TrX0 TrY0 TrZ0 Tpy0 Tpp0 j0 '
f'a=0 b=0 c=0 d=0 e=0 g=0 t=0 Va=0 Vb=0 Vc=0 Vd=0 Vx=0 Vy=0 Vm5 n"{path}"')
geo_vars = "\n".join([f"v r{i}\nv p{i}\nv y{i}" for i in range(len(img_paths))])
content = "# hugin project file\n#hugin_ptoversion 2\n"
content += 'p f2 w6064 h1330 v360 k0 E0 R0 n"TIFF_m c:LZW r:CROP"\nm i0\n\n'
content += "\n".join(lines) + "\n\n"
content += geo_vars + "\nv\n\n"
content += f"#hugin_optimizeReferenceImage {ref_idx}\n"
with open(pto_path, "w") as f:
f.write(content)
return pto_path
# ============================================================
# Step 4: Run Hugin pipeline
# ============================================================
def run_pipeline(work_dir, clahe_images, original_images, azs, els,
img_w, img_h, fov, output_name, blender="auto", camera=None,
chunk_size=20, chunk_overlap=5):
"""Full pipeline: cpfind(CLAHE) -> swap(originals) -> optimize(geo) -> nona -> blend.
For large panoramas (>chunk_size images), cpfind runs on overlapping chunks
to avoid O(n^2) explosion in cpfind --multirow and cpclean.
Blender selection:
- auto: enblend for NavCam, verdandi for Mastcam-Z
- enblend: force enblend --pre-assemble
- verdandi: force verdandi
"""
# Select blender
if blender == "auto":
if camera and "MCZ" in camera:
blender = "verdandi"
else:
blender = "enblend"
timings = {}
n_images = len(clahe_images)
# Build full CLAHE PTO (needed for final assembly)
pto_clahe = os.path.join(work_dir, f"{output_name}_clahe.pto")
build_pto(pto_clahe, clahe_images, azs, els, img_w, img_h, fov)
if n_images <= chunk_size:
# Small panorama: run cpfind on all images at once
print(f" cpfind ({n_images} images)...", flush=True)
t0 = time.time()
r = run_hugin("cpfind",
f"--multirow --celeste "
f"--sieve1width 20 --sieve1height 20 --sieve1size 1000 "
f"--sieve2width 10 --sieve2height 10 --sieve2size 5 "
f"--minmatches 3 --ransaciter 2000 --ransacdist 50 "
f'-o "{pto_clahe}" "{pto_clahe}"',
cwd=work_dir)
timings["cpfind"] = time.time() - t0
for line in r.stdout.split("\n"):
if "Found" in line:
print(f" {line.strip()}")
# cpclean
print(" cpclean...", flush=True)
t0 = time.time()
run_hugin("cpclean", f'-o "{pto_clahe}" "{pto_clahe}"', cwd=work_dir)
timings["cpclean"] = time.time() - t0
else:
# Large panorama: incremental assembly strategy
# Images are sorted by azimuth — consecutive indices = spatial neighbors.
# We build the panorama progressively:
# 1. Start with first initial_size images, full cpfind + optimize
# 2. Add batch_size new images at a time
# 3. cpfind only between new images and the tail of existing set
# 4. Optimize incrementally (existing images constrain new ones)
initial_size = chunk_size # first group size
batch_size = chunk_overlap # images to add per iteration
bridge_size = 5 # how many existing images to overlap with new ones
print(f" Incremental assembly: {n_images} images, "
f"initial={initial_size}, batch={batch_size}, bridge={bridge_size}",
flush=True)
all_cp_lines = []
t0_total = time.time()
# Phase 1: initial group — full cpfind
init_end = min(initial_size, n_images)
init_pto = os.path.join(work_dir, f"{output_name}_init.pto")
build_pto(init_pto, clahe_images[:init_end],
azs[:init_end], els[:init_end], img_w, img_h, fov)
print(f" Phase 1: cpfind on images 0-{init_end-1} ({init_end} images)...",
flush=True)
t0 = time.time()
r = run_hugin("cpfind",
f"--multirow --celeste "
f"--sieve1width 20 --sieve1height 20 --sieve1size 1000 "
f"--sieve2width 10 --sieve2height 10 --sieve2size 5 "
f"--minmatches 3 --ransaciter 2000 --ransacdist 50 "
f'-o "{init_pto}" "{init_pto}"',
cwd=work_dir)
run_hugin("cpclean", f'-o "{init_pto}" "{init_pto}"', cwd=work_dir)
# Read initial CPs (local indices = global indices for first group)
with open(init_pto) as f:
for line in f:
if line.startswith("c "):
all_cp_lines.append(line)
os.remove(init_pto)
init_time = time.time() - t0
print(f" Initial: {init_time:.0f}s, CPs: {len(all_cp_lines)}", flush=True)
# Phase 2: add batches incrementally
cursor = init_end
batch_num = 0
while cursor < n_images:
batch_num += 1
batch_end = min(cursor + batch_size, n_images)
new_count = batch_end - cursor
# Bridge: last bridge_size images from existing set + new images
bridge_start = max(0, cursor - bridge_size)
group_start = bridge_start
group_end = batch_end
group_size = group_end - group_start
# Build PTO for this bridge+new group
batch_pto = os.path.join(work_dir, f"{output_name}_batch{batch_num}.pto")
build_pto(batch_pto,
clahe_images[group_start:group_end],
azs[group_start:group_end],
els[group_start:group_end],
img_w, img_h, fov)
# cpfind on bridge+new group
t0 = time.time()
r = run_hugin("cpfind",
f"--multirow --celeste "
f"--sieve1width 20 --sieve1height 20 --sieve1size 1000 "
f"--sieve2width 10 --sieve2height 10 --sieve2size 5 "
f"--minmatches 3 --ransaciter 2000 --ransacdist 50 "
f'-o "{batch_pto}" "{batch_pto}"',
cwd=work_dir)
run_hugin("cpclean", f'-o "{batch_pto}" "{batch_pto}"', cwd=work_dir)
batch_time = time.time() - t0
# Read CPs and remap local indices to global
batch_cps = 0
with open(batch_pto) as f:
for line in f:
if line.startswith("c "):
parts = line.split()
new_parts = []
for p in parts:
if p.startswith("n") and not p.startswith("N"):
local_idx = int(p[1:])
new_parts.append(f"n{group_start + local_idx}")
elif p.startswith("N"):
local_idx = int(p[1:])
new_parts.append(f"N{group_start + local_idx}")
else:
new_parts.append(p)
all_cp_lines.append(" ".join(new_parts) + "\n")
batch_cps += 1
os.remove(batch_pto)
print(f" Batch {batch_num}: imgs {cursor}-{batch_end-1} "
f"(bridge from {bridge_start}): {batch_time:.0f}s, "
f"+{batch_cps} CPs, total: {len(all_cp_lines)}", flush=True)
cursor = batch_end
timings["cpfind"] = time.time() - t0_total
timings["cpclean"] = 0 # included in per-batch processing
# Write all CPs to the full CLAHE PTO
with open(pto_clahe, "a") as f:
f.writelines(all_cp_lines)
# Count final CPs
with open(pto_clahe) as f:
cp_lines = [l for l in f if l.startswith("c ")]
print(f" Total CPs: {len(cp_lines)}")
if len(cp_lines) < 5:
print(" ERROR: Not enough control points")
return None, timings
# Build render PTO with originals + CPs from CLAHE
pto_render = os.path.join(work_dir, f"{output_name}_render.pto")
build_pto(pto_render, original_images, azs, els, img_w, img_h, fov)
with open(pto_render, "a") as f:
f.writelines(cp_lines)
# Geometry-only optimization (NO -m)
print(" autooptimiser (geo only)...", flush=True)
t0 = time.time()
run_hugin("autooptimiser", f'-a -l -s -o "{pto_render}" "{pto_render}"', cwd=work_dir)
timings["autooptimiser"] = time.time() - t0
# Remove S (crop) lines and reset distortion params
with open(pto_render) as f:
lines = f.readlines()
with open(pto_render, "w") as f:
for l in lines:
if l.startswith("S "):
continue
f.write(l)
# nona
print(" nona...", flush=True)
prefix = os.path.join(work_dir, f"{output_name}_tmp_")
t0 = time.time()
run_hugin("nona", f'-m TIFF_m -o "{prefix}" "{pto_render}"', cwd=work_dir)
timings["nona"] = time.time() - t0
tifs = sorted(glob.glob(f"{prefix}*.tif"))
print(f" Remapped: {len(tifs)} files")
if not tifs:
print(" ERROR: nona produced no output")
return None, timings
# Blend: verdandi for MCZ (no seam overexposure), enblend for NavCam (better vignette)
print(f" {blender}...", flush=True)
out_tif = os.path.join(work_dir, f"{output_name}.tif")
tif_list = " ".join([f'"{t}"' for t in tifs])
t0 = time.time()
if blender == "verdandi":
run_hugin("verdandi", f'-o "{out_tif}" {tif_list}', cwd=work_dir)
else:
run_hugin("enblend", f'--pre-assemble -o "{out_tif}" {tif_list}', cwd=work_dir)
timings["blend"] = time.time() - t0
# Convert to PNG
out_png = os.path.join(work_dir, f"{output_name}.png")
try:
img = Image.open(out_tif)
img.save(out_png, "PNG")
size_str = f"{img.size[0]}x{img.size[1]}"
fsize = os.path.getsize(out_png) / 1024 / 1024
print(f" Output: {size_str}, {fsize:.1f}MB")
except Exception as e:
print(f" ERROR converting: {e}")
out_png = None
# Cleanup temp files
for t in tifs:
os.remove(t)
if os.path.exists(out_tif):
os.remove(out_tif)
return out_png, timings
# ============================================================
# Main orchestrator
# ============================================================
def process_navcam(sol, camera, rover, data_dir, output_dir):
"""Process a NavCam panorama: find sequence, combine tiles, CLAHE, stitch."""
print(f"\n{'=' * 60}")
print(f"NAVCAM Panorama: Sol {sol}, {camera}, {rover}")
print(f"{'=' * 60}")
sequences = find_panorama_sequences(sol, camera, rover)
if not sequences:
print(f" No valid panorama sequences found for sol {sol}")
return None
seq = sequences[0] # Best sequence
print(f" Sequence: {seq['seq_id']}, {seq['n_pointings']} pointings, "
f"{seq['az_range']:.0f}° range, max_gap={seq['max_gap']:.1f}°")
work_dir = os.path.join(output_dir, f"sol_{sol}")
preproc_dir = os.path.join(work_dir, "preprocessed")
os.makedirs(preproc_dir, exist_ok=True)
src_dir = os.path.join(data_dir, "images", rover, f"sol_{sol}", camera)
originals = []
clahe_imgs = []
azs = []
els = []
for az_key in sorted(seq["pointings"].keys()):
imgs = seq["pointings"][az_key]
t01 = [i for i in imgs if i["tile"] == "01"]
t04 = [i for i in imgs if i["tile"] == "04"]
if not t01 or not t04:
continue
az, el = t01[0]["az"], t01[0]["el"]
t01_path = os.path.join(src_dir, f"{t01[0]['imageid']}01.png")
t04_path = os.path.join(src_dir, f"{t04[0]['imageid']}01.png")
# Download if missing
for tile_img, tile_src in [(t01_path, t01[0].get("src")), (t04_path, t04[0].get("src"))]:
if not os.path.exists(tile_img) and tile_src:
os.makedirs(os.path.dirname(tile_img), exist_ok=True)
subprocess.run(["wget", "-q", tile_src, "-O", tile_img], timeout=30)
if not os.path.exists(t01_path) or not os.path.exists(t04_path):
print(f" Missing tiles for az={az:.1f}")
continue
# Combine tiles
fname = f"pointage_az{az:.2f}.png"
orig_path = os.path.join(work_dir, fname)
combine_navcam_tiles(t01_path, t04_path, orig_path)
# CLAHE
clahe_path = os.path.join(preproc_dir, fname)
apply_clahe_navcam(orig_path, clahe_path)
originals.append(orig_path)
clahe_imgs.append(clahe_path)
azs.append(az)
els.append(el)
print(f" az={az:>7.1f}: tiles combined + CLAHE")
if len(originals) < 4:
print(" ERROR: Not enough images")
return None
fov = CAMERA_FOV[camera]["combined_hfov"]
img_w = 2560 # 1288 + 1288 - 16
img_h = 968
output_name = f"panorama_sol{sol}"
result, timings = run_pipeline(work_dir, clahe_imgs, originals,
azs, els, img_w, img_h, fov, output_name,
camera=camera)
print_timings(timings)
return result
def process_mastcamz(sol, camera, rover, data_dir, output_dir):
"""Process a Mastcam-Z panorama: find sequence, CLAHE, stitch."""
print(f"\n{'=' * 60}")
print(f"MASTCAM-Z Panorama: Sol {sol}, {camera}, {rover}")
print(f"{'=' * 60}")
sequences = find_panorama_sequences(sol, camera, rover)
if not sequences:
print(f" No valid panorama sequences found for sol {sol}")
return None
seq = sequences[0]
print(f" Sequence: {seq['seq_id']}, {seq['n_pointings']} pointings, "
f"{seq['az_range']:.0f}° range, max_gap={seq['max_gap']:.1f}°")
work_dir = os.path.join(output_dir, f"sol_{sol}_mcz")
preproc_dir = os.path.join(work_dir, "preprocessed")
os.makedirs(preproc_dir, exist_ok=True)
src_dir = os.path.join(data_dir, "images", rover, f"sol_{sol}", camera)
originals = []
clahe_imgs = []
azs = []
els = []
# Deduplicate by azimuth
seen_az = OrderedDict()
for az_key in sorted(seq["pointings"].keys()):
imgs = seq["pointings"][az_key]
if az_key not in seen_az:
seen_az[az_key] = imgs[0]
for az_key, img_data in seen_az.items():
az, el = img_data["az"], img_data["el"]
imageid = img_data["imageid"]
fname = f"{imageid}01.png"
img_path = os.path.join(src_dir, fname)
# Download if missing
if not os.path.exists(img_path):
src_url = img_data.get("src")
if src_url:
os.makedirs(src_dir, exist_ok=True)
subprocess.run(["wget", "-q", src_url, "-O", img_path], timeout=30)
if not os.path.exists(img_path) or os.path.getsize(img_path) < 1000:
continue
# Copy to work dir
work_img = os.path.join(work_dir, f"az{az:.1f}_{imageid}.png")
if not os.path.exists(work_img):
cv2.imwrite(work_img, cv2.imread(img_path))
# CLAHE
clahe_path = os.path.join(preproc_dir, f"az{az:.1f}_{imageid}.png")
if not os.path.exists(clahe_path):
apply_clahe_single(work_img, clahe_path)
originals.append(work_img)
clahe_imgs.append(clahe_path)
azs.append(az)
els.append(el)
print(f" Prepared {len(originals)} images")
if len(originals) < 4:
print(" ERROR: Not enough images")
return None
fov = CAMERA_FOV.get(camera, {}).get("hfov", 25.6)
img_w, img_h = 1648, 1200
output_name = f"panorama_sol{sol}_mcz"
result, timings = run_pipeline(work_dir, clahe_imgs, originals,
azs, els, img_w, img_h, fov, output_name,
camera=camera)
print_timings(timings)
return result
def print_timings(timings):
"""Print pipeline timing summary."""
if not timings:
return
total = sum(timings.values())
print(f"\n Timings:")
for step, t in timings.items():
print(f" {step:<20} {t:>8.1f}s")
print(f" {'TOTAL':<20} {total:>8.1f}s")
# ============================================================
# CLI
# ============================================================
def main():
parser = argparse.ArgumentParser(
description="Mars Rover Panorama Pipeline",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# NavCam panorama
%(prog)s --sol 1813 --camera NAVCAM_LEFT
# Mastcam-Z panorama
%(prog)s --sol 1817 --camera MCZ_LEFT
# List available sequences for a sol
%(prog)s --sol 1813 --camera NAVCAM_LEFT --list-only
""")
parser.add_argument("--sol", type=int, required=True, help="Sol number")
parser.add_argument("--camera", required=True,
choices=["NAVCAM_LEFT", "NAVCAM_RIGHT", "MCZ_LEFT", "MCZ_RIGHT"],
help="Camera name")
parser.add_argument("--rover", default="perseverance",
choices=["perseverance", "curiosity"],
help="Rover name (default: perseverance)")
parser.add_argument("--data-dir", default="/data",
help="Base data directory (default: /data)")
parser.add_argument("--output-dir", default="/output",
help="Output directory (default: /output)")
parser.add_argument("--list-only", action="store_true",
help="Only list available sequences, don't process")
args = parser.parse_args()
# When running in Docker, /data and /output are volume mounts
# When running standalone, use actual paths
data_dir = args.data_dir
output_dir = args.output_dir
if args.list_only:
sequences = find_panorama_sequences(args.sol, args.camera, args.rover)
if not sequences:
print(f"No valid sequences for sol {args.sol} {args.camera}")
return
print(f"{'SeqID':<15} {'Ptgs':>5} {'Range':>7} {'MaxGap':>7}")
print("-" * 40)
for s in sequences:
print(f"{s['seq_id']:<15} {s['n_pointings']:>5} "
f"{s['az_range']:>6.0f}° {s['max_gap']:>6.1f}°")
return
# Process
is_navcam = "NAVCAM" in args.camera
if is_navcam:
result = process_navcam(args.sol, args.camera, args.rover, data_dir, output_dir)
else:
result = process_mastcamz(args.sol, args.camera, args.rover, data_dir, output_dir)
if result:
print(f"\nSUCCESS: {result}")
else:
print("\nFAILED")
sys.exit(1)
if __name__ == "__main__":
main()