Monday, 25 August 2025

Borehole image segmentation

Continuing my adventure with neural networks applied to geology, I came across this paper:

Yu, Qingjun; Wang, Guannan; Cheng, Hai; Guo, Wenzhi; Liu, Yanbiao (2024). The segmentation and intelligent recognition of structural surfaces in borehole images based on the U2-Net network.

The authors released the dataset under a Creative Commons license in two parts (part 1, part 2).

There are 468 images in total (I split them into a training set plus a remaining control dataset), but they come without annotations.
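Just to fix ideas, here is a minimal sketch of this kind of random split. The folder names (./all_images, ./dataset/images, ./dataset/controllo/images) are assumptions chosen to match the scripts further down, not necessarily the exact layout I used:

# Minimal sketch of a random train/control split.
# Folder names are illustrative assumptions.
import glob
import os
import random
import shutil

random.seed(42)
files = sorted(glob.glob("./all_images/*.png"))
random.shuffle(files)
n_train = int(len(files) * 0.85)  # keep roughly 15% as control set

for i, path in enumerate(files):
    dest = "./dataset/images" if i < n_train else "./dataset/controllo/images"
    os.makedirs(dest, exist_ok=True)
    shutil.copy(path, dest)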

For this reason I annotated them myself using LabelMe.

LabelMe


The LabelMe JSON files were then converted into masks (black = background, white = class):


import json
import numpy as np
import cv2
from PIL import Image
import argparse

def create_mask_from_json(json_file_path, image_width, image_height, output_path=None, buffer_size=5):
    """
    Create a binary mask from a JSON label file.

    Args:
        json_file_path (str): Path to the JSON label file
        image_width (int): Width of the output mask
        image_height (int): Height of the output mask
        output_path (str, optional): Path to save the mask image
        buffer_size (int): Buffer size around lines (line thickness / 2)

    Returns:
        numpy.ndarray: Binary mask array
    """
    # Load JSON data
    with open(json_file_path, 'r') as f:
        data = json.load(f)

    # Create black mask (background)
    mask = np.zeros((image_height, image_width), dtype=np.uint8)

    # Process shapes/annotations in the JSON
    shapes = data.get('shapes', [])
    for shape in shapes:
        # Check if the label equals 1
        label = shape.get('label', '')
        if label == '1' or label == 1:
            # Get shape type and points
            shape_type = shape.get('shape_type', '')
            points = shape.get('points', [])
            if shape_type == 'linestrip' and points:
                # Convert points to a numpy array and ensure integer coordinates
                pts = np.array(points, dtype=np.int32)
                # Method 1: simple line thickness
                # Draw the linestrip with the specified thickness
                for i in range(len(pts) - 1):
                    cv2.line(mask, tuple(pts[i]), tuple(pts[i + 1]), 255, thickness=buffer_size * 2 + 1)
                # Method 2: morphological dilation (uncomment to use instead)
                # Draw thin lines first, then dilate
                # temp_mask = np.zeros_like(mask)
                # for i in range(len(pts) - 1):
                #     cv2.line(temp_mask, tuple(pts[i]), tuple(pts[i + 1]), 255, thickness=1)
                # kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (buffer_size*2+1, buffer_size*2+1))
                # temp_mask = cv2.dilate(temp_mask, kernel, iterations=1)
                # mask = cv2.bitwise_or(mask, temp_mask)
            elif shape_type == 'polygon' and points:
                # Handle polygon shapes if present
                pts = np.array(points, dtype=np.int32)
                cv2.fillPoly(mask, [pts], 255)
            elif shape_type == 'rectangle' and points:
                # Handle rectangle shapes if present
                if len(points) >= 2:
                    pt1 = tuple(map(int, points[0]))
                    pt2 = tuple(map(int, points[1]))
                    cv2.rectangle(mask, pt1, pt2, 255, -1)

    # Save the mask if an output path is provided
    if output_path:
        cv2.imwrite(output_path, mask)
        print(f"Mask saved to: {output_path}")
    return mask

def create_mask_from_labelme_json(json_file_path, output_path=None):
    """
    Create a mask from a LabelMe-format JSON file (reads the image dimensions from the JSON).

    Args:
        json_file_path (str): Path to the JSON label file
        output_path (str, optional): Path to save the mask image

    Returns:
        numpy.ndarray: Binary mask array
    """
    # Load JSON data
    with open(json_file_path, 'r') as f:
        data = json.load(f)
    # Get image dimensions from the JSON
    image_width = data.get('imageWidth', 640)
    image_height = data.get('imageHeight', 480)
    return create_mask_from_json(json_file_path, image_width, image_height, output_path)

# Example usage functions
def example_usage():
    """
    Example of how to use the functions.
    """
    # Example 1: if you know the image dimensions
    json_path = "labels.json"
    width, height = 1920, 1080
    mask = create_mask_from_json(json_path, width, height, "output_mask.png")
    # Example 2: if using LabelMe-format JSON (contains the image dimensions)
    # mask = create_mask_from_labelme_json("labelme_annotations.json", "mask.png")
    print(f"Created mask with shape: {mask.shape}")
    print(f"Mask contains {np.sum(mask == 255)} white pixels")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Create a binary mask from JSON labels')
    parser.add_argument('json_file', help='Path to JSON label file')
    parser.add_argument('--width', type=int, help='Image width (required if not in JSON)')
    parser.add_argument('--height', type=int, help='Image height (required if not in JSON)')
    parser.add_argument('--output', '-o', help='Output mask file path')
    parser.add_argument('--labelme', action='store_true', help='Use LabelMe format (gets dimensions from JSON)')
    args = parser.parse_args()
    try:
        if args.labelme:
            mask = create_mask_from_labelme_json(args.json_file, args.output)
        else:
            if not args.width or not args.height:
                print("Error: width and height are required when not using LabelMe format")
                exit(1)
            mask = create_mask_from_json(args.json_file, args.width, args.height, args.output)
        print(f"Successfully created mask with shape: {mask.shape}")
    except Exception as e:
        print(f"Error: {e}")
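To convert all the annotations in one go, a small loop over the JSON files is enough. A minimal sketch, assuming the functions above are in the same file (or imported), that the LabelMe JSON files sit in ./dataset/images next to the PNGs, and that the masks go to ./dataset/masks, mirroring the training configuration below:

# Batch conversion sketch: one mask per LabelMe JSON file.
# Folder names are assumptions that match the training script's config.
import glob
import os

os.makedirs("./dataset/masks", exist_ok=True)
for json_path in sorted(glob.glob("./dataset/images/*.json")):
    name = os.path.splitext(os.path.basename(json_path))[0]
    out_path = os.path.join("./dataset/masks", name + ".png")
    create_mask_from_labelme_json(json_path, out_path)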


The training script is the following; it is a close relative of the one used in a previous post.

"""
train_deeplabv3p.py

Requirements:
- tensorflow >= 2.8 (tested on TF 2.10+)
- matplotlib
- opencv-python (cv2) optional if you want to preview images locally

Dataset layout expected:
fin/images/<name>.png (RGB)
fin/mask/<name>.png (grayscale, 0 for background, 127 for class)

Usage:
python train_deeplabv3p.py
"""

import os
import random
import glob
import math
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt

# ----------------------
# Config - tweak here
# ----------------------
IM_SIZE = (256, 256) # input size for training (height, width)
BATCH_SIZE = 8
EPOCHS = 30
AUTOTUNE = tf.data.AUTOTUNE
DATA_DIR = "./dataset"
IMAGE_DIR = os.path.join(DATA_DIR, "images")
MASK_DIR = os.path.join(DATA_DIR, "masks")
MODEL_SAVE = "coredrill_deeplabv3p.h5"
VAL_SPLIT = 0.15
SEED = 42
LEARNING_RATE = 1e-4
# ----------------------

class BinaryMeanIoU(tf.keras.metrics.MeanIoU):
    def __init__(self, name="iou"):
        super().__init__(num_classes=2, name=name)

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_pred = tf.cast(y_pred > 0.5, tf.int32)
        y_true = tf.cast(y_true, tf.int32)
        return super().update_state(y_true, y_pred, sample_weight)

# ----------------------
# Utility: DeepLabV3+ model (MobileNetV2 backbone)
# ----------------------
def SepConv_BN(x, filters, prefix, stride=1, kernel_size=3, rate=1):
    x = layers.SeparableConv2D(filters, kernel_size=kernel_size, strides=stride,
                               padding='same', dilation_rate=rate,
                               use_bias=False, name=prefix + '_sepconv')(x)
    x = layers.BatchNormalization(name=prefix + '_bn')(x)
    x = layers.ReLU(name=prefix + '_relu')(x)
    return x

def ASPP(x, out_channels=256):
    # Atrous Spatial Pyramid Pooling
    b0 = layers.Conv2D(out_channels, 1, padding='same', use_bias=False)(x)
    b0 = layers.BatchNormalization()(b0)
    b0 = layers.ReLU()(b0)

    b1 = layers.SeparableConv2D(out_channels, 3, padding='same', dilation_rate=6, use_bias=False)(x)
    b1 = layers.BatchNormalization()(b1)
    b1 = layers.ReLU()(b1)

    b2 = layers.SeparableConv2D(out_channels, 3, padding='same', dilation_rate=12, use_bias=False)(x)
    b2 = layers.BatchNormalization()(b2)
    b2 = layers.ReLU()(b2)

    b3 = layers.SeparableConv2D(out_channels, 3, padding='same', dilation_rate=18, use_bias=False)(x)
    b3 = layers.BatchNormalization()(b3)
    b3 = layers.ReLU()(b3)

    # Image pooling branch
    b4 = layers.GlobalAveragePooling2D()(x)
    b4 = layers.Reshape((1, 1, -1))(b4)
    b4 = layers.Conv2D(out_channels, 1, padding='same', use_bias=False)(b4)
    b4 = layers.BatchNormalization()(b4)
    b4 = layers.ReLU()(b4)
    # Instead of using tf.shape(x), we upsample by a fixed scale factor
    b4 = layers.UpSampling2D(size=(x.shape[1], x.shape[2]), interpolation='bilinear')(b4)

    # Concatenate and project
    x = layers.Concatenate()([b0, b1, b2, b3, b4])
    x = layers.Conv2D(out_channels, 1, padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    return x


def DeepLabV3Plus(input_shape=(256, 256, 3), num_classes=1, backbone='mobilenetv2'):
    # Encoder (MobileNetV2)
    base_model = tf.keras.applications.MobileNetV2(input_shape=input_shape, include_top=False, weights='imagenet')
    # Extract feature maps
    # low-level feature for the decoder
    low_level = base_model.get_layer('block_3_expand_relu').output
    # high-level feature for ASPP
    high_level = base_model.get_layer('block_13_expand_relu').output

    # ASPP on high-level features
    x = ASPP(high_level, out_channels=256)
    x = layers.UpSampling2D(size=(4, 4), interpolation='bilinear')(x)  # scale to roughly match the low-level features

    # Process low-level features
    low = layers.Conv2D(48, 1, padding='same', use_bias=False)(low_level)
    low = layers.BatchNormalization()(low)
    low = layers.ReLU()(low)

    # Concatenate
    x = layers.Concatenate()([x, low])
    x = SepConv_BN(x, 256, 'decoder_separable_conv0')
    x = SepConv_BN(x, 256, 'decoder_separable_conv1')

    # Upsample to the input size
    x = layers.UpSampling2D(size=(4, 4), interpolation='bilinear')(x)
    # Final conv
    if num_classes == 1:
        activation = 'sigmoid'
        out_filters = 1
    else:
        activation = 'softmax'
        out_filters = num_classes

    x = layers.Conv2D(out_filters, 1, padding='same')(x)
    x = layers.Activation(activation)(x)

    model = tf.keras.Model(inputs=base_model.input, outputs=x)
    return model

# ----------------------
# Data pipeline
# ----------------------
def list_pairs(image_dir, mask_dir):
    # match by filename (without extension)
    images = sorted(glob.glob(os.path.join(image_dir, "*")))
    image_map = {os.path.splitext(os.path.basename(p))[0]: p for p in images}
    masks = sorted(glob.glob(os.path.join(mask_dir, "*")))
    mask_map = {os.path.splitext(os.path.basename(p))[0]: p for p in masks}
    common = sorted(set(image_map.keys()).intersection(mask_map.keys()))
    pairs = [(image_map[k], mask_map[k]) for k in common]
    return pairs

def decode_image(path, target_size=IM_SIZE):
    img = tf.io.read_file(path)
    img = tf.image.decode_image(img, channels=3)
    img.set_shape([None, None, 3])
    img = tf.image.resize(img, target_size)
    img = tf.cast(img, tf.float32) / 255.0
    return img

def decode_mask(path, target_size=IM_SIZE):
    m = tf.io.read_file(path)
    m = tf.image.decode_image(m, channels=1)  # single channel if possible
    m.set_shape([None, None, 1])
    m = tf.image.resize(m, target_size, method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    m = tf.cast(m, tf.float32)

    # Normalize the mask: works for either 0/127 or 0/255 style masks
    # Any value > 64 becomes 1, otherwise 0
    m = tf.where(m > 64.0, 1.0, 0.0)
    return m

def load_pair(image_path, mask_path):
    image = decode_image(image_path)
    mask = decode_mask(mask_path)
    return image, mask

def augment(image, mask):
    # simple augmentation: random flips and random brightness
    if tf.random.uniform(()) > 0.5:
        image = tf.image.flip_left_right(image)
        mask = tf.image.flip_left_right(mask)
    if tf.random.uniform(()) > 0.5:
        image = tf.image.flip_up_down(image)
        mask = tf.image.flip_up_down(mask)
    if tf.random.uniform(()) > 0.5:
        image = tf.image.random_brightness(image, max_delta=0.1)
    return image, mask

def make_datasets(pairs, batch_size=BATCH_SIZE, val_split=VAL_SPLIT):
    random.seed(SEED)
    random.shuffle(pairs)
    n = len(pairs)
    n_val = max(1, int(n * val_split))
    val_pairs = pairs[:n_val]
    train_pairs = pairs[n_val:]

    def gen(pairs_list):
        for img_p, m_p in pairs_list:
            yield img_p, m_p

    train_ds = tf.data.Dataset.from_generator(lambda: gen(train_pairs), output_types=(tf.string, tf.string))
    val_ds = tf.data.Dataset.from_generator(lambda: gen(val_pairs), output_types=(tf.string, tf.string))

    train_ds = (train_ds
                .map(lambda i, m: tf.py_function(load_pair, [i, m], [tf.float32, tf.float32]),
                     num_parallel_calls=AUTOTUNE)
                .map(lambda i, m: (tf.ensure_shape(i, [*IM_SIZE, 3]), tf.ensure_shape(m, [*IM_SIZE, 1])),
                     num_parallel_calls=AUTOTUNE)
                .map(lambda i, m: augment(i, m), num_parallel_calls=AUTOTUNE)
                .shuffle(256)
                .batch(batch_size)
                .prefetch(AUTOTUNE)
                )

    val_ds = (val_ds
              .map(lambda i, m: tf.py_function(load_pair, [i, m], [tf.float32, tf.float32]),
                   num_parallel_calls=AUTOTUNE)
              .map(lambda i, m: (tf.ensure_shape(i, [*IM_SIZE, 3]), tf.ensure_shape(m, [*IM_SIZE, 1])),
                   num_parallel_calls=AUTOTUNE)
              .batch(batch_size)
              .prefetch(AUTOTUNE)
              )

    return train_ds, val_ds, train_pairs, val_pairs

# ----------------------
# Metrics and Loss
# ----------------------
def dice_coef(y_true, y_pred, smooth=1e-6):
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth)

def dice_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred)

def bce_dice_loss(y_true, y_pred):
    bce = tf.keras.losses.BinaryCrossentropy()(y_true, y_pred)
    return bce + dice_loss(y_true, y_pred)

# ----------------------
# Training routine
# ----------------------
def main():
    pairs = list_pairs(IMAGE_DIR, MASK_DIR)
    if len(pairs) == 0:
        raise RuntimeError(f"No matching image/mask pairs found in {IMAGE_DIR} and {MASK_DIR}.")
    print(f"Found {len(pairs)} pairs.")

    train_ds, val_ds, train_pairs, val_pairs = make_datasets(pairs)

    model = DeepLabV3Plus(input_shape=(*IM_SIZE, 3), num_classes=1)
    model.summary()

    # compile
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
                  loss=bce_dice_loss,
                  # metrics=[tf.keras.metrics.BinaryAccuracy(name='accuracy'),
                  #          tf.keras.metrics.MeanIoU(num_classes=2, name='iou'),
                  #          dice_coef])
                  metrics=[tf.keras.metrics.BinaryAccuracy(name='accuracy'),
                           BinaryMeanIoU(),
                           dice_coef])

    # Callbacks
    checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(MODEL_SAVE, save_best_only=True, monitor='val_loss')
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4, verbose=1)
    early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True)

    history = model.fit(train_ds,
                        epochs=EPOCHS,
                        validation_data=val_ds,
                        callbacks=[checkpoint_cb, reduce_lr, early_stop])

    # Save the final model
    model.save(MODEL_SAVE)
    print(f"Model saved to {MODEL_SAVE}")

    # Visual comparison on a few validation samples
    visualize_predictions(model, val_pairs, n=6)

def visualize_predictions(model, val_pairs, n=6):
    # pick up to n validation examples at random
    samples = random.sample(val_pairs, min(n, len(val_pairs)))
    fig_rows = len(samples)
    plt.figure(figsize=(10, 4 * fig_rows))
    for i, (img_p, mask_p) in enumerate(samples):
        img = tf.io.read_file(img_p)
        img = tf.image.decode_image(img, channels=3)
        img = tf.image.resize(img, IM_SIZE)
        img = tf.cast(img, tf.float32) / 255.0
        mask = tf.io.read_file(mask_p)
        mask = tf.image.decode_image(mask, channels=1)
        mask = tf.image.resize(mask, IM_SIZE, method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
        mask = tf.cast(mask, tf.float32)
        mask = (mask > 64.0).numpy().astype(np.uint8).squeeze()

        # Predict
        inp = tf.expand_dims(img, 0)
        pred = model.predict(inp)[0]
        pred_mask = (pred[..., 0] > 0.5).astype(np.uint8)

        ax = plt.subplot(fig_rows, 3, i * 3 + 1)
        plt.imshow(img.numpy())
        plt.title("Image")
        plt.axis('off')

        ax = plt.subplot(fig_rows, 3, i * 3 + 2)
        plt.imshow(mask, cmap='gray')
        plt.title("Ground Truth")
        plt.axis('off')

        ax = plt.subplot(fig_rows, 3, i * 3 + 3)
        plt.imshow(pred_mask, cmap='gray')
        plt.title("Prediction")
        plt.axis('off')

    plt.tight_layout()
    plt.show()

if __name__ == "__main__":
    main()
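Before eyeballing single images, it is useful to get a number out of the control set. The following is a minimal sketch, assuming the control masks were also exported to ./dataset/controllo/masks with the same file names as the images (both paths are assumptions); it loads the saved model and computes the mean IoU at a 0.5 threshold:

# Evaluation sketch on the control set: mean IoU over image/mask pairs.
# Paths and folder layout are assumptions; adjust them as needed.
import glob
import os
import numpy as np
import cv2
import tensorflow as tf

model = tf.keras.models.load_model("coredrill_deeplabv3p.h5", compile=False)

ious = []
for img_path in sorted(glob.glob("./dataset/controllo/images/*.png")):
    mask_path = os.path.join("./dataset/controllo/masks", os.path.basename(img_path))
    if not os.path.exists(mask_path):
        continue
    img = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (256, 256)).astype(np.float32) / 255.0
    gt = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    gt = cv2.resize(gt, (256, 256), interpolation=cv2.INTER_NEAREST) > 64

    pred = model.predict(img[None, ...], verbose=0)[0, :, :, 0] > 0.5
    inter = np.logical_and(pred, gt).sum()
    union = np.logical_or(pred, gt).sum()
    ious.append(inter / union if union > 0 else 1.0)

if ious:
    print(f"Mean IoU on the control set: {np.mean(ious):.3f} over {len(ious)} images")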




The inference program is the following:

import tensorflow as tf
import numpy as np
import cv2
import os

# ---------------- CONFIG ----------------
MODEL_PATH = "coredrill_deeplabv3p.h5" # path to your trained model
IMG_PATH = "./dataset/controllo/images/0402.png" # input image for inference
OUT_PATH = "402.png" # output mask path
IM_SIZE = (256, 256) # must match training size
# ----------------------------------------


def preprocess_image(image_path, target_size=IM_SIZE):
    """Load and preprocess the input image"""
    img = cv2.imread(image_path, cv2.IMREAD_COLOR)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, target_size)
    img = img.astype(np.float32) / 255.0
    return np.expand_dims(img, axis=0)  # shape (1, H, W, 3)


def postprocess_mask(mask, original_shape):
    """Convert the network output to a binary mask"""
    mask = (mask > 0.5).astype(np.uint8) * 255  # threshold
    mask = mask[0, :, :, 0]  # remove batch and channel dimensions
    mask = cv2.resize(mask, (original_shape[1], original_shape[0]), interpolation=cv2.INTER_NEAREST)
    return mask


def main():
    # Load the trained model
    print(f"Loading model from {MODEL_PATH}...")
    model = tf.keras.models.load_model(MODEL_PATH, compile=False)

    # Load the input image
    original = cv2.imread(IMG_PATH, cv2.IMREAD_COLOR)
    H, W = original.shape[:2]

    # Preprocess
    x = preprocess_image(IMG_PATH)

    # Predict
    print("Running inference...")
    pred = model.predict(x)

    # Postprocess
    mask = postprocess_mask(pred, (H, W))

    # Save results
    cv2.imwrite(OUT_PATH, mask)
    print(f"Saved mask at {OUT_PATH}")

    # Optional: save an overlay
    overlay = original.copy()
    overlay[mask > 127] = (0, 0, 255)  # red overlay for the class
    cv2.imwrite("overlay.png", overlay)
    print("Saved overlay at overlay.png")


if __name__ == "__main__":
    main()
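The same script can be wrapped in a loop to produce an overlay for every image of the control set; a quick sketch (the ./overlays output folder is just an assumption):

# Batch inference sketch: red overlay for every image in the control folder.
# Model path and image size match the config above; the output folder is an assumption.
import glob
import os
import numpy as np
import cv2
import tensorflow as tf

model = tf.keras.models.load_model("coredrill_deeplabv3p.h5", compile=False)
os.makedirs("./overlays", exist_ok=True)

for img_path in sorted(glob.glob("./dataset/controllo/images/*.png")):
    original = cv2.imread(img_path, cv2.IMREAD_COLOR)
    rgb = cv2.cvtColor(original, cv2.COLOR_BGR2RGB)
    x = cv2.resize(rgb, (256, 256)).astype(np.float32) / 255.0
    pred = model.predict(x[None, ...], verbose=0)[0, :, :, 0]
    mask = (pred > 0.5).astype(np.uint8) * 255
    mask = cv2.resize(mask, (original.shape[1], original.shape[0]), interpolation=cv2.INTER_NEAREST)
    overlay = original.copy()
    overlay[mask > 127] = (0, 0, 255)  # BGR red for the detected structural surfaces
    cv2.imwrite(os.path.join("./overlays", os.path.basename(img_path)), overlay)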


A few checks on the control dataset:

RGB 402

Network inference, 402

---------------------------

RGB 426

Network inference, 426

---------------------------

RGB 446

Network inference, 446
