Files
mars-panorama-pipeline/pipeline/panorama_pipeline.py
Franck Garnier 806d2be732 feat: auto-select blender based on camera type
- NavCam -> enblend --pre-assemble (better vignette/horizon handling)
- Mastcam-Z -> verdandi (eliminates overexposure at seam transitions)
- Based on visual comparison: enblend default is best for NavCam,
  verdandi is best for MCZ (user confirmed)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 15:11:19 -04:00

643 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Mars Rover Panorama Pipeline
=============================
Automated panorama stitching for Perseverance and Curiosity NavCam/Mastcam-Z images.
Usage:
# From Docker:
docker run -v /path/to/images:/data -v /path/to/output:/output mars-panorama-pipeline \
--sol 1813 --camera NAVCAM_LEFT --rover perseverance
# Standalone:
python3 panorama_pipeline.py --sol 1813 --camera NAVCAM_LEFT --rover perseverance \
--data-dir /mnt/astro/mars_rovers --output-dir /mnt/astro/mars_rovers/images/panorama
"""
import argparse
import cv2
import glob
import math
import mysql.connector
import numpy as np
import os
import re
import subprocess
import sys
import time
from collections import defaultdict, OrderedDict
from PIL import Image
Image.MAX_IMAGE_PIXELS = 500000000
# ============================================================
# Configuration
# ============================================================
DB_CONFIG = {
"host": os.getenv("MYSQL_HOST", "192.168.1.42"),
"port": int(os.getenv("MYSQL_PORT", 3306)),
"user": os.getenv("MYSQL_USER", "soldan"),
"password": os.getenv("MYSQL_PASSWORD", "Sol17Smr"),
"database": os.getenv("MYSQL_DATABASE", "mars_rovers"),
}
# NavCam tile overlap
TILE_OVERLAP_PX = 16
# Camera FOV reference (empirical measurements)
CAMERA_FOV = {
# Perseverance NavCam: combined tiles 01+04 (2560px after 16px overlap)
"NAVCAM_LEFT": {"combined_hfov": 82.17, "single_hfov": 47.4, "f_px": 1468.0},
"NAVCAM_RIGHT": {"combined_hfov": 82.17, "single_hfov": 47.4, "f_px": 1468.0},
# Mastcam-Z at wide zoom (26mm)
"MCZ_LEFT": {"hfov": 25.6, "native_res": (1648, 1200)},
"MCZ_RIGHT": {"hfov": 25.6, "native_res": (1648, 1200)},
}
# ============================================================
# Utility functions
# ============================================================
def get_seq(imageid):
"""Extract sequence_id from Perseverance imageid."""
parts = imageid.split("_")
if len(parts) >= 5 and len(parts[4]) >= 17:
return parts[4][8:]
return None
def get_tile(imageid):
"""Extract tile number from imageid."""
parts = imageid.split("_")
if len(parts) >= 7:
return parts[-2]
return None
def run_hugin(tool, args, cwd=None, timeout=600):
"""Run a Hugin CLI tool."""
cmd = f"{tool} {args}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True,
timeout=timeout, cwd=cwd)
return result
# ============================================================
# Step 1: Find panorama sequence
# ============================================================
def find_panorama_sequences(sol, camera, rover="perseverance"):
"""Find complete panorama sequences for a given sol and camera."""
conn = mysql.connector.connect(**DB_CONFIG)
cur = conn.cursor()
is_navcam = "NAVCAM" in camera
dim_filter = "AND dimension = '(1288,968)'" if is_navcam else "AND dimension = '(1648,1200)'"
cur.execute(f"""
SELECT imageid, mast_az, mast_el, dimension, image_src
FROM photos_{rover}
WHERE sol = %s AND camera_name = %s
AND mast_az IS NOT NULL AND mast_az != 'UNK'
AND sample_type = 'full'
{dim_filter}
ORDER BY CAST(mast_az AS DECIMAL(10,2)), imageid
""", (sol, camera))
rows = cur.fetchall()
cur.close()
conn.close()
# Group by (sequence_id, azimuth)
sequences = defaultdict(lambda: defaultdict(list))
for imageid, az, el, dim, src in rows:
seq = get_seq(imageid)
if seq:
az_key = round(float(az), 0)
sequences[seq][az_key].append({
"imageid": imageid, "tile": get_tile(imageid),
"az": float(az), "el": float(el), "src": src,
})
# Determine FOV for chain validation
if is_navcam:
fov = CAMERA_FOV[camera]["combined_hfov"]
else:
fov = CAMERA_FOV.get(camera, {}).get("hfov", 25.6)
# Find valid sequences
results = []
for seq_id, pointings in sequences.items():
azs = sorted(pointings.keys())
n_ptg = len(azs)
if n_ptg < 4:
continue
if is_navcam:
has_pairs = all(len(pointings[az]) >= 2 for az in azs)
if not has_pairs:
continue
gaps = [azs[i + 1] - azs[i] for i in range(len(azs) - 1)]
max_gap = max(gaps) if gaps else 0
chain_ok = all(g < fov for g in gaps)
if chain_ok:
results.append({
"seq_id": seq_id,
"n_pointings": n_ptg,
"az_range": azs[-1] - azs[0],
"max_gap": max_gap,
"pointings": dict(pointings),
})
results.sort(key=lambda x: -x["az_range"])
return results
# ============================================================
# Step 2: Prepare images (combine tiles, CLAHE)
# ============================================================
def combine_navcam_tiles(t01_path, t04_path, output_path, overlap=TILE_OVERLAP_PX):
"""Combine NavCam tiles 01+04 with overlap blending."""
t01 = cv2.imread(t01_path)
t04 = cv2.imread(t04_path)
if t01 is None or t04 is None:
return None
h, w = t01.shape[:2]
combined_w = w + w - overlap
combined = np.zeros((h, combined_w, 3), dtype=np.uint8)
combined[:, :w - overlap, :] = t01[:, :w - overlap, :]
combined[:, w:, :] = t04[:, overlap:, :]
# Linear blend in overlap zone
for px in range(overlap):
alpha = px / overlap
combined[:, w - overlap + px, :] = (
(1 - alpha) * t01[:, w - overlap + px, :].astype(float) +
alpha * t04[:, px, :].astype(float)
).astype(np.uint8)
cv2.imwrite(output_path, combined)
return combined
def apply_clahe_navcam(img_path, out_path, vignette_strength=0.35, clip_limit=2.0):
"""CLAHE + per-tile vignette correction for NavCam combined images."""
img = cv2.imread(img_path)
h, w = img.shape[:2]
hw = w // 2
corrected = img.astype(np.float32)
for tile_start in [0, hw]:
cx = tile_start + hw // 2
cy = h // 2
Y, X = np.ogrid[:h, tile_start:tile_start + hw]
dist = np.sqrt((X - cx) ** 2 + (Y - cy) ** 2).astype(np.float32)
max_dist = np.sqrt((hw // 2) ** 2 + (h // 2) ** 2)
gain = 1.0 + vignette_strength * (dist / max_dist) ** 2
for ch in range(3):
corrected[:h, tile_start:tile_start + hw, ch] = np.clip(
corrected[:h, tile_start:tile_start + hw, ch] * gain, 0, 255)
corrected = corrected.astype(np.uint8)
lab = cv2.cvtColor(corrected, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(16, 16))
l_clahe = clahe.apply(l)
result = cv2.cvtColor(cv2.merge([l_clahe, a, b]), cv2.COLOR_LAB2BGR)
cv2.imwrite(out_path, result)
return result
def apply_clahe_single(img_path, out_path, vignette_strength=0.35, clip_limit=2.0):
"""CLAHE + single radial vignette correction for Mastcam-Z images."""
img = cv2.imread(img_path)
h, w = img.shape[:2]
corrected = img.astype(np.float32)
cy, cx = h // 2, w // 2
Y, X = np.ogrid[:h, :w]
dist = np.sqrt((X - cx) ** 2 + (Y - cy) ** 2).astype(np.float32)
max_dist = np.sqrt(cx ** 2 + cy ** 2)
gain = 1.0 + vignette_strength * (dist / max_dist) ** 2
for ch in range(3):
corrected[:, :, ch] = np.clip(corrected[:, :, ch] * gain, 0, 255)
corrected = corrected.astype(np.uint8)
lab = cv2.cvtColor(corrected, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(16, 16))
l_clahe = clahe.apply(l)
result = cv2.cvtColor(cv2.merge([l_clahe, a, b]), cv2.COLOR_LAB2BGR)
cv2.imwrite(out_path, result)
return result
# ============================================================
# Step 3: Build PTO
# ============================================================
def build_pto(pto_path, img_paths, azs, els, img_w, img_h, fov, ref_idx=None):
"""Build a Hugin PTO file."""
if ref_idx is None:
ref_idx = len(img_paths) // 2
lines = []
for i, (path, az, el) in enumerate(zip(img_paths, azs, els)):
if i == 0:
lines.append(
f'i w{img_w} h{img_h} f0 v{fov} Ra0 Rb0 Rc0 Rd0 Re0 Eev0 Er1 Eb1 '
f'r0 p{el} y{az} TrX0 TrY0 TrZ0 Tpy0 Tpp0 j0 '
f'a0 b0 c0 d0 e0 g0 t0 Va1 Vb0 Vc0 Vd0 Vx0 Vy0 Vm5 n"{path}"')
else:
lines.append(
f'i w{img_w} h{img_h} f0 v=0 Ra=0 Rb=0 Rc=0 Rd=0 Re=0 Eev0 Er1 Eb1 '
f'r0 p{el} y{az} TrX0 TrY0 TrZ0 Tpy0 Tpp0 j0 '
f'a=0 b=0 c=0 d=0 e=0 g=0 t=0 Va=0 Vb=0 Vc=0 Vd=0 Vx=0 Vy=0 Vm5 n"{path}"')
geo_vars = "\n".join([f"v r{i}\nv p{i}\nv y{i}" for i in range(len(img_paths))])
content = "# hugin project file\n#hugin_ptoversion 2\n"
content += 'p f2 w6064 h1330 v360 k0 E0 R0 n"TIFF_m c:LZW r:CROP"\nm i0\n\n'
content += "\n".join(lines) + "\n\n"
content += geo_vars + "\nv\n\n"
content += f"#hugin_optimizeReferenceImage {ref_idx}\n"
with open(pto_path, "w") as f:
f.write(content)
return pto_path
# ============================================================
# Step 4: Run Hugin pipeline
# ============================================================
def run_pipeline(work_dir, clahe_images, original_images, azs, els,
img_w, img_h, fov, output_name, blender="auto", camera=None):
"""Full pipeline: cpfind(CLAHE) -> swap(originals) -> optimize(geo) -> nona -> blend.
Blender selection:
- auto: enblend for NavCam (better vignette/horizon), verdandi for Mastcam-Z (no seam overexposure)
- enblend: force enblend --pre-assemble
- verdandi: force verdandi
"""
# Select blender
if blender == "auto":
if camera and "MCZ" in camera:
blender = "verdandi"
else:
blender = "enblend"
timings = {}
# Build CLAHE PTO
pto_clahe = os.path.join(work_dir, f"{output_name}_clahe.pto")
build_pto(pto_clahe, clahe_images, azs, els, img_w, img_h, fov)
# cpfind on CLAHE images
print(" cpfind...", flush=True)
t0 = time.time()
r = run_hugin("cpfind",
f"--multirow --celeste "
f"--sieve1width 20 --sieve1height 20 --sieve1size 1000 "
f"--sieve2width 10 --sieve2height 10 --sieve2size 5 "
f"--minmatches 3 --ransaciter 2000 --ransacdist 50 "
f'-o "{pto_clahe}" "{pto_clahe}"',
cwd=work_dir)
timings["cpfind"] = time.time() - t0
for line in r.stdout.split("\n"):
if "Found" in line:
print(f" {line.strip()}")
# cpclean
print(" cpclean...", flush=True)
t0 = time.time()
run_hugin("cpclean", f'-o "{pto_clahe}" "{pto_clahe}"', cwd=work_dir)
timings["cpclean"] = time.time() - t0
with open(pto_clahe) as f:
cp_lines = [l for l in f if l.startswith("c ")]
print(f" CPs: {len(cp_lines)}")
if len(cp_lines) < 5:
print(" ERROR: Not enough control points")
return None, timings
# Build render PTO with originals + CPs from CLAHE
pto_render = os.path.join(work_dir, f"{output_name}_render.pto")
build_pto(pto_render, original_images, azs, els, img_w, img_h, fov)
with open(pto_render, "a") as f:
f.writelines(cp_lines)
# Geometry-only optimization (NO -m, NO -a to avoid d/e distortion)
print(" autooptimiser (geo only)...", flush=True)
t0 = time.time()
run_hugin("autooptimiser", f'-a -l -s -o "{pto_render}" "{pto_render}"', cwd=work_dir)
timings["autooptimiser"] = time.time() - t0
# Remove S (crop) lines and reset distortion params
with open(pto_render) as f:
lines = f.readlines()
with open(pto_render, "w") as f:
for l in lines:
if l.startswith("S "):
continue
f.write(l)
# nona
print(" nona...", flush=True)
prefix = os.path.join(work_dir, f"{output_name}_tmp_")
t0 = time.time()
run_hugin("nona", f'-m TIFF_m -o "{prefix}" "{pto_render}"', cwd=work_dir)
timings["nona"] = time.time() - t0
tifs = sorted(glob.glob(f"{prefix}*.tif"))
print(f" Remapped: {len(tifs)} files")
if not tifs:
print(" ERROR: nona produced no output")
return None, timings
# Blend: verdandi for MCZ (no seam overexposure), enblend for NavCam (better vignette)
print(f" {blender}...", flush=True)
out_tif = os.path.join(work_dir, f"{output_name}.tif")
tif_list = " ".join([f'"{t}"' for t in tifs])
t0 = time.time()
if blender == "verdandi":
run_hugin("verdandi", f'-o "{out_tif}" {tif_list}', cwd=work_dir)
else:
run_hugin("enblend", f'--pre-assemble -o "{out_tif}" {tif_list}', cwd=work_dir)
timings["blend"] = time.time() - t0
# Convert to PNG
out_png = os.path.join(work_dir, f"{output_name}.png")
try:
img = Image.open(out_tif)
img.save(out_png, "PNG")
size_str = f"{img.size[0]}x{img.size[1]}"
fsize = os.path.getsize(out_png) / 1024 / 1024
print(f" Output: {size_str}, {fsize:.1f}MB")
except Exception as e:
print(f" ERROR converting: {e}")
out_png = None
# Cleanup temp files
for t in tifs:
os.remove(t)
if os.path.exists(out_tif):
os.remove(out_tif)
return out_png, timings
# ============================================================
# Main orchestrator
# ============================================================
def process_navcam(sol, camera, rover, data_dir, output_dir):
"""Process a NavCam panorama: find sequence, combine tiles, CLAHE, stitch."""
print(f"\n{'=' * 60}")
print(f"NAVCAM Panorama: Sol {sol}, {camera}, {rover}")
print(f"{'=' * 60}")
sequences = find_panorama_sequences(sol, camera, rover)
if not sequences:
print(f" No valid panorama sequences found for sol {sol}")
return None
seq = sequences[0] # Best sequence
print(f" Sequence: {seq['seq_id']}, {seq['n_pointings']} pointings, "
f"{seq['az_range']:.0f}° range, max_gap={seq['max_gap']:.1f}°")
work_dir = os.path.join(output_dir, f"sol_{sol}")
preproc_dir = os.path.join(work_dir, "preprocessed")
os.makedirs(preproc_dir, exist_ok=True)
src_dir = os.path.join(data_dir, "images", rover, f"sol_{sol}", camera)
originals = []
clahe_imgs = []
azs = []
els = []
for az_key in sorted(seq["pointings"].keys()):
imgs = seq["pointings"][az_key]
t01 = [i for i in imgs if i["tile"] == "01"]
t04 = [i for i in imgs if i["tile"] == "04"]
if not t01 or not t04:
continue
az, el = t01[0]["az"], t01[0]["el"]
t01_path = os.path.join(src_dir, f"{t01[0]['imageid']}01.png")
t04_path = os.path.join(src_dir, f"{t04[0]['imageid']}01.png")
# Download if missing
for tile_img, tile_src in [(t01_path, t01[0].get("src")), (t04_path, t04[0].get("src"))]:
if not os.path.exists(tile_img) and tile_src:
os.makedirs(os.path.dirname(tile_img), exist_ok=True)
subprocess.run(["wget", "-q", tile_src, "-O", tile_img], timeout=30)
if not os.path.exists(t01_path) or not os.path.exists(t04_path):
print(f" Missing tiles for az={az:.1f}")
continue
# Combine tiles
fname = f"pointage_az{az:.2f}.png"
orig_path = os.path.join(work_dir, fname)
combine_navcam_tiles(t01_path, t04_path, orig_path)
# CLAHE
clahe_path = os.path.join(preproc_dir, fname)
apply_clahe_navcam(orig_path, clahe_path)
originals.append(orig_path)
clahe_imgs.append(clahe_path)
azs.append(az)
els.append(el)
print(f" az={az:>7.1f}: tiles combined + CLAHE")
if len(originals) < 4:
print(" ERROR: Not enough images")
return None
fov = CAMERA_FOV[camera]["combined_hfov"]
img_w = 2560 # 1288 + 1288 - 16
img_h = 968
output_name = f"panorama_sol{sol}"
result, timings = run_pipeline(work_dir, clahe_imgs, originals,
azs, els, img_w, img_h, fov, output_name,
camera=camera)
print_timings(timings)
return result
def process_mastcamz(sol, camera, rover, data_dir, output_dir):
"""Process a Mastcam-Z panorama: find sequence, CLAHE, stitch."""
print(f"\n{'=' * 60}")
print(f"MASTCAM-Z Panorama: Sol {sol}, {camera}, {rover}")
print(f"{'=' * 60}")
sequences = find_panorama_sequences(sol, camera, rover)
if not sequences:
print(f" No valid panorama sequences found for sol {sol}")
return None
seq = sequences[0]
print(f" Sequence: {seq['seq_id']}, {seq['n_pointings']} pointings, "
f"{seq['az_range']:.0f}° range, max_gap={seq['max_gap']:.1f}°")
work_dir = os.path.join(output_dir, f"sol_{sol}_mcz")
preproc_dir = os.path.join(work_dir, "preprocessed")
os.makedirs(preproc_dir, exist_ok=True)
src_dir = os.path.join(data_dir, "images", rover, f"sol_{sol}", camera)
originals = []
clahe_imgs = []
azs = []
els = []
# Deduplicate by azimuth
seen_az = OrderedDict()
for az_key in sorted(seq["pointings"].keys()):
imgs = seq["pointings"][az_key]
if az_key not in seen_az:
seen_az[az_key] = imgs[0]
for az_key, img_data in seen_az.items():
az, el = img_data["az"], img_data["el"]
imageid = img_data["imageid"]
fname = f"{imageid}01.png"
img_path = os.path.join(src_dir, fname)
# Download if missing
if not os.path.exists(img_path):
src_url = img_data.get("src")
if src_url:
os.makedirs(src_dir, exist_ok=True)
subprocess.run(["wget", "-q", src_url, "-O", img_path], timeout=30)
if not os.path.exists(img_path) or os.path.getsize(img_path) < 1000:
continue
# Copy to work dir
work_img = os.path.join(work_dir, f"az{az:.1f}_{imageid}.png")
if not os.path.exists(work_img):
cv2.imwrite(work_img, cv2.imread(img_path))
# CLAHE
clahe_path = os.path.join(preproc_dir, f"az{az:.1f}_{imageid}.png")
if not os.path.exists(clahe_path):
apply_clahe_single(work_img, clahe_path)
originals.append(work_img)
clahe_imgs.append(clahe_path)
azs.append(az)
els.append(el)
print(f" Prepared {len(originals)} images")
if len(originals) < 4:
print(" ERROR: Not enough images")
return None
fov = CAMERA_FOV.get(camera, {}).get("hfov", 25.6)
img_w, img_h = 1648, 1200
output_name = f"panorama_sol{sol}_mcz"
result, timings = run_pipeline(work_dir, clahe_imgs, originals,
azs, els, img_w, img_h, fov, output_name,
camera=camera)
print_timings(timings)
return result
def print_timings(timings):
"""Print pipeline timing summary."""
if not timings:
return
total = sum(timings.values())
print(f"\n Timings:")
for step, t in timings.items():
print(f" {step:<20} {t:>8.1f}s")
print(f" {'TOTAL':<20} {total:>8.1f}s")
# ============================================================
# CLI
# ============================================================
def main():
parser = argparse.ArgumentParser(
description="Mars Rover Panorama Pipeline",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# NavCam panorama
%(prog)s --sol 1813 --camera NAVCAM_LEFT
# Mastcam-Z panorama
%(prog)s --sol 1817 --camera MCZ_LEFT
# List available sequences for a sol
%(prog)s --sol 1813 --camera NAVCAM_LEFT --list-only
""")
parser.add_argument("--sol", type=int, required=True, help="Sol number")
parser.add_argument("--camera", required=True,
choices=["NAVCAM_LEFT", "NAVCAM_RIGHT", "MCZ_LEFT", "MCZ_RIGHT"],
help="Camera name")
parser.add_argument("--rover", default="perseverance",
choices=["perseverance", "curiosity"],
help="Rover name (default: perseverance)")
parser.add_argument("--data-dir", default="/data",
help="Base data directory (default: /data)")
parser.add_argument("--output-dir", default="/output",
help="Output directory (default: /output)")
parser.add_argument("--list-only", action="store_true",
help="Only list available sequences, don't process")
args = parser.parse_args()
# When running in Docker, /data and /output are volume mounts
# When running standalone, use actual paths
data_dir = args.data_dir
output_dir = args.output_dir
if args.list_only:
sequences = find_panorama_sequences(args.sol, args.camera, args.rover)
if not sequences:
print(f"No valid sequences for sol {args.sol} {args.camera}")
return
print(f"{'SeqID':<15} {'Ptgs':>5} {'Range':>7} {'MaxGap':>7}")
print("-" * 40)
for s in sequences:
print(f"{s['seq_id']:<15} {s['n_pointings']:>5} "
f"{s['az_range']:>6.0f}° {s['max_gap']:>6.1f}°")
return
# Process
is_navcam = "NAVCAM" in args.camera
if is_navcam:
result = process_navcam(args.sol, args.camera, args.rover, data_dir, output_dir)
else:
result = process_mastcamz(args.sol, args.camera, args.rover, data_dir, output_dir)
if result:
print(f"\nSUCCESS: {result}")
else:
print("\nFAILED")
sys.exit(1)
if __name__ == "__main__":
main()