venerdì 7 febbraio 2025

Filtraggio dati estensimetro con Kalman

 Ho ripreso in mano il discorso filtro Kalman applicato pero' a dati reali di un estensimetro (i dati sono espressi in decimi di millimetro)










import numpy as np
import matplotlib.pyplot as plt

dati = np.genfromtxt('est.csv', delimiter=';')
dist = dati[:,2]
z = dist.T
dimensione = dist.shape
n_iter = dimensione[0]
Q = 1e-5 # process variance

# allocate space for arrays
xhat=np.zeros(dimensione) # a posteri estimate of x
P=np.zeros(dimensione) # a posteri error estimate
xhatminus=np.zeros(dimensione) # a priori estimate of x
Pminus=np.zeros(dimensione) # a priori error estimate
K=np.zeros(dimensione) # gain or blending factor
R = 0.1**2 # estimate of measurement variance, change to see effect

x = 23.6
xhat[0] = 23.6
P[0] = 1.0

for k in range(1,n_iter):
# time update
xhatminus[k] = xhat[k-1]
Pminus[k] = P[k-1]+Q

# measurement update
K[k] = Pminus[k]/( Pminus[k]+R )
xhat[k] = xhatminus[k]+K[k]*(z[k]-xhatminus[k])
P[k] = (1-K[k])*Pminus[k]

sp = np.diff(xhat, prepend=xhat[0])

plt.plot(z,'k+',label='noisy measurements')
plt.plot(xhat,'b-',label='a posteri estimate')
plt.title('Kalman filter ', fontweight='bold')

valid_iter = range(1,n_iter) # Pminus not valid at step 0
plt.plot(valid_iter,Pminus[valid_iter],label='a priori error estimate')
plt.title('Estimated $\it{\mathbf{a \ priori}}$ error vs. iteration step', fontweight='bold')

plt.title('Speed', fontweight='bold')
plt.ylabel('Speed (0.1mm/day)')




giovedì 6 febbraio 2025

Bayesian linear regression

Volevo provare un approccio differente dal solito per la regressione lineare

Il problema e' relativo alla terza fase del creep in frane la cui legge tra il tempo e l'inverso della velocita' e' approssimata a lineare

Questi dati sono ripresi dall' articolo Tommaso Carlà,Emanuele Intrieri,Federico Di Traglia,Teresa Nolesini,Giovanni Gigli,Nicola Casagli "Guidelines on the use of inverse velocity method as a tool for setting alarm thresholds and forecasting landslides and structure collapses" Landslides DOI 10.1007/s10346-016-0731-5 per la frana di Montebeni. La freccia verde indica la data dell'evento reale e la freccia rossa la data stimata

 La terza fase e' stata modellata tramite regressione a minimi quadrati applicando alcuni filtri

Ho provato a trattare gli stessi dati tramite regressione lineare bayesiana con Metropolis-Hastings Sampler

Alla fine i risultati sono molto simili. C'e' da notare che 

1) essendo il modello Montecarlo non deterministico i risultati non sono mai identici tra una run e la successiva. Ciao' comporta anche uno spostamento dell'intercetta della retta con l'asse del tempo spostando il TOF (time of failure)

2) sia la regressione a minimi quadrati che quella bayesiana sono molto sensibili al dataset (anche perche' alla fine ci sono solo una vendita di misure). Basta omettere un solo dato all'inizio che la correlazione e' sensibilmente differente. Questo comporta la difficolta' di definire in modo univoco il passaggio dalla fase 2 alla fase 3 del creep  

I dati sono

14, 0.4586046511627907
28, 0.44000000000000006
42, 0.30604651162790697
56, 0.27813953488372095
70, 0.28930232558139535
84, 0.3227906976744187
98, 0.21860465116279076
112, 0.17023255813953395
126, 0.17953488372092963
140, 0.1367441860465109
154, 0.13302325581395344
168, 0.11441860465116252
182, 0.08837209302325501
196, 0.0958139534883719
210, 0.14046511627906993
224, 0.13860465116279058
238, 0.09581395348837188
252, 0.04744186046511625
266, 0.051162790697674376
280, 0.02883720930232564
294, 0.030697674418604465
308, 0.010232558139534276

Per il codice sottostante ho modificato un esempio preso da chatgpt

import sys

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Load CSV data
def load_data(csv_path):
data = pd.read_csv(csv_path)
return data

# Define the likelihood function (Gaussian likelihood)
def likelihood(y, X, beta):
predictions =, beta)
return -0.5 * np.sum((y - predictions) ** 2)

# Define the prior function (Normal prior)
def prior(beta, sigma=10):
return -0.5 * np.sum(beta ** 2) / (sigma ** 2)

# Define the proposal distribution (random walk)
def proposal(beta, step_size=0.1):
return beta + np.random.normal(0, step_size, size=beta.shape)

