#!/usr/bin/env python3
"""Stream an MP3 file to a Bodet PSA system as UDP "MEL" packets.

The file is (optionally) transcoded with the LAME encoder, split into
fixed-size chunks, wrapped in PSA packets and sent at a real-time pace.
The packet layout was reverse engineered; several fields are reproduced
verbatim from captured traffic without their meaning being fully known.
"""

import os
import sys
import time
import socket
import struct  # currently unused; kept because other tooling may rely on it
import argparse
import random
import tempfile
import subprocess

# tqdm only draws the progress bar, so a missing install must not be fatal.
try:
    from tqdm import tqdm
except ImportError:
    tqdm = None

# mutagen is optional: without it the audio file is probed through LAME.
try:
    from mutagen.mp3 import MP3  # type: ignore
    MUTAGEN_AVAILABLE = True
except ImportError:
    MUTAGEN_AVAILABLE = False

# Default configuration values
# NOTE(review): 172.16.20.109 is a private unicast address, not an IP
# multicast group -- confirm this is the intended destination.
DEFAULT_MULTICAST_ADDR = "172.16.20.109"
DEFAULT_PORT = 1681
DEFAULT_ZONE_INFO = "01 1000 0000 0000 0000 0000 0000"  # Zone 1
# Send every datagram twice for redundancy, as the vendor software does
# (UDP gives no delivery guarantee).
DEFAULT_SEND_DUPLICATES = True
DEFAULT_CHUNK_SIZE = 1000  # 1000-byte chunks are the only size known to work
DEFAULT_TTL = 3  # Keep the hop count low so packets stay close to the sender
DEFAULT_HEADER = "4d454c"  # ASCII "MEL"
DEFAULT_COMMAND = "0703"  # Presumably the "stream audio" command
# NOTE: DEFAULT_STREAM_ID is not read anywhere below; the stream ID is random
# unless --fixed-stream-id is given.
DEFAULT_STREAM_ID = "110c"

# Highest quality the target is believed to support
HIGH_QUALITY = {
    "bitrate": "256k",  # 256k max
    "sample_rate": 48000,  # 48000 max
    "channels": 1,  # mono: a single speaker
    "sample_format": "s32",  # max s32
    "description": "Highest Quality (256kbps, 48kHz)",
}

# Defaults used by the vendor software
LOW_QUALITY = {
    "bitrate": "64k",
    "sample_rate": 32000,
    "channels": 1,
    "sample_format": "s16",
    "description": "Normal Quality (64kbps, 32kHz)",
}

# Multiplier applied to the inter-packet delay; 1 means real time.
DEFAULT_TIMING_FACTOR = 1


def compute_psa_checksum(data: bytes) -> bytes:
    """Compute the 2-byte PSA checksum for packet data.

    Each byte is added to its position and XOR-ed into a 16-bit accumulator
    that starts at zero.

    Args:
        data: Packet bytes to protect (everything before the checksum).

    Returns:
        The checksum as two big-endian bytes.
    """
    checksum = 0x0000  # Starting seed value
    for index, byte in enumerate(data):
        checksum ^= (byte + index) & 0xFFFF
    return checksum.to_bytes(2, 'big')


def create_mp3_packet(sequence_num, zone_info, stream_id, mp3_chunk, show_packet_length=False):
    """Create a PSA packet containing MP3 data.

    Args:
        sequence_num: Packet counter, 0-255 (one byte on the wire).
        zone_info: Zone selection as a hex string (whitespace is allowed).
        stream_id: Stream identifier, 0-65535, sent little-endian.
        mp3_chunk: Raw MP3 bytes carried by this packet.
        show_packet_length: Print the packet size and length field when True.

    Returns:
        The complete packet: header, length, start marker, payload, checksum.

    Raises:
        OverflowError: If sequence_num or stream_id does not fit its field.
        ValueError: If zone_info is not valid hex.
    """
    header = bytes.fromhex(DEFAULT_HEADER)  # static "MEL"
    start_marker = bytes.fromhex("0100")

    # Sequence number is a single byte followed by a constant 0xFF.
    seq = sequence_num.to_bytes(1, 'little') + b'\xff'
    command = bytes.fromhex(DEFAULT_COMMAND)
    zones = bytes.fromhex(zone_info)
    stream_id_bytes = stream_id.to_bytes(2, 'little')
    # Constant bytes seen in every captured packet; meaning unknown.
    constant_data = bytes.fromhex("05080503e8")

    payload = seq + command + zones + stream_id_bytes + constant_data + mp3_chunk

    # The length field carries the size of the WHOLE packet: the 7 extra
    # bytes are header (3) + length field (2) + checksum (2).
    length_value = len(start_marker) + len(payload) + 7
    length_bytes = length_value.to_bytes(2, 'big')

    # The checksum covers everything that precedes it.
    packet_data = header + length_bytes + start_marker + payload
    final_packet = packet_data + compute_psa_checksum(packet_data)

    if show_packet_length:
        print(f"Packet length: {len(final_packet)} bytes, Length field: {length_value} (0x{length_value:04x})")

    return final_packet


