Durante il dottorato avevo verificato in modo sperimentale l'influenza della granulometria sulla firma spettrale frantumando un pezzo di roccia e selezionando vari sottovagli
Lo scattering della luce modifica tutte le frequenze della luce tanto che il campione diventa piu' riflettente con il diminuire della granulometria
Per ridurre questo effetto e per rendere comparabili gli spettri indipendentemente dalla granulometria si puo' usare il filtro EMSC (Extended Multiplicative Signal Correction) che funziona decisamente meglio di SVN
"""
Plot an ENVI Spectral Library (.hdr + binary data file).
Processing applied:
- Data truncated at 2400 nm
- Atmospheric absorption windows (1340-1460 nm, 1790-1960 nm) masked
and linearly interpolated across each spectrum
- Physically implausible spikes removed before interpolation
Spectral transforms (--mode):
reflectance Raw reflectance R (default)
absorbance Absorbance A = log10(1 / R)
emsc Extended Multiplicative Signal Correction (EMSC)
all Three-panel figure with all transforms side by side
Usage:
python plot_envi_spectral_library.py campione6
python plot_envi_spectral_library.py campione6 --mode absorbance
python plot_envi_spectral_library.py campione6 --mode emsc
python plot_envi_spectral_library.py campione6 --mode emsc --emsc-degree 2
python plot_envi_spectral_library.py campione6 --mode all --save out.png
"""
import sys
import re
import argparse
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from pathlib import Path
from scipy.signal import savgol_filter
# ── 1. Parse ENVI header ─────────────────────────────────────────────────────
def parse_envi_header(hdr_path: Path) -> dict:
"""Parse an ENVI .hdr file into a dictionary."""
header = {}
with open(hdr_path, encoding="utf-8", errors="replace") as f:
text = f.read()
# Remove the leading 'ENVI' line
text = re.sub(r"^ENVI\s*\n?", "", text)
# Multi-line values are wrapped in { ... }
def parse_value(raw: str):
raw = raw.strip()
if raw.startswith("{"):
inner = re.sub(r"[{}\r\n]", " ", raw)
parts = [p.strip() for p in inner.split(",") if p.strip()]
# Try numeric conversion
try:
return [float(p) for p in parts]
except ValueError:
return parts
try:
return int(raw)
except ValueError:
pass
try:
return float(raw)
except ValueError:
return raw.strip()
# Match key = value, where value may span multiple lines inside { }
pattern = re.compile(r"(\w[\w\s]*)=(.+?)(?=\n\w[\w\s]*=|\Z)", re.DOTALL)
for m in pattern.finditer(text):
key = m.group(1).strip().lower().replace(" ", "_")
header[key] = parse_value(m.group(2))
return header
# ── 2. Read binary data ───────────────────────────────────────────────────────
DTYPE_MAP = {
1: np.uint8, 2: np.int16, 3: np.int32, 4: np.float32,
5: np.float64, 6: np.complex64, 9: np.complex128,
12: np.uint16, 13: np.uint32, 14: np.int64, 15: np.uint64,
}
def read_envi_speclib(data_path: Path, header: dict) -> np.ndarray:
"""
Read a BSQ ENVI spectral library.
Returns array of shape (n_spectra, n_bands).
In an ENVI spectral library: lines = n_spectra, samples = n_bands.
"""
n_bands = int(header["samples"])
n_spectra = int(header["lines"])
dtype = DTYPE_MAP[int(header.get("data_type", 4))]
byte_order = int(header.get("byte_order", 0))
offset = int(header.get("header_offset", 0))
raw = np.fromfile(data_path, dtype=dtype, offset=offset)
data = raw.reshape(n_spectra, n_bands)
if byte_order == 1: # big-endian → swap
data = data.byteswap().newbyteorder()
return data.astype(np.float64)
# ── 3. Processing ────────────────────────────────────────────────────────────
# Atmospheric water-vapour absorption windows to interpolate across (nm)
ATMO_BANDS = [
(1340, 1460), # first water-vapour window
(1790, 1970), # second water-vapour window
]
WL_MAX = 2400 # nm — all data beyond this is discarded
def process(wl: np.ndarray, data: np.ndarray,
atmo_bands=ATMO_BANDS, wl_max=WL_MAX,
spike_lo: float = 1e-4, spike_hi: float = 1.5):
"""
1. Truncate wavelengths to wl_max.
2. Mark spikes (physically implausible values) as NaN.
3. Mask atmospheric window bands as NaN.
4. Linearly interpolate all NaN regions per spectrum.
Returns
-------
wl_out : 1-D array, truncated wavelength axis
data_out : 2-D array (n_spectra, n_bands), cleaned & interpolated
atmo_mask : 1-D bool array, True where bands were interpolated
"""
# 1 – truncate
keep = wl <= wl_max
wl = wl[keep]
data = data[:, keep]
n_spectra, n_bands = data.shape
data_out = data.copy()
# 2 – spike mask
data_out[(data_out < spike_lo) | (data_out > spike_hi)] = np.nan
# 3 – atmospheric mask (uniform across all spectra for consistency)
atmo_mask = np.zeros(n_bands, dtype=bool)
for lo, hi in atmo_bands:
atmo_mask |= (wl >= lo) & (wl <= hi)
data_out[:, atmo_mask] = np.nan
# 4 – linear interpolation per spectrum
for i in range(n_spectra):
y = data_out[i]
nans = np.isnan(y)
if nans.any() and (~nans).sum() >= 2:
good_x = np.where(~nans)[0]
good_y = y[~nans]
y[nans] = np.interp(np.where(nans)[0], good_x, good_y)
data_out[i] = y
return wl, data_out, atmo_mask
# ── 4. Spectral transforms ────────────────────────────────────────────────────
def to_absorbance(data: np.ndarray) -> np.ndarray:
"""
Absorbance A = log10(1 / R)
Values <= 0 are set to NaN before the log to avoid -inf / invalid results.
"""
with np.errstate(divide="ignore", invalid="ignore"):
R = np.where(data > 0, data, np.nan)
return np.log10(1.0 / R)
def apply_savgol(data: np.ndarray,
window_length: int = 11,
polyorder: int = 2) -> np.ndarray:
"""
Savitzky-Golay smoothing applied row-wise (per spectrum).
Parameters
----------
window_length : int
Number of bands in the smoothing window (must be odd, >= polyorder+1).
Larger values give stronger smoothing. Default: 11 bands.
polyorder : int
Degree of the fitting polynomial. Default: 2 (quadratic).
The filter preserves peak positions and heights better than a simple
moving average. Applied here on reflectance before EMSC so that EMSC
operates on already-smoothed data.
"""
# Enforce odd window length
if window_length % 2 == 0:
window_length += 1
if window_length <= polyorder:
raise ValueError(
f"window_length ({window_length}) must be > polyorder ({polyorder})"
)
return savgol_filter(data, window_length=window_length,
polyorder=polyorder, axis=1)
def to_emsc(data: np.ndarray, wl: np.ndarray,
reference: np.ndarray = None,
poly_degree: int = 2) -> np.ndarray:
"""
Extended Multiplicative Signal Correction (EMSC).
Each spectrum x is modelled as:
x ≈ b0 + b1·r + b2·wl + b3·wl² + … + bn·wl^d
where
r = reference spectrum (mean of all spectra if not supplied)
b0 = additive offset coefficient
b1 = multiplicative scatter coefficient
b2…bn = polynomial baseline coefficients (degree *poly_degree*)
wl = wavelength axis, mean-centred for numerical stability
The corrected spectrum is:
x_corr = (x − b0 − b2·wl − … − bn·wl^d) / b1
i.e. additive and polynomial baseline contributions are removed and
the spectrum is rescaled to the reference level.
Parameters
----------
data : (n_spectra, n_bands) array of reflectance values
wl : (n_bands,) wavelength axis in nm
reference : (n_bands,) reference spectrum; defaults to the mean spectrum
poly_degree : degree of the polynomial baseline model (default: 2)
Returns
-------
emsc_data : (n_spectra, n_bands) corrected spectra
"""
if reference is None:
reference = data.mean(axis=0)
# Mean-centre wavelength axis for numerical stability
wl_c = wl - wl.mean()
# Build design matrix: [reference | wl^0 | wl^1 | … | wl^poly_degree]
# wl^0 = constant offset column
poly_cols = np.column_stack(
[wl_c ** d for d in range(poly_degree + 1)] # d=0 gives the intercept
)
# Reference column goes first so coefficient index 0 → b1 (multiplicative)
design = np.column_stack([reference, poly_cols]) # shape: (n_bands, 2+poly_degree)
emsc_data = np.empty_like(data)
for i, spectrum in enumerate(data):
# Ordinary least squares: design @ coeffs ≈ spectrum
coeffs, _, _, _ = np.linalg.lstsq(design, spectrum, rcond=None)
b1 = coeffs[0] # multiplicative scatter coefficient
poly = poly_cols @ coeffs[1:] # additive + polynomial baseline
# Guard against near-zero multiplicative coefficient
if abs(b1) < 1e-10:
b1 = 1.0
emsc_data[i] = (spectrum - poly) / b1
return emsc_data
TRANSFORMS = {
"reflectance": (lambda d, wl=None: d, "Reflectance", "R"),
"absorbance": (lambda d, wl=None: to_absorbance(d), "Absorbance log₁₀(1/R)", "A"),
"emsc": (to_emsc, "SG + EMSC", "EMSC(R)"),
}
# ── 5. Plot ──────────────────────────────────────────────────────────────────
def _shade_atmo(ax, wl, atmo_mask):
"""Shade interpolated atmospheric windows on an axes."""
in_band, band_start, first = False, None, True
for j, flag in enumerate(np.append(atmo_mask, False)):
if flag and not in_band:
band_start = wl[j]
in_band = True
elif not flag and in_band:
lbl = "interpolated (atmo.)" if first else "_nolegend_"
ax.axvspan(band_start, wl[j - 1], alpha=0.15, color="gray", label=lbl)
in_band = False
first = False
def plot_single(wl, data, names, atmo_mask, ylabel, title, save_path=None):
"""Plot one transformed dataset."""
n = data.shape[0]
colors = cm.tab10(np.linspace(0, 0.9, n))
fig, ax = plt.subplots(figsize=(13, 6))
for i in range(n):
ax.plot(wl, data[i], color=colors[i], linewidth=1.2, label=names[i])
_shade_atmo(ax, wl, atmo_mask)
ax.set_xlabel("Wavelength (nm)", fontsize=12)
ax.set_ylabel(ylabel, fontsize=12)
ax.set_title(title, fontsize=13, fontweight="bold")
ax.set_xlim(wl[0], wl[-1])
ax.legend(fontsize=9, loc="upper right", framealpha=0.85)
ax.grid(True, linewidth=0.4, alpha=0.5)
fig.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches="tight")
print(f"Saved -> {save_path}")
else:
plt.show()
plt.close(fig)
def plot_all(wl, data_r, names, atmo_mask, base_name, save_path=None,
emsc_degree: int = 2):
"""Three-panel figure: Reflectance | Absorbance | EMSC."""
data_a = to_absorbance(data_r)
data_e = to_emsc(data_r, wl, poly_degree=emsc_degree)
n = data_r.shape[0]
colors = cm.tab10(np.linspace(0, 0.9, n))
fig, axes = plt.subplots(1, 3, figsize=(18, 5), sharey=False)
panels = [
(data_r, "Reflectance", "R"),
(data_a, "Absorbance log₁₀(1/R)","A"),
(data_e, "EMSC", "EMSC(R)"),
]
for ax, (dat, ylabel, _) in zip(axes, panels):
for i in range(n):
lbl = names[i] if ax is axes[0] else "_nolegend_"
ax.plot(wl, dat[i], color=colors[i], linewidth=1.1, label=lbl)
_shade_atmo(ax, wl, atmo_mask)
ax.set_xlabel("Wavelength (nm)", fontsize=11)
ax.set_ylabel(ylabel, fontsize=11)
ax.set_xlim(wl[0], wl[-1])
ax.grid(True, linewidth=0.4, alpha=0.5)
axes[0].legend(fontsize=8, loc="upper right", framealpha=0.85)
fig.suptitle(f"{base_name} — spectral transforms (350–{int(wl[-1])} nm)",
fontsize=13, fontweight="bold")
fig.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches="tight")
print(f"Saved -> {save_path}")
else:
plt.show()
plt.close(fig)
# ── 6. Main ──────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Plot ENVI spectral library")
parser.add_argument("path", help="Base path to the ENVI file (without extension)")
parser.add_argument("--mode", choices=["reflectance", "absorbance", "emsc", "all"],
default="reflectance",
help="Spectral transform to apply (default: reflectance)")
parser.add_argument("--save", metavar="FILE", default=None,
help="Save the plot to a file instead of displaying it")
parser.add_argument("--wl-max", type=float, default=WL_MAX,
help=f"Discard wavelengths beyond this value (default: {WL_MAX} nm)")
parser.add_argument("--sg-window", type=int, default=11,
help="Savitzky-Golay window length in bands, must be odd "
"(applied before EMSC; default: 11)")
parser.add_argument("--sg-poly", type=int, default=2,
help="Savitzky-Golay polynomial order (default: 2)")
parser.add_argument("--emsc-degree", type=int, default=2,
help="Polynomial degree for the EMSC baseline model "
"(default: 2; higher values capture more complex baselines)")
args = parser.parse_args()
base = Path(args.path)
# Find header
hdr_path = base.with_suffix(".hdr")
if not hdr_path.exists():
hdr_path = Path(str(base) + ".hdr")
if not hdr_path.exists():
raise FileNotFoundError(f"Cannot find .hdr for: {base}")
# Data file sits alongside the header with no extension
data_path = hdr_path.with_suffix("") if hdr_path.suffix == ".hdr" else base
if not data_path.exists():
raise FileNotFoundError(f"Cannot find data file: {data_path}")
print(f"Header : {hdr_path}")
print(f"Data : {data_path}")
header = parse_envi_header(hdr_path)
n_bands = int(header["samples"])
n_spectra = int(header["lines"])
print(f"Spectra: {n_spectra} Bands: {n_bands}")
# Wavelengths
if "wavelength" in header and isinstance(header["wavelength"], list):
wavelengths = np.array(header["wavelength"])
else:
wavelengths = np.arange(n_bands, dtype=float)
# Spectra names
if "spectra_names" in header and isinstance(header["spectra_names"], list):
names = [str(s).strip() for s in header["spectra_names"]]
else:
names = [f"Spectrum {i+1}" for i in range(n_spectra)]
# Read raw data
data_raw = read_envi_speclib(data_path, header)
print(f"Wavelength range : {wavelengths[0]:.0f}-{wavelengths[-1]:.0f} nm "
f"-> truncated at {args.wl_max:.0f} nm")
print(f"Atmospheric windows interpolated: "
f"{[f'{lo}-{hi} nm' for lo, hi in ATMO_BANDS]}")
print(f"Mode : {args.mode}")
# Process: truncate, mask spikes, interpolate atmospheric windows
wl, data_r, atmo_mask = process(wavelengths, data_raw, wl_max=args.wl_max)
# Summary statistics on reflectance
print("\nSpectrum summary — reflectance (median):")
for i, name in enumerate(names):
print(f" [{i+1}] {name:<22s} R = {np.nanmedian(data_r[i]):.4f}")
# Savitzky-Golay smoothed version (used as input to EMSC)
sg_win = args.sg_window if args.sg_window % 2 == 1 else args.sg_window + 1
data_sg = apply_savgol(data_r, window_length=sg_win, polyorder=args.sg_poly)
print(f"\nSavitzky-Golay filter: window={sg_win} bands, polyorder={args.sg_poly}"
f" (applied before EMSC)")
print(f"EMSC polynomial degree: {args.emsc_degree}")
# ── Plot ──
if args.mode == "all":
# For the all-panel view, EMSC panel uses SG-smoothed input
data_emsc = to_emsc(data_sg, wl, poly_degree=args.emsc_degree)
n = data_r.shape[0]
colors = cm.tab10(np.linspace(0, 0.9, n))
fig, axes = plt.subplots(1, 3, figsize=(18, 5), sharey=False)
panels = [
(data_r, "Reflectance", "R"),
(to_absorbance(data_r), "Absorbance log₁₀(1/R)", "A"),
(data_emsc,
f"SG (w={sg_win}, p={args.sg_poly}) + EMSC (deg={args.emsc_degree})",
"EMSC(R)"),
]
for ax, (dat, ylabel, _) in zip(axes, panels):
for i in range(n):
lbl = names[i] if ax is axes[0] else "_nolegend_"
ax.plot(wl, dat[i], color=colors[i], linewidth=1.1, label=lbl)
_shade_atmo(ax, wl, atmo_mask)
ax.set_xlabel("Wavelength (nm)", fontsize=11)
ax.set_ylabel(ylabel, fontsize=11)
ax.set_xlim(wl[0], wl[-1])
ax.grid(True, linewidth=0.4, alpha=0.5)
axes[0].legend(fontsize=8, loc="upper right", framealpha=0.85)
fig.suptitle(
f"{base.name} — spectral transforms (350–{int(wl[-1])} nm)",
fontsize=13, fontweight="bold")
fig.tight_layout()
if args.save:
fig.savefig(args.save, dpi=150, bbox_inches="tight")
print(f"Saved -> {args.save}")
else:
plt.show()
plt.close(fig)
elif args.mode == "emsc":
data_t = to_emsc(data_sg, wl, poly_degree=args.emsc_degree)
print(f"\nSpectrum summary — SG+EMSC (median):")
for i, name in enumerate(names):
print(f" [{i+1}] {name:<22s} EMSC = {np.nanmedian(data_t[i]):.4f}")
title = (f"SG (w={sg_win}, p={args.sg_poly})"
f" + EMSC (deg={args.emsc_degree}) — {base.name}"
f" (350–{int(args.wl_max)} nm)")
plot_single(wl, data_t, names, atmo_mask,
ylabel="SG + EMSC", title=title, save_path=args.save)
else:
fn, ylabel, _ = TRANSFORMS[args.mode]
data_t = fn(data_r)
if args.mode != "reflectance":
label = "A"
print(f"\nSpectrum summary — {args.mode} (median):")
for i, name in enumerate(names):
print(f" [{i+1}] {name:<22s} {label} = {np.nanmedian(data_t[i]):.4f}")
title = (f"{args.mode.capitalize()} spectra — {base.name}"
f" (350–{int(args.wl_max)} nm)")
plot_single(wl, data_t, names, atmo_mask,
ylabel=ylabel, title=title, save_path=args.save)
if __name__ == "__main__":
main()
Nessun commento:
Posta un commento