# Metropolis-Hastings sampler
def metropolis_hastings(X, y, initial_beta, n_samples, step_size=0.1):
beta = initial_beta
samples = []

for _ in range(n_samples):
# Propose new beta
beta_new = proposal(beta, step_size)

# Compute likelihood and prior for current and proposed beta
current_likelihood = likelihood(y, X, beta) + prior(beta)
new_likelihood = likelihood(y, X, beta_new) + prior(beta_new)

# Metropolis-Hastings acceptance step
acceptance_ratio = min(1, np.exp(new_likelihood - current_likelihood))

# Accept or reject the proposed beta
if np.random.rand() < acceptance_ratio:
beta = beta_new


return np.array(samples)

# Prediction function based on posterior samples
def predict(X, posterior_samples):
predictions =, posterior_samples.T)
return predictions.mean(axis=1), predictions.std(axis=1)

def predictLeastSquare(X, beta):
# Add intercept term to the new data
X_with_intercept = np.hstack([np.ones((X.shape[0], 1)), X])
return X_with_intercept @ beta

# Forecast function for new input data
def forecast(X_new, posterior_samples):
# X_new is the new data for which we want to forecast
return predict(X_new, posterior_samples)

def linear_least_squares(X, y):
# Add intercept term (column of ones)
X_with_intercept = np.hstack([np.ones((X.shape[0], 1)), X])

# Compute the coefficients using the normal equation: β = (X^T X)^(-1) X^T y
beta = np.linalg.inv(X_with_intercept.T @ X_with_intercept) @ X_with_intercept.T @ y
return beta

# Main function
def run_bayesian_linear_regression(csv_path, n_samples=1000000, step_size=0.1):
# Load data from CSV
data = load_data(csv_path)

# Assuming the last column is the target 'y' and the rest are features 'X'
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values

beta = linear_least_squares(X, y)
y_pred = predictLeastSquare(X, beta)

# Add a column of ones for the intercept term (beta_0)
X = np.hstack([np.ones((X.shape[0], 1)), X])

# Initialize the parameters (beta_0, beta_1, ...)
initial_beta = np.zeros(X.shape[1])

# Perform Metropolis-Hastings sampling
posterior_samples = metropolis_hastings(X, y, initial_beta, n_samples, step_size)

# Get predictions
mean_predictions, std_predictions = predict(X, posterior_samples)

# Plot results
plt.figure(figsize=(10, 6))
plt.plot(y, label="True values")
plt.plot(mean_predictions, label="Bayesian Regr")
plt.fill_between(range(len(y)), mean_predictions - std_predictions,
mean_predictions + std_predictions, alpha=0.3, label="Err")
plt.plot(y_pred, label="Least Square", color='red')


return posterior_samples

# Usage example
csv_path = 'dati.csv' # Replace with the path to your CSV file
posterior_samples = run_bayesian_linear_regression(csv_path)

