aboutsummaryrefslogtreecommitdiff
path: root/research/sample-data/streaming/bodet_psa_cli.py
diff options
context:
space:
mode:
Diffstat (limited to 'research/sample-data/streaming/bodet_psa_cli.py')
-rw-r--r--research/sample-data/streaming/bodet_psa_cli.py329
1 files changed, 329 insertions, 0 deletions
diff --git a/research/sample-data/streaming/bodet_psa_cli.py b/research/sample-data/streaming/bodet_psa_cli.py
new file mode 100644
index 0000000..2788aa1
--- /dev/null
+++ b/research/sample-data/streaming/bodet_psa_cli.py
@@ -0,0 +1,329 @@
+#!/usr/bin/env python3
+import socket
+import argparse
+import time
+import sys
+from typing import List, Dict, Tuple, Optional
+
+# Default configuration values
+DEFAULT_MULTICAST_ADDR = "239.192.0.1"
+DEFAULT_PORT = 1681
+DEFAULT_TTL = 2
+DEFAULT_VOLUME = 3
+DEFAULT_REPEATS = 0 # 0 means infinite
+
+# Command codes
+CMD_MELODY = "3001"
+CMD_ALARM = "5001"
+CMD_STOP = "5002"
+
+# Header constants
+MEL_HEADER = "4d454c" # "MEL" in hex
+START_MARKER = "0100"
+END_MARKER = "0100"
+
+# Maps for user-friendly selection
+MELODY_MAP = {
+ # Standard melodies
+ "Westminster": 1,
+ "BimBam": 2,
+ "Gong": 3,
+ "CanCan": 4,
+ "SingingBird": 5,
+ "Violin": 6,
+ "Trumpet": 7,
+ "Piano": 8,
+ "Telephone": 9,
+ "Circus": 10,
+ # Add more melodies as needed
+}
+
+# Common functions (borrowed from mp3_psa_streamer.py)
+def compute_psa_checksum(data: bytes) -> bytes:
+ """Compute PSA checksum for packet data."""
+ var_e = 0x0000 # Starting seed value
+ for i in range(len(data)):
+ var_e ^= (data[i] + i) & 0xFFFF
+ return var_e.to_bytes(2, 'big') # 2-byte checksum, big-endian
+
+def setup_multicast_socket(ttl=DEFAULT_TTL):
+ """Set up a socket for sending multicast UDP."""
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+ sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
+ return sock
+
+def send_multicast_packet(sock, packet, addr, port, send_twice=True):
+ """Send a packet to the multicast group, optionally twice."""
+ try:
+ sock.sendto(packet, (addr, port))
+ if send_twice:
+ time.sleep(0.01) # Small delay between duplicates
+ sock.sendto(packet, (addr, port))
+ return True
+ except Exception as e:
+ print(f"Error sending packet: {e}")
+ return False
+
+# New functions for the CLI tool
+def encode_zones(zones: List[int]) -> str:
+ """
+ Encode zone numbers (1-100) into a 12-byte hex string (24 characters).
+ Each byte represents 8 zones, with MSB being the lowest zone number.
+ """
+ # Initialize 96 bits (12 bytes) of zeros
+ zone_bits = [0] * 96
+
+ for zone in zones:
+ if 1 <= zone <= 96: # Ensure zone is in valid range
+ # Calculate bit position (zero-indexed)
+ # Zone 1 = bit 0, Zone 2 = bit 1, etc.
+ bit_pos = zone - 1
+
+ # Set the bit
+ zone_bits[bit_pos] = 1
+
+ # Convert bits to bytes
+ zone_bytes = bytearray()
+ for i in range(0, 96, 8):
+ byte_val = 0
+ for j in range(8):
+ if i + j < 96: # Avoid index out of range
+ byte_val |= (zone_bits[i + j] << (7 - j))
+ zone_bytes.append(byte_val)
+
+ # Convert to hex string
+ return zone_bytes.hex()
+
+def create_command_packet(
+ cmd: str,
+ zones: List[int],
+ sequence: int = 0,
+ melody_id: Optional[int] = None,
+ volume: Optional[int] = None,
+ repeats: Optional[int] = None
+) -> bytes:
+ """Create a PSA command packet."""
+ # Convert sequence to bytes (2 bytes, little endian with FF padding)
+ seq_bytes = sequence.to_bytes(1, 'little') + b'\xff'
+
+ # Encode zone info
+ zone_info = bytes.fromhex(encode_zones(zones))
+
+ # Command-specific handling
+ if cmd == CMD_STOP:
+ # For stop command, use all-zones pattern regardless of input
+ zone_info = bytes.fromhex("FFFFFFFFFFFFFFFFFFFF")
+ metadata = bytes.fromhex("0F")
+ # No end marker for stop commands
+ command_data = bytes.fromhex(MEL_HEADER) + seq_bytes + bytes.fromhex(cmd) + zone_info + metadata
+ else:
+ # For melody and alarm commands
+ if melody_id is None or volume is None or repeats is None:
+ raise ValueError("Melody ID, volume, and repeats are required for melody/alarm commands")
+
+ # Check ranges
+ if not 1 <= melody_id <= 20:
+ raise ValueError("Melody ID must be between 1 and 20")
+ if not 1 <= volume <= 8:
+ raise ValueError("Volume must be between 1 and 8")
+ if not 0 <= repeats <= 255:
+ raise ValueError("Repeats must be between 0 and 255")
+
+ # Build metadata
+ fixed_field = "0001" # Standard fixed field
+ metadata = bytes.fromhex(fixed_field) + \
+ volume.to_bytes(1, 'big') + \
+ repeats.to_bytes(1, 'big') + \
+ b'\x01' + \
+ melody_id.to_bytes(1, 'big') + \
+ bytes.fromhex(END_MARKER)
+
+ command_data = bytes.fromhex(MEL_HEADER) + \
+ seq_bytes + \
+ bytes.fromhex(cmd) + \
+ zone_info + \
+ metadata
+
+ # Calculate payload length (excluding header and length field)
+ # We need to add this back into the actual packet
+ length = len(command_data) - 3 # subtract "MEL" header
+ length_bytes = length.to_bytes(2, 'big')
+
+ # Re-build with correct length
+ packet = bytes.fromhex(MEL_HEADER) + length_bytes + \
+ bytes.fromhex(START_MARKER) + seq_bytes + \
+ bytes.fromhex(cmd) + zone_info + \
+ (metadata if cmd != CMD_STOP else bytes.fromhex("0F"))
+
+ # Calculate and append checksum
+ checksum = compute_psa_checksum(packet)
+ return packet + checksum
+
+def list_melodies():
+ """Display available melodies."""
+ print("\nAvailable Melody Names:")
+ print("-" * 25)
+ for name, id in sorted(MELODY_MAP.items(), key=lambda x: x[1]):
+ print(f"{id:2d}: {name}")
+ print("\nNote: You can also enter melody ID numbers directly.")
+
+def parse_zones(zone_arg: str) -> List[int]:
+ """Parse zone argument into a list of zone numbers."""
+ zones = []
+
+ # Handle empty case
+ if not zone_arg:
+ return zones
+
+ # Split by comma or space
+ parts = zone_arg.replace(',', ' ').split()
+
+ for part in parts:
+ # Handle ranges like 1-5
+ if '-' in part:
+ try:
+ start, end = map(int, part.split('-'))
+ if 1 <= start <= end <= 96:
+ zones.extend(range(start, end + 1))
+ else:
+ print(f"Warning: Invalid zone range {part} (must be 1-96)")
+ except ValueError:
+ print(f"Warning: Could not parse zone range {part}")
+ # Handle single numbers
+ else:
+ try:
+ zone = int(part)
+ if 1 <= zone <= 96:
+ zones.append(zone)
+ else:
+ print(f"Warning: Zone {zone} out of range (must be 1-96)")
+ except ValueError:
+ print(f"Warning: Could not parse zone {part}")
+
+ # Remove duplicates and sort
+ return sorted(set(zones))
+
+def parse_melody(melody_arg: str) -> int:
+ """Parse melody argument into a melody ID."""
+ # First check if it's a direct integer
+ try:
+ melody_id = int(melody_arg)
+ if 1 <= melody_id <= 20:
+ return melody_id
+ else:
+ print(f"Warning: Melody ID {melody_id} out of range, using default (1)")
+ return 1
+ except ValueError:
+ # Try to match by name (case insensitive)
+ melody_name = melody_arg.strip().lower()
+ for name, id in MELODY_MAP.items():
+ if name.lower() == melody_name:
+ return id
+
+ print(f"Warning: Unknown melody '{melody_arg}', using default (1)")
+ return 1
+
+def main():
+ # Set up the argument parser
+ parser = argparse.ArgumentParser(description="Send commands to Bodet Harmony PSA system")
+
+ # Common arguments
+ parser.add_argument("-a", "--addr", default=DEFAULT_MULTICAST_ADDR,
+ help=f"Multicast address (default: {DEFAULT_MULTICAST_ADDR})")
+ parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT,
+ help=f"UDP port (default: {DEFAULT_PORT})")
+ parser.add_argument("-z", "--zones", default="8",
+ help="Zones to target (e.g., '1,2,3' or '1-5 8')")
+ parser.add_argument("-s", "--single", action="store_true",
+ help="Send packet only once (default: send twice)")
+ parser.add_argument("-l", "--list-melodies", action="store_true",
+ help="List available melody names")
+
+ # Create subparsers for different commands
+ subparsers = parser.add_subparsers(dest="command", help="Command to send")
+
+ # Melody command
+ melody_parser = subparsers.add_parser("melody", help="Play a melody")
+ melody_parser.add_argument("-m", "--melody", default="Westminster",
+ help="Melody name or ID (1-20)")
+ melody_parser.add_argument("-v", "--volume", type=int, default=DEFAULT_VOLUME,
+ help=f"Volume level (1-8, default: {DEFAULT_VOLUME})")
+ melody_parser.add_argument("-r", "--repeats", type=int, default=DEFAULT_REPEATS,
+ help=f"Number of repeats (0=infinite, default: {DEFAULT_REPEATS})")
+
+ # Alarm command
+ alarm_parser = subparsers.add_parser("alarm", help="Play an alarm")
+ alarm_parser.add_argument("-m", "--melody", default="Westminster",
+ help="Alarm melody name or ID (1-20)")
+ alarm_parser.add_argument("-v", "--volume", type=int, default=DEFAULT_VOLUME,
+ help=f"Volume level (1-8, default: {DEFAULT_VOLUME})")
+ alarm_parser.add_argument("-r", "--repeats", type=int, default=DEFAULT_REPEATS,
+ help=f"Number of repeats (0=infinite, default: {DEFAULT_REPEATS})")
+
+ # Stop command
+ subparsers.add_parser("stop", help="Stop all audio")
+
+ # Parse arguments
+ args = parser.parse_args()
+
+ # Just list melodies if requested
+ if args.list_melodies:
+ list_melodies()
+ return
+
+ # Ensure a command was specified
+ if not args.command:
+ parser.print_help()
+ return
+
+ # Set up the socket
+ sock = setup_multicast_socket()
+
+ try:
+ # Parse zones
+ zones = parse_zones(args.zones)
+ if not zones and args.command != "stop":
+ print("No valid zones specified, using zone 8")
+ zones = [8]
+
+ # Command-specific processing
+ if args.command == "melody":
+ melody_id = parse_melody(args.melody)
+ cmd_code = CMD_MELODY
+ packet = create_command_packet(
+ cmd_code, zones,
+ melody_id=melody_id,
+ volume=args.volume,
+ repeats=args.repeats
+ )
+ print(f"Sending melody {melody_id} to zones {zones}, volume={args.volume}, repeats={args.repeats}")
+
+ elif args.command == "alarm":
+ melody_id = parse_melody(args.melody)
+ cmd_code = CMD_ALARM
+ packet = create_command_packet(
+ cmd_code, zones,
+ melody_id=melody_id,
+ volume=args.volume,
+ repeats=args.repeats
+ )
+ print(f"Sending alarm {melody_id} to zones {zones}, volume={args.volume}, repeats={args.repeats}")
+
+ elif args.command == "stop":
+ cmd_code = CMD_STOP
+ packet = create_command_packet(cmd_code, zones)
+ print("Sending stop command to all zones")
+
+ # Send the packet
+ success = send_multicast_packet(sock, packet, args.addr, args.port, not args.single)
+ if success:
+ print("Command sent successfully!")
+ else:
+ print("Failed to send command.")
+
+ except Exception as e:
+ print(f"Error: {e}")
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main() or 0)