martedì 30 giugno 2026

DJI ID

Per motivi di sicurezza in volo durante il lavoro volevo che il drone fosse visibile tramite la funzionalita' wifi beacon

Attivarlo non e' stato banale perche' si tratta di una funzionalita' abbastanza ben nascosta

Nella prima schermata di DJI Pilot 2 sotto la scritta max altitude c'e' una piccola scritta RID con un triangolo

cliccandolo si passa alla pagina di inserimento del proprio Id operatore. Attenzione si deve andare di DFlight nel proprio profilo 


 come si vede l'EASA ID ha tre asterischi che nascondono 3 caratteri privati. Nella applicazione DJI deve essere inserito tutto l'Id...per fare comparire la stringa completa si deve cliccare sull'occhio barrato a sinistra della stringa

L'operazione deve essere fatta online perche' DJI Pilot 2 valida la stringa 

a questo punto nella mappa si avra' la scritta RID in nero
 
per leggere i beacon si deve mettere la scheda wifi del portatile in monitor mode. Per vedere se la scheda e' compatibile si puo' procedere con il comando sottostante

 root@HW27747:/home/luca# iw list | grep -A 10 "Supported interface modes"
    Supported interface modes:
         * IBSS
         * managed
         * AP
         * AP/VLAN
         * monitor
         * P2P-client
         * P2P-GO
         * P2P-device
    Band 1:
        Capabilities: 0x11ee

a questo punto si imposta in monitoraggio

sudo ip link set wlp58s0 down
sudo iw dev wlp58s0 set type monitor 
sudo ip link set wlp58s0 up

(base) luca@HW27747:~/scapy$ sudo iw dev
phy#0
    Unnamed/non-netdev interface
        wdev 0x2
        addr 5c:51:4f:8a:24:d3
        type P2P-device
    Interface wlp58s0
        ifindex 2
        wdev 0x1
        addr 5c:51:4f:8a:24:d3
        type monitor
        multicast TXQ:
            qsz-byt    qsz-pkt    flows    drops    marks    overlmt    hashcol    tx-bytes    tx-packets

 

in generale Network manager di Linux interferisce in questa operazione ed e' quindi meglio disattivarlo 

sudo nmcli device set wlp58s0 managed no

per invertire il tutto a fine lavoro 

sudo ip link set wlp58s0 down
sudo iw dev wlp58s0 set type managed
sudo ip link set wlp58s0 up 

sudo nmcli device set wlp58s0 managed yes 

Si installa quindi questo script per la decodifica dei pacchetti wifi  (grazie Claude)

pip install scapy

 

sudo $(which python3) remote_id_wifi_decoder.py wlp58s0 --hop -v --json log.jsonl


---------------------------

#!/usr/bin/env python3
"""
remote_id_wifi_decoder.py
---------------------------
Sniffer/decoder per il Remote ID standard ASTM F3411 (opendroneid)
trasmesso via Wi-Fi Beacon, il transport effettivamente usato da DJI
(Matrice 4, e in generale quasi tutta la linea DJI con Remote ID
compliance) invece del Bluetooth.

I beacon DJI includono un Information Element Vendor-Specific (ID 0xDD)
con OUI FA:0B:BC (ASD-STAN/ASTM), seguito da un byte "vendor type" 0x0D
e dal payload ODID (singolo messaggio o Message Pack), esattamente lo
stesso formato binario usato sul transport Bluetooth.

REQUISITI:
- Scheda Wi-Fi in modalita' monitor mode
- root
- pip install scapy --break-system-packages

USO:
sudo python3 remote_id_wifi_decoder.py wlan0mon
sudo python3 remote_id_wifi_decoder.py wlan0mon --hop -v
sudo python3 remote_id_wifi_decoder.py wlan0mon --json out.jsonl

Setup monitor mode:
sudo ip link set wlan0 down
sudo iw wlan0 set monitor none
sudo ip link set wlan0 up
"""

import argparse
import json
import struct
import subprocess
import sys
import threading
import time
from datetime import datetime, timezone

from scapy.all import sniff, Dot11, Dot11Beacon, Dot11ProbeResp, Dot11Elt

