giovedì 18 giugno 2026

LLama3 Anita

A seguito di questo post ho provato a vedere ho provato a vedere cosa accadeva ad utilizzare un modello specifico per la lingua italiana in particolare per la gestione delle notizie di eventi passati

Il modello e' 

https://huggingface.co/mradermacher/LLaMAntino-3-ANITA-8B-Inst-DPO-ITA-GGUF

Visto il formato gguf ho usato llamacpp al posto di ollama con la seguente sintassi  che e' specifica per mac silicon  

./llama-server -m ./local_models/LLaMAntino-3-ANITA-8B-Inst-DPO-ITA.Q8_0.gguf \-ngl 999 -c 8192 -t 6 -tb 6 --port 8080 --host 127.0.0.1


"""
Agente di rilevamento frane da Google News con geocodifica e output GeoJSON.
Ottimizzato per LLaMAntino-3-ANITA-8B eseguito tramite llama.cpp server su Mac M4.
"""

import json
import time
import logging
from pathlib import Path
from datetime import datetime
from typing import List, Optional

import feedparser
import geojson
import httpx
from bs4 import BeautifulSoup
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
from pydantic import BaseModel, Field

# ---------------------------------------------------------------------------
# Configurazione
# ---------------------------------------------------------------------------

logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)

MAX_ARTICOLI = 20
MAX_PREVIEW_CHARS = 150 # usato solo come fallback se il download dell'articolo fallisce
MAX_TESTO_ARTICOLO = 3500 # caratteri massimi del testo completo scaricato
MAX_RETRY_LLAMACPP = 3
OUTPUT_DIR = Path(".")

GOOGLE_NEWS_URL = (
"https://news.google.com/rss/search"
"?q=frana+OR+smottamento+when:30d&hl=it&gl=IT&ceid=IT:it"
)

# Configurazione endpoint llama.cpp server
LLAMACPP_API_URL = "http://127.0.0.1:8080/v1/chat/completions"

# Opzioni di generazione specifiche per LLaMAntino-3 (architettura Llama-3)
LLAMACPP_OPTIONS = {
"temperature": 0.1, # Molto bassa per garantire rigore filologico e sintattico
"top_p": 0.9,
"max_tokens": 4096,
}

SYSTEM_PROMPT = (
"Sei ANITA, un assistente IA esperto di rischio idrogeologico e nell'analisi linguistica della lingua italiana, sviluppato dall'Università di Bari. "
"Il tuo compito è estrarre notizie di frane e associare sempre l'URL della fonte corretto ad ogni record. "
"Analizza con estremo rigore la sintassi, la flessione dei tempi verbali e i marcatori temporali per valutare ogni evento:\n"
"1) Se il testo menziona interruzioni, chiusure o blocchi di strade, viabilità, sentieri o ferrovie causati dalla frana: imposta 'interruzione_stradale' a true;\n"
"2) Se il testo menziona evacuazioni, sfollati, persone fatte uscire dalle abitazioni o ordini di evacuazione: imposta 'evacuazione' a true;\n"
"3) Valuta accuratamente la cronologia dell'evento rispetto alla data odierna fornita nel prompt. Presta attenzione allo stile giornalistico (es. l'uso del presente storico per fatti passati). Se la data di pubblicazione è più vecchia di 7 giorni rispetto ad oggi, o se l'analisi sintattica dei verbi o delle locuzioni avverbiali indica che l'evento è accaduto nel passato, è storico o è già stato completamente risolto: imposta 'notizia_vecchia' a true. Altrimenti imposta false.\n"
"In assenza di menzione esplicita, imposta i campi a false. Genera un output JSON valido seguendo strettamente lo schema richiesto."
)

# ---------------------------------------------------------------------------
# Modelli Pydantic
# ---------------------------------------------------------------------------


class SingolaFrana(BaseModel):
localita: str = Field(
description="Nome del comune, frazione o località specifica in Italia."
)
provincia: str = Field(
description="Sigla della provincia (es. 'SO', 'TO', 'BG')."
)
descrizione: str = Field(
description="Breve sintesi dell'evento o dei danni descritti nell'articolo."
)
interruzione_stradale: bool = Field(
description="True se la frana ha causato interruzione, chiusura o blocco di strade, viabilità o ferrovie. False altrimenti."
)
evacuazione: bool = Field(
description="True se la frana ha causato evacuazione di persone, sfollati o ordini di evacuazione. False altrimenti."
)
notizia_vecchia: bool = Field(
description="True se la frana è accaduta più di una settimana fa rispetto alla data odierna o se l'analisi dei tempi verbali e del contesto indica un evento passato, storico o già risolto. False se l'evento è recente (ultimi 7 giorni)."
)
url: Optional[str] = Field(
description="L'URL esatto dell'articolo da cui è stata estratta la notizia."
)


