During my PhD thesis I had tried to determine the CaCO3 content of natural soils using only the characteristic absorption peak at 2340 nm, with decidedly poor results:
```
Loaded 59 samples, 2151 bands each.
Principal Components selected: 24
Total explained variance: 0.9907
Training RF on PCA components...
==============================
PCA + Random Forest Results
R² Score: 0.1746
RMSE: 25.6226
==============================
```
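The failing baseline can be reproduced in miniature as follows. This is a sketch with synthetic data standing in for the 59 real spectra; the actual run kept enough principal components to explain 99% of the spectral variance (24 of them) before fitting the Random Forest:

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import cross_val_predict
from sklearn.metrics import r2_score, mean_squared_error

rng = np.random.default_rng(42)
X = rng.random((59, 2151))   # 59 spectra, 2151 bands (synthetic stand-in)
y = rng.random(59) * 100     # CaCO3 in g/kg (synthetic stand-in)

# Fractional n_components: keep the smallest set explaining >= 99% variance
pca = PCA(n_components=0.99, random_state=42)
X_pca = pca.fit_transform(X)
print(f"Principal Components selected: {X_pca.shape[1]}")
print(f"Total explained variance: {pca.explained_variance_ratio_.sum():.4f}")

rf = RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1)
preds = cross_val_predict(rf, X_pca, y, cv=5)
print(f"R² Score: {r2_score(y, preds):.4f}")
print(f"RMSE: {np.sqrt(mean_squared_error(y, preds)):.4f}")
```

(With random data the R² is of course meaningless; the point is the pipeline shape, not the numbers.)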
Having established that plain Random Forest does not work, I tried modifying HyperSoilNet to take .asd spectra as input.
# -*- coding: utf-8 -*-
"""HyperSoilNet_Colab.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1YYv1Q9_LRitI9GmWCntw2J7BV69uR3ct
# 🌱 HyperSoilNet — Unified Pipeline
**Soil property estimation from ASD FieldSpec hyperspectral data**
Based on two papers:
- **HyperKon** CNN backbone: https://www.mdpi.com/2072-4292/16/18/3399
- **HyperSoilNet** ensemble: https://www.mdpi.com/2072-4292/17/15/2568
### Architecture
```
ASD FieldSpec spectrum (2151 bands, 350–2500 nm)
↓
HyperKon CNN backbone → 128-d embedding (ResNeXt + SE attention)
+
Hand-crafted features → derivatives, DWT, SVD, FFT
↓
RF + XGBoost + KNN → property-specific weighted ensemble
↓
CaCO3, FeO, Fe2O3 predictions
```
### How to use
1. Run **Cell 1** to install dependencies
2. Run **Cell 2** to mount Google Drive
3. Edit **Cell 3** to set your file paths
4. Run all remaining cells in order
## 📦 Cell 1 — Install dependencies
"""
!pip install -q xgboost PyWavelets scikit-learn scipy pandas numpy torch
"""## 📂 Cell 2 — Mount Google Drive"""
from google.colab import drive
drive.mount('/content/drive')
print("Drive mounted at /content/drive")
"""## ⚙️ Cell 3 — Configuration
Edit the paths and settings below to match your data.
"""
# ── Paths ────────────────────────────────────────────────────────────────────
# Your label CSV (Id, CacO3_gr_Kg, CaCO3_per, FeO_mg_Kg, FeO_per, Fe2O3_mg_Kg, Fe2O3_per)
LABELS_CSV = "/content/drive/MyDrive/mugello/label.csv"
# Validation CSV with known labels (Id, CacO3, FeO, Fe2O3) — set None to skip
VALIDA_CSV = "/content/drive/MyDrive/mugello/valida.csv"
# Folder containing labelled ASD files (e.g. 0102.asd, 0103.asd …)
SPECTRA_DIR = "/content/drive/MyDrive/mugello/spectra/"
# Folder containing NEW unlabelled ASD files to predict (set None to skip)
PREDICT_DIR = "/content/drive/MyDrive/mugello/new_spectra/"
# Where to save predictions
OUTPUT_CSV = "/content/drive/MyDrive/mugello/predictions.csv"
# Where to save / load CNN backbone weights
CNN_WEIGHTS = "/content/drive/MyDrive/mugello/backbone.pt"
# ── CNN settings ─────────────────────────────────────────────────────────────
USE_CNN = True # False = hand-crafted features only (fast, no GPU needed)
LOAD_CNN = True # True = load saved weights (skip pretraining)
SAVE_CNN = True # True = save backbone after training
CNN_EPOCHS = 200 # Contrastive pretraining epochs (200 in paper; 50 for small datasets)
FINETUNE_EPOCHS= 100 # Fine-tuning epochs
BATCH_SIZE = 16 # Reduce if GPU runs out of memory
LR = 1e-4
TEMPERATURE = 0.3 # NT-Xent temperature
EMBED_DIM = 128 # CNN embedding dimension
# ── Cross-validation ─────────────────────────────────────────────────────────
RUN_CV = True
CV_FOLDS = 5
print("Configuration set ✓")
"""## 📚 Cell 4 — Imports & global setup"""
import os
import struct as _struct
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
from scipy.fft import fft
from scipy.signal import savgol_filter
try:
import pywt
HAS_PYWT = True
except ImportError:
HAS_PYWT = False
print("⚠️ PyWavelets not found – wavelet features will be skipped")
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, r2_score
try:
from xgboost import XGBRegressor
HAS_XGB = True
except ImportError:
HAS_XGB = False
print("⚠️ XGBoost not found – using RF + KNN only")
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
torch.manual_seed(42)
np.random.seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")
if device.type == "cuda":
print(f"GPU: {torch.cuda.get_device_name(0)}")
# ── Target columns ────────────────────────────────────────────────────────────
TARGETS = ["CacO3", "FeO", "Fe2O3"]
DEFAULT_WEIGHTS = {
"CacO3": [0.42, 0.42, 0.16],
"FeO": [0.40, 0.44, 0.16],
"Fe2O3": [0.40, 0.44, 0.16],
}
print("Imports complete ✓")
"""## 🧠 Cell 5 — HyperKon CNN backbone
*(ResNeXt + Squeeze-and-Excitation + Spectral Attention)*
"""
class SqueezeExcitation1d(nn.Module):
"""Channel-wise SE recalibration (HyperKon paper eq. 1-2)."""
def __init__(self, channels, reduction=16):
super().__init__()
mid = max(1, channels // reduction)
self.fc = nn.Sequential(
nn.Linear(channels, mid, bias=False), nn.ReLU(inplace=True),
nn.Linear(mid, channels, bias=False), nn.Sigmoid())
def forward(self, x):
return x * self.fc(x.mean(dim=2)).unsqueeze(2)
class ResNeXtBlock1d(nn.Module):
"""1-D ResNeXt bottleneck + SE (HyperKon paper Section 2.2)."""
def __init__(self, in_ch, out_ch, cardinality=32, bw=4):
super().__init__()
width = max(cardinality, int(out_ch * (bw / 64.0)) * cardinality)
self.conv1 = nn.Conv1d(in_ch, width, 1, bias=False)
self.bn1 = nn.BatchNorm1d(width)
self.conv2 = nn.Conv1d(width, width, 3, padding=1, groups=cardinality, bias=False)
self.bn2 = nn.BatchNorm1d(width)
self.conv3 = nn.Conv1d(width, out_ch, 1, bias=False)
self.bn3 = nn.BatchNorm1d(out_ch)
self.se = SqueezeExcitation1d(out_ch)
self.relu = nn.ReLU(inplace=True)
self.downsample = (
nn.Sequential(nn.Conv1d(in_ch, out_ch, 1, bias=False), nn.BatchNorm1d(out_ch))
if in_ch != out_ch else None)
def forward(self, x):
identity = x
out = self.relu(self.bn1(self.conv1(x)))
out = self.relu(self.bn2(self.conv2(out)))
out = self.bn3(self.conv3(out))
out = self.se(out)
if self.downsample is not None: identity = self.downsample(x)
return self.relu(out + identity)
class SpectralAttention(nn.Module):
"""Spectral attention module (HyperSoilNet Section 3.3)."""
def __init__(self, channels, reduction=16):
super().__init__()
mid = max(1, channels // reduction)
self.mlp = nn.Sequential(
nn.Linear(channels, mid), nn.ReLU(inplace=True),
nn.Linear(mid, channels), nn.Sigmoid())
def forward(self, x):
return x * self.mlp(x.mean(dim=2)).unsqueeze(2)
class GlobalContextModule(nn.Module):
"""Global avg+max pooling → 128-d embedding (HyperSoilNet Section 3.3)."""
def __init__(self, in_ch, embed_dim=128):
super().__init__()
self.fc = nn.Sequential(
nn.Linear(in_ch * 2, embed_dim), nn.ReLU(inplace=True), nn.LayerNorm(embed_dim))
def forward(self, x):
return self.fc(torch.cat([x.mean(dim=2), x.max(dim=2).values], dim=1))
def _make_stage(in_ch, out_ch, cardinality, bw, n_blocks):
layers = []
for i in range(n_blocks):
layers.append(ResNeXtBlock1d(in_ch if i == 0 else out_ch, out_ch, cardinality, bw))
return nn.Sequential(*layers)
class HyperKonBackbone(nn.Module):
"""HyperKon 1-D backbone. Input: (B, n_bands) → Output: (B, 128)"""
STAGE_BLOCKS = [3, 4, 6, 3]
def __init__(self, n_bands, embed_dim=128, cardinality=32, bw=4):
super().__init__()
self.stem = nn.Sequential(
nn.Conv1d(1, 64, 7, stride=2, padding=3, bias=False),
nn.BatchNorm1d(64), nn.ReLU(inplace=True),
nn.MaxPool1d(3, stride=2, padding=1))
self.spectral_attn = SpectralAttention(64)
self.stage1 = _make_stage(64, 128, cardinality, bw, self.STAGE_BLOCKS[0])
self.stage2 = _make_stage(128, 256, cardinality, bw, self.STAGE_BLOCKS[1])
self.stage3 = _make_stage(256, 512, cardinality, bw, self.STAGE_BLOCKS[2])
self.stage4 = _make_stage(512, 512, cardinality, bw, self.STAGE_BLOCKS[3])
self.global_ctx = GlobalContextModule(512, embed_dim)
for m in self.modules():
if isinstance(m, nn.Conv1d): nn.init.kaiming_normal_(m.weight, nonlinearity="relu")
elif isinstance(m, nn.BatchNorm1d): nn.init.ones_(m.weight); nn.init.zeros_(m.bias)
elif isinstance(m, nn.Linear) and m.weight is not None:
nn.init.xavier_normal_(m.weight)
if m.bias is not None: nn.init.zeros_(m.bias)
def forward(self, x):
if x.dim() == 2: x = x.unsqueeze(1)
x = self.stem(x)
x = self.spectral_attn(x)
x = self.stage1(x); x = self.stage2(x)
x = self.stage3(x); x = self.stage4(x)
return self.global_ctx(x)
class ProjectionHead(nn.Module):
def __init__(self, embed_dim=128, proj_dim=64):
super().__init__()
self.net = nn.Sequential(
nn.Linear(embed_dim, embed_dim), nn.ReLU(inplace=True), nn.Linear(embed_dim, proj_dim))
def forward(self, x): return F.normalize(self.net(x), dim=1)
class NTXentLoss(nn.Module):
"""NT-Xent contrastive loss (HyperKon paper eq. 4)."""
def __init__(self, temperature=0.5):
super().__init__()
self.tau = temperature
def forward(self, z_i, z_j):
N = z_i.size(0)
z = torch.cat([z_i, z_j], dim=0)
sim = F.cosine_similarity(z.unsqueeze(1), z.unsqueeze(0), dim=2) / self.tau
sim.masked_fill_(torch.eye(2*N, dtype=torch.bool, device=z.device), -1e9)
labels = torch.cat([torch.arange(N, 2*N), torch.arange(0, N)]).to(z.device)
return F.cross_entropy(sim, labels)
def augment(x):
"""Spectral augmentation for contrastive pretraining (HyperKon Section 2.3.2)."""
if torch.rand(1) > 0.5: x = x + 0.01 * torch.randn_like(x)
if torch.rand(1) > 0.5:
n_drop = max(1, int(0.1 * x.size(1)))
idx = torch.randint(0, x.size(1), (n_drop,))
x = x.clone(); x[:, idx] = 0.0
if torch.rand(1) > 0.5: x = torch.roll(x, int(torch.randint(-5, 6, (1,))), dims=1)
if torch.rand(1) > 0.5:
scale = 0.9 + 0.2 * torch.rand(x.size(0), 1, device=x.device)
x = x * scale
return x
class _ArrayDataset(Dataset):
"""Simple dataset wrapper for numpy arrays."""
def __init__(self, X): self.X = torch.tensor(X, dtype=torch.float32)
def __len__(self): return len(self.X)
def __getitem__(self, i): return self.X[i]
print("CNN architecture + dataset defined ✓")
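The NT-Xent construction above is easy to sanity-check outside torch. The following NumPy re-derivation is illustrative only (not part of the pipeline): with near-identical augmented views each row's most similar entry is its twin, so the loss sits near its minimum, while unrelated views land near log(2N−1):

```python
import numpy as np

def nt_xent_numpy(z_i, z_j, tau=0.3):
    """NT-Xent on L2-normalised embeddings, mirroring the torch NTXentLoss."""
    z = np.vstack([z_i, z_j])
    z = z / np.linalg.norm(z, axis=1, keepdims=True)  # cosine sim = dot product
    sim = (z @ z.T) / tau
    np.fill_diagonal(sim, -1e9)                       # mask self-similarity
    N = len(z_i)
    labels = np.concatenate([np.arange(N, 2 * N), np.arange(N)])  # twin index
    # row-wise cross-entropy against the twin's index
    logp = sim - np.log(np.exp(sim).sum(axis=1, keepdims=True))
    return -logp[np.arange(2 * N), labels].mean()

rng = np.random.default_rng(0)
z = rng.normal(size=(8, 16))
tight = nt_xent_numpy(z, z + 1e-6 * rng.normal(size=z.shape))  # aligned views
loose = nt_xent_numpy(z, rng.normal(size=(8, 16)))             # unrelated views
print(f"aligned views: {tight:.4f}  random views: {loose:.4f}")
```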
"""## 📡 Cell 6 — ASD FieldSpec file loader"""
_ASD_HEADER_SIZE = 484
_ASD_WL_START_OFF = 191 # float32 - starting wavelength (nm)
_ASD_WL_STEP_OFF = 195 # float32 - wavelength step (nm/channel)
_ASD_CHANNELS_OFF = 204 # uint16 - number of channels
_ASD_REF_GAP = 20 # bytes between spectrum block and reference block
_ASD_ITEM_SIZE = 8 # bytes per sample (float64)
def _read_asd_binary(filepath):
"""
Parse a raw ASD FieldSpec binary file.
Returns reflectance = spectrum_DN / reference_DN (1-D float64 array).
Layout:
[0-483] 484-byte header
[484 - 484+ch*8] Spectrum block (float64 raw DN)
[+20 byte gap]
[ref block] White reference block (float64 raw DN)
"""
with open(filepath, "rb") as f:
raw = f.read()
if len(raw) < _ASD_HEADER_SIZE + 100:
raise ValueError(f"File too small ({len(raw)} bytes)")
sig = raw[:2].decode("latin-1", errors="replace").lower()
if sig != "as":
raise ValueError(f"Unrecognised ASD signature: {raw[:3]!r}")
wl_start = _struct.unpack_from("<f", raw, _ASD_WL_START_OFF)[0]
wl_step = _struct.unpack_from("<f", raw, _ASD_WL_STEP_OFF)[0]
channels = _struct.unpack_from("<H", raw, _ASD_CHANNELS_OFF)[0]
if not (300 <= wl_start <= 500): wl_start = 350.0
if not (0.5 <= wl_step <= 2.0): wl_step = 1.0
if not (100 <= channels <= 5000): channels = 2151
off_spec = _ASD_HEADER_SIZE
if off_spec + channels * _ASD_ITEM_SIZE > len(raw):
raise ValueError("File too short for spectrum block")
spec = np.frombuffer(raw[off_spec: off_spec + channels * _ASD_ITEM_SIZE], dtype=np.float64).copy()
off_ref = off_spec + channels * _ASD_ITEM_SIZE + _ASD_REF_GAP
if off_ref + channels * _ASD_ITEM_SIZE > len(raw):
raise ValueError("Reference block not found — file may be pre-processed (non-raw)")
ref = np.frombuffer(raw[off_ref: off_ref + channels * _ASD_ITEM_SIZE], dtype=np.float64).copy()
with np.errstate(divide="ignore", invalid="ignore"):
reflectance = np.where(ref > 0, spec / ref, np.nan)
return reflectance
def _read_asd_ascii(filepath):
"""Read two-column ASCII export from ViewSpecPro (wavelength, value)."""
values = []
with open(filepath, "r", encoding="latin-1") as f:
for line in f:
line = line.strip()
if not line or line[0].isalpha() or line.startswith("#"): continue
parts = line.replace(",", " ").split()
if len(parts) >= 2:
try: values.append(float(parts[1]))
except ValueError: continue
if not values: raise ValueError("No numeric data found in ASCII file")
return np.array(values, dtype=np.float64)
def load_spectrum(filepath):
"""
Load spectrum from ASD binary (.asd) or CSV/ASCII.
ASD binary: reflectance = spec_DN / ref_DN
"""
ext = Path(filepath).suffix.lower()
if ext in (".csv", ".txt"):
df = pd.read_csv(filepath, header=None)
return df.iloc[0,:].values.astype(float) if df.shape[1] > df.shape[0] \
else df.iloc[:,0].values.astype(float)
bin_err = asc_err = spec_err = None
try: return _read_asd_binary(filepath)
except Exception as e: bin_err = e
try: return _read_asd_ascii(filepath)
except Exception as e: asc_err = e
try:
from specdal import Spectrum
return Spectrum(filepath=filepath).measurement.values.astype(np.float64)
except Exception as e: spec_err = e
raise RuntimeError(
f"All parsers failed for {filepath}.\n"
f" Binary: {bin_err}\n ASCII: {asc_err}\n specdal: {spec_err}")
def _normalise_id(raw_id):
s = str(raw_id).strip()
if s.endswith(".0"): s = s[:-2]
return s
def _find_spectrum_file(directory, sample_id):
sid = _normalise_id(sample_id)
sid_int = sid.lstrip("0") or "0"
seen, candidates = set(), []
for name in [sid, sid_int] + [sid_int.zfill(n) for n in (3, 4, 5, 6)]:
if name not in seen: seen.add(name); candidates.append(name)
for name in candidates:
base = os.path.join(directory, name)
for ext in (".asd", ".ASD", ".csv", ".txt"):
if os.path.exists(base + ext): return base + ext
return None
def load_labelled(labels_csv, spectra_dir):
labels_df = pd.read_csv(labels_csv)
labels_df.columns = labels_df.columns.str.strip()
id_col = next((c for c in ["Id","id","sample_id","ID"] if c in labels_df.columns), None)
if id_col is None:
raise ValueError(f"No id column found. Columns: {list(labels_df.columns)}")
missing_targets = [t for t in TARGETS if t not in labels_df.columns]
if missing_targets:
raise ValueError(f"Missing target columns: {missing_targets}")
spectra_list, valid_ids, valid_rows = [], [], []
for _, row in labels_df.iterrows():
sid = _normalise_id(row[id_col])
fpath = _find_spectrum_file(spectra_dir, sid)
if fpath is None:
sid_int = sid.lstrip("0") or "0"
tried = list(dict.fromkeys([sid, sid_int] + [sid_int.zfill(n) for n in (3,4,5,6)]))
warnings.warn(f"No file for id={sid}. Tried: {[t+'.asd' for t in tried]}")
continue
try:
spectra_list.append(load_spectrum(fpath))
valid_ids.append(sid); valid_rows.append(row)
except Exception as e:
warnings.warn(f"Failed to load {fpath}: {e}")
if not spectra_list:
raise RuntimeError("No spectra loaded. Check your paths and file names.")
lengths = [len(s) for s in spectra_list]
if len(set(lengths)) > 1:
min_len = min(lengths)
warnings.warn(f"Different lengths found, truncating to {min_len}")
spectra_list = [s[:min_len] for s in spectra_list]
spectra = np.vstack(spectra_list)
labels = pd.DataFrame(valid_rows)[TARGETS].reset_index(drop=True)
return spectra, labels, valid_ids
def load_directory(spectra_dir):
paths = sorted([p for p in Path(spectra_dir).iterdir()
if p.suffix.lower() in (".asd", ".csv", ".txt")])
if not paths:
raise FileNotFoundError(f"No ASD or CSV files found in '{spectra_dir}'")
spectra, ids = [], []
for p in paths:
try: spectra.append(load_spectrum(str(p))); ids.append(p.stem)
except Exception as e: warnings.warn(f"Skipping {p.name}: {e}")
lengths = [len(s) for s in spectra]
if len(set(lengths)) > 1:
min_len = min(lengths)
spectra = [s[:min_len] for s in spectra]
return np.vstack(spectra), ids
print("ASD FieldSpec loader defined (reflectance = spec_DN / ref_DN)")
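The byte layout the parser assumes (484-byte header, float64 spectrum block, 20-byte gap, float64 reference block) can be exercised against a synthetic file image. This only demonstrates the assumed layout; real .asd headers vary across firmware versions, which is exactly why the parser falls back to defaults when decoded header values look implausible:

```python
import struct
import numpy as np

CH = 4  # tiny synthetic spectrum, just to illustrate the layout
header = bytearray(484)
header[0:2] = b"AS"                         # signature checked by the parser
struct.pack_into("<f", header, 191, 350.0)  # starting wavelength (nm)
struct.pack_into("<f", header, 195, 1.0)    # wavelength step (nm/channel)
struct.pack_into("<H", header, 204, CH)     # channel count

spec = np.array([10.0, 20.0, 30.0, 40.0])   # raw DN
ref = np.array([100.0, 100.0, 100.0, 100.0])  # white-reference DN
blob = bytes(header) + spec.tobytes() + bytes(20) + ref.tobytes()

# Re-parse the blob the same way _read_asd_binary does
wl_start = struct.unpack_from("<f", blob, 191)[0]
channels = struct.unpack_from("<H", blob, 204)[0]
off = 484
s = np.frombuffer(blob, dtype=np.float64, count=channels, offset=off)
r = np.frombuffer(blob, dtype=np.float64, count=channels,
                  offset=off + channels * 8 + 20)
reflectance = s / r
print(wl_start, channels, reflectance)
```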
"""## 🔬 Cell 7 — Hand-crafted feature engineering
*(Derivatives, DWT, SVD, FFT — HyperSoilNet Section 3.4)*
"""
def spectral_derivatives(spectrum, orders=(1, 2, 3)):
feats, n = [], len(spectrum)
for order in orders:
polyorder = order + 1
window = min(11, n)
if window % 2 == 0: window -= 1
window = max(window, polyorder + 1)
if window % 2 == 0: window += 1
if window > n: feats.append(np.zeros(n)); continue
feats.append(savgol_filter(spectrum, window, polyorder, deriv=order))
return np.concatenate(feats)
def wavelet_features(spectrum):
if not HAS_PYWT: return np.array([])
wavelet = "dmey"
coeffs = pywt.wavedec(spectrum, wavelet, level=min(4, pywt.dwt_max_level(len(spectrum), wavelet)))
feats = []
for c in coeffs: feats.extend([c.mean(), c.std(), np.abs(c).max()])
return np.array(feats)
def svd_features(spectrum, n_sv=5):
"""SVD features with fallback to zeros if SVD does not converge."""
try:
n = len(spectrum); n_rows = max(2, n // 10); n_cols = n - n_rows + 1
mat = np.array([spectrum[i:i+n_cols] for i in range(n_rows)])
# Replace any NaN/Inf before SVD
mat = np.nan_to_num(mat, nan=0.0, posinf=0.0, neginf=0.0)
_, s, _ = np.linalg.svd(mat, full_matrices=False)
s = s[:n_sv] if len(s) >= n_sv else np.pad(s, (0, n_sv - len(s)))
return np.concatenate([s, s[:-1] / (s[1:] + 1e-10)])
except np.linalg.LinAlgError:
return np.zeros(2 * n_sv - 1)
def fft_features(spectrum, n=20):
f = fft(spectrum)[:n]; return np.concatenate([f.real, f.imag])
def statistical_features(spectrum):
return np.array([
spectrum.mean(), spectrum.std(),
np.percentile(spectrum, 25), np.percentile(spectrum, 75),
spectrum.max() - spectrum.min(),
float(np.argmax(spectrum)), float(np.argmin(spectrum))])
def handcrafted_features(spectrum):
# Sanitise spectrum before feature extraction
spectrum = np.nan_to_num(spectrum, nan=0.0, posinf=0.0, neginf=0.0)
parts = [spectrum, statistical_features(spectrum), spectral_derivatives(spectrum),
wavelet_features(spectrum), svd_features(spectrum), fft_features(spectrum)]
return np.concatenate([p for p in parts if len(p) > 0])
def build_handcrafted_matrix(spectra):
return np.array([handcrafted_features(s) for s in spectra])
print("Feature engineering functions defined")
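The SVD features above embed the spectrum in a Hankel-style trajectory matrix of lagged windows and keep the leading singular values. The idea can be checked on a toy signal (pure NumPy, independent of `svd_features`): a smooth spectrum yields a low-rank trajectory matrix, so its energy concentrates in the first couple of singular values, whereas noise spreads it out:

```python
import numpy as np

def trajectory_singular_values(spectrum, n_sv=5):
    """Stack lagged windows of the spectrum into rows; return top singular values."""
    n = len(spectrum)
    n_rows = max(2, n // 10)
    n_cols = n - n_rows + 1
    mat = np.array([spectrum[i:i + n_cols] for i in range(n_rows)])
    s = np.linalg.svd(mat, compute_uv=False)
    return s[:n_sv]

t = np.linspace(0, 1, 200)
smooth = np.sin(2 * np.pi * t)                      # rank-2 trajectory matrix
noisy = np.random.default_rng(0).normal(size=200)   # full-rank trajectory matrix

s_smooth = trajectory_singular_values(smooth)
s_noisy = trajectory_singular_values(noisy)
# smooth signal: nearly all energy in the first two singular values
print(s_smooth / s_smooth.sum())
print(s_noisy / s_noisy.sum())
```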
"""## 🏋️ Cell 8 — CNN training functions"""
def pretrain_backbone(backbone, projector, spectra, epochs, batch_size, lr, temperature):
"""Self-supervised contrastive pretraining (HyperKon Section 2.3)."""
print(f"\n[CNN] Contrastive pretraining for {epochs} epochs ...")
norm = (spectra - spectra.mean(0)) / (spectra.std(0) + 1e-8)
# Replace any NaN/Inf that may come from zero-std bands
norm = np.nan_to_num(norm, nan=0.0, posinf=0.0, neginf=0.0)
loader = DataLoader(_ArrayDataset(norm), batch_size=batch_size,
shuffle=True, drop_last=True, num_workers=2)
loss_fn = NTXentLoss(temperature)
optimizer = torch.optim.Adam(
list(backbone.parameters()) + list(projector.parameters()), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(
optimizer, step_size=max(1, epochs // 5), gamma=0.5)
backbone.train(); projector.train()
for epoch in range(1, epochs + 1):
total = 0.0; n_batches = 0
for x in loader:
x = x.to(device)
optimizer.zero_grad()
zi = projector(backbone(augment(x)))
zj = projector(backbone(augment(x)))
loss = loss_fn(zi, zj)
if not torch.isfinite(loss):
continue
loss.backward()
torch.nn.utils.clip_grad_norm_(
list(backbone.parameters()) + list(projector.parameters()), max_norm=1.0)
optimizer.step()
total += loss.item(); n_batches += 1
scheduler.step()
if epoch % max(1, epochs // 10) == 0:
avg = total / n_batches if n_batches > 0 else float("nan")
print(f" epoch {epoch:4d}/{epochs} loss={avg:.4f} lr={scheduler.get_last_lr()[0]:.1e}")
print("[CNN] Pretraining complete")
def finetune_backbone(backbone, spectra, labels_arr, epochs, batch_size, lr):
"""Fine-tune with multi-task regression head (HyperSoilNet eq. 9)."""
print(f"\n[CNN] Fine-tuning for {epochs} epochs ...")
mean = spectra.mean(0); std = spectra.std(0) + 1e-8
norm = (spectra - mean) / std
norm = np.nan_to_num(norm, nan=0.0, posinf=0.0, neginf=0.0)
n = len(norm); val_n = max(1, int(0.2 * n))
idx = np.random.permutation(n); vi, ti = idx[:val_n], idx[val_n:]
n_targets = len(TARGETS)
# Normalise labels to unit variance to avoid huge MSE
label_mean = labels_arr.mean(0); label_std = labels_arr.std(0) + 1e-8
labels_norm = (labels_arr - label_mean) / label_std
head = nn.Sequential(nn.Linear(128, 64), nn.ReLU(), nn.Dropout(0.2),
nn.Linear(64, n_targets)).to(device)
loss_w = torch.ones(n_targets, device=device)
def make_loader(idxs, shuffle):
Xs = torch.tensor(norm[idxs], dtype=torch.float32)
ys = torch.tensor(labels_norm[idxs], dtype=torch.float32)
class _D(Dataset):
def __len__(self): return len(Xs)
def __getitem__(self, i): return Xs[i], ys[i]
return DataLoader(_D(), batch_size=batch_size, shuffle=shuffle, num_workers=2)
optimizer = torch.optim.AdamW(
list(backbone.parameters()) + list(head.parameters()), lr=lr, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer, T_max=epochs, eta_min=1e-6)
best_val = float("inf")
best_state = {k: v.cpu().clone() for k, v in backbone.state_dict().items()}
for epoch in range(1, epochs + 1):
backbone.train(); head.train()
for x, y in make_loader(ti, True):
x, y = x.to(device), y.to(device)
optimizer.zero_grad()
pred = head(backbone(x))
loss = (((pred - y) ** 2).mean(0) * loss_w).sum()
if not torch.isfinite(loss):
continue
loss.backward()
torch.nn.utils.clip_grad_norm_(
list(backbone.parameters()) + list(head.parameters()), max_norm=1.0)
optimizer.step()
scheduler.step()
backbone.eval(); head.eval(); vl = 0.0; n_vl = 0
with torch.no_grad():
for x, y in make_loader(vi, False):
x, y = x.to(device), y.to(device)
bl = (((head(backbone(x)) - y)**2).mean(0) * loss_w).sum().item()
if np.isfinite(bl): vl += bl; n_vl += 1
if n_vl > 0:
vl /= n_vl
if vl < best_val:
best_val = vl
best_state = {k: v.cpu().clone() for k, v in backbone.state_dict().items()}
if epoch % max(1, epochs // 10) == 0:
print(f" epoch {epoch:4d}/{epochs} val_loss={vl:.4f}")
backbone.load_state_dict(best_state)
print("[CNN] Fine-tuning complete")
return mean, std
@torch.no_grad()
def extract_cnn_features(backbone, spectra, mean, std, batch_size=64):
backbone.eval()
norm = (spectra - mean) / (std + 1e-8)
norm = np.nan_to_num(norm, nan=0.0, posinf=0.0, neginf=0.0)
loader = DataLoader(_ArrayDataset(norm), batch_size=batch_size, shuffle=False, num_workers=2)
return np.vstack([backbone(x.to(device)).cpu().numpy() for x in loader])
print("CNN training functions defined")
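Note that `finetune_backbone` trains against z-scored labels (`labels_norm`), so anything the regression head predicts lives in normalised space and must be mapped back with the stored `label_mean` and `label_std` before being reported. A minimal sketch of that round trip (the values are illustrative, not from the dataset):

```python
import numpy as np

labels = np.array([[120.0, 3.1], [80.0, 2.4], [200.0, 5.0]])  # e.g. CaCO3, FeO

# forward transform, as in finetune_backbone
label_mean = labels.mean(0)
label_std = labels.std(0) + 1e-8
labels_norm = (labels - label_mean) / label_std

# a model trained on labels_norm predicts in z-score space ...
pred_norm = labels_norm[0]  # pretend the head reproduced this row exactly
# ... so predictions must be de-normalised before reporting
pred = pred_norm * label_std + label_mean
print(pred)
```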
"""## 🌲 Cell 9 — ML Ensemble (RF + XGBoost + KNN)"""
def build_regressors():
models = {
"RF": RandomForestRegressor(n_estimators=100, max_depth=20, min_samples_leaf=5,
criterion="squared_error", bootstrap=True,
n_jobs=-1, random_state=42),
"KNN": KNeighborsRegressor(n_neighbors=7, weights="distance", metric="euclidean")}
if HAS_XGB:
models["XGB"] = XGBRegressor(n_estimators=100, learning_rate=0.1, max_depth=5,
reg_alpha=0.01, reg_lambda=1.0, early_stopping_rounds=15,
eval_metric="rmse", random_state=42, verbosity=0)
return models
def fit_ensemble(X_tr, y_tr, X_val=None, y_val=None):
scaler = StandardScaler()
Xtr_s = scaler.fit_transform(X_tr)
Xvl_s = scaler.transform(X_val) if X_val is not None else None
models = build_regressors(); fitted = {}
for name, mdl in models.items():
if name == "XGB" and Xvl_s is not None:
mdl.fit(Xtr_s, y_tr, eval_set=[(Xvl_s, y_val)], verbose=False)
elif name == "XGB":
mdl.set_params(early_stopping_rounds=None); mdl.fit(Xtr_s, y_tr)
else:
mdl.fit(Xtr_s, y_tr)
fitted[name] = mdl
return {"scaler": scaler, "models": fitted}
def ensemble_predict(bundle, X, target):
X_s = bundle["scaler"].transform(X)
keys = list(bundle["models"].keys())
w = np.array(get_weights(target)[:len(keys)])
preds = np.stack([bundle["models"][k].predict(X_s) for k in keys], axis=1)
return preds @ (w / w.sum())
def cross_validate(X, labels, n_splits=5):
kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
results = {t: {"r2": [], "mse": []} for t in TARGETS}
for fold, (ti, vi) in enumerate(kf.split(X), 1):
print(f"\n Fold {fold}/{n_splits}")
for target in TARGETS:
y_tr, y_val = labels[target].values[ti], labels[target].values[vi]
bundle = fit_ensemble(X[ti], y_tr, X[vi], y_val)
preds = ensemble_predict(bundle, X[vi], target)
results[target]["r2"].append(r2_score(y_val, preds))
results[target]["mse"].append(mean_squared_error(y_val, preds))
print(f" {target:15s} R²={results[target]['r2'][-1]:.3f} "
f"RMSE={np.sqrt(results[target]['mse'][-1]):.4f}")
print("\n" + "="*60)
print(f"{'Target':<16} {'R² mean':>9} {'R² std':>8} {'RMSE mean':>10}")
print("-"*60)
baseline = {t: np.var(labels[t].values) for t in TARGETS}
for t in TARGETS:
r2s = results[t]["r2"]; mses = results[t]["mse"]
print(f"{t:<16} {np.mean(r2s):>9.3f} {np.std(r2s):>8.3f} {np.mean(np.sqrt(mses)):>10.4f}")
score = np.mean([np.mean(results[t]["mse"]) / baseline[t] for t in TARGETS])
print(f"\n Challenge score (lower=better): {score:.4f}")
print("="*60)
return results
def train_full(X, labels):
n = len(X); val_n = max(1, int(0.2 * n))
idx = np.random.default_rng(42).choice(n, val_n, replace=False)
vi = idx; ti = np.setdiff1d(np.arange(n), vi)
bundles = {}
for target in TARGETS:
y = labels[target].values
bundles[target] = fit_ensemble(X[ti], y[ti], X[vi], y[vi])
print(f" Trained ensemble for {target}")
return bundles
def make_predictions(bundles, X, ids):
df = pd.DataFrame({t: ensemble_predict(bundles[t], X, t) for t in TARGETS})
df.insert(0, "sample_id", ids)
return df
# ── Optimised weights (populated by optimise_weights()) ──────────────────────
OPTIMISED_WEIGHTS = {}
def get_weights(target):
"""Return per-model weights — uses optimised weights if available."""
if target in OPTIMISED_WEIGHTS:
return OPTIMISED_WEIGHTS[target]
w = list(DEFAULT_WEIGHTS.get(target, [0.34, 0.33, 0.33]))
if not HAS_XGB: w = [w[0], w[2]]
total = sum(w)
return [x / total for x in w]
def optimise_weights(X, labels, n_splits=5):
"""
Find optimal ensemble weights [RF, XGB, KNN] for each target via
Leave-One-Out CV (n<20) or k-fold CV + grid search on the weight simplex.
Results stored in OPTIMISED_WEIGHTS and used automatically by ensemble_predict.
"""
n = len(X)
if n < 20:
from sklearn.model_selection import LeaveOneOut
splitter = LeaveOneOut()
cv_name = "LOO"
else:
splitter = KFold(n_splits=n_splits, shuffle=True, random_state=42)
cv_name = f"{n_splits}-fold"
model_names = list(build_regressors().keys())
n_models = len(model_names)
print(f"\nOptimising ensemble weights ({cv_name} CV, {n_models} models) ...")
# Collect out-of-fold predictions for every model and target
oof_preds = {t: {m: np.zeros(n) for m in model_names} for t in TARGETS}
oof_true = {t: np.zeros(n) for t in TARGETS}
for ti, vi in splitter.split(X):
for target in TARGETS:
b = fit_ensemble(X[ti], labels[target].values[ti])
X_vi_s = b["scaler"].transform(X[vi])
for m in model_names:
oof_preds[target][m][vi] = b["models"][m].predict(X_vi_s)
oof_true[target][vi] = labels[target].values[vi]
# Grid search over weight simplex (step = 0.05 → 231 combos for 3 models)
step = 0.05
ticks = np.round(np.arange(0, 1 + step, step), 6)
if n_models == 2:
combos = [(round(a, 4), round(1-a, 4)) for a in ticks]
else:
combos = [
(a, b, round(1.0 - a - b, 6))
for a in ticks for b in ticks
if 0 <= round(1.0 - a - b, 6) <= 1.0
]
results = {}
for target in TARGETS:
best_mse, best_w = float("inf"), None
y_true = oof_true[target]
model_preds = np.stack([oof_preds[target][m] for m in model_names], axis=1)
for combo in combos:
w = np.array(combo)
mse = np.mean((y_true - model_preds @ w) ** 2)
if mse < best_mse:
best_mse = mse
best_w = list(combo)
OPTIMISED_WEIGHTS[target] = best_w
results[target] = best_w
# Print summary table
col = 10
print(f"\n {'Target':<14}" + "".join(f"{m:>{col}}" for m in model_names))
print(" " + "-" * (14 + col * n_models))
for t in TARGETS:
print(f" {t:<14}" + "".join(f"{wi:>{col}.2f}" for wi in results[t]))
print(" → Weights saved in OPTIMISED_WEIGHTS, used automatically by ensemble_predict\n")
return results
print("ML ensemble + weight optimiser defined ✓")
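The simplex grid search inside `optimise_weights` can be illustrated on toy out-of-fold predictions (pure NumPy; three hypothetical models, one clearly better than the others, so the search should push most of the weight onto it):

```python
import numpy as np

rng = np.random.default_rng(42)
y = rng.normal(size=100)
# out-of-fold predictions from three hypothetical models
preds = np.stack([
    y + 0.1 * rng.normal(size=100),  # good model
    y + 1.0 * rng.normal(size=100),  # mediocre model
    rng.normal(size=100),            # useless model
], axis=1)

step = 0.05
ticks = np.round(np.arange(0, 1 + step, step), 6)
best_mse, best_w = float("inf"), None
for a in ticks:
    for b in ticks:
        c = round(1.0 - a - b, 6)
        if not (0 <= c <= 1):        # stay on the weight simplex
            continue
        w = np.array([a, b, c])
        mse = np.mean((y - preds @ w) ** 2)
        if mse < best_mse:
            best_mse, best_w = mse, w
print(best_w, best_mse)
```

The grid includes the corner (1, 0, 0), so the blended MSE can never be worse than the best single model's out-of-fold MSE.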
"""## 🚀 Cell 10 — Run the full pipeline"""
# ── 1. Load labelled data ──────────────────────────────────────────────────
print("[1] Loading labelled data …")
spectra, labels, ids = load_labelled(LABELS_CSV, SPECTRA_DIR)
n_bands = spectra.shape[1]
print(f" {len(ids)} samples loaded, {n_bands} bands each")
# ── Plot reflectance spectra ───────────────────────────────────────────────
import matplotlib.pyplot as plt
import matplotlib.cm as cm
wavelengths = np.arange(350, 350 + n_bands) # FieldSpec: 350-2500 nm
fig, axes = plt.subplots(1, 2, figsize=(16, 5))
# Left: individual spectra coloured by sample, with mean
colors = cm.tab20(np.linspace(0, 1, len(ids)))
for i, (spec, sid) in enumerate(zip(spectra, ids)):
axes[0].plot(wavelengths, spec, color=colors[i], alpha=0.5, lw=0.8, label=str(sid))
axes[0].plot(wavelengths, spectra.mean(0), color='black', lw=2, label='Mean')
axes[0].set_xlabel("Wavelength (nm)", fontsize=12)
axes[0].set_ylabel("Reflectance", fontsize=12)
axes[0].set_title(f"Individual spectra ({len(ids)} samples)", fontsize=13)
axes[0].set_xlim(wavelengths[0], wavelengths[-1])
if len(ids) <= 20:
axes[0].legend(fontsize=7, ncol=2, loc='upper left')
# Right: mean ± 1 std band
mean_spec = spectra.mean(0)
std_spec = spectra.std(0)
axes[1].plot(wavelengths, mean_spec, color='steelblue', lw=2, label='Mean')
axes[1].fill_between(wavelengths, mean_spec - std_spec, mean_spec + std_spec,
alpha=0.3, color='steelblue', label='±1 std')
axes[1].fill_between(wavelengths, mean_spec - 2*std_spec, mean_spec + 2*std_spec,
alpha=0.15, color='steelblue', label='±2 std')
axes[1].set_xlabel("Wavelength (nm)", fontsize=12)
axes[1].set_ylabel("Reflectance", fontsize=12)
axes[1].set_title("Mean reflectance ± std", fontsize=13)
axes[1].set_xlim(wavelengths[0], wavelengths[-1])
axes[1].legend(fontsize=10)
# Mark FieldSpec splice points (detector transitions)
for ax in axes:
ax.axvline(1000, color='gray', lw=0.8, ls='--', alpha=0.6)
ax.axvline(1800, color='gray', lw=0.8, ls='--', alpha=0.6)
ax.text(1000, ax.get_ylim()[1]*0.95, 'VNIR/SWIR1', fontsize=7,
ha='center', color='gray')
ax.text(1800, ax.get_ylim()[1]*0.95, 'SWIR1/SWIR2', fontsize=7,
ha='center', color='gray')
plt.suptitle("ASD FieldSpec Reflectance Spectra", fontsize=14, y=1.01)
plt.tight_layout()
plt.savefig("/content/drive/MyDrive/mugello/reflectance_spectra.png",
dpi=150, bbox_inches='tight')
plt.show()
print(f"Plot saved → reflectance_spectra.png")
"""## 📈 Reflectance spectra"""
import matplotlib.pyplot as plt
import matplotlib.cm as cm
# ── Reflectance plot ──────────────────────────────────────────────────────
wavelengths = np.arange(350, 350 + spectra.shape[1]) # 350–2500 nm
fig, axes = plt.subplots(2, 1, figsize=(14, 9))
# --- Panel 1: individual spectra coloured by sample ---
ax = axes[0]
colors = cm.tab20(np.linspace(0, 1, len(ids)))
for i, (sid, color) in enumerate(zip(ids, colors)):
ax.plot(wavelengths, spectra[i], lw=0.7, alpha=0.7, color=color, label=str(sid))
ax.set_xlabel('Wavelength (nm)', fontsize=11)
ax.set_ylabel('Reflectance', fontsize=11)
ax.set_title(f'Individual reflectance spectra ({len(ids)} samples)', fontsize=13)
ax.set_xlim(wavelengths[0], wavelengths[-1])
# Mark FieldSpec detector transitions
for wl, label in [(1000, 'VNIR/SWIR1'), (1800, 'SWIR1/SWIR2')]:
ax.axvline(wl, color='gray', lw=0.8, ls='--', alpha=0.6)
ax.text(wl + 5, ax.get_ylim()[1] * 0.95, label, fontsize=7, color='gray')
if len(ids) <= 30:
ax.legend(fontsize=6, ncol=4, loc='upper right')
# --- Panel 2: mean ± 1 std ---
ax2 = axes[1]
mean_spec = spectra.mean(0)
std_spec = spectra.std(0)
ax2.plot(wavelengths, mean_spec, lw=1.5, color='steelblue', label='Mean')
ax2.fill_between(wavelengths,
mean_spec - std_spec,
mean_spec + std_spec,
alpha=0.25, color='steelblue', label=r'$\pm$1 std')
ax2.fill_between(wavelengths,
mean_spec - 2*std_spec,
mean_spec + 2*std_spec,
alpha=0.10, color='steelblue', label=r'$\pm$2 std')
ax2.set_xlabel('Wavelength (nm)', fontsize=11)
ax2.set_ylabel('Reflectance', fontsize=11)
ax2.set_title('Mean reflectance ± std', fontsize=13)
ax2.set_xlim(wavelengths[0], wavelengths[-1])
for wl in [1000, 1800]:
ax2.axvline(wl, color='gray', lw=0.8, ls='--', alpha=0.6)
ax2.legend(fontsize=10)
plt.tight_layout()
plot_path = '/content/drive/MyDrive/mugello/reflectance_spectra.png'
plt.savefig(plot_path, dpi=150, bbox_inches='tight')
plt.show()
print(f'Plot saved -> {plot_path}')
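The dashed lines in the plots mark the ASD FieldSpec detector transitions (~1000 and ~1800 nm), where raw spectra can show small reflectance steps ("splice jumps"). As a minimal illustrative sketch (not part of this notebook's preprocessing), one common correction offsets each later detector segment so its edge meets the previous one:

```python
# Hedged sketch of a simple splice ("jump") correction at the ASD detector
# boundaries (~1000 and ~1800 nm). Illustrative only; the pipeline below
# does not apply this step.
import numpy as np

def splice_correct(spectrum, wavelengths, boundaries=(1000, 1800)):
    spec = spectrum.astype(float).copy()
    for b in boundaries:
        idx = int(np.searchsorted(wavelengths, b + 1))
        jump = spec[idx] - spec[idx - 1]
        spec[idx:] -= jump  # shift the later segment onto the earlier one
    return spec

# Toy spectrum with artificial steps at both detector boundaries
wl = np.arange(350, 2501)
toy = np.where(wl <= 1000, 0.2, np.where(wl <= 1800, 0.25, 0.22))
fixed = splice_correct(toy, wl)
print(round(float(fixed[-1] - fixed[0]), 6))  # segments now level -> 0.0
```

Whether to splice-correct before feature extraction is a judgement call; the CNN and derivative features below may partly absorb these offsets on their own.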
# ── 2. HyperKon CNN backbone ───────────────────────────────────────────────
backbone = None
train_mean = spectra.mean(0)
train_std = spectra.std(0) + 1e-8
cnn_feats = None
if USE_CNN:
print(f"\n[2] HyperKon CNN backbone (device={device})")
backbone = HyperKonBackbone(n_bands, EMBED_DIM).to(device)
projector = ProjectionHead(EMBED_DIM).to(device)
print(f" Parameters: {sum(p.numel() for p in backbone.parameters()):,}")
if LOAD_CNN and os.path.exists(CNN_WEIGHTS):
print(f" Loading saved weights from {CNN_WEIGHTS}")
backbone.load_state_dict(torch.load(CNN_WEIGHTS, map_location=device))
else:
# Include predict_dir spectra in SSL pretraining if available
all_spectra = spectra
if PREDICT_DIR:
try:
extra, _ = load_directory(PREDICT_DIR)
all_spectra = np.vstack([spectra, extra])
print(f" Including {len(extra)} new spectra in SSL pretraining")
except Exception:
pass
pretrain_backbone(backbone, projector, all_spectra,
CNN_EPOCHS, BATCH_SIZE, LR, TEMPERATURE)
train_mean, train_std = finetune_backbone(
backbone, spectra, labels.values, FINETUNE_EPOCHS, BATCH_SIZE, LR)
if SAVE_CNN:
torch.save(backbone.state_dict(), CNN_WEIGHTS)
print(f" Backbone saved → {CNN_WEIGHTS}")
print("\n Extracting CNN embeddings …")
cnn_feats = extract_cnn_features(backbone, spectra, train_mean, train_std)
print(f" CNN feature matrix: {cnn_feats.shape}")
else:
print("\n[2] CNN skipped (USE_CNN=False)")
# ── 3. Hand-crafted features ───────────────────────────────────────────────
print("\n[3] Hand-crafted feature engineering …")
X_hc = build_handcrafted_matrix(spectra)
print(f" Hand-crafted matrix: {X_hc.shape}")
X = np.hstack([cnn_feats, X_hc]) if cnn_feats is not None else X_hc
if cnn_feats is not None:
print(f" Combined matrix (CNN + HC): {X.shape}")
# ── 4. Cross-validation ────────────────────────────────────────────────────
if RUN_CV:
print(f"\n[4] {CV_FOLDS}-fold cross-validation …")
cv_results = cross_validate(X, labels, n_splits=CV_FOLDS)
else:
print("\n[4] Cross-validation skipped")
# ── 4b. Optimise ensemble weights ──────────────────────────────────────────
print("\n[4b] Optimising ensemble weights ...")
optimise_weights(X, labels, n_splits=CV_FOLDS)
# ── 5. Train ensemble on full dataset ──────────────────────────────────────
print("\n[5] Training ensemble on full dataset ...")
bundles = train_full(X, labels)
print("\nPipeline training complete ✓")
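For reference, the property-specific weighted ensemble used above boils down to a weighted average of per-model predictions. A toy sketch with sklearn models on synthetic data (an XGBoost regressor would slot in the same way; the `weights` here are made up for illustration, whereas the real ones come from `optimise_weights`):

```python
# Toy sketch of a weighted two-model ensemble; weights are illustrative,
# not the ones the notebook's optimise_weights() would produce.
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor

rng = np.random.default_rng(0)
X_toy = rng.normal(size=(40, 8))
y_toy = X_toy[:, 0] * 3.0 + rng.normal(scale=0.1, size=40)

rf = RandomForestRegressor(n_estimators=50, random_state=0).fit(X_toy, y_toy)
knn = KNeighborsRegressor(n_neighbors=3).fit(X_toy, y_toy)

weights = {"rf": 0.7, "knn": 0.3}  # would be tuned per soil property
blend = weights["rf"] * rf.predict(X_toy) + weights["knn"] * knn.predict(X_toy)
print(blend.shape)  # one blended prediction per sample
```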
"""## 🔮 Cell 11 — Predict new spectra"""
if PREDICT_DIR:
print(f"[6] Predicting spectra in: {PREDICT_DIR}")
new_spectra, new_ids = load_directory(PREDICT_DIR)
print(f" {len(new_ids)} new spectra loaded")
X_new_hc = build_handcrafted_matrix(new_spectra)
if backbone is not None:
X_new_cnn = extract_cnn_features(backbone, new_spectra, train_mean, train_std)
X_new = np.hstack([X_new_cnn, X_new_hc])
else:
X_new = X_new_hc
pred_df = make_predictions(bundles, X_new, new_ids)
pred_df.to_csv(OUTPUT_CSV, index=False)
print(f"\n Predictions saved → {OUTPUT_CSV}")
display(pred_df)
else:
print("PREDICT_DIR is None – running sanity check on training set …")
pred_df = make_predictions(bundles, X, ids)
pred_df.to_csv(OUTPUT_CSV, index=False)
print(f" Saved → {OUTPUT_CSV}")
display(pred_df)
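`load_directory` and `load_spectrum` are defined in an earlier cell. As a hypothetical stand-in only: if the .asd files had been exported to two-column tab-separated text (wavelength, reflectance), a minimal loader could look like this — the actual `load_spectrum` may parse a different format.

```python
# Hypothetical loader sketch; assumes two-column tab-separated text export
# (wavelength, reflectance), which may differ from the real .asd handling.
import io
import numpy as np

def load_text_spectrum(path_or_buf):
    data = np.loadtxt(path_or_buf, delimiter="\t")
    wl, refl = data[:, 0], data[:, 1]
    order = np.argsort(wl)  # ensure ascending wavelength
    return wl[order], refl[order]

buf = io.StringIO("350\t0.12\n351\t0.13\n352\t0.14\n")
wl, refl = load_text_spectrum(buf)
print(len(wl))  # 3
```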
"""## ✅ Cell 12 — Evaluate on validation set (valida.csv)"""
# ── Validation: predict valida.csv and compare with known labels ──────────────
if "VALIDA_CSV" in dir() and VALIDA_CSV and os.path.exists(VALIDA_CSV):
print("[Validation] Loading validation set ...")
val_df = pd.read_csv(VALIDA_CSV)
val_df.columns = val_df.columns.str.strip()
# Find id column
    id_col = next((c for c in ["Id","id","sample_id","ID"] if c in val_df.columns), None)
    if id_col is None:
        raise ValueError("No id column found in validation CSV (expected Id, id, sample_id or ID)")
    val_ids = [_normalise_id(v) for v in val_df[id_col].values]
# Load spectra
val_spectra, loaded_ids = [], []
for sid in val_ids:
fpath = _find_spectrum_file(SPECTRA_DIR, sid)
if fpath is None:
print(f" WARNING: no spectrum for id={sid}, skipping")
continue
try:
val_spectra.append(load_spectrum(fpath))
loaded_ids.append(sid)
except Exception as e:
print(f" WARNING: failed to load {sid}: {e}")
if val_spectra:
val_spectra = np.vstack(val_spectra)
print(f" {len(loaded_ids)} validation spectra loaded")
# Build features
X_val_hc = build_handcrafted_matrix(val_spectra)
if backbone is not None:
X_val_cnn = extract_cnn_features(backbone, val_spectra, train_mean, train_std)
X_val = np.hstack([X_val_cnn, X_val_hc])
else:
X_val = X_val_hc
# Predict
pred_val = make_predictions(bundles, X_val, loaded_ids)
# Merge with known labels
true_df = val_df.set_index(val_df[id_col].apply(_normalise_id))
rows = []
for sid in loaded_ids:
row = {"Id": sid}
for t in TARGETS:
row[f"{t}_true"] = true_df.loc[sid, t] if sid in true_df.index else np.nan
row[f"{t}_pred"] = float(pred_val.loc[pred_val["sample_id"]==sid, t].values[0])
rows.append(row)
result_df = pd.DataFrame(rows)
# Print metrics
print(f"\n{'Target':<10} {'R²':>8} {'RMSE':>10} {'MAE':>10}")
print("-" * 42)
for t in TARGETS:
y_true = result_df[f"{t}_true"].values
y_pred = result_df[f"{t}_pred"].values
mask = ~np.isnan(y_true)
if mask.sum() < 2:
print(f"{t:<10} {'n/a':>8}")
continue
r2 = r2_score(y_true[mask], y_pred[mask])
rmse = np.sqrt(mean_squared_error(y_true[mask], y_pred[mask]))
mae = np.mean(np.abs(y_true[mask] - y_pred[mask]))
print(f"{t:<10} {r2:>8.3f} {rmse:>10.3f} {mae:>10.3f}")
# Save results
val_out = OUTPUT_CSV.replace(".csv", "_validation.csv")
result_df.to_csv(val_out, index=False)
print(f"\nValidation results saved → {val_out}")
display(result_df)
# Plot predicted vs true
fig, axes = plt.subplots(1, len(TARGETS), figsize=(5*len(TARGETS), 5))
if len(TARGETS) == 1: axes = [axes]
for ax, t in zip(axes, TARGETS):
y_true = result_df[f"{t}_true"].values
y_pred = result_df[f"{t}_pred"].values
mask = ~np.isnan(y_true)
ax.scatter(y_true[mask], y_pred[mask], s=80, edgecolors="k", lw=0.5, zorder=3)
for i, sid in enumerate(result_df["Id"].values):
ax.annotate(sid, (y_true[i], y_pred[i]), fontsize=7,
xytext=(4, 4), textcoords="offset points")
lims = [min(y_true[mask].min(), y_pred[mask].min()) * 0.95,
max(y_true[mask].max(), y_pred[mask].max()) * 1.05]
ax.plot(lims, lims, "r--", lw=1.5, label="1:1 line")
r2 = r2_score(y_true[mask], y_pred[mask])
rmse = np.sqrt(mean_squared_error(y_true[mask], y_pred[mask]))
ax.set_title(f"{t}\nR²={r2:.3f} RMSE={rmse:.3f}", fontsize=11)
ax.set_xlabel("Measured", fontsize=10)
ax.set_ylabel("Predicted", fontsize=10)
ax.legend(fontsize=8)
plt.suptitle("Validation Set — Predicted vs Measured", fontsize=13, y=1.02)
plt.tight_layout()
val_plot = OUTPUT_CSV.replace(".csv", "_validation_scatter.png")
plt.savefig(val_plot, dpi=150, bbox_inches="tight")
plt.show()
print(f"Plot saved → {val_plot}")
else:
print("VALIDA_CSV not set or file not found — skipping validation")
"""## 📊 Cell 12 — Visualise results"""
import matplotlib.pyplot as plt
# ── Plot 1: Predicted vs actual on training set (cross-validation) ─────────
if RUN_CV:
fig, axes = plt.subplots(1, 3, figsize=(15, 9))
axes = axes.flatten()
    # Re-run cross-validation to collect out-of-fold predictions for plotting
from sklearn.model_selection import KFold
kf = KFold(n_splits=CV_FOLDS, shuffle=True, random_state=42)
all_preds = {t: [] for t in TARGETS}
all_true = {t: [] for t in TARGETS}
for ti, vi in kf.split(X):
for target in TARGETS:
y_tr = labels[target].values[ti]
y_val = labels[target].values[vi]
bundle = fit_ensemble(X[ti], y_tr, X[vi], y_val)
preds = ensemble_predict(bundle, X[vi], target)
all_preds[target].extend(preds)
all_true[target].extend(y_val)
for ax, target in zip(axes, TARGETS):
y_true = np.array(all_true[target])
y_pred = np.array(all_preds[target])
r2 = r2_score(y_true, y_pred)
rmse = np.sqrt(mean_squared_error(y_true, y_pred))
ax.scatter(y_true, y_pred, alpha=0.6, edgecolors='k', linewidths=0.3, s=40)
lims = [min(y_true.min(), y_pred.min()), max(y_true.max(), y_pred.max())]
ax.plot(lims, lims, 'r--', lw=1.5, label='1:1 line')
ax.set_xlabel(f"Measured {target}", fontsize=10)
ax.set_ylabel(f"Predicted {target}", fontsize=10)
ax.set_title(f"{target}\nR²={r2:.3f} RMSE={rmse:.4f}", fontsize=11)
ax.legend(fontsize=8)
plt.suptitle("Predicted vs Measured — Cross-validation", fontsize=14, y=1.01)
plt.tight_layout()
plt.savefig("/content/drive/MyDrive/mugello/cv_scatter.png", dpi=150, bbox_inches='tight')
plt.show()
print("Plot saved → cv_scatter.png")
# ── Plot 2: Mean spectrum of labelled samples ──────────────────────────────
wavelengths = np.arange(350, 350 + spectra.shape[1])
fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(wavelengths, spectra.mean(0), lw=1.5, label='Mean spectrum')
ax.fill_between(wavelengths,
spectra.mean(0) - spectra.std(0),
spectra.mean(0) + spectra.std(0),
alpha=0.2, label='±1 std')
ax.set_xlabel("Wavelength (nm)"); ax.set_ylabel("Reflectance")
ax.set_title(f"Mean reflectance spectrum ({len(ids)} labelled samples)")
ax.legend()
plt.tight_layout()
plt.savefig("/content/drive/MyDrive/mugello/mean_spectrum.png", dpi=150, bbox_inches='tight')
plt.show()
print("Plot saved → mean_spectrum.png")