Avevo gia' provato ad usare LSTM per dati derivanti da un estensimetro qui ... adesso ci riprovo con la variante Bilateral LSTM e GRU prendendo spunto da questa pagina
Questi sono i dati raw. I dati sono campionati ogni 20 minuti e sono presenti diverse fasi di un movimento di versante
Prima di iniziare un paio di considerazioni
In questo grafico sono riassunti gli spostamenti registrati dall'estensimetro, la velocita' di movimento ed i dati pluviometrici
Dalla cross correlazione tra il dato meteo e quello di spostamento si ottiene un lag di 27 giorni
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
# Load extensometer readings (est2.csv) and rain-gauge data (arpa.csv);
# each file has a 'Date' column parsed with its own timestamp format.
df_b = pd.read_csv('est2.csv', sep=',',header=0, parse_dates = ['Date'], date_format = '%Y-%m-%d %H:%M:%S')
pluvio = pd.read_csv('arpa.csv', sep=';',header=0, parse_dates = ['Date'], date_format = '%d/%m/%Y %H:%M:%S')
# Downsample both series to daily means (raw sampling is every 20 minutes).
df_b_res = df_b.resample('24h', on='Date').mean()
pluvio_res = pluvio.resample('24h', on='Date').mean()
print(df_b_res)
print(pluvio)
# Daily displacement increment ("velocity", 0.1 mm/day) and its reciprocal
# (inverse velocity, used later for the failure-forecast style plot).
df_b_res['Diff'] = df_b_res['Est'].diff()
df_b_res['Inv'] = 1/df_b_res['Diff']
# Common daily calendar used to align the two resampled series.
# NOTE(review): assumes both resampled frames cover exactly these 214 days
# starting 2024-02-01 — verify against the CSV contents.
time = pd.date_range('2024-02-01', periods=214, freq='D')
print(time)
print(pluvio_res.iloc[:,0])
print(df_b_res.iloc[:,0])
# series1 = displacement, series2 = rainfall, aligned on the shared calendar.
df = pd.DataFrame({'series1': df_b_res.iloc[:,0], 'series2': pluvio_res.iloc[:,0]}, index=time)
cross_corr = df['series1'].corr(df['series2'])
def cross_correlation(series1, series2, max_lag):
    """Pearson correlation of series1 against series2 at every integer lag.

    For each lag in [-max_lag, max_lag], series1 is shifted by `lag`
    samples (positive lag = series1 delayed) and correlated with series2;
    NaN pairs introduced by the shift are dropped by `Series.corr`.

    Parameters
    ----------
    series1, series2 : pd.Series
        Time series sharing the same index.
    max_lag : int
        Maximum absolute lag, in samples, to scan.

    Returns
    -------
    list[float]
        Correlations ordered from lag -max_lag to +max_lag.
    """
    # BUG FIX: the original body read the module-level `df` instead of the
    # function's own arguments, silently ignoring whatever was passed in.
    corr = []
    for lag in range(-max_lag, max_lag + 1):
        corr.append(series1.shift(lag).corr(series2))
    return corr
max_lag = 40 # You can adjust the maximum lag
lags = range(-max_lag, max_lag + 1)
correlation_values = cross_correlation(df['series1'], df['series2'], max_lag)
# Find the lag that maximizes the correlation
# NOTE(review): np.argmax picks the most positive correlation; a strong
# anti-correlation (large negative value) would be missed.
best_lag = lags[np.argmax(correlation_values)]
print(f"The best lag is: {best_lag} days")
#print(df_b_res)
# Summary plot: displacement, scaled daily velocity and rainfall together.
plt.figure(figsize=(11,7))
plt.plot(df_b_res['Est'],"-b",label="Est. (0.1 mm)")
# The x35 factor is only for visual comparison on a shared axis.
plt.plot(df_b_res['Diff']*35,"-r",label="35*Vel. (0.1 mm/day)")
plt.plot(pluvio_res['Prec'],"-g",label="Rain (mm)")
plt.title('Velocita')
plt.xlabel('Date')
plt.ylabel('Estensimetro 0.1mm')
plt.legend(loc="lower left")
plt.show()
# Inverse-velocity plot (classic landslide failure-forecast diagnostic).
plt.figure(figsize=(11,7))
plt.plot(df_b_res['Inv'],"+b",label="Est.")
plt.title('1/Velocita')
plt.xlabel('Date')
plt.ylabel('1/V day/0.1mm')
plt.legend(loc="lower left")
plt.show()
1) LSTM e' molto sensibile all'overfitting del modello. Nel caso specifico valori di 5-6 epochs sono gia' ottimali per la convergenza del modello
2) se il modello non converge la predizione magari ha una forma corretta ma e' shiftata (come nel grafico sottostante)
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Bidirectional
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
# Load the extensometer series and scale it to [0, 1] for the network.
data = pd.read_csv('est2.csv')
data = data['Est'].values.reshape(-1, 1)
scaler = MinMaxScaler(feature_range=(0, 1))
# NOTE(review): the scaler is fit on the FULL series, so the test period
# leaks into the scaling — worth flagging for a forecasting experiment.
data_scaled = scaler.fit_transform(data)
def create_dataset(data, time_step=1):
    """Slice an (n, 1) array into sliding supervised pairs.

    X[k] holds `time_step` consecutive values from column 0 and
    y[k] is the value that immediately follows that window.
    """
    n_windows = len(data) - time_step
    windows = [data[start:start + time_step, 0] for start in range(n_windows)]
    targets = [data[start + time_step, 0] for start in range(n_windows)]
    return np.array(windows), np.array(targets)
