martedì 30 giugno 2026

DJI ID

Per motivi di sicurezza in volo durante il lavoro volevo che il drone fosse visibile tramite la funzionalita' wifi beacon

Attivarlo non e' stato banale perche' si tratta di una funzionalita' abbastanza ben nascosta

Nella prima schermata di DJI Pilot 2 sotto la scritta max altitude c'e' una piccola scritta RID con un triangolo

cliccandolo si passa alla pagina di inserimento del proprio Id operatore. Attenzione si deve andare di DFlight nel proprio profilo 


 come si vede l'EASA ID ha tre asterischi che nascondono 3 caratteri privati. Nella applicazione DJI deve essere inserito tutto l'Id...per fare comparire la stringa completa si deve cliccare sull'occhio barrato a sinistra della stringa

L'operazione deve essere fatta online perche' DJI Pilot 2 valida la stringa 

a questo punto nella mappa si avra' la scritta RID in nero
 
per leggere i beacon si deve mettere la scheda wifi del portatile in monitor mode. Per vedere se la scheda e' compatibile si puo' procedere con il comando sottostante

 root@HW27747:/home/luca# iw list | grep -A 10 "Supported interface modes"
    Supported interface modes:
         * IBSS
         * managed
         * AP
         * AP/VLAN
         * monitor
         * P2P-client
         * P2P-GO
         * P2P-device
    Band 1:
        Capabilities: 0x11ee

a questo punto si imposta in monitoraggio

sudo ip link set wlp58s0 down
sudo iw dev wlp58s0 set type monitor 
sudo ip link set wlp58s0 up

(base) luca@HW27747:~/scapy$ sudo iw dev
phy#0
    Unnamed/non-netdev interface
        wdev 0x2
        addr 5c:51:4f:8a:24:d3
        type P2P-device
    Interface wlp58s0
        ifindex 2
        wdev 0x1
        addr 5c:51:4f:8a:24:d3
        type monitor
        multicast TXQ:
            qsz-byt    qsz-pkt    flows    drops    marks    overlmt    hashcol    tx-bytes    tx-packets

 

in generale Network manager di Linux interferisce in questa operazione ed e' quindi meglio disattivarlo 

sudo nmcli device set wlp58s0 managed no

per invertire il tutto a fine lavoro 

sudo ip link set wlp58s0 down
sudo iw dev wlp58s0 set type managed
sudo ip link set wlp58s0 up 

sudo nmcli device set wlp58s0 managed yes 

Si installa quindi questo script per la decodifica dei pacchetti wifi  (grazie Claude)

pip install scapy

 

sudo $(which python3) remote_id_wifi_decoder.py wlp58s0 --hop -v --json log.jsonl


---------------------------

#!/usr/bin/env python3
"""
remote_id_wifi_decoder.py
---------------------------
Sniffer/decoder per il Remote ID standard ASTM F3411 (opendroneid)
trasmesso via Wi-Fi Beacon, il transport effettivamente usato da DJI
(Matrice 4, e in generale quasi tutta la linea DJI con Remote ID
compliance) invece del Bluetooth.

I beacon DJI includono un Information Element Vendor-Specific (ID 0xDD)
con OUI FA:0B:BC (ASD-STAN/ASTM), seguito da un byte "vendor type" 0x0D
e dal payload ODID (singolo messaggio o Message Pack), esattamente lo
stesso formato binario usato sul transport Bluetooth.

REQUISITI:
- Scheda Wi-Fi in modalita' monitor mode
- root
- pip install scapy --break-system-packages

USO:
sudo python3 remote_id_wifi_decoder.py wlan0mon
sudo python3 remote_id_wifi_decoder.py wlan0mon --hop -v
sudo python3 remote_id_wifi_decoder.py wlan0mon --json out.jsonl

Setup monitor mode:
sudo ip link set wlan0 down
sudo iw wlan0 set monitor none
sudo ip link set wlan0 up
"""

import argparse
import json
import struct
import subprocess
import sys
import threading
import time
from datetime import datetime, timezone

from scapy.all import sniff, Dot11, Dot11Beacon, Dot11ProbeResp, Dot11Elt

ODID_OUI = b"\xfa\x0b\xbc"
ODID_VENDOR_TYPE = 0x0D

