domenica 9 marzo 2025

Physics informed neural network Fukuzono

Visto che puro ML non funziona per le serie tempo di cui mi sto occupando ed le regressioni basate su formule analitiche mostrano dei limiti in presenza del rumore degli strumenti proviamo con PINN (Physics informed Neural Networks) in cui si conosce a priori la legge che regola il fenomeno e si demanda alla rete neurale di approssimare al meglio le variabili sconosciute o di gestire il rumore strumentale

Data la mancanza di dati mi sono fatto (...ha fatto ChatGPT) un generatore di dati sintetici che puo' aggiungere un livello arbitrario di rumore

Sono stati creati due dataset 

Dataset prova 1

Dataset prova 2


 

import csv
import random
import matplotlib.pyplot as plt

def generate_landslide_data(num_points=100, initial_displacement=10.0, velocity_decrease_rate=0.005, noise_level=0.01):
"""
Generates simulated landslide data with a *linear* decrease in inverse velocity,
and calculates inverse velocity. Correctly applies noise to the velocity.
"""
data = []
inverse_velocity = 1.0 # Start with an arbitrary inverse velocity
displacement = initial_displacement
previous_displacement = initial_displacement #set initial displacement

for day in range(1, num_points + 1):
# Linearly decrease the inverse velocity:
inverse_velocity -= velocity_decrease_rate

# Ensure inverse velocity doesn't go below a small positive value to avoid div by zero
inverse_velocity = max(inverse_velocity, 0.0001)

# Calculate velocity (V = 1 / (1/V)):
velocity = 1 / inverse_velocity

# Add noise to the velocity *directly*:
noise = random.uniform(-noise_level * velocity, noise_level * velocity)
velocity += noise

# Ensure velocity doesn't become negative after adding noise (important!)
velocity = max(velocity, 0.0001) #Very small positive value to avoid zero velocity.
inverse_velocity = 1/velocity #Recalculate inverse velocity based on the noisy velocity.

# Calculate displacement increase (ΔDisplacement = Velocity * ΔTime)
displacement_increase = velocity

# Calculate new displacement
displacement += displacement_increase

data.append((day, displacement, inverse_velocity))

return data

def write_to_csv(data, filename="landslide_data.csv"):
"""
Writes the simulated landslide data to a CSV file with semicolon as a separator,
including a column for inverse velocity.
"""
with open(filename, "w", newline="") as csvfile:
writer = csv.writer(csvfile, delimiter=";")
writer.writerow(["Day", "Cumulative_Displacement", "Inverse_Velocity"])
for row in data:
inverse_velocity_str = "{:.6f}".format(row[2]) if row[2] is not None else ""
writer.writerow([row[0], row[1], inverse_velocity_str])

print(f"Data written to {filename}")



def plot_inverse_velocity(filename="landslide_data.csv"):
"""
Reads the landslide data from the CSV file and plots inverse velocity vs. time using matplotlib.
"""
days = []
inverse_velocities = []

with open(filename, "r") as csvfile:
reader = csv.reader(csvfile, delimiter=";")
next(reader) # Skip the header row

for row in reader:
try:
day = int(row[0])
inverse_velocity = float(row[2]) if row[2] else None #Handle empty cells
if inverse_velocity is not None:
days.append(day)
inverse_velocities.append(inverse_velocity)
except (ValueError, IndexError) as e:
print(f"Error reading row: {row}. Skipping. Error: {e}")


# Plotting the Data:

if days and inverse_velocities: # Check to make sure data was read.
plt.figure(figsize=(10, 6)) # Set plot size
plt.plot(days, inverse_velocities, marker='o', linestyle='-', color='blue') #Plot data

plt.title("Fukuzono Plot: Inverse Velocity vs. Time") #Set title
plt.xlabel("Time (Days)") #set labels
plt.ylabel("Inverse Velocity (days/mm)")
plt.grid(True) #Add grid
plt.show() #Display plot
else:
print("No data to plot.")


# --- Main execution ---
if __name__ == "__main__":
landslide_data = generate_landslide_data(num_points=100, initial_displacement=10.0, velocity_decrease_rate=0.011, noise_level=0.05) #Tuned parameters
write_to_csv(landslide_data)
plot_inverse_velocity() #Call the plotting function


