Riguardo al post precedente: la parte di CNN è piuttosto pesante dal punto di vista computazionale.
Ho provato a vedere che risultati si ottengono passando direttamente alla seconda fase.
"""
HyperSoilNet Pipeline
=====================
Based on: "A Hybrid Framework for Soil Property Estimation from Hyperspectral Imaging"
https://www.mdpi.com/2072-4292/17/15/2568
Implements:
- Spectral feature engineering (derivatives, DWT, SVD, FFT)
- ML ensemble: Random Forest + XGBoost + KNN
- Property-specific weighted ensemble (Bayesian-optimised weights)
- 5-fold cross-validation with per-property R², RMSE, custom challenge score
Input layout
------------
labels.csv : sample_id, P, K, Mg, pH (header on first row)
<spectra_dir>/1.csv : one reflectance value per row, one column (no header)
<spectra_dir>/6.csv : same for sample 6, etc.
Usage
-----
# Train + cross-validate on labelled data
python hypersoilnet.py --labels label.csv --spectra_dir spectra/
# Train on all labelled data, then predict an unlabelled set
python hypersoilnet.py --labels label.csv --spectra_dir spectra/ \
--predict_dir unlabelled/ --output predictions.csv
"""
import argparse
import os
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
from scipy.fft import fft
from scipy.signal import savgol_filter
try:
import pywt
HAS_PYWT = True
except ImportError:
HAS_PYWT = False
warnings.warn("PyWavelets (pywt) not found – wavelet features will be skipped. "
"Install with: pip install PyWavelets")
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, r2_score
try:
from xgboost import XGBRegressor
HAS_XGB = True
except ImportError:
HAS_XGB = False
warnings.warn("XGBoost not found – ensemble will use RF + KNN only. "
"Install with: pip install xgboost")
try:
from scipy.optimize import minimize
HAS_SCIPY_OPT = True
except ImportError:
HAS_SCIPY_OPT = False
# ---------------------------------------------------------------------------
# Target column names (must match the label CSV header exactly)
# ---------------------------------------------------------------------------
TARGETS = ["P", "K", "Mg", "pH"]

# ---------------------------------------------------------------------------
# Property-specific ensemble weights from the paper (Table in Section 3.5).
# Order: [RF, XGB, KNN] — consumers of these lists rely on this ordering.
# ---------------------------------------------------------------------------
DEFAULT_WEIGHTS = {
    "K": [0.45, 0.40, 0.15],
    "P": [0.38, 0.45, 0.17],  # paper calls it P2O5; mapped to "P" here
    "Mg": [0.42, 0.38, 0.20],
    "pH": [0.35, 0.50, 0.15],
}
# ===========================================================================
# 1. DATA LOADING
# ===========================================================================
def load_spectrum(filepath: str) -> np.ndarray:
    """Read one spectrum: a headerless, single-column CSV of reflectance values."""
    column = pd.read_csv(filepath, header=None).iloc[:, 0]
    return column.to_numpy(dtype=float)
def load_dataset(labels_csv: str, spectra_dir: str):
    """
    Pair each row of the label CSV with its spectrum file.

    Parameters
    ----------
    labels_csv : str
        CSV with columns sample_id, P, K, Mg, pH (header on first row).
    spectra_dir : str
        Directory containing one ``<sample_id>.csv`` spectrum per sample.

    Returns
    -------
    spectra : np.ndarray shape (n_samples, n_bands)
    labels : pd.DataFrame columns = TARGETS
    sample_ids: list of sample ids

    Raises
    ------
    ValueError
        If no labelled sample has a matching spectrum file.
    """
    labels_df = pd.read_csv(labels_csv)
    # Header cells sometimes carry stray spaces (e.g. " pH") – normalise them.
    labels_df.columns = labels_df.columns.str.strip()
    spectra_list, valid_ids, valid_rows = [], [], []
    for _, row in labels_df.iterrows():
        sid = int(row["sample_id"])
        fpath = os.path.join(spectra_dir, f"{sid}.csv")
        if not os.path.exists(fpath):
            warnings.warn(f"Spectrum file not found, skipping: {fpath}")
            continue
        spectra_list.append(load_spectrum(fpath))
        valid_ids.append(sid)
        valid_rows.append(row)
    if not spectra_list:
        # np.vstack([]) raises a cryptic error – fail with a clear message instead.
        raise ValueError(
            f"No spectrum files matching {labels_csv} were found in {spectra_dir}")
    spectra = np.vstack(spectra_list)
    labels = pd.DataFrame(valid_rows)[TARGETS].reset_index(drop=True)
    return spectra, labels, valid_ids