class ListaFrane(BaseModel):
eventi: List[SingolaFrana] = Field(
description="Lista di tutte le frane distinte rilevate negli articoli analizzati."
)


# ---------------------------------------------------------------------------
# Tool 1a: Scraping testo completo articolo
# ---------------------------------------------------------------------------

HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
"Accept-Language": "it-IT,it;q=0.9",
}

COOKIE_WALL_MARKERS = [
"accetta tutto",
"rifiuta tutto",
"cookie e dati",
"contenuti non personalizzati",
"impostazioni cookie",
"privacy policy",
"we use cookies",
]


def is_cookie_wall(testo: str) -> bool:
testo_lower = testo.lower()
hits = sum(1 for marker in COOKIE_WALL_MARKERS if marker in testo_lower)
return hits >= 2


PAYWALL_DOMAINS = {
"corriere.it", "repubblica.it", "sole24ore.com",
"stampa.it", "ilmessaggero.it", "lastampa.it",
}


def estrai_dominio(url: str) -> str:
try:
from urllib.parse import urlparse
return urlparse(url).netloc.replace("www.", "")
except Exception:
return ""


def scarica_testo_articolo(url: str) -> str:
dominio = estrai_dominio(url)
if dominio in PAYWALL_DOMAINS:
logger.debug(f"Paywall noto, uso snippet RSS per: {dominio}")
return ""

try:
response = httpx.get(
url,
headers=HEADERS,
timeout=10,
follow_redirects=True,
)
if response.status_code != 200:
logger.debug(f"HTTP {response.status_code} per {url}")
return ""

soup = BeautifulSoup(response.text, "html.parser")

for tag in soup(["script", "style", "nav", "footer", "header",
"aside", "form", "noscript", "iframe", "figure",
"button", "input", "select", "textarea"]):
tag.decompose()

contenuto = (
soup.find("article")
or soup.find("main")
or soup.find(attrs={"itemprop": "articleBody"})
or soup.find(class_=lambda c: c and any(
kw in c.lower() for kw in ["article-body", "article__body",
"post-content", "entry-content",
"news-body", "article-content"]
))
)

if contenuto:
paragrafi = contenido.find_all("p")
if paragrafi:
testo = " ".join(p.get_text(strip=True) for p in paragrafi)
else:
testo = contenido.get_text(separator=" ", strip=True)
else:
paragrafi = soup.find_all("p")
testo = " ".join(p.get_text(strip=True) for p in paragrafi if len(p.get_text(strip=True)) > 40)

if not testo or is_cookie_wall(testo):
logger.debug(f"Cookie wall o testo vuoto rilevato per: {url}")
return ""

return testo[:MAX_TESTO_ARTICOLO]

except httpx.TimeoutException:
logger.warning(f"Timeout scaricando: {url}")
return ""
except Exception as e:
logger.warning(f"Errore scaricando {url}: {e}")
return ""


# ---------------------------------------------------------------------------
# Tool 1: Recupero notizie da Google News
# ---------------------------------------------------------------------------


def recupero_emergenza_google_news(max_risultati: int = MAX_ARTICOLI) -> str:
logger.info("Recupero notizie da Google News...")
feed = feedparser.parse(GOOGLE_NEWS_URL)

risultati_raw = []
for entry in feed.entries[:max_risultati]:
titolo = entry.get("title", "")
snippet = BeautifulSoup(
entry.get("summary", ""), "html.parser"
).get_text().strip()
link = entry.get("link", "")

testo_base = f"{titolo}. {snippet}"[:MAX_PREVIEW_CHARS * 2]

testo_completo = scarica_testo_articolo(link)
if testo_completo:
testo = testo_completo
fonte_testo = "articolo completo"
else:
testo = testo_base
fonte_testo = "snippet RSS"

logger.info(f" [{fonte_testo}] {titolo[:60]}")

risultati_raw.append(
{
"titolo": titolo,
"testo": testo,
"link": link,
"data_pubblicazione": entry.get("published", "sconosciuta"),
}
)

