Durante il dottorato avevo provato a fare unmixing di suoli naturali
Una discreta serie di campioni di suolo naturale era stato raccolto in Mugello ed analizzato in laboratorio con varie tecniche tra cui XRD quantitativa.
Sulla base di questi dati avevo provato, senza successo, ad usare la spettrometria e l'unmixing per vedere se ricavavo risultati simili a quelli le laboratorio chimici..la sfida non e' banale perche' le specie chimiche od hanno poche features come il quarzo od hanno caratteristiche spettrali molto simili tra di loro
Visto che sono passati 15 anni e le tecniche matematiche si sono evolute ho volute riprovare aggiornando una pipeline piu' moderna composta
1) HySime (identificazione della dimensionalita', computazionalmente molto impegnativa)
2) ATGP (estrazione degli endmembers)
3) SVD SAM+SFF+SID-SAM (per definire a quale specie mineralogica appartiene lo spettro dell'endmember usando come base USGS Spectral Library v7)
4) FCLS (per calcolo delle percentuali delle specie mineralogiche nei campioni)
E' stata fatta una prima prova usando tutti gli spettri in ASCIIdata/ASCIIdata_splib07b_cvASD/ChapterM_Minerals ma i risultati non erano buoni..ho quindi usato tutte gli spettri delle specie mineralogiche identificate dal laboratorio mineralogico
I risultati sono abbastanza sconfortanti..il generale l'algoritmo estrae in automatico solo Illite e Montmorillonite con un rapporto di circa 5:1 (circa 80% Illite, 20% Montmorillonite quando il laboratorio mineralogico indica un rapporto di circa 1:1) Non e' sorprendente che non venga visto il Quarzo per la firma spettrale molto piatta e la Kaolinite per la bassa concentrazione.
Codide composto da Claude AI
"""
=============================================================================
Spectral Unmixing Pipeline
=============================================================================
Pipeline completo:
1. Caricamento libreria ENVI FieldSpec
2. Stima numero endmember con HySime
3. Estrazione endmember con VCA (Vertex Component Analysis)
4. Identificazione endmember vs USGS Spectral Library (ENVI format)
5. Quantificazione abbondanze con FCLS (Fully Constrained Least Squares)
Dipendenze:
pip install spectral numpy scipy scikit-learn matplotlib seaborn
=============================================================================
"""
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import seaborn as sns
from scipy.linalg import eig, svd
from scipy.optimize import nnls
from scipy.interpolate import interp1d
import spectral
import spectral.io.envi as envi
import warnings
import os
import sys
from pathlib import Path
warnings.filterwarnings("ignore")
# ---------------------------------------------------------------------------
# Configurazione colori per i plot
# ---------------------------------------------------------------------------
PALETTE = [
"#2196F3", "#F44336", "#4CAF50", "#FF9800", "#9C27B0",
"#00BCD4", "#FF5722", "#8BC34A", "#E91E63", "#3F51B5"
]
def atgp(Y: np.ndarray, p: int, verbose: bool = True) -> tuple[np.ndarray, np.ndarray]:
"""
ATGP – Automatic Target Generation Process.
Chang et al. (2006), IEEE TGRS.
A differenza di VCA, ATGP non assume pixel puri nel dataset.
Trova iterativamente i target più ortogonali rispetto a quelli già
selezionati, massimizzando la componente del segnale non ancora spiegata.
Algoritmo:
1. Inizializza con il pixel di norma massima (più lontano dall'origine)
2. A ogni iterazione: proietta tutti i pixel sullo spazio ortogonale
agli endmember già trovati, seleziona quello con norma massima
3. Ripeti fino a p endmember
Vantaggi rispetto a VCA per dati di campo:
- Non richiede l'assunzione di pixel puri
- Più robusto a outlier e rumore di misura
- Non dipende da SNR stimato
- Deterministico (nessuna componente casuale)
Parameters
----------
Y : np.ndarray (n_bands, n_pixels)
p : int numero di endmember da estrarre
Returns
-------
endmembers : np.ndarray (n_bands, p)
indices : np.ndarray (p,) indici degli spettri selezionati
"""
print(f"\n[3] ATGP – estrazione {p} endmember")
n_bands, n_pixels = Y.shape
# ── Pre-whitening: proiezione PCA per ridurre collinearità ───────────────
# Centra i dati
mu = Y.mean(axis=1, keepdims=True)
Yc = Y - mu
# SVD per whitening
U, s, Vt = svd(Yc, full_matrices=False)
n_comp = min(p * 3, n_bands - 1, n_pixels - 1) # componenti da usare
Yw = U[:, :n_comp].T @ Yc # (n_comp, n_pixels) — spazio ridotto
# ── Step 1: pixel iniziale = norma massima nello spazio ridotto ──────────
norms = np.linalg.norm(Yw, axis=0) # (n_pixels,)
indices = np.zeros(p, dtype=int)
indices[0] = np.argmax(norms)
if verbose:
print(f" Spazio ridotto : {n_comp} componenti")
print(f" Target #1 (norma massima): indice {indices[0]}, "
f"norma={norms[indices[0]]:.4f}")
# ── Step 2-p: proiezione ortogonale iterativa ────────────────────────────
for i in range(1, p):
# Matrice degli endmember già selezionati (nello spazio ridotto)
E = Yw[:, indices[:i]] # (n_comp, i)
# Proiettore ortogonale: P_orth = I - E (E^T E)^{-1} E^T
try:
EtE = E.T @ E
EtE_inv = np.linalg.inv(EtE + 1e-10 * np.eye(i))
P_orth = np.eye(n_comp) - E @ EtE_inv @ E.T # (n_comp, n_comp)
except np.linalg.LinAlgError:
P_orth = np.eye(n_comp) - E @ np.linalg.pinv(E)
# Proietta tutti i pixel e trova quello con norma massima nel residuo
Yr = P_orth @ Yw # (n_comp, n_pixels)
residual_norms = np.linalg.norm(Yr, axis=0)
# Evita di riselezionare lo stesso indice
residual_norms[indices[:i]] = -np.inf
indices[i] = np.argmax(residual_norms)
if verbose:
print(f" Target #{i+1}: indice {indices[i]}, "
f"norma residuo={residual_norms[indices[i]]:.4f}")
# ── Recupera spettri originali (non whitened) ────────────────────────────
endmembers = Y[:, indices] # (n_bands, p)
if verbose:
print(f" Indici selezionati : {indices.tolist()}")
print(f" Shape endmember matrix: {endmembers.shape}")
return endmembers, indices
# =============================================================================
# SEZIONE 1 – Caricamento libreria ENVI
# =============================================================================
def load_envi_library(hdr_path: str) -> tuple[np.ndarray, np.ndarray, dict]:
"""
Carica una libreria spettrale in formato ENVI (.hdr + .sli/.esl/.lib).
Parameters
----------
hdr_path : str
Percorso al file .hdr della libreria ENVI.
Returns
-------
spectra : np.ndarray shape (n_samples, n_bands)
wavelengths : np.ndarray shape (n_bands,)
meta : dict metadati ENVI
"""
print(f"\n[1] Caricamento libreria ENVI: {hdr_path}")
lib = envi.open(hdr_path)
meta = lib.metadata
# Supporta sia SpectralLibrary (.spectra) sia BsqFile (.load())
if hasattr(lib, "spectra"):
spectra = lib.spectra.astype(np.float64) # (n_samples, n_bands)
else:
raw = lib.load() # (lines, samples, 1) o (lines, samples)
raw = np.squeeze(raw)
# BSQ con shape (lines=n_samples, samples=n_bands)
spectra = raw.astype(np.float64)
if spectra.ndim == 1:
spectra = spectra[np.newaxis, :]
# Lunghezze d'onda
if hasattr(lib, "bands") and lib.bands.centers:
wavelengths = np.array(lib.bands.centers, dtype=np.float64)
elif "wavelength" in meta:
wl_raw = meta["wavelength"]
if isinstance(wl_raw, str):
wl_raw = [float(x.strip()) for x in wl_raw.strip("{} ").split(",")]
wavelengths = np.array(wl_raw, dtype=np.float64)
else:
n_b = spectra.shape[1]
wavelengths = np.linspace(400, 2500, n_b)
n_samples, n_bands = spectra.shape
print(f" Spettri caricati : {n_samples}")
print(f" Bande spettrali : {n_bands}")
print(f" Range λ : {wavelengths[0]:.1f} – {wavelengths[-1]:.1f} nm")
return spectra, wavelengths, meta
def get_spectra_names(hdr_path: str) -> list[str]:
"""Legge i nomi degli spettri dal campo 'spectra names' dell'header ENVI."""
names = []
with open(hdr_path, "r") as f:
content = f.read()
if "spectra names" in content:
start = content.find("{", content.find("spectra names")) + 1
end = content.find("}", start)
raw = content[start:end]
names = [n.strip() for n in raw.split(",") if n.strip()]
return names
# =============================================================================
# SEZIONE 2 – HySime: stima del numero di endmember
# =============================================================================
def estimate_noise_hysime(Y: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
Stima il rumore con il metodo Multiple Regression (Chang & Du 2004).
Implementazione vettorizzata: ~50-100x più veloce rispetto al loop per banda.
Trick chiave: la regressione OLS di ogni banda su tutte le altre può essere
riscritta usando la formula di Sherman-Morrison-Woodbury.
Dato B = (Y Y^T)^{-1} calcolato una sola volta, i coefficienti per la
banda i sono:
w_i = B y_i (proiezione su tutte le bande)
ma dobbiamo escludere la banda i stessa dal set di predittori.
Formula leave-one-out equivalente:
residual_i = y_i - y_i^T B y_i / B[i,i] * ... (rank-1 update)
In realtà usiamo un approccio ancora più semplice e numericamente stabile:
la covarianza del rumore diagonale si ottiene da:
Rn[i,i] = (R_yy^{-1})[i,i]^{-1} (inversa dell'elemento diagonale
della matrice di precisione)
che è la formula esatta del metodo di Bioucas-Dias (2008).
Parameters
----------
Y : np.ndarray shape (n_bands, n_pixels)
Returns
-------
Rn : np.ndarray covarianza diagonale del rumore (n_bands, n_bands)
noise : np.ndarray immagine rumore (n_bands, n_pixels)
"""
n_bands, n_pixels = Y.shape
# ── Matrice di covarianza campionaria ────────────────────────────────────
Ryy = (Y @ Y.T) / n_pixels # (n_bands, n_bands)
# ── Regolarizzazione per stabilità numerica ──────────────────────────────
eps = 1e-8 * np.trace(Ryy) / n_bands
Ryy_reg = Ryy + eps * np.eye(n_bands)
# ── Inversa di Ryy (matrice di precisione) ───────────────────────────────
# Rn[i,i] = 1 / P[i,i] dove P = Ryy^{-1}
# Questo è il risultato analitico della regressione leave-one-out
try:
P = np.linalg.inv(Ryy_reg) # (n_bands, n_bands)
except np.linalg.LinAlgError:
P = np.linalg.pinv(Ryy_reg)
# ── Varianza del rumore per ogni banda ──────────────────────────────────
noise_var = 1.0 / np.maximum(np.diag(P), 1e-12) # (n_bands,)
Rn = np.diag(noise_var) # (n_bands, n_bands)
# ── Stima del rumore per pixel (opzionale, usata solo per debug) ────────
# Coefficienti OLS vettorizzati: C = P @ Ryy (tutti i predittori insieme)
# Per semplicità: noise = y_i - predicted_i usando i coefficienti globali
# Nota: non è usata nel calcolo di HySime, quindi la calcoliamo in modo
# approssimato ma veloce.
C = np.diag(noise_var) @ P # (n_bands, n_bands)
# residui: noise[i] = Y[i] - sum_{j!=i} c_ij * Y[j]
# Approx rapida: noise = Y - (I - diag(diag(C))) @ Y
offdiag_C = C - np.diag(np.diag(C))
noise = Y - offdiag_C @ Y # (n_bands, n_pixels)
return Rn, noise
def hysime(Y: np.ndarray, verbose: bool = True) -> int:
"""
HySime – Hyperspectral Signal Identification by Minimum Error.
Bioucas-Dias & Nascimento (2008), IEEE TGRS 46(12):4116-4125.
Parameters
----------
Y : np.ndarray shape (n_bands, n_pixels) – immagine iperspettrale
Returns
-------
p : int numero stimato di endmember (virtual dimensionality)
"""
print("\n[2] HySime – stima numero endmember")
n_bands, n_pixels = Y.shape
# ---- Stima rumore
Rn, noise = estimate_noise_hysime(Y)
# Matrice covarianza del segnale = Ry - Rn
Ry = (Y @ Y.T) / n_pixels
Rx = Ry - Rn
# ---- Autodecomposizione di Ry
eigenvalues_y, eigenvectors_y = np.linalg.eigh(Ry)
# Ordine decrescente
idx = np.argsort(eigenvalues_y)[::-1]
eigenvalues_y = eigenvalues_y[idx]
eigenvectors_y = eigenvectors_y[:, idx]
# ---- Calcolo errore in funzione di p
errors = []
for p in range(1, n_bands + 1):
# Proiettore sul sottospazio di dimensione p
Ep = eigenvectors_y[:, :p]
proj = Ep @ Ep.T
# Errore: tr(Rx) - tr(proj @ Rx @ proj.T) + tr(proj @ Rn @ proj.T)
err = (np.trace(Rx)
- np.trace(proj @ Rx @ proj.T)
+ np.trace(proj @ Rn @ proj.T))
errors.append(err)
errors = np.array(errors)
p_opt = int(np.argmin(errors)) + 1 # +1 per indice 1-based
if verbose:
print(f" Dimensione immagine : {n_bands} bande × {n_pixels} pixel")
print(f" p ottimale (HySime) : {p_opt}")
# Stampa errori attorno al minimo
lo = max(0, p_opt - 3)
hi = min(n_bands, p_opt + 4)
print(f" Errori HySime (p={lo+1}…{hi}):")
for i in range(lo, hi):
mark = " ◀ ottimo" if i == p_opt - 1 else ""
print(f" p={i+1:3d} err={errors[i]:.6e}{mark}")
return p_opt, errors
# =============================================================================
# SEZIONE 3 – VCA: Vertex Component Analysis
# =============================================================================
def vca(Y: np.ndarray, p: int, snr_input: float = None,
verbose: bool = True) -> tuple[np.ndarray, np.ndarray]:
"""
VCA – Vertex Component Analysis.
Nascimento & Bioucas-Dias (2005), IEEE TGRS 43(4):898-910.
Parameters
----------
Y : np.ndarray shape (n_bands, n_pixels)
p : int numero di endmember
snr_input : float SNR in dB (se None viene stimato automaticamente)
Returns
-------
endmembers : np.ndarray shape (n_bands, p)
indices : np.ndarray shape (p,) indici pixel selezionati
"""
print(f"\n[3] VCA – estrazione {p} endmember")
n_bands, n_pixels = Y.shape
# ---- Stima SNR
r_mean = Y.mean(axis=1, keepdims=True)
Yc = Y - r_mean
_, s_val, _ = svd(Yc, full_matrices=False)
p_YY = s_val[:p] ** 2 / n_pixels
p_N = (np.sum(Yc ** 2) / n_pixels - np.sum(p_YY))
p_N = max(p_N, 1e-12)
snr_est = 10 * np.log10(np.sum(p_YY) / (n_bands * p_N))
snr_use = snr_input if snr_input is not None else snr_est
if verbose:
print(f" SNR stimato : {snr_est:.1f} dB (usato: {snr_use:.1f} dB)")
SNR_th = 15 + 10 * np.log10(p)
# ---- Proiezione
max_components = min(n_bands, n_pixels) - 1
if snr_use > SNR_th:
d = min(p, max_components)
U, _, _ = svd(Yc, full_matrices=False)
Ud2 = U[:, :d] # (n_bands, d)
x = Ud2.T @ Yc # (d, n_pixels)
x_sum = np.sqrt(np.clip(1 - np.sum(x ** 2, axis=0, keepdims=True), 0, None))
Yp = np.vstack([x, x_sum]) # (d+1, n_pixels)
else:
d = min(p - 1, max_components)
Yd = Y - r_mean
U, _, _ = svd(Yd, full_matrices=False)
Ud2 = U[:, :d] # (n_bands, d)
x = Ud2.T @ Yd # (d, n_pixels)
Yp = np.vstack([x, np.ones((1, n_pixels))]) # (d+1, n_pixels)
# ---- Ciclo VCA: proiezioni ortogonali successive
p_dim = Yp.shape[0]
endmember_proj = np.zeros((p_dim, p))
indices = np.zeros(p, dtype=int)
A = np.zeros((p_dim, p))
# Inizializzazione con vettore casuale
rng = np.random.default_rng(42)
A[:, 0] = rng.standard_normal(p_dim)
A[:, 0] /= np.linalg.norm(A[:, 0])
for i in range(p):
# Proiettore ortogonale
w = rng.standard_normal(p_dim)
f = w - A @ (A.T @ w)
f /= (np.linalg.norm(f) + 1e-12)
# Trova il pixel con proiezione massima
v = f @ Yp
k = np.argmax(np.abs(v))
indices[i] = k
endmember_proj[:, i] = Yp[:, k]
A[:, i] = Yp[:, k] / (np.linalg.norm(Yp[:, k]) + 1e-12)
# ---- Recupera spettri originali
endmembers = Y[:, indices]
if verbose:
print(f" Indici pixel estratti: {indices}")
print(f" Shape endmember matrix: {endmembers.shape}")
return endmembers, indices
# =============================================================================
# SEZIONE 4 – Identificazione vs USGS Spectral Library
# =============================================================================
def load_usgs_library(hdr_path: str) -> tuple[np.ndarray, np.ndarray, list[str]]:
"""
Carica la USGS Spectral Library in formato ENVI.
Gestisce automaticamente:
- Librerie SpectralLibrary (.spectra) e BsqFile (.load())
- Unità in µm (converte in nm se necessario)
- Flag SPECPR di valori non validi (-1.23e34) → NaN → 0
- Spettri con tutti i valori non validi (vengono scartati)
Returns
-------
spectra : np.ndarray (n_ref, n_bands_ref) valori in [0,1]
wavelengths : np.ndarray (n_bands_ref,) in nm
names : list[str]
"""
print(f"\n[4] Caricamento USGS Spectral Library: {hdr_path}")
lib = envi.open(hdr_path)
meta_u = lib.metadata
# ── Caricamento dati ────────────────────────────────────────────────────
if hasattr(lib, "spectra"):
spectra = lib.spectra.astype(np.float64)
else:
raw = lib.load()
spectra = np.squeeze(raw).astype(np.float64)
if spectra.ndim == 1:
spectra = spectra[np.newaxis, :]
# ── Lunghezze d'onda ────────────────────────────────────────────────────
if hasattr(lib, "bands") and lib.bands.centers:
wavelengths = np.array(lib.bands.centers, dtype=np.float64)
elif "wavelength" in meta_u:
wl_raw = meta_u["wavelength"]
if isinstance(wl_raw, (list, tuple)):
wavelengths = np.array(wl_raw, dtype=np.float64)
else:
wavelengths = np.array(
[float(x.strip()) for x in str(wl_raw).strip("{} ").split(",")],
dtype=np.float64)
else:
wavelengths = np.linspace(400, 2500, spectra.shape[1])
# ── Conversione µm → nm (USGS splib07 usa spesso i µm) ─────────────────
wl_units = str(meta_u.get("wavelength units", "")).lower()
if "micron" in wl_units or "micrometer" in wl_units or (wavelengths.max() < 50):
print(f" [!] Unità rilevate: µm → conversione in nm")
wavelengths = wavelengths * 1000.0
# ── Pulizia valori non validi (flag SPECPR = -1.23e34) ──────────────────
bad_flag = -1.23e34
bad_mask = (spectra < bad_flag * 0.01) | (spectra > 1e10)
spectra[bad_mask] = np.nan
# Sostituisci NaN con 0 (dopo aver tenuto traccia degli spettri buoni)
spectra = np.where(np.isnan(spectra), 0.0, spectra)
# ── Clip in [0, 1] (riflettanza) ────────────────────────────────────────
spectra = np.clip(spectra, 0.0, 1.0)
# ── Nomi ────────────────────────────────────────────────────────────────
names = get_spectra_names(hdr_path)
if not names:
names = [f"Ref_{i:04d}" for i in range(spectra.shape[0])]
print(f" Spettri USGS : {spectra.shape[0]}")
print(f" Bande USGS : {spectra.shape[1]}")
print(f" Range λ USGS : {wavelengths[0]:.1f} – {wavelengths[-1]:.1f} nm")
print(f" Unità header : {meta_u.get('wavelength units', 'non specificato')}")
return spectra, wavelengths, names
def resample_spectra(spectra: np.ndarray, wl_src: np.ndarray,
wl_dst: np.ndarray) -> np.ndarray:
"""
Ricampiona `spectra` dalle lunghezze d'onda sorgente a quelle destinazione
mediante interpolazione lineare (con extrapolazione = 0).
Parameters
----------
spectra : (n, m) m = bande sorgente
wl_src : (m,)
wl_dst : (k,)
Returns
-------
resampled : (n, k)
"""
n = spectra.shape[0]
resampled = np.zeros((n, len(wl_dst)))
for i in range(n):
f = interp1d(wl_src, spectra[i], kind="linear",
bounds_error=False, fill_value=0.0)
resampled[i] = f(wl_dst)
return resampled
def spectral_angle_mapper(a: np.ndarray, b: np.ndarray) -> float:
"""SAM tra due spettri (radianti)."""
dot = np.dot(a, b)
norm = np.linalg.norm(a) * np.linalg.norm(b)
if norm < 1e-12:
return np.pi / 2
return np.arccos(np.clip(dot / norm, -1.0, 1.0))
# Gruppi di minerali spettralmente simili tra cui SAM da solo non distingue bene.
# Per questi gruppi si usa SFF (Spectral Feature Fitting) sulle sole bande SWIR
# dopo rimozione del continuum, che è molto più discriminante.
PHYLLOSILICATE_GROUPS = [
# Argille 2:1 – feature diagnostica a ~2200-2210 nm (Al-OH) + 1900 nm (H2O)
["montmorillonite", "smectite", "nontronite", "beidellite",
"saponite", "hectorite"],
# Miche dioctaedriche – feature a ~2195-2200 nm + spalla a ~2340 nm
["illite", "muscovite", "phengite", "paragonite", "glauconite",
"sericite", "alunite"],
# Caolinite-serpentino – doppietto a ~2160+2200 nm molto stretto
["kaolinite", "dickite", "nacrite", "halloysite",
"serpentine", "antigorite", "lizardite", "chrysotile"],
# Cloriti – feature a ~2250-2350 nm (Mg-OH/Fe-OH)
["chlorite", "chamosite", "clinochlore", "pennantite"],
]
def _mineral_group(name: str) -> int:
"""Ritorna l'indice del gruppo filosilicatico del minerale, -1 se non appartiene."""
name_l = name.lower()
for g, group in enumerate(PHYLLOSILICATE_GROUPS):
if any(m in name_l for m in group):
return g
return -1
def _remove_continuum(spectrum: np.ndarray, wavelengths: np.ndarray) -> np.ndarray:
"""
Rimuove il continuum convex-hull dallo spettro (metodo Clark & Roush 1984).
Restituisce lo spettro con continuum rimosso (valori in [0,1], 1=continuum).
"""
from scipy.spatial import ConvexHull
n = len(spectrum)
pts = np.column_stack([wavelengths, spectrum])
# Aggiungi punti estremi per ancorare il continuum
hull_pts = np.vstack([[wavelengths[0], 0], pts, [wavelengths[-1], 0]])
try:
hull = ConvexHull(hull_pts)
# Prendi solo i vertici del upper hull (y > 0)
vertices = sorted(hull.vertices)
upper = [(hull_pts[v, 0], hull_pts[v, 1]) for v in vertices
if hull_pts[v, 1] >= 0]
upper = sorted(upper, key=lambda x: x[0])
# Interpola il continuum
wl_cont = [p[0] for p in upper]
sp_cont = [p[1] for p in upper]
continuum = np.interp(wavelengths, wl_cont, sp_cont)
except Exception:
continuum = np.ones(n)
continuum = np.where(continuum < 1e-6, 1e-6, continuum)
return spectrum / continuum
def _swir_sff_score(em: np.ndarray, ref: np.ndarray,
wl: np.ndarray) -> float:
"""
SFF (Spectral Feature Fitting) sul solo range SWIR (2000–2500 nm).
Score = correlazione di Pearson tra le feature di assorbimento (dopo
rimozione del continuum) pesata per la profondità delle feature.
Valore più alto = migliore match (range [−1, 1]).
"""
swir_mask = (wl >= 2000) & (wl <= 2500)
if swir_mask.sum() < 5:
return 0.0
wl_s = wl[swir_mask]
em_s = _remove_continuum(em[swir_mask], wl_s)
ref_s = _remove_continuum(ref[swir_mask], wl_s)
# Inverti: assorbimento = 1 - CR
em_abs = 1.0 - em_s
ref_abs = 1.0 - ref_s
# Pearson
if em_abs.std() < 1e-6 or ref_abs.std() < 1e-6:
return 0.0
return float(np.corrcoef(em_abs, ref_abs)[0, 1])
def sid(a: np.ndarray, b: np.ndarray) -> float:
"""
Spectral Information Divergence (SID).
Chang (2000) – misura la divergenza di informazione tra due spettri
trattati come distribuzioni di probabilità.
Più sensibile di SAM alle differenze locali di profondità di banda:
due spettri con angolo simile ma bande di assorbimento diverse avranno
SID alto.
Returns
-------
sid_val : float >= 0, più basso = più simile (0 = identici)
"""
# Shift per garantire valori positivi (riflettanza può essere 0)
a = a - a.min() + 1e-8
b = b - b.min() + 1e-8
# Normalizza a distribuzioni di probabilità
pa = a / a.sum()
pb = b / b.sum()
# Divergenza simmetrica KL
d_ab = np.sum(pa * np.log(pa / pb))
d_ba = np.sum(pb * np.log(pb / pa))
return float(d_ab + d_ba)
def sid_sam(a: np.ndarray, b: np.ndarray, sam_deg: float) -> float:
"""
SID-SAM combinato: SID x tan(SAM).
Du et al. (2004) – prodotto che amplifica le differenze quando
entrambe le metriche concordano nel segnalare dissimilarità.
Più basso = più simile.
"""
sam_rad = np.radians(np.clip(sam_deg, 0.0, 89.9))
return sid(a, b) * np.tan(sam_rad)
def _combined_score(sam_deg: float, sff: float, sid_sam_val: float = 0.0,
w_sam: float = 0.25, w_sff: float = 0.55,
w_sid: float = 0.20) -> float:
"""
Score combinato SAM + SFF + SID-SAM.
Tutte e tre le componenti vengono convertite in similarità [0, 1]:
- SAM -> sim = 1 - sam_deg/90 (angolo spettrale globale)
- SFF -> già in [0,1], 1 = perfetto (feature fitting SWIR)
- SID-SAM -> sim = 1/(1 + sid_sam_val) (divergenza informativa)
SFF rimane il peso dominante per i fillosilicati (discrimina le bande
diagnostiche Al-OH/Mg-OH). SID-SAM cattura differenze di profondità
di banda che SAM ignora. SAM mantiene il ruolo di filtro globale.
"""
sam_sim = 1.0 - sam_deg / 90.0
sid_sim = 1.0 / (1.0 + sid_sam_val) if sid_sam_val > 0.0 else 0.5
return w_sam * sam_sim + w_sff * sff + w_sid * sid_sim
def _prepare_usgs_pool(wl_em, wl_usgs, usgs_spectra, usgs_names):
"""
Calcola overlap, ricampiona USGS e filtra spettri non validi.
Ritorna (usgs_valid, usgs_names_valid, wl_common, em_mask).
"""
wl_lo = max(wl_em[0], wl_usgs[0])
wl_hi = min(wl_em[-1], wl_usgs[-1])
if wl_lo >= wl_hi:
raise ValueError(
f"[ERRORE] Nessuna sovrapposizione spettrale tra FieldSpec "
f"({wl_em[0]:.1f}–{wl_em[-1]:.1f} nm) e USGS "
f"({wl_usgs[0]:.1f}–{wl_usgs[-1]:.1f} nm).\n"
f" Verifica che le unità siano entrambe in nm."
)
em_mask = (wl_em >= wl_lo) & (wl_em <= wl_hi)
wl_common = wl_em[em_mask]
usgs_res = resample_spectra(usgs_spectra, wl_usgs, wl_common)
valid_cnt = (usgs_res > 1e-4).sum(axis=1)
valid_msk = valid_cnt >= int(0.30 * len(wl_common))
if valid_msk.sum() == 0:
raise ValueError(
"[ERRORE] Nessuno spettro USGS valido dopo ricampionamento.\n"
f" Range FieldSpec : {wl_em[0]:.1f}–{wl_em[-1]:.1f} nm\n"
f" Range USGS : {wl_usgs[0]:.1f}–{wl_usgs[-1]:.1f} nm"
)
usgs_valid = usgs_res[valid_msk]
usgs_names_valid = [n for n, v in zip(usgs_names, valid_msk) if v]
return usgs_valid, usgs_names_valid, wl_common, em_mask
def sam_matrix(endmembers_c: np.ndarray, usgs_valid: np.ndarray) -> np.ndarray:
"""
Calcola la matrice SAM (n_em, n_usgs) vettorizzata.
endmembers_c : (n_bands, p)
usgs_valid : (n_usgs, n_bands)
"""
# Normalizza
em_norm = endmembers_c / (np.linalg.norm(endmembers_c, axis=0, keepdims=True) + 1e-12)
us_norm = usgs_valid / (np.linalg.norm(usgs_valid, axis=1, keepdims=True) + 1e-12)
dot = em_norm.T @ us_norm.T # (p, n_usgs)
angles = np.arccos(np.clip(dot, -1.0, 1.0))
return np.degrees(angles) # (p, n_usgs)
def identify_endmembers(endmembers: np.ndarray, wl_em: np.ndarray,
usgs_spectra: np.ndarray, wl_usgs: np.ndarray,
usgs_names: list[str],
top_k: int = 5,
max_minerals: int = 10,
residual_iters: int = 3,
sam_threshold_deg: float = 10.0) -> list[dict]:
"""
Identificazione endmember con strategia a tre livelli progettata per
discriminare correttamente minerali argillosi spettralmente simili
(montmorillonite, illite, kaolinite, clorite).
LIVELLO 1 – SAM globale
Primo filtro rapido: riduce il pool USGS ai candidati entro 25° dall'EM.
LIVELLO 2 – Score combinato SAM + SFF SWIR (per fillosilicati)
Se l'EM appartiene al gruppo filosilicatico, si usa Spectral Feature
Fitting sul solo range 2000–2500 nm dopo rimozione del continuum.
Questo range contiene le feature diagnostiche Al-OH/Mg-OH/Fe-OH che
distinguono illite (2195 nm) da kaolinite (2160+2200 nm doppietto)
da montmorillonite (2210 nm largo + 1900 nm H2O).
LIVELLO 3 – Greedy matching con esclusione di gruppo
Ogni minerale assegnato esclude dal pool tutti i minerali dello
stesso gruppo spettralmente simile, evitando che illite, muscovite
e sericite vengano assegnate tutte allo stesso EM.
LIVELLO 4 – Estrazione iterativa sul residuo
Se i minerali trovati sono < max_minerals, si proietta il residuo
sul pool USGS rimanente per trovare componenti minori.
Parameters
----------
endmembers : np.ndarray (n_bands, p) endmember VCA
wl_em : np.ndarray (n_bands,) lunghezze d'onda FieldSpec
usgs_spectra : np.ndarray (n_ref, n_ref_bands)
wl_usgs : np.ndarray (n_ref_bands,)
usgs_names : list[str]
top_k : int quanti match mostrare per ogni endmember
max_minerals : int numero target di minerali distinti (default 10)
residual_iters : int numero massimo di iterazioni sul residuo
sam_threshold_deg : float soglia SAM in gradi per accettare un match residuo
Returns
-------
results : list[dict] con campi:
endmember_idx – indice progressivo (0-based)
source – 'VCA' o 'residual_iter_N'
best_match – nome USGS assegnato
best_angle_deg – angolo SAM vs best_match
sff_score – score SFF SWIR (1.0 = perfetto, solo per fillosilicati)
top_k_matches – lista (nome, angolo, sff) dei top-k
spectrum – spettro (n_bands,) dell'endmember o del residuo
"""
print(f"\n[4b] Identificazione endmember (SAM + SFF SWIR + greedy clay-aware)")
print(f" Range λ endmember : {wl_em[0]:.1f} – {wl_em[-1]:.1f} nm ({len(wl_em)} bande)")
print(f" Range λ USGS : {wl_usgs[0]:.1f} – {wl_usgs[-1]:.1f} nm ({len(wl_usgs)} bande)")
print(f" Minerali target : {max_minerals}")
usgs_valid, usgs_names_valid, wl_common, em_mask = _prepare_usgs_pool(
wl_em, wl_usgs, usgs_spectra, usgs_names)
print(f" Overlap λ : {wl_common[0]:.1f} – {wl_common[-1]:.1f} nm")
print(f" Spettri USGS validi: {len(usgs_names_valid)}")
endmembers_c = endmembers[em_mask, :] # (n_overlap, p)
p_vca = endmembers_c.shape[1]
# Pool: indice USGS → disponibile (True/False)
available = list(range(len(usgs_names_valid)))
# Traccia quali gruppi filosilicatici sono già stati assegnati
assigned_groups: set = set()
results = []
def _score_candidate(em_spec, usgs_idx, sam_deg):
"""Score combinato SAM + SFF + SID-SAM per un candidato USGS."""
ref_spec = usgs_valid[usgs_idx]
ref_name = usgs_names_valid[usgs_idx]
grp = _mineral_group(ref_name)
if grp >= 0:
# Fillosilicato: usa SFF SWIR come discriminatore primario
sff = _swir_sff_score(em_spec, ref_spec, wl_common)
else:
sff = 0.0
# SID-SAM: sempre calcolato, cattura differenze di profondità di banda
# che SAM ignora (utile anche per minerali non filosilicatici)
sid_sam_val = sid_sam(em_spec, ref_spec, sam_deg)
return _combined_score(sam_deg, sff, sid_sam_val), sff, sid_sam_val
# ─────────────────────────────────────────────────────────────────────────
# FASE A – Greedy matching degli endmember VCA con SAM + SFF
# ─────────────────────────────────────────────────────────────────────────
print(f"\n FASE A – Greedy SAM+SFF su {p_vca} endmember VCA")
angles_all = sam_matrix(endmembers_c, usgs_valid) # (p_vca, n_usgs)
for i in range(p_vca):
if not available:
print(f" [!] Pool USGS esaurito all'endmember #{i+1}")
break
em_spec = endmembers_c[:, i]
# Step 1: pre-filtro SAM < 35° tra i disponibili
candidates = [(idx, angles_all[i, idx]) for idx in available
if angles_all[i, idx] <= 35.0]
if not candidates:
# Allarga soglia se nessun candidato
candidates = [(idx, angles_all[i, idx]) for idx in available]
candidates.sort(key=lambda x: x[1])
# Step 2: calcola score combinato sui top-20 candidati SAM
top_cands = candidates[:20]
scored = []
for usgs_idx, sam_deg in top_cands:
score, sff, sid_val = _score_candidate(em_spec, usgs_idx, sam_deg)
scored.append((usgs_idx, sam_deg, sff, sid_val, score))
scored.sort(key=lambda x: -x[4]) # ordine decrescente di score
# Step 3: scegli il miglior candidato non già escluso per gruppo
chosen = None
for usgs_idx, sam_deg, sff, sid_v, score in scored:
ref_name = usgs_names_valid[usgs_idx]
grp = _mineral_group(ref_name)
# Blocca se il GRUPPO è già rappresentato
# (ma solo se c'è un'alternativa non filosilicatica disponibile,
# altrimenti assegna comunque il miglior candidato)
if grp >= 0 and grp in assigned_groups:
non_phyllo_avail = [x for x in available
if _mineral_group(usgs_names_valid[x]) != grp]
if non_phyllo_avail:
continue # salta, cerca alternative
chosen = (usgs_idx, sam_deg, sff, sid_v, score)
break
if chosen is None:
chosen = scored[0] # fallback: miglior score assoluto
best_usgs_idx, best_angle, best_sff, best_sid, best_score = chosen
best_name = usgs_names_valid[best_usgs_idx]
best_grp = _mineral_group(best_name)
# Top-k tra i candidati per il report
top_k_real = min(top_k, len(scored))
top_matches = [(usgs_names_valid[idx], ang, sff_v, sid_v)
for idx, ang, sff_v, sid_v, _ in scored[:top_k_real]]
results.append({
"endmember_idx" : i,
"source" : "VCA",
"best_match" : best_name,
"best_angle_deg" : best_angle,
"sff_score" : best_sff,
"sid_sam_score" : best_sid,
"top_k_matches" : top_matches,
"spectrum" : endmembers_c[:, i],
})
available.remove(best_usgs_idx)
if best_grp >= 0:
assigned_groups.add(best_grp)
metrics = f"SAM={best_angle:.1f}°"
if best_sff > 0: metrics += f" SFF={best_sff:.3f}"
metrics += f" SID-SAM={best_sid:.4f} score={best_score:.3f}"
print(f" EM #{i+1:2d} → {best_name} ({metrics})")
# ─────────────────────────────────────────────────────────────────────────
# FASE B – Estrazione iterativa sul residuo
# ─────────────────────────────────────────────────────────────────────────
n_found = len(results)
if n_found < max_minerals and available:
print(f"\n FASE B – Estrazione residuo ({n_found} trovati, target {max_minerals})")
E_cur = np.column_stack([r["spectrum"] for r in results
if r["source"] == "VCA"])
Y_proxy = endmembers_c.mean(axis=1)
from scipy.optimize import nnls as _nnls
for it in range(1, residual_iters + 1):
if n_found >= max_minerals or not available:
break
coeffs, _ = _nnls(E_cur, Y_proxy)
s = coeffs.sum()
if s > 1e-12:
coeffs /= s
residual_spec = Y_proxy - E_cur @ coeffs
if residual_spec.std() < 1e-4:
print(f" iter {it}: residuo piatto, stop")
break
# Score combinato anche per il residuo
cands_res = []
for idx in available:
sam_deg = np.degrees(spectral_angle_mapper(residual_spec,
usgs_valid[idx]))
score, sff, sid_val = _score_candidate(residual_spec, idx, sam_deg)
cands_res.append((idx, sam_deg, sff, sid_val, score))
cands_res.sort(key=lambda x: -x[4])
best_usgs_idx, best_angle, best_sff, best_sid, best_score = cands_res[0]
if best_angle > sam_threshold_deg and best_sff < 0.5:
print(f" iter {it}: nessun match accettabile "
f"(SAM={best_angle:.1f}°, SFF={best_sff:.3f}), stop")
break
best_name = usgs_names_valid[best_usgs_idx]
best_spec = usgs_valid[best_usgs_idx]
top_k_real = min(top_k, len(cands_res))
top_matches = [(usgs_names_valid[idx], ang, sff_v, sid_v)
for idx, ang, sff_v, sid_v, _ in cands_res[:top_k_real]]
em_idx = len(results)
results.append({
"endmember_idx" : em_idx,
"source" : f"residual_iter_{it}",
"best_match" : best_name,
"best_angle_deg" : best_angle,
"sff_score" : best_sff,
"sid_sam_score" : best_sid,
"top_k_matches" : top_matches,
"spectrum" : best_spec,
})
available.remove(best_usgs_idx)
n_found += 1
E_cur = np.column_stack([E_cur, best_spec])
metrics_b = f"SAM={best_angle:.1f}°"
if best_sff > 0: metrics_b += f" SFF={best_sff:.3f}"
metrics_b += f" SID-SAM={best_sid:.4f}"
print(f" iter {it}: +{best_name} ({metrics_b}) [{n_found}/{max_minerals}]")
# ─────────────────────────────────────────────────────────────────────────
# RIEPILOGO
# ─────────────────────────────────────────────────────────────────────────
print(f"\n ── Riepilogo identificazione ──")
print(f" Totale minerali identificati: {len(results)}")
for r in results:
src = "VCA " if r["source"] == "VCA" else "res."
sff_s = f" SFF={r['sff_score']:.3f}" if r.get("sff_score", 0) > 0 else ""
sid_s = f" SID-SAM={r['sid_sam_score']:.4f}" if r.get("sid_sam_score") is not None else ""
print(f" [{src}] EM #{r['endmember_idx']+1:2d} "
f"{r['best_match'][:50]} SAM={r['best_angle_deg']:.1f}°{sff_s}{sid_s}")
return results
# =============================================================================
# SEZIONE 5 – FCLS: Fully Constrained Least Squares
# =============================================================================
def fcls(Y: np.ndarray, E: np.ndarray,
max_iter: int = 1000, tol: float = 1e-6) -> np.ndarray:
"""
FCLS – Fully Constrained Least Squares Unmixing.
Impone: (1) abbondanze ≥ 0 (2) sum(abbondanze) = 1 per ogni pixel.
Approccio: proietta il problema vincolato in un problema NNLS aumentato.
Chang & Heinz (2000).
Parameters
----------
Y : np.ndarray (n_bands, n_pixels) – dati da smisurare
E : np.ndarray (n_bands, p) – matrice endmember
Returns
-------
A : np.ndarray (p, n_pixels) – mappe abbondanza
"""
print(f"\n[5] FCLS – quantificazione abbondanze")
n_bands, n_pixels = Y.shape
p = E.shape[1]
# ---- Formulazione aumentata con vincolo di somma = 1
# Aggiungi una riga di delta*1 a E e a Y
delta = 100.0 # peso del vincolo di somma (trade-off)
E_aug = np.vstack([E, delta * np.ones((1, p))]) # (n_bands+1, p)
Y_aug = np.vstack([Y, delta * np.ones((1, n_pixels))]) # (n_bands+1, n_pixels)
A = np.zeros((p, n_pixels))
for k in range(n_pixels):
y_k = Y_aug[:, k]
a_k, _ = nnls(E_aug, y_k)
# Normalizza per vincolo somma = 1
s = a_k.sum()
if s > 1e-12:
a_k /= s
A[:, k] = a_k
# ---- Statistiche
reconstruction = E @ A
residual = Y - reconstruction
rmse = np.sqrt(np.mean(residual ** 2))
print(f" Abbondanze shape : {A.shape}")
print(f" RMSE ricostruzione: {rmse:.6f}")
print(f" Abbondanze medie per endmember:")
for i in range(p):
print(f" EM #{i+1}: mean={A[i].mean():.4f} "
f"std={A[i].std():.4f} "
f"min={A[i].min():.4f} max={A[i].max():.4f}")
return A
# =============================================================================
# SEZIONE 6 – Visualizzazioni
# =============================================================================
def plot_hysime_error(errors: np.ndarray, p_opt: int,
save_path: str = None):
"""Curva errore HySime vs numero di endmember."""
fig, ax = plt.subplots(figsize=(8, 4))
x = np.arange(1, len(errors) + 1)
ax.plot(x, errors, "o-", color="#2196F3", linewidth=2, markersize=5)
ax.axvline(p_opt, color="#F44336", linewidth=2, linestyle="--",
label=f"p ottimale = {p_opt}")
ax.set_xlabel("Numero di endmember (p)", fontsize=12)
ax.set_ylabel("Errore HySime", fontsize=12)
ax.set_title("HySime – Curva errore vs numero endmember", fontsize=13, fontweight="bold")
ax.legend(fontsize=11)
ax.set_xlim(1, min(30, len(errors)))
ax.grid(True, alpha=0.3)
fig.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches="tight")
plt.close(fig)
print(f" Plot HySime salvato: {save_path}")
def plot_endmembers(endmembers: np.ndarray, wavelengths: np.ndarray,
identification: list[dict], save_path: str = None):
"""Spettri endmember estratti con nome identificato."""
n_em = len(identification) # usa identification come riferimento, non endmembers.shape
n_cols = 2
n_rows = (n_em + 1) // n_cols
fig, axes = plt.subplots(n_rows, n_cols,
figsize=(12, 3.5 * n_rows), squeeze=False)
fig.suptitle("VCA Endmembers – Identificazione USGS",
fontsize=14, fontweight="bold", y=1.01)
for i, r in enumerate(identification):
ax = axes[i // n_cols][i % n_cols]
color = PALETTE[i % len(PALETTE)]
em_col = r["endmember_idx"]
ax.plot(wavelengths, endmembers[:, em_col], color=color, linewidth=1.8)
src_label = "" if r["source"] == "VCA" else f" [{r['source']}]"
ax.set_title(
f"EM #{i+1}{src_label} – {r['best_match']}\n"
f"SAM: {r['best_angle_deg']:.1f}°",
fontsize=9, fontweight="bold"
)
ax.set_xlabel("λ (nm)", fontsize=9)
ax.set_ylabel("Riflettanza", fontsize=9)
ax.grid(True, alpha=0.25)
# Nascondi assi vuoti
for j in range(n_em, n_rows * n_cols):
axes[j // n_cols][j % n_cols].set_visible(False)
fig.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches="tight")
plt.close(fig)
print(f" Plot endmember salvato: {save_path}")
def plot_endmember_vs_usgs(endmembers: np.ndarray, wl_em: np.ndarray,
usgs_spectra: np.ndarray, wl_usgs: np.ndarray,
identification: list[dict],
usgs_names: list[str],
save_path: str = None):
"""Confronto endmember estratto vs spettro USGS migliore."""
n_em = len(identification)
usgs_resampled = resample_spectra(usgs_spectra, wl_usgs, wl_em)
usgs_name_to_idx = {n: i for i, n in enumerate(usgs_names)}
n_cols = 2
n_rows = (n_em + 1) // n_cols
fig, axes = plt.subplots(n_rows, n_cols,
figsize=(13, 4 * n_rows), squeeze=False)
fig.suptitle("Confronto Endmember VCA vs USGS Library",
fontsize=14, fontweight="bold", y=1.01)
for i, r in enumerate(identification):
ax = axes[i // n_cols][i % n_cols]
em_color = PALETTE[i % len(PALETTE)]
em_col = r["endmember_idx"]
ax.plot(wl_em, endmembers[:, em_col],
color=em_color, linewidth=2.0, label="VCA endmember")
best_name = r["best_match"]
if best_name in usgs_name_to_idx:
ref_idx = usgs_name_to_idx[best_name]
ax.plot(wl_em, usgs_resampled[ref_idx],
color="#333333", linewidth=1.3,
linestyle="--", label="USGS ref")
ax.set_title(
f"EM #{i+1} vs {best_name}\nSAM = {r['best_angle_deg']:.2f}°",
fontsize=9, fontweight="bold"
)
ax.set_xlabel("λ (nm)", fontsize=9)
ax.set_ylabel("Riflettanza", fontsize=9)
ax.legend(fontsize=8)
ax.grid(True, alpha=0.25)
for j in range(n_em, n_rows * n_cols):
axes[j // n_cols][j % n_cols].set_visible(False)
fig.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches="tight")
plt.close(fig)
print(f" Plot confronto USGS salvato: {save_path}")
def plot_abundances(A: np.ndarray, identification: list[dict],
save_path: str = None):
"""
Visualizza le abbondanze FCLS.
Se i dati vengono da una libreria (non immagine 2D), mostra barplot per spettro.
"""
p, n_pixels = A.shape
labels = [f"EM #{i+1}\n{identification[i]['best_match'][:20]}"
for i in range(p)]
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
fig.suptitle("FCLS – Abbondanze degli Endmember",
fontsize=14, fontweight="bold")
# --- Heatmap abbondanze (spettri x endmember)
ax = axes[0]
im = ax.imshow(A, aspect="auto", cmap="YlOrRd", vmin=0, vmax=1)
ax.set_yticks(range(p))
ax.set_yticklabels(labels, fontsize=8)
ax.set_xlabel("Indice spettro / pixel", fontsize=10)
ax.set_title("Mappa abbondanze (heatmap)", fontsize=11)
plt.colorbar(im, ax=ax, label="Abbondanza")
# --- Boxplot distribuzione per endmember
ax2 = axes[1]
data_bp = [A[i, :] for i in range(p)]
bp = ax2.boxplot(data_bp, labels=[f"EM #{i+1}" for i in range(p)],
patch_artist=True, notch=False)
for patch, color in zip(bp["boxes"], PALETTE[:p]):
patch.set_facecolor(color)
patch.set_alpha(0.7)
ax2.set_ylabel("Abbondanza", fontsize=10)
ax2.set_title("Distribuzione abbondanze", fontsize=11)
ax2.set_ylim(-0.05, 1.05)
ax2.grid(True, alpha=0.3)
ax2.tick_params(axis="x", labelsize=8)
fig.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches="tight")
plt.close(fig)
print(f" Plot abbondanze salvato: {save_path}")
def plot_mean_abundances(A: np.ndarray, identification: list[dict],
save_path: str = None):
"""Barplot delle abbondanze medie per endmember."""
p = A.shape[0]
means = A.mean(axis=1)
stds = A.std(axis=1)
labels = [f"EM #{i+1}\n{identification[i]['best_match'][:25]}"
for i in range(p)]
fig, ax = plt.subplots(figsize=(max(8, p * 2), 5))
bars = ax.bar(labels, means, yerr=stds, capsize=6,
color=PALETTE[:p], alpha=0.85, edgecolor="white", linewidth=1.2)
ax.set_ylabel("Abbondanza media (± std)", fontsize=11)
ax.set_title("FCLS – Abbondanze medie degli Endmember",
fontsize=13, fontweight="bold")
ax.set_ylim(0, 1.15)
ax.grid(True, axis="y", alpha=0.3)
for bar, mean_val, std_val in zip(bars, means, stds):
ax.text(bar.get_x() + bar.get_width() / 2,
mean_val + std_val + 0.02,
f"{mean_val:.3f}",
ha="center", va="bottom", fontsize=9, fontweight="bold")
fig.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches="tight")
plt.close(fig)
print(f" Plot abbondanze medie salvato: {save_path}")
# =============================================================================
# SEZIONE 7 – Report testuale
# =============================================================================
def print_per_spectrum_composition(A: np.ndarray, identification: list[dict],
spectra_names: list[str] = None,
min_abundance: float = 0.01):
"""
Stampa a video e restituisce, per ogni spettro/pixel, la composizione
mineralogica in termini di percentuale per ciascun endmember trovato.
Parameters
----------
A : (p, n_pixels) abbondanze FCLS
identification : lista risultati da identify_endmembers
spectra_names : nomi degli spettri (opzionale, altrimenti Spettro #N)
min_abundance : soglia minima sotto cui non stampare il componente (default 1%)
"""
p, n_pixels = A.shape
if spectra_names is None or len(spectra_names) != n_pixels:
spectra_names = [f"Spettro #{k+1:04d}" for k in range(n_pixels)]
mineral_names = [r["best_match"] for r in identification]
header_width = 72
print()
print("=" * header_width)
print(" COMPOSIZIONE MINERALOGICA PER SPETTRO (FCLS abundances)")
print("=" * header_width)
print(f" {'Spettro':<35} {'Minerale':<30} {'%':>6}")
print(" " + "-" * (header_width - 2))
for k in range(n_pixels):
abundances = A[:, k] # (p,)
# Ordina per abbondanza decrescente, filtra < min_abundance
ranked = sorted(
[(mineral_names[i], abundances[i]) for i in range(p) if abundances[i] >= min_abundance],
key=lambda x: x[1], reverse=True
)
name_trunc = spectra_names[k][:34]
if not ranked:
print(f" {name_trunc:<35} {'(nessun componente > soglia)':<30} {'':>6}")
else:
for j, (mineral, frac) in enumerate(ranked):
label = name_trunc if j == 0 else ""
mineral_trunc = mineral[:29]
print(f" {label:<35} {mineral_trunc:<30} {frac*100:6.2f}%")
if k < n_pixels - 1:
print(" " + "·" * (header_width - 2))
print("=" * header_width)
print(f" Soglia minima visualizzata: {min_abundance*100:.1f}%")
print("=" * header_width)
return mineral_names
def save_report(p_opt: int, identification: list[dict], A: np.ndarray,
wavelengths: np.ndarray, save_path: str):
"""Salva un report testuale con tutti i risultati."""
lines = []
lines.append("=" * 70)
lines.append("SPECTRAL UNMIXING PIPELINE – REPORT")
lines.append("=" * 70)
lines.append(f"\nNumero endmember stimato (HySime): {p_opt}")
lines.append(f"Range spettrale analizzato: {wavelengths[0]:.1f} – {wavelengths[-1]:.1f} nm")
lines.append(f"Numero bande: {len(wavelengths)}")
lines.append("\n" + "-" * 70)
lines.append("IDENTIFICAZIONE ENDMEMBER (SAM vs USGS Spectral Library)")
lines.append("-" * 70)
for r in identification:
i = r["endmember_idx"]
lines.append(f"\nEndmember #{i+1}")
lines.append(f" Best match : {r['best_match']}")
lines.append(f" SAM angle : {r['best_angle_deg']:.2f}°")
if r.get("sff_score", 0) > 0:
lines.append(f" SFF score : {r['sff_score']:.3f}")
if r.get("sid_sam_score") is not None:
lines.append(f" SID-SAM : {r['sid_sam_score']:.4f}")
lines.append(f" Top-5 matches:")
for rank, match in enumerate(r["top_k_matches"], 1):
name, ang = match[0], match[1]
sff_s = f" SFF={match[2]:.3f}" if len(match) > 2 and match[2] > 0 else ""
lines.append(f" {rank}. {name} ({ang:.2f}°{sff_s})")
lines.append("\n" + "-" * 70)
lines.append("QUANTIFICAZIONE FCLS – STATISTICHE ABBONDANZE")
lines.append("-" * 70)
p_fcls = A.shape[0]
mineral_names = [r["best_match"] for r in identification]
for i in range(p_fcls):
src = identification[i].get("source", "VCA")
lines.append(f"\nEndmember #{i+1} [{mineral_names[i]}] (source: {src})")
lines.append(f" Media : {A[i].mean():.4f} ({A[i].mean()*100:.1f}%)")
lines.append(f" Std : {A[i].std():.4f}")
lines.append(f" Min : {A[i].min():.4f}")
lines.append(f" Max : {A[i].max():.4f}")
lines.append("\n" + "-" * 70)
lines.append("COMPOSIZIONE PER SPETTRO (soglia visualizzazione: 1%)")
lines.append("-" * 70)
n_pixels = A.shape[1]
for k in range(n_pixels):
ranked = sorted(
[(mineral_names[i], A[i, k]) for i in range(p_fcls) if A[i, k] >= 0.01],
key=lambda x: x[1], reverse=True
)
lines.append(f"\nSpettro #{k+1:04d}")
if not ranked:
lines.append(" (nessun componente > 1%)")
for mineral, frac in ranked:
lines.append(f" {mineral[:45]:<45} {frac*100:6.2f}%")
lines.append("\n" + "-" * 70)
lines.append("RICOSTRUZIONE")
lines.append("-" * 70)
lines.append(f" Vincolo somma media: {A.sum(axis=0).mean():.4f} (ideale: 1.0)")
report_text = "\n".join(lines)
with open(save_path, "w") as f:
f.write(report_text)
print(f"\n Report salvato: {save_path}")
return report_text
# =============================================================================
# PIPELINE B – Direct per-spectrum matching + FCLS decomposition
# =============================================================================
def match_spectrum_to_usgs(
em_spec: np.ndarray,
em_spec_norm: np.ndarray,
usgs_valid: np.ndarray,
usgs_valid_norm: np.ndarray,
usgs_names_valid: list,
wl_common: np.ndarray,
top_k: int = 5,
sam_prefilter_deg: float = 35.0) -> list[tuple]:
"""
Confronta un singolo spettro con tutta la libreria USGS e restituisce
i top_k match ordinati per score combinato SAM+SFF+SID-SAM.
Parametri
---------
em_spec : spettro originale (riflettanze assolute) sul range comune
em_spec_norm : spettro normalizzato L2 per SAM/SID-SAM
usgs_valid : (n_usgs, n_bands) riflettanze originali
usgs_valid_norm : (n_usgs, n_bands) normalizzate per riga
wl_common : lunghezze d'onda del range comune
Returns
-------
lista di (nome, sam_deg, sff, sid_sam_val, score) ordinata per score desc
"""
# SAM vettorizzato su tutta la libreria
dots = usgs_valid_norm @ em_spec_norm # (n_usgs,)
dots = np.clip(dots, -1.0, 1.0)
angles = np.degrees(np.arccos(dots)) # (n_usgs,)
# Pre-filtro SAM
mask = angles <= sam_prefilter_deg
if mask.sum() == 0:
mask = np.ones(len(angles), dtype=bool) # fallback: tutti
scored = []
for idx in np.where(mask)[0]:
sam_deg = float(angles[idx])
ref_orig = usgs_valid[idx]
ref_norm = usgs_valid_norm[idx]
ref_name = usgs_names_valid[idx]
grp = _mineral_group(ref_name)
sff = _swir_sff_score(em_spec, ref_orig, wl_common) if grp >= 0 else 0.0
sid_val = sid_sam(em_spec_norm, ref_norm, sam_deg)
score = _combined_score(sam_deg, sff, sid_val)
scored.append((ref_name, sam_deg, sff, sid_val, score))
scored.sort(key=lambda x: -x[4])
return scored[:top_k]
def run_pipeline_direct(fieldspec_hdr: str,
usgs_hdr: str,
output_dir: str = ".",
top_k_matches: int = 5,
n_endmembers_fcls: int = 6,
sam_prefilter_deg: float = 35.0,
min_abundance: float = 0.01):
"""
Pipeline di matching diretto spettro-per-spettro.
A differenza di run_pipeline (che usa ATGP/VCA per estrarre endmember
dalla libreria come fosse una scena iperspettrale), questa pipeline:
1. Carica tutti gli spettri FieldSpec del Mugello
2. Per ciascuno, cerca i migliori match nella libreria USGS
tramite SAM + SFF + SID-SAM
3. Costruisce un dizionario di endmember USGS dai top match globali
4. Decompone ogni spettro Mugello con FCLS rispetto a quel dizionario
→ ottieni %kaolinite, %illite, %montmorillonite, ecc. per ogni campione
Questo è il modo corretto di trattare una libreria di campioni di campo:
ogni spettro è già un campione (non un pixel di scena da smisurare).
Parameters
----------
fieldspec_hdr : header ENVI della libreria FieldSpec Mugello
usgs_hdr : header ENVI della libreria USGS
output_dir : cartella output
top_k_matches : top-k match USGS da mostrare per spettro
n_endmembers_fcls : quanti endmember USGS usare per FCLS (default 6)
sam_prefilter_deg : soglia pre-filtro SAM in gradi
min_abundance : soglia minima abbondanza da visualizzare
"""
os.makedirs(output_dir, exist_ok=True)
out = lambda fname: os.path.join(output_dir, fname)
print("=" * 65)
print(" SPECTRAL MATCHING PIPELINE (direct per-spectrum)")
print(" FieldSpec Mugello → SAM+SFF+SID-SAM → USGS → FCLS")
print("=" * 65)
# ── 1. Carica librerie ───────────────────────────────────────────────────
spectra_field, wl_field, meta_field = load_envi_library(fieldspec_hdr)
spectra_names = get_spectra_names(fieldspec_hdr)
if not spectra_names:
spectra_names = [f"Spettro #{k+1:04d}" for k in range(len(spectra_field))]
usgs_spectra, wl_usgs, usgs_names = load_usgs_library(usgs_hdr)
# ── 2. Range comune ──────────────────────────────────────────────────────
usgs_valid_orig, usgs_names_valid, wl_common, em_mask = _prepare_usgs_pool(
wl_field, wl_usgs, usgs_spectra, usgs_names)
# Normalizzazione L2
field_on_common = spectra_field[:, em_mask] # (n_field, n_common)
field_norm = normalize_l2(field_on_common, axis=1) # per riga
usgs_valid_norm = normalize_l2(usgs_valid_orig, axis=1) # per riga
n_field = len(spectra_field)
print(f"\n Spettri FieldSpec : {n_field}")
print(f" Spettri USGS validi: {len(usgs_names_valid)}")
print(f" Range comune : {wl_common[0]:.1f}–{wl_common[-1]:.1f} nm "
f"({len(wl_common)} bande)")
# ── 3. Matching per spettro ──────────────────────────────────────────────
print(f"\n[2] Matching SAM+SFF+SID-SAM (top-{top_k_matches} per spettro)")
all_matches = [] # list of list of (name, sam, sff, sid, score)
for k in range(n_field):
matches = match_spectrum_to_usgs(
em_spec = field_on_common[k],
em_spec_norm = field_norm[k],
usgs_valid = usgs_valid_orig,
usgs_valid_norm = usgs_valid_norm,
usgs_names_valid = usgs_names_valid,
wl_common = wl_common,
top_k = top_k_matches,
sam_prefilter_deg = sam_prefilter_deg,
)
all_matches.append(matches)
name_s = spectra_names[k][:38]
top1 = matches[0]
print(f" {name_s:<40} → {top1[0][:35]} "
f"SAM={top1[1]:.1f}° score={top1[4]:.3f}")
# ── 4. Costruisci dizionario endmember USGS per FCLS ─────────────────────
# Prendi i minerali più frequenti nei top-1 match + quelli ad alto score
from collections import Counter
top1_names = [m[0][0] for m in all_matches]
name_counts = Counter(top1_names)
# Prendi anche i top-2 con score > 0.7 per catturare minerali minori
secondary = []
for matches in all_matches:
for name, sam, sff, sid, score in matches[1:3]:
if score > 0.70:
secondary.append(name)
name_counts.update(secondary)
# Seleziona i top n_endmembers_fcls minerali distinti più frequenti
em_usgs_names = [name for name, _ in name_counts.most_common(n_endmembers_fcls)]
# Recupera indici in usgs_names_valid
name_to_idx = {n: i for i, n in enumerate(usgs_names_valid)}
em_usgs_names = [n for n in em_usgs_names if n in name_to_idx]
em_indices = [name_to_idx[n] for n in em_usgs_names]
print(f"\n[3] Dizionario endmember USGS per FCLS ({len(em_usgs_names)} minerali):")
for j, name in enumerate(em_usgs_names):
print(f" {j+1:2d}. {name}")
# ── 5. FCLS per ogni spettro ─────────────────────────────────────────────
print(f"\n[4] FCLS – decomposizione di ogni spettro Mugello")
E_dict = usgs_valid_orig[em_indices].T # (n_common, n_em)
Y_field = field_on_common.T # (n_common, n_field)
A = fcls(Y_field, E_dict) # (n_em, n_field)
# Costruisce identification-like per la stampa
identification_fcls = [
{"endmember_idx": j, "source": "USGS_dict", "best_match": name,
"best_angle_deg": 0.0, "sff_score": 0.0, "sid_sam_score": 0.0,
"top_k_matches": [], "spectrum": usgs_valid_orig[em_indices[j]]}
for j, name in enumerate(em_usgs_names)
]
# ── 6. Output ────────────────────────────────────────────────────────────
print_per_spectrum_composition(A, identification_fcls,
spectra_names=spectra_names,
min_abundance=min_abundance)
# Salva report matching
_save_matching_report(all_matches, A, em_usgs_names, spectra_names,
wl_common, save_path=out("report_matching.txt"))
# Plot abbondanze medie
plot_mean_abundances(A, identification_fcls,
save_path=out("abundances_mean_direct.png"))
plot_abundances(A, identification_fcls,
save_path=out("abundances_heatmap_direct.png"))
return all_matches, A, em_usgs_names
def _save_matching_report(all_matches, A, em_names, spectra_names,
wl_common, save_path: str):
"""Salva il report testuale del matching diretto."""
lines = ["=" * 70,
"SPECTRAL MATCHING PIPELINE – REPORT DIRETTO",
"=" * 70,
f"Range spettrale: {wl_common[0]:.1f}–{wl_common[-1]:.1f} nm",
f"Spettri analizzati: {len(spectra_names)}",
""]
lines += ["-" * 70, "TOP MATCH PER SPETTRO", "-" * 70]
for k, matches in enumerate(all_matches):
lines.append(f"\nSpettro: {spectra_names[k]}")
for rank, (name, sam, sff, sid, score) in enumerate(matches, 1):
sff_s = f" SFF={sff:.3f}" if sff > 0 else ""
lines.append(f" {rank}. {name[:50]:<50} "
f"SAM={sam:.1f}°{sff_s} SID-SAM={sid:.4f} score={score:.3f}")
lines += ["", "-" * 70, "COMPOSIZIONE FCLS PER SPETTRO", "-" * 70]
for k in range(A.shape[1]):
lines.append(f"\nSpettro: {spectra_names[k]}")
ranked = sorted([(em_names[i], A[i, k]) for i in range(len(em_names))
if A[i, k] >= 0.01], key=lambda x: -x[1])
if not ranked:
lines.append(" (nessun componente > 1%)")
for name, frac in ranked:
lines.append(f" {name[:50]:<50} {frac*100:6.2f}%")
with open(save_path, "w") as f:
f.write("\n".join(lines))
print(f"\n Report salvato: {save_path}")
# =============================================================================
# PIPELINE PRINCIPALE
# =============================================================================
def normalize_l2(M: np.ndarray, axis: int = 0, eps: float = 1e-12) -> np.ndarray:
"""
Normalizzazione L2 per colonne (axis=0) o righe (axis=1).
Per la pipeline di unmixing:
- Y (bande×pixel): normalize_l2(Y, axis=0) → ogni spettro/pixel diventa
vettore unitario, rimuove differenze di albedo assoluta
- USGS (campioni×bande): normalize_l2(M, axis=1) → ogni spettro di riferimento
diventa vettore unitario, garantisce confronto per forma
La normalizzazione è applicata SOLO a ATGP/VCA e al matching SAM/SID-SAM.
Le riflettanze originali vengono conservate per FCLS (che richiede valori assoluti).
"""
norms = np.linalg.norm(M, axis=axis, keepdims=True)
return M / np.maximum(norms, eps)
def run_pipeline(fieldspec_hdr: str,
usgs_hdr: str,
output_dir: str = ".",
p_override: int = None,
top_k_matches: int = 5,
max_minerals: int = 10,
residual_iters: int = 3,
sam_threshold_deg: float = 10.0):
"""
Esegue la pipeline completa di spectral unmixing.
Parameters
----------
fieldspec_hdr : str Percorso all'header ENVI della libreria FieldSpec
usgs_hdr : str Percorso all'header ENVI della USGS Spectral Library
output_dir : str Directory di output per plot e report
p_override : int Se specificato, sovrascrive la stima HySime
top_k_matches : int Numero di match da mostrare nell'identificazione
max_minerals : int Numero target di minerali distinti (default 10)
residual_iters : int Iterazioni massime di estrazione sul residuo
sam_threshold_deg : float Soglia SAM in gradi per accettare match residuo
"""
MAX_MINERALS = max_minerals
RESIDUAL_ITERS = residual_iters
SAM_THRESHOLD_DEG = sam_threshold_deg
os.makedirs(output_dir, exist_ok=True)
out = lambda fname: os.path.join(output_dir, fname)
print("=" * 65)
print(" SPECTRAL UNMIXING PIPELINE")
print(" FieldSpec ENVI → HySime → VCA → USGS ID → FCLS")
print("=" * 65)
# 1. Carica libreria FieldSpec
spectra, wavelengths, meta = load_envi_library(fieldspec_hdr)
n_samples, n_bands = spectra.shape
# Transponi per la pipeline: (bande, pixel)
Y = spectra.T.copy()
# Normalizzazione L2 per ATGP/VCA: ogni pixel diventa vettore unitario.
# Rimuove le differenze di albedo assoluta → l'estrazione è guidata
# dalla forma spettrale, non dalla brillantezza.
# Y_raw viene conservato per FCLS (abbondanze su riflettanze assolute).
Y_raw = Y
Y = normalize_l2(Y, axis=0)
# 2. HySime (saltato se p_override è impostato)
if p_override is not None:
p_hysime = p_override
p = p_override
print(f"\n[2] HySime saltato – p fissato a {p}")
else:
p_hysime, errors = hysime(Y_raw, verbose=True)
plot_hysime_error(errors, p_hysime, save_path=out("hysime_error.png"))
p = p_hysime
# 2. HySime
#p_hysime, errors = hysime(Y, verbose=True)
#plot_hysime_error(errors, p_hysime, save_path=out("hysime_error.png"))
#p = p_override if p_override is not None else p_hysime
#if p_override:
# print(f"\n [!] p_override = {p_override} (HySime suggeriva {p_hysime})")
# Clamp p: non puo' superare n_samples-1 ne' n_bands-1
p = max(2, min(p, n_samples - 1, n_bands - 1))
print(f"\n Numero endmember finale: p = {p}")
# 3. VCA
#endmembers, em_indices = vca(Y, p=p, verbose=True)
endmembers, em_indices = atgp(Y, p=p, verbose=True)
# Normalizza gli endmember per colonna → vettori unitari coerenti con USGS.
# ATGP li estrae da Y già normalizzata, ma li ri-normalizziamo esplicitamente
# per garantire coerenza anche se si passa a VCA o ad altri estrattori.
endmembers_norm = normalize_l2(endmembers, axis=0)
# 4. Carica USGS e identifica
usgs_spectra, wl_usgs, usgs_names = load_usgs_library(usgs_hdr)
# Normalizzazione L2 degli spettri USGS per riga → vettori unitari.
usgs_spectra_norm = normalize_l2(usgs_spectra, axis=1)
identification = identify_endmembers(
endmembers_norm, wavelengths,
usgs_spectra_norm, wl_usgs, usgs_names,
top_k=top_k_matches,
max_minerals=MAX_MINERALS,
residual_iters=RESIDUAL_ITERS,
sam_threshold_deg=SAM_THRESHOLD_DEG,
)
# Ricostruisci matrice endmember estesa (VCA + residuo)
# per passarla a FCLS
_, _, wl_common_full, em_mask_full = _prepare_usgs_pool(
wavelengths, wl_usgs, usgs_spectra, usgs_names) # spettri originali per plot/FCLS
endmembers_ext = np.zeros((len(wavelengths), len(identification)))
for r in identification:
col = np.zeros(len(wavelengths))
col[em_mask_full] = r["spectrum"]
endmembers_ext[:, r["endmember_idx"]] = col
# Plot endmember (usa endmembers_ext: solo i minerali identificati)
plot_endmembers(endmembers_ext, wavelengths, identification,
save_path=out("endmembers.png"))
plot_endmember_vs_usgs(endmembers_ext, wavelengths,
usgs_spectra, wl_usgs,
identification, usgs_names,
save_path=out("endmember_vs_usgs.png"))
# 5. FCLS (usa matrice endmember estesa con VCA + residuo)
A = fcls(Y_raw, endmembers_ext) # abbondanze su riflettanze originali
plot_abundances(A, identification, save_path=out("abundances_heatmap.png"))
plot_mean_abundances(A, identification, save_path=out("abundances_mean.png"))
# 5b. Stampa composizione per-spettro
print_per_spectrum_composition(A, identification, spectra_names=get_spectra_names(fieldspec_hdr))
# 6. Report
report = save_report(p_hysime, identification, A, wavelengths,
save_path=out("report.txt"))
print("\n" + report)
print("\n" + "=" * 65)
print(" Pipeline completata.")
print(f" Output salvati in: {os.path.abspath(output_dir)}")
print("=" * 65)
return {
"spectra" : spectra,
"wavelengths" : wavelengths,
"p_hysime" : p_hysime,
"p_used" : p,
"endmembers" : endmembers,
"endmembers_ext" : endmembers_ext,
"em_indices" : em_indices,
"identification" : identification,
"abundances" : A,
}
# =============================================================================
# ENTRY POINT
# =============================================================================
if __name__ == "__main__":
# -------------------------------------------------------------------------
# Modifica questi percorsi con i tuoi file
# -------------------------------------------------------------------------
FIELDSPEC_HDR = "mugello.hdr" # Libreria ENVI FieldSpec
USGS_HDR = "suolo.hdr" # USGS Spectral Library ENVI
OUTPUT_DIR = "unmixing_results"
P_OVERRIDE = 70 # None = usa HySime; int = forza numero endmember # None = usa HySime; int = forza numero endmember
TOP_K = 10 # Top-K match USGS
# Verifica file di input
for fpath in [FIELDSPEC_HDR, USGS_HDR]:
if not os.path.isfile(fpath):
print(f"[ERRORE] File non trovato: {fpath}")
print(" Modifica le variabili FIELDSPEC_HDR e USGS_HDR nel blocco __main__.")
sys.exit(1)
MAX_MINERALS = 9 # Numero target di minerali distinti
RESIDUAL_ITERS = 3 # Iterazioni massime di ricerca sul residuo
SAM_THRESHOLD_DEG = 5.0 # Soglia SAM (°) per accettare un match residuo
results, A, minerals = run_pipeline_direct(
fieldspec_hdr = FIELDSPEC_HDR,
usgs_hdr = USGS_HDR,
output_dir = OUTPUT_DIR,
top_k_matches = TOP_K, # da TOP_K = 5
n_endmembers_fcls = 10, # da P_OVERRIDE = 6 → 6 minerali nel dizionario FCLS
sam_prefilter_deg = SAM_THRESHOLD_DEG * 3.5, # 10° × 3.5 = 35° pre-filtro
min_abundance = 0.01,
)
# results = run_pipeline(
# fieldspec_hdr=FIELDSPEC_HDR,
# usgs_hdr=USGS_HDR,
# output_dir=OUTPUT_DIR,
# p_override=P_OVERRIDE,
# top_k_matches=TOP_K,
# max_minerals=MAX_MINERALS,
# residual_iters=RESIDUAL_ITERS,
# sam_threshold_deg=SAM_THRESHOLD_DEG,
# )


