diff options
Diffstat (limited to 'research/sample-data/streaming/audio_analyzer.py')
-rw-r--r-- | research/sample-data/streaming/audio_analyzer.py | 343 |
1 files changed, 343 insertions, 0 deletions
# research/sample-data/streaming/audio_analyzer.py
"""Heuristic analyzer for hex-dumped "MEL" audio packet streams.

The input is a text file containing hexadecimal characters (whitespace is
ignored).  The stream is split on the packet magic ``MEL\\x04`` and each
packet is inspected for duplicates, repetition patterns and embedded
MPEG-1 Layer III (MP3) frame headers.

All field names used below (version, sequence, flags, length, checksum) are
working guesses about an undocumented packet layout, not a specification.
"""
import sys
import re
import binascii
from collections import Counter, defaultdict
import struct
import os.path

# Hex encoding of the ASCII bytes "MEL\x04" that start every packet.
_PACKET_MAGIC_HEX = "4d454c04"
# Hex characters preceding the payload:
# magic (8) + version (4) + sequence (4) + flags (4) + length field (8).
_PAYLOAD_START = 28
# Trailing hex characters treated as the checksum (2 bytes).
_CHECKSUM_HEX_LEN = 4
# 0xFF 0xFB: frame sync + MPEG-1, Layer III, no CRC protection.
_MP3_SYNC_HEX = "fffb"
_DEFAULT_SAMPLE_RATE_HZ = 44100
_SAMPLES_PER_MP3_FRAME = 1152
# MPEG-1 Layer III bitrate table in kbps.  Index 0 is "free format" and
# index 15 is forbidden, so only indices 1..14 are usable.
_MPEG1_L3_BITRATES_KBPS = (
    0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320,
)
# MPEG-1 sample rates in Hz.  Index 3 is reserved.
_MPEG1_SAMPLE_RATES_HZ = (44100, 48000, 32000)


def _payload(packet):
    """Return the hex payload of *packet* (header and checksum removed)."""
    return packet[_PAYLOAD_START:-_CHECKSUM_HEX_LEN]


def _find_frame_syncs(payload):
    """Return byte-aligned offsets of the MP3 sync word in a hex payload.

    Two hex characters encode one byte, so only even offsets can be the
    start of a real ``FF FB`` byte pair; odd offsets straddle two bytes and
    are discarded as false positives.
    """
    return [
        match.start()
        for match in re.finditer(_MP3_SYNC_HEX, payload)
        if match.start() % 2 == 0
    ]


def _parse_frame_header(payload, pos):
    """Decode the MPEG-1 Layer III header starting at hex offset *pos*.

    Returns ``(bitrate_kbps, sample_rate_hz)`` or ``None`` when the header
    is truncated, not valid hex, or uses a free/forbidden/reserved index.
    """
    header_hex = payload[pos:pos + 8]
    if len(header_hex) < 8:  # A frame header is 4 bytes = 8 hex characters.
        return None
    try:
        header_bytes = bytes.fromhex(header_hex)
    except ValueError:
        return None

    # Third header byte: bitrate index in the high nibble, sample rate
    # index in the next two bits.
    bitrate_index = (header_bytes[2] >> 4) & 0x0F
    sample_rate_index = (header_bytes[2] >> 2) & 0x03
    if not 0 < bitrate_index < len(_MPEG1_L3_BITRATES_KBPS):
        return None
    if sample_rate_index >= len(_MPEG1_SAMPLE_RATES_HZ):
        return None
    return (
        _MPEG1_L3_BITRATES_KBPS[bitrate_index],
        _MPEG1_SAMPLE_RATES_HZ[sample_rate_index],
    )


def parse_hex_stream(file_path):
    """Read *file_path* and return its contents with all whitespace removed.

    Returns ``None`` (after printing the error) if the file cannot be read
    or decoded.
    """
    try:
        with open(file_path, 'r') as f:
            content = f.read()
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeDecodeError and invalid path strings.
        print(f"Error reading file: {e}")
        return None

    # Remove any whitespace and line breaks
    return re.sub(r'\s+', '', content)


