Correva l'anno 2008 ed il mio direttore di Dipartimento mi aveva chiesto di lavorare sui Big Data per estrarre dati di geolocalizzazione di eventi franosi da notizie sui quotidiani
Non ho piu' il codice dell'epoca ma piu' o meno venivano scaricati gli RSS da Google News, si prendeva il link ed il rank della fonte (all'epoca era ancora possibile), veniva scaricato il documento html originale e salvato su Db per poi un primitivo parsing basato su parole chiave con annessa geolocalizzazione.
Molto primitivo e molto poco funzionante. La cosa piu' fastidiosa era che era molto complicato distinguere se la frana era avvenuta il giorno prima oppure se la notizia riguardava eventi pregressi (se per esempio erano stati stanziati finanziamenti dell'evento di 5 anni prima o se era stata ripristinata la viabilita' chiusa un mese prima...peraltro in inglese landslide e' utilizzato per vittoria politica schiacciante winning landslide quindi era necessario interpretare il contesto)
Le differenze principali rispetto al 2008 e' che i giornali adesso usano spesso i paywall per accedere al documento completo, twitter ti permette di usare le API di ricerca solo a pagamento ma abbiamo il vantaggio di poter usare LLM per il parsing delle notizie in modo da poter aggirare le limitazioni viste nel 2008 usando il contesto
Prima di iniziare si deve avere un server Ollama con alcuni modelli caricati (sul mio Mac mini M4 16 Gb ho provato gemma3:12b, llama3.1, qwen2.5:7b (qwen sembra essere quello più idoneo per generare dati tabellari come Json in output)
1) ricerca su Google News tramite una query minima con estrazione del link alla notizia originale
4) passaggio del testo della notizia ad ollama per il parsing contestuale tramite prompt che ricerca specifiche informazioni (se la notizia e' recente, se vi sono state interruzioni di strade, se ci sono state evacuazioni di abitati o sfollati)
la fase computazionalmente più impegnativa e' quella relativa all'LLM. Per non saturare la memoria ogni notizia viene processata singolarmente
"""
Agente di rilevamento frane da Google News con geocodifica e output GeoJSON.
"""
import json
import time
import logging
from pathlib import Path
from datetime import datetime
from typing import List, Optional
import feedparser
import geojson
import httpx
import ollama
from bs4 import BeautifulSoup
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
from pydantic import BaseModel, Field
# ---------------------------------------------------------------------------
# Configurazione
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)
MAX_ARTICOLI = 20
MAX_PREVIEW_CHARS = 150 # usato solo come fallback se il download dell'articolo fallisce
MAX_TESTO_ARTICOLO = 1500 # caratteri massimi del testo completo scaricato
MAX_RETRY_OLLAMA = 3
OUTPUT_DIR = Path(".")
GOOGLE_NEWS_URL = (
"https://news.google.com/rss/search"
"?q=frana+OR+smottamento+when:30d&hl=it&gl=IT&ceid=IT:it"
)
OLLAMA_MODEL = "gemma3:12b" # modifica qui per cambiare modello
# OLLAMA_MODEL = "llama3.1"
# OLLAMA_MODEL = "qwen2.5:7b"
# OLLAMA_MODEL = "gemma3:4b"
OLLAMA_OPTIONS = {
"num_ctx": 16384,
"num_predict": 8192, # aumentato per evitare troncamento del JSON
"temperature": 0.1,
}
SYSTEM_PROMPT = (
"Sei un analista geologo esperto di rischio idrogeologico. "
"Estrai le frane e associa sempre l'URL della fonte corretto ad ogni record. "
"Per ogni evento, valuta attentamente: "
"(1) se il testo menziona interruzioni, chiusure o blocchi di strade, viabilità, "
"sentieri o ferrovie causati dalla frana: imposta 'interruzione_stradale' a true; "
"(2) se il testo menziona evacuazioni, sfollati, persone fatte uscire dalle abitazioni "
"o ordini di evacuazione: imposta 'evacuazione' a true; "
"(3) se la data di pubblicazione è più vecchia di 7 giorni rispetto ad oggi, "
"o se il testo descrive eventi passati o già risolti: imposta 'notizia_vecchia' a true. "
"In assenza di menzione esplicita, imposta i campi a false. "
"Genera un output JSON valido. Sii conciso nella descrizione per risparmiare spazio."
)
# ---------------------------------------------------------------------------
# Modelli Pydantic
# ---------------------------------------------------------------------------
class SingolaFrana(BaseModel):
localita: str = Field(
description="Nome del comune, frazione o località specifica in Italia."
)
provincia: str = Field(
description="Sigla della provincia (es. 'SO', 'TO', 'BG')."
)
descrizione: str = Field(
description="Breve sintesi dell'evento o dei danni descritti nell'articolo."
)
interruzione_stradale: bool = Field(
description="True se la frana ha causato interruzione, chiusura o blocco di strade, viabilità o ferrovie. False altrimenti."
)
evacuazione: bool = Field(
description="True se la frana ha causato evacuazione di persone, sfollati o ordini di evacuazione. False altrimenti."
)
notizia_vecchia: bool = Field(
description="True se la frana è accaduta più di una settimana fa rispetto alla data odierna o se il testo fa riferimento a eventi passati, storici o già risolti. False se l'evento è recente (ultimi 7 giorni)."
)
url: Optional[str] = Field(
description="L'URL esatto dell'articolo da cui è stata estratta la notizia."
)
class ListaFrane(BaseModel):
eventi: List[SingolaFrana] = Field(
description="Lista di tutte le frane distinte rilevate negli articoli analizzati."
)
# ---------------------------------------------------------------------------
# Tool 1a: Scraping testo completo articolo
# ---------------------------------------------------------------------------
# Header che simulano un browser reale per evitare blocchi base
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
"Accept-Language": "it-IT,it;q=0.9",
}
# Testo che indica una pagina di consenso cookie invece dell'articolo reale
COOKIE_WALL_MARKERS = [
"accetta tutto",
"rifiuta tutto",
"cookie e dati",
"contenuti non personalizzati",
"impostazioni cookie",
"privacy policy",
"we use cookies",
]
def is_cookie_wall(testo: str) -> bool:
"""Ritorna True se il testo è una pagina di consenso cookie, non un articolo."""
testo_lower = testo.lower()
hits = sum(1 for marker in COOKIE_WALL_MARKERS if marker in testo_lower)
return hits >= 2
# Domini noti per paywall o blocco scraping — usa solo lo snippet RSS
PAYWALL_DOMAINS = {
"corriere.it", "repubblica.it", "sole24ore.com",
"stampa.it", "ilmessaggero.it", "lastampa.it",
}
def estrai_dominio(url: str) -> str:
"""Estrae il dominio base da un URL."""
try:
from urllib.parse import urlparse
return urlparse(url).netloc.replace("www.", "")
except Exception:
return ""
def scarica_testo_articolo(url: str) -> str:
"""
Scarica e pulisce il testo completo di un articolo.
Segue i redirect di Google News, salta i paywall noti.
Ritorna stringa vuota se il download fallisce.
"""
dominio = estrai_dominio(url)
if dominio in PAYWALL_DOMAINS:
logger.debug(f"Paywall noto, uso snippet RSS per: {dominio}")
return ""
try:
response = httpx.get(
url,
headers=HEADERS,
timeout=10,
follow_redirects=True,
)
if response.status_code != 200:
logger.debug(f"HTTP {response.status_code} per {url}")
return ""
soup = BeautifulSoup(response.text, "html.parser")
# Rimuovi elementi non editoriali
for tag in soup(["script", "style", "nav", "footer", "header",
"aside", "form", "noscript", "iframe", "figure",
"button", "input", "select", "textarea"]):
tag.decompose()
# Preferisci tag semantici dell'articolo se presenti
contenuto = (
soup.find("article")
or soup.find("main")
or soup.find(attrs={"itemprop": "articleBody"})
or soup.find(class_=lambda c: c and any(
kw in c.lower() for kw in ["article-body", "article__body",
"post-content", "entry-content",
"news-body", "article-content"]
))
)
if contenuto:
# Estrai solo i paragrafi <p> per evitare testo di navigazione residuo
paragrafi = contenuto.find_all("p")
if paragrafi:
testo = " ".join(p.get_text(strip=True) for p in paragrafi)
else:
testo = contenuto.get_text(separator=" ", strip=True)
else:
# Fallback: tutti i <p> della pagina
paragrafi = soup.find_all("p")
testo = " ".join(p.get_text(strip=True) for p in paragrafi if len(p.get_text(strip=True)) > 40)
if not testo or is_cookie_wall(testo):
logger.debug(f"Cookie wall o testo vuoto rilevato per: {url}")
return ""
return testo[:MAX_TESTO_ARTICOLO]
except httpx.TimeoutException:
logger.warning(f"Timeout scaricando: {url}")
return ""
except Exception as e:
logger.warning(f"Errore scaricando {url}: {e}")
return ""
# ---------------------------------------------------------------------------
# Tool 1: Recupero notizie da Google News
# ---------------------------------------------------------------------------
def recupero_emergenza_google_news(max_risultati: int = MAX_ARTICOLI) -> str:
"""
Scarica gli articoli da Google News RSS.
Per ogni articolo tenta di scaricare il testo completo;
se fallisce usa lo snippet RSS come fallback.
"""
logger.info("Recupero notizie da Google News...")
feed = feedparser.parse(GOOGLE_NEWS_URL)
risultati_raw = []
for entry in feed.entries[:max_risultati]:
titolo = entry.get("title", "")
snippet = BeautifulSoup(
entry.get("summary", ""), "html.parser"
).get_text().strip()
link = entry.get("link", "")
# Testo base = titolo + snippet RSS (sempre disponibile)
testo_base = f"{titolo}. {snippet}"[:MAX_PREVIEW_CHARS * 2]
# Prova a scaricare il testo completo come arricchimento
testo_completo = scarica_testo_articolo(link)
if testo_completo:
testo = testo_completo
fonte_testo = "articolo completo"
else:
testo = testo_base
fonte_testo = "snippet RSS"
logger.info(f" [{fonte_testo}] {titolo[:60]}")
risultati_raw.append(
{
"titolo": titolo,
"testo": testo,
"link": link,
"data_pubblicazione": entry.get("published", "sconosciuta"),
}
)
logger.info(f"Pronti {len(risultati_raw)} articoli per l'analisi.")
return json.dumps(risultati_raw, ensure_ascii=False)
# ---------------------------------------------------------------------------
# Costruzione prompt (un articolo alla volta)
# ---------------------------------------------------------------------------
def costruisci_prompt_singolo(articolo: dict) -> str:
"""Costruisce il prompt per un singolo articolo."""
today = datetime.now().strftime("%d/%m/%Y")
testo = articolo['testo']
# Log di debug: mostra i primi 300 caratteri del testo mandato a Ollama
logger.debug(f" Testo inviato a Ollama:\n {testo[:300]!r}")
return (
f"La data di oggi è {today}.\n\n"
"Analizza il seguente articolo e determina se descrive uno o più eventi "
"di frana o smottamento avvenuti in Italia.\n\n"
"IMPORTANTE: Il testo potrebbe contenere menu, pubblicità o testo di navigazione "
"del sito web — ignorali e concentrati solo sul contenuto giornalistico.\n\n"
"Se NON contiene notizie di frane, rispondi con: {\"eventi\": []}.\n\n"
"Per ogni evento trovato:\n"
"1. Estrai 'localita' e 'provincia' (sigla, es. 'SO').\n"
"2. Scrivi una breve 'descrizione' dell'evento.\n"
"3. Copia ESATTAMENTE nel campo 'url' questo valore: " + articolo['link'] + "\n"
"4. 'interruzione_stradale': true se si menziona chiusura/blocco di strade, "
"vie, sentieri o ferrovie. Altrimenti false.\n"
"5. 'evacuazione': true se si menziona evacuazione, sfollati o ordini di "
"allontanamento. Altrimenti false.\n"
"6. 'notizia_vecchia': true se la data di pubblicazione è più di 7 giorni fa "
"rispetto ad oggi, o se l'evento è descritto come passato/risolto. "
"Altrimenti false.\n\n"
f"Data pubblicazione: {articolo.get('data_pubblicazione', 'sconosciuta')}\n"
f"Testo articolo:\n{testo}"
)
# ---------------------------------------------------------------------------
# Tool 2: Chiamata a Ollama con retry
# ---------------------------------------------------------------------------
def ripara_json_troncato(testo: str) -> str:
"""
Tenta di riparare un JSON troncato chiudendo le strutture aperte.
Funziona per il caso più comune: array di oggetti interrotto a metà.
"""
testo = testo.strip()
# Rimuovi l'ultimo oggetto incompleto (non chiuso da '}')
ultimo_oggetto_completo = testo.rfind("},")
if ultimo_oggetto_completo != -1:
testo = testo[: ultimo_oggetto_completo + 1]
# Chiudi array e oggetto radice se necessario
if not testo.endswith("]"):
testo += "]"
if not testo.endswith("}}"):
testo += "}"
return testo
def chiedi_a_ollama(prompt: str, max_retry: int = MAX_RETRY_OLLAMA) -> dict:
"""
Invia il prompt a Ollama e ritorna il dizionario JSON parsato.
Riprova fino a max_retry volte in caso di risposta non valida.
"""
for tentativo in range(1, max_retry + 1):
try:
logger.info(f"Chiamata a Ollama (tentativo {tentativo}/{max_retry})...")
response = ollama.chat(
model=OLLAMA_MODEL,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
format=ListaFrane.model_json_schema(),
options=OLLAMA_OPTIONS,
)
testo = response["message"]["content"]
# alcuni modelli (es. Gemma) wrappano l'output in ```json ... ```
testo = testo.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
dati = json.loads(testo)
return dati
except json.JSONDecodeError as e:
logger.warning(f"Tentativo {tentativo} — JSON malformato: {e}")
# Prova a riparare il JSON troncato prima del prossimo tentativo
testo_raw = testo.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
try:
testo_riparato = ripara_json_troncato(testo_raw)
dati = json.loads(testo_riparato)
logger.info("JSON riparato con successo.")
return dati
except Exception:
logger.warning("Riparazione fallita, ritento da capo.")
if tentativo < max_retry:
time.sleep(2)
except KeyError as e:
logger.warning(f"Tentativo {tentativo} fallito — chiave mancante: {e}")
if tentativo < max_retry:
time.sleep(2)
raise RuntimeError(
f"Ollama non ha restituito un JSON valido dopo {max_retry} tentativi."
)
# ---------------------------------------------------------------------------
# Tool 3: Geocodifica e salvataggio GeoJSON
# ---------------------------------------------------------------------------
def deduplica_frane(lista: List[SingolaFrana]) -> List[SingolaFrana]:
"""Rimuove eventi duplicati basandosi su (localita, provincia)."""
visti = set()
uniche = []
for f in lista:
chiave = (f.localita.lower().strip(), f.provincia.lower().strip())
if chiave not in visti:
visti.add(chiave)
uniche.append(f)
else:
logger.warning(f"Duplicato rimosso: {f.localita} ({f.provincia})")
return uniche
def geocode_and_save(
lista_frane: List[SingolaFrana],
output_dir: Path = OUTPUT_DIR,
) -> Optional[Path]:
"""
Geocodifica ogni frana e salva un file GeoJSON con timestamp nel nome.
Restituisce il path del file creato, oppure None se nessuna frana è mappabile.
"""
geolocator = Nominatim(user_agent="ollama_geo_agent", timeout=10)
geocode = RateLimiter(geolocator.geocode, min_delay_seconds=2, max_retries=3, error_wait_seconds=5)
features = []
for f in lista_frane:
query_geo = f"{f.localita}, {f.provincia}, Italia"
location = None
# Primo tentativo: con provincia
try:
location = geocode(query_geo)
except Exception as e:
logger.warning(f"Errore rete su '{query_geo}': {e}")
# Secondo tentativo: senza provincia
if location is None:
try:
logger.warning(f"Riprovo senza provincia per: {f.localita}")
location = geocode(f"{f.localita}, Italia")
except Exception as e:
logger.error(f"Errore rete anche sul fallback per '{f.localita}': {e}")
if location is None:
logger.warning(f"Geocodifica fallita definitivamente per: {f.localita}")
continue
punto = geojson.Point((location.longitude, location.latitude))
feature = geojson.Feature(
geometry=punto,
properties={
"localita": f.localita,
"provincia": f.provincia,
"descrizione": f.descrizione,
"interruzione_stradale": f.interruzione_stradale,
"evacuazione": f.evacuazione,
"notizia_vecchia": f.notizia_vecchia,
"url_notizia": f.url or "Non disponibile",
"fonte": "Google News",
},
)
features.append(feature)
stradale = "⚠️ strada interrotta" if f.interruzione_stradale else "viabilità ok"
evacuazione = "🚨 evacuazione" if f.evacuazione else "nessuna evacuazione"
eta = "📅 notizia vecchia" if f.notizia_vecchia else "🆕 recente"
logger.info(f"Mappato: {f.localita} ({f.provincia}) — {stradale} — {evacuazione} — {eta}")
if not features:
logger.warning("Nessuna frana geocodificata. File GeoJSON non creato.")
return None
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = output_dir / f"frane_{timestamp}.geojson"
fc = geojson.FeatureCollection(features)
with open(output_path, "w", encoding="utf-8") as out:
geojson.dump(fc, out, indent=2, ensure_ascii=False)
logger.info(f"File '{output_path}' creato con {len(features)} eventi.")
return output_path
# ---------------------------------------------------------------------------
# Agent loop principale
# ---------------------------------------------------------------------------
def esegui_agente():
# 1. Recupero notizie
notizie_json = recupero_emergenza_google_news()
lista_articoli = json.loads(notizie_json)
if not lista_articoli:
logger.info("Nessun articolo recuperato. Uscita.")
return
# 2. Analisi un articolo alla volta
tutti_gli_eventi: List[SingolaFrana] = []
for i, articolo in enumerate(lista_articoli):
logger.info(f"Analisi articolo {i+1}/{len(lista_articoli)}: {articolo.get('titolo', '')[:60]}")
prompt = costruisci_prompt_singolo(articolo)
try:
dati_json = chiedi_a_ollama(prompt)
eventi_raw = dati_json.get("eventi", [])
if not eventi_raw:
logger.info(" → Nessuna frana rilevata in questo articolo.")
continue
for item in eventi_raw:
try:
tutti_gli_eventi.append(SingolaFrana(**item))
except Exception as e:
logger.warning(f" → Evento non valido saltato: {e}")
logger.info(f" → {len(eventi_raw)} evento/i estratto/i.")
except RuntimeError as e:
logger.error(f" → Ollama fallito su articolo {i+1}: {e}")
continue
if not tutti_gli_eventi:
logger.info("Nessun evento estratto da nessun articolo.")
return
# 3. Deduplicazione globale
lista_frane = deduplica_frane(tutti_gli_eventi)
logger.info(f"Totale eventi unici: {len(lista_frane)}")
# 4. Geocodifica e salvataggio
geocode_and_save(lista_frane)
if __name__ == "__main__":
esegui_agente()