def load_unlabelled(spectra_dir: str):
    """
    Load every ``*.csv`` spectrum in a directory (no labels needed).

    Returns
    -------
    spectra : np.ndarray shape (n_files, n_bands)
    ids     : list of file stems (strings), in sorted filename order

    Raises
    ------
    ValueError
        If the directory contains no CSV files (np.vstack on an empty
        list would otherwise fail with an unhelpful message).
    """
    paths = sorted(Path(spectra_dir).glob("*.csv"))
    if not paths:
        raise ValueError(f"No .csv spectra found in {spectra_dir}")
    spectra_list, ids = [], []
    for p in paths:
        spectra_list.append(load_spectrum(str(p)))
        ids.append(p.stem)
    return np.vstack(spectra_list), ids
# ===========================================================================
# 2. FEATURE ENGINEERING (Section 3.4 of the paper)
# ===========================================================================
def spectral_derivatives(spectrum: np.ndarray, orders=(1, 2, 3)) -> np.ndarray:
    """
    Savitzky-Golay derivatives of the spectrum (1st, 2nd, 3rd order by default).

    For derivative order ``d`` a polynomial of order ``d + 1`` is fitted over
    an 11-point window when possible; for short spectra the window shrinks,
    but is always odd, strictly greater than the polynomial order, and at
    most ``len(spectrum)``.  Orders the spectrum is too short for contribute
    zeros, so the output length is always ``len(spectrum) * len(orders)``.

    Bug fixed: the previous window logic replaced a valid 11-point window
    with ``polyorder + 2`` whenever ``polyorder + 1`` was even, silently
    shrinking the 2nd-derivative window from 11 to 5 points.
    """
    feats = []
    n = len(spectrum)
    for order in orders:
        polyorder = order + 1
        # Start from the preferred window, clipped to the signal and made odd.
        window = min(11, n)
        if window % 2 == 0:
            window -= 1
        if window <= polyorder:
            # Smallest odd value strictly above polyorder.
            window = polyorder + 1 if polyorder % 2 == 0 else polyorder + 2
        if window > n:
            # Spectrum too short for this derivative order – pad with zeros.
            feats.append(np.zeros(n))
            continue
        feats.append(savgol_filter(spectrum, window_length=window,
                                   polyorder=polyorder, deriv=order))
    return np.concatenate(feats)
def wavelet_features(spectrum: np.ndarray, wavelet="dmey", level=4) -> np.ndarray:
    """Mean, std and max |coefficient| of each DWT sub-band (paper eq. 5)."""
    if not HAS_PYWT:
        # PyWavelets missing → contribute no features at all.
        return np.array([])
    # Never request more decomposition levels than the signal length allows.
    usable_level = min(level, pywt.dwt_max_level(len(spectrum), wavelet))
    stats = []
    for band in pywt.wavedec(spectrum, wavelet, level=usable_level):
        stats += [band.mean(), band.std(), np.abs(band).max()]
    return np.array(stats)
