
Saturday, February 22, 2025

ConvLSTM

The exploration of time-series forecasting continues, this time with ConvLSTM (the code was adapted starting from Gemini AI).

The dataset is again multivariate, with Est as the variable to be predicted:

Data,Est,Temp,Rain
2023-10-01,-55.7,18.7,0
2023-10-02,-55.6,19,0
2023-10-03,-55.6,19.2,0
2023-10-04,-55.5,19.5,0.2
2023-10-05,-55.9,18.8,0.2
2023-10-06,-55.7,18.1,0

import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

# 1. Load and prepare the data
def load_and_prepare_data(filepath):
    df = pd.read_csv(filepath, parse_dates=['Data'], index_col='Data')
    df = df.ffill()  # handle missing values (fillna(method='ffill') is deprecated)
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(df)
    return scaled_data, scaler, df.columns.get_loc('Est')

# 2. Create the training sequences
def create_sequences(data, target_index, seq_length):
    xs, ys = [], []
    for i in range(len(data) - seq_length):
        x = data[i:i + seq_length]              # window over all features
        y = data[i + seq_length, target_index]  # next value of Est
        xs.append(x)
        ys.append(y)
    return np.array(xs), np.array(ys)

# 3. Build the LSTM model
def build_model(input_shape):
    model = tf.keras.Sequential([
        tf.keras.layers.LSTM(50, return_sequences=True, input_shape=input_shape),
        tf.keras.layers.LSTM(50),
        tf.keras.layers.Dense(1)
    ])
    model.compile(optimizer='adam', loss='mse')
    return model

# 4. Train and evaluate the model
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test):
    model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.1, verbose=1)
    loss = model.evaluate(X_test, y_test)
    print(f'Test Loss: {loss}')
    return model

# 5. Predict and invert the scaling
def make_predictions(model, X_test, scaler, target_index):
    predictions = model.predict(X_test)
    # the scaler expects all columns, so embed the predictions in a dummy array
    dummy = np.zeros((len(predictions), X_test.shape[2]))
    dummy[:, target_index] = predictions[:, 0]
    predictions_original = scaler.inverse_transform(dummy)[:, target_index]
    return predictions_original

def plot_predictions(predictions, y_test, X_test, scaler, target_index):
    # invert the scaling of the real data (X_test is passed in for the column count)
    dummy_real = np.zeros((len(y_test), X_test.shape[2]))
    dummy_real[:, target_index] = y_test
    y_test_original = scaler.inverse_transform(dummy_real)[:, target_index]

    plt.figure(figsize=(12, 6))
    plt.plot(y_test_original, label='Real data')
    plt.plot(predictions, label='Predictions')
    plt.legend()
    plt.title('Predictions vs. real data')
    plt.xlabel('Time')
    plt.ylabel('Est value')
    plt.show()

def make_single_prediction(model, scaler, seq_length, temp, rain, target_index):
    # build a dummy input sequence; note that the Est column stays at zero,
    # so this is only a rough what-if for given temp/rain values
    dummy_data = np.zeros((seq_length, 3))
    dummy_data[:, 1] = temp  # temp in the second column
    dummy_data[:, 2] = rain  # rain in the third column

    # scale the input sequence
    scaled_dummy_data = scaler.transform(dummy_data)

    # reshape the sequence for the model input
    input_data = np.reshape(scaled_dummy_data, (1, seq_length, 3))

    # make the prediction
    prediction = model.predict(input_data)

    # invert the scaling of the prediction
    dummy_prediction = np.zeros((1, 3))
    dummy_prediction[0, target_index] = prediction[0, 0]
    prediction_original = scaler.inverse_transform(dummy_prediction)[0, target_index]

    return prediction_original


# Main
filepath = 'prima.csv'
seq_length = 20  # sequence length for the LSTM model
prediction_steps = 20  # number of future steps to predict

scaled_data, scaler, target_index = load_and_prepare_data(filepath)
X, y = create_sequences(scaled_data, target_index, seq_length)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

model = build_model(X_train.shape[1:])
trained_model = train_and_evaluate_model(model, X_train, y_train, X_test, y_test)
predictions = make_predictions(trained_model, X_test, scaler, target_index)

print(predictions)
plot_predictions(predictions, y_test, X_test, scaler, target_index)
# Example of a single prediction
temp_input = 25.0
rain_input = 10.0
single_prediction = make_single_prediction(trained_model, scaler, seq_length, temp_input, rain_input, target_index)
print(f"Prediction for temp={temp_input} and rain={rain_input}: {single_prediction}")
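Note that, despite the post title, the script above stacks plain LSTM layers. A convolutional variant can be sketched by swapping build_model for something like the following. This is a minimal sketch of mine, not part of the original code: it assumes TensorFlow >= 2.6 (where tf.keras.layers.ConvLSTM1D is available), and build_convlstm_model is a hypothetical name.

import tensorflow as tf

# Hypothetical ConvLSTM variant of build_model (a sketch, not the original code).
# ConvLSTM1D expects 4D input (batch, time, spatial_dim, channels); here the
# feature axis (Est, Temp, Rain) is treated as the spatial dimension.
def build_convlstm_model(seq_length, n_features):
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(seq_length, n_features)),
        tf.keras.layers.Reshape((seq_length, n_features, 1)),
        tf.keras.layers.ConvLSTM1D(filters=32, kernel_size=2, padding='same'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(1)
    ])
    model.compile(optimizer='adam', loss='mse')
    return model

# usage with the same pipeline as above: model = build_convlstm_model(seq_length, X.shape[2])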

Friday, February 21, 2025

Multivariate LSTM and BiLSTM

I tried to extend the experiment from this post using a multivariate dataset made up of temperature and rainfall (daily readings).

The data span October 2023 to March 2024. The interesting point is that the training dataset does not include the acceleration that began on March 3, so the network effectively "does not know" about the accelerating trend.

The script at the bottom was used (adapted from this example) with both LSTM and BiLSTM.

The difference is that LSTM can be used for pure forecasting, while BiLSTM is more suitable for post-hoc analysis: since a bidirectional layer also reads each window backwards, it exploits "future" context that is only available after the fact.


LSTM


BiLSTM





# Commented out IPython magic to ensure Python compatibility.
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
# %matplotlib inline
import seaborn as sns
import time

df = pd.read_csv('/content/ridotto.csv')
df['date'] = pd.to_datetime(df['Data'])

del df['Data']
columns = list(df.columns)
print(columns)

df_corr = df.corr()
df_corr

look_back = 20
sample_size = len(df) - look_back
past_size = int(sample_size*0.8)
future_size = sample_size - past_size +1

def make_dataset(raw_data, look_back=20):
    _X = []
    _y = []

    for i in range(len(raw_data) - look_back):
        _X.append(raw_data[i : i + look_back])
        _y.append(raw_data[i + look_back])
    _X = np.array(_X).reshape(len(_X), look_back, 1)
    _y = np.array(_y).reshape(len(_y), 1)

    return _X, _y

from sklearn import preprocessing

Xs = []
for i in range(len(columns)):
Xs.append(preprocessing.minmax_scale(df[columns[i]]))
Xs = np.array(Xs)

X_est, y_est = make_dataset(Xs[0], look_back=look_back)
X_temp, y_temp = make_dataset(Xs[1], look_back=look_back)
X_rain, y_rain = make_dataset(Xs[2], look_back=look_back)

X_con = np.concatenate([X_est, X_temp, X_rain], axis=2)

X = X_con
y = y_est

X.shape

y.shape

X_past = X[:past_size]
X_future = X[past_size-1:]
y_past = y[:past_size]
y_future = y[past_size-1:]

X_train = X_past
y_train = y_past

import tensorflow as tf
from tensorflow.keras.layers import Input, LSTM, Dense, BatchNormalization, Bidirectional
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import SGD, Adam

# LSTM
def create_LSTM_model():
    inputs = Input(shape=(np.array(X_train).shape[1], np.array(X_train).shape[2]))
    x = LSTM(64, return_sequences=True)(inputs)
    x = BatchNormalization()(x)
    x = LSTM(64)(x)
    output = Dense(1, activation='relu')(x)
    model = Model(inputs, output)
    return model

# Bidirectional-LSTM
def create_BiLSTM_model():
    inputs = Input(shape=(np.array(X_train).shape[1], np.array(X_train).shape[2]))
    x = Bidirectional(LSTM(64, return_sequences=True))(inputs)
    x = BatchNormalization()(x)
    x = Bidirectional(LSTM(64))(x)
    output = Dense(1, activation='relu')(x)
    model = Model(inputs, output)
    return model

model = create_BiLSTM_model()
model.summary()
model.compile(optimizer=Adam(learning_rate=0.0001), loss='mean_squared_error')

t1 = time.time()
history = model.fit(X_train, y_train, epochs = 350, batch_size = 64, verbose = 0)
t2 = time.time()

tt = t2 - t1
t_h = tt//3600
t_m = (tt - t_h*3600)//60
t_s = (tt - t_h*3600 - t_m*60)//1
print('Training Time : '+str(t_h)+' h, '+str(t_m)+' m, '+str(t_s)+' s')

predictions = model.predict(X_past)
future_predictions = model.predict(X_future)

plt.figure(figsize=(18, 9))
plt.plot(df['date'][look_back:], y, color="b", label="Real data")
plt.plot(df['date'][look_back:look_back + past_size], predictions, color="r", linestyle="dashed", label="prediction")
plt.plot(df['date'][-future_size:], future_predictions, color="g", linestyle="dashed", label="future prediction")
plt.legend()
plt.show()



If you want to add a validation dataset:

from sklearn.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(X_past, y_past, test_size=0.2, shuffle=False)

history = model.fit(X_train, y_train, epochs=350, batch_size=64, verbose=0, validation_data=(X_val, y_val))

# ... (subsequent code)

# Plot training and validation loss
plt.figure(figsize=(12, 6))
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.legend()
plt.show()
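A note on reading this plot: if the validation loss starts climbing while the training loss keeps dropping, the model is overfitting, and the epoch count (350 here) should be reduced, or an early-stopping callback used.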


Saturday, February 8, 2025

Bilateral LSTM and GRU with extensometer data

I had already tried using LSTM on data from an extensometer here ... now I am trying again with the Bilateral LSTM and GRU variants, taking inspiration from this page.

These are the raw data. They are sampled every 20 minutes and cover several phases of a slope movement.


Before starting, a couple of remarks.


This plot summarizes the displacements recorded by the extensometer, the movement velocity, and the rainfall data.

Cross-correlating the rainfall data with the displacement data yields a lag of 27 days.

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

df_b = pd.read_csv('est2.csv', sep=',',header=0, parse_dates = ['Date'], date_format = '%Y-%m-%d %H:%M:%S')

pluvio = pd.read_csv('arpa.csv', sep=';',header=0, parse_dates = ['Date'], date_format = '%d/%m/%Y %H:%M:%S')

df_b_res = df_b.resample('24h', on='Date').mean()
pluvio_res = pluvio.resample('24h', on='Date').mean()

print(df_b_res)
print(pluvio)

df_b_res['Diff'] = df_b_res['Est'].diff()
df_b_res['Inv'] = 1/df_b_res['Diff']

time = pd.date_range('2024-02-01', periods=214, freq='D')
print(time)
print(pluvio_res.iloc[:,0])
print(df_b_res.iloc[:,0])

df = pd.DataFrame({'series1': df_b_res.iloc[:,0], 'series2': pluvio_res.iloc[:,0]}, index=time)
cross_corr = df['series1'].corr(df['series2'])
def cross_correlation(series1, series2, max_lag):
    # correlation of series1 shifted by each lag against series2
    corr = []
    for lag in range(-max_lag, max_lag + 1):
        corr.append(series1.shift(lag).corr(series2))
    return corr
max_lag = 40 # You can adjust the maximum lag
lags = range(-max_lag, max_lag + 1)
correlation_values = cross_correlation(df['series1'], df['series2'], max_lag)

# Find the lag that maximizes the correlation
best_lag = lags[np.argmax(correlation_values)]
print(f"The best lag is: {best_lag} days")

#print(df_b_res)

plt.figure(figsize=(11,7))
plt.plot(df_b_res['Est'],"-b",label="Est. (0.1 mm)")
plt.plot(df_b_res['Diff']*35,"-r",label="35*Vel. (0.1 mm/day)")
plt.plot(pluvio_res['Prec'],"-g",label="Rain (mm)")


plt.title('Velocita')
plt.xlabel('Date')
plt.ylabel('Estensimetro 0.1mm')
plt.legend(loc="lower left")

plt.show()

plt.figure(figsize=(11,7))
plt.plot(df_b_res['Inv'],"+b",label="Est.")
plt.title('1/Velocita')
plt.xlabel('Date')
plt.ylabel('1/V day/0.1mm')
plt.legend(loc="lower left")

plt.show()






A couple of remarks:
1) LSTM is very sensitive to overfitting. In this specific case, 5-6 epochs are already optimal for model convergence.
2) If the model does not converge, the prediction may have roughly the correct shape but be shifted (like the plot below).

3) You have to play with the window size, both the units and the time_steps (as well as the epochs), to obtain an optimal model; see the early-stopping sketch after this list.
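Rather than hand-tuning the epoch count, one option is to let Keras stop training by itself. The following is a minimal sketch of mine (not in the original script), using the standard tf.keras.callbacks.EarlyStopping API together with a validation split:

import tensorflow as tf

# Stop when the validation loss has not improved for `patience` epochs and
# roll back to the best weights seen so far (a sketch, not the original code).
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True
)

# model.fit(X_train, y_train, epochs=50, batch_size=32,
#           validation_split=0.1, callbacks=[early_stop])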




import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Bidirectional
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

data = pd.read_csv('est2.csv')
data = data['Est'].values.reshape(-1, 1)

scaler = MinMaxScaler(feature_range=(0, 1))
data_scaled = scaler.fit_transform(data)

def create_dataset(data, time_step=1):
    X, y = [], []
    for i in range(len(data) - time_step):
        X.append(data[i:i + time_step, 0])
        y.append(data[i + time_step, 0])
    return np.array(X), np.array(y)

time_step = 200
X, y = create_dataset(data_scaled, time_step)
X = X.reshape(X.shape[0], X.shape[1], 1)

train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

model = Sequential()
model.add(Bidirectional(LSTM(units=200, return_sequences=True), input_shape=(time_step, 1)))
model.add(Bidirectional(LSTM(units=200)))
model.add(Dense(units=1))

model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(X_train, y_train, epochs=5, batch_size=32)

predictions = model.predict(X_test)

predictions_rescaled = scaler.inverse_transform(predictions)
y_test_rescaled = scaler.inverse_transform(y_test.reshape(-1, 1))

plt.figure(figsize=(10,6))
plt.plot(y_test_rescaled, label='Actual Data')
plt.plot(predictions_rescaled, label='Predicted Data')
plt.title('Bidirectional LSTM Time Series Forecasting')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()

forecast_steps = 1000 # Number of time steps to predict
input_sequence = X_test[-1]

# recursive forecast: feed each prediction back in as the newest input step
# (errors compound, so long horizons tend to drift)
forecast = []
for _ in range(forecast_steps):
    predicted_value = model.predict(input_sequence.reshape(1, time_step, 1))
    forecast.append(predicted_value[0, 0])
    input_sequence = np.append(input_sequence[1:], predicted_value, axis=0)
forecast_rescaled = scaler.inverse_transform(np.array(forecast).reshape(-1, 1))


# Plot the results
plt.figure(figsize=(10,6))
plt.plot(forecast_rescaled, label='Forecasted Data')
plt.title('Bidirectional LSTM Time Series Forecasting')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()

Results on the validation set


These are the forecast data


I tried asking ChatGPT, which replied with this code:



import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM, Dense, Input, Bidirectional
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

def load_data(file_path):
    df = pd.read_csv(file_path)
    data = df['Est'].values
    scaler = MinMaxScaler(feature_range=(0, 1))
    data_scaled = scaler.fit_transform(data.reshape(-1, 1))
    return data_scaled, scaler

def create_dataset(data, time_step=200):
    X, y = [], []
    for i in range(len(data) - time_step):
        X.append(data[i:i + time_step, 0])
        y.append(data[i + time_step, 0])
    X = np.array(X)
    y = np.array(y)
    X = np.reshape(X, (X.shape[0], X.shape[1], 1))
    return X, y

def create_bilateral_lstm_model(input_shape):
    inputs = Input(shape=input_shape)
    forward_lstm = LSTM(200, return_sequences=False)(inputs)
    backward_lstm = LSTM(200, return_sequences=False, go_backwards=True)(inputs)
    merged = tf.keras.layers.concatenate([forward_lstm, backward_lstm])
    output = Dense(1)(merged)
    model = Model(inputs=inputs, outputs=output)
    return model

def train_and_forecast(file_path, time_step=200, forecast_days=200):
    data_scaled, scaler = load_data(file_path)
    X, y = create_dataset(data_scaled, time_step)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
    model = create_bilateral_lstm_model(input_shape=(X_train.shape[1], 1))
    model.compile(optimizer='adam', loss='mean_squared_error')
    model.fit(X_train, y_train, epochs=5, batch_size=32, validation_data=(X_test, y_test))
    y_pred = model.predict(X_test)
    y_pred_rescaled = scaler.inverse_transform(y_pred)
    y_test_rescaled = scaler.inverse_transform(y_test.reshape(-1, 1))
    plt.figure(figsize=(12, 6))
    plt.plot(y_test_rescaled, label="True Data")
    plt.plot(y_pred_rescaled, label="Predicted Data")
    plt.title("True vs Predicted Data (Test Set)")
    plt.legend()
    plt.show()

    # recursive forecast starting from the last observed window
    last_sequence = data_scaled[-time_step:].reshape(1, time_step, 1)

    future_predictions = []
    for _ in range(forecast_days):
        forecast = model.predict(last_sequence)
        future_predictions.append(forecast[0, 0])
        last_sequence = np.append(last_sequence[:, 1:, :], forecast.reshape(1, 1, 1), axis=1)

    future_predictions_rescaled = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))

    plt.figure(figsize=(12, 6))
    plt.plot(range(len(data_scaled)), scaler.inverse_transform(data_scaled), label="Historical Data")
    plt.plot(range(len(data_scaled), len(data_scaled) + forecast_days), future_predictions_rescaled, label="Forecasted Data", linestyle="--")
    plt.title(f"Forecasted Data for Next {forecast_days} Steps")
    plt.legend()
    plt.show()
    return future_predictions_rescaled

file_path = 'est2.csv'
forecast = train_and_forecast(file_path, time_step=200, forecast_days=200)
print("Forecasted future values:")
print(forecast)
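A side note on create_bilateral_lstm_model: concatenating a forward LSTM with a go_backwards one is essentially what the Keras Bidirectional wrapper does with its default merge_mode='concat'. A minimal equivalent sketch (my rewrite, not part of the ChatGPT code):

from tensorflow.keras.layers import Input, LSTM, Dense, Bidirectional
from tensorflow.keras.models import Model

# Equivalent model built with the Bidirectional wrapper; merge_mode='concat'
# (the default) concatenates the forward and backward outputs.
def create_bidirectional_model(input_shape):
    inputs = Input(shape=input_shape)
    x = Bidirectional(LSTM(200, return_sequences=False))(inputs)
    output = Dense(1)(x)
    return Model(inputs=inputs, outputs=output)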





Zoom on the forecast data only



Since the model converges, let's now see how it behaves when there is an unexpected event (namely the triggering of the slope movement). I cut the data at the first movement, which corresponds roughly to x coordinate 2000.


The model training is not bad (though not perfect)


The problem is that once it enters forecast mode it gets things completely wrong, indicating a movement exactly opposite to that of the landslide.





With GRU things did not go very well.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import warnings
from scipy import stats
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Sequential, layers, callbacks
from tensorflow.keras.layers import Dense, LSTM, Dropout, GRU, Bidirectional


tf.random.set_seed(1234)
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

df = pd.read_csv("est2.csv", parse_dates = ["Date"])

# Define a function to draw time_series plot
def timeseries(x_axis, y_axis, x_label):
    plt.figure(figsize=(10, 6))
    plt.plot(x_axis, y_axis, color="black")
    plt.xlabel(x_label, {'fontsize': 12})
    plt.ylabel('Est', {'fontsize': 12})
    plt.show()

print(df)
timeseries(df.index, df['Est'], 'Time (day)')

df = df.drop('Date', axis = 1)

train_size = int(len(df)*0.8)

train_data = df.iloc[:train_size]
test_data = df.iloc[train_size:]

scaler = MinMaxScaler().fit(train_data)
train_scaled = scaler.transform(train_data)
test_scaled = scaler.transform(test_data)

def create_dataset(X, look_back=1):
    Xs, ys = [], []
    for i in range(len(X) - look_back):
        v = X[i:i+look_back]
        Xs.append(v)
        ys.append(X[i+look_back])
    return np.array(Xs), np.array(ys)


LOOK_BACK = 300
X_train, y_train = create_dataset(train_scaled,LOOK_BACK)
X_test, y_test = create_dataset(test_scaled,LOOK_BACK)
# Print data shape
print("X_train.shape: ", X_train.shape)
print("y_train.shape: ", y_train.shape)
print("X_test.shape: ", X_test.shape)
print("y_test.shape: ", y_test.shape)

# Create GRU model
def create_gru(units):
    model = Sequential()
    # Input layer
    model.add(GRU(units=units, return_sequences=True,
                  input_shape=[X_train.shape[1], X_train.shape[2]]))
    model.add(Dropout(0.2))
    # Hidden layer
    model.add(GRU(units=units))
    model.add(Dropout(0.2))
    model.add(Dense(units=1))
    # Compile model
    model.compile(optimizer='adam', loss='mse')
    return model

model_gru = create_gru(300)

def fit_model(model):
    early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)
    history = model.fit(X_train, y_train, epochs=5,
                        validation_split=0.2,
                        batch_size=32, shuffle=False,
                        callbacks=[early_stop])
    return history

history_gru = fit_model(model_gru)

y_test = scaler.inverse_transform(y_test)
y_train = scaler.inverse_transform(y_train)

def plot_loss(history, model_name):
    plt.figure(figsize=(10, 6))
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Model Train vs Validation Loss for ' + model_name)
    plt.ylabel('Loss')
    plt.xlabel('epoch')
    plt.legend(['Train loss', 'Validation loss'], loc='upper right')
    plt.show()

plot_loss(history_gru, 'GRU')

# Make prediction
def prediction(model):
    prediction = model.predict(X_test)
    prediction = scaler.inverse_transform(prediction)
    return prediction



prediction_gru = prediction(model_gru)
# Plot test data vs prediction
def plot_future(prediction, model_name, y_test):
    plt.figure(figsize=(10, 6))
    range_future = len(prediction)
    plt.plot(np.arange(range_future), np.array(y_test), label='Test data')
    plt.plot(np.arange(range_future), np.array(prediction), label='Prediction')
    plt.title('Test data vs prediction for ' + model_name)
    plt.legend(loc='upper left')
    plt.xlabel('Time (day)')
    plt.ylabel('Est')
    plt.show()

plot_future(prediction_gru, 'GRU', y_test)

# bilstm 5 epoch
#model_bilstm.save("bilstm.keras")
model_gru.save("gru.keras")
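To reuse the saved network later without retraining, it can be reloaded. A minimal sketch (my addition), assuming the gru.keras file written above and the X_test array and scaler from this script:

from tensorflow import keras

# Reload the model saved with model_gru.save and reuse it for inference.
reloaded_gru = keras.models.load_model("gru.keras")
reloaded_predictions = scaler.inverse_transform(reloaded_gru.predict(X_test))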








