Visualizzazione post con etichetta Python. Mostra tutti i post
Visualizzazione post con etichetta Python. Mostra tutti i post

domenica 9 marzo 2025

Physics informed neural network Fukuzono

Visto che puro ML non funziona per le serie tempo di cui mi sto occupando ed le regressioni basate su formule analitiche mostrano dei limiti in presenza del rumore degli strumenti proviamo con PINN (Physics informed Neural Networks) in cui si conosce a priori la legge che regola il fenomeno e si demanda alla rete neurale di approssimare al meglio le variabili sconosciute o di gestire il rumore strumentale

Data la mancanza di dati mi sono fatto (...ha fatto ChatGPT) un generatore di dati sintetici che puo' aggiungere un livello arbitrario di rumore

Sono stati creati due dataset 

Dataset prova 1

Dataset prova 2


 

import csv
import random
import matplotlib.pyplot as plt

def generate_landslide_data(num_points=100, initial_displacement=10.0, velocity_decrease_rate=0.005, noise_level=0.01):
"""
Generates simulated landslide data with a *linear* decrease in inverse velocity,
and calculates inverse velocity. Correctly applies noise to the velocity.
"""
data = []
inverse_velocity = 1.0 # Start with an arbitrary inverse velocity
displacement = initial_displacement
previous_displacement = initial_displacement #set initial displacement

for day in range(1, num_points + 1):
# Linearly decrease the inverse velocity:
inverse_velocity -= velocity_decrease_rate

# Ensure inverse velocity doesn't go below a small positive value to avoid div by zero
inverse_velocity = max(inverse_velocity, 0.0001)

# Calculate velocity (V = 1 / (1/V)):
velocity = 1 / inverse_velocity

# Add noise to the velocity *directly*:
noise = random.uniform(-noise_level * velocity, noise_level * velocity)
velocity += noise

# Ensure velocity doesn't become negative after adding noise (important!)
velocity = max(velocity, 0.0001) #Very small positive value to avoid zero velocity.
inverse_velocity = 1/velocity #Recalculate inverse velocity based on the noisy velocity.

# Calculate displacement increase (ΔDisplacement = Velocity * ΔTime)
displacement_increase = velocity

# Calculate new displacement
displacement += displacement_increase

data.append((day, displacement, inverse_velocity))

return data

def write_to_csv(data, filename="landslide_data.csv"):
"""
Writes the simulated landslide data to a CSV file with semicolon as a separator,
including a column for inverse velocity.
"""
with open(filename, "w", newline="") as csvfile:
writer = csv.writer(csvfile, delimiter=";")
writer.writerow(["Day", "Cumulative_Displacement", "Inverse_Velocity"])
for row in data:
inverse_velocity_str = "{:.6f}".format(row[2]) if row[2] is not None else ""
writer.writerow([row[0], row[1], inverse_velocity_str])

print(f"Data written to {filename}")



def plot_inverse_velocity(filename="landslide_data.csv"):
"""
Reads the landslide data from the CSV file and plots inverse velocity vs. time using matplotlib.
"""
days = []
inverse_velocities = []

with open(filename, "r") as csvfile:
reader = csv.reader(csvfile, delimiter=";")
next(reader) # Skip the header row

for row in reader:
try:
day = int(row[0])
inverse_velocity = float(row[2]) if row[2] else None #Handle empty cells
if inverse_velocity is not None:
days.append(day)
inverse_velocities.append(inverse_velocity)
except (ValueError, IndexError) as e:
print(f"Error reading row: {row}. Skipping. Error: {e}")


# Plotting the Data:

if days and inverse_velocities: # Check to make sure data was read.
plt.figure(figsize=(10, 6)) # Set plot size
plt.plot(days, inverse_velocities, marker='o', linestyle='-', color='blue') #Plot data

plt.title("Fukuzono Plot: Inverse Velocity vs. Time") #Set title
plt.xlabel("Time (Days)") #set labels
plt.ylabel("Inverse Velocity (days/mm)")
plt.grid(True) #Add grid
plt.show() #Display plot
else:
print("No data to plot.")


# --- Main execution ---
if __name__ == "__main__":
landslide_data = generate_landslide_data(num_points=100, initial_displacement=10.0, velocity_decrease_rate=0.011, noise_level=0.05) #Tuned parameters
write_to_csv(landslide_data)
plot_inverse_velocity() #Call the plotting function