MSG_TYPE_NAMES = {
0x0: "Basic ID", 0x1: "Location/Vector", 0x2: "Auth",
0x3: "Self-ID", 0x4: "System", 0x5: "Operator ID", 0xF: "Message Pack",
}
ID_TYPE_NAMES = {
0: "None", 1: "Serial Number (ANSI/CTA-2063-A)",
2: "CAA Assigned Registration ID", 3: "UTM Assigned ID", 4: "Specific Session ID",
}
UA_TYPE_NAMES = {
0: "None/Undeclared", 1: "Aeroplane", 2: "Helicopter/Multirotor",
3: "Gyroplane", 4: "Hybrid Lift", 5: "Ornithopter", 6: "Glider",
7: "Kite", 8: "Free Balloon", 9: "Captive Balloon", 10: "Airship",
11: "Free Fall/Parachute", 12: "Rocket", 13: "Tethered Powered Aircraft",
14: "Ground Obstacle", 15: "Other",
}
CLASSIFICATION_NAMES = {0: "Undeclared", 1: "EU classification"}


def s8(v):
return v - 256 if v > 127 else v


def decode_basic_id(payload: bytes):
if len(payload) < 22:
return None
id_type = (payload[0] >> 4) & 0x0F
ua_type = payload[0] & 0x0F
raw_id = payload[1:21]
uas_id = raw_id.split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
return {
"id_type": ID_TYPE_NAMES.get(id_type, f"sconosciuto({id_type})"),
"ua_type": UA_TYPE_NAMES.get(ua_type, f"sconosciuto({ua_type})"),
"uas_id": uas_id,
}


def decode_location(payload: bytes):
if len(payload) < 24:
return None
b1 = payload[0]
status = (b1 >> 4) & 0x0F
height_type = (b1 >> 2) & 0x01
ew_dir_segment = (b1 >> 1) & 0x01
speed_multiplier = b1 & 0x01

track_raw = payload[1]
track_dir = track_raw + 180 if ew_dir_segment == 1 else track_raw

speed_raw = payload[2]
speed = speed_raw * 0.25 if speed_multiplier == 0 else 255 * 0.25 + speed_raw * 0.75

vspeed = s8(payload[3]) * 0.5

lat_raw, lon_raw = struct.unpack_from("<ii", payload, 4)
lat, lon = lat_raw / 1e7, lon_raw / 1e7

press_alt_raw, geo_alt_raw, height_raw = struct.unpack_from("<HHH", payload, 12)
press_alt = press_alt_raw * 0.5 - 1000
geo_alt = geo_alt_raw * 0.5 - 1000
height = height_raw * 0.5 - 1000

timestamp_raw = struct.unpack_from("<H", payload, 20)[0]

status_names = {0: "Undeclared", 1: "Ground", 2: "Airborne",
3: "Emergency", 4: "Remote ID System Failure"}

return {
"status": status_names.get(status, f"sconosciuto({status})"),
"track_direction_deg": track_dir,
"speed_horizontal_mps": round(speed, 2),
"speed_vertical_mps": round(vspeed, 2),
"latitude": round(lat, 7),
"longitude": round(lon, 7),
"pressure_altitude_m": round(press_alt, 1),
"geodetic_altitude_m": round(geo_alt, 1),
"height_above_ref_m": round(height, 1),
"height_type": "sopra il take-off" if height_type == 0 else "sopra il livello del suolo",
"timestamp_sec_of_hour": round(timestamp_raw / 10.0, 1),
}


def decode_self_id(payload: bytes):
if len(payload) < 23:
return None
desc_type = payload[0]
desc = payload[1:24].split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
return {"description_type": desc_type, "description": desc}


def decode_system(payload: bytes):
if len(payload) < 23:
return None
flags = payload[0]
classification = CLASSIFICATION_NAMES.get((flags >> 2) & 0x03, "sconosciuta")
op_lat_raw, op_lon_raw = struct.unpack_from("<ii", payload, 1)
area_count = struct.unpack_from("<H", payload, 9)[0]
area_radius = payload[11] * 10
area_ceiling_raw, area_floor_raw = struct.unpack_from("<HH", payload, 12)
op_alt_raw = struct.unpack_from("<H", payload, 17)[0]
timestamp_raw = struct.unpack_from("<I", payload, 19)[0]
return {
"classification": classification,
"operator_latitude": round(op_lat_raw / 1e7, 7),
"operator_longitude": round(op_lon_raw / 1e7, 7),
"area_count": area_count,
"area_radius_m": area_radius,
"area_ceiling_m": round(area_ceiling_raw * 0.5 - 1000, 1),
"area_floor_m": round(area_floor_raw * 0.5 - 1000, 1),
"operator_altitude_geo_m": round(op_alt_raw * 0.5 - 1000, 1),
"timestamp_sec_since_2019": timestamp_raw,
}


