The data are distributed in .npz format in two folders, train (1732 patches) and test (1154 patches), with a GSD of 2 m.
Each .npz file contains two arrays, data and mask: data holds an image of variable spatial size with 150 spectral bands (the wavelengths are listed in wavelength.csv and run from 462 nm to 938 nm in steps of about 3.2 nm).
A first clean-up uses the mask array, extracting only the spectra flagged True.
At this point the file train_gt.csv comes into play: for each .npz file it reports the values of P, K, Mg and pH.
Since there is a single ground-truth value per patch, the mean spectrum of each patch was computed.
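A minimal sketch of this first step (mask filtering plus per-patch mean spectrum): the array names data and mask and the True-means-keep convention come from the description above, while the band-first (150, H, W) orientation and the folder/file names are assumptions to adapt to the actual patches.

import numpy as np
import pandas as pd
from pathlib import Path

def mean_soil_spectrum(npz_path):
    """Mean reflectance spectrum over the True-masked pixels of one patch.

    Sketch only: assumes data is stored band-first as (150, H, W) and mask as a
    boolean (H, W) array; swap the axes if your patches are stored (H, W, 150).
    """
    arr = np.load(npz_path)
    data, mask = arr["data"], arr["mask"].astype(bool)
    pixels = data[:, mask]          # (150, n_valid_pixels)
    return pixels.mean(axis=1)      # (150,) mean spectrum of the patch

# ground truth: one row of P, K, Mg and pH per .npz patch
gt = pd.read_csv("train_gt.csv")
mean_spectra = np.vstack([mean_soil_spectrum(p)
                          for p in sorted(Path("train").glob("*.npz"))])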
Plotting all the per-patch mean spectra shows that some belong to vegetated areas, which is a problem when correlating with soil parameters. For this reason the NDVI was computed for each spectrum, and spectra with an index greater than 0.3 were discarded.
This leaves 622 spectra attributable to bare soil, on which the statistical analysis can start.
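The NDVI screening, as a sketch reusing mean_spectra from the snippet above: the wavelength grid comes from wavelength.csv as described earlier, while the single-column CSV layout and the exact red/NIR bands (about 670 nm and 850 nm) are assumptions.

import numpy as np
import pandas as pd

# wavelength.csv is assumed to hold one wavelength per band (150 values, 462-938 nm)
wavelengths = pd.read_csv("wavelength.csv").iloc[:, 0].values

def ndvi(spectrum, red_nm=670.0, nir_nm=850.0):
    """NDVI of a single reflectance spectrum (red/NIR band choice is an assumption)."""
    red = spectrum[np.argmin(np.abs(wavelengths - red_nm))]
    nir = spectrum[np.argmin(np.abs(wavelengths - nir_nm))]
    return (nir - red) / (nir + red + 1e-10)

# keep only the patches that look like bare soil (NDVI <= 0.3)
bare_soil = mean_spectra[np.array([ndvi(s) for s in mean_spectra]) <= 0.3]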
I first tried a Random Forest, with very poor results, so I decided to learn from those who did it well, namely the challenge winners, who published their algorithm in the paper.
The dataset was split into 572 training spectra and 50 validation spectra.
Training runs in two stages: the first concerns the CNN backbone trained on the raw spectra, the second applies Random Forest, XGBoost and KNN after the spectra have been transformed; the command sequence is sketched below, and the two scripts follow.
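The two stages correspond to the scripts hyperkon.py and hypersoilnet.py reproduced below. Putting together the usage notes in their docstrings, the full sequence looks roughly like this (file and folder names are the placeholders used by the scripts, not the challenge files):

# Stage 1 – CNN backbone (hyperkon.py): contrastive pretraining, then fine-tuning + 128-d feature export
python hyperkon.py --spectra_dir spectra/ --epochs 200 --output hyperkon_weights.pt
python hyperkon.py --spectra_dir spectra/ --labels label.csv --weights hyperkon_weights.pt \
    --finetune --output_features features.npy --output_ids ids.txt

# Stage 2 – ML ensemble (hypersoilnet.py) on hand-crafted + CNN features
python hypersoilnet.py --labels label.csv --spectra_dir spectra/ \
    --precomputed_features features.npy --precomputed_ids ids.txt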
"""
HyperKon Backbone
=================
Based on: "HyperKon: A Self-Supervised Contrastive Network for Hyperspectral Image Analysis"
https://www.mdpi.com/2072-4292/16/18/3399
Architecture
------------
ResNeXt-style multibranch CNN with Squeeze-and-Excitation Blocks (SEB) for
adaptive spectral-channel recalibration (paper Section 2.2, eq. 1-3).
The backbone operates on 1-D spectral vectors (your reflectance spectra).
Since your data has no spatial dimensions (one spectrum per sample),
we use a 1-D convolutional equivalent of the paper's 2-D architecture.
Contrastive pretraining (NT-Xent, paper Section 2.3, eq. 4) is supported
when you have unlabelled spectra to pretrain on.
Usage
-----
# 1. Pretrain on ALL available spectra (labelled + unlabelled, no labels needed)
python hyperkon.py --spectra_dir spectra/ --epochs 200 --output hyperkon_weights.pt
# 2. Fine-tune on labelled data and extract features for the ML ensemble
python hyperkon.py --spectra_dir spectra/ --labels label.csv \
--weights hyperkon_weights.pt --finetune \
--output_features features.npy --output_ids ids.txt
# The feature file can then be passed to hypersoilnet.py via --precomputed_features
"""
import argparse
import os
from pathlib import Path
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
# ---------------------------------------------------------------------------
# Reproducibility
# ---------------------------------------------------------------------------
torch.manual_seed(42)
np.random.seed(42)
TARGETS = ["P", "K", "Mg", "pH"]
# ===========================================================================
# 1. SQUEEZE-AND-EXCITATION BLOCK (paper eq. 1-2, Section 2.2)
#
# For a 1-D feature map X of shape (B, C, L):
# s_c = mean over L of x_{c} (global avg pool)
# e_c = sigmoid(beta * s_c + gamma) (learnable per-channel gate)
# x'_{c} = e_c * x_{c} (recalibration)
# ===========================================================================
class SqueezeExcitation1d(nn.Module):
"""Channel-wise attention / recalibration for 1-D spectral feature maps."""
def __init__(self, channels: int, reduction: int = 16):
super().__init__()
mid = max(1, channels // reduction)
# beta and gamma in the paper implemented as a small MLP
self.fc = nn.Sequential(
nn.Linear(channels, mid, bias=False),
nn.ReLU(inplace=True),
nn.Linear(mid, channels, bias=False),
nn.Sigmoid(),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x: (B, C, L)
s = x.mean(dim=2) # global average pool → (B, C)
e = self.fc(s) # channel gates → (B, C)
return x * e.unsqueeze(2) # recalibrated map → (B, C, L)
# ===========================================================================
# 2. RESNEXT BOTTLENECK BLOCK (paper Section 2.2 – multibranch cardinality)
#
# Cardinality = 32, bottleneck width = 4 (from HyperSoilNet description
# of HyperKon: "cardinality of 32 and bottleneck width of 4, ~5.54 M params")
# ===========================================================================
class ResNeXtBlock1d(nn.Module):
"""
1-D ResNeXt bottleneck with Squeeze-and-Excitation attention.
Groups = cardinality.
in_channels → bottleneck → out_channels
"""
def __init__(self, in_channels: int, out_channels: int,
cardinality: int = 32, bottleneck_width: int = 4,
stride: int = 1):
super().__init__()
width = max(cardinality, int(out_channels * (bottleneck_width / 64.0)) * cardinality)
self.conv1 = nn.Conv1d(in_channels, width, kernel_size=1, bias=False)
self.bn1 = nn.BatchNorm1d(width)
self.conv2 = nn.Conv1d(width, width, kernel_size=3, stride=stride,
padding=1, groups=cardinality, bias=False)
self.bn2 = nn.BatchNorm1d(width)
self.conv3 = nn.Conv1d(width, out_channels, kernel_size=1, bias=False)
self.bn3 = nn.BatchNorm1d(out_channels)
self.se = SqueezeExcitation1d(out_channels)
self.relu = nn.ReLU(inplace=True)
# Skip connection projection when in != out
if in_channels != out_channels or stride != 1:
self.downsample = nn.Sequential(
nn.Conv1d(in_channels, out_channels,
kernel_size=1, stride=stride, bias=False),
nn.BatchNorm1d(out_channels),
)
else:
self.downsample = None
def forward(self, x: torch.Tensor) -> torch.Tensor:
identity = x
out = self.relu(self.bn1(self.conv1(x)))
out = self.relu(self.bn2(self.conv2(out)))
out = self.bn3(self.conv3(out))
out = self.se(out)
if self.downsample is not None:
identity = self.downsample(x)
return self.relu(out + identity)
# ===========================================================================
# 3. SPECTRAL ATTENTION MODULE (HyperSoilNet Section 3.3, eq. 1-3)
#
# Applied after the initial conv layer; highlights most informative
# wavelengths via a two-layer MLP on the global-average-pooled features.
# ===========================================================================
class SpectralAttention(nn.Module):
def __init__(self, channels: int, reduction: int = 16):
super().__init__()
mid = max(1, channels // reduction)
self.mlp = nn.Sequential(
nn.Linear(channels, mid),
nn.ReLU(inplace=True),
nn.Linear(mid, channels),
nn.Sigmoid(),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x: (B, C, L)
z = x.mean(dim=2) # GAP → (B, C)
s = self.mlp(z) # attention weights → (B, C)
return x * s.unsqueeze(2) # (B, C, L)
# ===========================================================================
# 4. GLOBAL CONTEXT MODULE (HyperSoilNet Section 3.3)
#
# Combines global average + max pooling, then reduces dimension.
# Produces the 128-d embedding used by the ML ensemble.
# ===========================================================================
class GlobalContextModule(nn.Module):
def __init__(self, in_channels: int, embed_dim: int = 128):
super().__init__()
self.fc = nn.Sequential(
nn.Linear(in_channels * 2, embed_dim),
nn.ReLU(inplace=True),
nn.LayerNorm(embed_dim),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x: (B, C, L)
avg = x.mean(dim=2) # (B, C)
mx = x.max(dim=2).values # (B, C)
z = torch.cat([avg, mx], dim=1) # (B, 2C)
return self.fc(z) # (B, embed_dim)
# ===========================================================================
# 5. PROJECTION HEAD (paper Section 2.2, eq. 3 – used during SSL only)
# ===========================================================================
class ProjectionHead(nn.Module):
def __init__(self, embed_dim: int = 128, proj_dim: int = 64):
super().__init__()
self.net = nn.Sequential(
nn.Linear(embed_dim, embed_dim),
nn.ReLU(inplace=True),
nn.Linear(embed_dim, proj_dim),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return F.normalize(self.net(x), dim=1)
# ===========================================================================
# 6. HYPERKON BACKBONE (full model)
# ===========================================================================
class HyperKonBackbone(nn.Module):
"""
HyperKon 1-D backbone for spectral data.
Input : (B, 1, n_bands) – unsqueeze the spectrum along channel dim
Output: (B, embed_dim) – 128-d embedding for the ML ensemble
"""
# ResNeXt stage block counts from HyperSoilNet paper: [3, 4, 6, 3]
STAGE_BLOCKS = [3, 4, 6, 3]
def __init__(self, n_bands: int, embed_dim: int = 128,
cardinality: int = 32, bottleneck_width: int = 4):
super().__init__()
# ---- Initial spectral embedding ----
self.stem = nn.Sequential(
nn.Conv1d(1, 64, kernel_size=7, stride=2, padding=3, bias=False),
nn.BatchNorm1d(64),
nn.ReLU(inplace=True),
nn.MaxPool1d(kernel_size=3, stride=2, padding=1),
)
# ---- Spectral attention (after stem) ----
self.spectral_attn = SpectralAttention(64)
# ---- Four ResNeXt stages ----
# Each block doubles channels (expansion=2), applied n_blocks times.
# Stage input channels must match the output of the previous stage.
# stem output : 64
# after stage1: 64 * 2^3 = 512 (3 blocks)
# after stage2: 512 * 2^4 = 8192 — too large for small spectra
#
# To keep the network tractable on 150-band 1-D signals we cap
# channel growth with a projection conv between stages instead of
# letting expansion run unchecked. Stage channel plan:
# stem → 64 → stage1(64→128) → stage2(128→256)
# → stage3(256→512) → stage4(512→512)
# We achieve this by using expansion=1 inside blocks and handling
# the channel increase via the first block's downsample projection.
self.stage1 = self._make_stage(64, 128, cardinality, bottleneck_width,
self.STAGE_BLOCKS[0])
self.stage2 = self._make_stage(128, 256, cardinality, bottleneck_width,
self.STAGE_BLOCKS[1])
self.stage3 = self._make_stage(256, 512, cardinality, bottleneck_width,
self.STAGE_BLOCKS[2])
self.stage4 = self._make_stage(512, 512, cardinality, bottleneck_width,
self.STAGE_BLOCKS[3])
# ---- Global context → 128-d embedding ----
# After stage4: 512 channels
self.global_ctx = GlobalContextModule(512, embed_dim)
self._init_weights()
@staticmethod
def _make_stage(in_channels: int, out_channels: int,
cardinality: int, bottleneck_width: int,
n_blocks: int) -> nn.Sequential:
layers = []
for i in range(n_blocks):
# Only the first block changes channel count
block_in = in_channels if i == 0 else out_channels
layers.append(ResNeXtBlock1d(block_in, out_channels,
cardinality, bottleneck_width))
return nn.Sequential(*layers)
def _init_weights(self):
for m in self.modules():
if isinstance(m, nn.Conv1d):
nn.init.kaiming_normal_(m.weight, mode="fan_out",
nonlinearity="relu")
elif isinstance(m, nn.BatchNorm1d):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Linear):
nn.init.xavier_normal_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x: (B, n_bands) → add channel dim → (B, 1, n_bands)
if x.dim() == 2:
x = x.unsqueeze(1)
x = self.stem(x)
x = self.spectral_attn(x)
x = self.stage1(x)
x = self.stage2(x)
x = self.stage3(x)
x = self.stage4(x)
return self.global_ctx(x) # (B, 128)
# ===========================================================================
# 7. FULL HYPERKON MODEL (backbone + projection head for SSL)
# ===========================================================================
class HyperKon(nn.Module):
def __init__(self, n_bands: int, embed_dim: int = 128, proj_dim: int = 64):
super().__init__()
self.backbone = HyperKonBackbone(n_bands, embed_dim)
self.projector = ProjectionHead(embed_dim, proj_dim)
def forward(self, x: torch.Tensor):
z = self.backbone(x)
return self.projector(z)
def encode(self, x: torch.Tensor) -> torch.Tensor:
"""Return backbone embeddings (no projection head) – for fine-tuning."""
return self.backbone(x)
# ===========================================================================
# 8. REGRESSION HEAD (fine-tuning on labelled data)
# ===========================================================================
class HyperKonRegressor(nn.Module):
"""Backbone + multi-task regression head for 4 soil properties."""
def __init__(self, n_bands: int, n_targets: int = 4,
embed_dim: int = 128):
super().__init__()
self.backbone = HyperKonBackbone(n_bands, embed_dim)
self.head = nn.Sequential(
nn.Linear(embed_dim, 64),
nn.ReLU(inplace=True),
nn.Dropout(0.2),
nn.Linear(64, n_targets),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.head(self.backbone(x))
def encode(self, x: torch.Tensor) -> torch.Tensor:
return self.backbone(x)
# ===========================================================================
# 9. NT-XENT CONTRASTIVE LOSS (paper Section 2.3, eq. 4)
# ===========================================================================
class NTXentLoss(nn.Module):
"""
Normalized Temperature-Scaled Cross-Entropy loss (SimCLR).
Operates on a batch of paired embeddings (z_i, z_j).
"""
def __init__(self, temperature: float = 0.5):
super().__init__()
self.tau = temperature
def forward(self, z_i: torch.Tensor, z_j: torch.Tensor) -> torch.Tensor:
N = z_i.size(0)
z = torch.cat([z_i, z_j], dim=0) # (2N, D)
sim = F.cosine_similarity(z.unsqueeze(1),
z.unsqueeze(0), dim=2) / self.tau # (2N, 2N)
# Mask out self-similarity
mask = torch.eye(2 * N, dtype=torch.bool, device=z.device)
sim.masked_fill_(mask, -1e9)
# Positive pairs: (i, i+N) and (i+N, i)
labels = torch.cat([torch.arange(N, 2 * N),
torch.arange(0, N)]).to(z.device)
return F.cross_entropy(sim, labels)
# ===========================================================================
# 10. DATA AUGMENTATION FOR CONTRASTIVE LEARNING (paper Section 2.3.2)
#
# Paper augmentations: random spectral scaling, spatial cropping,
# random channel permutations. For 1-D spectra we use:
# • Gaussian noise injection (spectral scaling equivalent)
# • Random band dropout
# • Random spectral shift (rolling)
# ===========================================================================
def augment_spectrum(x: torch.Tensor) -> torch.Tensor:
"""Apply random spectral augmentation to a batch (B, n_bands)."""
# Gaussian noise
if torch.rand(1).item() > 0.5:
x = x + 0.01 * torch.randn_like(x)
# Random band dropout (zero out up to 10% of bands)
if torch.rand(1).item() > 0.5:
n = x.size(1)
n_drop = max(1, int(0.1 * n))
idx = torch.randint(0, n, (n_drop,))
x = x.clone()
x[:, idx] = 0.0
# Random spectral shift (circular roll by ±5 bands)
if torch.rand(1).item() > 0.5:
shift = torch.randint(-5, 6, (1,)).item()
x = torch.roll(x, shift, dims=1)
# Random amplitude scaling
if torch.rand(1).item() > 0.5:
scale = 0.9 + 0.2 * torch.rand(x.size(0), 1, device=x.device)
x = x * scale
return x
# ===========================================================================
# 11. DATASETS
# ===========================================================================
class SpectralDataset(Dataset):
"""Unlabelled spectra for contrastive pretraining."""
def __init__(self, spectra: np.ndarray):
self.X = torch.tensor(spectra, dtype=torch.float32)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx]
class LabelledSpectralDataset(Dataset):
"""Labelled spectra for fine-tuning."""
def __init__(self, spectra: np.ndarray, labels: np.ndarray):
self.X = torch.tensor(spectra, dtype=torch.float32)
self.y = torch.tensor(labels, dtype=torch.float32)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
# ===========================================================================
# 12. DATA LOADING HELPERS (same convention as hypersoilnet.py)
# ===========================================================================
def load_spectrum(filepath: str) -> np.ndarray:
"""
Load a spectrum CSV with no header.
Handles two layouts automatically:
- One value per row, single column (n_bands × 1)
- All values in one row (1 × n_bands)
"""
df = pd.read_csv(filepath, header=None)
if df.shape[1] > df.shape[0]:
# Wide: values across columns in a single row
return df.iloc[0, :].values.astype(float)
else:
# Long: one value per row
return df.iloc[:, 0].values.astype(float)
def load_all_spectra(spectra_dir: str):
paths = sorted(Path(spectra_dir).glob("*.csv"))
if not paths:
raise FileNotFoundError(f"No .csv files found in '{spectra_dir}'")
spectra, ids = [], []
for p in paths:
spectra.append(load_spectrum(str(p)))
ids.append(p.stem)
return np.vstack(spectra), ids
def load_labelled(labels_csv: str, spectra_dir: str):
labels_df = pd.read_csv(labels_csv)
labels_df.columns = labels_df.columns.str.strip()
spectra_list, label_rows, ids = [], [], []
for _, row in labels_df.iterrows():
sid = int(row["sample_id"])
fpath = os.path.join(spectra_dir, f"{sid}.csv")
if not os.path.exists(fpath):
continue
spectra_list.append(load_spectrum(fpath))
label_rows.append(row)
ids.append(sid)
spectra = np.vstack(spectra_list)
labels = pd.DataFrame(label_rows)[TARGETS].reset_index(drop=True).values
return spectra, labels, ids
# ===========================================================================
# 13. NORMALISATION
# ===========================================================================
def normalise(spectra: np.ndarray, mean=None, std=None):
if mean is None:
mean = spectra.mean(axis=0)
std = spectra.std(axis=0) + 1e-8
return (spectra - mean) / std, mean, std
# ===========================================================================
# 14. CONTRASTIVE PRETRAINING (paper Section 2.3, NT-Xent, 1000 epochs)
# ===========================================================================
def pretrain(model: HyperKon, spectra: np.ndarray, device: torch.device,
epochs: int = 200, batch_size: int = 32, lr: float = 1e-4,
temperature: float = 0.5):
print(f"\n Contrastive pretraining for {epochs} epochs …")
spectra_norm, _, _ = normalise(spectra)
dataset = SpectralDataset(spectra_norm)
loader = DataLoader(dataset, batch_size=batch_size,
shuffle=True, drop_last=True, num_workers=0)
criterion = NTXentLoss(temperature=temperature)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
step_size=epochs // 5,
gamma=0.5)
model.train()
for epoch in range(1, epochs + 1):
total_loss = 0.0
for x in loader:
x = x.to(device)
x_i = augment_spectrum(x)
x_j = augment_spectrum(x)
z_i = model(x_i)
z_j = model(x_j)
loss = criterion(z_i, z_j)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
scheduler.step()
if epoch % max(1, epochs // 10) == 0:
avg = total_loss / len(loader)
print(f" Epoch {epoch:4d}/{epochs} loss={avg:.4f} "
f"lr={scheduler.get_last_lr()[0]:.2e}")
print(" Pretraining done.\n")
# ===========================================================================
# 15. FINE-TUNING (paper Section 3.6 – 100 epochs, AdamW, cosine annealing)
# ===========================================================================
def finetune(model: HyperKonRegressor, spectra: np.ndarray,
labels: np.ndarray, device: torch.device,
epochs: int = 100, batch_size: int = 24, lr: float = 1e-4,
weight_decay: float = 1e-4):
"""Fine-tune backbone + regression head on labelled data."""
print(f"\n Fine-tuning for {epochs} epochs …")
spectra_norm, mean, std = normalise(spectra)
n = len(spectra_norm)
val_n = max(1, int(0.2 * n))
idx = np.random.permutation(n)
val_idx, tr_idx = idx[:val_n], idx[val_n:]
train_ds = LabelledSpectralDataset(spectra_norm[tr_idx], labels[tr_idx])
val_ds = LabelledSpectralDataset(spectra_norm[val_idx], labels[val_idx])
train_dl = DataLoader(train_ds, batch_size=batch_size,
shuffle=True, num_workers=0)
val_dl = DataLoader(val_ds, batch_size=batch_size,
shuffle=False, num_workers=0)
    # Property-specific loss weights (paper eq. 9), applied element-wise in TARGETS order (P, K, Mg, pH)
loss_weights = torch.tensor([1.0, 1.2, 1.0, 1.5],
dtype=torch.float32, device=device)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr,
weight_decay=weight_decay)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer, T_max=epochs, eta_min=1e-6)
best_val, best_state = float("inf"), None
model.train()
for epoch in range(1, epochs + 1):
# ---- Train ----
model.train()
tr_loss = 0.0
for x, y in train_dl:
x, y = x.to(device), y.to(device)
pred = model(x)
# Weighted MSE per property (eq. 9)
mse = ((pred - y) ** 2).mean(dim=0)
loss = (mse * loss_weights).sum()
optimizer.zero_grad()
loss.backward()
optimizer.step()
tr_loss += loss.item()
# ---- Validate ----
model.eval()
val_loss = 0.0
with torch.no_grad():
for x, y in val_dl:
x, y = x.to(device), y.to(device)
pred = model(x)
mse = ((pred - y) ** 2).mean(dim=0)
val_loss += (mse * loss_weights).sum().item()
scheduler.step()
avg_val = val_loss / len(val_dl)
if avg_val < best_val:
best_val = avg_val
best_state = {k: v.cpu().clone()
for k, v in model.state_dict().items()}
if epoch % max(1, epochs // 10) == 0:
print(f" Epoch {epoch:4d}/{epochs} "
f"train={tr_loss/len(train_dl):.4f} "
f"val={avg_val:.4f}")
model.load_state_dict(best_state)
print(" Fine-tuning done (best model restored).\n")
return mean, std # needed to normalise new samples
# ===========================================================================
# 16. FEATURE EXTRACTION
# ===========================================================================
@torch.no_grad()
def extract_embeddings(backbone: HyperKonBackbone,
spectra_norm: np.ndarray,
device: torch.device,
batch_size: int = 64) -> np.ndarray:
backbone.eval()
ds = SpectralDataset(spectra_norm)
loader = DataLoader(ds, batch_size=batch_size,
shuffle=False, num_workers=0)
parts = []
for x in loader:
parts.append(backbone(x.to(device)).cpu().numpy())
return np.vstack(parts)
# ===========================================================================
# 17. MAIN
# ===========================================================================
def parse_args():
p = argparse.ArgumentParser(
description="HyperKon – self-supervised hyperspectral backbone")
p.add_argument("--spectra_dir", required=True,
help="Directory with all <id>.csv spectra "
"(labelled + unlabelled for pretraining)")
p.add_argument("--labels", default=None,
help="label.csv for fine-tuning / feature extraction "
"(optional – required only with --finetune)")
p.add_argument("--weights", default=None,
help="Path to existing .pt weights to load "
"(skips pretraining)")
p.add_argument("--output", default="hyperkon_weights.pt",
help="Where to save pretrained weights "
"(default: hyperkon_weights.pt)")
p.add_argument("--output_features", default="hyperkon_features.npy",
help="Where to save extracted embeddings "
"(default: hyperkon_features.npy)")
p.add_argument("--output_ids", default="hyperkon_ids.txt",
help="Sample IDs matching rows of output_features")
p.add_argument("--epochs", type=int, default=200,
help="Pretraining epochs (paper uses 1000; 200 is faster)")
p.add_argument("--finetune_epochs", type=int, default=100)
p.add_argument("--batch_size", type=int, default=32)
p.add_argument("--lr", type=float, default=1e-4)
p.add_argument("--temperature", type=float, default=0.5,
help="NT-Xent temperature τ")
p.add_argument("--embed_dim", type=int, default=128,
help="Embedding dimension (default: 128 as in paper)")
p.add_argument("--finetune", action="store_true",
help="Fine-tune on labelled data and extract features")
p.add_argument("--no_pretrain", action="store_true",
help="Skip SSL pretraining (use random init or --weights)")
return p.parse_args()
def main():
args = parse_args()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"\n[HyperKon] device = {device}")
# ---- Load ALL spectra (for pretraining) --------------------------------
print("\n[1] Loading spectra …")
spectra_all, ids_all = load_all_spectra(args.spectra_dir)
n_bands = spectra_all.shape[1]
print(f" {len(ids_all)} spectra found, {n_bands} bands each")
# ---- Build model -------------------------------------------------------
ssl_model = HyperKon(n_bands, embed_dim=args.embed_dim).to(device)
print(f" Params: {sum(p.numel() for p in ssl_model.parameters()):,}")
# ---- Optionally load existing weights ----------------------------------
if args.weights and os.path.exists(args.weights):
print(f"\n[2] Loading weights from {args.weights}")
ssl_model.load_state_dict(torch.load(args.weights, map_location=device))
elif not args.no_pretrain:
# ---- Contrastive pretraining ---------------------------------------
print("\n[2] Self-supervised contrastive pretraining …")
pretrain(ssl_model, spectra_all, device,
epochs=args.epochs,
batch_size=args.batch_size,
lr=args.lr,
temperature=args.temperature)
torch.save(ssl_model.state_dict(), args.output)
print(f" Weights saved → {args.output}")
else:
print("\n[2] Skipping pretraining (--no_pretrain)")
# ---- Fine-tune on labelled data ----------------------------------------
if args.finetune:
if not args.labels:
raise ValueError("--labels is required when --finetune is set")
print("\n[3] Loading labelled data for fine-tuning …")
spectra_lab, labels_arr, ids_lab = load_labelled(
args.labels, args.spectra_dir)
# Build regressor and copy pretrained backbone weights
reg_model = HyperKonRegressor(n_bands, n_targets=4,
embed_dim=args.embed_dim).to(device)
reg_model.backbone.load_state_dict(ssl_model.backbone.state_dict())
mean, std = finetune(reg_model, spectra_lab, labels_arr, device,
epochs=args.finetune_epochs,
batch_size=args.batch_size,
lr=args.lr)
# Save fine-tuned weights
ft_path = args.output.replace(".pt", "_finetuned.pt")
torch.save({
"backbone": reg_model.backbone.state_dict(),
"head": reg_model.head.state_dict(),
"mean": mean,
"std": std,
}, ft_path)
print(f" Fine-tuned weights saved → {ft_path}")
# Extract embeddings for ML ensemble
print("\n[4] Extracting 128-d embeddings from fine-tuned backbone …")
spectra_norm = (spectra_lab - mean) / (std + 1e-8)
features = extract_embeddings(reg_model.backbone,
spectra_norm, device)
else:
# Extract from pretrained backbone only
print("\n[3] Extracting embeddings from pretrained backbone …")
spectra_norm, _, _ = normalise(spectra_all)
features = extract_embeddings(ssl_model.backbone, spectra_norm, device)
ids_lab = ids_all
# ---- Save features -----------------------------------------------------
np.save(args.output_features, features)
with open(args.output_ids, "w") as f:
f.write("\n".join(str(i) for i in ids_lab))
print(f"\n Features saved → {args.output_features} "
f"shape={features.shape}")
print(f" IDs saved → {args.output_ids}")
print("\nDone.\n")
if __name__ == "__main__":
main()
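A quick shape check of the backbone (my own sketch, assuming the script above is saved as hyperkon.py): a batch of 150-band spectra should come out as 128-d embeddings.

import torch
from hyperkon import HyperKonBackbone   # the script above, saved as hyperkon.py

backbone = HyperKonBackbone(n_bands=150, embed_dim=128)
x = torch.randn(4, 150)                  # four dummy reflectance spectra
print(backbone(x).shape)                 # expected: torch.Size([4, 128])

The second stage is implemented in hypersoilnet.py: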
"""
HyperSoilNet Pipeline
=====================
Based on: "A Hybrid Framework for Soil Property Estimation from Hyperspectral Imaging"
https://www.mdpi.com/2072-4292/17/15/2568
Implements:
- Spectral feature engineering (derivatives, DWT, SVD, FFT)
- ML ensemble: Random Forest + XGBoost + KNN
- Property-specific weighted ensemble (Bayesian-optimised weights)
- 5-fold cross-validation with per-property R², RMSE, custom challenge score
Input layout
------------
label.csv : sample_id, P, K, Mg, pH (header on first row)
<spectra_dir>/1.csv : one reflectance value per row, one column (no header)
<spectra_dir>/6.csv : same for sample 6, etc.
Usage
-----
# Train + cross-validate on labelled data
python hypersoilnet.py --labels label.csv --spectra_dir spectra/
# Train on all labelled data, then predict an unlabelled set
python hypersoilnet.py --labels label.csv --spectra_dir spectra/ \
--predict_dir unlabelled/ --output predictions.csv
"""
import argparse
import os
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
from scipy.fft import fft
from scipy.signal import savgol_filter
try:
import pywt
HAS_PYWT = True
except ImportError:
HAS_PYWT = False
warnings.warn("PyWavelets (pywt) not found – wavelet features will be skipped. "
"Install with: pip install PyWavelets")
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, r2_score
try:
from xgboost import XGBRegressor
HAS_XGB = True
except ImportError:
HAS_XGB = False
warnings.warn("XGBoost not found – ensemble will use RF + KNN only. "
"Install with: pip install xgboost")
try:
from scipy.optimize import minimize
HAS_SCIPY_OPT = True
except ImportError:
HAS_SCIPY_OPT = False
# ---------------------------------------------------------------------------
# Target column names (must match label CSV header)
# ---------------------------------------------------------------------------
TARGETS = ["P", "K", "Mg", "pH"]
# ---------------------------------------------------------------------------
# Property-specific ensemble weights from the paper (Table in Section 3.5)
# Order: [RF, XGB, KNN]
# ---------------------------------------------------------------------------
DEFAULT_WEIGHTS = {
"K": [0.45, 0.40, 0.15],
"P": [0.38, 0.45, 0.17], # paper calls it P2O5; mapped to "P" here
"Mg": [0.42, 0.38, 0.20],
"pH": [0.35, 0.50, 0.15],
}
# ===========================================================================
# 1. DATA LOADING
# ===========================================================================
def load_spectrum(filepath: str) -> np.ndarray:
"""Load a single-column CSV (no header) of reflectance values."""
df = pd.read_csv(filepath, header=None)
return df.iloc[:, 0].values.astype(float)
def load_dataset(labels_csv: str, spectra_dir: str):
"""
Returns
-------
spectra : np.ndarray shape (n_samples, n_bands)
labels : pd.DataFrame columns = TARGETS
sample_ids: list of sample ids
"""
labels_df = pd.read_csv(labels_csv)
# Strip whitespace from column names
labels_df.columns = labels_df.columns.str.strip()
spectra_list, valid_ids, valid_rows = [], [], []
for _, row in labels_df.iterrows():
sid = int(row["sample_id"])
fpath = os.path.join(spectra_dir, f"{sid}.csv")
if not os.path.exists(fpath):
warnings.warn(f"Spectrum file not found, skipping: {fpath}")
continue
spectra_list.append(load_spectrum(fpath))
valid_ids.append(sid)
valid_rows.append(row)
spectra = np.vstack(spectra_list)
labels = pd.DataFrame(valid_rows)[TARGETS].reset_index(drop=True)
return spectra, labels, valid_ids
def load_unlabelled(spectra_dir: str):
"""Load all CSVs in a directory."""
paths = sorted(Path(spectra_dir).glob("*.csv"))
if not paths:
raise FileNotFoundError(
f"No .csv files found in '{spectra_dir}'.\n"
f"Make sure your unlabelled spectra files (e.g. 100.csv) are in that folder."
)
spectra_list, ids = [], []
for p in paths:
spectra_list.append(load_spectrum(str(p)))
ids.append(p.stem)
return np.vstack(spectra_list), ids
# ===========================================================================
# 2. FEATURE ENGINEERING (Section 3.4 of the paper)
# ===========================================================================
def spectral_derivatives(spectrum: np.ndarray, orders=(1, 2, 3)) -> np.ndarray:
    """1st, 2nd, 3rd order Savitzky-Golay derivatives."""
    feats = []
    n = len(spectrum)
    for order in orders:
        polyorder = order + 1
        # window must be odd, larger than polyorder, and no longer than the spectrum
        window = min(11, n if n % 2 == 1 else n - 1)
        if window <= polyorder:
            # smallest odd window strictly greater than polyorder
            window = polyorder + 1 if (polyorder + 1) % 2 == 1 else polyorder + 2
        if window > n:
            # spectrum too short for this derivative order – return zeros
            feats.append(np.zeros(n))
            continue
        d = savgol_filter(spectrum, window_length=window, polyorder=polyorder,
                          deriv=order)
        feats.append(d)
    return np.concatenate(feats)
def wavelet_features(spectrum: np.ndarray, wavelet="dmey", level=4) -> np.ndarray:
"""DWT approximation + detail coefficient statistics (paper eq. 5)."""
if not HAS_PYWT:
return np.array([])
# Clamp level to what the signal length allows
max_level = pywt.dwt_max_level(len(spectrum), wavelet)
level = min(level, max_level)
coeffs = pywt.wavedec(spectrum, wavelet, level=level)
feats = []
for c in coeffs:
feats.extend([c.mean(), c.std(), np.abs(c).max()])
return np.array(feats)
def svd_features(spectrum: np.ndarray, n_singular=5) -> np.ndarray:
"""
Treat spectrum as a 1-D signal; reshape into a 2-D matrix and compute SVD.
Top singular values + their ratios (paper eq. 6).
"""
n = len(spectrum)
# Build a Hankel-like matrix
n_rows = max(2, n // 10)
n_cols = n - n_rows + 1
mat = np.array([spectrum[i:i + n_cols] for i in range(n_rows)])
_, s, _ = np.linalg.svd(mat, full_matrices=False)
s = s[:n_singular] if len(s) >= n_singular else np.pad(s, (0, n_singular - len(s)))
ratios = s[:-1] / (s[1:] + 1e-10)
return np.concatenate([s, ratios])
def fft_features(spectrum: np.ndarray, n_components=20) -> np.ndarray:
"""Real + imaginary FFT components (paper eq. 7)."""
f = fft(spectrum)
f = f[:n_components]
return np.concatenate([f.real, f.imag])
def statistical_features(spectrum: np.ndarray) -> np.ndarray:
"""Basic statistics of the raw spectrum."""
return np.array([
spectrum.mean(),
spectrum.std(),
np.percentile(spectrum, 25),
np.percentile(spectrum, 75),
spectrum.max() - spectrum.min(),
np.argmax(spectrum),
np.argmin(spectrum),
])
def extract_features(spectrum: np.ndarray) -> np.ndarray:
"""Combine all feature groups into one vector."""
parts = [
spectrum, # raw reflectance
statistical_features(spectrum),
spectral_derivatives(spectrum),
wavelet_features(spectrum),
svd_features(spectrum),
fft_features(spectrum),
]
return np.concatenate([p for p in parts if len(p) > 0])
def build_feature_matrix(spectra: np.ndarray) -> np.ndarray:
rows = [extract_features(s) for s in spectra]
return np.array(rows)
# ===========================================================================
# 3. MODEL BUILDING (Section 3.5)
# ===========================================================================
def build_models():
models = {
"RF": RandomForestRegressor(
n_estimators=100,
max_depth=20,
min_samples_leaf=5,
criterion="squared_error",
bootstrap=True,
n_jobs=-1,
random_state=42,
),
"KNN": KNeighborsRegressor(
n_neighbors=7,
weights="distance",
metric="euclidean",
),
}
if HAS_XGB:
models["XGB"] = XGBRegressor(
n_estimators=100,
learning_rate=0.1,
max_depth=5,
reg_alpha=0.01,
reg_lambda=1.0,
early_stopping_rounds=15,
eval_metric="rmse",
random_state=42,
verbosity=0,
)
return models
# ===========================================================================
# 4. WEIGHTED ENSEMBLE PREDICTION (paper eq. 8)
# ===========================================================================
def get_weights(target: str, models: dict) -> list:
"""
Return per-model weights for a target property.
If XGB is unavailable, re-normalise between RF and KNN.
"""
paper_w = DEFAULT_WEIGHTS.get(target, [0.40, 0.40, 0.20])
rf_w, xgb_w, knn_w = paper_w
if not HAS_XGB:
total = rf_w + knn_w
return [rf_w / total, knn_w / total]
return [rf_w, xgb_w, knn_w]
def ensemble_predict(fitted_models: dict, X: np.ndarray, target: str) -> np.ndarray:
    # Model order must match get_weights(): [RF, XGB, KNN] (or [RF, KNN] when XGB is unavailable)
    keys = ["RF", "XGB", "KNN"] if "XGB" in fitted_models else ["RF", "KNN"]
    weights = get_weights(target, fitted_models)
    preds = np.stack([fitted_models[k].predict(X) for k in keys], axis=1)
    w = np.array(weights[:len(keys)])
    w = w / w.sum()
    return preds @ w
# ===========================================================================
# 5. TRAINING HELPERS
# ===========================================================================
def fit_ensemble(X_train: np.ndarray, y_train: np.ndarray,
X_val: np.ndarray = None, y_val: np.ndarray = None,
target: str = "P") -> dict:
"""Fit all regressors for one target property."""
scaler = StandardScaler()
X_tr_s = scaler.fit_transform(X_train)
X_val_s = scaler.transform(X_val) if X_val is not None else None
models = build_models()
fitted = {}
for name, mdl in models.items():
if name == "XGB" and X_val_s is not None:
mdl.fit(X_tr_s, y_train,
eval_set=[(X_val_s, y_val)],
verbose=False)
elif name == "XGB":
# No validation set → disable early stopping
mdl.set_params(early_stopping_rounds=None)
mdl.fit(X_tr_s, y_train)
else:
mdl.fit(X_tr_s, y_train)
fitted[name] = mdl
return {"scaler": scaler, "models": fitted}
# ===========================================================================
# 6. CROSS-VALIDATION (Section 3.6 — 5-fold CV)
# ===========================================================================
def cross_validate(X: np.ndarray, labels: pd.DataFrame, n_splits: int = 5):
kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
results = {t: {"r2": [], "rmse": [], "mse": []} for t in TARGETS}
for fold, (tr_idx, val_idx) in enumerate(kf.split(X), 1):
X_tr, X_val = X[tr_idx], X[val_idx]
print(f"\n Fold {fold}/{n_splits}")
for target in TARGETS:
y_tr = labels[target].values[tr_idx]
y_val = labels[target].values[val_idx]
bundle = fit_ensemble(X_tr, y_tr, X_val, y_val, target)
scaler = bundle["scaler"]
X_tr_s = scaler.transform(X_tr)
X_val_s = scaler.transform(X_val)
preds = ensemble_predict(bundle["models"], X_val_s, target)
mse = mean_squared_error(y_val, preds)
r2 = r2_score(y_val, preds)
rmse = np.sqrt(mse)
results[target]["r2"].append(r2)
results[target]["rmse"].append(rmse)
results[target]["mse"].append(mse)
print(f" {target:4s} R²={r2:.3f} RMSE={rmse:.3f}")
print("\n" + "=" * 55)
print(f"{'Target':<6} {'R² mean':>9} {'R² std':>8} {'RMSE mean':>10}")
print("-" * 55)
baseline_mse = {}
for target in TARGETS:
r2_m = np.mean(results[target]["r2"])
r2_s = np.std(results[target]["r2"])
rmse_m = np.mean(results[target]["rmse"])
print(f"{target:<6} {r2_m:>9.3f} {r2_s:>8.3f} {rmse_m:>10.3f}")
baseline_mse[target] = np.var(labels[target].values) # naive baseline
# Custom challenge score (paper eq. 11) – mean ratio algo_MSE / baseline_MSE
mean_mses = {t: np.mean(results[t]["mse"]) for t in TARGETS}
score = np.mean([mean_mses[t] / baseline_mse[t] for t in TARGETS])
print(f"\n Custom challenge score (lower is better): {score:.4f}")
print("=" * 55)
return results
# ===========================================================================
# 7. FULL TRAINING + PREDICTION
# ===========================================================================
def train_full(X: np.ndarray, labels: pd.DataFrame):
"""Train on the entire labelled dataset."""
# Hold out 20 % for ensemble weight validation (paper Section 3.6)
n = len(X)
val_n = max(1, int(0.2 * n))
rng = np.random.default_rng(42)
val_idx = rng.choice(n, val_n, replace=False)
tr_idx = np.setdiff1d(np.arange(n), val_idx)
bundles = {}
for target in TARGETS:
y = labels[target].values
bundle = fit_ensemble(X[tr_idx], y[tr_idx],
X[val_idx], y[val_idx], target)
bundles[target] = bundle
print(f" Trained ensemble for {target}")
return bundles
def predict(bundles: dict, X_new: np.ndarray) -> pd.DataFrame:
preds = {}
for target in TARGETS:
scaler = bundles[target]["scaler"]
X_s = scaler.transform(X_new)
preds[target] = ensemble_predict(bundles[target]["models"], X_s, target)
return pd.DataFrame(preds)
# ===========================================================================
# 8. MAIN
# ===========================================================================
def parse_args():
p = argparse.ArgumentParser(description="HyperSoilNet – soil property estimation")
p.add_argument("--labels", required=True, help="Path to label CSV")
p.add_argument("--spectra_dir", required=True, help="Dir containing <sample_id>.csv spectra")
p.add_argument("--predict_dir", default=None, help="Dir with unlabelled spectra to predict")
p.add_argument("--output", default="predictions.csv",
help="Output CSV for predictions (default: predictions.csv)")
p.add_argument("--no_cv", action="store_true",
help="Skip cross-validation (faster)")
p.add_argument("--cv_folds", type=int, default=5)
# HyperKon integration
p.add_argument("--precomputed_features", default=None,
help="Path to .npy file of HyperKon CNN embeddings "
"(produced by hyperkon.py --finetune). "
"When provided, these 128-d vectors are PREPENDED "
"to the hand-crafted features.")
p.add_argument("--precomputed_ids", default=None,
help="Path to .txt file of sample IDs matching "
"--precomputed_features (produced by hyperkon.py)")
return p.parse_args()
def load_cnn_features(feat_path: str, ids_path: str) -> dict:
"""Load HyperKon embeddings and return a dict {sample_id: embedding}."""
feats = np.load(feat_path)
with open(ids_path) as f:
ids = [line.strip() for line in f if line.strip()]
if len(ids) != len(feats):
raise ValueError(f"Feature count ({len(feats)}) != ID count ({len(ids)})")
return {sid: feats[i] for i, sid in enumerate(ids)}
def prepend_cnn_features(X: np.ndarray, ids: list,
cnn_feat_map: dict) -> np.ndarray:
"""Prepend 128-d CNN embeddings to the hand-crafted feature matrix."""
rows = []
missing = 0
for i, sid in enumerate(ids):
key = str(sid)
if key in cnn_feat_map:
rows.append(np.concatenate([cnn_feat_map[key], X[i]]))
else:
# Pad with zeros if this sample has no CNN embedding
missing += 1
embed_dim = next(iter(cnn_feat_map.values())).shape[0]
rows.append(np.concatenate([np.zeros(embed_dim), X[i]]))
if missing:
warnings.warn(f"{missing} samples had no CNN embedding – padded with zeros.")
return np.array(rows)
def main():
args = parse_args()
# ---- Load & featurise labelled data ------------------------------------
print("\n[1/4] Loading labelled data …")
spectra, labels, ids = load_dataset(args.labels, args.spectra_dir)
print(f" {len(ids)} samples loaded, {spectra.shape[1]} bands each")
print("\n[2/4] Extracting hand-crafted features …")
X = build_feature_matrix(spectra)
print(f" Hand-crafted feature matrix: {X.shape}")
# ---- Optionally prepend HyperKon CNN embeddings ------------------------
cnn_feat_map = None
if args.precomputed_features:
if not args.precomputed_ids:
raise ValueError("--precomputed_ids is required when "
"--precomputed_features is set")
print(f"\n Loading HyperKon embeddings from: "
f"{args.precomputed_features}")
cnn_feat_map = load_cnn_features(args.precomputed_features,
args.precomputed_ids)
X = prepend_cnn_features(X, [str(i) for i in ids], cnn_feat_map)
print(f" Combined feature matrix (CNN + hand-crafted): {X.shape}")
# ---- Cross-validation --------------------------------------------------
if not args.no_cv:
print(f"\n[3/4] {args.cv_folds}-fold cross-validation …")
cross_validate(X, labels, n_splits=args.cv_folds)
else:
print("\n[3/4] Cross-validation skipped (--no_cv)")
# ---- Train on full dataset ---------------------------------------------
print("\n[4/4] Training on full dataset …")
bundles = train_full(X, labels)
# ---- Predict unlabelled data if requested ------------------------------
if args.predict_dir:
print(f"\n[+] Predicting unlabelled spectra in: {args.predict_dir}")
spectra_new, new_ids = load_unlabelled(args.predict_dir)
X_new = build_feature_matrix(spectra_new)
if cnn_feat_map is not None:
X_new = prepend_cnn_features(X_new, [str(i) for i in new_ids],
cnn_feat_map)
pred_df = predict(bundles, X_new)
pred_df.insert(0, "sample_id", new_ids)
pred_df.to_csv(args.output, index=False)
print(f" Predictions saved to: {args.output}")
print(pred_df.to_string(index=False))
else:
# Run predictions on the training set itself as a sanity check
print("\n[+] Running predictions on training set (sanity check) …")
all_preds = predict(bundles, X)
all_preds.insert(0, "sample_id", ids)
out_path = "train_predictions.csv"
all_preds.to_csv(out_path, index=False)
print(f" Saved to: {out_path}")
print("\nDone.\n")
if __name__ == "__main__":
main()
These are the training results for the 4 parameters (measured values versus model predictions).
For inference, the validation spectra were placed in the new_spectra folder.
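For reference, the matching inference call, given the options exposed by hypersoilnet.py (folder and file names as used above), would be along these lines:

python hypersoilnet.py --labels label.csv --spectra_dir spectra/ \
    --predict_dir new_spectra/ --output predictions.csv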