Per motivi di sicurezza in volo durante il lavoro volevo che il drone fosse visibile tramite la funzionalita' wifi beacon
Attivarlo non e' stato banale perche' si tratta di una funzionalita' abbastanza ben nascosta
Nella prima schermata di DJI Pilot 2 sotto la scritta max altitude c'e' una piccola scritta RID con un triangolo
cliccandolo si passa alla pagina di inserimento del proprio Id operatore. Attenzione si deve andare di DFlight nel proprio profilo
come si vede l'EASA ID ha tre asterischi che nascondono 3 caratteri privati. Nella applicazione DJI deve essere inserito tutto l'Id...per fare comparire la stringa completa si deve cliccare sull'occhio barrato a sinistra della stringa
per leggere i beacon si deve mettere la scheda wifi del portatile in monitor mode. Per vedere se la scheda e' compatibile si puo' procedere con il comando sottostante
in generale Network manager di Linux interferisce in questa operazione ed e' quindi meglio disattivarlo
Si installa quindi questo script per la decodifica dei pacchetti wifi (grazie Claude)
#!/usr/bin/env python3
"""
remote_id_wifi_decoder.py
---------------------------
Sniffer/decoder per il Remote ID standard ASTM F3411 (opendroneid)
trasmesso via Wi-Fi Beacon, il transport effettivamente usato da DJI
(Matrice 4, e in generale quasi tutta la linea DJI con Remote ID
compliance) invece del Bluetooth.
I beacon DJI includono un Information Element Vendor-Specific (ID 0xDD)
con OUI FA:0B:BC (ASD-STAN/ASTM), seguito da un byte "vendor type" 0x0D
e dal payload ODID (singolo messaggio o Message Pack), esattamente lo
stesso formato binario usato sul transport Bluetooth.
REQUISITI:
- Scheda Wi-Fi in modalita' monitor mode
- root
- pip install scapy --break-system-packages
USO:
sudo python3 remote_id_wifi_decoder.py wlan0mon
sudo python3 remote_id_wifi_decoder.py wlan0mon --hop -v
sudo python3 remote_id_wifi_decoder.py wlan0mon --json out.jsonl
Setup monitor mode:
sudo ip link set wlan0 down
sudo iw wlan0 set monitor none
sudo ip link set wlan0 up
"""
import argparse
import json
import struct
import subprocess
import sys
import threading
import time
from datetime import datetime, timezone
from scapy.all import sniff, Dot11, Dot11Beacon, Dot11ProbeResp, Dot11Elt
ODID_OUI = b"\xfa\x0b\xbc"
ODID_VENDOR_TYPE = 0x0D
MSG_TYPE_NAMES = {
0x0: "Basic ID", 0x1: "Location/Vector", 0x2: "Auth",
0x3: "Self-ID", 0x4: "System", 0x5: "Operator ID", 0xF: "Message Pack",
}
ID_TYPE_NAMES = {
0: "None", 1: "Serial Number (ANSI/CTA-2063-A)",
2: "CAA Assigned Registration ID", 3: "UTM Assigned ID", 4: "Specific Session ID",
}
UA_TYPE_NAMES = {
0: "None/Undeclared", 1: "Aeroplane", 2: "Helicopter/Multirotor",
3: "Gyroplane", 4: "Hybrid Lift", 5: "Ornithopter", 6: "Glider",
7: "Kite", 8: "Free Balloon", 9: "Captive Balloon", 10: "Airship",
11: "Free Fall/Parachute", 12: "Rocket", 13: "Tethered Powered Aircraft",
14: "Ground Obstacle", 15: "Other",
}
CLASSIFICATION_NAMES = {0: "Undeclared", 1: "EU classification"}
def s8(v):
return v - 256 if v > 127 else v
def decode_basic_id(payload: bytes):
if len(payload) < 22:
return None
id_type = (payload[0] >> 4) & 0x0F
ua_type = payload[0] & 0x0F
raw_id = payload[1:21]
uas_id = raw_id.split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
return {
"id_type": ID_TYPE_NAMES.get(id_type, f"sconosciuto({id_type})"),
"ua_type": UA_TYPE_NAMES.get(ua_type, f"sconosciuto({ua_type})"),
"uas_id": uas_id,
}
def decode_location(payload: bytes):
if len(payload) < 24:
return None
b1 = payload[0]
status = (b1 >> 4) & 0x0F
height_type = (b1 >> 2) & 0x01
ew_dir_segment = (b1 >> 1) & 0x01
speed_multiplier = b1 & 0x01
track_raw = payload[1]
track_dir = track_raw + 180 if ew_dir_segment == 1 else track_raw
speed_raw = payload[2]
speed = speed_raw * 0.25 if speed_multiplier == 0 else 255 * 0.25 + speed_raw * 0.75
vspeed = s8(payload[3]) * 0.5
lat_raw, lon_raw = struct.unpack_from("<ii", payload, 4)
lat, lon = lat_raw / 1e7, lon_raw / 1e7
press_alt_raw, geo_alt_raw, height_raw = struct.unpack_from("<HHH", payload, 12)
press_alt = press_alt_raw * 0.5 - 1000
geo_alt = geo_alt_raw * 0.5 - 1000
height = height_raw * 0.5 - 1000
timestamp_raw = struct.unpack_from("<H", payload, 20)[0]
status_names = {0: "Undeclared", 1: "Ground", 2: "Airborne",
3: "Emergency", 4: "Remote ID System Failure"}
return {
"status": status_names.get(status, f"sconosciuto({status})"),
"track_direction_deg": track_dir,
"speed_horizontal_mps": round(speed, 2),
"speed_vertical_mps": round(vspeed, 2),
"latitude": round(lat, 7),
"longitude": round(lon, 7),
"pressure_altitude_m": round(press_alt, 1),
"geodetic_altitude_m": round(geo_alt, 1),
"height_above_ref_m": round(height, 1),
"height_type": "sopra il take-off" if height_type == 0 else "sopra il livello del suolo",
"timestamp_sec_of_hour": round(timestamp_raw / 10.0, 1),
}
def decode_self_id(payload: bytes):
if len(payload) < 23:
return None
desc_type = payload[0]
desc = payload[1:24].split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
return {"description_type": desc_type, "description": desc}
def decode_system(payload: bytes):
if len(payload) < 23:
return None
flags = payload[0]
classification = CLASSIFICATION_NAMES.get((flags >> 2) & 0x03, "sconosciuta")
op_lat_raw, op_lon_raw = struct.unpack_from("<ii", payload, 1)
area_count = struct.unpack_from("<H", payload, 9)[0]
area_radius = payload[11] * 10
area_ceiling_raw, area_floor_raw = struct.unpack_from("<HH", payload, 12)
op_alt_raw = struct.unpack_from("<H", payload, 17)[0]
timestamp_raw = struct.unpack_from("<I", payload, 19)[0]
return {
"classification": classification,
"operator_latitude": round(op_lat_raw / 1e7, 7),
"operator_longitude": round(op_lon_raw / 1e7, 7),
"area_count": area_count,
"area_radius_m": area_radius,
"area_ceiling_m": round(area_ceiling_raw * 0.5 - 1000, 1),
"area_floor_m": round(area_floor_raw * 0.5 - 1000, 1),
"operator_altitude_geo_m": round(op_alt_raw * 0.5 - 1000, 1),
"timestamp_sec_since_2019": timestamp_raw,
}
def decode_operator_id(payload: bytes):
if len(payload) < 21:
return None
op_id_type = payload[0]
op_id = payload[1:21].split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
return {"operator_id_type": op_id_type, "operator_id": op_id}
DECODERS = {
0x0: decode_basic_id, 0x1: decode_location, 0x3: decode_self_id,
0x4: decode_system, 0x5: decode_operator_id,
}
def decode_single_message(msg: bytes):
if len(msg) < 2:
return None
header = msg[0]
msg_type = (header >> 4) & 0x0F
proto_version = header & 0x0F
payload = msg[1:25]
entry = {
"message_type": MSG_TYPE_NAMES.get(msg_type, f"sconosciuto(0x{msg_type:X})"),
"protocol_version": proto_version,
}
if msg_type in DECODERS:
try:
decoded = DECODERS[msg_type](payload)
except (struct.error, IndexError):
decoded = None
if decoded:
entry.update(decoded)
elif msg_type == 0x2:
entry["note"] = "messaggio Auth (firma/autenticazione) - non decodificato"
return entry
def decode_odid_body(body: bytes):
"""body = il payload subito dopo OUI + vendor_type (0x0D).
NOTA: i beacon DJI osservati anteposero un singolo byte extra
(un contatore di sequenza, non standard ASTM) prima del vero
ODID Message Pack. Il vero header del pack (0xF_ = Message Pack)
si trova quindi a body[1], non a body[0]. Se invece il primo byte
e' gia' un header di Message Pack valido (tipo 0xF), assumiamo
che non ci sia byte extra (compatibilita' con altri vendor/firmware).
"""
if len(body) < 2:
return None
if (body[0] >> 4) & 0x0F == 0xF:
pack = body
else:
pack = body[1:] # salta il byte contatore non-standard
if len(pack) < 1:
return None
header = pack[0]
msg_type = (header >> 4) & 0x0F
results = []
if msg_type == 0xF:
if len(pack) < 3:
return None
msg_size = pack[1]
msg_count = pack[2]
offset = 3
for _ in range(msg_count):
chunk = pack[offset:offset + msg_size]
if len(chunk) < msg_size:
break
decoded = decode_single_message(chunk)
if decoded:
results.append(decoded)
offset += msg_size
else:
decoded = decode_single_message(pack)
if decoded:
results.append(decoded)
return results
def handle_packet(pkt, json_fp, verbose):
if not pkt.haslayer(Dot11):
return
if not (pkt.haslayer(Dot11Beacon) or pkt.haslayer(Dot11ProbeResp)):
return
elt = pkt.getlayer(Dot11Elt)
while elt is not None:
if elt.ID == 221 and elt.info[:3] == ODID_OUI and len(elt.info) > 3 and elt.info[3] == ODID_VENDOR_TYPE:
src = pkt.addr2
body = elt.info[4:]
messages = decode_odid_body(body)
ts = datetime.now(timezone.utc).strftime("%H:%M:%S")
if messages:
print(f"\n[{ts}] Remote ID (WiFi Beacon) da {src}")
for m in messages:
mtype = m.get("message_type")
print(f" [{mtype}]")
for k, v in m.items():
if k == "message_type":
continue
print(f" {k}: {v}")
if json_fp:
record = {"timestamp": ts, "mac": src, "messages": messages}
json_fp.write(json.dumps(record, ensure_ascii=False) + "\n")
json_fp.flush()
else:
print(f"\n[{ts}] IE ODID rilevato da {src} ma payload non riconosciuto")
if verbose:
print(f" raw IE: {elt.info.hex()}")
elt = elt.payload.getlayer(Dot11Elt)
def channel_hopper(iface, stop_event, channels=range(1, 14)):
while not stop_event.is_set():
for ch in channels:
if stop_event.is_set():
break
subprocess.run(["iw", "dev", iface, "set", "channel", str(ch)],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(0.3)
def main():
ap = argparse.ArgumentParser(description="Decoder Remote ID (ASTM F3411) via WiFi Beacon")
ap.add_argument("iface", help="interfaccia in monitor mode (es. wlan0mon)")
ap.add_argument("--hop", action="store_true", help="channel hopping 1-13 (2.4GHz)")
ap.add_argument("--json", dest="json_path", default=None, help="salva ogni rilevamento come riga JSON")
ap.add_argument("-v", "--verbose", action="store_true", help="stampa anche l'IE raw in hex")
args = ap.parse_args()
print(f"In ascolto su {args.iface}... (Ctrl+C per uscire)")
print(f"Cerco IE Vendor Specific OUI {ODID_OUI.hex(':')} type 0x{ODID_VENDOR_TYPE:02X} (ASTM F3411 Remote ID).\n")
json_fp = open(args.json_path, "a", encoding="utf-8") if args.json_path else None
stop_event = threading.Event()
hopper_thread = None
if args.hop:
hopper_thread = threading.Thread(target=channel_hopper, args=(args.iface, stop_event), daemon=True)
hopper_thread.start()
try:
sniff(iface=args.iface, prn=lambda pkt: handle_packet(pkt, json_fp, args.verbose), store=False)
except PermissionError:
print("Errore: serve eseguire lo script come root (sudo).", file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt:
pass
finally:
stop_event.set()
if hopper_thread:
hopper_thread.join(timeout=1)
if json_fp:
json_fp.close()
if __name__ == "__main__":
main()
Per ricevere i pacchetti il drone deve essere in volo o quanto meno deve avere i motori armati .Un esempio della cattura.
Nessun commento:
Posta un commento