Uno degli incubi mentre volo con il drone e' di imbattermi in Pegaso 1, l'elisoccorso della Toscana
La quota di volo di Pegaso minima (tranne quando decolla ed atterra) sembra essere di 1000 piedi (circa 300 m) quindi ben al di sopra dei 120 m di quota massima dei droni
l'idea e' comunque di avere una ground station con allarme di prossimita' usando un dongle RTL-SDR impostando la propria posizione ed un raggio di 2 miglia nautiche
usando il programma Python (grazie Claude) si leggono i dati in uscita dump1009-fa e si selezionano solo gli aeromobili in 3 miglia nautiche di distanza e separazione verticale inferiore a 1500 piedi
#!/usr/bin/env python3
"""
adsb_proximity_alarm.py
Proximity alarm for ADS-B traffic received via an RTL-SDR + dump1090
(dump1090-fa, dump1090-mutability, readsb, or the dump1090 fork bundled
with Stratux).
It watches live aircraft positions and raises an alert when any aircraft
comes within a configurable horizontal distance AND vertical separation
of a reference point (e.g. your ground station / launch site).
Two input modes are supported, pick whichever your setup exposes:
1. SBS / BaseStation feed (raw TCP text feed, classic dump1090 port 30003)
Most dump1090 builds expose this. Stratux's internal dump1090 usually
does too, though it may not be reachable outside localhost unless you
tunnel it (e.g. `ssh -L 30003:localhost:30003 pi@stratux.local`).
2. aircraft.json polling (HTTP), as served by dump1090-fa/readsb/tar1090,
typically at http://<host>/dump1090-fa/data/aircraft.json or similar.
Check your install; the path varies by distro/build.
Usage examples
--------------
SBS mode, ground station at 43.7696N 11.2558E (Florence area), alarm inside
3 NM / 1500 ft:
python3 adsb_proximity_alarm.py sbs \\
--host 127.0.0.1 --port 30003 \\
--lat 43.7696 --lon 11.2558 --elev-ft 150 \\
--radius-nm 3 --vsep-ft 1500
JSON polling mode:
python3 adsb_proximity_alarm.py json \\
--url http://127.0.0.1/dump1090-fa/data/aircraft.json \\
--lat 43.7696 --lon 11.2558 --radius-nm 3 --vsep-ft 1500
No extra dependencies beyond the standard library and `requests`
(only needed for JSON mode: pip install requests).
"""
import argparse
import math
import socket
import sys
import time
import urllib.request
import json as jsonlib
from dataclasses import dataclass, field
from datetime import datetime, timezone
# --------------------------------------------------------------------------
# Geometry helpers
# --------------------------------------------------------------------------
EARTH_RADIUS_NM = 3440.065 # mean earth radius in nautical miles
def haversine_nm(lat1, lon1, lat2, lon2):
"""Great-circle distance in nautical miles."""
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = (math.sin(dphi / 2) ** 2
+ math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
return 2 * EARTH_RADIUS_NM * math.asin(math.sqrt(a))
def bearing_deg(lat1, lon1, lat2, lon2):
"""Initial bearing from point 1 to point 2, in degrees (0=N, 90=E)."""
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dlambda = math.radians(lon2 - lon1)
y = math.sin(dlambda) * math.cos(phi2)
x = (math.cos(phi1) * math.sin(phi2)
- math.sin(phi1) * math.cos(phi2) * math.cos(dlambda))
return (math.degrees(math.atan2(y, x)) + 360) % 360
def compass(deg):
dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
return dirs[int((deg + 22.5) // 45) % 8]
# --------------------------------------------------------------------------
# Aircraft state
# --------------------------------------------------------------------------
@dataclass
class Aircraft:
icao: str
callsign: str = ""
lat: float = None
lon: float = None
alt_ft: float = None
last_seen: float = field(default_factory=time.monotonic)
last_alert: float = 0.0
def has_position(self):
return self.lat is not None and self.lon is not None
class AircraftTable:
"""Keeps track of recently seen aircraft and prunes stale ones."""
def __init__(self, stale_after_s=60):
self.aircraft = {}
self.stale_after_s = stale_after_s
def update(self, icao, callsign=None, lat=None, lon=None, alt_ft=None):
ac = self.aircraft.get(icao)
if ac is None:
ac = Aircraft(icao=icao)
self.aircraft[icao] = ac
if callsign:
ac.callsign = callsign.strip()
if lat is not None:
ac.lat = lat
if lon is not None:
ac.lon = lon
if alt_ft is not None:
ac.alt_ft = alt_ft
ac.last_seen = time.monotonic()
return ac
def prune(self):
now = time.monotonic()
dead = [icao for icao, ac in self.aircraft.items()
if now - ac.last_seen > self.stale_after_s]
for icao in dead:
del self.aircraft[icao]
# --------------------------------------------------------------------------
# Alerting
# --------------------------------------------------------------------------
RED = "\033[91m"
YELLOW = "\033[93m"
GREEN = "\033[92m"
RESET = "\033[0m"
def alert(ac: Aircraft, ref_lat, ref_lon, dist_nm, vsep_ft, bell=True):
brg = bearing_deg(ref_lat, ref_lon, ac.lat, ac.lon)
ts = datetime.now(timezone.utc).strftime("%H:%M:%S")
tag = ac.callsign if ac.callsign else ac.icao
msg = (f"{RED}[ALERT {ts}Z]{RESET} {tag} (ICAO {ac.icao}) "
f"{dist_nm:.1f} NM {compass(brg)} ({brg:5.1f}°), "
f"alt {ac.alt_ft:.0f} ft, vertical sep {vsep_ft:.0f} ft")
print(msg)
if bell:
sys.stdout.write("\a")
sys.stdout.flush()
def status_line(ac: Aircraft, dist_nm, vsep_ft):
tag = ac.callsign if ac.callsign else ac.icao
print(f"{YELLOW} near{RESET} {tag:8s} {dist_nm:5.1f} NM "
f"alt {ac.alt_ft:6.0f} ft vsep {vsep_ft:6.0f} ft")
# --------------------------------------------------------------------------
# Core proximity check
# --------------------------------------------------------------------------
def check_proximity(ac: Aircraft, args):
if not ac.has_position() or ac.alt_ft is None:
return
dist_nm = haversine_nm(args.lat, args.lon, ac.lat, ac.lon)
vsep_ft = abs(ac.alt_ft - args.elev_ft)
if dist_nm <= args.radius_nm and vsep_ft <= args.vsep_ft:
now = time.monotonic()
if now - ac.last_alert >= args.cooldown:
alert(ac, args.lat, args.lon, dist_nm, vsep_ft, bell=not args.no_bell)
ac.last_alert = now
elif args.verbose and dist_nm <= args.radius_nm * 2.0:
# Show near-miss traffic even if outside the hard alert ring,
# useful for situational awareness.
status_line(ac, dist_nm, vsep_ft)
# --------------------------------------------------------------------------
# SBS (BaseStation) feed parser
# --------------------------------------------------------------------------
#
# Format reference (dump1090 SBS output), comma separated fields:
# MSG,type,session,aircraft,hex,flight,date,time,date,time,
# callsign,altitude,gspeed,track,lat,lon,vrate,squawk,alert,emergency,spi,onground
#
# Relevant message types:
# MSG,1 -> callsign (field 10)
# MSG,3 -> airborne position: altitude(11), lat(14), lon(15)
# MSG,4 -> velocity
# MSG,5/6/7/8 -> various, mostly ignored here
def run_sbs(args):
table = AircraftTable()
print(f"Connecting to SBS feed at {args.host}:{args.port} ...")
while True:
try:
with socket.create_connection((args.host, args.port), timeout=10) as sock:
print(f"{GREEN}Connected.{RESET} Watching for traffic within "
f"{args.radius_nm} NM / {args.vsep_ft} ft of "
f"{args.lat:.4f},{args.lon:.4f} (elev {args.elev_ft} ft)...")
buf = b""
sock.settimeout(5)
last_prune = time.monotonic()
while True:
try:
chunk = sock.recv(4096)
except socket.timeout:
chunk = b""
if chunk == b"" and time.monotonic() - last_prune > 30:
# no data for a while but connection alive; keep looping
pass
buf += chunk
while b"\n" in buf:
line, buf = buf.split(b"\n", 1)
line = line.decode(errors="ignore").strip()
if line:
handle_sbs_line(line, table, args)
if time.monotonic() - last_prune > 15:
table.prune()
last_prune = time.monotonic()
except (ConnectionRefusedError, OSError) as e:
print(f"Connection error ({e}); retrying in 5s...")
time.sleep(5)
def handle_sbs_line(line, table, args):
fields = line.split(",")
if len(fields) < 2 or fields[0] != "MSG":
return
msg_type = fields[1]
if len(fields) < 5:
return
icao = fields[4].strip()
if not icao:
return
callsign = None
lat = lon = alt_ft = None
try:
if msg_type == "1" and len(fields) > 10:
callsign = fields[10]
elif msg_type == "3" and len(fields) > 15:
alt_ft = float(fields[11]) if fields[11] else None
lat = float(fields[14]) if fields[14] else None
lon = float(fields[15]) if fields[15] else None
elif msg_type in ("5", "6", "7", "8") and len(fields) > 11 and fields[11]:
alt_ft = float(fields[11])
except ValueError:
return
ac = table.update(icao, callsign=callsign, lat=lat, lon=lon, alt_ft=alt_ft)
check_proximity(ac, args)
# --------------------------------------------------------------------------
# aircraft.json polling
# --------------------------------------------------------------------------
def run_json(args):
table = AircraftTable()
print(f"Polling {args.url} every {args.poll_interval}s ...")
print(f"Watching for traffic within {args.radius_nm} NM / {args.vsep_ft} ft of "
f"{args.lat:.4f},{args.lon:.4f} (elev {args.elev_ft} ft)...")
last_prune = time.monotonic()
while True:
try:
with urllib.request.urlopen(args.url, timeout=5) as resp:
data = jsonlib.loads(resp.read())
except Exception as e:
print(f"Fetch error ({e}); retrying in {args.poll_interval}s...")
time.sleep(args.poll_interval)
continue
aircraft_list = data.get("aircraft", data if isinstance(data, list) else [])
for entry in aircraft_list:
icao = (entry.get("hex") or entry.get("icao") or "").strip()
if not icao:
continue
callsign = entry.get("flight") or entry.get("callsign")
lat = entry.get("lat")
lon = entry.get("lon")
alt_ft = entry.get("alt_baro", entry.get("altitude"))
try:
alt_ft = float(alt_ft) if alt_ft not in (None, "ground") else None
except (TypeError, ValueError):
alt_ft = None
ac = table.update(icao, callsign=callsign, lat=lat, lon=lon, alt_ft=alt_ft)
check_proximity(ac, args)
if time.monotonic() - last_prune > 15:
table.prune()
last_prune = time.monotonic()
time.sleep(args.poll_interval)
# --------------------------------------------------------------------------
# CLI
# --------------------------------------------------------------------------
def build_parser():
# Shared options live on a "parent" parser so they can be passed either
# before or after the sbs/json subcommand, e.g. both of these work:
# adsb_proximity_alarm.py --lat 43.7 --lon 11.2 sbs --port 30003
# adsb_proximity_alarm.py sbs --port 30003 --lat 43.7 --lon 11.2
common = argparse.ArgumentParser(add_help=False)
common.add_argument("--lat", type=float, required=True,
help="Reference latitude (your ground station / launch site)")
common.add_argument("--lon", type=float, required=True,
help="Reference longitude")
common.add_argument("--elev-ft", type=float, default=0.0,
help="Reference point elevation, feet MSL (default 0)")
common.add_argument("--radius-nm", type=float, default=3.0,
help="Horizontal alert radius in nautical miles (default 3)")
common.add_argument("--vsep-ft", type=float, default=1500.0,
help="Vertical separation threshold in feet (default 1500)")
common.add_argument("--cooldown", type=float, default=30.0,
help="Seconds before re-alerting on the same aircraft (default 30)")
common.add_argument("--no-bell", action="store_true",
help="Disable terminal bell on alert")
common.add_argument("-v", "--verbose", action="store_true",
help="Also print traffic within 2x the alert radius")
p = argparse.ArgumentParser(
description="ADS-B proximity alarm using RTL-SDR / dump1090 feeds")
sub = p.add_subparsers(dest="mode", required=True)
sbs = sub.add_parser("sbs", parents=[common],
help="Connect to raw SBS/BaseStation TCP feed")
sbs.add_argument("--host", default="127.0.0.1")
sbs.add_argument("--port", type=int, default=30003)
js = sub.add_parser("json", parents=[common],
help="Poll an aircraft.json HTTP endpoint")
js.add_argument("--url", required=True,
help="Full URL to aircraft.json")
js.add_argument("--poll-interval", type=float, default=1.0,
help="Seconds between polls (default 1.0)")
return p
def main():
args = build_parser().parse_args()
try:
if args.mode == "sbs":
run_sbs(args)
else:
run_json(args)
except KeyboardInterrupt:
print("\nStopped.")
if __name__ == "__main__":
main()
Nessun commento:
Posta un commento