def decode_operator_id(payload: bytes):
if len(payload) < 21:
return None
op_id_type = payload[0]
op_id = payload[1:21].split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
return {"operator_id_type": op_id_type, "operator_id": op_id}


DECODERS = {
0x0: decode_basic_id, 0x1: decode_location, 0x3: decode_self_id,
0x4: decode_system, 0x5: decode_operator_id,
}


def decode_single_message(msg: bytes):
if len(msg) < 2:
return None
header = msg[0]
msg_type = (header >> 4) & 0x0F
proto_version = header & 0x0F
payload = msg[1:25]

entry = {
"message_type": MSG_TYPE_NAMES.get(msg_type, f"sconosciuto(0x{msg_type:X})"),
"protocol_version": proto_version,
}
if msg_type in DECODERS:
try:
decoded = DECODERS[msg_type](payload)
except (struct.error, IndexError):
decoded = None
if decoded:
entry.update(decoded)
elif msg_type == 0x2:
entry["note"] = "messaggio Auth (firma/autenticazione) - non decodificato"
return entry


def decode_odid_body(body: bytes):
"""body = il payload subito dopo OUI + vendor_type (0x0D).

NOTA: i beacon DJI osservati anteposero un singolo byte extra
(un contatore di sequenza, non standard ASTM) prima del vero
ODID Message Pack. Il vero header del pack (0xF_ = Message Pack)
si trova quindi a body[1], non a body[0]. Se invece il primo byte
e' gia' un header di Message Pack valido (tipo 0xF), assumiamo
che non ci sia byte extra (compatibilita' con altri vendor/firmware).
"""
if len(body) < 2:
return None

if (body[0] >> 4) & 0x0F == 0xF:
pack = body
else:
pack = body[1:] # salta il byte contatore non-standard

if len(pack) < 1:
return None

header = pack[0]
msg_type = (header >> 4) & 0x0F

results = []
if msg_type == 0xF:
if len(pack) < 3:
return None
msg_size = pack[1]
msg_count = pack[2]
offset = 3
for _ in range(msg_count):
chunk = pack[offset:offset + msg_size]
if len(chunk) < msg_size:
break
decoded = decode_single_message(chunk)
if decoded:
results.append(decoded)
offset += msg_size
else:
decoded = decode_single_message(pack)
if decoded:
results.append(decoded)
return results


def handle_packet(pkt, json_fp, verbose):
if not pkt.haslayer(Dot11):
return
if not (pkt.haslayer(Dot11Beacon) or pkt.haslayer(Dot11ProbeResp)):
return

elt = pkt.getlayer(Dot11Elt)
while elt is not None:
if elt.ID == 221 and elt.info[:3] == ODID_OUI and len(elt.info) > 3 and elt.info[3] == ODID_VENDOR_TYPE:
src = pkt.addr2
body = elt.info[4:]
messages = decode_odid_body(body)
ts = datetime.now(timezone.utc).strftime("%H:%M:%S")

if messages:
print(f"\n[{ts}] Remote ID (WiFi Beacon) da {src}")
for m in messages:
mtype = m.get("message_type")
print(f" [{mtype}]")
for k, v in m.items():
if k == "message_type":
continue
print(f" {k}: {v}")
if json_fp:
record = {"timestamp": ts, "mac": src, "messages": messages}
json_fp.write(json.dumps(record, ensure_ascii=False) + "\n")
json_fp.flush()
else:
print(f"\n[{ts}] IE ODID rilevato da {src} ma payload non riconosciuto")

if verbose:
print(f" raw IE: {elt.info.hex()}")

elt = elt.payload.getlayer(Dot11Elt)


