mercoledì 1 luglio 2026

Allarme prossimita' ADBS

Uno degli incubi mentre volo con il drone e' di imbattermi in Pegaso 1, l'elisoccorso della Toscana

La quota di volo di Pegaso minima (tranne quando decolla ed atterra) sembra essere di 1000 piedi (circa 300 m) quindi ben al di sopra dei 120 m di quota massima dei droni 

l'idea e' comunque di avere una ground station con allarme di prossimita' usando un dongle RTL-SDR impostando la propria posizione ed un raggio di 2 miglia nautiche

su Debian 13 si usa (scaricabile tramite apt)  

dump1090-fa    --net   --net-sbs-port 30003   --net-ro-port 30002   --net-http-port 8080   --lat 43.7696 --lon 11.2558  

il comando soprastante 

usando il programma Python (grazie Claude) si leggono i dati in uscita dump1009-fa e si selezionano solo gli aeromobili in 3 miglia nautiche di distanza e separazione verticale inferiore a 1500 piedi

 python3 adsb_proximity_alarm.py sbs --host 127.0.0.1 --port 30003     --lat 43.7696 --lon 11.2558 --elev-ft 150     --radius-nm 3 --vsep-ft 1500 -v 

 

 


#!/usr/bin/env python3
"""
adsb_proximity_alarm.py

Proximity alarm for ADS-B traffic received via an RTL-SDR + dump1090
(dump1090-fa, dump1090-mutability, readsb, or the dump1090 fork bundled
with Stratux).

It watches live aircraft positions and raises an alert when any aircraft
comes within a configurable horizontal distance AND vertical separation
of a reference point (e.g. your ground station / launch site).

Two input modes are supported, pick whichever your setup exposes:

1. SBS / BaseStation feed (raw TCP text feed, classic dump1090 port 30003)
Most dump1090 builds expose this. Stratux's internal dump1090 usually
does too, though it may not be reachable outside localhost unless you
tunnel it (e.g. `ssh -L 30003:localhost:30003 pi@stratux.local`).

2. aircraft.json polling (HTTP), as served by dump1090-fa/readsb/tar1090,
typically at http://<host>/dump1090-fa/data/aircraft.json or similar.
Check your install; the path varies by distro/build.

Usage examples
--------------
SBS mode, ground station at 43.7696N 11.2558E (Florence area), alarm inside
3 NM / 1500 ft:

python3 adsb_proximity_alarm.py sbs \\
--host 127.0.0.1 --port 30003 \\
--lat 43.7696 --lon 11.2558 --elev-ft 150 \\
--radius-nm 3 --vsep-ft 1500

JSON polling mode:

python3 adsb_proximity_alarm.py json \\
--url http://127.0.0.1/dump1090-fa/data/aircraft.json \\
--lat 43.7696 --lon 11.2558 --radius-nm 3 --vsep-ft 1500

No extra dependencies beyond the standard library and `requests`
(only needed for JSON mode: pip install requests).
"""

import argparse
import math
import socket
import sys
import time
import urllib.request
import json as jsonlib
from dataclasses import dataclass, field
from datetime import datetime, timezone


# --------------------------------------------------------------------------
# Geometry helpers
# --------------------------------------------------------------------------

EARTH_RADIUS_NM = 3440.065 # mean earth radius in nautical miles


