aboutsummaryrefslogtreecommitdiff
path: root/research/sample-data/streaming/audio_analyzer.py
diff options
context:
space:
mode:
Diffstat (limited to 'research/sample-data/streaming/audio_analyzer.py')
-rw-r--r--research/sample-data/streaming/audio_analyzer.py343
1 files changed, 343 insertions, 0 deletions
diff --git a/research/sample-data/streaming/audio_analyzer.py b/research/sample-data/streaming/audio_analyzer.py
new file mode 100644
index 0000000..e8beab0
--- /dev/null
+++ b/research/sample-data/streaming/audio_analyzer.py
@@ -0,0 +1,343 @@
+import sys
+import re
+import binascii
+from collections import Counter, defaultdict
+import struct
+import os.path
+
+def parse_hex_stream(file_path):
+ """Parse hexadecimal stream from a text file."""
+ try:
+ with open(file_path, 'r') as f:
+ content = f.read()
+
+ # Remove any whitespace and line breaks
+ content = re.sub(r'\s+', '', content)
+ return content
+ except Exception as e:
+ print(f"Error reading file: {e}")
+ return None
+
+def identify_packets(hex_stream):
+ """Split the hex stream into individual packets based on MEL header pattern."""
+ # Pattern is 4d454c04 which is "MEL\x04" in ASCII
+ packet_pattern = r'4d454c04'
+
+ # Find all positions of the pattern
+ positions = [match.start() for match in re.finditer(packet_pattern, hex_stream)]
+
+ packets = []
+ for i in range(len(positions)):
+ start = positions[i]
+ # If this is the last pattern occurrence, go to end of stream
+ end = positions[i+1] if i < len(positions) - 1 else len(hex_stream)
+ packet = hex_stream[start:end]
+ packets.append(packet)
+
+ return packets
+
+def analyze_packet_structure(packet):
+ """Analyze the structure of a single packet."""
+ if len(packet) < 20: # Ensure packet has enough bytes for header
+ return {"error": "Packet too short"}
+
+ # Extract header components
+ header = packet[:8] # MEL\x04
+ version = packet[8:12] # Version or type
+ sequence = packet[12:16] # Possibly sequence number
+ flags = packet[16:20] # Possibly flags
+
+ # Extract length fields (if they exist)
+ length_field = packet[20:28]
+
+ # Extract the data portion (minus the checksum)
+ data = packet[28:-4]
+
+ # Extract the checksum (last 2 bytes / 4 hex chars)
+ checksum = packet[-4:]
+
+ # Calculate expected checksum (simple CRC)
+ # This is just a placeholder; actual checksum algorithm would need to be determined
+ calculated_checksum = binascii.crc32(bytes.fromhex(packet[:-4])) & 0xFFFF
+ checksum_match = hex(calculated_checksum)[2:].zfill(4) == checksum.lower()
+
+ return {
+ "header": header,
+ "version": version,
+ "sequence": sequence,
+ "flags": flags,
+ "length_field": length_field,
+ "data_length": len(data) // 2, # Byte count
+ "checksum": checksum,
+ "checksum_match": checksum_match,
+ "total_bytes": len(packet) // 2
+ }
+
+def detect_duplicates(packets):
+ """Detect duplicate packets in the stream."""
+ duplicates = []
+ for i in range(len(packets) - 1):
+ if packets[i] == packets[i + 1]:
+ duplicates.append(i)
+
+ duplicate_percentage = (len(duplicates) / len(packets)) * 100 if packets else 0
+ return {
+ "duplicate_count": len(duplicates),
+ "duplicate_indices": duplicates,
+ "duplicate_percentage": duplicate_percentage
+ }
+
+def guess_codec(packets, file_path):
+ """Attempt to identify the audio codec based on packet patterns."""
+ # Extract common headers or patterns
+ headers = Counter([packet[:24] for packet in packets])
+ most_common_header = headers.most_common(1)[0][0] if headers else "Unknown"
+
+ # Check for known codec signatures
+ codec = "Unknown"
+ quality = "Unknown"
+
+ if "400hz-sine-wave" in file_path:
+ quality = "High Quality"
+ elif "400hz-square-wave" in file_path:
+ quality = "High Quality"
+ elif "audio-stream" in file_path:
+ quality = "Normal Quality"
+
+ # Since we know the system uses the LAME encoder (binary shipped with software)
+ if most_common_header.startswith("4d454c0409010"):
+ codec = "LAME MP3 (packaged in MEL Audio Format)"
+
+ # MP3 frame analysis
+ mp3_frame_sync_count = 0
+ potential_bitrate = None
+ potential_sample_rate = None
+
+ # Check each packet for MP3 headers (starting with 0xFF 0xFB for MPEG-1 Layer 3)
+ for packet in packets[:min(10, len(packets))]: # Check first 10 packets
+ data_portion = packet[28:-4] # Skip header and checksum
+
+ # Look for MP3 frame sync patterns
+ sync_positions = [m.start() for m in re.finditer(r'fffb', data_portion)]
+ if sync_positions:
+ mp3_frame_sync_count += len(sync_positions)
+
+ # Try to extract bitrate and sample rate from first valid header
+ for pos in sync_positions:
+ if pos + 4 <= len(data_portion):
+ try:
+ header_bytes = bytes.fromhex(data_portion[pos:pos+8])
+ # Extract bits 16-19 for bitrate index (0-based)
+ bitrate_index = (header_bytes[2] >> 4) & 0x0F
+ # Extract bits 20-21 for sample rate index
+ sample_rate_index = (header_bytes[2] >> 2) & 0x03
+
+ # MPEG-1 Layer 3 bitrate table (kbps): 0 is free format
+ bitrates = [0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160, 192, 224, 256, 320]
+ # MPEG-1 sample rates: 44100, 48000, 32000 Hz
+ sample_rates = [44100, 48000, 32000, 0] # 0 is reserved
+
+ if bitrate_index > 0 and sample_rate_index < 3: # Valid indices
+ potential_bitrate = bitrates[bitrate_index]
+ potential_sample_rate = sample_rates[sample_rate_index]
+ break
+ except:
+ pass # Skip if unable to parse header
+
+ # Evaluate if this is likely MP3 based on frame sync patterns
+ mp3_likelihood = "High" if mp3_frame_sync_count > 5 else "Medium" if mp3_frame_sync_count > 0 else "Low"
+
+ # Check for stream characteristics that might indicate codec/bitrate
+ avg_packet_size = sum(len(p) for p in packets) / (2 * len(packets)) if packets else 0
+
+ if potential_bitrate:
+ codec_guess = f"LAME MP3 ({potential_bitrate}kbps)"
+ elif 1000 <= avg_packet_size <= 1500:
+ codec_guess = "LAME MP3 (48-64kbps)"
+ elif avg_packet_size > 1500:
+ codec_guess = "LAME MP3 (96-128kbps or higher)"
+ else:
+ codec_guess = "LAME MP3 (low bitrate)"
+
+ return {
+ "likely_codec": codec,
+ "quality_setting": quality,
+ "most_common_header": most_common_header,
+ "codec_guess_from_size": codec_guess,
+ "average_packet_size_bytes": avg_packet_size,
+ "mp3_frame_sync_found": mp3_frame_sync_count > 0,
+ "mp3_likelihood": mp3_likelihood,
+ "detected_bitrate_kbps": potential_bitrate,
+ "detected_sample_rate_hz": potential_sample_rate
+ }
+
+def detect_repetition_pattern(packets):
+ """Analyze if packets are sent in repeating patterns (beyond simple duplication)."""
+ if len(packets) < 4:
+ return {"pattern": "Not enough packets to detect pattern"}
+
+ # Check if every second packet is a repeat
+ alternate_duplicates = all(packets[i] == packets[i+2] for i in range(0, len(packets)-2, 2))
+
+ # Check for more complex patterns
+ repeats_every_n = None
+ for n in range(2, min(10, len(packets) // 2)):
+ if all(packets[i] == packets[i+n] for i in range(len(packets)-n)):
+ repeats_every_n = n
+ break
+
+ return {
+ "alternating_duplicates": alternate_duplicates,
+ "repeats_every_n": repeats_every_n
+ }
+
+def extract_timestamps(packets):
+ """Try to extract timestamp information from packets."""
+ timestamps = []
+ for i, packet in enumerate(packets):
+ # This would need to be adjusted based on actual packet structure
+ # Assuming timestamp might be in a specific position
+ potential_timestamp = packet[24:32]
+ try:
+ # Try to interpret as a 32-bit timestamp
+ ts_value = int(potential_timestamp, 16)
+ timestamps.append(ts_value)
+ except:
+ timestamps.append(None)
+
+ return timestamps
+
+def calculate_total_duration(packets, sample_rate=44100):
+ """Estimate total audio duration based on packet analysis."""
+ # This is a rough estimation and would need adjustment based on the actual codec
+ if not packets:
+ return 0
+
+ # For MP3, we'll use a different approach since we now know it's LAME MP3
+ # Assuming each packet contains a fixed number of samples
+ samples_per_frame = 1152 # Standard for MP3
+
+ # Count potential MP3 frames in the data
+ frame_count = 0
+ for packet in packets:
+ data_portion = packet[28:-4] # Skip header and checksum
+ # Look for MP3 frame sync patterns (0xFF 0xFB for MPEG-1 Layer 3)
+ sync_positions = [m.start() for m in re.finditer(r'fffb', data_portion)]
+ frame_count += len(sync_positions)
+
+ # If we can't detect frames, fallback to packet-based estimation
+ if frame_count == 0:
+ # Total unique packets as a conservative estimate
+ unique_packets = len(set(packets))
+ # Estimate one frame per packet (conservative)
+ frame_count = unique_packets
+
+ # Estimate duration
+ total_samples = frame_count * samples_per_frame
+ duration_seconds = total_samples / sample_rate
+
+ return duration_seconds
+
+def analyze_audio_stream(file_path):
+ """Complete analysis of an audio stream file."""
+ hex_stream = parse_hex_stream(file_path)
+ if not hex_stream:
+ return {"error": "Failed to parse hex stream"}
+
+ packets = identify_packets(hex_stream)
+ if not packets:
+ return {"error": "No valid packets identified"}
+
+ packet_analyses = [analyze_packet_structure(p) for p in packets]
+ packet_lengths = [p["total_bytes"] for p in packet_analyses]
+
+ # Group by packet lengths to detect patterns
+ length_count = Counter(packet_lengths)
+ most_common_lengths = length_count.most_common(3)
+
+ duplicates = detect_duplicates(packets)
+ codec_info = guess_codec(packets, file_path)
+ repetition = detect_repetition_pattern(packets)
+ timestamps = extract_timestamps(packets)
+
+ # Use detected sample rate if available, otherwise default to 44100
+ sample_rate = codec_info.get("detected_sample_rate_hz", 44100)
+ duration = calculate_total_duration(packets, sample_rate)
+
+ # Analyze duplicated packets pattern
+ pairs = []
+ for i in range(0, len(packets)-1, 2):
+ if i+1 < len(packets):
+ are_identical = packets[i] == packets[i+1]
+ pairs.append(are_identical)
+
+ pairs_percentage = sum(pairs)/len(pairs)*100 if pairs else 0
+
+ # Extract LAME tag info if present for VBR and encoding quality
+ lame_version = None
+ lame_tag_found = False
+ vbr_method = None
+
+ # Look for LAME tag in first few packets
+ for packet in packets[:min(5, len(packets))]:
+ data_portion = packet[28:-4] # Skip header and checksum
+ # Look for "LAME" or "Lavf" strings in hex
+ if "4c414d45" in data_portion.lower(): # "LAME" in hex
+ lame_tag_found = True
+ # Additional LAME tag parsing could be added here
+ elif "4c617666" in data_portion.lower(): # "Lavf" in hex (LAVF container format)
+ lame_tag_found = True
+
+ return {
+ "file_name": os.path.basename(file_path),
+ "total_packets": len(packets),
+ "unique_packets": len(set(packets)),
+ "packet_lengths": most_common_lengths,
+ "average_packet_length": sum(packet_lengths) / len(packet_lengths) if packet_lengths else 0,
+ "duplicates": duplicates,
+ "codec_info": codec_info,
+ "repetition_pattern": repetition,
+ "timestamp_pattern": "Available" if any(timestamps) else "Not found",
+ "estimated_duration_seconds": duration,
+ "paired_packet_pattern": f"{pairs_percentage:.1f}% of packets appear in identical pairs",
+ "lame_tag_found": lame_tag_found
+ }
+
+def main():
+ if len(sys.argv) < 2:
+ print("Usage: python audio_analyzer.py <audio_file.txt> [audio_file2.txt] ...")
+ return
+
+ for file_path in sys.argv[1:]:
+ print(f"\nAnalyzing: {file_path}")
+ print("-" * 50)
+
+ analysis = analyze_audio_stream(file_path)
+
+ if "error" in analysis:
+ print(f"Error: {analysis['error']}")
+ continue
+
+ print(f"File: {analysis['file_name']}")
+ print(f"Total packets: {analysis['total_packets']}")
+ print(f"Unique packets: {analysis['unique_packets']}")
+ print(f"Most common packet lengths (bytes): {analysis['packet_lengths']}")
+ print(f"Average packet length: {analysis['average_packet_length']:.2f} bytes")
+ print(f"Duplicates: {analysis['duplicates']['duplicate_count']} ({analysis['duplicates']['duplicate_percentage']:.1f}%)")
+ print(f"Likely codec: {analysis['codec_info']['likely_codec']}")
+ print(f"Quality setting: {analysis['codec_info']['quality_setting']}")
+ print(f"Codec estimate: {analysis['codec_info']['codec_guess_from_size']}")
+ print(f"MP3 likelihood: {analysis['codec_info'].get('mp3_likelihood', 'Unknown')}")
+
+ if analysis['codec_info'].get('detected_bitrate_kbps'):
+ print(f"Detected bitrate: {analysis['codec_info']['detected_bitrate_kbps']} kbps")
+ if analysis['codec_info'].get('detected_sample_rate_hz'):
+ print(f"Detected sample rate: {analysis['codec_info']['detected_sample_rate_hz']} Hz")
+
+ print(f"LAME tag found: {'Yes' if analysis.get('lame_tag_found', False) else 'No'}")
+ print(f"Repetition pattern: {analysis['repetition_pattern']}")
+ print(f"Estimated duration: {analysis['estimated_duration_seconds']:.2f} seconds")
+ print(f"Packet pairing: {analysis['paired_packet_pattern']}")
+
+if __name__ == "__main__":
+ main()