ODID_OUI = b"\xfa\x0b\xbc"
ODID_VENDOR_TYPE = 0x0D

MSG_TYPE_NAMES = {
0x0: "Basic ID", 0x1: "Location/Vector", 0x2: "Auth",
0x3: "Self-ID", 0x4: "System", 0x5: "Operator ID", 0xF: "Message Pack",
}
ID_TYPE_NAMES = {
0: "None", 1: "Serial Number (ANSI/CTA-2063-A)",
2: "CAA Assigned Registration ID", 3: "UTM Assigned ID", 4: "Specific Session ID",
}
UA_TYPE_NAMES = {
0: "None/Undeclared", 1: "Aeroplane", 2: "Helicopter/Multirotor",
3: "Gyroplane", 4: "Hybrid Lift", 5: "Ornithopter", 6: "Glider",
7: "Kite", 8: "Free Balloon", 9: "Captive Balloon", 10: "Airship",
11: "Free Fall/Parachute", 12: "Rocket", 13: "Tethered Powered Aircraft",
14: "Ground Obstacle", 15: "Other",
}
CLASSIFICATION_NAMES = {0: "Undeclared", 1: "EU classification"}


def s8(v):
return v - 256 if v > 127 else v


def decode_basic_id(payload: bytes):
if len(payload) < 22:
return None
id_type = (payload[0] >> 4) & 0x0F
ua_type = payload[0] & 0x0F
raw_id = payload[1:21]
uas_id = raw_id.split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
return {
"id_type": ID_TYPE_NAMES.get(id_type, f"sconosciuto({id_type})"),
"ua_type": UA_TYPE_NAMES.get(ua_type, f"sconosciuto({ua_type})"),
"uas_id": uas_id,
}


def decode_location(payload: bytes):
if len(payload) < 24:
return None
b1 = payload[0]
status = (b1 >> 4) & 0x0F
height_type = (b1 >> 2) & 0x01
ew_dir_segment = (b1 >> 1) & 0x01
speed_multiplier = b1 & 0x01

track_raw = payload[1]
track_dir = track_raw + 180 if ew_dir_segment == 1 else track_raw

speed_raw = payload[2]
speed = speed_raw * 0.25 if speed_multiplier == 0 else 255 * 0.25 + speed_raw * 0.75

vspeed = s8(payload[3]) * 0.5

lat_raw, lon_raw = struct.unpack_from("<ii", payload, 4)
lat, lon = lat_raw / 1e7, lon_raw / 1e7

press_alt_raw, geo_alt_raw, height_raw = struct.unpack_from("<HHH", payload, 12)
press_alt = press_alt_raw * 0.5 - 1000
geo_alt = geo_alt_raw * 0.5 - 1000
height = height_raw * 0.5 - 1000

timestamp_raw = struct.unpack_from("<H", payload, 20)[0]

status_names = {0: "Undeclared", 1: "Ground", 2: "Airborne",
3: "Emergency", 4: "Remote ID System Failure"}

return {
"status": status_names.get(status, f"sconosciuto({status})"),
"track_direction_deg": track_dir,
"speed_horizontal_mps": round(speed, 2),
"speed_vertical_mps": round(vspeed, 2),
"latitude": round(lat, 7),
"longitude": round(lon, 7),
"pressure_altitude_m": round(press_alt, 1),
"geodetic_altitude_m": round(geo_alt, 1),
"height_above_ref_m": round(height, 1),
"height_type": "sopra il take-off" if height_type == 0 else "sopra il livello del suolo",
"timestamp_sec_of_hour": round(timestamp_raw / 10.0, 1),
}


def decode_self_id(payload: bytes):
if len(payload) < 23:
return None
desc_type = payload[0]
desc = payload[1:24].split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
return {"description_type": desc_type, "description": desc}