A questo punto ho provato a farci girare la PINN

Il metodo di Fukuzono dell'inverso della velocita' deve essere espresso in modo differenziale


La formula completa senza la semplificazione dell'esponente 2    (le lettere tra le due formule indicano grandezze non coincidenti)
 

 


con la rete che cerca di ottimizzare il parametro m

modello dataset 1

modello dataset 2


 

alla fine si vede che il modello ha sovrastimato il momento del collasso

come nota il modello ha bisogno di un mumero elevato (> 5000 epochs) per convergere in maniera minimamente soddisfacente

import tensorflow as tf
import numpy as np
import pandas as pd

# Carica i dati dal file CSV (Day, Cumulative_Displacement, Inverse_Velocity)
# Modifica qui con il nome del file
file_path = './fuku/landslide_data3.csv'
df = pd.read_csv(file_path, delimiter=';')

t = df['Day'].values.reshape(-1, 1) # Tempo (giorni)
inv_v = df['Inverse_Velocity'].values.reshape(-1, 1) # Inverso della velocità

# Parametro iniziale m (da ottimizzare)
m_init = tf.Variable(1.5, dtype=tf.float32, trainable=True)

# Modello PINN
class PINN(tf.keras.Model):
def __init__(self):
super(PINN, self).__init__()
self.hidden1 = tf.keras.layers.Dense(20, activation='tanh')
self.hidden2 = tf.keras.layers.Dense(20, activation='tanh')
self.hidden3 = tf.keras.layers.Dense(20, activation='tanh')
self.out = tf.keras.layers.Dense(1)
def call(self, t):
x = self.hidden1(t)
x = self.hidden2(x)
x = self.hidden3(x)
return self.out(x)

# Inizializza il modello
model = PINN()

def loss_fn(t, inv_v, m):
t = tf.convert_to_tensor(t, dtype=tf.float32) # Conversione a tensore
inv_v = tf.convert_to_tensor(inv_v, dtype=tf.float32)
with tf.GradientTape() as tape:
tape.watch(t)
u = model(t)
du_dt = tape.gradient(u, t)
if du_dt is None:
du_dt = tf.zeros_like(t) # Evita errori se il gradiente è None
eqn = du_dt - m * ((u*u) / tf.convert_to_tensor(t_f - t, dtype=tf.float32)) # Converte t_f - t in tensore
mse_data = tf.reduce_mean(tf.square(u - inv_v))
mse_phys = tf.reduce_mean(tf.square(eqn))
return mse_data + mse_phys

# Ottimizzatore
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# Tempo di collasso iniziale (da stimare)
t_f = tf.Variable(np.max(t) + 1.0, dtype=tf.float32, trainable=True)

# Training Loop
epochs = 10000
for epoch in range(epochs):
with tf.GradientTape() as tape:
loss = loss_fn(t, inv_v, m_init)
grads = tape.gradient(loss, model.trainable_variables + [t_f, m_init])
optimizer.apply_gradients(zip(grads, model.trainable_variables + [t_f, m_init]))

if epoch % 10 == 0:
print(f"Epoch {epoch}, Loss: {loss.numpy()}, t_f: {t_f.numpy()}, m: {m_init.numpy()}")



from scipy.interpolate import interp1d
from scipy.optimize import brentq
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
# Compute min and max time from dataset
t_min = np.min(t)
t_max = np.max(t)

# Generate predictions for a fine grid of time values
t_pred = np.linspace(t_min, t_max, 500).reshape(-1, 1).astype(np.float32)
inv_v_pred = model(t_pred).numpy().flatten()

# Ensure `t_pred_original` has the correct shape
t_pred_original = t_pred.flatten()

# Fit a linear model to estimate collapse time
lin_reg = LinearRegression()
lin_reg.fit(t_pred_original.reshape(-1, 1), inv_v_pred)
slope = lin_reg.coef_[0]
intercept = lin_reg.intercept_

# Compute collapse time as x-intercept (1/V = 0)
if slope != 0:
collapse_time = -intercept / slope
print(f"Estimated collapse time: {collapse_time:.2f} days")
else:
collapse_time = None
print("Could not estimate collapse time (slope is zero).")

