"""Analyze text dumps of hex-encoded "MEL" audio packet streams.

Each input file is expected to hold a hexadecimal stream (whitespace is
ignored).  The stream is split on the ``MEL\\x04`` marker and every packet is
inspected for duplicates, repetition patterns and embedded MP3 frame headers.
"""

import binascii
import os.path
import re
import struct
import sys
from collections import Counter, defaultdict

# Hex encoding of the 4-byte packet marker b"MEL\x04".
_PACKET_MARKER_RE = re.compile(r'4d454c04', re.IGNORECASE)

# Hex encoding of the sync bytes 0xFF 0xFB (MPEG-1 Layer III, no CRC).
_MP3_SYNC_RE = re.compile(r'fffb', re.IGNORECASE)

# Sample rate assumed whenever no MP3 header could be decoded.
_DEFAULT_SAMPLE_RATE = 44100

# MPEG-1 Layer III bitrate table in kbps.  Index 0 is "free format" and
# index 15 is forbidden, so only indices 1..14 are usable.
_MPEG1_L3_BITRATES_KBPS = (
    0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320,
)

# MPEG-1 sample rates in Hz; index 3 is reserved.
_MPEG1_SAMPLE_RATES_HZ = (44100, 48000, 32000)


def _find_frame_syncs(data_hex):
    """Return byte-aligned hex offsets of the ``fffb`` pattern in *data_hex*.

    A match at an odd hex offset straddles two bytes and is therefore not a
    real 0xFF 0xFB byte pair, so such matches are discarded.
    """
    return [
        match.start()
        for match in _MP3_SYNC_RE.finditer(data_hex)
        if match.start() % 2 == 0
    ]


def _parse_mp3_header(data_hex, pos):
    """Decode the 4-byte MP3 header starting at hex offset *pos*.

    Returns ``(bitrate_kbps, sample_rate_hz)``, or ``None`` when fewer than
    four bytes are available, the text is not valid hex, or the bitrate or
    sample-rate index is not a usable value.
    """
    header_hex = data_hex[pos:pos + 8]
    if len(header_hex) < 8:
        return None
    try:
        header_bytes = bytes.fromhex(header_hex)
    except ValueError:
        return None

    # Third header byte: upper nibble is the bitrate index, the next two
    # bits are the sample-rate index.
    bitrate_index = (header_bytes[2] >> 4) & 0x0F
    sample_rate_index = (header_bytes[2] >> 2) & 0x03
    if 0 < bitrate_index < 15 and sample_rate_index < 3:
        return (
            _MPEG1_L3_BITRATES_KBPS[bitrate_index],
            _MPEG1_SAMPLE_RATES_HZ[sample_rate_index],
        )
    return None


def parse_hex_stream(file_path):
    """Read *file_path* and return its contents with all whitespace removed.

    Returns ``None`` (after printing the error) when the file cannot be read
    or decoded.
    """
    try:
        with open(file_path, 'r') as f:
            content = f.read()
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeDecodeError and malformed path strings.
        print(f"Error reading file: {e}")
        return None
    # Remove any whitespace and line breaks
    return re.sub(r'\s+', '', content)


def identify_packets(hex_stream):
    """Split *hex_stream* into packets, each starting at a MEL marker.

    The marker ``4d454c04`` (``MEL\\x04``) is matched case-insensitively.
    Every packet runs from one marker up to the next marker, or to the end of
    the stream for the last one; anything before the first marker is dropped.
    The original character case of the stream is preserved.

    NOTE: markers are not required to sit on a byte boundary of the stream,
    because a capture may begin with an odd number of hex digits.
    """
    positions = [match.start() for match in _PACKET_MARKER_RE.finditer(hex_stream)]
    # Each packet ends where the next begins; the last runs to end of stream.
    boundaries = positions[1:] + [len(hex_stream)]
    return [hex_stream[start:end] for start, end in zip(positions, boundaries)]