def channel_hopper(iface, stop_event, channels=range(1, 14)):
while not stop_event.is_set():
for ch in channels:
if stop_event.is_set():
break
subprocess.run(["iw", "dev", iface, "set", "channel", str(ch)],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(0.3)


def main():
ap = argparse.ArgumentParser(description="Decoder Remote ID (ASTM F3411) via WiFi Beacon")
ap.add_argument("iface", help="interfaccia in monitor mode (es. wlan0mon)")
ap.add_argument("--hop", action="store_true", help="channel hopping 1-13 (2.4GHz)")
ap.add_argument("--json", dest="json_path", default=None, help="salva ogni rilevamento come riga JSON")
ap.add_argument("-v", "--verbose", action="store_true", help="stampa anche l'IE raw in hex")
args = ap.parse_args()

print(f"In ascolto su {args.iface}... (Ctrl+C per uscire)")
print(f"Cerco IE Vendor Specific OUI {ODID_OUI.hex(':')} type 0x{ODID_VENDOR_TYPE:02X} (ASTM F3411 Remote ID).\n")

json_fp = open(args.json_path, "a", encoding="utf-8") if args.json_path else None

stop_event = threading.Event()
hopper_thread = None
if args.hop:
hopper_thread = threading.Thread(target=channel_hopper, args=(args.iface, stop_event), daemon=True)
hopper_thread.start()

try:
sniff(iface=args.iface, prn=lambda pkt: handle_packet(pkt, json_fp, args.verbose), store=False)
except PermissionError:
print("Errore: serve eseguire lo script come root (sudo).", file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt:
pass
finally:
stop_event.set()
if hopper_thread:
hopper_thread.join(timeout=1)
if json_fp:
json_fp.close()


if __name__ == "__main__":
main()

Per ricevere i pacchetti il drone deve essere in volo o quanto meno deve avere i motori armati .Un esempio della cattura.

 

[11:30:11] Remote ID (WiFi Beacon) da 8c:1e:d9:13:a2:8b
  [Basic ID]
    protocol_version: 1
    id_type: Serial Number (ANSI/CTA-2063-A)
    ua_type: Helicopter/Multirotor
    uas_id: 1581F7K3C255K00D7EEL
  [Location/Vector]
    protocol_version: 1
    status: Airborne
    track_direction_deg: 361
    speed_horizontal_mps: 0.0
    speed_vertical_mps: 0.0
    latitude: 43.7842685
    longitude: 11.2306187
    pressure_altitude_m: 13.0
    geodetic_altitude_m: 98.0
    height_above_ref_m: 3.0
    height_type: sopra il take-off
    timestamp_sec_of_hour: 1810.4
  [Self-ID]
    protocol_version: 1
    description_type: 0
    description: 
  [System]
    protocol_version: 1
    classification: EU classification
    operator_latitude: 43.7841697
    operator_longitude: 11.2305805
    area_count: 1
    area_radius_m: 0
    area_ceiling_m: -1000.0
    area_floor_m: -1000.0
    operator_altitude_geo_m: 112.0
    timestamp_sec_since_2019: 0
  [Operator ID]
    protocol_version: 1
    operator_id_type: 0
    operator_id: ITA5qjdid6y5x5ni
  raw IE: fa0bbc0d38f1190501123135383146374b33433235354b3030443745454c0000001122b50000fdf2181a0ba8b106ea079408d6073b44b846010031000000000000000000000000000000000000000000000000410521ef181a8da6b1060100000000000003b0080000000000510049544135716a64696436793578356e6900000000000000


giovedì 18 giugno 2026

LLama3 Anita

A seguito di questo post ho provato a vedere ho provato a vedere cosa accadeva ad utilizzare un modello specifico per la lingua italiana in particolare per la gestione delle notizie di eventi passati

Il modello e' 

https://huggingface.co/mradermacher/LLaMAntino-3-ANITA-8B-Inst-DPO-ITA-GGUF

Visto il formato gguf ho usato llamacpp al posto di ollama con la seguente sintassi  che e' specifica per mac silicon  

./llama-server -m ./local_models/LLaMAntino-3-ANITA-8B-Inst-DPO-ITA.Q8_0.gguf \-ngl 999 -c 8192 -t 6 -tb 6 --port 8080 --host 127.0.0.1


"""
Agente di rilevamento frane da Google News con geocodifica e output GeoJSON.
Ottimizzato per LLaMAntino-3-ANITA-8B eseguito tramite llama.cpp server su Mac M4.
"""

import json
import time
import logging
from pathlib import Path
from datetime import datetime
from typing import List, Optional

import feedparser
import geojson
import httpx
from bs4 import BeautifulSoup
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
from pydantic import BaseModel, Field

# ---------------------------------------------------------------------------
# Configurazione
# ---------------------------------------------------------------------------

logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)

MAX_ARTICOLI = 20
MAX_PREVIEW_CHARS = 150 # usato solo come fallback se il download dell'articolo fallisce
MAX_TESTO_ARTICOLO = 3500 # caratteri massimi del testo completo scaricato
MAX_RETRY_LLAMACPP = 3
OUTPUT_DIR = Path(".")

GOOGLE_NEWS_URL = (
"https://news.google.com/rss/search"
"?q=frana+OR+smottamento+when:30d&hl=it&gl=IT&ceid=IT:it"
)

# Configurazione endpoint llama.cpp server
LLAMACPP_API_URL = "http://127.0.0.1:8080/v1/chat/completions"

# Opzioni di generazione specifiche per LLaMAntino-3 (architettura Llama-3)
LLAMACPP_OPTIONS = {
"temperature": 0.1, # Molto bassa per garantire rigore filologico e sintattico
"top_p": 0.9,
"max_tokens": 4096,
}

SYSTEM_PROMPT = (
"Sei ANITA, un assistente IA esperto di rischio idrogeologico e nell'analisi linguistica della lingua italiana, sviluppato dall'Università di Bari. "
"Il tuo compito è estrarre notizie di frane e associare sempre l'URL della fonte corretto ad ogni record. "
"Analizza con estremo rigore la sintassi, la flessione dei tempi verbali e i marcatori temporali per valutare ogni evento:\n"
"1) Se il testo menziona interruzioni, chiusure o blocchi di strade, viabilità, sentieri o ferrovie causati dalla frana: imposta 'interruzione_stradale' a true;\n"
"2) Se il testo menziona evacuazioni, sfollati, persone fatte uscire dalle abitazioni o ordini di evacuazione: imposta 'evacuazione' a true;\n"
"3) Valuta accuratamente la cronologia dell'evento rispetto alla data odierna fornita nel prompt. Presta attenzione allo stile giornalistico (es. l'uso del presente storico per fatti passati). Se la data di pubblicazione è più vecchia di 7 giorni rispetto ad oggi, o se l'analisi sintattica dei verbi o delle locuzioni avverbiali indica che l'evento è accaduto nel passato, è storico o è già stato completamente risolto: imposta 'notizia_vecchia' a true. Altrimenti imposta false.\n"
"In assenza di menzione esplicita, imposta i campi a false. Genera un output JSON valido seguendo strettamente lo schema richiesto."
)

# ---------------------------------------------------------------------------
# Modelli Pydantic
# ---------------------------------------------------------------------------


class SingolaFrana(BaseModel):
localita: str = Field(
description="Nome del comune, frazione o località specifica in Italia."
)
provincia: str = Field(
description="Sigla della provincia (es. 'SO', 'TO', 'BG')."
)
descrizione: str = Field(
description="Breve sintesi dell'evento o dei danni descritti nell'articolo."
)
interruzione_stradale: bool = Field(
description="True se la frana ha causato interruzione, chiusura o blocco di strade, viabilità o ferrovie. False altrimenti."
)
evacuazione: bool = Field(
description="True se la frana ha causato evacuazione di persone, sfollati o ordini di evacuazione. False altrimenti."
)
notizia_vecchia: bool = Field(
description="True se la frana è accaduta più di una settimana fa rispetto alla data odierna o se l'analisi dei tempi verbali e del contesto indica un evento passato, storico o già risolto. False se l'evento è recente (ultimi 7 giorni)."
)
url: Optional[str] = Field(
description="L'URL esatto dell'articolo da cui è stata estratta la notizia."
)


class ListaFrane(BaseModel):
eventi: List[SingolaFrana] = Field(
description="Lista di tutte le frane distinte rilevate negli articoli analizzati."
)


# ---------------------------------------------------------------------------
# Tool 1a: Scraping testo completo articolo
# ---------------------------------------------------------------------------

HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
"Accept-Language": "it-IT,it;q=0.9",
}