# 200 past daily samples are used to predict the next one.
time_step = 200
X, y = create_dataset(data_scaled, time_step)
# Keras LSTM input shape: (samples, time steps, features).
X = X.reshape(X.shape[0], X.shape[1], 1)
# Chronological 80/20 split — no shuffling, since this is a time series.
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
# Two stacked bidirectional LSTM layers with a single-value linear head.
model = Sequential()
model.add(Bidirectional(LSTM(units=200, return_sequences=True), input_shape=(time_step, 1)))
model.add(Bidirectional(LSTM(units=200)))
model.add(Dense(units=1))
model.compile(optimizer='adam', loss='mean_squared_error')
# Few epochs on purpose: the author reports overfitting beyond ~5-6 epochs.
model.fit(X_train, y_train, epochs=5, batch_size=32)
# One-step-ahead predictions on the held-out tail, mapped back to raw units.
predictions = model.predict(X_test)
predictions_rescaled = scaler.inverse_transform(predictions)
y_test_rescaled = scaler.inverse_transform(y_test.reshape(-1, 1))
plt.figure(figsize=(10,6))
plt.plot(y_test_rescaled, label='Actual Data')
plt.plot(predictions_rescaled, label='Predicted Data')
plt.title('Bidirectional LSTM Time Series Forecasting')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
# Recursive multi-step forecast: each prediction is appended to the input
# window, so errors compound as the horizon grows.
forecast_steps = 1000 # Number of time steps to predict
input_sequence = X_test[-1]
forecast = []
for _ in range(forecast_steps):
    predicted_value = model.predict(input_sequence.reshape(1, time_step, 1))
    forecast.append(predicted_value[0, 0])
    # Drop the oldest sample, append the newest prediction.
    input_sequence = np.append(input_sequence[1:], predicted_value, axis=0)
forecast_rescaled = scaler.inverse_transform(np.array(forecast).reshape(-1, 1))
predictions_rescaled = scaler.inverse_transform(predictions)
y_test_rescaled = scaler.inverse_transform(y_test.reshape(-1, 1))
# Plot the results
plt.figure(figsize=(10,6))
plt.plot(forecast_rescaled, label='Forecasted Data')
plt.title('Bidirectional LSTM Time Series Forecasting')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
risultati sul set di validazione
Ho provato a chiedere a ChatGPT che mi ha risposto con questo codice
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM, Dense, Input, Bidirectional
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
def load_data(file_path):
    """Read the 'Est' column from a CSV and min-max scale it to [0, 1].

    Returns the scaled (n, 1) array together with the fitted scaler,
    which is needed later to invert the transformation.
    """
    frame = pd.read_csv(file_path)
    raw = frame['Est'].values.reshape(-1, 1)
    scaler = MinMaxScaler(feature_range=(0, 1))
    return scaler.fit_transform(raw), scaler
def create_dataset(data, time_step=200):
    """Build supervised (window, next-value) pairs from a scaled (n, 1) array.

    X has shape (n - time_step, time_step, 1), ready for an LSTM;
    y holds the single value that follows each window.
    """
    windows, targets = [], []
    last_start = len(data) - time_step
    for start in range(last_start):
        windows.append(data[start:start + time_step, 0])
        targets.append(data[start + time_step, 0])
    X = np.array(windows)
    y = np.array(targets)
    return X.reshape(X.shape[0], X.shape[1], 1), y