def svd_features(spectrum: np.ndarray, n_singular=5) -> np.ndarray:
    """
    SVD features of a Hankel-style trajectory matrix built from the 1-D
    spectrum: the top singular values plus their consecutive ratios
    (paper eq. 6).
    """
    n = len(spectrum)
    n_rows = max(2, n // 10)
    width = n - n_rows + 1
    # Stack overlapping windows of the signal as the matrix rows.
    mat = np.stack([spectrum[r:r + width] for r in range(n_rows)])
    sing = np.linalg.svd(mat, compute_uv=False)
    if len(sing) < n_singular:
        sing = np.pad(sing, (0, n_singular - len(sing)))
    else:
        sing = sing[:n_singular]
    ratios = sing[:-1] / (sing[1:] + 1e-10)
    return np.concatenate([sing, ratios])
def fft_features(spectrum: np.ndarray, n_components=20) -> np.ndarray:
    """First `n_components` FFT bins, split into real and imaginary parts (paper eq. 7)."""
    low_bins = fft(spectrum)[:n_components]
    return np.concatenate([low_bins.real, low_bins.imag])
def statistical_features(spectrum: np.ndarray) -> np.ndarray:
    """Simple descriptive statistics of the raw reflectance curve."""
    q25, q75 = np.percentile(spectrum, [25, 75])
    return np.array([
        np.mean(spectrum),
        np.std(spectrum),
        q25,
        q75,
        np.ptp(spectrum),          # max − min (dynamic range)
        np.argmax(spectrum),       # band index of the peak
        np.argmin(spectrum),       # band index of the trough
    ])
def extract_features(spectrum: np.ndarray) -> np.ndarray:
    """Stack every feature group (raw + engineered) into a single 1-D vector."""
    groups = (
        spectrum,                        # raw reflectance values
        statistical_features(spectrum),
        spectral_derivatives(spectrum),
        wavelet_features(spectrum),      # empty array when PyWavelets is missing
        svd_features(spectrum),
        fft_features(spectrum),
    )
    non_empty = [g for g in groups if len(g) > 0]
    return np.concatenate(non_empty)
def build_feature_matrix(spectra: np.ndarray) -> np.ndarray:
    """Featurise every spectrum; returns an (n_samples, n_features) matrix."""
    feature_rows = []
    for spectrum in spectra:
        feature_rows.append(extract_features(spectrum))
    return np.array(feature_rows)
# ===========================================================================
# 3. MODEL BUILDING (Section 3.5)
# ===========================================================================
def build_models():
    """
    Instantiate the (unfitted) base regressors of the ensemble (Section 3.5).

    Returns a dict keyed by model name; the "XGB" entry is present only
    when the xgboost package is installed.
    """
    forest = RandomForestRegressor(
        n_estimators=150,
        max_depth=30,
        min_samples_leaf=5,
        criterion="squared_error",
        bootstrap=True,
        n_jobs=-1,
        random_state=42,
    )
    neighbours = KNeighborsRegressor(
        n_neighbors=7,
        weights="distance",
        metric="euclidean",
    )
    models = {"RF": forest, "KNN": neighbours}
    if HAS_XGB:
        models["XGB"] = XGBRegressor(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=5,
            reg_alpha=0.01,
            reg_lambda=1.0,
            early_stopping_rounds=15,
            eval_metric="rmse",
            random_state=42,
            verbosity=0,
        )
    return models
# ===========================================================================
# 4. WEIGHTED ENSEMBLE PREDICTION (paper eq. 8)
# ===========================================================================
def get_weights(target: str, models: dict) -> list:
    """
    Per-model ensemble weights for one soil property.

    Returned order is [RF, XGB, KNN] when XGBoost is available, otherwise
    [RF, KNN] re-normalised to sum to 1.  `models` is accepted for interface
    symmetry but is not consulted.
    """
    rf_w, xgb_w, knn_w = DEFAULT_WEIGHTS.get(target, [0.40, 0.40, 0.20])
    if HAS_XGB:
        return [rf_w, xgb_w, knn_w]
    norm = rf_w + knn_w
    return [rf_w / norm, knn_w / norm]
def ensemble_predict(fitted_models: dict, X: np.ndarray, target: str) -> np.ndarray:
    """
    Weighted average of the base-model predictions for one target (paper eq. 8).

    Weights are matched to models BY NAME: `get_weights` returns them in
    [RF, XGB, KNN] order, while `fitted_models` preserves build order
    (RF, KNN, then XGB) — a positional zip would swap the XGB and KNN
    weights, which was the previous behaviour.
    """
    weights = get_weights(target, fitted_models)
    # get_weights order contract: [RF, XGB, KNN] with XGB, else [RF, KNN].
    names = ["RF", "XGB", "KNN"] if len(weights) == 3 else ["RF", "KNN"]
    weight_by_name = dict(zip(names, weights))
    keys = list(fitted_models.keys())
    preds = np.stack([fitted_models[k].predict(X) for k in keys], axis=1)
    w = np.array([weight_by_name.get(k, 0.0) for k in keys])
    w = w / w.sum()  # re-normalise in case a model is absent
    return preds @ w
# ===========================================================================
# 5. TRAINING HELPERS
# ===========================================================================
def fit_ensemble(X_train: np.ndarray, y_train: np.ndarray,
                 X_val: np.ndarray = None, y_val: np.ndarray = None,
                 target: str = "P") -> dict:
    """
    Fit the scaler and every base regressor for a single target property.

    When a validation split is supplied it serves as the XGBoost
    early-stopping eval set; without one, early stopping is disabled.
    Returns {"scaler": fitted StandardScaler, "models": {name: model}}.
    """
    scaler = StandardScaler()
    train_scaled = scaler.fit_transform(X_train)
    val_scaled = None if X_val is None else scaler.transform(X_val)
    fitted = {}
    for name, model in build_models().items():
        if name != "XGB":
            model.fit(train_scaled, y_train)
        elif val_scaled is not None:
            model.fit(train_scaled, y_train,
                      eval_set=[(val_scaled, y_val)],
                      verbose=False)
        else:
            # Early stopping needs an eval set – switch it off when absent.
            model.set_params(early_stopping_rounds=None)
            model.fit(train_scaled, y_train)
        fitted[name] = model
    return {"scaler": scaler, "models": fitted}
# ===========================================================================
# 6. CROSS-VALIDATION (Section 3.6 — 5-fold CV)
# ===========================================================================
def cross_validate(X: np.ndarray, labels: pd.DataFrame, n_splits: int = 5):
    """
    K-fold cross-validation of the full ensemble (Section 3.6 of the paper).

    For every fold and every target a fresh ensemble is fitted and scored;
    per-target R²/RMSE/MSE are collected, then a summary table and a custom
    challenge score are printed.

    Parameters
    ----------
    X        : feature matrix, shape (n_samples, n_features)
    labels   : DataFrame with one column per entry in TARGETS
    n_splits : number of CV folds (default 5)

    Returns
    -------
    dict mapping target -> {"r2": [...], "rmse": [...], "mse": [...]}
    with one list entry per fold.
    """
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    results = {t: {"r2": [], "rmse": [], "mse": []} for t in TARGETS}
    for fold, (tr_idx, val_idx) in enumerate(kf.split(X), 1):
        X_tr, X_val = X[tr_idx], X[val_idx]
        print(f"\n Fold {fold}/{n_splits}")
        for target in TARGETS:
            y_tr = labels[target].values[tr_idx]
            y_val = labels[target].values[val_idx]
            # The fold's validation split doubles as the XGBoost
            # early-stopping eval set inside fit_ensemble.
            bundle = fit_ensemble(X_tr, y_tr, X_val, y_val, target)
            scaler = bundle["scaler"]
            X_tr_s = scaler.transform(X_tr)  # NOTE(review): unused — candidate for removal
            X_val_s = scaler.transform(X_val)
            preds = ensemble_predict(bundle["models"], X_val_s, target)
            mse = mean_squared_error(y_val, preds)
            r2 = r2_score(y_val, preds)
            rmse = np.sqrt(mse)
            results[target]["r2"].append(r2)
            results[target]["rmse"].append(rmse)
            results[target]["mse"].append(mse)
            print(f" {target:4s} R²={r2:.3f} RMSE={rmse:.3f}")
    # ---- Summary over all folds -------------------------------------------
    print("\n" + "=" * 55)
    print(f"{'Target':<6} {'R² mean':>9} {'R² std':>8} {'RMSE mean':>10}")
    print("-" * 55)
    baseline_mse = {}
    for target in TARGETS:
        r2_m = np.mean(results[target]["r2"])
        r2_s = np.std(results[target]["r2"])
        rmse_m = np.mean(results[target]["rmse"])
        print(f"{target:<6} {r2_m:>9.3f} {r2_s:>8.3f} {rmse_m:>10.3f}")
        # Naive baseline: always predicting the dataset mean gives
        # MSE equal to the target's variance.
        baseline_mse[target] = np.var(labels[target].values)  # naive baseline
    # Custom challenge score (paper eq. 11) – mean ratio algo_MSE / baseline_MSE
    mean_mses = {t: np.mean(results[t]["mse"]) for t in TARGETS}
    score = np.mean([mean_mses[t] / baseline_mse[t] for t in TARGETS])
    print(f"\n Custom challenge score (lower is better): {score:.4f}")
    print("=" * 55)
    return results
# ===========================================================================
# 7. FULL TRAINING + PREDICTION
# ===========================================================================
def train_full(X: np.ndarray, labels: pd.DataFrame):
    """
    Train one ensemble per target on the full labelled dataset.

    A random 20 % hold-out (seed 42) is set aside purely so XGBoost has an
    eval set for early stopping (paper Section 3.6).
    """
    n_samples = len(X)
    n_val = max(1, int(0.2 * n_samples))
    rng = np.random.default_rng(42)
    val_idx = rng.choice(n_samples, n_val, replace=False)
    tr_idx = np.setdiff1d(np.arange(n_samples), val_idx)
    bundles = {}
    for target in TARGETS:
        y = labels[target].values
        bundles[target] = fit_ensemble(X[tr_idx], y[tr_idx],
                                       X[val_idx], y[val_idx], target)
        print(f" Trained ensemble for {target}")
    return bundles
def predict(bundles: dict, X_new: np.ndarray) -> pd.DataFrame:
    """Predict every target for new feature rows; one DataFrame column per target."""
    out = {}
    for target in TARGETS:
        bundle = bundles[target]
        scaled = bundle["scaler"].transform(X_new)
        out[target] = ensemble_predict(bundle["models"], scaled, target)
    return pd.DataFrame(out)
# ===========================================================================
# 8. MAIN
# ===========================================================================
def parse_args():
    """Define and parse the command-line interface."""
    parser = argparse.ArgumentParser(
        description="HyperSoilNet – soil property estimation")
    parser.add_argument("--labels", required=True, help="Path to label CSV")
    parser.add_argument("--spectra_dir", required=True,
                        help="Dir containing <sample_id>.csv spectra")
    parser.add_argument("--predict_dir", default=None,
                        help="Dir with unlabelled spectra to predict")
    parser.add_argument("--output", default="predictions.csv",
                        help="Output CSV for predictions (default: predictions.csv)")
    parser.add_argument("--no_cv", action="store_true",
                        help="Skip cross-validation (faster)")
    parser.add_argument("--cv_folds", type=int, default=5)
    return parser.parse_args()
def main():
    """End-to-end pipeline: load, featurise, cross-validate, train, predict."""
    args = parse_args()
    # ---- Load & featurise labelled data ------------------------------------
    print("\n[1/4] Loading labelled data …")
    spectra, labels, ids = load_dataset(args.labels, args.spectra_dir)
    print(f" {len(ids)} samples loaded, {spectra.shape[1]} bands each")
    print("\n[2/4] Extracting features …")
    X = build_feature_matrix(spectra)
    print(f" Feature matrix: {X.shape}")
    # ---- Cross-validation --------------------------------------------------
    if not args.no_cv:
        print(f"\n[3/4] {args.cv_folds}-fold cross-validation …")
        cross_validate(X, labels, n_splits=args.cv_folds)
    else:
        print("\n[3/4] Cross-validation skipped (--no_cv)")
    # ---- Train on full dataset ---------------------------------------------
    print("\n[4/4] Training on full dataset …")
    bundles = train_full(X, labels)
    # ---- Predict unlabelled data if requested ------------------------------
    if args.predict_dir:
        print(f"\n[+] Predicting unlabelled spectra in: {args.predict_dir}")
        spectra_new, new_ids = load_unlabelled(args.predict_dir)
        X_new = build_feature_matrix(spectra_new)
        pred_df = predict(bundles, X_new)
        pred_df.insert(0, "sample_id", new_ids)
        pred_df.to_csv(args.output, index=False)
        print(f" Predictions saved to: {args.output}")
        print(pred_df.to_string(index=False))
    else:
        # No predict_dir given: predict the training set itself as a
        # sanity check (expect optimistic numbers — the models saw this data).
        print("\n[+] Running predictions on training set (sanity check) …")
        all_preds = predict(bundles, X)
        all_preds.insert(0, "sample_id", ids)
        out_path = "train_predictions.csv"
        all_preds.to_csv(out_path, index=False)
        print(f" Saved to: {out_path}")
    print("\nDone.\n")
# Entry-point guard: run the pipeline only when executed as a script.
if __name__ == "__main__":
    main()
Addestramento e cross-validation
python hypersoilnet.py --labels labels.csv --spectra_dir spectra/
Predizione su dati non etichettati
python hypersoilnet.py --labels labels.csv --spectra_dir spectra/ \
--predict_dir new_spectra/ --output predictions.csv
Come si vede dai grafici sottostanti, i risultati sono in generale peggiori, ma non così tanto da essere scartati a priori... potrebbe essere una valida pre-analisi.




Nessun commento:
Posta un commento