#!/usr/bin/env python3
"""Pixel ADS-B Server - Connects directly to ADS-B receivers"""

import asyncio
import socket
import json
import time
import ipaddress
from pathlib import Path
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Dict, Set, List, Optional

from aiohttp import web
import netifaces

WEB_DIR = Path(__file__).parent
CONFIG_FILE = WEB_DIR / "config.json"

# Upper bound on simultaneous connection probes during an AUTO scan, so that
# large subnets do not exhaust the process's file descriptors.
SCAN_CONCURRENCY = 256

# Default configuration
config = {
    "receivers": "AUTO",
    "receiver_port": 30003,
    "location": {
        "name": "My Location",
        "lat": 0.0,
        "lon": 0.0
    },
    "web_port": 2001,
    "theme": "desert"
}


def _merge_config(base: dict, overrides: dict) -> None:
    """Recursively merge *overrides* into *base* in place.

    Nested dicts are merged key by key so that a partial section in the
    config file (e.g. a "location" with only "name") keeps the remaining
    default keys instead of replacing the whole section.
    """
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            _merge_config(base[key], value)
        else:
            base[key] = value


def load_config():
    """Load configuration from config.json

    Values found in CONFIG_FILE are merged over the module-level defaults in
    ``config``. A missing, unreadable or malformed file leaves the defaults
    in place. The effective configuration is printed afterwards.
    """
    if CONFIG_FILE.exists():
        try:
            with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            if not isinstance(loaded, dict):
                raise ValueError("top-level JSON value must be an object")
            _merge_config(config, loaded)
            print(f"Loaded config from {CONFIG_FILE}")
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            print(f"Error loading config: {e}, using defaults")
    else:
        print(f"No config file found at {CONFIG_FILE}, using defaults")

    print(f"  Receivers: {config['receivers']}")
    print(f"  Receiver port: {config['receiver_port']}")
    print(f"  Location: {config['location']['name']} ({config['location']['lat']}, {config['location']['lon']})")
    print(f"  Web port: {config['web_port']}")
    print(f"  Theme: {config.get('theme', 'desert')}")


# Flight data storage
flights: Dict[str, dict] = {}      # latest known state per aircraft, keyed by ICAO
connected_clients: Set = set()     # open browser WebSocket connections
receivers: List[str] = []          # receiver hosts to connect to


@dataclass
class SBSMessage:
    """Fields extracted from one SBS/BaseStation "MSG" line.

    Fields absent from the line keep their zero/empty defaults.
    """
    msg_type: str
    icao: str
    callsign: str = ""
    altitude: int = 0
    speed: float = 0
    track: float = 0
    lat: float = 0
    lon: float = 0
    vrate: int = 0
    squawk: str = ""
    ground: bool = False


def _field(parts: List[str], index: int) -> str:
    """Return the stripped field at *index*, or "" when the line is too short."""
    return parts[index].strip() if len(parts) > index else ""


def parse_sbs_message(line: str) -> Optional[SBSMessage]:
    """Parse SBS/BaseStation format message

    Returns an SBSMessage for a "MSG" line that has at least 10
    comma-separated fields and a non-empty ICAO (field 4); otherwise returns
    None. Numeric fields that fail to parse are left at their defaults.
    """
    parts = line.strip().split(',')
    if len(parts) < 10:
        return None

    msg_type = parts[0]
    if msg_type != 'MSG':
        return None

    icao = parts[4].strip()
    if not icao:
        return None

    msg = SBSMessage(msg_type=msg_type, icao=icao)

    # Callsign (field 10)
    callsign = _field(parts, 10)
    if callsign:
        msg.callsign = callsign

    # Numeric fields: altitude (11), ground speed (12), track (13),
    # latitude (14), longitude (15), vertical rate (16).
    numeric_fields = (
        (11, 'altitude', int),
        (12, 'speed', float),
        (13, 'track', float),
        (14, 'lat', float),
        (15, 'lon', float),
        (16, 'vrate', int),
    )
    for index, attr, convert in numeric_fields:
        raw = _field(parts, index)
        if not raw:
            continue
        try:
            setattr(msg, attr, convert(raw))
        except ValueError:
            # Malformed value: keep the default for this field.
            pass

    # Squawk (field 17)
    squawk = _field(parts, 17)
    if squawk:
        msg.squawk = squawk

    # Ground flag (field 21): '-1' marks the aircraft as on the ground.
    if _field(parts, 21) == '-1':
        msg.ground = True

    return msg