COOKIE_WALL_MARKERS = [
"accetta tutto",
"rifiuta tutto",
"cookie e dati",
"contenuti non personalizzati",
"impostazioni cookie",
"privacy policy",
"we use cookies",
]


def is_cookie_wall(testo: str) -> bool:
testo_lower = testo.lower()
hits = sum(1 for marker in COOKIE_WALL_MARKERS if marker in testo_lower)
return hits >= 2


PAYWALL_DOMAINS = {
"corriere.it", "repubblica.it", "sole24ore.com",
"stampa.it", "ilmessaggero.it", "lastampa.it",
}


def estrai_dominio(url: str) -> str:
try:
from urllib.parse import urlparse
return urlparse(url).netloc.replace("www.", "")
except Exception:
return ""


def scarica_testo_articolo(url: str) -> str:
dominio = estrai_dominio(url)
if dominio in PAYWALL_DOMAINS:
logger.debug(f"Paywall noto, uso snippet RSS per: {dominio}")
return ""

try:
response = httpx.get(
url,
headers=HEADERS,
timeout=10,
follow_redirects=True,
)
if response.status_code != 200:
logger.debug(f"HTTP {response.status_code} per {url}")
return ""

soup = BeautifulSoup(response.text, "html.parser")

for tag in soup(["script", "style", "nav", "footer", "header",
"aside", "form", "noscript", "iframe", "figure",
"button", "input", "select", "textarea"]):
tag.decompose()