# Visualization
plt.figure(figsize=(10, 6))

# Scatter plot of real data
plt.scatter(t, inv_v, label='Data', color='red', marker='o')

# PINN-predicted curve
plt.plot(t_pred_original, inv_v_pred, label='PINN Prediction', color='blue')

# Linear regression line (dashed black)
t_reg_line = np.linspace(t_min, t_max, 500)
inv_v_reg_line = lin_reg.predict(t_reg_line.reshape(-1, 1))
plt.plot(t_reg_line, inv_v_reg_line, linestyle='dashed', color='black', label="Linear Fit")

# Vertical line at estimated collapse time
if collapse_time is not None and t_min <= collapse_time <= t_max:
plt.axvline(collapse_time, color='green', linestyle='dashed', label=f'Collapse Time: {collapse_time:.2f} days')

plt.xlabel('Time (days)')
plt.ylabel('Inverse Velocity (1/V)')
plt.legend()
plt.grid()
plt.title(f"Forecasted Collapse Time: {collapse_time:.2f}" if collapse_time else "Forecasted Collapse Time: Not Found")
plt.show()

martedì 4 marzo 2025

Realsense L515 e SDK 2.56

Intel lo ha fatto di nuovo....cercando di usare di nuovo la Realsense L515 questa non veniva riconosciuta dal viewer perche' Intel la ha resa obsoleta ed ha rimosso il supporto dall'SDK

Per tornare ad usarla ho dovuto ricompilare a mano i sorgenti con 


wget https://github.com/IntelRealSense/librealsense/archive/refs/tags/v2.54.2.zip
unzip v2.54.2.zip
cd librealsense-2.54.2/
mkdir build
cd build
cmake ..
sudo make install
cd ../config
sudo cp ~/.99-realsense-libusb.rules /etc/udev/rules.d/99-realsense-libusb.rules
sudo udevadm control --reload-rules
sudo udevadm trigger

nota: l'installazione la avevo fatta su Debian Stable. Passando ad Ubuntu gli stessi comandi non permettevano di compilare. In sintesi di deve aggiungere la riga

#include <cstdint>

al file /third-party/rsutils/include/version.h 


 

Per il avere il wrapper su Python si deve compilare come segue

sudo apt install python3-pybind11

cmake ../ -DBUILD_EXAMPLES=true -DBUILD_PYTHON_BINDINGS:bool=true

giovedì 27 febbraio 2025

XGBoost

Leggendo alcuni articoli ho visto che viene consigliato l'utlilizzo di XGBoost al posto di LSTM sul forecasting di serie tempo...proviamoci con i dati gia' usati nei tentativi precedenti

 

 

Guardando il grafico si conferma che se il modello non conosce una accelerazione non e' in grado di prevederla anche se si usa variabile forzante come la pioggia

Inoltre non vedo particolari motivi per privilegiare XGBoost su LSTM

 

 # -*- coding: utf-8 -*-
"""xgboost.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1zmFI2Djb4bD1hVbJfvFGYgqh0EpfRnRj
"""

import pandas as pd
import matplotlib.pyplot as plt


dati = pd.read_csv('/content/prima.csv')
print(dati)
train = dati.head(int(len(dati)*0.8))
test = dati.tail(int(len(dati)*0.2))

train["Data"] = pd.to_datetime(train["Data"])
test["Data"] = pd.to_datetime(test["Data"])

train = train.set_index("Data")
test = test.set_index("Data")

train["Est"].plot(    , figsize=(10, 5), label="train")
test["Est"].plot(style="b", figsize=(10, 5), label="test")
plt.title("Dati")
plt.legend()

X_train = train.drop('Est', axis =1)
y_train = train['Est']

X_test = test.drop('Est', axis =1)
y_test = test['Est']

!pip install xgboost

import xgboost as xgb

reg = xgb.XGBRegressor(n_estimators=1000)
reg.fit(X_train, y_train, verbose = False)

xgb.plot_importance(reg)

test['Est_Prediction'] = reg.predict(X_test)