def _update_flight(msg: SBSMessage) -> None:
    """Fold one parsed message into the shared ``flights`` table.

    Only truthy message fields overwrite stored values, so a message that
    omits a field does not reset it. As a consequence a value of exactly 0
    (e.g. latitude 0.0) is treated as "not present". The ground flag and the
    last-seen timestamp are refreshed on every message.
    """
    if msg.icao not in flights:
        flights[msg.icao] = {
            'icao': msg.icao,
            'callsign': '',
            'altitude': 0,
            'speed': 0,
            'track': 0,
            'lat': 0,
            'lon': 0,
            'vrate': 0,
            'squawk': '',
            'ground': False,
            'last_seen': time.time()
        }

    flight = flights[msg.icao]
    for key in ('callsign', 'altitude', 'speed', 'track',
                'lat', 'lon', 'vrate', 'squawk'):
        value = getattr(msg, key)
        if value:
            flight[key] = value
    flight['ground'] = msg.ground
    flight['last_seen'] = time.time()


async def connect_to_receiver(host: str, port: int = 30003):
    """Connect to an SBS receiver and process messages

    Runs forever: reads lines from ``host:port``, parses them and updates
    ``flights``. On EOF or any error the connection is closed and a new
    attempt is made after 5 seconds.
    """
    print(f"Connecting to receiver at {host}:{port}...")

    while True:
        try:
            reader, writer = await asyncio.open_connection(host, port)
            print(f"Connected to {host}:{port}")
            try:
                while True:
                    line = await reader.readline()
                    if not line:
                        break  # EOF: receiver closed the connection

                    msg = parse_sbs_message(line.decode('utf-8', errors='ignore'))
                    if msg and msg.icao:
                        _update_flight(msg)
            finally:
                # Always release the socket, including when the read loop
                # fails, so repeated reconnects do not leak descriptors.
                writer.close()
                await writer.wait_closed()

        except Exception as e:
            # Task boundary: log and retry rather than let one receiver's
            # failure end the task (cancellation is not an Exception).
            print(f"Receiver {host}:{port} error: {e}")

        await asyncio.sleep(5)  # Wait before reconnecting


async def scan_for_receivers(port: int = 30003):
    """Scan local network for ADS-B receivers

    Probes every host address of each non-loopback IPv4 network configured
    on a local interface and appends hosts accepting TCP connections on
    *port* to the global ``receivers`` list (without duplicates). At most
    SCAN_CONCURRENCY probes are in flight at once.
    """
    print(f"Scanning for ADS-B receivers on port {port}...")

    limiter = asyncio.Semaphore(SCAN_CONCURRENCY)

    async def check_ip(test_ip):
        async with limiter:
            if await test_port(test_ip, port):
                return test_ip
        return None

    # Get local networks
    for iface in netifaces.interfaces():
        addrs = netifaces.ifaddresses(iface)
        for addr in addrs.get(netifaces.AF_INET, []):
            ip = addr.get('addr')
            netmask = addr.get('netmask')
            if not ip or not netmask or ip.startswith('127.'):
                continue

            # Calculate network from IP and netmask
            try:
                network = ipaddress.IPv4Network(f"{ip}/{netmask}", strict=False)
            except ValueError as e:
                print(f"Invalid network {ip}/{netmask}: {e}")
                continue

            print(f"Scanning {network} ({network.num_addresses} hosts)...")

            results = await asyncio.gather(
                *(check_ip(str(host)) for host in network.hosts())
            )

            for result in results:
                # The same host can be reachable via several interfaces.
                if result and result not in receivers:
                    print(f"Found receiver at {result}:{port}")
                    receivers.append(result)


async def test_port(ip: str, port: int, timeout: float = 0.5):
    """Test if a port is open

    Returns True when a TCP connection to ``ip:port`` is established within
    *timeout* seconds, False on timeout or connection error.
    """
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(ip, port), timeout=timeout
        )
    except (OSError, asyncio.TimeoutError):
        # Deliberately not a bare except: cancellation must propagate.
        return False

    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        # The connection was established, so the port is open even if the
        # peer resets it while we are closing.
        pass
    return True


async def cleanup_old_flights():
    """Remove flights not seen in 60 seconds"""
    while True:
        await asyncio.sleep(10)
        now = time.time()
        # Collect keys first; the dict must not change while iterating it.
        to_remove = [icao for icao, flight in flights.items()
                     if now - flight['last_seen'] > 60]
        for icao in to_remove:
            del flights[icao]