logger.info(f"Pronti {len(risultati_raw)} articoli per l'analisi.")
return json.dumps(risultati_raw, ensure_ascii=False)


# ---------------------------------------------------------------------------
# Costruzione prompt
# ---------------------------------------------------------------------------


def costruisci_prompt_singolo(articolo: dict) -> str:
# Recupera dinamicamente la data di esecuzione (oggi)
oggi = datetime.now()
today_str = oggi.strftime("%A %d/%m/%Y")
testo = articolo['testo']

logger.debug(f" Testo inviato a LLaMAntino via llama.cpp:\n {testo[:300]!r}")

return (
f"--- CONTESTO TEMPORALE OBBLIGATORIO ---\n"
f"DATA DI OGGI: {today_str}\n"
f"Data di pubblicazione dell'articolo: {articolo.get('data_pubblicazione', 'sconosciuta')}\n"
f"----------------------------------------\n\n"
"Analizza il seguente articolo di cronaca e determina se descrive uno o più eventi "
"di frana o smottamento avvenuti in Italia.\n\n"
"LINEE GUIDA RIGIDE PER LA VALUTAZIONE DELLA DATA ('notizia_vecchia'):\n"
f"1. Confronta i riferimenti temporali nel testo (es. 'ieri', 'due giorni fa', 'mercoledì scorso', 'il mese scorso') con la DATA DI OGGI ({today_str}).\n"
"2. Fai un calcolo mentale: quanti giorni sono passati tra l'evento descritto e OGGI?\n"
"3. Imposta 'notizia_vecchia' a TRUE se:\n"
" - La data di pubblicazione o l'evento stesso risalgono a più di 7 giorni fa rispetto a OGGI.\n"
" - Il testo usa verbi al passato remoto o fa riferimento a frane storiche, passate o già completamente ripristinate (es. 'la frana dello scorso anno', 'i lavori post-frana di due mesi fa').\n"
"4. Imposta 'notizia_vecchia' a FALSE solo se l'evento è in corso, è accaduto oggi, ieri o comunque negli ultimi 7 giorni ed è ancora un'emergenza attuale.\n\n"
"Se NON contiene notizie di frane attive o passate, rispondi con la lista 'eventi' vuota.\n\n"
"Per ogni evento trovato, compila i campi richiesti. Nel campo 'url', copia esattamente: " + articolo['link'] + "\n\n"
f"Testo dell'articolo:\n{testo}"
)


# ---------------------------------------------------------------------------
# Tool 2: Chiamata al Server llama.cpp con validazione formale dello schema
# ---------------------------------------------------------------------------


def chiedi_a_llamacpp(prompt: str, max_retry: int = MAX_RETRY_LLAMACPP) -> dict:
"""Invia il prompt al server locale di llama.cpp forzando la grammatica JSON nativa."""
# Costruiamo il payload compatibile OpenAI + estensione dello schema JSON nativo per llama.cpp
payload = {
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
"temperature": LLAMACPP_OPTIONS["temperature"],
"top_p": LLAMACPP_OPTIONS["top_p"],
"max_tokens": LLAMACPP_OPTIONS["max_tokens"],
# Questa chiave forza llama.cpp a campionare i token seguendo strettamente la struttura Pydantic
"response_format": {
"type": "json_object",
"schema": ListaFrane.model_json_schema()
}
}

for tentativo in range(1, max_retry + 1):
try:
logger.info(f"Chiamata a llama.cpp server (tentativo {tentativo}/{max_retry})...")
with httpx.Client() as client:
response = client.post(
LLAMACPP_API_URL,
json=payload,
timeout=120.0 # Timeout ampio per l'elaborazione di contesti lunghi
)
if response.status_code != 200:
logger.error(f"Errore HTTP dal server llama.cpp: {response.status_code}")
if tentativo < max_retry:
time.sleep(2)
continue
response.raise_for_status()

risposta_json = response.json()
testo_risposta = risposta_json["choices"][0]["message"]["content"].strip()
# Parsing del JSON strutturato restituito dal modello
dati = json.loads(testo_risposta)
return dati

except (httpx.HTTPError, json.JSONDecodeError, KeyError) as e:
logger.warning(f"Tentativo {tentativo} fallito a causa di un errore di comunicazione o parsing: {e}")
if tentativo < max_retry:
time.sleep(2)

raise RuntimeError(
f"Impossibile ottenere una risposta valida dal server llama.cpp dopo {max_retry} tentativi."
)