train['Est'].plot(style='k', figsize=(10,5), label = 'train')
test['Est'].plot(style='b', figsize=(10,5), label = 'test')
test['Est_Prediction'].plot(style='r', figsize=(10,5), label = 'prediction')
plt.title('XGBoost')
plt.legend()

mercoledì 26 febbraio 2025

Ipad 1

Mi e' capitato di tirare fuori dalla discarica informatica di ufficio un paio di Ipad prima generazione per vedere se erano utilizzabili

Una volta connessi all'alimentazione gli Ipad mostravano lo schermo flashare circa una volta al secondo...nessun segno dell'icona di ricarica...nessun altro segno di vita


 

Ho provato a metterli in DFU ed incredibilmente i dispositivi sono stati visti ma il ripristino via ITunes e' fallito con un errore generico che non spiega niente sulle condizioni interne

Ho trovato il programma idevicerestore su Linux che e' molto interessante, specialmente in modo verbose, per capire come funziona il sistema Ipad

La procedura sembra funzionare per il ripristino di IOs ma quando alla fine il dispositivo deve riavviarsi  la procedura fallisce miseramennte (il problema e' segnalato da altre persone https://github.com/libimobiledevice/idevicerestore/issues/324)

Sembra che la batteria sia completamente morta e che non si ricarichi

Purtroppo mi sa che sono diventati dei fermacarte...non so se avro' il tempo e la voglia di aprirli per vedere come sono dentro

martedì 25 febbraio 2025

ConvLSTM (2)

Proviamo un approccio differente

Fino ad adesso avevo usato solo i dati di allungamento come variabile dipendente e pioggia e temperatura come variabili indipendenti. Adesso provo a mettere la derivata seconda dell'allungamento

Deformazione

Derivata prima della deformazione

Derivata seconda della deformazione

Confronto tra pioggia e derivata seconda della deformazione

Grafico di previsione tramite ConvLSTM

# -*- coding: utf-8 -*-
"""convlstm.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1yXzW-fOBePUvgY63uMJTjSprP5vMy0tn
"""

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import ConvLSTM2D, BatchNormalization, Flatten, Dense
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

df = pd.read_csv("prima2.csv", parse_dates=["Data"])
df.set_index("Data", inplace=True)

features = df[['Temp', 'Rain']]  # Input variables
target = df[['Acc']]  # What we are predicting

scaler = MinMaxScaler()
features_scaled = scaler.fit_transform(features)

def create_sequences(data, labels, seq_length=10):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i+seq_length])  # Take the last `seq_length` time steps
        y.append(labels[i+seq_length])  # Predict the next step
    return np.array(X), np.array(y)

seq_length = 20  # Lookback period
X, y = create_sequences(features_scaled, target.to_numpy(), seq_length)

X = X.reshape((X.shape[0], seq_length, 1, X.shape[2], 1))  # (samples, time, height, width, channels)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

model = Sequential([
    ConvLSTM2D(filters=64, kernel_size=(1, 1), activation='relu', return_sequences=True,
               input_shape=(seq_length, 1, X.shape[3], 1)),
    BatchNormalization(),
    ConvLSTM2D(filters=32, kernel_size=(1, 1), activation='relu', return_sequences=False),
    BatchNormalization(),
    Flatten(),
    Dense(32, activation='relu'),
    Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_test, y_test))

y_pred = model.predict(X_test)
#y_pred = (y_pred > 0.5).astype(int)  # Convert probabilities to binary (0 or 1)

# Commented out IPython magic to ensure Python compatibility.
import matplotlib.pyplot as plt
# %matplotlib inline

plt.plot(y_test, label="Actual Acc,")
plt.plot(y_pred, linestyle="dashed", label="Predicted Acc")
plt.legend()
plt.show()








sabato 22 febbraio 2025

ConvLSTM

Continua l'esplorazione della previsione di serie tempo stavolta con ConvLSTM (il codice e' stato riadattato partendo da Gemini AI)

Il dataset e' sempre multivariato con Est come variabile che vuole essere prevista