def haversine_nm(lat1, lon1, lat2, lon2):
"""Great-circle distance in nautical miles."""
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = (math.sin(dphi / 2) ** 2
+ math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
return 2 * EARTH_RADIUS_NM * math.asin(math.sqrt(a))


def bearing_deg(lat1, lon1, lat2, lon2):
"""Initial bearing from point 1 to point 2, in degrees (0=N, 90=E)."""
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dlambda = math.radians(lon2 - lon1)
y = math.sin(dlambda) * math.cos(phi2)
x = (math.cos(phi1) * math.sin(phi2)
- math.sin(phi1) * math.cos(phi2) * math.cos(dlambda))
return (math.degrees(math.atan2(y, x)) + 360) % 360


def compass(deg):
dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
return dirs[int((deg + 22.5) // 45) % 8]


# --------------------------------------------------------------------------
# Aircraft state
# --------------------------------------------------------------------------

@dataclass
class Aircraft:
icao: str
callsign: str = ""
lat: float = None
lon: float = None
alt_ft: float = None
last_seen: float = field(default_factory=time.monotonic)
last_alert: float = 0.0

def has_position(self):
return self.lat is not None and self.lon is not None


class AircraftTable:
"""Keeps track of recently seen aircraft and prunes stale ones."""

def __init__(self, stale_after_s=60):
self.aircraft = {}
self.stale_after_s = stale_after_s

def update(self, icao, callsign=None, lat=None, lon=None, alt_ft=None):
ac = self.aircraft.get(icao)
if ac is None:
ac = Aircraft(icao=icao)
self.aircraft[icao] = ac
if callsign:
ac.callsign = callsign.strip()
if lat is not None:
ac.lat = lat
if lon is not None:
ac.lon = lon
if alt_ft is not None:
ac.alt_ft = alt_ft
ac.last_seen = time.monotonic()
return ac

def prune(self):
now = time.monotonic()
dead = [icao for icao, ac in self.aircraft.items()
if now - ac.last_seen > self.stale_after_s]
for icao in dead:
del self.aircraft[icao]


# --------------------------------------------------------------------------
# Alerting
# --------------------------------------------------------------------------

RED = "\033[91m"
YELLOW = "\033[93m"
GREEN = "\033[92m"
RESET = "\033[0m"


def alert(ac: Aircraft, ref_lat, ref_lon, dist_nm, vsep_ft, bell=True):
brg = bearing_deg(ref_lat, ref_lon, ac.lat, ac.lon)
ts = datetime.now(timezone.utc).strftime("%H:%M:%S")
tag = ac.callsign if ac.callsign else ac.icao
msg = (f"{RED}[ALERT {ts}Z]{RESET} {tag} (ICAO {ac.icao}) "
f"{dist_nm:.1f} NM {compass(brg)} ({brg:5.1f}°), "
f"alt {ac.alt_ft:.0f} ft, vertical sep {vsep_ft:.0f} ft")
print(msg)
if bell:
sys.stdout.write("\a")
sys.stdout.flush()


def status_line(ac: Aircraft, dist_nm, vsep_ft):
tag = ac.callsign if ac.callsign else ac.icao
print(f"{YELLOW} near{RESET} {tag:8s} {dist_nm:5.1f} NM "
f"alt {ac.alt_ft:6.0f} ft vsep {vsep_ft:6.0f} ft")


# --------------------------------------------------------------------------
# Core proximity check
# --------------------------------------------------------------------------

def check_proximity(ac: Aircraft, args):
if not ac.has_position() or ac.alt_ft is None:
return
dist_nm = haversine_nm(args.lat, args.lon, ac.lat, ac.lon)
vsep_ft = abs(ac.alt_ft - args.elev_ft)

if dist_nm <= args.radius_nm and vsep_ft <= args.vsep_ft:
now = time.monotonic()
if now - ac.last_alert >= args.cooldown:
alert(ac, args.lat, args.lon, dist_nm, vsep_ft, bell=not args.no_bell)
ac.last_alert = now
elif args.verbose and dist_nm <= args.radius_nm * 2.0:
# Show near-miss traffic even if outside the hard alert ring,
# useful for situational awareness.
status_line(ac, dist_nm, vsep_ft)


# --------------------------------------------------------------------------
# SBS (BaseStation) feed parser
# --------------------------------------------------------------------------
#
# Format reference (dump1090 SBS output), comma separated fields:
# MSG,type,session,aircraft,hex,flight,date,time,date,time,
# callsign,altitude,gspeed,track,lat,lon,vrate,squawk,alert,emergency,spi,onground
#
# Relevant message types:
# MSG,1 -> callsign (field 10)
# MSG,3 -> airborne position: altitude(11), lat(14), lon(15)
# MSG,4 -> velocity
# MSG,5/6/7/8 -> various, mostly ignored here

def run_sbs(args):
table = AircraftTable()
print(f"Connecting to SBS feed at {args.host}:{args.port} ...")
while True:
try:
with socket.create_connection((args.host, args.port), timeout=10) as sock:
print(f"{GREEN}Connected.{RESET} Watching for traffic within "
f"{args.radius_nm} NM / {args.vsep_ft} ft of "
f"{args.lat:.4f},{args.lon:.4f} (elev {args.elev_ft} ft)...")
buf = b""
sock.settimeout(5)
last_prune = time.monotonic()
while True:
try:
chunk = sock.recv(4096)
except socket.timeout:
chunk = b""
if chunk == b"" and time.monotonic() - last_prune > 30:
# no data for a while but connection alive; keep looping
pass
buf += chunk
while b"\n" in buf:
line, buf = buf.split(b"\n", 1)
line = line.decode(errors="ignore").strip()
if line:
handle_sbs_line(line, table, args)
if time.monotonic() - last_prune > 15:
table.prune()
last_prune = time.monotonic()
except (ConnectionRefusedError, OSError) as e:
print(f"Connection error ({e}); retrying in 5s...")
time.sleep(5)


def handle_sbs_line(line, table, args):
fields = line.split(",")
if len(fields) < 2 or fields[0] != "MSG":
return
msg_type = fields[1]
if len(fields) < 5:
return
icao = fields[4].strip()
if not icao:
return

callsign = None
lat = lon = alt_ft = None

try:
if msg_type == "1" and len(fields) > 10:
callsign = fields[10]
elif msg_type == "3" and len(fields) > 15:
alt_ft = float(fields[11]) if fields[11] else None
lat = float(fields[14]) if fields[14] else None
lon = float(fields[15]) if fields[15] else None
elif msg_type in ("5", "6", "7", "8") and len(fields) > 11 and fields[11]:
alt_ft = float(fields[11])
except ValueError:
return

ac = table.update(icao, callsign=callsign, lat=lat, lon=lon, alt_ft=alt_ft)
check_proximity(ac, args)


# --------------------------------------------------------------------------
# aircraft.json polling
# --------------------------------------------------------------------------

def run_json(args):
table = AircraftTable()
print(f"Polling {args.url} every {args.poll_interval}s ...")
print(f"Watching for traffic within {args.radius_nm} NM / {args.vsep_ft} ft of "
f"{args.lat:.4f},{args.lon:.4f} (elev {args.elev_ft} ft)...")
last_prune = time.monotonic()
while True:
try:
with urllib.request.urlopen(args.url, timeout=5) as resp:
data = jsonlib.loads(resp.read())
except Exception as e:
print(f"Fetch error ({e}); retrying in {args.poll_interval}s...")
time.sleep(args.poll_interval)
continue

aircraft_list = data.get("aircraft", data if isinstance(data, list) else [])
for entry in aircraft_list:
icao = (entry.get("hex") or entry.get("icao") or "").strip()
if not icao:
continue
callsign = entry.get("flight") or entry.get("callsign")
lat = entry.get("lat")
lon = entry.get("lon")
alt_ft = entry.get("alt_baro", entry.get("altitude"))
try:
alt_ft = float(alt_ft) if alt_ft not in (None, "ground") else None
except (TypeError, ValueError):
alt_ft = None

ac = table.update(icao, callsign=callsign, lat=lat, lon=lon, alt_ft=alt_ft)
check_proximity(ac, args)

if time.monotonic() - last_prune > 15:
table.prune()
last_prune = time.monotonic()

time.sleep(args.poll_interval)


# --------------------------------------------------------------------------
# CLI
# --------------------------------------------------------------------------

def build_parser():
# Shared options live on a "parent" parser so they can be passed either
# before or after the sbs/json subcommand, e.g. both of these work:
# adsb_proximity_alarm.py --lat 43.7 --lon 11.2 sbs --port 30003
# adsb_proximity_alarm.py sbs --port 30003 --lat 43.7 --lon 11.2
common = argparse.ArgumentParser(add_help=False)
common.add_argument("--lat", type=float, required=True,
help="Reference latitude (your ground station / launch site)")
common.add_argument("--lon", type=float, required=True,
help="Reference longitude")
common.add_argument("--elev-ft", type=float, default=0.0,
help="Reference point elevation, feet MSL (default 0)")
common.add_argument("--radius-nm", type=float, default=3.0,
help="Horizontal alert radius in nautical miles (default 3)")
common.add_argument("--vsep-ft", type=float, default=1500.0,
help="Vertical separation threshold in feet (default 1500)")
common.add_argument("--cooldown", type=float, default=30.0,
help="Seconds before re-alerting on the same aircraft (default 30)")
common.add_argument("--no-bell", action="store_true",
help="Disable terminal bell on alert")
common.add_argument("-v", "--verbose", action="store_true",
help="Also print traffic within 2x the alert radius")

p = argparse.ArgumentParser(
description="ADS-B proximity alarm using RTL-SDR / dump1090 feeds")
sub = p.add_subparsers(dest="mode", required=True)

sbs = sub.add_parser("sbs", parents=[common],
help="Connect to raw SBS/BaseStation TCP feed")
sbs.add_argument("--host", default="127.0.0.1")
sbs.add_argument("--port", type=int, default=30003)

js = sub.add_parser("json", parents=[common],
help="Poll an aircraft.json HTTP endpoint")
js.add_argument("--url", required=True,
help="Full URL to aircraft.json")
js.add_argument("--poll-interval", type=float, default=1.0,
help="Seconds between polls (default 1.0)")

return p


def main():
args = build_parser().parse_args()
try:
if args.mode == "sbs":
run_sbs(args)
else:
run_json(args)
except KeyboardInterrupt:
print("\nStopped.")


if __name__ == "__main__":
main()


martedì 30 giugno 2026

DJI ID

Per motivi di sicurezza in volo durante il lavoro volevo che il drone fosse visibile tramite la funzionalita' wifi beacon

Attivarlo non e' stato banale perche' si tratta di una funzionalita' abbastanza ben nascosta

Nella prima schermata di DJI Pilot 2 sotto la scritta max altitude c'e' una piccola scritta RID con un triangolo

cliccandolo si passa alla pagina di inserimento del proprio Id operatore. Attenzione si deve andare di DFlight nel proprio profilo 


 come si vede l'EASA ID ha tre asterischi che nascondono 3 caratteri privati. Nella applicazione DJI deve essere inserito tutto l'Id...per fare comparire la stringa completa si deve cliccare sull'occhio barrato a sinistra della stringa

L'operazione deve essere fatta online perche' DJI Pilot 2 valida la stringa 

a questo punto nella mappa si avra' la scritta RID in nero
 
per leggere i beacon si deve mettere la scheda wifi del portatile in monitor mode. Per vedere se la scheda e' compatibile si puo' procedere con il comando sottostante

 root@HW27747:/home/luca# iw list | grep -A 10 "Supported interface modes"
    Supported interface modes:
         * IBSS
         * managed
         * AP
         * AP/VLAN
         * monitor
         * P2P-client
         * P2P-GO
         * P2P-device
    Band 1:
        Capabilities: 0x11ee

a questo punto si imposta in monitoraggio

sudo ip link set wlp58s0 down
sudo iw dev wlp58s0 set type monitor 
sudo ip link set wlp58s0 up

(base) luca@HW27747:~/scapy$ sudo iw dev
phy#0
    Unnamed/non-netdev interface
        wdev 0x2
        addr 5c:51:4f:8a:24:d3
        type P2P-device
    Interface wlp58s0
        ifindex 2
        wdev 0x1
        addr 5c:51:4f:8a:24:d3
        type monitor
        multicast TXQ:
            qsz-byt    qsz-pkt    flows    drops    marks    overlmt    hashcol    tx-bytes    tx-packets

 

in generale Network manager di Linux interferisce in questa operazione ed e' quindi meglio disattivarlo 

sudo nmcli device set wlp58s0 managed no

per invertire il tutto a fine lavoro 

sudo ip link set wlp58s0 down
sudo iw dev wlp58s0 set type managed
sudo ip link set wlp58s0 up 

sudo nmcli device set wlp58s0 managed yes 

Si installa quindi questo script per la decodifica dei pacchetti wifi  (grazie Claude)

pip install scapy

 

sudo $(which python3) remote_id_wifi_decoder.py wlp58s0 --hop -v --json log.jsonl


---------------------------

#!/usr/bin/env python3
"""
remote_id_wifi_decoder.py
---------------------------
Sniffer/decoder per il Remote ID standard ASTM F3411 (opendroneid)
trasmesso via Wi-Fi Beacon, il transport effettivamente usato da DJI
(Matrice 4, e in generale quasi tutta la linea DJI con Remote ID
compliance) invece del Bluetooth.

I beacon DJI includono un Information Element Vendor-Specific (ID 0xDD)
con OUI FA:0B:BC (ASD-STAN/ASTM), seguito da un byte "vendor type" 0x0D
e dal payload ODID (singolo messaggio o Message Pack), esattamente lo
stesso formato binario usato sul transport Bluetooth.

REQUISITI:
- Scheda Wi-Fi in modalita' monitor mode
- root
- pip install scapy --break-system-packages

USO:
sudo python3 remote_id_wifi_decoder.py wlan0mon
sudo python3 remote_id_wifi_decoder.py wlan0mon --hop -v
sudo python3 remote_id_wifi_decoder.py wlan0mon --json out.jsonl

Setup monitor mode:
sudo ip link set wlan0 down
sudo iw wlan0 set monitor none
sudo ip link set wlan0 up
"""

import argparse
import json
import struct
import subprocess
import sys
import threading
import time
from datetime import datetime, timezone

from scapy.all import sniff, Dot11, Dot11Beacon, Dot11ProbeResp, Dot11Elt

ODID_OUI = b"\xfa\x0b\xbc"
ODID_VENDOR_TYPE = 0x0D

MSG_TYPE_NAMES = {
0x0: "Basic ID", 0x1: "Location/Vector", 0x2: "Auth",
0x3: "Self-ID", 0x4: "System", 0x5: "Operator ID", 0xF: "Message Pack",
}
ID_TYPE_NAMES = {
0: "None", 1: "Serial Number (ANSI/CTA-2063-A)",
2: "CAA Assigned Registration ID", 3: "UTM Assigned ID", 4: "Specific Session ID",
}
UA_TYPE_NAMES = {
0: "None/Undeclared", 1: "Aeroplane", 2: "Helicopter/Multirotor",
3: "Gyroplane", 4: "Hybrid Lift", 5: "Ornithopter", 6: "Glider",
7: "Kite", 8: "Free Balloon", 9: "Captive Balloon", 10: "Airship",
11: "Free Fall/Parachute", 12: "Rocket", 13: "Tethered Powered Aircraft",
14: "Ground Obstacle", 15: "Other",
}
CLASSIFICATION_NAMES = {0: "Undeclared", 1: "EU classification"}


def s8(v):
return v - 256 if v > 127 else v


def decode_basic_id(payload: bytes):
if len(payload) < 22:
return None
id_type = (payload[0] >> 4) & 0x0F
ua_type = payload[0] & 0x0F
raw_id = payload[1:21]
uas_id = raw_id.split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
return {
"id_type": ID_TYPE_NAMES.get(id_type, f"sconosciuto({id_type})"),
"ua_type": UA_TYPE_NAMES.get(ua_type, f"sconosciuto({ua_type})"),
"uas_id": uas_id,
}


def decode_location(payload: bytes):
if len(payload) < 24:
return None
b1 = payload[0]
status = (b1 >> 4) & 0x0F
height_type = (b1 >> 2) & 0x01
ew_dir_segment = (b1 >> 1) & 0x01
speed_multiplier = b1 & 0x01

track_raw = payload[1]
track_dir = track_raw + 180 if ew_dir_segment == 1 else track_raw

speed_raw = payload[2]
speed = speed_raw * 0.25 if speed_multiplier == 0 else 255 * 0.25 + speed_raw * 0.75

vspeed = s8(payload[3]) * 0.5

lat_raw, lon_raw = struct.unpack_from("<ii", payload, 4)
lat, lon = lat_raw / 1e7, lon_raw / 1e7

press_alt_raw, geo_alt_raw, height_raw = struct.unpack_from("<HHH", payload, 12)
press_alt = press_alt_raw * 0.5 - 1000
geo_alt = geo_alt_raw * 0.5 - 1000
height = height_raw * 0.5 - 1000

timestamp_raw = struct.unpack_from("<H", payload, 20)[0]

status_names = {0: "Undeclared", 1: "Ground", 2: "Airborne",
3: "Emergency", 4: "Remote ID System Failure"}

return {
"status": status_names.get(status, f"sconosciuto({status})"),
"track_direction_deg": track_dir,
"speed_horizontal_mps": round(speed, 2),
"speed_vertical_mps": round(vspeed, 2),
"latitude": round(lat, 7),
"longitude": round(lon, 7),
"pressure_altitude_m": round(press_alt, 1),
"geodetic_altitude_m": round(geo_alt, 1),
"height_above_ref_m": round(height, 1),
"height_type": "sopra il take-off" if height_type == 0 else "sopra il livello del suolo",
"timestamp_sec_of_hour": round(timestamp_raw / 10.0, 1),
}


def decode_self_id(payload: bytes):
if len(payload) < 23:
return None
desc_type = payload[0]
desc = payload[1:24].split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
return {"description_type": desc_type, "description": desc}


def decode_system(payload: bytes):
if len(payload) < 23:
return None
flags = payload[0]
classification = CLASSIFICATION_NAMES.get((flags >> 2) & 0x03, "sconosciuta")
op_lat_raw, op_lon_raw = struct.unpack_from("<ii", payload, 1)
area_count = struct.unpack_from("<H", payload, 9)[0]
area_radius = payload[11] * 10
area_ceiling_raw, area_floor_raw = struct.unpack_from("<HH", payload, 12)
op_alt_raw = struct.unpack_from("<H", payload, 17)[0]
timestamp_raw = struct.unpack_from("<I", payload, 19)[0]
return {
"classification": classification,
"operator_latitude": round(op_lat_raw / 1e7, 7),
"operator_longitude": round(op_lon_raw / 1e7, 7),
"area_count": area_count,
"area_radius_m": area_radius,
"area_ceiling_m": round(area_ceiling_raw * 0.5 - 1000, 1),
"area_floor_m": round(area_floor_raw * 0.5 - 1000, 1),
"operator_altitude_geo_m": round(op_alt_raw * 0.5 - 1000, 1),
"timestamp_sec_since_2019": timestamp_raw,
}


def decode_operator_id(payload: bytes):
if len(payload) < 21:
return None
op_id_type = payload[0]
op_id = payload[1:21].split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
return {"operator_id_type": op_id_type, "operator_id": op_id}


DECODERS = {
0x0: decode_basic_id, 0x1: decode_location, 0x3: decode_self_id,
0x4: decode_system, 0x5: decode_operator_id,
}


def decode_single_message(msg: bytes):
if len(msg) < 2:
return None
header = msg[0]
msg_type = (header >> 4) & 0x0F
proto_version = header & 0x0F
payload = msg[1:25]

entry = {
"message_type": MSG_TYPE_NAMES.get(msg_type, f"sconosciuto(0x{msg_type:X})"),
"protocol_version": proto_version,
}
if msg_type in DECODERS:
try:
decoded = DECODERS[msg_type](payload)
except (struct.error, IndexError):
decoded = None
if decoded:
entry.update(decoded)
elif msg_type == 0x2:
entry["note"] = "messaggio Auth (firma/autenticazione) - non decodificato"
return entry


def decode_odid_body(body: bytes):
"""body = il payload subito dopo OUI + vendor_type (0x0D).

NOTA: i beacon DJI osservati anteposero un singolo byte extra
(un contatore di sequenza, non standard ASTM) prima del vero
ODID Message Pack. Il vero header del pack (0xF_ = Message Pack)
si trova quindi a body[1], non a body[0]. Se invece il primo byte
e' gia' un header di Message Pack valido (tipo 0xF), assumiamo
che non ci sia byte extra (compatibilita' con altri vendor/firmware).
"""
if len(body) < 2:
return None

if (body[0] >> 4) & 0x0F == 0xF:
pack = body
else:
pack = body[1:] # salta il byte contatore non-standard

if len(pack) < 1:
return None

header = pack[0]
msg_type = (header >> 4) & 0x0F

results = []
if msg_type == 0xF:
if len(pack) < 3:
return None
msg_size = pack[1]
msg_count = pack[2]
offset = 3
for _ in range(msg_count):
chunk = pack[offset:offset + msg_size]
if len(chunk) < msg_size:
break
decoded = decode_single_message(chunk)
if decoded:
results.append(decoded)
offset += msg_size
else:
decoded = decode_single_message(pack)
if decoded:
results.append(decoded)
return results


def handle_packet(pkt, json_fp, verbose):
if not pkt.haslayer(Dot11):
return
if not (pkt.haslayer(Dot11Beacon) or pkt.haslayer(Dot11ProbeResp)):
return

elt = pkt.getlayer(Dot11Elt)
while elt is not None:
if elt.ID == 221 and elt.info[:3] == ODID_OUI and len(elt.info) > 3 and elt.info[3] == ODID_VENDOR_TYPE:
src = pkt.addr2
body = elt.info[4:]
messages = decode_odid_body(body)
ts = datetime.now(timezone.utc).strftime("%H:%M:%S")

if messages:
print(f"\n[{ts}] Remote ID (WiFi Beacon) da {src}")
for m in messages:
mtype = m.get("message_type")
print(f" [{mtype}]")
for k, v in m.items():
if k == "message_type":
continue
print(f" {k}: {v}")
if json_fp:
record = {"timestamp": ts, "mac": src, "messages": messages}
json_fp.write(json.dumps(record, ensure_ascii=False) + "\n")
json_fp.flush()
else:
print(f"\n[{ts}] IE ODID rilevato da {src} ma payload non riconosciuto")

if verbose:
print(f" raw IE: {elt.info.hex()}")

elt = elt.payload.getlayer(Dot11Elt)


def channel_hopper(iface, stop_event, channels=range(1, 14)):
while not stop_event.is_set():
for ch in channels:
if stop_event.is_set():
break
subprocess.run(["iw", "dev", iface, "set", "channel", str(ch)],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(0.3)


def main():
ap = argparse.ArgumentParser(description="Decoder Remote ID (ASTM F3411) via WiFi Beacon")
ap.add_argument("iface", help="interfaccia in monitor mode (es. wlan0mon)")
ap.add_argument("--hop", action="store_true", help="channel hopping 1-13 (2.4GHz)")
ap.add_argument("--json", dest="json_path", default=None, help="salva ogni rilevamento come riga JSON")
ap.add_argument("-v", "--verbose", action="store_true", help="stampa anche l'IE raw in hex")
args = ap.parse_args()

print(f"In ascolto su {args.iface}... (Ctrl+C per uscire)")
print(f"Cerco IE Vendor Specific OUI {ODID_OUI.hex(':')} type 0x{ODID_VENDOR_TYPE:02X} (ASTM F3411 Remote ID).\n")

json_fp = open(args.json_path, "a", encoding="utf-8") if args.json_path else None

stop_event = threading.Event()
hopper_thread = None
if args.hop:
hopper_thread = threading.Thread(target=channel_hopper, args=(args.iface, stop_event), daemon=True)
hopper_thread.start()

try:
sniff(iface=args.iface, prn=lambda pkt: handle_packet(pkt, json_fp, args.verbose), store=False)
except PermissionError:
print("Errore: serve eseguire lo script come root (sudo).", file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt:
pass
finally:
stop_event.set()
if hopper_thread:
hopper_thread.join(timeout=1)
if json_fp:
json_fp.close()


if __name__ == "__main__":
main()

Per ricevere i pacchetti il drone deve essere in volo o quanto meno deve avere i motori armati .Un esempio della cattura.

 

[11:30:11] Remote ID (WiFi Beacon) da 8c:1e:d9:13:a2:8b
  [Basic ID]
    protocol_version: 1
    id_type: Serial Number (ANSI/CTA-2063-A)
    ua_type: Helicopter/Multirotor
    uas_id: 1581F7K3C255K00D7EEL
  [Location/Vector]
    protocol_version: 1
    status: Airborne
    track_direction_deg: 361
    speed_horizontal_mps: 0.0
    speed_vertical_mps: 0.0
    latitude: 43.7842685
    longitude: 11.2306187
    pressure_altitude_m: 13.0
    geodetic_altitude_m: 98.0
    height_above_ref_m: 3.0
    height_type: sopra il take-off
    timestamp_sec_of_hour: 1810.4
  [Self-ID]
    protocol_version: 1
    description_type: 0
    description: 
  [System]
    protocol_version: 1
    classification: EU classification
    operator_latitude: 43.7841697
    operator_longitude: 11.2305805
    area_count: 1
    area_radius_m: 0
    area_ceiling_m: -1000.0
    area_floor_m: -1000.0
    operator_altitude_geo_m: 112.0
    timestamp_sec_since_2019: 0
  [Operator ID]
    protocol_version: 1
    operator_id_type: 0
    operator_id: ITA5qjdid6y5x5ni
  raw IE: fa0bbc0d38f1190501123135383146374b33433235354b3030443745454c0000001122b50000fdf2181a0ba8b106ea079408d6073b44b846010031000000000000000000000000000000000000000000000000410521ef181a8da6b1060100000000000003b0080000000000510049544135716a64696436793578356e6900000000000000


Allarme prossimita' ADBS

Uno degli incubi mentre volo con il drone e' di imbattermi in Pegaso 1, l'elisoccorso della Toscana La quota di volo di Pegaso minim...