def create_bilateral_lstm_model(input_shape):
    """Hand-built 'bilateral' LSTM.

    One forward and one backward LSTM read the same input; their final
    hidden states are concatenated and fed to a single linear unit.
    """
    inputs = Input(shape=input_shape)
    # go_backwards=True makes this branch read the sequence from the end,
    # so its final state summarizes the reversed input.
    fwd_state = LSTM(200, return_sequences=False)(inputs)
    bwd_state = LSTM(200, return_sequences=False, go_backwards=True)(inputs)
    joined = tf.keras.layers.concatenate([fwd_state, bwd_state])
    prediction = Dense(1)(joined)
    return Model(inputs=inputs, outputs=prediction)
def train_and_forecast(file_path, time_step=200, forecast_days=200):
    """Train the two-branch LSTM on the CSV series, plot the test-set fit,
    then produce a recursive `forecast_days`-step forecast.

    Parameters
    ----------
    file_path : str
        Path to a CSV with an 'Est' column.
    time_step : int
        Length of the input window, in samples.
    forecast_days : int
        Number of future steps to predict recursively.

    Returns
    -------
    np.ndarray
        Forecasted values in original (unscaled) units, shape (forecast_days, 1).
    """
    data_scaled, scaler = load_data(file_path)
    X, y = create_dataset(data_scaled, time_step)
    # shuffle=False keeps chronological order: the test set is the tail.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
    model = create_bilateral_lstm_model(input_shape=(X_train.shape[1], 1))
    model.compile(optimizer='adam', loss='mean_squared_error')
    model.fit(X_train, y_train, epochs=5, batch_size=32, validation_data=(X_test, y_test))
    # One-step-ahead fit on the held-out tail, shown in original units.
    y_pred = model.predict(X_test)
    y_pred_rescaled = scaler.inverse_transform(y_pred)
    y_test_rescaled = scaler.inverse_transform(y_test.reshape(-1, 1))
    plt.figure(figsize=(12, 6))
    plt.plot(y_test_rescaled, label="True Data")
    plt.plot(y_pred_rescaled, label="Predicted Data")
    plt.title("True vs Predicted Data (Test Set)")
    plt.legend()
    plt.show()
    # Recursive forecast: feed each prediction back into the input window.
    last_sequence = data_scaled[-time_step:].reshape(1, time_step, 1)
    future_predictions = []
    for _ in range(forecast_days):
        forecast = model.predict(last_sequence)
        future_predictions.append(forecast[0, 0])
        # Drop the oldest sample, append the newest prediction.
        last_sequence = np.append(last_sequence[:, 1:, :], forecast.reshape(1, 1, 1), axis=1)
    future_predictions_rescaled = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))
    plt.figure(figsize=(12, 6))
    plt.plot(range(len(data_scaled)), scaler.inverse_transform(data_scaled), label="Historical Data")
    plt.plot(range(len(data_scaled), len(data_scaled) + forecast_days), future_predictions_rescaled, label="Forecasted Data", linestyle="--")
    plt.title(f"Forecasted Data for Next {forecast_days} Steps")
    plt.legend()
    plt.show()
    return future_predictions_rescaled