def analyze_packet_structure(packet):
    """Break a single packet (hex string) into its presumed fields.

    Returns a dict with ``header``, ``version``, ``sequence``, ``flags``,
    ``length_field``, ``data_length`` (bytes), ``checksum``,
    ``checksum_match`` and ``total_bytes``.  Packets shorter than 20 hex
    characters yield ``{"error": "Packet too short", "total_bytes": ...}``.

    The field meanings are guesses, and the checksum is a placeholder (the low
    16 bits of CRC-32 over everything but the last two bytes); the real
    algorithm is unknown.  ``checksum_match`` is ``False`` when the packet
    text is not valid byte-aligned hex.
    """
    total_bytes = len(packet) // 2
    if len(packet) < 20:  # Ensure packet has enough bytes for header
        # total_bytes is included so callers can still aggregate lengths.
        return {"error": "Packet too short", "total_bytes": total_bytes}

    checksum = packet[-4:]  # last 2 bytes / 4 hex chars
    data = packet[28:-4]    # payload without header and checksum

    try:
        crc = binascii.crc32(bytes.fromhex(packet[:-4])) & 0xFFFF
    except ValueError:
        # Odd-length or non-hex text cannot be checksummed.
        checksum_match = False
    else:
        checksum_match = format(crc, '04x') == checksum.lower()

    return {
        "header": packet[:8],           # MEL\x04
        "version": packet[8:12],        # Version or type
        "sequence": packet[12:16],      # Possibly sequence number
        "flags": packet[16:20],         # Possibly flags
        "length_field": packet[20:28],  # Length fields (if they exist)
        "data_length": len(data) // 2,  # Byte count
        "checksum": checksum,
        "checksum_match": checksum_match,
        "total_bytes": total_bytes,
    }


def detect_duplicates(packets):
    """Find packets that are identical to the packet immediately after them.

    Returns a dict with ``duplicate_count``, ``duplicate_indices`` (index of
    the first packet of each identical adjacent pair) and
    ``duplicate_percentage`` relative to the total packet count.
    """
    duplicates = [
        i for i, (current, following) in enumerate(zip(packets, packets[1:]))
        if current == following
    ]
    duplicate_percentage = (len(duplicates) / len(packets)) * 100 if packets else 0
    return {
        "duplicate_count": len(duplicates),
        "duplicate_indices": duplicates,
        "duplicate_percentage": duplicate_percentage,
    }


def guess_codec(packets, file_path):
    """Guess codec, quality and bitrate from packet contents and file name.

    The quality label is derived purely from substrings of *file_path*.  The
    first ten packets are scanned for byte-aligned MP3 frame syncs, and the
    bitrate and sample rate are taken from the first header that decodes to
    valid values.  Without a decoded bitrate, the bitrate range is guessed
    from the average packet size.

    Returns a dict with ``likely_codec``, ``quality_setting``,
    ``most_common_header``, ``codec_guess_from_size``,
    ``average_packet_size_bytes``, ``mp3_frame_sync_found``,
    ``mp3_likelihood``, ``detected_bitrate_kbps`` and
    ``detected_sample_rate_hz`` (the last two are ``None`` if undetected).
    """
    headers = Counter(packet[:24] for packet in packets)
    most_common_header = headers.most_common(1)[0][0] if headers else "Unknown"

    quality = "Unknown"
    if "400hz-sine-wave" in file_path or "400hz-square-wave" in file_path:
        quality = "High Quality"
    elif "audio-stream" in file_path:
        quality = "Normal Quality"

    # Since we know the system uses the LAME encoder (binary shipped with software)
    codec = "Unknown"
    if most_common_header.lower().startswith("4d454c0409010"):
        codec = "LAME MP3 (packaged in MEL Audio Format)"

    mp3_frame_sync_count = 0
    potential_bitrate = None
    potential_sample_rate = None
    for packet in packets[:10]:
        data_portion = packet[28:-4]  # Skip header and checksum
        sync_positions = _find_frame_syncs(data_portion)
        mp3_frame_sync_count += len(sync_positions)
        if potential_bitrate is not None:
            # Keep the first valid header; later packets only add to the count.
            continue
        for pos in sync_positions:
            parsed = _parse_mp3_header(data_portion, pos)
            if parsed is not None:
                potential_bitrate, potential_sample_rate = parsed
                break

    if mp3_frame_sync_count > 5:
        mp3_likelihood = "High"
    elif mp3_frame_sync_count > 0:
        mp3_likelihood = "Medium"
    else:
        mp3_likelihood = "Low"

    # Average packet size in bytes (two hex characters per byte).
    avg_packet_size = sum(len(p) for p in packets) / (2 * len(packets)) if packets else 0

    if potential_bitrate:
        codec_guess = f"LAME MP3 ({potential_bitrate}kbps)"
    elif 1000 <= avg_packet_size <= 1500:
        codec_guess = "LAME MP3 (48-64kbps)"
    elif avg_packet_size > 1500:
        codec_guess = "LAME MP3 (96-128kbps or higher)"
    else:
        codec_guess = "LAME MP3 (low bitrate)"

    return {
        "likely_codec": codec,
        "quality_setting": quality,
        "most_common_header": most_common_header,
        "codec_guess_from_size": codec_guess,
        "average_packet_size_bytes": avg_packet_size,
        "mp3_frame_sync_found": mp3_frame_sync_count > 0,
        "mp3_likelihood": mp3_likelihood,
        "detected_bitrate_kbps": potential_bitrate,
        "detected_sample_rate_hz": potential_sample_rate,
    }