def identify_packets(hex_stream):
    """Split the hex stream into packets, one per ``MEL\\x04`` header.

    The stream is lower-cased first so that upper-case hex dumps are
    recognised and every later comparison can use lower-case constants.
    Anything before the first header is dropped; the last packet runs to
    the end of the stream.
    """
    hex_stream = hex_stream.lower()
    starts = [m.start() for m in re.finditer(_PACKET_MAGIC_HEX, hex_stream)]
    ends = starts[1:] + [len(hex_stream)]
    return [hex_stream[start:end] for start, end in zip(starts, ends)]


def analyze_packet_structure(packet):
    """Slice one packet into its presumed fields and verify the checksum.

    Returns ``{"error": "Packet too short"}`` for packets shorter than 20
    hex characters, otherwise a dict of hex-string fields plus
    ``data_length`` / ``total_bytes`` (in bytes) and ``checksum_match``.

    The checksum algorithm is a placeholder (low 16 bits of CRC-32 over
    everything before the checksum); the real algorithm is not known.
    A packet that is not valid hex simply reports ``checksum_match=False``.
    """
    if len(packet) < 20:  # Ensure packet has enough bytes for header
        return {"error": "Packet too short"}

    data = _payload(packet)
    checksum = packet[-_CHECKSUM_HEX_LEN:]

    try:
        covered = bytes.fromhex(packet[:-_CHECKSUM_HEX_LEN])
    except ValueError:
        # Odd number of hex digits or non-hex characters: cannot match.
        checksum_match = False
    else:
        calculated_checksum = binascii.crc32(covered) & 0xFFFF
        checksum_match = f"{calculated_checksum:04x}" == checksum.lower()

    return {
        "header": packet[:8],          # MEL\x04
        "version": packet[8:12],       # Version or type
        "sequence": packet[12:16],     # Possibly sequence number
        "flags": packet[16:20],        # Possibly flags
        "length_field": packet[20:28],
        "data_length": len(data) // 2,  # Byte count
        "checksum": checksum,
        "checksum_match": checksum_match,
        "total_bytes": len(packet) // 2
    }


def detect_duplicates(packets):
    """Report packets that are immediately followed by an identical packet.

    ``duplicate_indices`` holds each index ``i`` with
    ``packets[i] == packets[i + 1]``; the percentage is relative to the
    total packet count (0 for an empty list).
    """
    duplicates = [
        index
        for index, (current, following) in enumerate(zip(packets, packets[1:]))
        if current == following
    ]

    duplicate_percentage = (len(duplicates) / len(packets)) * 100 if packets else 0
    return {
        "duplicate_count": len(duplicates),
        "duplicate_indices": duplicates,
        "duplicate_percentage": duplicate_percentage
    }