def send_multicast_packet(sock, packet, multicast_addr, port, send_duplicates=DEFAULT_SEND_DUPLICATES):
    """Send a packet to the multicast address.

    Args:
        sock: UDP socket to send on.
        packet: Bytes to transmit.
        multicast_addr: Destination IP address.
        port: Destination UDP port.
        send_duplicates: Send the datagram a second time for redundancy.

    Returns:
        True when every send succeeded, False when the socket reported an
        error (the error is printed, not raised).
    """
    destination = (multicast_addr, port)
    try:
        sock.sendto(packet, destination)
        if send_duplicates:
            sock.sendto(packet, destination)
    except OSError as e:
        # Best effort: one lost datagram must not abort the whole stream.
        print(f"Error sending packet: {e}")
        return False
    return True


def setup_multicast_socket(ttl=DEFAULT_TTL):
    """Create a UDP socket with the given multicast TTL.

    The caller owns the returned socket and is responsible for closing it.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
    return sock


def check_dependencies():
    """Check that required tools are installed.

    Exits the process with status 1 when the ``lame`` executable cannot be
    started; only warns when the optional mutagen library is missing.
    """
    try:
        subprocess.run(['lame', '--version'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except FileNotFoundError:
        print("Error: LAME encoder is required but not found.")
        print("Please install LAME and make sure it's available in your PATH.")
        sys.exit(1)

    if not MUTAGEN_AVAILABLE:
        print("Warning: mutagen library not found. Limited audio file analysis available.")
        print("Install with: pip install mutagen")


def _probe_audio_info_with_lame(mp3_file):
    """Fallback probe: scrape sample rate, channels and bitrate from LAME's stderr."""
    cmd = ['lame', '--decode', mp3_file, '-t', '--brief', '-']
    result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True)

    # Assumptions used for anything the output does not reveal.
    sample_rate = 44100
    channels = 2
    bit_rate = 0  # Unknown

    for line in result.stderr.split('\n'):
        tokens = line.split()
        lowered = line.lower()

        if "Hz" in line:
            for token in tokens:
                if "Hz" in token:
                    try:
                        sample_rate = int(token.replace("Hz", ""))
                    except ValueError:
                        pass

        if "stereo" in lowered:
            channels = 2
        elif "mono" in lowered:
            channels = 1

        if "kbps" in line:
            for token in tokens:
                if "kbps" in token:
                    try:
                        bit_rate = int(token.replace("kbps", ""))
                    except ValueError:
                        pass

    return {
        'bit_rate': bit_rate,
        'sample_rate': sample_rate,
        'channels': channels,
        'codec_name': "mp3",  # the fallback cannot tell, so MP3 is assumed
    }


def get_audio_info(mp3_file):
    """Get audio file information, using mutagen if available.

    Args:
        mp3_file: Path of the audio file to inspect.

    Returns:
        A dict with ``bit_rate`` (kbps), ``sample_rate`` (Hz), ``channels``
        and ``codec_name``, or None when the file could not be analysed
        (the error is printed).
    """
    try:
        if MUTAGEN_AVAILABLE:
            stream_info = MP3(mp3_file).info
            layer = stream_info.layer
            return {
                'bit_rate': int(stream_info.bitrate / 1000),  # bps -> kbps
                'sample_rate': stream_info.sample_rate,
                'channels': stream_info.channels,
                'codec_name': "mp3" if layer == 3 else f"mpeg-{layer}",
            }
        return _probe_audio_info_with_lame(mp3_file)
    except Exception as e:
        # Broad on purpose: mutagen's exception types are not importable
        # when the library is absent, and any failure means "cannot analyse".
        print(f"Error getting audio information: {e}")
        return None