new_data = pd.DataFrame({
'feature1': [30, 35] # Replace with your features

X_new = np.hstack([np.ones((new_data.shape[0], 1)), new_data.values])

# Forecast values using the posterior samples
mean_forecast, std_forecast = forecast(X_new, posterior_samples)

# Output the forecasts
print("Forecasted values (mean):", mean_forecast)
print("Forecasted values (95% CI):", mean_forecast - 1.96 * std_forecast, mean_forecast + 1.96 * std_forecast)

venerdì 17 gennaio 2025

Pandas su serie tempo

Problema: hai un csv che riporta una serie tempo datetime/valore di un sensore

Effettuare calcoli, ordina le righe, ricampiona il passo temporale, colma i buchi della serie tempo con interpolazione in modo semplice

2024-07-24 12:42:05;0
2024-07-24 12:44:05;0
2024-07-24 12:46:05;0

 soluzione : usa Pandas di Python

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import argrelextrema

#legge i dati dal csv
df_b = pd.read_csv('battente.csv', sep=';',header=0, parse_dates = ['Data'], date_format = '%Y-%m-%d %H:%M:%S')
#ricampiona i dati con un dato al giorno usando il valore medio
df_b_res = df_b.resample('1h', on='Data').mean()
#df_b.iloc[:,1] = df_b.iloc[:,1].apply(lambda x: (x-x.mean())/ x.std())

df_p = pd.read_csv('pluviometro.csv', sep=';',header=0, parse_dates = ['Data'], date_format = '%Y-%m-%d %H:%M:%S')
#df_p = df_p.shift(periods=3)
df_p_res = df_p.resample('1h', on='Data').mean()

#unisce i due file usando come chiave la colonna temporale
merged_dataframe = pd.merge_asof(df_p_res, df_b_res, on="Data")
#rimuove le righe in cui non ci sono valori
df_finale = merged_dataframe.dropna(how='any')

#cerca i massimi locali facendo scorrere una finestra di 12 ore
df_finale['local_max_p'] = df_finale.iloc[argrelextrema(df_finale.Valore_p.values, np.greater_equal,order=12)[0]]['Valore_p']
df_finale['local_max_b'] = df_finale.iloc[argrelextrema(df_finale.Valore_b.values, np.greater_equal,order=12)[0]]['Valore_b']

#elimina i massimi locali che sono sotto ad una soglia
df_finale['local_max_p'] = np.where(df_finale["local_max_p"] < 1.0,np.nan,df_finale["local_max_p"])
df_finale['local_max_b'] = np.where(df_finale["local_max_b"] < 1.0,np.nan,df_finale["local_max_b"])

df_finale.to_csv("allineato.csv", sep=";")

fig, ax1 = plt.subplots()
# Plot first time series on the primary axis
ax1.plot(df_finale['Data'], df_finale['Valore_b'], 'g-')
ax1.plot(df_finale['Data'], df_finale['local_max_b'], 'mo')

ax1.set_ylabel('Battente (m)', color='g')
ax2 = ax1.twinx()
ax2.plot(df_finale['Data'], df_finale['Valore_p'], 'b-')
ax2.plot(df_finale['Data'], df_finale['local_max_p'], 'ro')

ax2.set_ylabel('Prec.(mm)', color='b')

venerdì 3 gennaio 2025

Change Detection with structural similarity

L'idea di base e' quella di cercare le differenze tra le due immagini sottostanti

Non e' immediatamente visibile ma ci sono dei sassi che si sono spostati nella zona a coordinata 2600,1600 circa. Per questa prova e' stato impiegato l'algoritmo di Structural Similarity  di SkImage

Foto 1


il calcolo indica un indice di somiglianza di circa il 71%


 Questa e' l'elaborazione (bianco minima differenza)



Il risultato finale e' la vegetazione con il suo movimento ha completamente obliterato il segnale relativo allo spostamento dei sassi

from skimage.metrics import structural_similarity
import cv2
import numpy as np

# Load images
before = cv2.imread('20241114.jpg')
after = cv2.imread('20241115.jpg')

# Convert images to grayscale
before_gray = cv2.cvtColor(before, cv2.COLOR_BGR2GRAY)
after_gray = cv2.cvtColor(after, cv2.COLOR_BGR2GRAY)

# Compute SSIM between the two images
(score, diff) = structural_similarity(before_gray, after_gray, full=True)
print("Image Similarity: {:.4f}%".format(score * 100))

# The diff image contains the actual image differences between the two images
# and is represented as a floating point data type in the range [0,1]
# so we must convert the array to 8-bit unsigned integers in the range
# [0,255] before we can use it with OpenCV
diff = (diff * 255).astype("uint8")
diff_box = cv2.merge([diff, diff, diff])

# Threshold the difference image, followed by finding contours to
# obtain the regions of the two input images that differ
thresh = cv2.threshold(diff, 20, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)[1]
contours = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
contours = contours[0] if len(contours) == 2 else contours[1]

mask = np.zeros(before.shape, dtype='uint8')
filled_after = after.copy()

for c in contours:
area = cv2.contourArea(c)
if area > 40:
x,y,w,h = cv2.boundingRect(c)
cv2.rectangle(before, (x, y), (x + w, y + h), (36,255,12), 2)
cv2.rectangle(after, (x, y), (x + w, y + h), (36,255,12), 2)
cv2.rectangle(diff_box, (x, y), (x + w, y + h), (36,255,12), 2)
cv2.drawContours(mask, [c], 0, (255,255,255), -1)
cv2.drawContours(filled_after, [c], 0, (0,255,0), -1)

cv2.imshow('20241114', before)
cv2.imshow('20241115', after)
cv2.imshow('diff', diff)
cv2.imshow('diff_box', diff_box)
cv2.imshow('mask', mask)
cv2.imshow('filled after', filled_after)


lunedì 4 novembre 2024

Dockerizza Flask

Un esempio semplice per inserire in un container Docker una applicazione Flask

Partiamo da una semplice applicazione che ha un file ed un requirements.txt

Si crea nello stesso folder dei due files precedenti il Dockerfile


FROM python:3.8-slim-buster

WORKDIR /python-docker

COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

COPY . .

CMD [ "python3", "-m" , "flask", "run", "--host="]


e si effettua la build con

docker build --tag flask-docker .

terminata la build del container

docker run -d -p 5000:5000 flask-docker

la applicazione sara' visibile su localhost:5000

lunedì 21 ottobre 2024

Change detection monocular metric

Ho provato ad usare la rete di neurale a questo indirizzo per ottenere una mappa di profondita' da un unico jpg

 Questa l'immagine di partenza

con una NVidia 4070 l'elaborazione e' inferiore al secondo e questo il risultato

La rete viene definita metrica ma i risultati sono lontanissimi dalle misure reali degli oggetti fotografati

Se si prendono due immagini distanti nel tempo

 si puo' calcolare la differenza delle distanze ed eventuali cambiamenti

 Una volta ottenuta la mappa di profondita' e' possibile avere anche la nuvola dei punti utilizzando il seguente programmino in Python

import open3d as o3d
import sys

a23 = np.load("agosto23.npz")
a24 = np.load("agosto24.npz")
dd = a24['depth']-a23['depth']

plt.imshow(dd, interpolation='nearest',cmap='plasma')

#versione O3D
depth = o3d.geometry.Image(a23['depth'])
color ="rgb.jpg")

rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(color, depth,convert_rgb_to_intensity = False)
intrinsic =, height=2160, fx=2520.7, fy=2518.2, cx=1918, cy=1022)

pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, intrinsic)
o3d.visualization.draw_geometries([pcd])"nuvola2.ply", pcd)

questo il risultato






lunedì 14 ottobre 2024

Inner Join con Pandas

 Avevo la necessita' di unire due file csv (misura di tempo; misura del sensore) ma i due sensori hanno avuto dei fermi per cui non era sufficiente mettere le colonne affiancate ed era necessaria una inner join (ma sulla macchine di lavoro non ho accesso un sql server) ..sono oltre 85000 dati quindi e' escluso fare a mano

La libreria Pandas e' venuta in aiuto

CSV Portata


CSV Tordibita


import pandas as pd
from matplotlib import pyplot as plt

por = pd.read_csv('portata.csv',sep=';')
tor = pd.read_csv('torbidita.csv',sep=';')
por['tempo'] = por['tempo'].astype(str)
tor['tempo'] = tor['tempo'].astype(str)

unione = por.merge(tor, on="tempo", how='inner')
title= '25/07/06 - 27/02/2017',
#unione.to_csv("unione.csv", sep=';')

venerdì 11 ottobre 2024

Aruco e Image Stacking

Uno dei problemi maggiori dell'uso dei tag aruco in esterno per misurare distanze e' che le condizioni di illuminazione solare sono molto variabili e la compressione jpg completa l'opera aggiungendo rumore su rumore

Per cercare di limitare il problema ho provato a fare image stacking sovrapponendo 8 foto (una ogni ora dalle 09 alle 16) con lo script al link ed applicando il programma visto qui

Il miglioramento non e' trascurabile perche' la standard deviation e' passata

3.1 m => da 0.19% a 0.16% (0.5 cm)

5.4 m => da 0.35% a 0.28% (1.5 cm)

7.6 m => da 0.71% a 0.38% (2.8 cm)

9.6 m => da 0.5% a 0.41% (4.8 cm)


import os
import cv2
import numpy as np
from time import time

# Align and stack images with ECC method
# Slower but more accurate
def stackImagesECC(file_list):
M = np.eye(3, 3, dtype=np.float32)

first_image = None
stacked_image = None

for file in file_list:
image = cv2.imread(file,1).astype(np.float32) / 255
if first_image is None:
# convert to gray scale floating point image
first_image = cv2.cvtColor(image,cv2.COLOR_BGR2GRAY)
stacked_image = image
# Estimate perspective transform
s, M = cv2.findTransformECC(cv2.cvtColor(image,cv2.COLOR_BGR2GRAY), first_image, M, cv2.MOTION_HOMOGRAPHY)
w, h, _ = image.shape
# Align image to first image
image = cv2.warpPerspective(image, M, (h, w))
stacked_image += image

stacked_image /= len(file_list)
stacked_image = (stacked_image*255).astype(np.uint8)
return stacked_image

# Align and stack images by matching ORB keypoints
# Faster but less accurate
def stackImagesKeypointMatching(file_list):

orb = cv2.ORB_create()

# disable OpenCL to because of bug in ORB in OpenCV 3.1

stacked_image = None
first_image = None
first_kp = None
first_des = None
for file in file_list:
image = cv2.imread(file,1)
imageF = image.astype(np.float32) / 255

# compute the descriptors with ORB
kp = orb.detect(image, None)
kp, des = orb.compute(image, kp)

# create BFMatcher object
matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)

if first_image is None:
# Save keypoints for first image
stacked_image = imageF
first_image = image
first_kp = kp
first_des = des
# Find matches and sort them in the order of their distance
matches = matcher.match(first_des, des)
matches = sorted(matches, key=lambda x: x.distance)

