Files
ADS-Bit/server.py
root 707df72def Rename to ADS-Bit and show receiver IP
- Rename app from "SKY WATCH" to "ADS-Bit" (8-bit wordplay)
- Update page title to "ADS-Bit - Retro Flight Tracker"
- Add receiver IPs to /api/config endpoint
- Display connected receiver IP(s) in status bar
- Update screenshot

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 13:05:57 -08:00

390 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""Pixel ADS-B Server - Connects directly to ADS-B receivers"""
import asyncio
import socket
import json
import time
import ipaddress
from pathlib import Path
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Dict, Set, List
from aiohttp import web
import netifaces
# Directory containing this script; config.json and the static web assets
# are looked up relative to it.
WEB_DIR = Path(__file__).parent
CONFIG_FILE = WEB_DIR.joinpath("config.json")

# Built-in defaults; load_config() overlays the values found in config.json.
config = {
    # "AUTO" (scan the local network), a single host string, or a list of hosts
    "receivers": "AUTO",
    # TCP port used when connecting to the configured receivers
    "receiver_port": 30003,
    # Receiver location reported to the browser
    "location": {
        "name": "My Location",
        "lat": 0.0,
        "lon": 0.0,
    },
    # Port the HTTP/WebSocket server listens on
    "web_port": 2001,
    "theme": "desert",
}
def load_config():
    """Load configuration from config.json, overlaying it on the defaults.

    Top-level keys found in the file replace the matching entries of the
    module-level ``config`` dict.  Sections that are dicts in the defaults
    (such as ``location``) are merged key by key, so a file that only sets
    ``location.lat`` keeps the default ``name`` and ``lon`` instead of
    dropping them.  A missing, unreadable or malformed file leaves the
    defaults untouched.  The effective settings are printed afterwards.
    """
    global config
    if CONFIG_FILE.exists():
        try:
            with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            # Validate before touching config so a bad file changes nothing.
            if not isinstance(loaded, dict):
                raise ValueError("top-level JSON value must be an object")
            for key, value in loaded.items():
                current = config.get(key)
                if isinstance(current, dict):
                    if isinstance(value, dict):
                        # Merge nested sections rather than replacing them,
                        # so keys the file omits keep their defaults.
                        config[key] = {**current, **value}
                    else:
                        # The rest of the server indexes into this section;
                        # replacing it with a scalar would crash later.
                        print(f"Ignoring config key '{key}': expected an object")
                else:
                    config[key] = value
            print(f"Loaded config from {CONFIG_FILE}")
        except (OSError, ValueError) as e:
            # ValueError also covers json.JSONDecodeError and decoding errors.
            print(f"Error loading config: {e}, using defaults")
    else:
        print(f"No config file found at {CONFIG_FILE}, using defaults")
    print(f" Receivers: {config['receivers']}")
    print(f" Receiver port: {config['receiver_port']}")
    print(f" Location: {config['location']['name']} ({config['location']['lat']}, {config['location']['lon']})")
    print(f" Web port: {config['web_port']}")
    print(f" Theme: {config.get('theme', 'desert')}")
# Flight data storage
# Latest known state per aircraft, keyed by ICAO address; each record also
# carries a 'last_seen' timestamp that is used to expire stale entries.
flights: Dict[str, dict] = {}
# WebSocket connections of the browsers currently receiving flight updates.
connected_clients: Set = set()
# Hosts of the ADS-B receivers in use (found by scanning or taken from config).
receivers: List[str] = []
@dataclass
class SBSMessage:
    """One decoded SBS/BaseStation message.

    Only ``msg_type`` and ``icao`` are required.  Every other field keeps
    its zero/empty default when the message did not carry a value for it.
    """

    # Required identification
    msg_type: str
    icao: str

    # Optional payload (defaults mean "not present in this message")
    callsign: str = ""
    altitude: int = 0
    speed: float = 0
    track: float = 0
    lat: float = 0
    lon: float = 0
    vrate: int = 0
    squawk: str = ""
    ground: bool = False
# (attribute, field index, converter) for the numeric SBS fields.
_SBS_NUMERIC_FIELDS = (
    ('altitude', 11, int),
    ('speed', 12, float),    # ground speed
    ('track', 13, float),
    ('lat', 14, float),
    ('lon', 15, float),
    ('vrate', 16, int),      # vertical rate
)


def parse_sbs_message(line: str):
    """Parse one SBS/BaseStation line into an SBSMessage.

    Args:
        line: A single comma-separated record, with or without a trailing
            newline.

    Returns:
        An SBSMessage, or None when the line has fewer than 10 fields, is
        not a "MSG" record, or has an empty ICAO field.  Optional fields
        that are absent, blank or not parseable as numbers keep the
        SBSMessage defaults.
    """
    parts = line.strip().split(',')
    if len(parts) < 10 or parts[0] != 'MSG':
        return None
    icao = parts[4].strip()
    if not icao:
        return None

    def field(index):
        """Return field *index* stripped, or '' if the line is too short."""
        return parts[index].strip() if len(parts) > index else ''

    msg = SBSMessage(msg_type=parts[0], icao=icao)

    # Callsign (field 10)
    callsign = field(10)
    if callsign:
        msg.callsign = callsign

    # Altitude .. vertical rate (fields 11-16)
    for attr, index, convert in _SBS_NUMERIC_FIELDS:
        text = field(index)
        if not text:
            continue
        try:
            setattr(msg, attr, convert(text))
        except ValueError:
            # Deliberate best effort: a malformed number leaves the default
            # in place instead of discarding the whole message.
            pass

    # Squawk (field 17)
    squawk = field(17)
    if squawk:
        msg.squawk = squawk

    # Ground flag (field 21): '-1' marks the aircraft as on the ground
    if field(21) == '-1':
        msg.ground = True
    return msg
def _update_flight(msg):
    """Merge the fields carried by *msg* into the stored record for its ICAO."""
    flight = flights.get(msg.icao)
    if flight is None:
        flight = flights[msg.icao] = {
            'icao': msg.icao,
            'callsign': '',
            'altitude': 0,
            'speed': 0,
            'track': 0,
            'lat': 0,
            'lon': 0,
            'vrate': 0,
            'squawk': '',
            'ground': False,
            'last_seen': time.time(),
        }
    # Only truthy values overwrite the stored ones, because a parsed message
    # uses 0/'' for "field not present".
    # NOTE(review): this also means a genuine zero (track 0, vertical rate 0)
    # never replaces an older non-zero value.
    for key in ('callsign', 'altitude', 'speed', 'track',
                'lat', 'lon', 'vrate', 'squawk'):
        value = getattr(msg, key)
        if value:
            flight[key] = value
    flight['ground'] = msg.ground
    flight['last_seen'] = time.time()


async def connect_to_receiver(host: str, port: int = 30003):
    """Connect to an SBS receiver and process messages, reconnecting forever.

    Each line read from the receiver is parsed with parse_sbs_message() and
    merged into the module-level ``flights`` table.  When the connection
    fails or the receiver closes it, the stream is closed and a new attempt
    is made after a 5 second pause.

    Args:
        host: Receiver host name or IP address.
        port: TCP port of the receiver's SBS feed.
    """
    print(f"Connecting to receiver at {host}:{port}...")
    while True:
        try:
            reader, writer = await asyncio.open_connection(host, port)
            print(f"Connected to {host}:{port}")
            try:
                while True:
                    line = await reader.readline()
                    if not line:
                        break  # EOF: the receiver closed the connection
                    msg = parse_sbs_message(line.decode('utf-8', errors='ignore'))
                    if msg and msg.icao:
                        _update_flight(msg)
            finally:
                # Release the socket on every exit path, not just clean EOF.
                writer.close()
                await writer.wait_closed()
        except Exception as e:
            # Task boundary: report and retry rather than ending the feed.
            print(f"Receiver {host}:{port} error: {e}")
        # Wait before reconnecting, including after a clean EOF, so a
        # receiver that accepts and immediately closes cannot cause a
        # tight reconnect loop.
        await asyncio.sleep(5)
async def scan_for_receivers(port=None, max_concurrency: int = 128):
    """Scan the local IPv4 networks for ADS-B receivers.

    Every non-loopback IPv4 network attached to a local interface is probed
    host by host; addresses that accept a TCP connection on *port* are
    appended to the module-level ``receivers`` list (without duplicates).

    Args:
        port: TCP port to probe.  Defaults to the configured
            ``receiver_port`` (30003 if unset), so discovery looks on the
            same port the server will later connect to.
        max_concurrency: Upper bound on simultaneous connection attempts.
            Large subnets are scanned in waves of this size so the process
            does not run out of file descriptors; scan time therefore grows
            with the size of the network.
    """
    if port is None:
        port = config.get("receiver_port", 30003)
    print(f"Scanning for ADS-B receivers on port {port}...")

    limiter = asyncio.Semaphore(max_concurrency)

    async def check_ip(test_ip):
        """Return *test_ip* if it accepts a connection on *port*, else None."""
        async with limiter:
            if await test_port(test_ip, port):
                return test_ip
        return None

    scanned = set()  # networks already probed (several interfaces may share one)
    for iface in netifaces.interfaces():
        addrs = netifaces.ifaddresses(iface)
        for addr in addrs.get(netifaces.AF_INET, []):
            ip = addr.get('addr')
            netmask = addr.get('netmask')
            if not ip or not netmask or ip.startswith('127.'):
                continue
            # Calculate network from IP and netmask
            try:
                network = ipaddress.IPv4Network(f"{ip}/{netmask}", strict=False)
            except ValueError as e:
                print(f"Invalid network {ip}/{netmask}: {e}")
                continue
            if network in scanned:
                continue
            scanned.add(network)
            print(f"Scanning {network} ({network.num_addresses} hosts)...")
            # Probe all hosts concurrently, throttled by the semaphore.
            results = await asyncio.gather(
                *(check_ip(str(host)) for host in network.hosts())
            )
            for result in results:
                if result and result not in receivers:
                    print(f"Found receiver at {result}:{port}")
                    receivers.append(result)
async def test_port(ip: str, port: int, timeout: float = 0.5):
    """Test if a TCP port is open.

    Args:
        ip: Host to probe.
        port: TCP port to probe.
        timeout: Seconds to wait for the connection to be established.

    Returns:
        True if a connection was established within *timeout*, else False.
    """
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(ip, port), timeout=timeout
        )
    except (OSError, asyncio.TimeoutError):
        # Refused, unreachable or timed out.  Catching only these lets task
        # cancellation propagate instead of being reported as "closed".
        return False
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        # The connection was established, so the port is open even if the
        # peer reset it while we were closing.
        pass
    return True
async def cleanup_old_flights():
    """Every 10 seconds, drop flights that have not been seen for 60 seconds."""
    while True:
        await asyncio.sleep(10)
        now = time.time()
        # Collect the keys first; the dict must not shrink while being iterated.
        stale = [
            key for key, record in flights.items()
            if now - record['last_seen'] > 60
        ]
        for key in stale:
            flights.pop(key)
async def broadcast_flights():
    """Broadcast flight data to all connected WebSocket clients once a second.

    Only flights with a non-zero latitude and longitude are sent.  Clients
    whose send fails are removed from ``connected_clients``.
    """
    while True:
        await asyncio.sleep(1)
        if not connected_clients:
            continue
        # Get flights with position data
        flight_list = [f for f in flights.values() if f['lat'] and f['lon']]
        print(f"Broadcasting {len(flight_list)} flights to {len(connected_clients)} clients")
        print(f"Total flights in memory: {len(flights)}")
        message = json.dumps({
            'type': 'flights',
            'flights': flight_list,
            'count': len(flight_list)
        })
        # Send to all connected clients.  Iterate over a snapshot: each send
        # awaits, and a client connecting or disconnecting in the meantime
        # would otherwise change the set mid-iteration and raise RuntimeError.
        dead_clients = set()
        for client in list(connected_clients):
            try:
                await client.send_str(message)
            except Exception:
                # Any send failure marks the client as gone.  Unlike a bare
                # except, this does not swallow task cancellation.
                dead_clients.add(client)
        # Remove disconnected clients
        connected_clients.difference_update(dead_clients)
async def websocket_handler(request):
    """Handle WebSocket connections from browser.

    Registers the connection in ``connected_clients`` for as long as it is
    open and unregisters it when the client goes away.
    """
    client_ws = web.WebSocketResponse()
    await client_ws.prepare(request)

    connected_clients.add(client_ws)
    print(f"WebSocket client connected ({len(connected_clients)} total)")
    try:
        # Incoming messages are not expected; draining them keeps the
        # handler alive until the connection closes.
        async for _incoming in client_ws:
            continue
    finally:
        connected_clients.discard(client_ws)
        print(f"WebSocket client disconnected ({len(connected_clients)} remaining)")
    return client_ws
async def handle_receiver_location(request):
    """Return the configured receiver location (lat, lon, name) as JSON."""
    location = config["location"]
    payload = {key: location[key] for key in ("lat", "lon", "name")}
    return web.json_response(payload)
async def handle_config(request):
    """Return the configuration the browser needs: theme, location, receivers."""
    payload = {
        "theme": config.get("theme", "desert"),
        "location": config["location"],
        "receivers": receivers,
    }
    return web.json_response(payload)
async def handle_http(request):
    """Serve static files from WEB_DIR.

    "/" maps to index.html.  The requested path is resolved and must stay
    inside WEB_DIR; anything that escapes it (for example via ".."
    segments), does not exist, or is not a regular file yields a 404.
    """
    path = request.path
    if path == '/':
        path = '/index.html'
    # NOTE(review): WEB_DIR is the directory of this script, so config.json
    # and the server source are served as well -- confirm that is intended.
    root = WEB_DIR.resolve()
    try:
        file_path = (root / path.lstrip('/')).resolve()
        # Raises ValueError when the resolved path is outside the web root.
        file_path.relative_to(root)
    except (ValueError, OSError, RuntimeError):
        # Traversal attempt, or a path that cannot be resolved at all.
        return web.Response(status=404, text="Not Found")
    if file_path.is_file():
        return web.FileResponse(file_path)
    return web.Response(status=404, text="Not Found")
async def start_http_server():
    """Start the HTTP server (static files, JSON API and WebSocket endpoint).

    Listens on all interfaces at the configured ``web_port``.
    """
    port = config["web_port"]
    app = web.Application()
    # The catch-all static route is registered last, after the specific ones.
    routes = (
        ('/ws', websocket_handler),
        ('/api/receiver-location', handle_receiver_location),
        ('/api/config', handle_config),
        ('/{tail:.*}', handle_http),
    )
    for pattern, handler in routes:
        app.router.add_get(pattern, handler)

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '0.0.0.0', port)
    await site.start()

    print(f"HTTP server with WebSocket running on http://0.0.0.0:{port}")
    print(f"WebSocket available at ws://0.0.0.0:{port}/ws")
    print(f"Receiver location API at http://0.0.0.0:{port}/api/receiver-location")
async def main():
    """Main entry point: load config, pick receivers, run all server tasks."""
    global receivers
    print("Pixel ADS-B Server Starting...")

    # Load configuration
    load_config()
    print(f"Access at http://0.0.0.0:{config['web_port']}")

    # Decide which receivers to use
    source = config["receivers"]
    feed_port = config["receiver_port"]
    if source == "AUTO":
        # Discover receivers on the local network
        await scan_for_receivers()
    elif isinstance(source, list):
        # Explicit list of receiver hosts
        receivers = source
        print(f"Using configured receivers: {receivers}")
    elif isinstance(source, str):
        # A single receiver host
        receivers = [source]
        print(f"Using configured receiver: {receivers[0]}")

    if receivers:
        print(f"Using {len(receivers)} receiver(s)")
    else:
        print("WARNING: No receivers found or configured!")

    # Server-side tasks first, then one feed per receiver
    background = [
        start_http_server(),
        cleanup_old_flights(),
        broadcast_flights(),
    ]
    feeds = [connect_to_receiver(host, feed_port) for host in receivers]
    await asyncio.gather(*background, *feeds)
# Script entry point: run main() on a new asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())