def transcode_mp3(input_file, quality_preset, include_metadata=False):
    """Transcode an audio file to constant-bitrate MP3 using the LAME encoder.

    Args:
        input_file: Path of the source audio file.
        quality_preset: Dict with ``bitrate`` (e.g. ``"64k"``),
            ``sample_rate`` (Hz), ``channels`` and ``description``.
        include_metadata: When False, title/comment tags are blanked and
            ReplayGain analysis is disabled.

    Returns:
        Path of a temporary MP3 file the caller must delete, or None on
        failure (the temporary file is removed in that case).
    """
    print(f"Transcoding audio to {quality_preset['description']}...")

    fd, temp_output = tempfile.mkstemp(suffix='.mp3')
    os.close(fd)  # LAME writes the file itself; only the name is needed

    try:
        cmd = [
            'lame',
            '--quiet',
            '-m', 'm' if quality_preset['channels'] == 1 else 's',  # m=mono, s=stereo
            # LAME expects kHz and accepts fractions (e.g. 44.1); integer
            # division would have turned 44100 Hz into 44.
            '--resample', f"{quality_preset['sample_rate'] / 1000:g}",
            '-b', quality_preset['bitrate'].replace('k', ''),  # strip 'k' suffix
            '--cbr',
        ]

        if not include_metadata:
            cmd.append('--noreplaygain')
            # Blank tags are more widely supported than --id3v2-none.
            cmd.extend(['--tt', ''])  # empty title
            cmd.extend(['--tc', ''])  # empty comment
            cmd.append('--nohist')

        cmd.extend([input_file, temp_output])

        result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True)
        if result.returncode != 0:
            print(f"Error transcoding audio: {result.stderr}")
            os.remove(temp_output)
            return None

        return temp_output
    except Exception as e:
        print(f"Error during transcoding: {e}")
        if os.path.exists(temp_output):
            os.remove(temp_output)
        return None


def prepare_mp3_file(mp3_file, quality, include_metadata=False, sample_format=None):
    """Ensure the MP3 file meets the required specifications.

    Args:
        mp3_file: Path of the input audio file.
        quality: ``"high"`` selects HIGH_QUALITY; anything else LOW_QUALITY.
        include_metadata: Forwarded to :func:`transcode_mp3`.
        sample_format: Optional override stored in the preset copy.
            NOTE: the value is not currently passed on to LAME.

    Returns:
        ``mp3_file`` itself when it already matches the preset, the path of
        a transcoded temporary file otherwise, or None on failure.
    """
    quality_preset = HIGH_QUALITY if quality == "high" else LOW_QUALITY

    if sample_format:
        # Copy first so the module-level preset is never mutated.
        quality_preset = quality_preset.copy()
        quality_preset["sample_format"] = sample_format

    info = get_audio_info(mp3_file)
    if not info:
        return None

    print(f"Audio file details: {info['codec_name']}, {info['bit_rate']}kbps, "
          f"{info['sample_rate']}Hz, {info['channels']} channel(s)")

    needs_transcode = False
    if info['codec_name'].lower() != 'mp3':
        print("Non-MP3 format detected. Transcoding required.")
        needs_transcode = True
    elif info['bit_rate'] is None or abs(info['bit_rate'] - int(quality_preset['bitrate'][:-1])) > 5:
        # A 5 kbps tolerance absorbs rounding in the reported bitrate.
        print(f"Bitrate mismatch: {info['bit_rate']}kbps vs {quality_preset['bitrate']}. Transcoding required.")
        needs_transcode = True
    elif info['sample_rate'] != quality_preset['sample_rate']:
        print(f"Sample rate mismatch: {info['sample_rate']}Hz vs {quality_preset['sample_rate']}Hz. Transcoding required.")
        needs_transcode = True
    elif info['channels'] != quality_preset['channels']:
        print(f"Channel count mismatch: {info['channels']} vs {quality_preset['channels']} (mono). Transcoding required.")
        needs_transcode = True

    if needs_transcode:
        return transcode_mp3(mp3_file, quality_preset, include_metadata)

    print("Audio file already meets required specifications.")
    return mp3_file


def calculate_delay_ms(chunk_size, bitrate, timing_factor=DEFAULT_TIMING_FACTOR):
    """
    Calculate the appropriate delay between chunks for real-time streaming.

    Args:
        chunk_size: Size of each audio chunk in bytes
        bitrate: Audio bitrate in kbps (string like '128k' or integer)
        timing_factor: Factor to adjust timing (lower = faster playback)

    Returns:
        Delay in milliseconds between packets
    """
    if isinstance(bitrate, str) and bitrate.endswith('k'):
        bitrate_kbps = int(bitrate[:-1])
    else:
        bitrate_kbps = int(bitrate)

    bytes_per_second = bitrate_kbps * 1000 // 8
    return (chunk_size / bytes_per_second) * 1000 * timing_factor