contenuto = (
soup.find("article")
or soup.find("main")
or soup.find(attrs={"itemprop": "articleBody"})
or soup.find(class_=lambda c: c and any(
kw in c.lower() for kw in ["article-body", "article__body",
"post-content", "entry-content",
"news-body", "article-content"]
))
)

if contenuto:
paragrafi = contenido.find_all("p")
if paragrafi:
testo = " ".join(p.get_text(strip=True) for p in paragrafi)
else:
testo = contenido.get_text(separator=" ", strip=True)
else:
paragrafi = soup.find_all("p")
testo = " ".join(p.get_text(strip=True) for p in paragrafi if len(p.get_text(strip=True)) > 40)

if not testo or is_cookie_wall(testo):
logger.debug(f"Cookie wall o testo vuoto rilevato per: {url}")
return ""

return testo[:MAX_TESTO_ARTICOLO]

except httpx.TimeoutException:
logger.warning(f"Timeout scaricando: {url}")
return ""
except Exception as e:
logger.warning(f"Errore scaricando {url}: {e}")
return ""


# ---------------------------------------------------------------------------
# Tool 1: Recupero notizie da Google News
# ---------------------------------------------------------------------------


def recupero_emergenza_google_news(max_risultati: int = MAX_ARTICOLI) -> str:
logger.info("Recupero notizie da Google News...")
feed = feedparser.parse(GOOGLE_NEWS_URL)

risultati_raw = []
for entry in feed.entries[:max_risultati]:
titolo = entry.get("title", "")
snippet = BeautifulSoup(
entry.get("summary", ""), "html.parser"
).get_text().strip()
link = entry.get("link", "")

testo_base = f"{titolo}. {snippet}"[:MAX_PREVIEW_CHARS * 2]

testo_completo = scarica_testo_articolo(link)
if testo_completo:
testo = testo_completo
fonte_testo = "articolo completo"
else:
testo = testo_base
fonte_testo = "snippet RSS"

logger.info(f" [{fonte_testo}] {titolo[:60]}")

risultati_raw.append(
{
"titolo": titolo,
"testo": testo,
"link": link,
"data_pubblicazione": entry.get("published", "sconosciuta"),
}
)

logger.info(f"Pronti {len(risultati_raw)} articoli per l'analisi.")
return json.dumps(risultati_raw, ensure_ascii=False)


# ---------------------------------------------------------------------------
# Costruzione prompt
# ---------------------------------------------------------------------------


def costruisci_prompt_singolo(articolo: dict) -> str:
# Recupera dinamicamente la data di esecuzione (oggi)
oggi = datetime.now()
today_str = oggi.strftime("%A %d/%m/%Y")
testo = articolo['testo']

