diff options
Diffstat (limited to 'research/sample-data/streaming/bodet_psa_cli.py')
-rw-r--r-- | research/sample-data/streaming/bodet_psa_cli.py | 329 |
1 files changed, 329 insertions, 0 deletions
diff --git a/research/sample-data/streaming/bodet_psa_cli.py b/research/sample-data/streaming/bodet_psa_cli.py new file mode 100644 index 0000000..2788aa1 --- /dev/null +++ b/research/sample-data/streaming/bodet_psa_cli.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +import socket +import argparse +import time +import sys +from typing import List, Dict, Tuple, Optional + +# Default configuration values +DEFAULT_MULTICAST_ADDR = "239.192.0.1" +DEFAULT_PORT = 1681 +DEFAULT_TTL = 2 +DEFAULT_VOLUME = 3 +DEFAULT_REPEATS = 0 # 0 means infinite + +# Command codes +CMD_MELODY = "3001" +CMD_ALARM = "5001" +CMD_STOP = "5002" + +# Header constants +MEL_HEADER = "4d454c" # "MEL" in hex +START_MARKER = "0100" +END_MARKER = "0100" + +# Maps for user-friendly selection +MELODY_MAP = { + # Standard melodies + "Westminster": 1, + "BimBam": 2, + "Gong": 3, + "CanCan": 4, + "SingingBird": 5, + "Violin": 6, + "Trumpet": 7, + "Piano": 8, + "Telephone": 9, + "Circus": 10, + # Add more melodies as needed +} + +# Common functions (borrowed from mp3_psa_streamer.py) +def compute_psa_checksum(data: bytes) -> bytes: + """Compute PSA checksum for packet data.""" + var_e = 0x0000 # Starting seed value + for i in range(len(data)): + var_e ^= (data[i] + i) & 0xFFFF + return var_e.to_bytes(2, 'big') # 2-byte checksum, big-endian + +def setup_multicast_socket(ttl=DEFAULT_TTL): + """Set up a socket for sending multicast UDP.""" + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl) + return sock + +def send_multicast_packet(sock, packet, addr, port, send_twice=True): + """Send a packet to the multicast group, optionally twice.""" + try: + sock.sendto(packet, (addr, port)) + if send_twice: + time.sleep(0.01) # Small delay between duplicates + sock.sendto(packet, (addr, port)) + return True + except Exception as e: + print(f"Error sending packet: {e}") + return False + +# New functions for the CLI tool +def encode_zones(zones: List[int]) -> str: + """ + Encode zone numbers (1-100) into a 12-byte hex string (24 characters). + Each byte represents 8 zones, with MSB being the lowest zone number. + """ + # Initialize 96 bits (12 bytes) of zeros + zone_bits = [0] * 96 + + for zone in zones: + if 1 <= zone <= 96: # Ensure zone is in valid range + # Calculate bit position (zero-indexed) + # Zone 1 = bit 0, Zone 2 = bit 1, etc. + bit_pos = zone - 1 + + # Set the bit + zone_bits[bit_pos] = 1 + + # Convert bits to bytes + zone_bytes = bytearray() + for i in range(0, 96, 8): + byte_val = 0 + for j in range(8): + if i + j < 96: # Avoid index out of range + byte_val |= (zone_bits[i + j] << (7 - j)) + zone_bytes.append(byte_val) + + # Convert to hex string + return zone_bytes.hex() + +def create_command_packet( + cmd: str, + zones: List[int], + sequence: int = 0, + melody_id: Optional[int] = None, + volume: Optional[int] = None, + repeats: Optional[int] = None +) -> bytes: + """Create a PSA command packet.""" + # Convert sequence to bytes (2 bytes, little endian with FF padding) + seq_bytes = sequence.to_bytes(1, 'little') + b'\xff' + + # Encode zone info + zone_info = bytes.fromhex(encode_zones(zones)) + + # Command-specific handling + if cmd == CMD_STOP: + # For stop command, use all-zones pattern regardless of input + zone_info = bytes.fromhex("FFFFFFFFFFFFFFFFFFFF") + metadata = bytes.fromhex("0F") + # No end marker for stop commands + command_data = bytes.fromhex(MEL_HEADER) + seq_bytes + bytes.fromhex(cmd) + zone_info + metadata + else: + # For melody and alarm commands + if melody_id is None or volume is None or repeats is None: + raise ValueError("Melody ID, volume, and repeats are required for melody/alarm commands") + + # Check ranges + if not 1 <= melody_id <= 20: + raise ValueError("Melody ID must be between 1 and 20") + if not 1 <= volume <= 8: + raise ValueError("Volume must be between 1 and 8") + if not 0 <= repeats <= 255: + raise ValueError("Repeats must be between 0 and 255") + + # Build metadata + fixed_field = "0001" # Standard fixed field + metadata = bytes.fromhex(fixed_field) + \ + volume.to_bytes(1, 'big') + \ + repeats.to_bytes(1, 'big') + \ + b'\x01' + \ + melody_id.to_bytes(1, 'big') + \ + bytes.fromhex(END_MARKER) + + command_data = bytes.fromhex(MEL_HEADER) + \ + seq_bytes + \ + bytes.fromhex(cmd) + \ + zone_info + \ + metadata + + # Calculate payload length (excluding header and length field) + # We need to add this back into the actual packet + length = len(command_data) - 3 # subtract "MEL" header + length_bytes = length.to_bytes(2, 'big') + + # Re-build with correct length + packet = bytes.fromhex(MEL_HEADER) + length_bytes + \ + bytes.fromhex(START_MARKER) + seq_bytes + \ + bytes.fromhex(cmd) + zone_info + \ + (metadata if cmd != CMD_STOP else bytes.fromhex("0F")) + + # Calculate and append checksum + checksum = compute_psa_checksum(packet) + return packet + checksum + +def list_melodies(): + """Display available melodies.""" + print("\nAvailable Melody Names:") + print("-" * 25) + for name, id in sorted(MELODY_MAP.items(), key=lambda x: x[1]): + print(f"{id:2d}: {name}") + print("\nNote: You can also enter melody ID numbers directly.") + +def parse_zones(zone_arg: str) -> List[int]: + """Parse zone argument into a list of zone numbers.""" + zones = [] + + # Handle empty case + if not zone_arg: + return zones + + # Split by comma or space + parts = zone_arg.replace(',', ' ').split() + + for part in parts: + # Handle ranges like 1-5 + if '-' in part: + try: + start, end = map(int, part.split('-')) + if 1 <= start <= end <= 96: + zones.extend(range(start, end + 1)) + else: + print(f"Warning: Invalid zone range {part} (must be 1-96)") + except ValueError: + print(f"Warning: Could not parse zone range {part}") + # Handle single numbers + else: + try: + zone = int(part) + if 1 <= zone <= 96: + zones.append(zone) + else: + print(f"Warning: Zone {zone} out of range (must be 1-96)") + except ValueError: + print(f"Warning: Could not parse zone {part}") + + # Remove duplicates and sort + return sorted(set(zones)) + +def parse_melody(melody_arg: str) -> int: + """Parse melody argument into a melody ID.""" + # First check if it's a direct integer + try: + melody_id = int(melody_arg) + if 1 <= melody_id <= 20: + return melody_id + else: + print(f"Warning: Melody ID {melody_id} out of range, using default (1)") + return 1 + except ValueError: + # Try to match by name (case insensitive) + melody_name = melody_arg.strip().lower() + for name, id in MELODY_MAP.items(): + if name.lower() == melody_name: + return id + + print(f"Warning: Unknown melody '{melody_arg}', using default (1)") + return 1 + +def main(): + # Set up the argument parser + parser = argparse.ArgumentParser(description="Send commands to Bodet Harmony PSA system") + + # Common arguments + parser.add_argument("-a", "--addr", default=DEFAULT_MULTICAST_ADDR, + help=f"Multicast address (default: {DEFAULT_MULTICAST_ADDR})") + parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT, + help=f"UDP port (default: {DEFAULT_PORT})") + parser.add_argument("-z", "--zones", default="8", + help="Zones to target (e.g., '1,2,3' or '1-5 8')") + parser.add_argument("-s", "--single", action="store_true", + help="Send packet only once (default: send twice)") + parser.add_argument("-l", "--list-melodies", action="store_true", + help="List available melody names") + + # Create subparsers for different commands + subparsers = parser.add_subparsers(dest="command", help="Command to send") + + # Melody command + melody_parser = subparsers.add_parser("melody", help="Play a melody") + melody_parser.add_argument("-m", "--melody", default="Westminster", + help="Melody name or ID (1-20)") + melody_parser.add_argument("-v", "--volume", type=int, default=DEFAULT_VOLUME, + help=f"Volume level (1-8, default: {DEFAULT_VOLUME})") + melody_parser.add_argument("-r", "--repeats", type=int, default=DEFAULT_REPEATS, + help=f"Number of repeats (0=infinite, default: {DEFAULT_REPEATS})") + + # Alarm command + alarm_parser = subparsers.add_parser("alarm", help="Play an alarm") + alarm_parser.add_argument("-m", "--melody", default="Westminster", + help="Alarm melody name or ID (1-20)") + alarm_parser.add_argument("-v", "--volume", type=int, default=DEFAULT_VOLUME, + help=f"Volume level (1-8, default: {DEFAULT_VOLUME})") + alarm_parser.add_argument("-r", "--repeats", type=int, default=DEFAULT_REPEATS, + help=f"Number of repeats (0=infinite, default: {DEFAULT_REPEATS})") + + # Stop command + subparsers.add_parser("stop", help="Stop all audio") + + # Parse arguments + args = parser.parse_args() + + # Just list melodies if requested + if args.list_melodies: + list_melodies() + return + + # Ensure a command was specified + if not args.command: + parser.print_help() + return + + # Set up the socket + sock = setup_multicast_socket() + + try: + # Parse zones + zones = parse_zones(args.zones) + if not zones and args.command != "stop": + print("No valid zones specified, using zone 8") + zones = [8] + + # Command-specific processing + if args.command == "melody": + melody_id = parse_melody(args.melody) + cmd_code = CMD_MELODY + packet = create_command_packet( + cmd_code, zones, + melody_id=melody_id, + volume=args.volume, + repeats=args.repeats + ) + print(f"Sending melody {melody_id} to zones {zones}, volume={args.volume}, repeats={args.repeats}") + + elif args.command == "alarm": + melody_id = parse_melody(args.melody) + cmd_code = CMD_ALARM + packet = create_command_packet( + cmd_code, zones, + melody_id=melody_id, + volume=args.volume, + repeats=args.repeats + ) + print(f"Sending alarm {melody_id} to zones {zones}, volume={args.volume}, repeats={args.repeats}") + + elif args.command == "stop": + cmd_code = CMD_STOP + packet = create_command_packet(cmd_code, zones) + print("Sending stop command to all zones") + + # Send the packet + success = send_multicast_packet(sock, packet, args.addr, args.port, not args.single) + if success: + print("Command sent successfully!") + else: + print("Failed to send command.") + + except Exception as e: + print(f"Error: {e}") + return 1 + +if __name__ == "__main__": + sys.exit(main() or 0) |