src_pts = np.float32(
[first_kp[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
dst_pts = np.float32(
[kp[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)

# Estimate perspective transformation
M, mask = cv2.findHomography(dst_pts, src_pts, cv2.RANSAC, 5.0)
w, h, _ = imageF.shape
imageF = cv2.warpPerspective(imageF, M, (h, w))
stacked_image += imageF

stacked_image /= len(file_list)
stacked_image = (stacked_image*255).astype(np.uint8)
return stacked_image

# ===== MAIN =====
# Read all files in directory
import argparse

if __name__ == '__main__':

parser = argparse.ArgumentParser(description='')
parser.add_argument('input_dir', help='Input directory of images ()')
parser.add_argument('output_image', help='Output image name')
parser.add_argument('--method', help='Stacking method ORB (faster) or ECC (more precise)')
parser.add_argument('--show', help='Show result image',action='store_true')
args = parser.parse_args()

image_folder = args.input_dir
if not os.path.exists(image_folder):
print("ERROR {} not found!".format(image_folder))

file_list = os.listdir(image_folder)
file_list = [os.path.join(image_folder, x)
for x in file_list if x.endswith(('.jpg', '.png','.bmp'))]

if args.method is not None:
method = str(args.method)
method = 'KP'

tic = time()

if method == 'ECC':
# Stack images using ECC method
description = "Stacking images using ECC method"
stacked_image = stackImagesECC(file_list)

elif method == 'ORB':
#Stack images using ORB keypoint method
description = "Stacking images using ORB method"
stacked_image = stackImagesKeypointMatching(file_list)

print("ERROR: method {} not found!".format(method))

print("Stacked {0} in {1} seconds".format(len(file_list), (time()-tic) ))

print("Saved {}".format(args.output_image))

# Show image
cv2.imshow(description, stacked_image)





mercoledì 2 ottobre 2024

Aruco Tag e filtri Kalman

Usando gli Aruco tag in ambiente esterno con illuminazione solare a diverse ore e in diversi periodi dell'anno ho trovato un errore nella stima della distanza tra due tag compresi tra 0.7% e 1.3% della distanza misurata. La domanda e' stata: e' possibile ridurre l'errore ?

Ho provato ad applicare il filtro Kalman per condizioni statiche (le distanze tra i tag nel tempo non sono variate) usando usando il codice a questo indirizzo 

Nel  grafico sottostante i punti blu sono i dati non filtrati, in linea continua blu i dati filtrati, in linea rossa il valore reale della misura di distanza

questo il codice impiegato con modestissime modifiche rispetto a quello di esempio


import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

def kalman_1d(x, P, measurement, R_est, Q_est):
x_pred = x
P_pred = P + Q_est
K = P_pred / (P_pred + R_est)
x_est = x_pred + K * (measurement - x_pred)
P_est = (1 - K) * P_pred
return x_est, P_est

def plot_1d_comparison(measurements_made, estimate, true_value, axis):
axis.plot(measurements_made ,'k+' ,label='measurements' ,alpha=0.3)
axis.plot(estimate ,'-' ,label='KF estimate')
if not isinstance(true_value, (list, tuple, np.ndarray)):
# plot line for a constant value
axis.axhline(true_value ,color='r' ,label='true value', alpha=0.5)
# for a list, tuple or array, plot the points
axis.plot(true_value ,color='r' ,label='true value', alpha=0.5)
axis.legend(loc = 'lower right')
axis.set_title('Estimated position vs. time step')

def plot_1d_error(estimated_error, lower_limit, upper_limit, axis):
# lower_limit and upper_limit are the lower and upper limits of the vertical axis
axis.plot(estimated_error, label='KF estimate for $P$')
axis.legend(loc = 'upper right')
axis.set_title('Estimated error vs. time step')
plt.setp(axis ,'ylim' ,[lower_limit, upper_limit])

nome_file = "20_40"
df = pd.read_csv(nome_file+'.csv',header=0,sep=";")

#11 5.38
#15 3.39
#25 7.46
#40 4.26

mu = 4.26 # Actual position
R = 0.1 # Actual standard deviation of actual measurements (R)

# Generate measurements
#n_measurements = 1000 # Change the number of points to see how the convergence changes
#Z = np.random.normal(mu, np.sqrt(R), size=n_measurements)
Z = df[df.columns[2]].to_numpy()
# Estimated covariances
Q_est = 1e-4
R_est = 2e-2

# initial guesses
x = 5.1 # Use an integer (imagine the initial guess is determined with a meter stick)
P = 0.04 # error covariance P

KF_estimate=[] # To store the position estimate at each time point
KF_error=[] # To store estimated error at each time point
for z in Z:
x, P = kalman_1d(x, P, z, R_est, Q_est)

fig, axes = plt.subplots(1 ,2, figsize=(12, 5))
plot_1d_comparison(Z, KF_estimate, mu, axes[0])
plot_1d_error(KF_error, 0, 0.015, axes[1])





lunedì 23 settembre 2024

Spot Robot Calibration Board Boston Bynamics e OpenCV

Ho avuto la fortuna di poter usare una enorma Charuco Board legata al cane robot Spot della Boston Dynamics

Nonostante non sia esplicitamente indicato si tratta di una Charuco Board 4x4_100 di 9 colonne e 4 righe

Per usarla per calibrare una camera si puo' usare il seguente script OpenCV

Attenzione: per funzionare lo script necessita setLegacyPattern(True)

import os
import numpy as np
import cv2

# ------------------------------
ARUCO_DICT = cv2.aruco.DICT_4X4_250
# ...
PATH_TO_YOUR_IMAGES = './hikvision'
# ------------------------------

def calibrate_and_save_parameters():
# Define the aruco dictionary and charuco board
dictionary = cv2.aruco.getPredefinedDictionary(ARUCO_DICT)
params = cv2.aruco.DetectorParameters()
params.cornerRefinementMethod = 0

image_files = [os.path.join(PATH_TO_YOUR_IMAGES, f) for f in os.listdir(PATH_TO_YOUR_IMAGES) if f.endswith(".jpg")]

all_charuco_corners = []
all_charuco_ids = []

for image_file in image_files:
image = cv2.imread(image_file)
marker_corners, marker_ids, _ = cv2.aruco.detectMarkers(image, dictionary, parameters=params)

# If at least one marker is detected
if len(marker_ids) > 0:
charuco_retval, charuco_corners, charuco_ids = cv2.aruco.interpolateCornersCharuco(marker_corners, marker_ids, image, board)

if charuco_retval:

# Calibrate camera
retval, camera_matrix, dist_coeffs, rvecs, tvecs = cv2.aruco.calibrateCameraCharuco(all_charuco_corners, all_charuco_ids, board, image.shape[:2], None, None)

# Save calibration data'hik_camera_matrix.npy', camera_matrix)'hik_dist_coeffs.npy', dist_coeffs)



giovedì 23 maggio 2024

Regressione umidita' Random Forest con YDF

 Volevo sperimentare la libreria YDF (Decision Trees) ed ho preso i dati del post precedente (soil moisture su terreni naturali

Non ho fatto nessuna ottimizzazione, ho usato l'esempio base della regressione

(da notare che i grafici funzionano su Colab non riesco a visualizzarli su jupyter notebook locali)

nel codice sottostante e' stato attivato il GradientBoostTreeLearner ma vi sono anche RandomForestLearner, CartLearner


from google.colab import drive

!pip install ydf
import ydf # Yggdrasil Decision Forests
import pandas as pd # We use Pandas to load small datasets

all_ds = pd.read_csv(f"/content/drive/MyDrive/soilmoisture.csv")

# Randomly split the dataset into a training (70%) and testing (30%) dataset
all_ds = all_ds.sample(frac=1)
split_idx = len(all_ds) * 7 // 10
train_ds = all_ds.iloc[:split_idx]
test_ds = all_ds.iloc[split_idx:]

# Print the first 5 training examples

model = ydf.GradientBoostedTreesLearner(label="soil_moisture",

evaluation = model.evaluate(test_ds)



La sintassi e' estremamente concisa ma il risultato della regressione e' ottimale

per vedere quale ' la feature che influenza il modello si puo' usare

model.analyze(test_ds, sampling=0.1)

martedì 30 aprile 2024

Ricampionare un segnale con SciPy

Un sistema rapido per ricampionare dati con spaziatura non omogenea  usando una curva di interpolazione


Dati originali


import matplotlib.pyplot as plt
from scipy import interpolate
import numpy as np

x = np.array([1, 2, 5, 10])
y = np.array([1, 4, 25, 100])
fcubic = interpolate.interp1d(x, y,kind='cubic')

xnew = np.arange(1, 10, 0.25)
ynew = fcubic(xnew)
plt.plot(x, y, 'X', xnew, ynew,'bo')


da osservare che l'interpolazione avviene tramite una spline cubica per cui su cuspidi non e' detto che il risultato sia accettabile. SciPy offre altre opzioni per la curva di interpolazione


giovedì 14 marzo 2024


ATTENZIONE: per funzionare alla massima risoluzione la Realsense deve usare una porta USB 3 ed un cavo idoneo ad USB 3 altrimenti si limita a 640x480



Solo Ottico 1920x1080

import pyrealsense2 as rs
import numpy as np
import cv2
import time
import math

pipeline = rs.pipeline()
config = rs.config()

config.enable_stream(, 1920, 1080, rs.format.bgr8, 30)

profile = pipeline.start(config)

align_to =
align = rs.align(align_to)

frames = pipeline.wait_for_frames()
aligned_frames = align.process(frames)
color_frame = aligned_frames.get_color_frame()
color_image = np.asanyarray(color_frame.get_data())
imageName1 = str(time.strftime("%Y_%m_%d_%H_%M_%S")) +  '_Color.png'
cv2.imwrite(imageName1, color_image)



Ottico + Profondita' 1280x720 



import pyrealsense2 as rs
import numpy as np
import cv2
import time
import math

pipeline = rs.pipeline()
config = rs.config()

config.enable_stream(, 1280, 720, rs.format.z16, 30)
config.enable_stream(, 1280, 720, rs.format.bgr8, 30)

profile = pipeline.start(config)
depth_sensor = profile.get_device().first_depth_sensor()
depth_scale = depth_sensor.get_depth_scale()

# We will be removing the background of objects more than
#  clipping_distance_in_meters meters away
clipping_distance_in_meters = 1.5
clipping_distance = clipping_distance_in_meters / depth_scale

align_to =
align = rs.align(align_to)

frames = pipeline.wait_for_frames()

aligned_frames = align.process(frames)
aligned_depth_frame = aligned_frames.get_depth_frame()
color_frame = aligned_frames.get_color_frame()

depth_image = np.asanyarray(aligned_depth_frame.get_data())
color_image = np.asanyarray(color_frame.get_data())

# Remove background - Set pixels further than clipping_distance to grey
grey_color = 153
depth_image_3d = np.dstack((depth_image,depth_image,depth_image)) #depth image is 1 channel, color is 3 channels
bg_removed = np.where((depth_image_3d > clipping_distance) | (depth_image_3d <= 0), grey_color, color_image)

# Render images
depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
images = np.hstack((bg_removed, depth_colormap))
#cv2.namedWindow('Align Example', cv2.WINDOW_AUTOSIZE)

# Filename
imageName1 = str(time.strftime("%Y_%m_%d_%H_%M_%S")) +  '_Color.png'
imageName2 = str(time.strftime("%Y_%m_%d_%H_%M_%S")) +  '_Depth.png'
imageName3 = str(time.strftime("%Y_%m_%d_%H_%M_%S")) +  '_bg_removed.png'
imageName4 = str(time.strftime("%Y_%m_%d_%H_%M_%S")) +  '_ColorDepth.png'
imageName5 = str(time.strftime("%Y_%m_%d_%H_%M_%S")) +  '_DepthColormap.png'

# Saving the image
cv2.imwrite(imageName1, color_image)
cv2.imwrite(imageName2, depth_image)
cv2.imwrite(imageName3, images)
cv2.imwrite(imageName4, bg_removed )
cv2.imwrite(imageName5, depth_colormap )



Infrarosso' 1280x720


import pyrealsense2 as rs
import numpy as np
import cv2
import time
import math
pipeline = rs.pipeline()
config = rs.config()

config.enable_stream(, 1, 1280, 720, rs.format.y8, 30)
profile = pipeline.start(config)
frames = pipeline.wait_for_frames()
ir1_frame = frames.get_infrared_frame(1)
image = np.asanyarray(ir1_frame.get_data())
imageIR = str(time.strftime("%Y_%m_%d_%H_%M_%S")) +  '_IR.png'
cv2.imwrite(imageIR, image)


Infrarosso' IR Emitter OFF 1280x720


import pyrealsense2 as rs
import numpy as np
import cv2
import time
import math
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(, 1, 1280, 720, rs.format.y8, 30)

#disabilita disable IR emitter
pipeline_profile = pipeline.start(config)
device = pipeline_profile.get_device()
depth_sensor = device.query_sensors()[0]
if depth_sensor.supports(rs.option.emitter_enabled):
    depth_sensor.set_option(rs.option.emitter_enabled, 0)

frames = pipeline.wait_for_frames()
ir1_frame = frames.get_infrared_frame(1)
image = np.asanyarray(ir1_frame.get_data())
imageIR = str(time.strftime("%Y_%m_%d_%H_%M_%S")) +  '_IR_OFF.png'
cv2.imwrite(imageIR, image)





import pyrealsense2 as rs

import numpy as np

def initialize_camera():
    # start the frames pipe
    p = rs.pipeline()
    conf = rs.config()
    prof = p.start(conf)
    return p

def gyro_data(gyro):
    return np.asarray([gyro.x, gyro.y, gyro.z])

def accel_data(accel):
    return np.asarray([accel.x, accel.y, accel.z])

p = initialize_camera()
    while True:
        f = p.wait_for_frames()
        accel = accel_data(f[0].as_motion_frame().get_motion_data())
        gyro = gyro_data(f[1].as_motion_frame().get_motion_data())
        print("accelerometer: ", accel)
        print("gyro: ", gyro)



gyro:  [0.         0.         0.00349066]
accelerometer:  [-0.24516624 -8.78675842 -2.75566864]





import pyrealsense2 as rs
pipeline = rs.pipeline()
config = rs.config()
profile = pipeline.start(config) # Start streaming
sensor_dep = profile.get_device().first_depth_sensor()
print("Trying to set Exposure")
exp = sensor_dep.get_option(rs.option.exposure)
print ("exposure = %d" % exp)
print ("Setting exposure to new value")
exp = sensor_dep.set_option(rs.option.exposure, 25000)
exp = sensor_dep.get_option(rs.option.exposure)
print ("New exposure = %d" % exp)
profile = pipeline.stop

l'esposizione si puo' regolare anche su ROI

p = rs.pipeline()
prof = p.start()
s = prof.get_device().first_roi_sensor()
roi = s.get_region_of_interest()



ADVANVCED MODE (regola parametri di dettaglio)



import pyrealsense2 as rs
import time
import json

DS5_product_ids = ["0AD1", "0AD2", "0AD3", "0AD4", "0AD5", "0AF6", "0AFE", "0AFF", "0B00", "0B01", "0B03", "0B07", "0B3A", "0B5C"]

def find_device_that_supports_advanced_mode() :
    ctx = rs.context()
    ds5_dev = rs.device()
    devices = ctx.query_devices();
    for dev in devices:
        if dev.supports(rs.camera_info.product_id) and str(dev.get_info(rs.camera_info.product_id)) in DS5_product_ids:
            if dev.supports(
                print("Found device that supports advanced mode:", dev.get_info(
            return dev
    raise Exception("No D400 product line device that supports advanced mode was found")

    dev = find_device_that_supports_advanced_mode()
    advnc_mode = rs.rs400_advanced_mode(dev)
    print("Advanced mode is", "enabled" if advnc_mode.is_enabled() else "disabled")

    # Loop until we successfully enable advanced mode
    while not advnc_mode.is_enabled():
        print("Trying to enable advanced mode...")
        # At this point the device will disconnect and re-connect.
        print("Sleeping for 5 seconds...")
        # The 'dev' object will become invalid and we need to initialize it again
        dev = find_device_that_supports_advanced_mode()
        advnc_mode = rs.rs400_advanced_mode(dev)
        print("Advanced mode is", "enabled" if advnc_mode.is_enabled() else "disabled")

    # Get each control's current value
    print("Depth Control: \n", advnc_mode.get_depth_control())
    print("RSM: \n", advnc_mode.get_rsm())
    print("RAU Support Vector Control: \n", advnc_mode.get_rau_support_vector_control())
    print("Color Control: \n", advnc_mode.get_color_control())
    print("RAU Thresholds Control: \n", advnc_mode.get_rau_thresholds_control())
    print("SLO Color Thresholds Control: \n", advnc_mode.get_slo_color_thresholds_control())
    print("SLO Penalty Control: \n", advnc_mode.get_slo_penalty_control())
    print("HDAD: \n", advnc_mode.get_hdad())
    print("Color Correction: \n", advnc_mode.get_color_correction())
    print("Depth Table: \n", advnc_mode.get_depth_table())
    print("Auto Exposure Control: \n", advnc_mode.get_ae_control())
    print("Census: \n", advnc_mode.get_census())

    #To get the minimum and maximum value of each control use the mode value:
    query_min_values_mode = 1
    query_max_values_mode = 2
    current_std_depth_control_group = advnc_mode.get_depth_control()
    min_std_depth_control_group = advnc_mode.get_depth_control(query_min_values_mode)
    max_std_depth_control_group = advnc_mode.get_depth_control(query_max_values_mode)
    print("Depth Control Min Values: \n ", min_std_depth_control_group)
    print("Depth Control Max Values: \n ", max_std_depth_control_group)

    # Set some control with a new (median) value
    current_std_depth_control_group.scoreThreshA = int((max_std_depth_control_group.scoreThreshA - min_std_depth_control_group.scoreThreshA) / 2)
    print("After Setting new value, Depth Control: \n", advnc_mode.get_depth_control())

    # Serialize all controls to a Json string
    serialized_string = advnc_mode.serialize_json()
    print("Controls as JSON: \n", serialized_string)
    as_json_object = json.loads(serialized_string)

    # We can also load controls from a json string
    # For Python 2, the values in 'as_json_object' dict need to be converted from unicode object to utf-8
    if type(next(iter(as_json_object))) != str:
        as_json_object = {k.encode('utf-8'): v.encode("utf-8") for k, v in as_json_object.items()}
    # The C++ JSON parser requires double-quotes for the json object so we need
    # to replace the single quote of the pythonic json to double-quotes
    json_string = str(as_json_object).replace("'", '\"')

except Exception as e:


Continua l'esplorazione della previsione di serie tempo stavolta con ConvLSTM (il codice e' stato riadattato partendo da Gemini AI) ...