def decode_system(payload: bytes):
if len(payload) < 23:
return None
flags = payload[0]
classification = CLASSIFICATION_NAMES.get((flags >> 2) & 0x03, "sconosciuta")
op_lat_raw, op_lon_raw = struct.unpack_from("<ii", payload, 1)
area_count = struct.unpack_from("<H", payload, 9)[0]
area_radius = payload[11] * 10
area_ceiling_raw, area_floor_raw = struct.unpack_from("<HH", payload, 12)
op_alt_raw = struct.unpack_from("<H", payload, 17)[0]
timestamp_raw = struct.unpack_from("<I", payload, 19)[0]
return {
"classification": classification,
"operator_latitude": round(op_lat_raw / 1e7, 7),
"operator_longitude": round(op_lon_raw / 1e7, 7),
"area_count": area_count,
"area_radius_m": area_radius,
"area_ceiling_m": round(area_ceiling_raw * 0.5 - 1000, 1),
"area_floor_m": round(area_floor_raw * 0.5 - 1000, 1),
"operator_altitude_geo_m": round(op_alt_raw * 0.5 - 1000, 1),
"timestamp_sec_since_2019": timestamp_raw,
}


def decode_operator_id(payload: bytes):
if len(payload) < 21:
return None
op_id_type = payload[0]
op_id = payload[1:21].split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
return {"operator_id_type": op_id_type, "operator_id": op_id}


DECODERS = {
0x0: decode_basic_id, 0x1: decode_location, 0x3: decode_self_id,
0x4: decode_system, 0x5: decode_operator_id,
}


def decode_single_message(msg: bytes):
if len(msg) < 2:
return None
header = msg[0]
msg_type = (header >> 4) & 0x0F
proto_version = header & 0x0F
payload = msg[1:25]

entry = {
"message_type": MSG_TYPE_NAMES.get(msg_type, f"sconosciuto(0x{msg_type:X})"),
"protocol_version": proto_version,
}
if msg_type in DECODERS:
try:
decoded = DECODERS[msg_type](payload)
except (struct.error, IndexError):
decoded = None
if decoded:
entry.update(decoded)
elif msg_type == 0x2:
entry["note"] = "messaggio Auth (firma/autenticazione) - non decodificato"
return entry


def decode_odid_body(body: bytes):
"""body = il payload subito dopo OUI + vendor_type (0x0D).

NOTA: i beacon DJI osservati anteposero un singolo byte extra
(un contatore di sequenza, non standard ASTM) prima del vero
ODID Message Pack. Il vero header del pack (0xF_ = Message Pack)
si trova quindi a body[1], non a body[0]. Se invece il primo byte
e' gia' un header di Message Pack valido (tipo 0xF), assumiamo
che non ci sia byte extra (compatibilita' con altri vendor/firmware).
"""
if len(body) < 2:
return None

if (body[0] >> 4) & 0x0F == 0xF:
pack = body
else:
pack = body[1:] # salta il byte contatore non-standard

if len(pack) < 1:
return None

header = pack[0]
msg_type = (header >> 4) & 0x0F

results = []
if msg_type == 0xF:
if len(pack) < 3:
return None
msg_size = pack[1]
msg_count = pack[2]
offset = 3
for _ in range(msg_count):
chunk = pack[offset:offset + msg_size]
if len(chunk) < msg_size:
break
decoded = decode_single_message(chunk)
if decoded:
results.append(decoded)
offset += msg_size
else:
decoded = decode_single_message(pack)
if decoded:
results.append(decoded)
return results


def handle_packet(pkt, json_fp, verbose):
if not pkt.haslayer(Dot11):
return
if not (pkt.haslayer(Dot11Beacon) or pkt.haslayer(Dot11ProbeResp)):
return

elt = pkt.getlayer(Dot11Elt)
while elt is not None:
if elt.ID == 221 and elt.info[:3] == ODID_OUI and len(elt.info) > 3 and elt.info[3] == ODID_VENDOR_TYPE:
src = pkt.addr2
body = elt.info[4:]
messages = decode_odid_body(body)
ts = datetime.now(timezone.utc).strftime("%H:%M:%S")

if messages:
print(f"\n[{ts}] Remote ID (WiFi Beacon) da {src}")
for m in messages:
mtype = m.get("message_type")
print(f" [{mtype}]")
for k, v in m.items():
if k == "message_type":
continue
print(f" {k}: {v}")
if json_fp:
record = {"timestamp": ts, "mac": src, "messages": messages}
json_fp.write(json.dumps(record, ensure_ascii=False) + "\n")
json_fp.flush()
else:
print(f"\n[{ts}] IE ODID rilevato da {src} ma payload non riconosciuto")

