Initial commit: Dockerfile, pipeline script, docker-compose

- Dockerfile based on debian:bookworm with Hugin 2024, Python 3, OpenCV
- Automated panorama pipeline for NavCam and Mastcam-Z
- CLAHE preprocessing for cpfind, render with originals
- NavCam tile 01+04 combining with 16px overlap
- docker-compose.yml for homelab deployment

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Franck Garnier
2026-04-12 10:26:47 -04:00
commit 97e7101855
5 changed files with 762 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@@ -0,0 +1,8 @@
__pycache__/
*.pyc
*.tif
*.tiff
*.png
!pipeline/*.py
.env
*.log

47
Dockerfile Normal file
View File

@@ -0,0 +1,47 @@
FROM debian:bookworm-slim
LABEL maintainer="franck.garnier" \
description="Mars rover panorama stitching pipeline - Hugin 2024 + Python + OpenCV"
# Install Hugin CLI tools + Python + dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
hugin-tools \
enblend \
python3 \
python3-pip \
python3-venv \
python3-numpy \
python3-opencv \
python3-pil \
python3-mysql.connector \
wget \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
# Install additional Python packages
RUN pip3 install --no-cache-dir --break-system-packages \
requests \
python-dotenv
# Create working directories
RUN mkdir -p /data /output /app
WORKDIR /app
# Copy pipeline scripts
COPY pipeline/ /app/
# Make scripts executable
RUN chmod +x /app/*.py 2>/dev/null || true
RUN chmod +x /app/*.sh 2>/dev/null || true
# Verify installations
RUN nona 2>&1 | head -1 || true && \
cpfind 2>&1 | head -1 || true && \
enblend --version 2>&1 | head -1 || true && \
python3 -c "import cv2; print(f'OpenCV {cv2.__version__}')" && \
python3 -c "import numpy; print(f'NumPy {numpy.__version__}')" && \
python3 -c "from PIL import Image; print('Pillow OK')"
ENTRYPOINT ["python3", "/app/panorama_pipeline.py"]
CMD ["--help"]

64
README.md Normal file
View File

@@ -0,0 +1,64 @@
# Mars Panorama Pipeline
Automated panorama stitching for NASA Mars rovers (Perseverance, Curiosity) using Hugin CLI tools in Docker.
## Quick Start
```bash
# Build the image
docker build -t mars-panorama-pipeline .
# Run a NavCam panorama (sol 1813)
docker run --rm \
-v /mnt/astro/mars_rovers:/data \
-v /mnt/astro/mars_rovers/images/panorama/perseverance:/output \
mars-panorama-pipeline \
--sol 1813 --camera NAVCAM_LEFT
# Run a Mastcam-Z panorama (sol 1817)
docker run --rm \
-v /mnt/astro/mars_rovers:/data \
-v /mnt/astro/mars_rovers/images/panorama/perseverance:/output \
mars-panorama-pipeline \
--sol 1817 --camera MCZ_LEFT
# List available sequences for a sol
docker run --rm \
-v /mnt/astro/mars_rovers:/data \
mars-panorama-pipeline \
--sol 1813 --camera NAVCAM_LEFT --list-only
```
## With Docker Compose
```bash
docker compose run panorama --sol 1813 --camera NAVCAM_LEFT
```
## Pipeline Steps
1. **Find sequence** — Query MySQL for panorama-capable image sequences
2. **Combine tiles** — NavCam tiles 01+04 with 16px overlap blending
3. **CLAHE preprocessing** — Vignette correction + contrast normalization (for cpfind only)
4. **cpfind** — Feature matching on CLAHE images (aggressive params)
5. **cpclean** — Remove outlier control points
6. **Swap to originals** — Replace CLAHE images with originals in PTO
7. **autooptimiser** — Geometry-only optimization (NO photometric -m flag)
8. **nona + enblend** — Remap and blend final panorama
9. **Export PNG** — Convert TIFF output to PNG
## Supported Cameras
| Camera | Rover | FOV | Tile combining |
|--------|-------|-----|---------------|
| NAVCAM_LEFT/RIGHT | Perseverance | 82.17° (combined) | Yes (01+04, 16px overlap) |
| MCZ_LEFT/RIGHT | Perseverance | 25.6° (wide zoom) | No |
## Configuration
Environment variables (or .env file):
- `MYSQL_HOST` — MySQL server (default: 192.168.1.42)
- `MYSQL_PORT` — MySQL port (default: 3306)
- `MYSQL_USER` — MySQL user (default: soldan)
- `MYSQL_PASSWORD` — MySQL password
- `MYSQL_DATABASE` — Database name (default: mars_rovers)

18
docker-compose.yml Normal file
View File

@@ -0,0 +1,18 @@
version: '3.8'
services:
panorama:
build: .
image: mars-panorama-pipeline:latest
volumes:
# Mount NAS astro disk (images source + output)
- /mnt/astro/mars_rovers:/data
- /mnt/astro/mars_rovers/images/panorama/perseverance:/output
environment:
- MYSQL_HOST=192.168.1.42
- MYSQL_PORT=3306
- MYSQL_USER=soldan
- MYSQL_PASSWORD=Sol17Smr
- MYSQL_DATABASE=mars_rovers
# Override entrypoint for interactive use
# docker compose run panorama --sol 1813 --camera NAVCAM_LEFT

View File

@@ -0,0 +1,625 @@
#!/usr/bin/env python3
"""
Mars Rover Panorama Pipeline
=============================
Automated panorama stitching for Perseverance and Curiosity NavCam/Mastcam-Z images.
Usage:
# From Docker:
docker run -v /path/to/images:/data -v /path/to/output:/output mars-panorama-pipeline \
--sol 1813 --camera NAVCAM_LEFT --rover perseverance
# Standalone:
python3 panorama_pipeline.py --sol 1813 --camera NAVCAM_LEFT --rover perseverance \
--data-dir /mnt/astro/mars_rovers --output-dir /mnt/astro/mars_rovers/images/panorama
"""
import argparse
import cv2
import glob
import math
import mysql.connector
import numpy as np
import os
import re
import subprocess
import sys
import time
from collections import defaultdict, OrderedDict
from PIL import Image
Image.MAX_IMAGE_PIXELS = 500000000
# ============================================================
# Configuration
# ============================================================
DB_CONFIG = {
"host": os.getenv("MYSQL_HOST", "192.168.1.42"),
"port": int(os.getenv("MYSQL_PORT", 3306)),
"user": os.getenv("MYSQL_USER", "soldan"),
"password": os.getenv("MYSQL_PASSWORD", "Sol17Smr"),
"database": os.getenv("MYSQL_DATABASE", "mars_rovers"),
}
# NavCam tile overlap
TILE_OVERLAP_PX = 16
# Camera FOV reference (empirical measurements)
CAMERA_FOV = {
# Perseverance NavCam: combined tiles 01+04 (2560px after 16px overlap)
"NAVCAM_LEFT": {"combined_hfov": 82.17, "single_hfov": 47.4, "f_px": 1468.0},
"NAVCAM_RIGHT": {"combined_hfov": 82.17, "single_hfov": 47.4, "f_px": 1468.0},
# Mastcam-Z at wide zoom (26mm)
"MCZ_LEFT": {"hfov": 25.6, "native_res": (1648, 1200)},
"MCZ_RIGHT": {"hfov": 25.6, "native_res": (1648, 1200)},
}
# ============================================================
# Utility functions
# ============================================================
def get_seq(imageid):
"""Extract sequence_id from Perseverance imageid."""
parts = imageid.split("_")
if len(parts) >= 5 and len(parts[4]) >= 17:
return parts[4][8:]
return None
def get_tile(imageid):
"""Extract tile number from imageid."""
parts = imageid.split("_")
if len(parts) >= 7:
return parts[-2]
return None
def run_hugin(tool, args, cwd=None, timeout=600):
"""Run a Hugin CLI tool."""
cmd = f"{tool} {args}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True,
timeout=timeout, cwd=cwd)
return result
# ============================================================
# Step 1: Find panorama sequence
# ============================================================
def find_panorama_sequences(sol, camera, rover="perseverance"):
"""Find complete panorama sequences for a given sol and camera."""
conn = mysql.connector.connect(**DB_CONFIG)
cur = conn.cursor()
is_navcam = "NAVCAM" in camera
dim_filter = "AND dimension = '(1288,968)'" if is_navcam else "AND dimension = '(1648,1200)'"
cur.execute(f"""
SELECT imageid, mast_az, mast_el, dimension, image_src
FROM photos_{rover}
WHERE sol = %s AND camera_name = %s
AND mast_az IS NOT NULL AND mast_az != 'UNK'
AND sample_type = 'full'
{dim_filter}
ORDER BY CAST(mast_az AS DECIMAL(10,2)), imageid
""", (sol, camera))
rows = cur.fetchall()
cur.close()
conn.close()
# Group by (sequence_id, azimuth)
sequences = defaultdict(lambda: defaultdict(list))
for imageid, az, el, dim, src in rows:
seq = get_seq(imageid)
if seq:
az_key = round(float(az), 0)
sequences[seq][az_key].append({
"imageid": imageid, "tile": get_tile(imageid),
"az": float(az), "el": float(el), "src": src,
})
# Determine FOV for chain validation
if is_navcam:
fov = CAMERA_FOV[camera]["combined_hfov"]
else:
fov = CAMERA_FOV.get(camera, {}).get("hfov", 25.6)
# Find valid sequences
results = []
for seq_id, pointings in sequences.items():
azs = sorted(pointings.keys())
n_ptg = len(azs)
if n_ptg < 4:
continue
if is_navcam:
has_pairs = all(len(pointings[az]) >= 2 for az in azs)
if not has_pairs:
continue
gaps = [azs[i + 1] - azs[i] for i in range(len(azs) - 1)]
max_gap = max(gaps) if gaps else 0
chain_ok = all(g < fov for g in gaps)
if chain_ok:
results.append({
"seq_id": seq_id,
"n_pointings": n_ptg,
"az_range": azs[-1] - azs[0],
"max_gap": max_gap,
"pointings": dict(pointings),
})
results.sort(key=lambda x: -x["az_range"])
return results
# ============================================================
# Step 2: Prepare images (combine tiles, CLAHE)
# ============================================================
def combine_navcam_tiles(t01_path, t04_path, output_path, overlap=TILE_OVERLAP_PX):
"""Combine NavCam tiles 01+04 with overlap blending."""
t01 = cv2.imread(t01_path)
t04 = cv2.imread(t04_path)
if t01 is None or t04 is None:
return None
h, w = t01.shape[:2]
combined_w = w + w - overlap
combined = np.zeros((h, combined_w, 3), dtype=np.uint8)
combined[:, :w - overlap, :] = t01[:, :w - overlap, :]
combined[:, w:, :] = t04[:, overlap:, :]
# Linear blend in overlap zone
for px in range(overlap):
alpha = px / overlap
combined[:, w - overlap + px, :] = (
(1 - alpha) * t01[:, w - overlap + px, :].astype(float) +
alpha * t04[:, px, :].astype(float)
).astype(np.uint8)
cv2.imwrite(output_path, combined)
return combined
def apply_clahe_navcam(img_path, out_path, vignette_strength=0.35, clip_limit=2.0):
"""CLAHE + per-tile vignette correction for NavCam combined images."""
img = cv2.imread(img_path)
h, w = img.shape[:2]
hw = w // 2
corrected = img.astype(np.float32)
for tile_start in [0, hw]:
cx = tile_start + hw // 2
cy = h // 2
Y, X = np.ogrid[:h, tile_start:tile_start + hw]
dist = np.sqrt((X - cx) ** 2 + (Y - cy) ** 2).astype(np.float32)
max_dist = np.sqrt((hw // 2) ** 2 + (h // 2) ** 2)
gain = 1.0 + vignette_strength * (dist / max_dist) ** 2
for ch in range(3):
corrected[:h, tile_start:tile_start + hw, ch] = np.clip(
corrected[:h, tile_start:tile_start + hw, ch] * gain, 0, 255)
corrected = corrected.astype(np.uint8)
lab = cv2.cvtColor(corrected, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(16, 16))
l_clahe = clahe.apply(l)
result = cv2.cvtColor(cv2.merge([l_clahe, a, b]), cv2.COLOR_LAB2BGR)
cv2.imwrite(out_path, result)
return result
def apply_clahe_single(img_path, out_path, vignette_strength=0.35, clip_limit=2.0):
"""CLAHE + single radial vignette correction for Mastcam-Z images."""
img = cv2.imread(img_path)
h, w = img.shape[:2]
corrected = img.astype(np.float32)
cy, cx = h // 2, w // 2
Y, X = np.ogrid[:h, :w]
dist = np.sqrt((X - cx) ** 2 + (Y - cy) ** 2).astype(np.float32)
max_dist = np.sqrt(cx ** 2 + cy ** 2)
gain = 1.0 + vignette_strength * (dist / max_dist) ** 2
for ch in range(3):
corrected[:, :, ch] = np.clip(corrected[:, :, ch] * gain, 0, 255)
corrected = corrected.astype(np.uint8)
lab = cv2.cvtColor(corrected, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(16, 16))
l_clahe = clahe.apply(l)
result = cv2.cvtColor(cv2.merge([l_clahe, a, b]), cv2.COLOR_LAB2BGR)
cv2.imwrite(out_path, result)
return result
# ============================================================
# Step 3: Build PTO
# ============================================================
def build_pto(pto_path, img_paths, azs, els, img_w, img_h, fov, ref_idx=None):
"""Build a Hugin PTO file."""
if ref_idx is None:
ref_idx = len(img_paths) // 2
lines = []
for i, (path, az, el) in enumerate(zip(img_paths, azs, els)):
if i == 0:
lines.append(
f'i w{img_w} h{img_h} f0 v{fov} Ra0 Rb0 Rc0 Rd0 Re0 Eev0 Er1 Eb1 '
f'r0 p{el} y{az} TrX0 TrY0 TrZ0 Tpy0 Tpp0 j0 '
f'a0 b0 c0 d0 e0 g0 t0 Va1 Vb0 Vc0 Vd0 Vx0 Vy0 Vm5 n"{path}"')
else:
lines.append(
f'i w{img_w} h{img_h} f0 v=0 Ra=0 Rb=0 Rc=0 Rd=0 Re=0 Eev0 Er1 Eb1 '
f'r0 p{el} y{az} TrX0 TrY0 TrZ0 Tpy0 Tpp0 j0 '
f'a=0 b=0 c=0 d=0 e=0 g=0 t=0 Va=0 Vb=0 Vc=0 Vd=0 Vx=0 Vy=0 Vm5 n"{path}"')
geo_vars = "\n".join([f"v r{i}\nv p{i}\nv y{i}" for i in range(len(img_paths))])
content = "# hugin project file\n#hugin_ptoversion 2\n"
content += 'p f2 w6064 h1330 v360 k0 E0 R0 n"TIFF_m c:LZW r:CROP"\nm i0\n\n'
content += "\n".join(lines) + "\n\n"
content += geo_vars + "\nv\n\n"
content += f"#hugin_optimizeReferenceImage {ref_idx}\n"
with open(pto_path, "w") as f:
f.write(content)
return pto_path
# ============================================================
# Step 4: Run Hugin pipeline
# ============================================================
def run_pipeline(work_dir, clahe_images, original_images, azs, els,
img_w, img_h, fov, output_name):
"""Full pipeline: cpfind(CLAHE) -> swap(originals) -> optimize(geo) -> nona -> enblend."""
timings = {}
# Build CLAHE PTO
pto_clahe = os.path.join(work_dir, f"{output_name}_clahe.pto")
build_pto(pto_clahe, clahe_images, azs, els, img_w, img_h, fov)
# cpfind on CLAHE images
print(" cpfind...", flush=True)
t0 = time.time()
r = run_hugin("cpfind",
f"--multirow --celeste "
f"--sieve1width 20 --sieve1height 20 --sieve1size 1000 "
f"--sieve2width 10 --sieve2height 10 --sieve2size 5 "
f"--minmatches 3 --ransaciter 2000 --ransacdist 50 "
f'-o "{pto_clahe}" "{pto_clahe}"',
cwd=work_dir)
timings["cpfind"] = time.time() - t0
for line in r.stdout.split("\n"):
if "Found" in line:
print(f" {line.strip()}")
# cpclean
print(" cpclean...", flush=True)
t0 = time.time()
run_hugin("cpclean", f'-o "{pto_clahe}" "{pto_clahe}"', cwd=work_dir)
timings["cpclean"] = time.time() - t0
with open(pto_clahe) as f:
cp_lines = [l for l in f if l.startswith("c ")]
print(f" CPs: {len(cp_lines)}")
if len(cp_lines) < 5:
print(" ERROR: Not enough control points")
return None, timings
# Build render PTO with originals + CPs from CLAHE
pto_render = os.path.join(work_dir, f"{output_name}_render.pto")
build_pto(pto_render, original_images, azs, els, img_w, img_h, fov)
with open(pto_render, "a") as f:
f.writelines(cp_lines)
# Geometry-only optimization (NO -m, NO -a to avoid d/e distortion)
print(" autooptimiser (geo only)...", flush=True)
t0 = time.time()
run_hugin("autooptimiser", f'-a -l -s -o "{pto_render}" "{pto_render}"', cwd=work_dir)
timings["autooptimiser"] = time.time() - t0
# Remove S (crop) lines and reset distortion params
with open(pto_render) as f:
lines = f.readlines()
with open(pto_render, "w") as f:
for l in lines:
if l.startswith("S "):
continue
f.write(l)
# nona
print(" nona...", flush=True)
prefix = os.path.join(work_dir, f"{output_name}_tmp_")
t0 = time.time()
run_hugin("nona", f'-m TIFF_m -o "{prefix}" "{pto_render}"', cwd=work_dir)
timings["nona"] = time.time() - t0
tifs = sorted(glob.glob(f"{prefix}*.tif"))
print(f" Remapped: {len(tifs)} files")
if not tifs:
print(" ERROR: nona produced no output")
return None, timings
# enblend
print(" enblend...", flush=True)
out_tif = os.path.join(work_dir, f"{output_name}.tif")
tif_list = " ".join([f'"{t}"' for t in tifs])
t0 = time.time()
run_hugin("enblend", f'--pre-assemble -o "{out_tif}" {tif_list}', cwd=work_dir)
timings["enblend"] = time.time() - t0
# Convert to PNG
out_png = os.path.join(work_dir, f"{output_name}.png")
try:
img = Image.open(out_tif)
img.save(out_png, "PNG")
size_str = f"{img.size[0]}x{img.size[1]}"
fsize = os.path.getsize(out_png) / 1024 / 1024
print(f" Output: {size_str}, {fsize:.1f}MB")
except Exception as e:
print(f" ERROR converting: {e}")
out_png = None
# Cleanup temp files
for t in tifs:
os.remove(t)
if os.path.exists(out_tif):
os.remove(out_tif)
return out_png, timings
# ============================================================
# Main orchestrator
# ============================================================
def process_navcam(sol, camera, rover, data_dir, output_dir):
"""Process a NavCam panorama: find sequence, combine tiles, CLAHE, stitch."""
print(f"\n{'=' * 60}")
print(f"NAVCAM Panorama: Sol {sol}, {camera}, {rover}")
print(f"{'=' * 60}")
sequences = find_panorama_sequences(sol, camera, rover)
if not sequences:
print(f" No valid panorama sequences found for sol {sol}")
return None
seq = sequences[0] # Best sequence
print(f" Sequence: {seq['seq_id']}, {seq['n_pointings']} pointings, "
f"{seq['az_range']:.0f}° range, max_gap={seq['max_gap']:.1f}°")
work_dir = os.path.join(output_dir, f"sol_{sol}")
preproc_dir = os.path.join(work_dir, "preprocessed")
os.makedirs(preproc_dir, exist_ok=True)
src_dir = os.path.join(data_dir, "images", rover, f"sol_{sol}", camera)
originals = []
clahe_imgs = []
azs = []
els = []
for az_key in sorted(seq["pointings"].keys()):
imgs = seq["pointings"][az_key]
t01 = [i for i in imgs if i["tile"] == "01"]
t04 = [i for i in imgs if i["tile"] == "04"]
if not t01 or not t04:
continue
az, el = t01[0]["az"], t01[0]["el"]
t01_path = os.path.join(src_dir, f"{t01[0]['imageid']}01.png")
t04_path = os.path.join(src_dir, f"{t04[0]['imageid']}01.png")
# Download if missing
for tile_img, tile_src in [(t01_path, t01[0].get("src")), (t04_path, t04[0].get("src"))]:
if not os.path.exists(tile_img) and tile_src:
os.makedirs(os.path.dirname(tile_img), exist_ok=True)
subprocess.run(["wget", "-q", tile_src, "-O", tile_img], timeout=30)
if not os.path.exists(t01_path) or not os.path.exists(t04_path):
print(f" Missing tiles for az={az:.1f}")
continue
# Combine tiles
fname = f"pointage_az{az:.2f}.png"
orig_path = os.path.join(work_dir, fname)
combine_navcam_tiles(t01_path, t04_path, orig_path)
# CLAHE
clahe_path = os.path.join(preproc_dir, fname)
apply_clahe_navcam(orig_path, clahe_path)
originals.append(orig_path)
clahe_imgs.append(clahe_path)
azs.append(az)
els.append(el)
print(f" az={az:>7.1f}: tiles combined + CLAHE")
if len(originals) < 4:
print(" ERROR: Not enough images")
return None
fov = CAMERA_FOV[camera]["combined_hfov"]
img_w = 2560 # 1288 + 1288 - 16
img_h = 968
output_name = f"panorama_sol{sol}"
result, timings = run_pipeline(work_dir, clahe_imgs, originals,
azs, els, img_w, img_h, fov, output_name)
print_timings(timings)
return result
def process_mastcamz(sol, camera, rover, data_dir, output_dir):
"""Process a Mastcam-Z panorama: find sequence, CLAHE, stitch."""
print(f"\n{'=' * 60}")
print(f"MASTCAM-Z Panorama: Sol {sol}, {camera}, {rover}")
print(f"{'=' * 60}")
sequences = find_panorama_sequences(sol, camera, rover)
if not sequences:
print(f" No valid panorama sequences found for sol {sol}")
return None
seq = sequences[0]
print(f" Sequence: {seq['seq_id']}, {seq['n_pointings']} pointings, "
f"{seq['az_range']:.0f}° range, max_gap={seq['max_gap']:.1f}°")
work_dir = os.path.join(output_dir, f"sol_{sol}_mcz")
preproc_dir = os.path.join(work_dir, "preprocessed")
os.makedirs(preproc_dir, exist_ok=True)
src_dir = os.path.join(data_dir, "images", rover, f"sol_{sol}", camera)
originals = []
clahe_imgs = []
azs = []
els = []
# Deduplicate by azimuth
seen_az = OrderedDict()
for az_key in sorted(seq["pointings"].keys()):
imgs = seq["pointings"][az_key]
if az_key not in seen_az:
seen_az[az_key] = imgs[0]
for az_key, img_data in seen_az.items():
az, el = img_data["az"], img_data["el"]
imageid = img_data["imageid"]
fname = f"{imageid}01.png"
img_path = os.path.join(src_dir, fname)
# Download if missing
if not os.path.exists(img_path):
src_url = img_data.get("src")
if src_url:
os.makedirs(src_dir, exist_ok=True)
subprocess.run(["wget", "-q", src_url, "-O", img_path], timeout=30)
if not os.path.exists(img_path) or os.path.getsize(img_path) < 1000:
continue
# Copy to work dir
work_img = os.path.join(work_dir, f"az{az:.1f}_{imageid}.png")
if not os.path.exists(work_img):
cv2.imwrite(work_img, cv2.imread(img_path))
# CLAHE
clahe_path = os.path.join(preproc_dir, f"az{az:.1f}_{imageid}.png")
if not os.path.exists(clahe_path):
apply_clahe_single(work_img, clahe_path)
originals.append(work_img)
clahe_imgs.append(clahe_path)
azs.append(az)
els.append(el)
print(f" Prepared {len(originals)} images")
if len(originals) < 4:
print(" ERROR: Not enough images")
return None
fov = CAMERA_FOV.get(camera, {}).get("hfov", 25.6)
img_w, img_h = 1648, 1200
output_name = f"panorama_sol{sol}_mcz"
result, timings = run_pipeline(work_dir, clahe_imgs, originals,
azs, els, img_w, img_h, fov, output_name)
print_timings(timings)
return result
def print_timings(timings):
"""Print pipeline timing summary."""
if not timings:
return
total = sum(timings.values())
print(f"\n Timings:")
for step, t in timings.items():
print(f" {step:<20} {t:>8.1f}s")
print(f" {'TOTAL':<20} {total:>8.1f}s")
# ============================================================
# CLI
# ============================================================
def main():
parser = argparse.ArgumentParser(
description="Mars Rover Panorama Pipeline",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# NavCam panorama
%(prog)s --sol 1813 --camera NAVCAM_LEFT
# Mastcam-Z panorama
%(prog)s --sol 1817 --camera MCZ_LEFT
# List available sequences for a sol
%(prog)s --sol 1813 --camera NAVCAM_LEFT --list-only
""")
parser.add_argument("--sol", type=int, required=True, help="Sol number")
parser.add_argument("--camera", required=True,
choices=["NAVCAM_LEFT", "NAVCAM_RIGHT", "MCZ_LEFT", "MCZ_RIGHT"],
help="Camera name")
parser.add_argument("--rover", default="perseverance",
choices=["perseverance", "curiosity"],
help="Rover name (default: perseverance)")
parser.add_argument("--data-dir", default="/data",
help="Base data directory (default: /data)")
parser.add_argument("--output-dir", default="/output",
help="Output directory (default: /output)")
parser.add_argument("--list-only", action="store_true",
help="Only list available sequences, don't process")
args = parser.parse_args()
# When running in Docker, /data and /output are volume mounts
# When running standalone, use actual paths
data_dir = args.data_dir
output_dir = args.output_dir
if args.list_only:
sequences = find_panorama_sequences(args.sol, args.camera, args.rover)
if not sequences:
print(f"No valid sequences for sol {args.sol} {args.camera}")
return
print(f"{'SeqID':<15} {'Ptgs':>5} {'Range':>7} {'MaxGap':>7}")
print("-" * 40)
for s in sequences:
print(f"{s['seq_id']:<15} {s['n_pointings']:>5} "
f"{s['az_range']:>6.0f}° {s['max_gap']:>6.1f}°")
return
# Process
is_navcam = "NAVCAM" in args.camera
if is_navcam:
result = process_navcam(args.sol, args.camera, args.rover, data_dir, output_dir)
else:
result = process_mastcamz(args.sol, args.camera, args.rover, data_dir, output_dir)
if result:
print(f"\nSUCCESS: {result}")
else:
print("\nFAILED")
sys.exit(1)
if __name__ == "__main__":
main()