logger.debug(f" Testo inviato a LLaMAntino via llama.cpp:\n {testo[:300]!r}")

return (
f"--- CONTESTO TEMPORALE OBBLIGATORIO ---\n"
f"DATA DI OGGI: {today_str}\n"
f"Data di pubblicazione dell'articolo: {articolo.get('data_pubblicazione', 'sconosciuta')}\n"
f"----------------------------------------\n\n"
"Analizza il seguente articolo di cronaca e determina se descrive uno o più eventi "
"di frana o smottamento avvenuti in Italia.\n\n"
"LINEE GUIDA RIGIDE PER LA VALUTAZIONE DELLA DATA ('notizia_vecchia'):\n"
f"1. Confronta i riferimenti temporali nel testo (es. 'ieri', 'due giorni fa', 'mercoledì scorso', 'il mese scorso') con la DATA DI OGGI ({today_str}).\n"
"2. Fai un calcolo mentale: quanti giorni sono passati tra l'evento descritto e OGGI?\n"
"3. Imposta 'notizia_vecchia' a TRUE se:\n"
" - La data di pubblicazione o l'evento stesso risalgono a più di 7 giorni fa rispetto a OGGI.\n"
" - Il testo usa verbi al passato remoto o fa riferimento a frane storiche, passate o già completamente ripristinate (es. 'la frana dello scorso anno', 'i lavori post-frana di due mesi fa').\n"
"4. Imposta 'notizia_vecchia' a FALSE solo se l'evento è in corso, è accaduto oggi, ieri o comunque negli ultimi 7 giorni ed è ancora un'emergenza attuale.\n\n"
"Se NON contiene notizie di frane attive o passate, rispondi con la lista 'eventi' vuota.\n\n"
"Per ogni evento trovato, compila i campi richiesti. Nel campo 'url', copia esattamente: " + articolo['link'] + "\n\n"
f"Testo dell'articolo:\n{testo}"
)


# ---------------------------------------------------------------------------
# Tool 2: Chiamata al Server llama.cpp con validazione formale dello schema
# ---------------------------------------------------------------------------


def chiedi_a_llamacpp(prompt: str, max_retry: int = MAX_RETRY_LLAMACPP) -> dict:
"""Invia il prompt al server locale di llama.cpp forzando la grammatica JSON nativa."""
# Costruiamo il payload compatibile OpenAI + estensione dello schema JSON nativo per llama.cpp
payload = {
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
"temperature": LLAMACPP_OPTIONS["temperature"],
"top_p": LLAMACPP_OPTIONS["top_p"],
"max_tokens": LLAMACPP_OPTIONS["max_tokens"],
# Questa chiave forza llama.cpp a campionare i token seguendo strettamente la struttura Pydantic
"response_format": {
"type": "json_object",
"schema": ListaFrane.model_json_schema()
}
}

for tentativo in range(1, max_retry + 1):
try:
logger.info(f"Chiamata a llama.cpp server (tentativo {tentativo}/{max_retry})...")
with httpx.Client() as client:
response = client.post(
LLAMACPP_API_URL,
json=payload,
timeout=120.0 # Timeout ampio per l'elaborazione di contesti lunghi
)
if response.status_code != 200:
logger.error(f"Errore HTTP dal server llama.cpp: {response.status_code}")
if tentativo < max_retry:
time.sleep(2)
continue
response.raise_for_status()

risposta_json = response.json()
testo_risposta = risposta_json["choices"][0]["message"]["content"].strip()
# Parsing del JSON strutturato restituito dal modello
dati = json.loads(testo_risposta)
return dati

except (httpx.HTTPError, json.JSONDecodeError, KeyError) as e:
logger.warning(f"Tentativo {tentativo} fallito a causa di un errore di comunicazione o parsing: {e}")
if tentativo < max_retry:
time.sleep(2)

raise RuntimeError(
f"Impossibile ottenere una risposta valida dal server llama.cpp dopo {max_retry} tentativi."
)


# ---------------------------------------------------------------------------
# Tool 3: Geocodifica e salvataggio GeoJSON
# ---------------------------------------------------------------------------