# ---------------------------------------------------------------------------
# Tool 3: Geocodifica e salvataggio GeoJSON
# ---------------------------------------------------------------------------


def deduplica_frane(lista: List[SingolaFrana]) -> List[SingolaFrana]:
visti = set()
uniche = []
for f in lista:
chiave = (f.localita.lower().strip(), f.provincia.lower().strip())
if chiave not in visti:
visti.add(chiave)
uniche.append(f)
else:
logger.warning(f"Duplicato rimosso: {f.localita} ({f.provincia})")
return uniche


def geocode_and_save(
lista_frane: List[SingolaFrana],
output_dir: Path = OUTPUT_DIR,
) -> Optional[Path]:
geolocator = Nominatim(user_agent="llamacpp_geo_agent", timeout=10)
geocode = RateLimiter(geolocator.geocode, min_delay_seconds=2, max_retries=3, error_wait_seconds=5)
features = []

for f in lista_frane:
query_geo = f"{f.localita}, {f.provincia}, Italia"
location = None

try:
location = geocode(query_geo)
except Exception as e:
logger.warning(f"Errore rete su '{query_geo}': {e}")

if location is None:
try:
logger.warning(f"Riprovo senza provincia per: {f.localita}")
location = geocode(f"{f.localita}, Italia")
except Exception as e:
logger.error(f"Errore rete anche sul fallback per '{f.localita}': {e}")

if location is None:
logger.warning(f"Geocodifica fallita definitivamente per: {f.localita}")
continue

punto = geojson.Point((location.longitude, location.latitude))
feature = geojson.Feature(
geometry=punto,
properties={
"localita": f.localita,
"provincia": f.provincia,
"descrizione": f.descrizione,
"interruzione_stradale": f.interruzione_stradale,
"evacuazione": f.evacuazione,
"notizia_vecchia": f.notizia_vecchia,
"url_notizia": f.url or "Non disponibile",
"fonte": "Google News",
},
)
features.append(feature)
stradale = "⚠️ strada interrotta" if f.interruzione_stradale else "viabilità ok"
evacuazione = "🚨 evacuazione" if f.evacuazione else "nessuna evacuazione"
eta = "📅 notizia vecchia" if f.notizia_vecchia else "🆕 recente"
logger.info(f"Mappato: {f.localita} ({f.provincia}) — {stradale}{evacuazione}{eta}")

if not features:
logger.warning("Nessuna frana geocodificata. File GeoJSON non creato.")
return None

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = output_dir / f"frane_{timestamp}.geojson"

fc = geojson.FeatureCollection(features)
with open(output_path, "w", encoding="utf-8") as out:
geojson.dump(fc, out, indent=2, ensure_ascii=False)

logger.info(f"File '{output_path}' creato con {len(features)} eventi.")
return output_path


# ---------------------------------------------------------------------------
# Agent loop principale
# ---------------------------------------------------------------------------


def esegui_agente():
notizie_json = recupero_emergenza_google_news()
lista_articoli = json.loads(notizie_json)

if not lista_articoli:
logger.info("Nessun articolo recuperato. Uscita.")
return

tutti_gli_eventi: List[SingolaFrana] = []

for i, articolo in enumerate(lista_articoli):
logger.info(f"Analisi articolo {i+1}/{len(lista_articoli)}: {articolo.get('titolo', '')[:60]}")
prompt = costruisci_prompt_singolo(articolo)

try:
dati_json = chiedi_a_llamacpp(prompt)
eventi_raw = dati_json.get("eventi", [])
if not eventi_raw:
logger.info(" → Nessuna frana rilevata in questo articolo.")
continue
for item in eventi_raw:
try:
tutti_gli_eventi.append(SingolaFrana(**item))
except Exception as e:
logger.warning(f" → Evento non valido saltato: {e}")
logger.info(f" → {len(eventi_raw)} evento/i estratto/i.")
except RuntimeError as e:
logger.error(f" → llama.cpp fallito su articolo {i+1}: {e}")
continue

if not tutti_gli_eventi:
logger.info("Nessun evento espresso estratto dagli articoli.")
return

lista_frane = deduplica_frane(tutti_gli_eventi)
logger.info(f"Totale eventi unici: {len(lista_frane)}")

geocode_and_save(lista_frane)


if __name__ == "__main__":
esegui_agente()

Nessun commento:

Posta un commento

LLama3 Anita

A seguito di questo post ho provato a vedere ho provato a vedere cosa accadeva ad utilizzare un modello specifico per la lingua italiana in...