def guess_codec(packets, file_path):
    """Guess codec, quality and bitrate from packet contents and file name.

    Only the first 10 packets are scanned for MP3 frame headers.  The
    bitrate and sample rate come from the first header that decodes to
    valid MPEG-1 Layer III values; both stay ``None`` if none is found.
    """
    # Extract common headers or patterns
    headers = Counter(packet[:24] for packet in packets)
    most_common_header = headers.most_common(1)[0][0] if headers else "Unknown"

    codec = "Unknown"
    quality = "Unknown"

    # The quality label is inferred purely from the sample file's name.
    if "400hz-sine-wave" in file_path or "400hz-square-wave" in file_path:
        quality = "High Quality"
    elif "audio-stream" in file_path:
        quality = "Normal Quality"

    # Since we know the system uses the LAME encoder (binary shipped with software)
    if most_common_header.startswith("4d454c0409010"):
        codec = "LAME MP3 (packaged in MEL Audio Format)"

    mp3_frame_sync_count = 0
    potential_bitrate = None
    potential_sample_rate = None

    for packet in packets[:10]:
        payload = _payload(packet)
        sync_positions = _find_frame_syncs(payload)
        mp3_frame_sync_count += len(sync_positions)

        # Keep the first valid header; later packets must not overwrite it.
        if potential_bitrate is not None:
            continue
        for pos in sync_positions:
            parsed = _parse_frame_header(payload, pos)
            if parsed is not None:
                potential_bitrate, potential_sample_rate = parsed
                break

    # Evaluate if this is likely MP3 based on frame sync patterns
    if mp3_frame_sync_count > 5:
        mp3_likelihood = "High"
    elif mp3_frame_sync_count > 0:
        mp3_likelihood = "Medium"
    else:
        mp3_likelihood = "Low"

    # Average packet size in bytes (two hex characters per byte).
    avg_packet_size = sum(len(p) for p in packets) / (2 * len(packets)) if packets else 0

    if potential_bitrate:
        codec_guess = f"LAME MP3 ({potential_bitrate}kbps)"
    elif 1000 <= avg_packet_size <= 1500:
        codec_guess = "LAME MP3 (48-64kbps)"
    elif avg_packet_size > 1500:
        codec_guess = "LAME MP3 (96-128kbps or higher)"
    else:
        codec_guess = "LAME MP3 (low bitrate)"

    return {
        "likely_codec": codec,
        "quality_setting": quality,
        "most_common_header": most_common_header,
        "codec_guess_from_size": codec_guess,
        "average_packet_size_bytes": avg_packet_size,
        "mp3_frame_sync_found": mp3_frame_sync_count > 0,
        "mp3_likelihood": mp3_likelihood,
        "detected_bitrate_kbps": potential_bitrate,
        "detected_sample_rate_hz": potential_sample_rate
    }