# Run the full train-and-forecast pipeline on the extensometer file.
file_path = 'est2.csv'
forecast = train_and_forecast(file_path, time_step=200, forecast_days=200)
print("Forecasted future values:")
print(forecast)
Visto che il modello converge adesso vediamo come si comporta quando si ha un evento inatteso (ovvero l'innescarsi del movimento di versante)... ho tagliato i dati sul primo movimento, che corrisponde circa alla coordinata di ascissa 2000
l'addestramento del modello non e' male (anche se non perfetto)
il problema e' che quando entra in modalita' forecast sbaglia completamente indicando un movimento esattamente opposto a quello della frana
Con GRU le cose non sono andate molto bene
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import warnings
from scipy import stats
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Sequential, layers, callbacks
from tensorflow.keras.layers import Dense, LSTM, Dropout, GRU, Bidirectional
# Fix the TF seed for reproducible weight initialization.
tf.random.set_seed(1234)
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
df = pd.read_csv("est2.csv", parse_dates = ["Date"])
# Helper to draw a single time series with consistent styling.
def timeseries(x_axis, y_axis, x_label):
    """Line-plot y_axis against x_axis in black and show the figure."""
    label_style = {'fontsize': 12}
    plt.figure(figsize=(10, 6))
    plt.plot(x_axis, y_axis, color="black")
    plt.xlabel(x_label, label_style)
    plt.ylabel('Est', label_style)
    plt.show()
print(df)
# Plot the raw series against its positional index.
timeseries(df.index, df['Est'], 'Time (day)')
# Keep only the measurement column.
df = df.drop('Date', axis = 1)
# Chronological 80/20 split; the scaler is fit on training data only,
# which avoids leaking the test period into the scaling.
train_size = int(len(df)*0.8)
train_data = df.iloc[:train_size]
test_data = df.iloc[train_size:]
scaler = MinMaxScaler().fit(train_data)
train_scaled = scaler.transform(train_data)
test_scaled = scaler.transform(test_data)
def create_dataset(X, look_back=1):
    """Turn a scaled 2-D array into (window, next-row) training pairs.

    Xs[k] is the block of `look_back` consecutive rows starting at k;
    ys[k] is the row that immediately follows it.
    """
    n_pairs = len(X) - look_back
    windows = [X[start:start + look_back] for start in range(n_pairs)]
    next_rows = [X[start + look_back] for start in range(n_pairs)]
    return np.array(windows), np.array(next_rows)
# Each sample looks back 300 days to predict the next one.
LOOK_BACK = 300
X_train, y_train = create_dataset(train_scaled,LOOK_BACK)
X_test, y_test = create_dataset(test_scaled,LOOK_BACK)
# Print data shape
print("X_train.shape: ", X_train.shape)
print("y_train.shape: ", y_train.shape)
print("X_test.shape: ", X_test.shape)
print("y_test.shape: ", y_test.shape)
# Create GRU model
def create_gru(units):
    """Two stacked GRU layers with dropout, topped by a linear unit.

    The input shape is taken from the module-level X_train; the model is
    compiled with Adam / MSE and returned ready to fit.
    """
    stack = [
        GRU(units = units, return_sequences = True,
            input_shape = [X_train.shape[1], X_train.shape[2]]),
        Dropout(0.2),
        GRU(units = units),
        Dropout(0.2),
        Dense(units = 1),
    ]
    model = Sequential(stack)
    model.compile(optimizer='adam',loss='mse')
    return model
# Build the GRU model with 300 units per recurrent layer.
model_gru = create_gru(300)
def fit_model(model):
    """Train `model` on the module-level X_train/y_train and return the
    Keras History (the last 20% of training data is used for validation)."""
    stopper = keras.callbacks.EarlyStopping(monitor = 'val_loss', patience = 10)
    return model.fit(
        X_train,
        y_train,
        epochs = 5,
        validation_split = 0.2,
        batch_size = 32,
        shuffle = False,
        callbacks = [stopper],
    )
history_gru = fit_model(model_gru)
# Map targets back to raw units for the comparison plots below.
# NOTE(review): y_test/y_train are overwritten in place here, so this code
# must not be re-run past this point without rebuilding the datasets.
y_test = scaler.inverse_transform(y_test)
y_train = scaler.inverse_transform(y_train)
def plot_loss(history, model_name):
    """Plot training vs validation loss curves from a Keras History."""
    plt.figure(figsize = (10, 6))
    for key in ('loss', 'val_loss'):
        plt.plot(history.history[key])
    plt.title('Model Train vs Validation Loss for ' + model_name)
    plt.ylabel('Loss')
    plt.xlabel('epoch')
    plt.legend(['Train loss', 'Validation loss'], loc='upper right')
    plt.show()
# Inspect convergence of the GRU training run.
plot_loss (history_gru, 'GRU')
# Make prediction
def prediction(model):
    """Predict on the module-level X_test and return values in raw units."""
    scaled_output = model.predict(X_test)
    return scaler.inverse_transform(scaled_output)
prediction_gru = prediction(model_gru)
# Plot test data vs prediction
def plot_future(prediction, model_name, y_test):
    """Overlay predicted values on the observed test data."""
    horizon = np.arange(len(prediction))
    plt.figure(figsize=(10, 6))
    plt.plot(horizon, np.array(y_test), label='Test data')
    plt.plot(horizon, np.array(prediction), label='Prediction')
    plt.title('Test data vs prediction for ' + model_name)
    plt.legend(loc='upper left')
    plt.xlabel('Time (day)')
    plt.ylabel('Est')
    plt.show()
plot_future(prediction_gru, 'GRU', y_test)
# bilstm 5 epoch
#model_bilstm.save("bilstm.keras")
# Persist the trained GRU model for later reuse.
model_gru.save("gru.keras")
Nessun commento:
Posta un commento