class StreamTimer:
    """Maintains proper timing for streaming audio packets.

    Packets are paced against an absolute schedule (start time plus
    ``n * delay``) rather than by sleeping a fixed interval, so sleep
    jitter does not accumulate over the length of the stream.
    """

    def __init__(self, chunk_size, bitrate, timing_factor=DEFAULT_TIMING_FACTOR):
        """Initialize the stream timer with audio parameters."""
        self.chunk_size = chunk_size
        self.bitrate = bitrate
        self.timing_factor = timing_factor
        self.start_time = None
        self.packets_sent = 0
        # Seconds between consecutive packets.
        self.delay_per_packet = calculate_delay_ms(chunk_size, bitrate, timing_factor) / 1000.0

    def start(self):
        """Start (or restart) the stream timer."""
        # Monotonic clock: immune to wall-clock adjustments mid-stream.
        self.start_time = time.monotonic()
        self.packets_sent = 0

    def wait_for_next_packet(self):
        """Wait until it's time to send the next packet."""
        if self.start_time is None:
            self.start()
            return

        self.packets_sent += 1
        target_time = self.start_time + (self.packets_sent * self.delay_per_packet)
        wait_time = target_time - time.monotonic()

        # Only wait if we're ahead of schedule
        if wait_time > 0:
            time.sleep(wait_time)


class _NullProgress:
    """No-op stand-in for a tqdm progress bar when tqdm is not installed."""

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return False

    def update(self, n=1):
        pass


def _progress_bar(total):
    """Return a tqdm progress bar, or a silent stand-in if tqdm is missing."""
    if tqdm is None:
        return _NullProgress()
    return tqdm(total=total, desc="Streaming progress")


def stream_mp3_to_psa(mp3_file, multicast_addr, port, zone_info, send_duplicates, chunk_size, quality,
                      include_metadata=False, sample_format=None, show_packet_length=False,
                      timing_factor=DEFAULT_TIMING_FACTOR, fixed_stream_id=None):
    """Stream an MP3 file to PSA system.

    Args:
        mp3_file: Path of the audio file to stream.
        multicast_addr: Destination IP address.
        port: Destination UDP port.
        zone_info: Zone selection as a hex string.
        send_duplicates: Send every packet twice.
        chunk_size: MP3 bytes carried per packet.
        quality: ``"high"`` or ``"low"`` preset name.
        include_metadata: Keep metadata when transcoding.
        sample_format: Optional sample-format override for the preset.
        show_packet_length: Print size information for every packet.
        timing_factor: Multiplier for the inter-packet delay.
        fixed_stream_id: Stream ID to use; a random 16-bit ID when None.

    Exits the process with status 1 on unexpected errors. Any temporary
    transcoded file is always removed.
    """
    try:
        check_dependencies()

        prepared_file = prepare_mp3_file(mp3_file, quality, include_metadata, sample_format)
        if not prepared_file:
            print("Failed to prepare MP3 file.")
            return

        using_temp_file = prepared_file != mp3_file

        try:
            if fixed_stream_id is not None:
                stream_id = fixed_stream_id
            else:
                stream_id = random.randint(0, 0xFFFF)  # 16-bit (2 bytes) stream ID

            with open(prepared_file, 'rb') as f:
                mp3_data = f.read()

            chunks = chunk_mp3_data(mp3_data, chunk_size)
            total_chunks = len(chunks)

            quality_preset = HIGH_QUALITY if quality == "high" else LOW_QUALITY
            bitrate = quality_preset['bitrate']

            # Report the delay actually used, i.e. including timing_factor.
            delay_ms = calculate_delay_ms(chunk_size, bitrate, timing_factor)

            print(f"Streaming {os.path.basename(mp3_file)} ({len(mp3_data)/1024:.2f} KB)")
            print(f"Split into {total_chunks} packets of {chunk_size} bytes each")
            print(f"Sending to multicast {multicast_addr}:{port}")
            print(f"Stream ID: {stream_id:04x}, Zone info: {zone_info}")
            print(f"Duplicate packets: {'Yes' if send_duplicates else 'No'}")
            print(f"Quality preset: {quality_preset['description']}")
            print(f"Including metadata: {'Yes' if include_metadata else 'No'}")
            print(f"Real-time streaming delay: {delay_ms:.2f}ms per packet")

            failed_packets = 0
            completed = False
            sequence_num = 0

            # The socket is closed on every exit path by the with-statement.
            with setup_multicast_socket() as sock:
                timer = StreamTimer(chunk_size, bitrate, timing_factor)
                timer.start()

                with _progress_bar(total_chunks) as pbar:
                    try:
                        for i, chunk in enumerate(chunks):
                            packet = create_mp3_packet(sequence_num, zone_info, stream_id,
                                                       chunk, show_packet_length)

                            if not send_multicast_packet(sock, packet, multicast_addr,
                                                         port, send_duplicates):
                                failed_packets += 1

                            # The sequence number is one byte, so wrap at 256.
                            sequence_num = (sequence_num + 1) % 256
                            pbar.update(1)

                            # No need to wait after the last chunk
                            if i < total_chunks - 1:
                                timer.wait_for_next_packet()
                        completed = True
                    except Exception as e:
                        print(f"Error during streaming: {e}")

            # Only claim success when every packet really went out.
            if not completed:
                print("\nStreaming aborted before all packets were sent.")
            elif failed_packets:
                print(f"\nStreaming finished, but {failed_packets} of {total_chunks} packets could not be sent.")
            else:
                print("\nStreaming completed successfully!")

        finally:
            # Clean up temporary file if created
            if using_temp_file and os.path.exists(prepared_file):
                os.remove(prepared_file)

    except FileNotFoundError:
        print(f"Error: MP3 file {mp3_file} not found")
        sys.exit(1)
    except Exception as e:
        print(f"Error during streaming: {e}")
        sys.exit(1)