A questo punto ho provato a farci girare la PINN

Il metodo di Fukuzono dell'inverso della velocita' deve essere espresso in modo differenziale


La formula completa senza la semplificazione dell'esponente 2    (le lettere tra le due formule indicano grandezze non coincidenti)
 

 


con la rete che cerca di ottimizzare il parametro m

modello dataset 1

modello dataset 2


 

alla fine si vede che il modello ha sovrastimato il momento del collasso

come nota il modello ha bisogno di un mumero elevato (> 5000 epochs) per convergere in maniera minimamente soddisfacente

import tensorflow as tf
import numpy as np
import pandas as pd

# Carica i dati dal file CSV (Day, Cumulative_Displacement, Inverse_Velocity)
# Modifica qui con il nome del file
file_path = './fuku/landslide_data3.csv'
df = pd.read_csv(file_path, delimiter=';')

t = df['Day'].values.reshape(-1, 1) # Tempo (giorni)
inv_v = df['Inverse_Velocity'].values.reshape(-1, 1) # Inverso della velocità

# Parametro iniziale m (da ottimizzare)
m_init = tf.Variable(1.5, dtype=tf.float32, trainable=True)

# Modello PINN
class PINN(tf.keras.Model):
def __init__(self):
super(PINN, self).__init__()
self.hidden1 = tf.keras.layers.Dense(20, activation='tanh')
self.hidden2 = tf.keras.layers.Dense(20, activation='tanh')
self.hidden3 = tf.keras.layers.Dense(20, activation='tanh')
self.out = tf.keras.layers.Dense(1)
def call(self, t):
x = self.hidden1(t)
x = self.hidden2(x)
x = self.hidden3(x)
return self.out(x)

# Inizializza il modello
model = PINN()

def loss_fn(t, inv_v, m):
t = tf.convert_to_tensor(t, dtype=tf.float32) # Conversione a tensore
inv_v = tf.convert_to_tensor(inv_v, dtype=tf.float32)
with tf.GradientTape() as tape:
tape.watch(t)
u = model(t)
du_dt = tape.gradient(u, t)
if du_dt is None:
du_dt = tf.zeros_like(t) # Evita errori se il gradiente è None
eqn = du_dt - m * ((u*u) / tf.convert_to_tensor(t_f - t, dtype=tf.float32)) # Converte t_f - t in tensore
mse_data = tf.reduce_mean(tf.square(u - inv_v))
mse_phys = tf.reduce_mean(tf.square(eqn))
return mse_data + mse_phys

# Ottimizzatore
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# Tempo di collasso iniziale (da stimare)
t_f = tf.Variable(np.max(t) + 1.0, dtype=tf.float32, trainable=True)

# Training Loop
epochs = 10000
for epoch in range(epochs):
with tf.GradientTape() as tape:
loss = loss_fn(t, inv_v, m_init)
grads = tape.gradient(loss, model.trainable_variables + [t_f, m_init])
optimizer.apply_gradients(zip(grads, model.trainable_variables + [t_f, m_init]))

if epoch % 10 == 0:
print(f"Epoch {epoch}, Loss: {loss.numpy()}, t_f: {t_f.numpy()}, m: {m_init.numpy()}")



from scipy.interpolate import interp1d
from scipy.optimize import brentq
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
# Compute min and max time from dataset
t_min = np.min(t)
t_max = np.max(t)

# Generate predictions for a fine grid of time values
t_pred = np.linspace(t_min, t_max, 500).reshape(-1, 1).astype(np.float32)
inv_v_pred = model(t_pred).numpy().flatten()

# Ensure `t_pred_original` has the correct shape
t_pred_original = t_pred.flatten()

# Fit a linear model to estimate collapse time
lin_reg = LinearRegression()
lin_reg.fit(t_pred_original.reshape(-1, 1), inv_v_pred)
slope = lin_reg.coef_[0]
intercept = lin_reg.intercept_

# Compute collapse time as x-intercept (1/V = 0)
if slope != 0:
collapse_time = -intercept / slope
print(f"Estimated collapse time: {collapse_time:.2f} days")
else:
collapse_time = None
print("Could not estimate collapse time (slope is zero).")

# Visualization
plt.figure(figsize=(10, 6))

