visto che le strisciate possono essere anche molto lunghe si puo' selezionare un intervallo di righe
Se si mettono a confronto la riflettanza (prima immagine) e la radianza (seconda immagine) si vede che la riflettanza e' decisamente piu' rumorosa ed ha valori superiori ad 1 (dovrebbero essere sempre inferiori ad 1)
se poi vado ad estrarre lo spettro sul pannello di riferimento lo spettro non e' piatto (il dato e' clippato ma e' un artefatto del programma, se si vede l'immagine successiva si deve che i DN sono intorno a 3000-3100 quando il limite superiore e' 4095)
Temo che questo derivi dal metodo con cui lo strumento viene calibrato...a terra viene effettuato una immagine di dark current con l'otturatore chiuso ed una immagine di bianco tenendo in mano il drone ed inquadrando il bianco di riferimento..in pratica il bianco non e' ricavato direttamente dalla scena e possono passare anche diversi minuti tra il bianco a terra e la ripresa di tutta la scena...il che vuol dire che le condizioni di illuminazione possono essere cambiate
#!/usr/bin/env python3
"""
Computes a full calibrated reflectance cube (all bands) from an ENVI BIL
hyperspectral cube using dark-current and white-reference correction files
plus the actual spectral reflectance curve of the calibration panel, and
saves the result as a standard ENVI file (float32, BIL).
Usage:
python3 make_reflectance_cube.py figspec.hdr figspec.spe \
--black figspec.figspecblack \
--white figspec.figspecwhite \
--panel Reflectance_Calibration_Cloth.figspecref \
--out reflectance_cube
Produces:
reflectance_cube.hdr
reflectance_cube.img (float32, BIL, same samples/lines/bands as input)
Reflectance is computed per band, per spatial column as:
reflectance = (raw - dark) / (white - dark + eps) * panel_reflectance(wavelength)
Processes the cube in chunks of lines to keep memory usage low regardless
of file size.
"""
import os
import re
import argparse
import numpy as np
from scipy.signal import savgol_filter
def parse_hdr(hdr_path):
txt = open(hdr_path, "r", errors="ignore").read()
def get_int(key):
m = re.search(rf"{key}\s*=\s*(\d+)", txt)
return int(m.group(1))
samples = get_int("samples")
lines = get_int("lines")
bands = get_int("bands")
dtype_code = get_int("data type")
m = re.search(r"interleave\s*=\s*(\w+)", txt, re.IGNORECASE)
interleave = m.group(1).lower()
m = re.search(r"byte order\s*=\s*(\d+)", txt)
byte_order = int(m.group(1)) if m else 0
m = re.search(r"wavelength\s*=\s*\{([^}]+)\}", txt, re.IGNORECASE)
wavelengths = [float(x) for x in m.group(1).split(",")] if m else None
m = re.search(r"wavelength units\s*=\s*(\S+)", txt, re.IGNORECASE)
wl_units = m.group(1).strip() if m else "Nanometers"
dtype_map = {1: np.uint8, 2: np.int16, 3: np.int32, 4: np.float32,
5: np.float64, 12: np.uint16, 13: np.uint32,
14: np.int64, 15: np.uint64}
np_dtype = np.dtype(dtype_map[dtype_code])
np_dtype = np_dtype.newbyteorder(">" if byte_order == 1 else "<")
return dict(samples=samples, lines=lines, bands=bands, dtype=np_dtype,
interleave=interleave, wavelengths=wavelengths,
wl_units=wl_units, raw_text=txt)
def auto_offset_and_lines(path, bands, samples, itemsize, forced_lines=None,
forced_offset=None):
per_line_bytes = bands * samples * itemsize
file_size = os.path.getsize(path)
if forced_offset is not None and forced_lines is not None:
return forced_offset, forced_lines
expected_lines = forced_lines if forced_lines is not None else file_size // per_line_bytes
expected = per_line_bytes * expected_lines
if file_size == expected:
return 0, expected_lines
remainder = file_size % per_line_bytes
lines = file_size // per_line_bytes
print(f" [{os.path.basename(path)}] size={file_size} -> "
f"auto offset={remainder} bytes, lines={lines}")
return remainder, lines
def parse_correction_file(path, n_bands):
"""Parse a .figspecblack/.figspecwhite correction file -> (bands, spatial_bins)."""
data = open(path, "rb").read()
text = data.decode("latin1")
matches = list(re.finditer(r"[0-9a-fA-F]{200,}", text))
if not matches:
raise ValueError(f"No hex-encoded correction block found in {path}")
m = matches[0]
hexstr = data[m.start():m.end()].decode("ascii")
raw = bytes.fromhex(hexstr)
arr = np.frombuffer(raw, dtype="<f4")
if arr.size % n_bands != 0:
raise ValueError(f"{path}: {arr.size} floats doesn't divide evenly by {n_bands} bands")
spatial_bins = arr.size // n_bands
return arr.reshape(n_bands, spatial_bins)
def parse_panel_reflectance(path):
"""Parse the calibration cloth CSV -> sorted (wavelengths_nm, reflectance_fraction) arrays."""
data = open(path, "rb").read().decode("latin1")
rows = []
for line in data.splitlines()[1:]:
line = line.strip().strip(",")
if not line:
continue
parts = [p.strip() for p in line.split(",") if p.strip() != ""]
if len(parts) < 2:
continue
try:
wl = float(parts[0])
refl_pct = float(parts[1])
rows.append((wl, refl_pct))
except ValueError:
continue
rows.sort()
wl = np.array([r[0] for r in rows])
refl = np.array([r[1] for r in rows]) / 100.0
return wl, refl
def upsample_spatial_2d(profile_2d, target_len):
"""Upsample a (bands, spatial_bins) map to (bands, target_len) via nearest-neighbor repeat."""
bins = profile_2d.shape[1]
factor = target_len / bins
if float(factor).is_integer():
return np.repeat(profile_2d, int(factor), axis=1)
x_old = np.linspace(0, 1, bins)
x_new = np.linspace(0, 1, target_len)
out = np.empty((profile_2d.shape[0], target_len), dtype=np.float32)
for b in range(profile_2d.shape[0]):
out[b] = np.interp(x_new, x_old, profile_2d[b])
return out
def write_envi_hdr(out_hdr_path, samples, lines, bands, wavelengths, wl_units,
interleave="bil", description="Calibrated reflectance cube"):
with open(out_hdr_path, "w") as f:
f.write("ENVI\n")
f.write(f"description = {{{description}}}\n")
f.write(f"samples = {samples}\n")
f.write(f"lines = {lines}\n")
f.write(f"bands = {bands}\n")
f.write("header offset = 0\n")
f.write("file type = ENVI Standard\n")
f.write("data type = 4\n") # float32
f.write(f"interleave = {interleave}\n")
f.write("byte order = 0\n")
f.write(f"wavelength units = {wl_units}\n")
wl_str = ", ".join(f"{w:.4f}" for w in wavelengths)
f.write(f"wavelength = {{{wl_str}}}\n")
f.write("reflectance scale factor = 1.0\n")
def main():
ap = argparse.ArgumentParser()
ap.add_argument("hdr_path")
ap.add_argument("data_path")
ap.add_argument("--black", required=True)
ap.add_argument("--white", required=True)
ap.add_argument("--panel", required=True,
help="CSV file with the calibration panel's true spectral reflectance")
ap.add_argument("--out", default="reflectance_cube",
help="Output base path (produces <out>.hdr and <out>.img)")
ap.add_argument("--offset", type=int, default=None)
ap.add_argument("--lines", type=int, default=None)
ap.add_argument("--from-line", type=int, default=0,
help="First line to process (0-based, default: 0)")
ap.add_argument("--to-line", type=int, default=None,
help="Last line to process, exclusive (default: process to the end)")
ap.add_argument("--cap", type=float, default=2.0,
help="Clip reflectance to [0, cap] to suppress numerical outliers "
"(use a large number or negative to disable)")
ap.add_argument("--chunk-lines", type=int, default=100,
help="Number of lines to process per chunk (memory/speed tradeoff)")
ap.add_argument("--no-radiance", action="store_true",
help="Skip writing the radiance (dark-subtracted DN) cube")
ap.add_argument("--no-saturation", action="store_true",
help="Skip saturation detection/report")
ap.add_argument("--saturation-value", type=float, default=4095,
help="Raw DN value considered 'saturated' (default 4095, typical for a "
"12-bit ADC stored in 16-bit words - adjust if your sensor differs)")
ap.add_argument("--no-calib-smoothing", action="store_true",
help="Skip Savitzky-Golay smoothing of the dark/white correction maps "
"(smoothing is ON by default - it removes band-to-band noise coming "
"from the correction maps themselves, which otherwise gets amplified "
"into the reflectance by the division)")
ap.add_argument("--calib-smooth-window", type=int, default=9,
help="Savitzky-Golay window (in bands) for dark/white map smoothing")
ap.add_argument("--calib-smooth-polyorder", type=int, default=2,
help="Savitzky-Golay polynomial order for dark/white map smoothing")
args = ap.parse_args()
meta = parse_hdr(args.hdr_path)
samples, bands = meta["samples"], meta["bands"]
dtype = meta["dtype"]
itemsize = dtype.itemsize
interleave = meta["interleave"]
wavelengths = np.array(meta["wavelengths"])
wl_units = meta["wl_units"]
if interleave != "bil":
raise ValueError("This script currently assumes BIL interleave.")
print("Detecting main cube offset...")
offset, lines = auto_offset_and_lines(
args.data_path, bands, samples, itemsize,
forced_lines=args.lines, forced_offset=args.offset)
per_line_bytes = bands * samples * itemsize
from_line = max(0, args.from_line)
to_line = args.to_line if args.to_line is not None else lines
to_line = min(to_line, lines)
if from_line >= to_line:
raise ValueError(f"--from-line ({from_line}) must be less than --to-line ({to_line}); "
f"cube has {lines} lines available.")
if from_line > 0 or to_line < lines:
print(f"Restricting to lines [{from_line}, {to_line}) out of {lines} total")
offset = offset + from_line * per_line_bytes
lines = to_line - from_line
print(f"Main cube: offset={offset}, lines={lines}, bands={bands}, samples={samples}")
print("Parsing dark/white correction maps...")
dark_map = parse_correction_file(args.black, bands) # (bands, spatial_bins)
white_map = parse_correction_file(args.white, bands) # (bands, spatial_bins)
print(f"Correction map shape: {dark_map.shape}")
if not args.no_calib_smoothing:
w = args.calib_smooth_window
if w % 2 == 0:
w += 1
print(f"Smoothing dark/white correction maps along the band axis "
f"(window={w}, polyorder={args.calib_smooth_polyorder})...")
dark_map = savgol_filter(dark_map, window_length=w,
polyorder=args.calib_smooth_polyorder, axis=0)
white_map = savgol_filter(white_map, window_length=w,
polyorder=args.calib_smooth_polyorder, axis=0)
print("Upsampling correction maps to full sample resolution...")
dark_full = upsample_spatial_2d(dark_map, samples).astype(np.float32) # (bands, samples)
white_full = upsample_spatial_2d(white_map, samples).astype(np.float32) # (bands, samples)
print("Parsing calibration panel reflectance curve...")
panel_wl, panel_refl = parse_panel_reflectance(args.panel)
# interpolate panel reflectance to our band wavelengths (clip at edges)
panel_frac = np.interp(wavelengths, panel_wl, panel_refl,
left=panel_refl[0], right=panel_refl[-1]).astype(np.float32)
print(f"Panel reflectance range used: {panel_frac.min():.3f} - {panel_frac.max():.3f}")
denom = (white_full - dark_full)
denom[np.abs(denom) < 1e-3] = 1e-3 # avoid divide-by-zero
# pre-multiply panel fraction into denom's reciprocal for speed:
gain = (panel_frac[:, None] / denom).astype(np.float32) # (bands, samples)
offset_term = (dark_full * gain).astype(np.float32) # (bands, samples), to subtract
out_hdr_path = args.out + ".hdr"
out_img_path = args.out + ".img"
write_envi_hdr(out_hdr_path, samples, lines, bands, wavelengths, wl_units,
description="Calibrated reflectance cube")
print(f"Wrote header: {out_hdr_path}")
rad_hdr_path = None
rad_img_path = None
if not args.no_radiance:
rad_hdr_path = args.out + "_radiance.hdr"
rad_img_path = args.out + "_radiance.img"
write_envi_hdr(rad_hdr_path, samples, lines, bands, wavelengths, wl_units,
description="Dark-subtracted radiance-proportional cube (raw DN units, "
"NOT absolute radiometric units - no sensor gain calibration applied)")
print(f"Wrote header: {rad_hdr_path}")
sat_value = args.saturation_value
band_sat_counts = np.zeros(bands, dtype=np.int64) # per-band saturated pixel count
pixel_sat_counts = np.zeros((lines, samples), dtype=np.uint16) # per-pixel: how many bands saturated
total_pixels_per_band = lines * samples
cube = np.memmap(args.data_path, dtype=dtype, mode="r", offset=offset,
shape=(lines, bands, samples))
cap = args.cap
chunk = args.chunk_lines
print(f"Processing {lines} lines in chunks of {chunk}...")
fout_rad = open(rad_img_path, "wb") if rad_img_path else None
with open(out_img_path, "wb") as fout:
for start in range(0, lines, chunk):
end = min(start + chunk, lines)
raw_chunk = np.array(cube[start:end, :, :], dtype=np.float32) # (n, bands, samples)
if not args.no_saturation:
sat_mask = raw_chunk >= sat_value # (n, bands, samples)
band_sat_counts += sat_mask.sum(axis=(0, 2))
pixel_sat_counts[start:end, :] += sat_mask.sum(axis=1).astype(np.uint16)
if fout_rad is not None:
rad_chunk = raw_chunk - dark_full[None, :, :]
np.maximum(rad_chunk, 0, out=rad_chunk)
fout_rad.write(rad_chunk.astype(np.float32).tobytes())
refl_chunk = raw_chunk * gain[None, :, :] - offset_term[None, :, :]
if cap is not None and cap > 0:
np.clip(refl_chunk, 0, cap, out=refl_chunk)
else:
np.maximum(refl_chunk, 0, out=refl_chunk)
fout.write(refl_chunk.astype(np.float32).tobytes())
print(f" lines {start}-{end} / {lines}", end="\r")
if fout_rad is not None:
fout_rad.close()
print(f"\nDone. Saved {out_img_path} ({os.path.getsize(out_img_path)} bytes) "
f"and {out_hdr_path}")
if rad_img_path:
print(f"Saved {rad_img_path} and {rad_hdr_path}")
if not args.no_saturation:
sat_report_path = args.out + "_saturation_report.txt"
with open(sat_report_path, "w") as f:
f.write(f"Saturation threshold: DN >= {sat_value}\n")
f.write(f"Total pixels per band: {total_pixels_per_band}\n\n")
f.write("band_idx wavelength_nm n_saturated_pixels pct_saturated\n")
any_saturated = False
for b in range(bands):
n = int(band_sat_counts[b])
if n > 0:
any_saturated = True
pct = 100.0 * n / total_pixels_per_band
f.write(f"{b:4d} {wavelengths[b]:7.1f} {n:10d} {pct:6.3f}%\n")
print(f"Saved per-band saturation report: {sat_report_path}")
if not any_saturated:
print("No saturated pixels found in any band at the given threshold.")
else:
worst_band = int(np.argmax(band_sat_counts))
print(f"Most-saturated band: {worst_band} ({wavelengths[worst_band]:.1f} nm), "
f"{band_sat_counts[worst_band]} pixels "
f"({100.0*band_sat_counts[worst_band]/total_pixels_per_band:.3f}%)")
# 2D map: how many bands are saturated at each pixel
sat_map_hdr = args.out + "_saturation_map.hdr"
sat_map_img = args.out + "_saturation_map.img"
with open(sat_map_hdr, "w") as f:
f.write("ENVI\n")
f.write("description = {Per-pixel count of saturated bands}\n")
f.write(f"samples = {samples}\n")
f.write(f"lines = {lines}\n")
f.write("bands = 1\n")
f.write("header offset = 0\n")
f.write("file type = ENVI Standard\n")
f.write("data type = 12\n") # uint16
f.write("interleave = bsq\n")
f.write("byte order = 0\n")
pixel_sat_counts.tofile(sat_map_img)
print(f"Saved per-pixel saturation-band-count map: {sat_map_img} / {sat_map_hdr}")
n_pixels_any_sat = int((pixel_sat_counts > 0).sum())
print(f"Pixels with at least 1 saturated band: {n_pixels_any_sat} "
f"({100.0*n_pixels_any_sat/total_pixels_per_band:.3f}% of image)")
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
plt.figure(figsize=(6, 10))
plt.imshow(pixel_sat_counts, aspect="auto", cmap="inferno")
plt.colorbar(label="N. bande sature")
plt.title("Mappa saturazione (n. bande sature per pixel)")
plt.xlabel("Sample")
plt.ylabel("Line")
plt.tight_layout()
sat_png = args.out + "_saturation_map.png"
plt.savefig(sat_png, dpi=120)
print(f"Saved saturation heatmap preview: {sat_png}")
except ImportError:
pass
if __name__ == "__main__":
main()