def chunk_mp3_data(data, chunk_size):
    """
    Split MP3 data into chunks of specified size.

    Args:
        data: The complete MP3 data as bytes
        chunk_size: Size of each chunk in bytes (must be positive)

    Returns:
        A list of data chunks; the last one may be shorter than chunk_size

    Raises:
        ValueError: If chunk_size is not a positive number
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    return [data[start:start + chunk_size] for start in range(0, len(data), chunk_size)]


def main():
    """Parse the command line and stream the requested file."""
    parser = argparse.ArgumentParser(description="Stream MP3 to Bodet PSA System")
    parser.add_argument("mp3_file", help="Path to MP3 file to stream")
    parser.add_argument("-a", "--addr", default=DEFAULT_MULTICAST_ADDR,
                        help=f"Multicast address (default: {DEFAULT_MULTICAST_ADDR})")
    parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT,
                        help=f"UDP port (default: {DEFAULT_PORT})")
    parser.add_argument("-z", "--zone", default=DEFAULT_ZONE_INFO,
                        help="Hex zone info (default: Zone 1)")
    parser.add_argument("-n", "--no-duplicates", action="store_true",
                        help="Don't send duplicate packets (default: send duplicates)")
    parser.add_argument("-c", "--chunk-size", type=int, default=DEFAULT_CHUNK_SIZE,
                        help=f"MP3 chunk size in bytes (default: {DEFAULT_CHUNK_SIZE})")
    # Help text is built from the presets so it cannot drift from them again.
    parser.add_argument("-q", "--quality", choices=["high", "low"], default="high",
                        help=f"Audio quality preset: high = {HIGH_QUALITY['description']}, "
                             f"low = {LOW_QUALITY['description']} (default: high)")
    parser.add_argument("-m", "--include-metadata", action="store_true",
                        help="Include metadata in MP3 stream (default: no metadata)")
    parser.add_argument("-s", "--sample-format", choices=["s16", "s24", "s32"], default=None,
                        help="Sample format to use for transcoding (default: taken from the quality preset)")
    parser.add_argument("--show-packet-length", action="store_true",
                        help="Show packet length information during streaming")
    parser.add_argument("-t", "--timing-factor", type=float, default=DEFAULT_TIMING_FACTOR,
                        help=f"Timing adjustment factor (lower = faster playback, default: {DEFAULT_TIMING_FACTOR})")
    parser.add_argument("--fixed-stream-id", type=int,
                        help="Use a fixed stream ID instead of a random one (0-65535)")

    args = parser.parse_args()

    # Reject values that could never be encoded before any work is done.
    if args.chunk_size <= 0:
        parser.error("--chunk-size must be a positive integer")
    if args.fixed_stream_id is not None and not 0 <= args.fixed_stream_id <= 0xFFFF:
        parser.error("--fixed-stream-id must be between 0 and 65535")

    stream_mp3_to_psa(
        args.mp3_file,
        args.addr,
        args.port,
        args.zone,
        not args.no_duplicates,
        args.chunk_size,
        args.quality,
        args.include_metadata,
        args.sample_format,
        args.show_packet_length,
        args.timing_factor,
        args.fixed_stream_id,
    )


if __name__ == "__main__":
    main()