def deduplica_frane(lista: List[SingolaFrana]) -> List[SingolaFrana]:
visti = set()
uniche = []
for f in lista:
chiave = (f.localita.lower().strip(), f.provincia.lower().strip())
if chiave not in visti:
visti.add(chiave)
uniche.append(f)
else:
logger.warning(f"Duplicato rimosso: {f.localita} ({f.provincia})")
return uniche


def geocode_and_save(
lista_frane: List[SingolaFrana],
output_dir: Path = OUTPUT_DIR,
) -> Optional[Path]:
geolocator = Nominatim(user_agent="llamacpp_geo_agent", timeout=10)
geocode = RateLimiter(geolocator.geocode, min_delay_seconds=2, max_retries=3, error_wait_seconds=5)
features = []

for f in lista_frane:
query_geo = f"{f.localita}, {f.provincia}, Italia"
location = None

try:
location = geocode(query_geo)
except Exception as e:
logger.warning(f"Errore rete su '{query_geo}': {e}")

if location is None:
try:
logger.warning(f"Riprovo senza provincia per: {f.localita}")
location = geocode(f"{f.localita}, Italia")
except Exception as e:
logger.error(f"Errore rete anche sul fallback per '{f.localita}': {e}")

if location is None:
logger.warning(f"Geocodifica fallita definitivamente per: {f.localita}")
continue

punto = geojson.Point((location.longitude, location.latitude))
feature = geojson.Feature(
geometry=punto,
properties={
"localita": f.localita,
"provincia": f.provincia,
"descrizione": f.descrizione,
"interruzione_stradale": f.interruzione_stradale,
"evacuazione": f.evacuazione,
"notizia_vecchia": f.notizia_vecchia,
"url_notizia": f.url or "Non disponibile",
"fonte": "Google News",
},
)
features.append(feature)
stradale = "⚠️ strada interrotta" if f.interruzione_stradale else "viabilità ok"
evacuazione = "🚨 evacuazione" if f.evacuazione else "nessuna evacuazione"
eta = "📅 notizia vecchia" if f.notizia_vecchia else "🆕 recente"
logger.info(f"Mappato: {f.localita} ({f.provincia}) — {stradale}{evacuazione}{eta}")

if not features:
logger.warning("Nessuna frana geocodificata. File GeoJSON non creato.")
return None

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = output_dir / f"frane_{timestamp}.geojson"

fc = geojson.FeatureCollection(features)
with open(output_path, "w", encoding="utf-8") as out:
geojson.dump(fc, out, indent=2, ensure_ascii=False)

logger.info(f"File '{output_path}' creato con {len(features)} eventi.")
return output_path


# ---------------------------------------------------------------------------
# Agent loop principale
# ---------------------------------------------------------------------------


def esegui_agente():
notizie_json = recupero_emergenza_google_news()
lista_articoli = json.loads(notizie_json)

if not lista_articoli:
logger.info("Nessun articolo recuperato. Uscita.")
return

tutti_gli_eventi: List[SingolaFrana] = []

for i, articolo in enumerate(lista_articoli):
logger.info(f"Analisi articolo {i+1}/{len(lista_articoli)}: {articolo.get('titolo', '')[:60]}")
prompt = costruisci_prompt_singolo(articolo)

try:
dati_json = chiedi_a_llamacpp(prompt)
eventi_raw = dati_json.get("eventi", [])
if not eventi_raw:
logger.info(" → Nessuna frana rilevata in questo articolo.")
continue
for item in eventi_raw:
try:
tutti_gli_eventi.append(SingolaFrana(**item))
except Exception as e:
logger.warning(f" → Evento non valido saltato: {e}")
logger.info(f" → {len(eventi_raw)} evento/i estratto/i.")
except RuntimeError as e:
logger.error(f" → llama.cpp fallito su articolo {i+1}: {e}")
continue

if not tutti_gli_eventi:
logger.info("Nessun evento espresso estratto dagli articoli.")
return

lista_frane = deduplica_frane(tutti_gli_eventi)
logger.info(f"Totale eventi unici: {len(lista_frane)}")

geocode_and_save(lista_frane)


if __name__ == "__main__":
esegui_agente()

DJI ID

Per motivi di sicurezza in volo durante il lavoro volevo che il drone fosse visibile tramite la funzionalita' wifi beacon Attivarlo non ...