# Scatter plot of real data
plt.scatter(t, inv_v, label='Data', color='red', marker='o')

# PINN-predicted curve
plt.plot(t_pred_original, inv_v_pred, label='PINN Prediction', color='blue')

# Linear regression line (dashed black)
t_reg_line = np.linspace(t_min, t_max, 500)
inv_v_reg_line = lin_reg.predict(t_reg_line.reshape(-1, 1))
plt.plot(t_reg_line, inv_v_reg_line, linestyle='dashed', color='black', label="Linear Fit")

# Vertical line at estimated collapse time
if collapse_time is not None and t_min <= collapse_time <= t_max:
plt.axvline(collapse_time, color='green', linestyle='dashed', label=f'Collapse Time: {collapse_time:.2f} days')

plt.xlabel('Time (days)')
plt.ylabel('Inverse Velocity (1/V)')
plt.legend()
plt.grid()
plt.title(f"Forecasted Collapse Time: {collapse_time:.2f}" if collapse_time else "Forecasted Collapse Time: Not Found")
plt.show()

venerdì 7 febbraio 2025

Filtraggio dati estensimetro con Kalman

 Ho ripreso in mano il discorso filtro Kalman applicato pero' a dati reali di un estensimetro (i dati sono espressi in decimi di millimetro)

 

 


 


 

 

 

 

 

 

import numpy as np
import matplotlib.pyplot as plt


dati = np.genfromtxt('est.csv', delimiter=';')
dist = dati[:,2]
z = dist.T
print(z)
dimensione = dist.shape
#print(dimensione[0])
n_iter = dimensione[0]
Q = 1e-5 # process variance

# allocate space for arrays
xhat=np.zeros(dimensione) # a posteri estimate of x
P=np.zeros(dimensione) # a posteri error estimate
xhatminus=np.zeros(dimensione) # a priori estimate of x
Pminus=np.zeros(dimensione) # a priori error estimate
K=np.zeros(dimensione) # gain or blending factor
R = 0.1**2 # estimate of measurement variance, change to see effect

x = 23.6
xhat[0] = 23.6
P[0] = 1.0


for k in range(1,n_iter):
# time update
xhatminus[k] = xhat[k-1]
Pminus[k] = P[k-1]+Q

# measurement update
K[k] = Pminus[k]/( Pminus[k]+R )
xhat[k] = xhatminus[k]+K[k]*(z[k]-xhatminus[k])
P[k] = (1-K[k])*Pminus[k]

sp = np.diff(xhat, prepend=xhat[0])

plt.figure()
plt.plot(z,'k+',label='noisy measurements')
plt.plot(xhat,'b-',label='a posteri estimate')
plt.legend()
plt.title('Kalman filter ', fontweight='bold')
plt.xlabel('Time')
plt.ylabel('Distance')


plt.figure()
valid_iter = range(1,n_iter) # Pminus not valid at step 0
plt.plot(valid_iter,Pminus[valid_iter],label='a priori error estimate')
plt.title('Estimated $\it{\mathbf{a \ priori}}$ error vs. iteration step', fontweight='bold')
plt.xlabel('Iteration')
plt.ylabel('$(Voltage)^2$')
plt.setp(plt.gca(),'ylim',[0,.01])

plt.figure()
plt.plot(sp,'b-')
plt.title('Speed', fontweight='bold')
plt.xlabel('Time')
plt.ylabel('Speed (0.1mm/day)')

plt.show()

 

 

 

giovedì 6 febbraio 2025

Bayesian linear regression

Volevo provare un approccio differente dal solito per la regressione lineare

Il problema e' relativo alla terza fase del creep in frane la cui legge tra il tempo e l'inverso della velocita' e' approssimata a lineare

Questi dati sono ripresi dall' articolo Tommaso Carlà,Emanuele Intrieri,Federico Di Traglia,Teresa Nolesini,Giovanni Gigli,Nicola Casagli "Guidelines on the use of inverse velocity method as a tool for setting alarm thresholds and forecasting landslides and structure collapses" Landslides DOI 10.1007/s10346-016-0731-5 per la frana di Montebeni. La freccia verde indica la data dell'evento reale e la freccia rossa la data stimata


 La terza fase e' stata modellata tramite regressione a minimi quadrati applicando alcuni filtri

Ho provato a trattare gli stessi dati tramite regressione lineare bayesiana con Metropolis-Hastings Sampler