def detect_repetition_pattern(packets):
    """Analyze if packets are sent in repeating patterns (beyond simple duplication).

    ``alternating_duplicates`` is True when every even-indexed packet
    equals the packet two positions later.  ``repeats_every_n`` is the
    smallest period ``n`` (2 <= n < min(10, len(packets) // 2)) for which
    every packet equals the one ``n`` positions later, or ``None``.
    """
    if len(packets) < 4:
        return {"pattern": "Not enough packets to detect pattern"}

    alternate_duplicates = all(
        packets[i] == packets[i + 2] for i in range(0, len(packets) - 2, 2)
    )

    repeats_every_n = None
    for n in range(2, min(10, len(packets) // 2)):
        if all(a == b for a, b in zip(packets, packets[n:])):
            repeats_every_n = n
            break

    return {
        "alternating_duplicates": alternate_duplicates,
        "repeats_every_n": repeats_every_n
    }


def extract_timestamps(packets):
    """Interpret hex characters 24..32 of each packet as an integer.

    The position is a guess and would need adjusting once the real packet
    layout is known.  Packets that are too short or not valid hex at that
    position yield ``None``.
    """
    timestamps = []
    for packet in packets:
        try:
            timestamps.append(int(packet[24:32], 16))
        except ValueError:
            timestamps.append(None)
    return timestamps


def calculate_total_duration(packets, sample_rate=44100):
    """Estimate total audio duration in seconds.

    Counts byte-aligned MP3 sync words across all payloads and assumes 1152
    samples per frame.  If no frame is found, falls back to one frame per
    unique packet.  A missing or zero *sample_rate* falls back to 44100 Hz.
    Returns 0 for an empty packet list.
    """
    if not packets:
        return 0
    if not sample_rate:
        sample_rate = _DEFAULT_SAMPLE_RATE_HZ

    frame_count = sum(len(_find_frame_syncs(_payload(p))) for p in packets)

    # If we can't detect frames, fall back to packet-based estimation:
    # one frame per unique packet (conservative).
    if frame_count == 0:
        frame_count = len(set(packets))

    return frame_count * _SAMPLES_PER_MP3_FRAME / sample_rate


def analyze_audio_stream(file_path):
    """Complete analysis of an audio stream file.

    Returns a summary dict, or ``{"error": ...}`` when the file cannot be
    read, is empty, or contains no packet header.
    """
    hex_stream = parse_hex_stream(file_path)
    if not hex_stream:
        return {"error": "Failed to parse hex stream"}

    packets = identify_packets(hex_stream)
    if not packets:
        return {"error": "No valid packets identified"}

    packet_analyses = [analyze_packet_structure(p) for p in packets]
    # Runt packets are reported as {"error": ...} and carry no length.
    packet_lengths = [a["total_bytes"] for a in packet_analyses if "error" not in a]

    # Group by packet lengths to detect patterns
    most_common_lengths = Counter(packet_lengths).most_common(3)

    duplicates = detect_duplicates(packets)
    codec_info = guess_codec(packets, file_path)
    repetition = detect_repetition_pattern(packets)
    timestamps = extract_timestamps(packets)

    # The key is always present but is None when nothing was detected, so
    # a .get() default would never apply; fall back explicitly.
    sample_rate = codec_info.get("detected_sample_rate_hz") or _DEFAULT_SAMPLE_RATE_HZ
    duration = calculate_total_duration(packets, sample_rate)

    # Compare packets (0,1), (2,3), ... to see how many pairs are identical.
    pairs = [packets[i] == packets[i + 1] for i in range(0, len(packets) - 1, 2)]
    pairs_percentage = sum(pairs) / len(pairs) * 100 if pairs else 0

    # Look for "LAME" or "Lavf" ASCII markers (hex-encoded) in the first
    # few packets.
    lame_tag_found = any(
        "4c414d45" in payload or "4c617666" in payload
        for payload in (_payload(p).lower() for p in packets[:5])
    )

    return {
        "file_name": os.path.basename(file_path),
        "total_packets": len(packets),
        "unique_packets": len(set(packets)),
        "packet_lengths": most_common_lengths,
        "average_packet_length": sum(packet_lengths) / len(packet_lengths) if packet_lengths else 0,
        "duplicates": duplicates,
        "codec_info": codec_info,
        "repetition_pattern": repetition,
        "timestamp_pattern": "Available" if any(timestamps) else "Not found",
        "estimated_duration_seconds": duration,
        "paired_packet_pattern": f"{pairs_percentage:.1f}% of packets appear in identical pairs",
        "lame_tag_found": lame_tag_found
    }


def main():
    """Analyze every file named on the command line and print a report."""
    if len(sys.argv) < 2:
        print("Usage: python audio_analyzer.py <audio_file.txt> [audio_file2.txt] ...")
        return

    for file_path in sys.argv[1:]:
        print(f"\nAnalyzing: {file_path}")
        print("-" * 50)

        analysis = analyze_audio_stream(file_path)

        if "error" in analysis:
            print(f"Error: {analysis['error']}")
            continue

        codec_info = analysis['codec_info']
        duplicates = analysis['duplicates']

        print(f"File: {analysis['file_name']}")
        print(f"Total packets: {analysis['total_packets']}")
        print(f"Unique packets: {analysis['unique_packets']}")
        print(f"Most common packet lengths (bytes): {analysis['packet_lengths']}")
        print(f"Average packet length: {analysis['average_packet_length']:.2f} bytes")
        print(f"Duplicates: {duplicates['duplicate_count']} ({duplicates['duplicate_percentage']:.1f}%)")
        print(f"Likely codec: {codec_info['likely_codec']}")
        print(f"Quality setting: {codec_info['quality_setting']}")
        print(f"Codec estimate: {codec_info['codec_guess_from_size']}")
        print(f"MP3 likelihood: {codec_info.get('mp3_likelihood', 'Unknown')}")

        if codec_info.get('detected_bitrate_kbps'):
            print(f"Detected bitrate: {codec_info['detected_bitrate_kbps']} kbps")
        if codec_info.get('detected_sample_rate_hz'):
            print(f"Detected sample rate: {codec_info['detected_sample_rate_hz']} Hz")

        print(f"LAME tag found: {'Yes' if analysis.get('lame_tag_found', False) else 'No'}")
        print(f"Repetition pattern: {analysis['repetition_pattern']}")
        print(f"Estimated duration: {analysis['estimated_duration_seconds']:.2f} seconds")
        print(f"Packet pairing: {analysis['paired_packet_pattern']}")


if __name__ == "__main__":
    main()