async def broadcast_flights():
    """Broadcast flight data to all connected WebSocket clients

    Once per second, sends every flight with a non-zero lat and lon as one
    JSON message to each client. Clients whose send fails are dropped.
    """
    while True:
        await asyncio.sleep(1)

        if not connected_clients:
            continue

        # Get flights with position data
        flight_list = [f for f in flights.values() if f['lat'] and f['lon']]

        print(f"Broadcasting {len(flight_list)} flights to {len(connected_clients)} clients")
        print(f"Total flights in memory: {len(flights)}")

        message = json.dumps({
            'type': 'flights',
            'flights': flight_list,
            'count': len(flight_list)
        })

        # Send to all connected clients. Iterate over a snapshot: the handler
        # coroutine adds/removes clients while we are suspended in send_str,
        # and mutating a set during iteration raises RuntimeError.
        dead_clients = set()
        for client in list(connected_clients):
            try:
                await client.send_str(message)
            except Exception:
                dead_clients.add(client)

        # Remove disconnected clients
        connected_clients.difference_update(dead_clients)


async def websocket_handler(request):
    """Handle WebSocket connections from browser

    Registers the socket in ``connected_clients`` for the lifetime of the
    connection; incoming messages are read and ignored.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    connected_clients.add(ws)
    print(f"WebSocket client connected ({len(connected_clients)} total)")

    try:
        async for msg in ws:
            pass  # We don't expect messages from client
    finally:
        connected_clients.discard(ws)
        print(f"WebSocket client disconnected ({len(connected_clients)} remaining)")

    return ws


async def handle_receiver_location(request):
    """Return receiver location from config"""
    location = config["location"]
    return web.json_response({
        "lat": location["lat"],
        "lon": location["lon"],
        "name": location["name"]
    })


async def handle_config(request):
    """Return client-relevant configuration"""
    return web.json_response({
        "theme": config.get("theme", "desert"),
        "location": config["location"]
    })


async def handle_http(request):
    """Serve static files

    Maps the request path onto WEB_DIR ('/' serves index.html). Paths that
    resolve outside WEB_DIR, cannot be resolved, or are not regular files
    get a 404.
    """
    path = request.path
    if path == '/':
        path = '/index.html'

    not_found = web.Response(status=404, text="Not Found")

    try:
        root = WEB_DIR.resolve()
        file_path = (root / path.lstrip('/')).resolve()
        # Reject '..' segments or symlinks that escape the web root.
        file_path.relative_to(root)
    except (ValueError, OSError, RuntimeError):
        # ValueError: outside the root or embedded NUL; the others cover
        # unresolvable paths such as symlink loops.
        return not_found

    if file_path.is_file():
        return web.FileResponse(file_path)

    return not_found


async def start_http_server():
    """Start HTTP server with WebSocket support"""
    port = config["web_port"]

    app = web.Application()
    app.router.add_get('/ws', websocket_handler)
    app.router.add_get('/api/receiver-location', handle_receiver_location)
    app.router.add_get('/api/config', handle_config)
    # Catch-all must be registered last so the routes above take precedence.
    app.router.add_get('/{tail:.*}', handle_http)

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '0.0.0.0', port)
    await site.start()

    print(f"HTTP server with WebSocket running on http://0.0.0.0:{port}")
    print(f"WebSocket available at ws://0.0.0.0:{port}/ws")
    print(f"Receiver location API at http://0.0.0.0:{port}/api/receiver-location")


async def main():
    """Main entry point

    Loads the configuration, determines the receiver list (network scan for
    "AUTO", otherwise the configured host or list of hosts) and then runs
    the HTTP server, the housekeeping tasks and one reader per receiver.
    """
    global receivers

    print("Pixel ADS-B Server Starting...")

    # Load configuration
    load_config()

    print(f"Access at http://0.0.0.0:{config['web_port']}")

    # Get receivers based on config
    receiver_config = config["receivers"]
    receiver_port = config["receiver_port"]

    if receiver_config == "AUTO":
        # Auto-scan for receivers on the configured port
        await scan_for_receivers(receiver_port)
    elif isinstance(receiver_config, list):
        # Use specified receiver IPs (copied so config is not aliased)
        receivers = list(receiver_config)
        print(f"Using configured receivers: {receivers}")
    elif isinstance(receiver_config, str):
        # Single receiver IP specified
        receivers = [receiver_config]
        print(f"Using configured receiver: {receivers[0]}")

    if not receivers:
        print("WARNING: No receivers found or configured!")
    else:
        print(f"Using {len(receivers)} receiver(s)")

    # Start all tasks
    tasks = [
        start_http_server(),
        cleanup_old_flights(),
        broadcast_flights(),
    ]

    # Connect to all receivers
    tasks.extend(connect_to_receiver(receiver, receiver_port)
                 for receiver in receivers)

    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())