if verbose:
print(f" raw IE: {elt.info.hex()}")

elt = elt.payload.getlayer(Dot11Elt)


def channel_hopper(iface, stop_event, channels=range(1, 14)):
while not stop_event.is_set():
for ch in channels:
if stop_event.is_set():
break
subprocess.run(["iw", "dev", iface, "set", "channel", str(ch)],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(0.3)


def main():
ap = argparse.ArgumentParser(description="Decoder Remote ID (ASTM F3411) via WiFi Beacon")
ap.add_argument("iface", help="interfaccia in monitor mode (es. wlan0mon)")
ap.add_argument("--hop", action="store_true", help="channel hopping 1-13 (2.4GHz)")
ap.add_argument("--json", dest="json_path", default=None, help="salva ogni rilevamento come riga JSON")
ap.add_argument("-v", "--verbose", action="store_true", help="stampa anche l'IE raw in hex")
args = ap.parse_args()

print(f"In ascolto su {args.iface}... (Ctrl+C per uscire)")
print(f"Cerco IE Vendor Specific OUI {ODID_OUI.hex(':')} type 0x{ODID_VENDOR_TYPE:02X} (ASTM F3411 Remote ID).\n")

json_fp = open(args.json_path, "a", encoding="utf-8") if args.json_path else None

stop_event = threading.Event()
hopper_thread = None
if args.hop:
hopper_thread = threading.Thread(target=channel_hopper, args=(args.iface, stop_event), daemon=True)
hopper_thread.start()

try:
sniff(iface=args.iface, prn=lambda pkt: handle_packet(pkt, json_fp, args.verbose), store=False)
except PermissionError:
print("Errore: serve eseguire lo script come root (sudo).", file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt:
pass
finally:
stop_event.set()
if hopper_thread:
hopper_thread.join(timeout=1)
if json_fp:
json_fp.close()


if __name__ == "__main__":
main()

Per ricevere i pacchetti il drone deve essere in volo o quanto meno deve avere i motori armati .Un esempio della cattura.

 

[11:30:11] Remote ID (WiFi Beacon) da 8c:1e:d9:13:a2:8b
  [Basic ID]
    protocol_version: 1
    id_type: Serial Number (ANSI/CTA-2063-A)
    ua_type: Helicopter/Multirotor
    uas_id: 1581F7K3C255K00D7EEL
  [Location/Vector]
    protocol_version: 1
    status: Airborne
    track_direction_deg: 361
    speed_horizontal_mps: 0.0
    speed_vertical_mps: 0.0
    latitude: 43.7842685
    longitude: 11.2306187
    pressure_altitude_m: 13.0
    geodetic_altitude_m: 98.0
    height_above_ref_m: 3.0
    height_type: sopra il take-off
    timestamp_sec_of_hour: 1810.4
  [Self-ID]
    protocol_version: 1
    description_type: 0
    description: 
  [System]
    protocol_version: 1
    classification: EU classification
    operator_latitude: 43.7841697
    operator_longitude: 11.2305805
    area_count: 1
    area_radius_m: 0
    area_ceiling_m: -1000.0
    area_floor_m: -1000.0
    operator_altitude_geo_m: 112.0
    timestamp_sec_since_2019: 0
  [Operator ID]
    protocol_version: 1
    operator_id_type: 0
    operator_id: ITA5qjdid6y5x5ni
  raw IE: fa0bbc0d38f1190501123135383146374b33433235354b3030443745454c0000001122b50000fdf2181a0ba8b106ea079408d6073b44b846010031000000000000000000000000000000000000000000000000410521ef181a8da6b1060100000000000003b0080000000000510049544135716a64696436793578356e6900000000000000


Nessun commento:

Posta un commento

DJI ID

Per motivi di sicurezza in volo durante il lavoro volevo che il drone fosse visibile tramite la funzionalita' wifi beacon Attivarlo non ...