Data,Est,Temp,Rain
2023-10-01,-55.7,18.7,0
2023-10-02,-55.6,19,0
2023-10-03,-55.6,19.2,0
2023-10-04,-55.5,19.5,0.2
2023-10-05,-55.9,18.8,0.2
2023-10-06,-55.7,18.1,0

 


 

import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

# 1. Caricamento e preparazione dei dati
def load_and_prepare_data(filepath):
df = pd.read_csv(filepath, parse_dates=['Data'], index_col='Data')
df = df.fillna(method='ffill') # Gestione dei valori mancanti
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(df)
return scaled_data, scaler, df.columns.get_loc('Est')

# 2. Creazione delle sequenze per il training
def create_sequences(data, target_index, seq_length):
xs, ys = [], []
for i in range(len(data) - seq_length):
x = data[i:i + seq_length]
y = data[i + seq_length, target_index]
xs.append(x)
ys.append(y)
return np.array(xs), np.array(ys)

# 3. Costruzione del modello LSTM
def build_model(input_shape):
model = tf.keras.Sequential([
tf.keras.layers.LSTM(50, return_sequences=True, input_shape=input_shape),
tf.keras.layers.LSTM(50),
tf.keras.layers.Dense(1)
])
model.compile(optimizer='adam', loss='mse')
return model

# 4. Addestramento e valutazione del modello
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test):
model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.1, verbose=1)
loss = model.evaluate(X_test, y_test)
print(f'Test Loss: {loss}')
return model

# 5. Previsioni e inversione della scala
def make_predictions(model, X_test, scaler, target_index):
predictions = model.predict(X_test)
dummy = np.zeros((len(predictions), X_test.shape[2]))
dummy[:, target_index] = predictions[:, 0]
predictions_original = scaler.inverse_transform(dummy)[:, target_index]
return predictions_original

def plot_predictions(predictions, y_test, scaler, target_index):
# Inverti la scala dei dati reali
dummy_real = np.zeros((len(y_test), X_test.shape[2]))
dummy_real[:, target_index] = y_test
y_test_original = scaler.inverse_transform(dummy_real)[:, target_index]

plt.figure(figsize=(12, 6))
plt.plot(y_test_original, label='Dati reali')
plt.plot(predictions, label='Previsioni')
plt.legend()
plt.title('Previsioni vs. Dati reali')
plt.xlabel('Tempo')
plt.ylabel('Valore di est')
plt.show()

def make_single_prediction(model, scaler, seq_length, temp, rain, target_index):
# Crea una sequenza di input fittizia
dummy_data = np.zeros((seq_length, 3))
dummy_data[:, 1] = temp # temp nella seconda colonna
dummy_data[:, 2] = rain # rain nella terza colonna

# Scala la sequenza di input
scaled_dummy_data = scaler.transform(dummy_data)

# Rimodella la sequenza per l'input del modello
input_data = np.reshape(scaled_dummy_data, (1, seq_length, 3))

# Effettua la previsione
prediction = model.predict(input_data)

# Inverti la scala della previsione
dummy_prediction = np.zeros((1, 3))
dummy_prediction[0, target_index] = prediction[0, 0]
prediction_original = scaler.inverse_transform(dummy_prediction)[0, target_index]

return prediction_original


# Main
filepath = 'prima.csv'
seq_length = 20 # Lunghezza della sequenza per il modello LSTM
prediction_steps = 20 # dati futuri previsti

scaled_data, scaler, target_index = load_and_prepare_data(filepath)
X, y = create_sequences(scaled_data, target_index, seq_length)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

model = build_model(X_train.shape[1:])
trained_model = train_and_evaluate_model(model, X_train, y_train, X_test, y_test)
predictions = make_predictions(trained_model, X_test, scaler, target_index)

print(predictions)
plot_predictions(predictions, y_test, scaler, target_index)
# Esempio di singola previsione
temp_input = 25.0
rain_input = 10.0
single_prediction = make_single_prediction(trained_model, scaler, seq_length, temp_input, rain_input, target_index)
print(f"Previsione per temp={temp_input} e rain={rain_input}: {single_prediction}")
 

Physics informed neural network Fukuzono

Visto che puro ML non funziona per le serie tempo di cui mi sto occupando ed le regressioni basate su formule analitiche mostrano dei limiti...