def detect_repetition_pattern(packets):
    """Check whether packets repeat with a fixed period.

    With fewer than four packets a ``{"pattern": ...}`` message dict is
    returned.  Otherwise the result holds ``alternating_duplicates`` (every
    even-indexed packet equals the one two positions later) and
    ``repeats_every_n`` (the smallest period n in 2..9, and below half the
    packet count, for which every packet equals the one n positions later,
    or ``None``).
    """
    count = len(packets)
    if count < 4:
        return {"pattern": "Not enough packets to detect pattern"}

    # Check if every second packet is a repeat
    alternate_duplicates = all(
        packets[i] == packets[i + 2] for i in range(0, count - 2, 2)
    )

    # Check for more complex patterns
    repeats_every_n = None
    for n in range(2, min(10, count // 2)):
        if all(packets[i] == packets[i + n] for i in range(count - n)):
            repeats_every_n = n
            break

    return {
        "alternating_duplicates": alternate_duplicates,
        "repeats_every_n": repeats_every_n,
    }


def extract_timestamps(packets):
    """Interpret hex characters 24..31 of each packet as a 32-bit integer.

    The field position is a guess at where a timestamp might live.  Returns
    one entry per packet: the integer value, or ``None`` when the slice is
    empty or not valid hex.
    """
    timestamps = []
    for packet in packets:
        try:
            timestamps.append(int(packet[24:32], 16))
        except ValueError:
            timestamps.append(None)
    return timestamps


def calculate_total_duration(packets, sample_rate=44100):
    """Estimate the audio duration in seconds.

    Counts byte-aligned MP3 frame syncs across all packet payloads and assumes
    1152 samples per frame.  If no sync is found, one frame per unique packet
    is assumed instead.  A missing or zero *sample_rate* falls back to
    44100 Hz.  Returns 0 for an empty packet list.
    """
    # This is a rough estimation and would need adjustment based on the actual codec
    if not packets:
        return 0
    if not sample_rate:
        sample_rate = _DEFAULT_SAMPLE_RATE

    samples_per_frame = 1152  # Standard for MP3

    frame_count = sum(len(_find_frame_syncs(packet[28:-4])) for packet in packets)
    if frame_count == 0:
        # Conservative fallback: one frame per unique packet.
        frame_count = len(set(packets))

    return frame_count * samples_per_frame / sample_rate


def analyze_audio_stream(file_path):
    """Run the complete analysis for one hex-stream file.

    Returns ``{"error": ...}`` when the file cannot be read, is empty, or
    contains no packet marker.  Otherwise returns a summary dict with packet
    counts, length statistics, duplicate and repetition info, codec guesses,
    the estimated duration and whether a LAME/Lavf tag was seen.
    """
    hex_stream = parse_hex_stream(file_path)
    if not hex_stream:
        return {"error": "Failed to parse hex stream"}

    packets = identify_packets(hex_stream)
    if not packets:
        return {"error": "No valid packets identified"}

    packet_analyses = [analyze_packet_structure(p) for p in packets]
    # Short packets only carry an error entry, so fall back to the raw length.
    packet_lengths = [
        analysis.get("total_bytes", len(packet) // 2)
        for analysis, packet in zip(packet_analyses, packets)
    ]

    # Group by packet lengths to detect patterns
    most_common_lengths = Counter(packet_lengths).most_common(3)

    duplicates = detect_duplicates(packets)
    codec_info = guess_codec(packets, file_path)
    repetition = detect_repetition_pattern(packets)
    timestamps = extract_timestamps(packets)

    # The key is always present but is None when nothing was detected, so a
    # plain .get() default would never apply.
    sample_rate = codec_info.get("detected_sample_rate_hz") or _DEFAULT_SAMPLE_RATE
    duration = calculate_total_duration(packets, sample_rate)

    # Compare packets in consecutive, non-overlapping pairs (0-1, 2-3, ...).
    pairs = [packets[i] == packets[i + 1] for i in range(0, len(packets) - 1, 2)]
    pairs_percentage = sum(pairs) / len(pairs) * 100 if pairs else 0

    # Look for "LAME" (4c414d45) or "Lavf" (4c617666) in the first few payloads.
    lame_tag_found = False
    for packet in packets[:5]:
        data_portion = packet[28:-4].lower()  # Skip header and checksum
        if "4c414d45" in data_portion or "4c617666" in data_portion:
            lame_tag_found = True

    return {
        "file_name": os.path.basename(file_path),
        "total_packets": len(packets),
        "unique_packets": len(set(packets)),
        "packet_lengths": most_common_lengths,
        "average_packet_length": sum(packet_lengths) / len(packet_lengths) if packet_lengths else 0,
        "duplicates": duplicates,
        "codec_info": codec_info,
        "repetition_pattern": repetition,
        "timestamp_pattern": "Available" if any(timestamps) else "Not found",
        "estimated_duration_seconds": duration,
        "paired_packet_pattern": f"{pairs_percentage:.1f}% of packets appear in identical pairs",
        "lame_tag_found": lame_tag_found,
    }


def main():
    """Analyze every file named on the command line and print a report."""
    if len(sys.argv) < 2:
        print("Usage: python audio_analyzer.py <audio_file1.txt> [audio_file2.txt] ...")
        return

    for file_path in sys.argv[1:]:
        print(f"\nAnalyzing: {file_path}")
        print("-" * 50)

        analysis = analyze_audio_stream(file_path)
        if "error" in analysis:
            print(f"Error: {analysis['error']}")
            continue

        codec_info = analysis['codec_info']
        duplicates = analysis['duplicates']

        print(f"File: {analysis['file_name']}")
        print(f"Total packets: {analysis['total_packets']}")
        print(f"Unique packets: {analysis['unique_packets']}")
        print(f"Most common packet lengths (bytes): {analysis['packet_lengths']}")
        print(f"Average packet length: {analysis['average_packet_length']:.2f} bytes")
        print(f"Duplicates: {duplicates['duplicate_count']} ({duplicates['duplicate_percentage']:.1f}%)")
        print(f"Likely codec: {codec_info['likely_codec']}")
        print(f"Quality setting: {codec_info['quality_setting']}")
        print(f"Codec estimate: {codec_info['codec_guess_from_size']}")
        print(f"MP3 likelihood: {codec_info.get('mp3_likelihood', 'Unknown')}")
        if codec_info.get('detected_bitrate_kbps'):
            print(f"Detected bitrate: {codec_info['detected_bitrate_kbps']} kbps")
        if codec_info.get('detected_sample_rate_hz'):
            print(f"Detected sample rate: {codec_info['detected_sample_rate_hz']} Hz")
        print(f"LAME tag found: {'Yes' if analysis.get('lame_tag_found', False) else 'No'}")
        print(f"Repetition pattern: {analysis['repetition_pattern']}")
        print(f"Estimated duration: {analysis['estimated_duration_seconds']:.2f} seconds")
        print(f"Packet pairing: {analysis['paired_packet_pattern']}")


if __name__ == "__main__":
    main()