Alla fine i risultati sono molto simili. C'e' da notare che 

1) essendo il modello Montecarlo non deterministico i risultati non sono mai identici tra una run e la successiva. Ciao' comporta anche uno spostamento dell'intercetta della retta con l'asse del tempo spostando il TOF (time of failure)

2) sia la regressione a minimi quadrati che quella bayesiana sono molto sensibili al dataset (anche perche' alla fine ci sono solo una vendita di misure). Basta omettere un solo dato all'inizio che la correlazione e' sensibilmente differente. Questo comporta la difficolta' di definire in modo univoco il passaggio dalla fase 2 alla fase 3 del creep  

I dati sono

Tempo,1/V
14, 0.4586046511627907
28, 0.44000000000000006
42, 0.30604651162790697
56, 0.27813953488372095
70, 0.28930232558139535
84, 0.3227906976744187
98, 0.21860465116279076
112, 0.17023255813953395
126, 0.17953488372092963
140, 0.1367441860465109
154, 0.13302325581395344
168, 0.11441860465116252
182, 0.08837209302325501
196, 0.0958139534883719
210, 0.14046511627906993
224, 0.13860465116279058
238, 0.09581395348837188
252, 0.04744186046511625
266, 0.051162790697674376
280, 0.02883720930232564
294, 0.030697674418604465
308, 0.010232558139534276


Per il codice sottostante ho modificato un esempio preso da chatgpt

import sys

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


# Load CSV data
def load_data(csv_path):
data = pd.read_csv(csv_path)
return data


# Define the likelihood function (Gaussian likelihood)
def likelihood(y, X, beta):
predictions = np.dot(X, beta)
return -0.5 * np.sum((y - predictions) ** 2)


# Define the prior function (Normal prior)
def prior(beta, sigma=10):
return -0.5 * np.sum(beta ** 2) / (sigma ** 2)


# Define the proposal distribution (random walk)
def proposal(beta, step_size=0.1):
return beta + np.random.normal(0, step_size, size=beta.shape)


# Metropolis-Hastings sampler
def metropolis_hastings(X, y, initial_beta, n_samples, step_size=0.1):
beta = initial_beta
samples = []

for _ in range(n_samples):
# Propose new beta
beta_new = proposal(beta, step_size)

# Compute likelihood and prior for current and proposed beta
current_likelihood = likelihood(y, X, beta) + prior(beta)
new_likelihood = likelihood(y, X, beta_new) + prior(beta_new)

# Metropolis-Hastings acceptance step
acceptance_ratio = min(1, np.exp(new_likelihood - current_likelihood))

# Accept or reject the proposed beta
if np.random.rand() < acceptance_ratio:
beta = beta_new

samples.append(beta)

return np.array(samples)


# Prediction function based on posterior samples
def predict(X, posterior_samples):
predictions = np.dot(X, posterior_samples.T)
return predictions.mean(axis=1), predictions.std(axis=1)


def predictLeastSquare(X, beta):
# Add intercept term to the new data
X_with_intercept = np.hstack([np.ones((X.shape[0], 1)), X])
return X_with_intercept @ beta

# Forecast function for new input data
def forecast(X_new, posterior_samples):
# X_new is the new data for which we want to forecast
return predict(X_new, posterior_samples)


def linear_least_squares(X, y):
# Add intercept term (column of ones)
X_with_intercept = np.hstack([np.ones((X.shape[0], 1)), X])

# Compute the coefficients using the normal equation: β = (X^T X)^(-1) X^T y
beta = np.linalg.inv(X_with_intercept.T @ X_with_intercept) @ X_with_intercept.T @ y
return beta

# Main function
def run_bayesian_linear_regression(csv_path, n_samples=1000000, step_size=0.1):
# Load data from CSV
data = load_data(csv_path)

# Assuming the last column is the target 'y' and the rest are features 'X'
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values

beta = linear_least_squares(X, y)
y_pred = predictLeastSquare(X, beta)

# Add a column of ones for the intercept term (beta_0)
X = np.hstack([np.ones((X.shape[0], 1)), X])

# Initialize the parameters (beta_0, beta_1, ...)
initial_beta = np.zeros(X.shape[1])

# Perform Metropolis-Hastings sampling
posterior_samples = metropolis_hastings(X, y, initial_beta, n_samples, step_size)

# Get predictions
mean_predictions, std_predictions = predict(X, posterior_samples)


# Plot results
plt.figure(figsize=(10, 6))
plt.plot(y, label="True values")
plt.plot(mean_predictions, label="Bayesian Regr")
plt.xlabel("Time")
plt.ylabel("1/V")
plt.fill_between(range(len(y)), mean_predictions - std_predictions,
mean_predictions + std_predictions, alpha=0.3, label="Err")
plt.plot(y_pred, label="Least Square", color='red')

plt.legend()
plt.show()

return posterior_samples


# Usage example
csv_path = 'dati.csv' # Replace with the path to your CSV file
posterior_samples = run_bayesian_linear_regression(csv_path)


new_data = pd.DataFrame({
'feature1': [30, 35] # Replace with your features
})

X_new = np.hstack([np.ones((new_data.shape[0], 1)), new_data.values])

# Forecast values using the posterior samples
mean_forecast, std_forecast = forecast(X_new, posterior_samples)

# Output the forecasts
print("Forecasted values (mean):", mean_forecast)
print("Forecasted values (95% CI):", mean_forecast - 1.96 * std_forecast, mean_forecast + 1.96 * std_forecast)

venerdì 17 gennaio 2025

Pandas su serie tempo

Problema: hai un csv che riporta una serie tempo datetime/valore di un sensore

Effettuare calcoli, ordina le righe, ricampiona il passo temporale, colma i buchi della serie tempo con interpolazione in modo semplice

Data;Valore_p
2024-07-24 12:42:05;0
2024-07-24 12:44:05;0
2024-07-24 12:46:05;0

 soluzione : usa Pandas di Python

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import argrelextrema


#legge i dati dal csv
df_b = pd.read_csv('battente.csv', sep=';',header=0, parse_dates = ['Data'], date_format = '%Y-%m-%d %H:%M:%S')
#ricampiona i dati con un dato al giorno usando il valore medio
df_b_res = df_b.resample('1h', on='Data').mean()
#df_b.iloc[:,1] = df_b.iloc[:,1].apply(lambda x: (x-x.mean())/ x.std())
print(df_b)



df_p = pd.read_csv('pluviometro.csv', sep=';',header=0, parse_dates = ['Data'], date_format = '%Y-%m-%d %H:%M:%S')
#df_p = df_p.shift(periods=3)
df_p_res = df_p.resample('1h', on='Data').mean()

#unisce i due file usando come chiave la colonna temporale
merged_dataframe = pd.merge_asof(df_p_res, df_b_res, on="Data")
#rimuove le righe in cui non ci sono valori
df_finale = merged_dataframe.dropna(how='any')


#cerca i massimi locali facendo scorrere una finestra di 12 ore
df_finale['local_max_p'] = df_finale.iloc[argrelextrema(df_finale.Valore_p.values, np.greater_equal,order=12)[0]]['Valore_p']
df_finale['local_max_b'] = df_finale.iloc[argrelextrema(df_finale.Valore_b.values, np.greater_equal,order=12)[0]]['Valore_b']

#elimina i massimi locali che sono sotto ad una soglia
df_finale['local_max_p'] = np.where(df_finale["local_max_p"] < 1.0,np.nan,df_finale["local_max_p"])
df_finale['local_max_b'] = np.where(df_finale["local_max_b"] < 1.0,np.nan,df_finale["local_max_b"])

df_finale.to_csv("allineato.csv", sep=";")


fig, ax1 = plt.subplots()
# Plot first time series on the primary axis
ax1.plot(df_finale['Data'], df_finale['Valore_b'], 'g-')
ax1.plot(df_finale['Data'], df_finale['local_max_b'], 'mo')

ax1.set_xlabel('Data')
ax1.set_ylabel('Battente (m)', color='g')
ax2 = ax1.twinx()
ax2.plot(df_finale['Data'], df_finale['Valore_p'], 'b-')
ax2.plot(df_finale['Data'], df_finale['local_max_p'], 'ro')

ax2.set_ylabel('Prec.(mm)', color='b')
plt.savefig('grafico.png')
plt.show()






Physics informed neural network Fukuzono

Visto che puro ML non funziona per le serie tempo di cui mi sto occupando ed le regressioni basate su formule analitiche mostrano dei limiti...