import re
import sys
import os
import argparse
from typing import List, Tuple, Callable, Dict, Generator, Optional
from collections import defaultdict, Counter
import json
import time
from itertools import islice
import math
import random

# One sample per line: "<hex payload> = <1-2 hex digit checksum>".
# Compiled once because it is applied to every line of every input file.
_SAMPLE_LINE_RE = re.compile(r'([0-9a-fA-F]+)\s*=\s*([0-9a-fA-F]{1,2})')

# Method ids look like "SUM[0:5]"; parametric variants produced by the targeted
# search carry a parenthesised parameter, e.g. "BIT_FLIP_SUM(F0)[0:5]".
_METHOD_ID_RE = re.compile(r'([A-Z_<>0-9]+(?:\([^)]*\))?)\[(\d+):(\d+)\]')


# --- This is pure AI Slop ---

def checksum_sum(data: bytes) -> int:
    """Return the sum of all bytes modulo 256."""
    return sum(data) % 256


def checksum_xor(data: bytes) -> int:
    """Return the XOR of all bytes (0 for empty input)."""
    acc = 0
    for byte in data:
        acc ^= byte
    return acc


def checksum_sum_shifted(data: bytes, shift: int) -> int:
    """Sum each byte left-shifted by *shift* and truncated to 8 bits, modulo 256."""
    return sum((byte << shift) & 0xFF for byte in data) % 256


def checksum_xor_shifted(data: bytes, shift: int) -> int:
    """XOR each byte left-shifted by *shift* and truncated to 8 bits."""
    acc = 0
    for byte in data:
        acc ^= (byte << shift) & 0xFF
    return acc


def checksum_weighted_sum(data: bytes) -> int:
    """Sum of bytes weighted by their 1-based position, modulo 256."""
    return sum((index + 1) * byte for index, byte in enumerate(data)) % 256


def checksum_alt_sum_xor(data: bytes) -> int:
    """Mix of sum and XOR.

    Starts from the plain sum of all bytes; bytes at even indices are XORed
    into a separate accumulator while bytes at odd indices are XORed into the
    running sum.  The result is (sum + accumulator) modulo 256.
    """
    total = sum(data)
    even_xor = 0
    for index, byte in enumerate(data):
        if index % 2 == 0:
            even_xor ^= byte
        else:
            total ^= byte
    return (total + even_xor) % 256


def checksum_bit_flip_sum(data: bytes) -> int:
    """Sum of the bitwise-inverted bytes (byte XOR 0xFF), modulo 256."""
    return sum(byte ^ 0xFF for byte in data) % 256


# --- Input Parser ---

def _parse_sample_line(line: str) -> Optional[Tuple[bytes, int]]:
    """Parse one "<hex> = <checksum>" line; return None when it is not a valid sample."""
    match = _SAMPLE_LINE_RE.match(line.strip())
    if match is None:
        return None
    try:
        payload = bytes.fromhex(match.group(1))
    except ValueError:
        # The pattern accepts an odd number of hex digits, which is not a whole
        # number of bytes; treat such a line as invalid instead of crashing.
        return None
    return payload, int(match.group(2), 16)


def parse_input_file_lines(filepath: str) -> Tuple[List[Tuple[bytes, int]], Dict]:
    """Read every sample from *filepath*.

    Returns:
        ``(samples, metadata)`` where ``samples`` is a list of
        ``(payload_bytes, checksum)`` tuples and ``metadata`` holds
        ``"total_lines"`` and ``"valid_samples"``.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    samples = []
    total_lines = 0
    with open(filepath, "r") as f:
        for line in f:
            total_lines += 1
            sample = _parse_sample_line(line)
            if sample is not None:
                samples.append(sample)

    # Return samples and metadata
    return samples, {"total_lines": total_lines, "valid_samples": len(samples)}


# --- Enhanced Input Parser for Large Files ---

def parse_input_file_lines_batched(filepath: str, batch_size: int = 1000) -> Generator[List[Tuple[bytes, int]], None, Dict]:
    """
    Parse a large input file in batches to avoid memory issues.

    Yields lists of ``(payload_bytes, checksum)`` tuples, each holding up to
    *batch_size* samples.  A read error is printed and ends the iteration after
    yielding whatever was collected so far.

    The metadata dict (``"total_lines"``, ``"valid_samples"``) is the
    generator's *return* value: it is only available as ``StopIteration.value``
    (or through ``yield from``), never as a yielded item.
    """
    samples = []
    total_lines = 0
    valid_samples = 0

    try:
        with open(filepath, "r") as f:
            for line in f:
                total_lines += 1
                sample = _parse_sample_line(line)
                if sample is None:
                    continue
                samples.append(sample)
                valid_samples += 1

                # Yield a batch when it reaches the batch size
                if len(samples) >= batch_size:
                    yield samples
                    samples = []
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error reading file: {e}")

    # Yield any remaining samples
    if samples:
        yield samples

    # Return metadata about the entire file
    return {"total_lines": total_lines, "valid_samples": valid_samples}


# --- Brute Force Evaluation ---

def bruteforce_all_methods(samples: List[Tuple[bytes, int]], label_prefix="", file_metadata=None) -> List[Tuple[str, int, int, str]]:
    """Try every basic checksum on every contiguous byte range of every sample.

    Args:
        samples: ``(payload, expected_checksum)`` pairs.
        label_prefix: label stored in each match (callers pass the file name).
        file_metadata: optional dict; when truthy it is appended as a
            ``"FILE_METADATA"`` record.

    Returns:
        A list of ``(method_id, sample_number, expected, label_prefix)`` tuples
        where ``method_id`` is e.g. ``"SUM[0:5]"`` and ``sample_number`` is
        1-based.  Two special records may follow the real matches:
        ``("CONSISTENCY_DATA", 0, 0, <json>)`` when more than one sample was
        given and something matched, and
        ``("FILE_METADATA", <file name>, 0, <json>)`` when *file_metadata* is
        given.
    """
    methods: List[Tuple[str, Callable[[bytes], int]]] = [
        ("SUM", checksum_sum),
        ("XOR", checksum_xor),
        ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)),
        ("SUM<<2", lambda d: checksum_sum_shifted(d, 2)),
        ("XOR<<1", lambda d: checksum_xor_shifted(d, 1)),
        ("XOR<<2", lambda d: checksum_xor_shifted(d, 2)),
        ("WEIGHTED_SUM", checksum_weighted_sum),
        ("ALT_SUM_XOR", checksum_alt_sum_xor),
        ("BIT_FLIP_SUM", checksum_bit_flip_sum),
    ]

    matches = []
    sample_methods = defaultdict(list)  # Track methods that work for each sample

    for sample_index, (data, expected) in enumerate(samples):
        length = len(data)
        sample_success = []  # Track successful methods for this sample

        # Every (sample, method, range) combination is visited exactly once,
        # so no de-duplication is needed.
        for start in range(length):
            for end in range(start + 1, length + 1):
                sliced = data[start:end]
                label = f"[{start}:{end}]"
                for name, func in methods:
                    if func(sliced) == expected:
                        matches.append((f"{name}{label}", sample_index + 1, expected, label_prefix))
                        sample_success.append((name, start, end))

        # Store methods that work for this sample
        if sample_success:
            sample_methods[sample_index] = sample_success

    # Calculate consistency scores if we have enough samples
    if len(samples) > 1 and sample_methods:
        consistency_analysis = analyze_consistency(sample_methods, len(samples))
        matches.append(("CONSISTENCY_DATA", 0, 0, json.dumps(consistency_analysis)))

    # Add file metadata for reporting
    if file_metadata:
        file_name = file_metadata.get("file", "unknown")
        matches.append(("FILE_METADATA", file_name, 0, json.dumps(file_metadata)))

    return matches


# --- Consistency Analysis ---

def analyze_consistency(sample_methods: Dict[int, List[Tuple[str, int, int]]], total_samples: int) -> Dict:
    """Analyze which methods work consistently across different samples.

    Args:
        sample_methods: sample index -> list of ``(method, start, end)`` that
            reproduced that sample's checksum.
        total_samples: number of samples examined; must be non-zero because it
            is the denominator of every percentage.

    Returns:
        Dict with ``"best_methods"``, ``"best_ranges"`` and
        ``"best_method_ranges"`` (each the top five ``(key, percent)`` pairs,
        highest first) plus ``"total_samples"``.
    """
    method_consistency = defaultdict(int)
    range_consistency = defaultdict(int)
    method_range_consistency = defaultdict(int)

    # Count how many samples each method/range works for.  dict.fromkeys
    # de-duplicates within one sample while keeping first-seen order, which
    # keeps tie-breaking in the stable sorts below deterministic.
    for methods in sample_methods.values():
        for method in dict.fromkeys(m for m, _, _ in methods):
            method_consistency[method] += 1
        for range_key in dict.fromkeys(f"{s}:{e}" for _, s, e in methods):
            range_consistency[range_key] += 1
        for method_range_key in dict.fromkeys(f"{m}[{s}:{e}]" for m, s, e in methods):
            method_range_consistency[method_range_key] += 1

    def top_five(counts: Dict[str, int]) -> List[Tuple[str, float]]:
        # Convert counts to percentages and keep the five best.
        scores = {key: count / total_samples * 100 for key, count in counts.items()}
        return sorted(scores.items(), key=lambda item: item[1], reverse=True)[:5]

    return {
        "best_methods": top_five(method_consistency),
        "best_ranges": top_five(range_consistency),
        "best_method_ranges": top_five(method_range_consistency),
        "total_samples": total_samples
    }


# --- Pattern Recognition ---

def analyze_patterns(matches: List[Tuple[str, int, int, str]]) -> Dict:
    """Count methods, byte ranges, start/end positions and lengths over *matches*.

    Only the first element of each match (the method id) is inspected.  Ids
    that do not look like ``NAME[start:end]`` or ``NAME(param)[start:end]``
    (for example the special ``CONSISTENCY_DATA`` record) are ignored.

    Returns:
        Dict of five ``Counter`` objects keyed ``"methods"``, ``"ranges"``,
        ``"start_positions"``, ``"end_positions"`` and ``"lengths"``.
    """
    patterns = {
        "methods": Counter(),
        "ranges": Counter(),
        "start_positions": Counter(),
        "end_positions": Counter(),
        "lengths": Counter()
    }

    for method_id, _, _, _ in matches:
        # Extract method name and range from method_id (e.g., "SUM[0:5]")
        method_parts = _METHOD_ID_RE.match(method_id)
        if not method_parts:
            continue
        method_name, start, end = method_parts.groups()
        start_pos, end_pos = int(start), int(end)

        patterns["methods"][method_name] += 1
        patterns["ranges"][f"[{start}:{end}]"] += 1
        patterns["start_positions"][start_pos] += 1
        patterns["end_positions"][end_pos] += 1
        patterns["lengths"][end_pos - start_pos] += 1

    return patterns


# --- Result Display ---

def _split_special_records(all_matches):
    """Separate CONSISTENCY_DATA / FILE_METADATA records from real matches.

    Returns ``(consistency_records, file_metadata, real_matches)``.
    """
    consistency_records = []
    file_metadata = {}
    real_matches = []

    for match in all_matches:
        if match[0] == "CONSISTENCY_DATA" and match[3]:
            try:
                # Kept in a list, one entry per record, so that two files with
                # identical consistency data are still counted as two files.
                consistency_records.append(json.loads(match[3]))
            except (ValueError, TypeError):
                # Best effort: an unreadable record is skipped.
                pass
        elif match[0] == "FILE_METADATA" and match[3]:
            try:
                # The file name is stored in match[1]
                file_metadata[match[1]] = json.loads(match[3])
            except (ValueError, TypeError) as e:
                print(f"Error processing metadata: {e}")
        else:
            real_matches.append(match)

    return consistency_records, file_metadata, real_matches


def _print_file_report(file, matches, file_metadata, per_file, insights, show_full):
    """Print the summary, pattern statistics and optional insights for one file."""
    # Get file metadata if available
    metadata = {}
    for meta_file, meta_data in file_metadata.items():
        if isinstance(meta_file, str) and file in meta_file:
            metadata = meta_data
            break

    # Sample lines that matched successfully
    matched_lines = {line for _, line, _, _ in matches}

    # Print file summary with line counts
    print(f"\n\nšŸ“„ Results for: {file}")
    if metadata:
        total_lines = metadata.get("total_lines", "?")
        valid_samples = metadata.get("valid_samples", len(matched_lines))
        success_rate = (len(matched_lines) / valid_samples * 100) if valid_samples > 0 else 0
        print(f"āœ… Matches Found: {len(matched_lines)}/{valid_samples} samples " +
              f"({success_rate:.1f}% success rate)")
        print(f"šŸ“ Total file lines: {total_lines}, Valid samples: {valid_samples}")
    else:
        print(f"āœ… Matches Found: {len(matches)}")

    # Only show individual matches if per_file flag is set AND full details are requested
    if per_file and show_full:
        for method_id, line, expected, _ in matches[:20]:  # Show only first 20 to avoid flooding
            print(f"Line {line:03d} | Method: {method_id:20s} | Expected: {expected:02X}")
        if len(matches) > 20:
            print(f"... and {len(matches) - 20} more matches")
    elif per_file:
        # In condensed mode, just show which lines matched
        line_counts = Counter(line for _, line, _, _ in matches)
        print(f"Lines with matches: {', '.join(str(n) for n in sorted(line_counts.keys()))}")
        if len(line_counts) > 10:
            print(f"Total lines with matches: {len(line_counts)}")

    # Pattern analysis for this file
    patterns = analyze_patterns(matches)

    # Print top methods for this file
    print("\nšŸ“Š Most Successful Methods in this file:")
    for method, count in patterns["methods"].most_common(5):
        print(f"{method:<15} → {count} matches")

    if show_full:
        # Print top ranges for this file
        print("\nšŸ“ Most Common Byte Ranges:")
        for range_str, count in patterns["ranges"].most_common(5):
            print(f"{range_str:<10} → {count} matches")

        # Print common start positions
        print("\nšŸ” Common Start Positions:")
        for pos, count in patterns["start_positions"].most_common(5):
            print(f"Position {pos:<3} → {count} matches")

        # Print common end positions
        print("\nšŸ”Ž Common End Positions:")
        for pos, count in patterns["end_positions"].most_common(5):
            print(f"Position {pos:<3} → {count} matches")

        # Print common byte lengths
        print("\nšŸ“Š Common Byte Lengths:")
        for length, count in patterns["lengths"].most_common(5):
            print(f"{length} bytes → {count} matches")

        # Visual representation of match distribution
        starts = patterns["start_positions"]
        ends = patterns["end_positions"]
        if starts and ends:
            max_pos = max(max(ends.keys()), max(starts.keys()))
            print("\nšŸ“ˆ Match Distribution (frequency by position):")
            scale = 30  # Reduced scale for more compact output
            max_count = max(max(starts.values()), max(ends.values()))

            for pos in range(min(max_pos + 1, 40)):  # Limit to first 40 positions
                start_count = starts.get(pos, 0)
                end_count = ends.get(pos, 0)
                start_bar = 'ā–ˆ' * int((start_count / max_count) * scale) if start_count else ''
                end_bar = 'ā–‘' * int((end_count / max_count) * scale) if end_count else ''
                print(f"{pos:2d}: {start_bar}|{end_bar}")
            print(" ā–ˆā–ˆā–ˆ = start positions, ā–‘ā–‘ā–‘ = end positions")

    # Print byte-level insights for each sample if available
    if insights and show_full:
        file_insights = {k: v for k, v in insights.items()
                         if k.startswith("sample_") and file in v.get("method", "")}
        if file_insights:
            print("\nšŸ”¬ Byte-Level Analysis:")
            for key, data in file_insights.items():
                parts = key.split('_')
                sample_id = parts[1] if len(parts) > 1 else "?"
                print(f"\nSample {sample_id} with {data['method']}[{data['range']}]:")

                # Show optimal byte changes
                if data.get("optimal_changes"):
                    print("Optimal byte changes to achieve expected checksum:")
                    for pos, new_val in data["optimal_changes"]:
                        print(f" Change byte at position {pos} from 0x{data['contributions']['byte_contributions'][pos]['original_value']:02X} to 0x{new_val:02X}")
                else:
                    print("No simple byte changes found to fix checksum")


def _print_global_consensus(consistency_records):
    """Print which methods/ranges recur in the per-file top-five lists."""
    print("\n\n🧩 Global Consensus Analysis")
    print("═══════════════════════════")
    print("Methods that work across multiple files:")

    # Collect global statistics from all files
    global_methods = Counter()
    global_ranges = Counter()
    global_method_ranges = Counter()

    for file_data in consistency_records:
        for method, _score in file_data.get("best_methods", []):
            global_methods[method] += 1
        for range_key, _score in file_data.get("best_ranges", []):
            global_ranges[range_key] += 1
        for mr, _score in file_data.get("best_method_ranges", []):
            global_method_ranges[mr] += 1

    # Display methods that work across multiple files
    num_files = len(consistency_records)
    print(f"\nšŸ“Š Methods that work across multiple files (total files: {num_files}):")
    for method, count in global_methods.most_common(5):
        print(f"{method:<15} → appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)")

    print("\nšŸ“ Byte ranges that work across multiple files:")
    for range_key, count in global_ranges.most_common(5):
        print(f"[{range_key}] → appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)")

    print("\nšŸ” Method+Range combinations that work across multiple files:")
    for mr, count in global_method_ranges.most_common(5):
        print(f"{mr:<20} → appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)")

    # Generate a recommended approach
    if not global_method_ranges:
        return
    best_combo, count = global_method_ranges.most_common(1)[0]
    if count >= num_files * 0.5:  # If it works for at least half the files
        print(f"\nāœ… Recommended global method: {best_combo}")
        print(f" This combination works in top 5 for {count}/{num_files} files")
    else:
        print("\nāš ļø No single method+range combination works reliably across most files")
        print(f" Best option ({best_combo}) only works in top 5 for {count}/{num_files} files")

        # Try to find patterns in the most successful methods
        if global_methods:
            best_method, method_count = global_methods.most_common(1)[0]
            print(f"\nšŸ’” Consider using {best_method} with file-specific byte ranges")
            print(f" This algorithm appears in top 5 for {method_count}/{num_files} files")


def print_results_with_summary(all_matches: List[Tuple[str, int, int, str]], per_file=False, insights=None, show_full=False):
    """Print results with optional detailed analysis.

    Args:
        all_matches: match tuples as produced by ``bruteforce_all_methods``
            (the special CONSISTENCY_DATA / FILE_METADATA records are
            recognised and removed before reporting).
        per_file: also list the matching lines of each file.
        insights: optional dict of ``"sample_<id>"`` -> per-sample analysis,
            printed only when *show_full* is set.
        show_full: print the detailed pattern, distribution and consensus
            sections in addition to the condensed summary.
    """
    consistency_records, file_metadata, all_matches = _split_special_records(all_matches)

    if not all_matches:
        print("āŒ No matches found.")
        return

    # Always organize by file
    per_file_matches = defaultdict(list)
    for match in all_matches:
        per_file_matches[match[3]].append(match)

    # Per-file statistics and pattern analysis
    for file, matches in per_file_matches.items():
        _print_file_report(file, matches, file_metadata, per_file, insights, show_full)

    # Global summary (always show this part)
    print("\n\nšŸ“Š Global Summary of Most Successful Methods:")
    method_counts = Counter(method_id for method_id, _, _, _ in all_matches)
    for method_id, count in method_counts.most_common(5):  # Top 5 for conciseness
        print(f"{method_id:<25} → {count} matches")

    # Show more detailed global pattern summary only in full mode
    if show_full:
        all_patterns = analyze_patterns(all_matches)
        print("\nšŸ“ˆ Global Pattern Summary:")
        print(f"Total unique methods found: {len(all_patterns['methods'])}")
        print(f"Total unique byte ranges: {len(all_patterns['ranges'])}")
        top_method = all_patterns['methods'].most_common(1)
        # No method id may have been parseable; skip the line rather than
        # index into an empty list.
        if top_method:
            print(f"Most common method: {top_method[0][0]} with {top_method[0][1]} matches")

    # Print global consensus analysis at the end
    if consistency_records and show_full:
        _print_global_consensus(consistency_records)
Advanced Checksum Algorithms --- def checksum_weighted_sum_parametric(data: bytes, weight_start: float = 1.0, weight_step: float = 1.0) -> int: """Weighted sum with configurable starting weight and step""" return sum(int((weight_start + i * weight_step) * b) % 256 for i, b in enumerate(data)) % 256 def checksum_hybrid_sum_xor(data: bytes, weight: float = 0.5) -> int: """Hybrid checksum using weighted combination of sum and XOR""" sum_result = sum(data) % 256 xor_result = 0 for b in data: xor_result ^= b return int((weight * sum_result + (1 - weight) * xor_result)) % 256 def checksum_adaptive_bit_flip_sum(data: bytes, flip_mask: int = 0xFF) -> int: """Bit flip sum with configurable flip mask""" return sum(b ^ flip_mask for b in data) % 256 def checksum_position_weighted_sum(data: bytes, position_weights: List[float] = None) -> int: """Sum where each byte is weighted by its position in a specific pattern""" if position_weights is None: # Default to alternating weights position_weights = [1.0, 0.5] result = 0 for i, b in enumerate(data): weight = position_weights[i % len(position_weights)] result = (result + int(b * weight)) % 256 return result def evaluate_targeted_algorithms(samples: List[Tuple[bytes, int]], label_prefix="") -> List[Tuple[str, int, int, str]]: """Run a more focused test on the most promising algorithms with fine-tuned parameters""" # Based on consensus, focus testing on these methods with more parameter variations matches = [] seen = set() # Set up parameter variations for testing bit_flip_masks = [0xFF, 0xF0, 0x0F, 0xCC, 0x55, 0xAA] hybrid_weights = [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8] weight_steps = [0.9, 1.0, 1.1, 1.2, 1.5] pos_weight_patterns = [ [1.0, 0.5], # Alternating [1.0, 1.0, 0.5], # Every third byte gets half weight [1.0, 0.75, 0.5, 0.25] # Descending weights ] # Process each sample with focused algorithms for sample_index, (data, expected) in enumerate(samples): length = len(data) # Instead of trying every possible byte range, focus on 
the most promising ranges # based on global patterns from previous analysis # Try more specific ranges based on insights ranges_to_try = [] # Focus on common start positions from global analysis: 0-5 and specific ranges for start in [0, 1, 2, 3, 4, 5]: # Try full data range ranges_to_try.append((start, length)) # Try common end points (from previous runs) for end_offset in [0, 1, 2, 4, 8]: if length - end_offset > start + 1: # Ensure valid range ranges_to_try.append((start, length - end_offset)) # Add specific ranges that were successful in multiple files specific_ranges = [(3, 30), (4, 31), (5, 8), (5, 9), (2, 11)] for start, end in specific_ranges: if start < length and end <= length and start < end: ranges_to_try.append((start, end)) # Process the focused ranges with our most promising algorithms for start, end in ranges_to_try: sliced = data[start:end] label = f"[{start}:{end}]" # Test standard checksum methods that showed promise methods = [ ("WEIGHTED_SUM", lambda d: checksum_weighted_sum(d)), ("ALT_SUM_XOR", lambda d: checksum_alt_sum_xor(d)), ("BIT_FLIP_SUM", lambda d: checksum_bit_flip_sum(d)), ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)) ] # Test the standard methods for name, func in methods: try: result = func(sliced) method_id = f"{name}{label}" key = (sample_index, method_id, label_prefix) if result == expected and key not in seen: seen.add(key) matches.append((method_id, sample_index + 1, expected, label_prefix)) except Exception: continue # Test advanced parametric methods for mask in bit_flip_masks: try: result = checksum_adaptive_bit_flip_sum(sliced, mask) method_id = f"BIT_FLIP_SUM({mask:02X}){label}" key = (sample_index, method_id, label_prefix) if result == expected and key not in seen: seen.add(key) matches.append((method_id, sample_index + 1, expected, label_prefix)) except Exception: continue for weight in hybrid_weights: try: result = checksum_hybrid_sum_xor(sliced, weight) method_id = f"HYBRID_SUM_XOR({weight:.1f}){label}" key = 
(sample_index, method_id, label_prefix) if result == expected and key not in seen: seen.add(key) matches.append((method_id, sample_index + 1, expected, label_prefix)) except Exception: continue for step in weight_steps: try: result = checksum_weighted_sum_parametric(sliced, 1.0, step) method_id = f"WEIGHTED_SUM_STEP({step:.1f}){label}" key = (sample_index, method_id, label_prefix) if result == expected and key not in seen: seen.add(key) matches.append((method_id, sample_index + 1, expected, label_prefix)) except Exception: continue for i, pattern in enumerate(pos_weight_patterns): try: result = checksum_position_weighted_sum(sliced, pattern) method_id = f"POS_WEIGHT_{i+1}{label}" key = (sample_index, method_id, label_prefix) if result == expected and key not in seen: seen.add(key) matches.append((method_id, sample_index + 1, expected, label_prefix)) except Exception: continue return matches # --- Byte Change Correlation Analysis --- def analyze_byte_value_correlations(samples: List[Tuple[bytes, int]], max_samples: int = 1000) -> Dict: """ Analyze how changing specific bytes correlates with changes in the checksum. This helps understand the "sensitivity" of the checksum to specific byte positions. 
""" # Sample if we have too many samples to process if len(samples) > max_samples: print(f"Sampling {max_samples} out of {len(samples)} for correlation analysis") samples = random.sample(samples, max_samples) # Initialize data structures for correlation analysis bytes_by_position = defaultdict(list) checksums_by_position_value = defaultdict(list) correlations = {} position_weights = {} # Gather data by byte position max_length = max(len(data) for data, _ in samples) print(f"Analyzing correlations for {len(samples)} samples with max length {max_length}") # Track all byte values and checksums by position for data, checksum in samples: for pos, value in enumerate(data): bytes_by_position[pos].append(value) checksums_by_position_value[(pos, value)].append(checksum) # Calculate correlation strength for each position for pos in range(max_length): pos_values = bytes_by_position.get(pos, []) if len(pos_values) <= 1: continue # Create value-to-checksum mapping and analyze patterns value_impact = {} checksum_changes = [] # Group by unique values at this position unique_values = set(pos_values) if len(unique_values) <= 1: continue # Analyze how changes in this position correlate with checksums for val in unique_values: checksums = checksums_by_position_value.get((pos, val), []) if checksums: avg_checksum = sum(checksums) / len(checksums) value_impact[val] = avg_checksum # If we have enough data, calculate correlation metrics if len(value_impact) >= 2: # Look for linear relationships xy_pairs = [(val, cs) for val, cs in value_impact.items()] correlation = calculate_correlation_coefficient(xy_pairs) # Look for bit-level patterns (XOR, bit flips) bit_patterns = analyze_bit_patterns(value_impact) correlations[pos] = { "strength": abs(correlation), "direction": "positive" if correlation >= 0 else "negative", "unique_values": len(unique_values), "sample_count": len(pos_values), "bit_patterns": bit_patterns } # Calculate a rough "weight" for this position in checksum calculations 
pos_weight = abs(correlation) * (len(unique_values) / 256) position_weights[pos] = pos_weight # Sort positions by correlation strength sorted_positions = sorted(correlations.keys(), key=lambda p: correlations[p]["strength"], reverse=True) significant_positions = sorted_positions[:10] # Most influential positions # Build response return { "significant_positions": significant_positions, "position_correlations": {p: correlations[p] for p in significant_positions}, "position_weights": {p: position_weights[p] for p in position_weights if p in significant_positions}, "analyzed_samples": len(samples), "max_length": max_length } def calculate_correlation_coefficient(pairs: List[Tuple[int, int]]) -> float: """Calculate Pearson's correlation coefficient between byte values and checksums.""" if len(pairs) < 2: return 0.0 x_vals = [p[0] for p in pairs] y_vals = [p[1] for p in pairs] n = len(pairs) # Calculate means x_mean = sum(x_vals) / n y_mean = sum(y_vals) / n # Calculate correlation coefficient numerator = sum((x - x_mean) * (y - y_mean) for x, y in zip(x_vals, y_vals)) denominator_x = sum((x - x_mean) ** 2 for x in x_vals) denominator_y = sum((y - y_mean) ** 2 for y in y_vals) if denominator_x == 0 or denominator_y == 0: return 0.0 return numerator / math.sqrt(denominator_x * denominator_y) def analyze_bit_patterns(value_impact: Dict[int, float]) -> Dict: """ Analyze bit-level patterns in how byte changes affect checksums. Identifies patterns like "flipping bit 3 adds 8 to checksum" etc. 
""" bit_influences = [0.0] * 8 # Influence of each bit position # Calculate average impact when each bit is set vs unset bit_set_checksums = [[] for _ in range(8)] bit_unset_checksums = [[] for _ in range(8)] for value, checksum in value_impact.items(): # Analyze each bit for bit_pos in range(8): bit_mask = 1 << bit_pos if value & bit_mask: # Bit is set bit_set_checksums[bit_pos].append(checksum) else: # Bit is unset bit_unset_checksums[bit_pos].append(checksum) # Calculate average difference per bit for bit_pos in range(8): set_avg = sum(bit_set_checksums[bit_pos]) / len(bit_set_checksums[bit_pos]) if bit_set_checksums[bit_pos] else 0 unset_avg = sum(bit_unset_checksums[bit_pos]) / len(bit_unset_checksums[bit_pos]) if bit_unset_checksums[bit_pos] else 0 if set_avg and unset_avg: influence = set_avg - unset_avg bit_influences[bit_pos] = influence # Determine the bit pattern type pattern_types = { "xor_like": all(abs(bit_influences[i]) >= 0.5 for i in range(8)), "additive": all(bit_influences[i] >= 0 for i in range(8)), "subtractive": all(bit_influences[i] <= 0 for i in range(8)), "weighted": max(abs(b) for b in bit_influences) / (min(abs(b) for b in bit_influences) if min(abs(b) for b in bit_influences) else 1) > 3, } return { "bit_influences": {i: bit_influences[i] for i in range(8)}, "pattern_type": next((ptype for ptype, matches in pattern_types.items() if matches), "mixed"), "most_influential_bit": bit_influences.index(max(bit_influences, key=abs)) } def find_optimal_byte_changes(data: bytes, checksum_func: Callable, expected: int) -> List[Tuple[int, int]]: """ Find the minimal set of byte changes needed to achieve the expected checksum. Returns a list of (position, new_value) tuples. 
""" base_checksum = checksum_func(data) if base_checksum == expected: return [] # No changes needed # Try changing bytes to match target checksum using sensitivity information # First try single byte changes - this is much faster and most likely case for i in range(len(data)): modified = bytearray(data) target_diff = (expected - base_checksum) % 256 # Try calculating what value this position should have if checksum_func == checksum_sum: # For sum, we can directly calculate needed value new_val = (data[i] + target_diff) % 256 modified[i] = new_val if checksum_func(bytes(modified)) == expected: return [(i, new_val)] elif checksum_func == checksum_xor: # For XOR, direct calculation also works new_val = data[i] ^ (base_checksum ^ expected) modified[i] = new_val if checksum_func(bytes(modified)) == expected: return [(i, new_val)] else: # For other algorithms, try incremental changes or use binary search best_value = None best_diff = 256 # Check common values first, then do a smarter search if needed for test_val in [0, 1, 0xFF, expected, data[i] ^ 0xFF]: if test_val == data[i]: continue modified[i] = test_val new_checksum = checksum_func(bytes(modified)) if new_checksum == expected: return [(i, test_val)] diff = abs((new_checksum - expected) % 256) if diff < best_diff: best_diff = diff best_value = test_val # If we got close, try a more focused search around the promising value if best_diff < 50 and best_value is not None: for offset in range(-10, 11): test_val = (best_value + offset) % 256 if test_val == data[i]: continue modified[i] = test_val new_checksum = checksum_func(bytes(modified)) if new_checksum == expected: return [(i, test_val)] # If single byte changes don't work, try strategic two-byte changes # For performance, we'll limit this to nearby byte combinations for i in range(len(data)): for j in range(i+1, min(i+8, len(data))): # Try up to 7 bytes ahead for i_adj in [-1, 1]: for j_adj in [-1, 1]: modified = bytearray(data) modified[i] = (data[i] + i_adj) % 
256 modified[j] = (data[j] + j_adj) % 256 if checksum_func(bytes(modified)) == expected: return [(i, modified[i]), (j, modified[j])] return [] # --- Large-Scale File Analysis --- def analyze_large_file(filepath: str, max_samples=1000) -> Dict: """Analyze a large file efficiently by processing it in batches.""" start_time = time.time() print(f"Starting large-scale analysis of {filepath}...") # Process the file in batches to handle large files batch_gen = parse_input_file_lines_batched(filepath, batch_size=1000) # First batch will be used for detailed analysis first_batch = next(batch_gen, []) if not first_batch: print("No valid samples found in file.") return {} # Collect metadata about the batch batch_metadata = next(batch_gen, {"total_lines": 0, "valid_samples": 0}) # Perform initial algorithm identification on the first batch print(f"Identifying potential checksum algorithms on first {len(first_batch)} samples...") matches = bruteforce_all_methods(first_batch, label_prefix=os.path.basename(filepath)) # Extract the most promising algorithms and ranges patterns = analyze_patterns([m for m in matches if m[0] != "CONSISTENCY_DATA"]) top_methods = patterns["methods"].most_common(3) top_ranges = patterns["ranges"].most_common(3) # Combining top methods with top ranges for focused analysis focused_analysis = [] method_func_map = { "SUM": checksum_sum, "XOR": checksum_xor, "SUM<<1": lambda d: checksum_sum_shifted(d, 1), "SUM<<2": lambda d: checksum_sum_shifted(d, 2), "XOR<<1": lambda d: checksum_xor_shifted(d, 1), "XOR<<2": lambda d: checksum_xor_shifted(d, 2), "WEIGHTED_SUM": checksum_weighted_sum, "ALT_SUM_XOR": checksum_alt_sum_xor, "BIT_FLIP_SUM": checksum_bit_flip_sum } # Collect a sample of data for correlation analysis correlation_samples = first_batch.copy() # Check more batches if we need more samples for correlation analysis batches_processed = 1 while len(correlation_samples) < max_samples: batch = next(batch_gen, None) if batch is None: break 
correlation_samples.extend(batch[:max_samples - len(correlation_samples)]) batches_processed += 1 if batches_processed >= 10: # Limit to 10 batches for performance break # Perform correlation analysis print(f"Performing byte correlation analysis on {len(correlation_samples)} samples...") correlations = analyze_byte_value_correlations(correlation_samples, max_samples=max_samples) # Test the most likely algorithms on the significant byte positions print("Testing algorithm-position combinations...") for method_name, _ in top_methods: for range_str, _ in top_ranges: range_parts = range_str.strip('[]').split(':') if len(range_parts) == 2: start, end = int(range_parts[0]), int(range_parts[1]) method_func = method_func_map.get(method_name) if method_func: success_count = 0 for data, expected in correlation_samples[:100]: # Test on first 100 samples if len(data) >= end: result = method_func(data[start:end]) if result == expected: success_count += 1 success_rate = success_count / min(100, len(correlation_samples)) focused_analysis.append({ "method": method_name, "range": f"[{start}:{end}]", "success_rate": success_rate, "success_count": success_count }) # Sort by success rate focused_analysis.sort(key=lambda x: x["success_rate"], reverse=True) # Find byte positions that most strongly influence the checksum influential_positions = correlations["significant_positions"][:5] elapsed_time = time.time() - start_time return { "file_name": os.path.basename(filepath), "samples_analyzed": len(correlation_samples), "elapsed_time": elapsed_time, "top_methods": [m[0] for m in top_methods], "top_ranges": [r[0] for r in top_ranges], "focused_analysis": focused_analysis[:5], "influential_positions": influential_positions, "position_correlations": {str(p): correlations["position_correlations"][p] for p in influential_positions}, "byte_pattern_summary": summarize_byte_patterns(correlations), } def summarize_byte_patterns(correlations: Dict) -> Dict: """Summarize patterns in byte correlations 
to help understand the checksum algorithm.""" if not correlations or "position_correlations" not in correlations: return {} # Identify patterns in how byte positions affect the checksum positions = correlations.get("significant_positions", []) if not positions: return {} # Count pattern types to identify algorithm characteristics pattern_types = Counter() for pos in positions: if pos in correlations["position_correlations"]: bit_patterns = correlations["position_correlations"][pos].get("bit_patterns", {}) pattern_type = bit_patterns.get("pattern_type", "unknown") pattern_types[pattern_type] += 1 # Algorithm characteristics based on patterns primary_pattern = pattern_types.most_common(1)[0][0] if pattern_types else "unknown" algorithm_characteristics = { "xor_like": "XOR-based algorithm (position-independent)", "additive": "Sum-based algorithm (position-independent)", "subtractive": "Subtraction-based algorithm (unusual)", "weighted": "Weighted algorithm (position-dependent)", "mixed": "Mixed algorithm (complex checksum)" } # Check position importance distribution pos_weights = correlations.get("position_weights", {}) weight_values = list(pos_weights.values()) weight_variance = 0 if weight_values: mean_weight = sum(weight_values) / len(weight_values) weight_variance = sum((w - mean_weight) ** 2 for w in weight_values) / len(weight_values) position_dependent = weight_variance > 0.05 return { "dominant_pattern": primary_pattern, "likely_algorithm_type": algorithm_characteristics.get(primary_pattern, "Unknown algorithm type"), "position_dependent": position_dependent, "weight_variance": weight_variance, "recommendation": get_algorithm_recommendation(primary_pattern, position_dependent) } def get_algorithm_recommendation(pattern_type: str, position_dependent: bool) -> str: """Get a recommendation for checksum algorithm based on correlation analysis.""" if pattern_type == "xor_like" and not position_dependent: return "XOR-based checksum recommended" elif pattern_type == 
def print_large_file_analysis(analysis: Dict):
    """Write a human-readable report of a large-file analysis to stdout.

    Top-level fields are read with ``analysis.get`` and a default, so an
    empty or partially filled dict still prints every section heading.
    Entries of ``focused_analysis`` and ``position_correlations`` are indexed
    directly and must carry the keys used below.

    Args:
        analysis: Result dict. Keys read: ``file_name``, ``samples_analyzed``,
            ``elapsed_time``, ``top_methods``, ``top_ranges``,
            ``focused_analysis``, ``byte_pattern_summary``,
            ``influential_positions`` and ``position_correlations``.
    """
    # NOTE(review): several emoji prefixes in the literals below look
    # mis-encoded; they are reproduced unchanged - confirm the file encoding.

    def show_bullets(heading, entries):
        # One heading line followed by one bullet line per entry.
        print(heading)
        for entry in entries:
            print(f" • {entry}")

    # Report header.
    print("\nšŸ“Š Large File Analysis Results")
    print("═══════════════════════════")
    print(f"File: {analysis.get('file_name', 'Unknown')}")
    print(f"Samples analyzed: {analysis.get('samples_analyzed', 0)}")
    print(f"Analysis time: {analysis.get('elapsed_time', 0):.2f} seconds")

    # Best-scoring methods and byte ranges.
    show_bullets("\nšŸ” Top Checksum Methods:", analysis.get('top_methods', []))
    show_bullets("\nšŸ“ Top Byte Ranges:", analysis.get('top_ranges', []))

    # Method+range pairs with their success statistics.
    print("\nāœ… Best Method+Range Combinations:")
    for combo in analysis.get('focused_analysis', []):
        target = f"{combo['method']}{combo['range']}"
        percent = combo['success_rate'] * 100
        hits = combo['success_count']
        print(f" • {target} → {percent:.1f}% success rate ({hits} samples)")

    # Algorithm characteristics are shown only when a summary is present.
    summary = analysis.get('byte_pattern_summary', {})
    if summary:
        print("\n🧠 Algorithm Characteristics:")
        print(f" Dominant pattern: {summary.get('dominant_pattern', 'Unknown')}")
        print(f" Likely algorithm: {summary.get('likely_algorithm_type', 'Unknown')}")
        depends_on_position = 'Yes' if summary.get('position_dependent', False) else 'No'
        print(f" Position dependent: {depends_on_position}")
        print(f"\nšŸ’” Recommendation: {summary.get('recommendation', 'Unknown')}")

    # Per-position details; correlations are looked up by str(position).
    print("\nšŸ”¢ Most Influential Byte Positions:")
    ranked_positions = analysis.get('influential_positions', [])
    correlations_by_pos = analysis.get('position_correlations', {})
    for pos in ranked_positions:
        lookup_key = str(pos)
        if lookup_key not in correlations_by_pos:
            continue
        info = correlations_by_pos[lookup_key]
        print(
            f" • Position {pos}: {info['strength']:.3f} correlation strength, "
            f"{info['direction']} correlation, {info['unique_values']} unique values"
        )

        # Bit-level detail is optional.
        bit_patterns = info.get("bit_patterns", {})
        if not bit_patterns:
            continue
        top_bit = bit_patterns.get("most_influential_bit", 0)
        print(f" Most influential bit: {top_bit} (bit {7-top_bit} from left)")
def process_folder_with_limits(folder_path: str, max_total_samples: int = 1000) -> List[Tuple[bytes, int]]:
    """Collect checksum samples from the ``.txt`` files of a folder, up to a cap.

    Files are visited in sorted name order and parsed with
    ``parse_input_file_lines``; samples are appended until
    ``max_total_samples`` have been gathered. A file that fails to parse is
    reported on stdout and skipped, so one bad file does not abort the run.

    Args:
        folder_path: Directory to scan (not recursive).
        max_total_samples: Maximum number of samples to return. A value of
            zero or less yields an empty list without parsing any file.

    Returns:
        A list of ``(data_bytes, checksum)`` tuples, at most
        ``max_total_samples`` long.

    Raises:
        OSError: If ``folder_path`` cannot be listed.
    """
    collected: List[Tuple[bytes, int]] = []
    files_processed = 0

    print(f"Processing folder with limit of {max_total_samples} samples...")

    # os.listdir() returns entries in arbitrary order; sorting makes the
    # subset kept under the cap reproducible between runs and machines.
    for file in sorted(os.listdir(folder_path)):
        # Check the budget first so no file is parsed once the cap is reached.
        remaining = max_total_samples - len(collected)
        if remaining <= 0:
            break
        if not file.endswith(".txt"):
            continue

        full_path = os.path.join(folder_path, file)
        try:
            samples, _ = parse_input_file_lines(full_path)
        except Exception as e:
            # Deliberate best effort: report the file and move on.
            print(f"Error processing {file}: {e}")
            continue

        if len(samples) > remaining:
            print(f"Taking {remaining} of {len(samples)} samples from {file}")
            samples = samples[:remaining]
        else:
            print(f"Taking all {len(samples)} samples from {file}")

        collected.extend(samples)
        files_processed += 1

    print(f"Processed {files_processed} files, collected {len(collected)} samples")
    return collected
if __name__ == "__main__":
    # Script entry point: brute-force every checksum method over the samples
    # found at `path`, then optionally run the byte-level (--byte-analysis)
    # and large-file (--large) analyses.
    #
    # NOTE(review): evaluate_with_byte_analysis appears to be defined below
    # this guard; unless an earlier definition exists, the calls here raise
    # NameError (reported by the except handlers) when the file is run as a
    # script - consider moving this block to the end of the file.
    # NOTE(review): some warning/emoji prefixes in the messages below look
    # mis-encoded; they are kept unchanged - confirm the file encoding.
    parser = argparse.ArgumentParser(description='Analyze checksum algorithms in files.')
    parser.add_argument('path', help='Path to file or directory to analyze')
    parser.add_argument('--full', action='store_true', help='Show detailed output with all analyses')
    parser.add_argument('--byte-analysis', action='store_true', help='Perform byte-level contribution analysis')
    parser.add_argument('--large', action='store_true', help='Perform large-scale analysis optimized for big files')
    parser.add_argument('--max-samples', type=int, default=1000, help='Maximum number of samples for intensive analyses (byte-level and large-scale)')
    args = parser.parse_args()

    path = args.path
    show_full = args.full
    perform_byte_analysis = args.byte_analysis
    large_analysis = args.large
    max_samples = args.max_samples

    all_matches = []
    byte_insights = {}

    if os.path.isdir(path):
        # os.listdir() order is arbitrary; list and sort once so every phase
        # sees the same files in the same, reproducible order.
        txt_files = sorted(name for name in os.listdir(path) if name.endswith(".txt"))

        # Phase 1: standard brute force over all samples, without limits.
        print("Phase 1: Running standard brute force analysis...")
        for file in txt_files:
            full_path = os.path.join(path, file)
            try:
                parsed_samples, file_meta = parse_input_file_lines(full_path)
                match_results = bruteforce_all_methods(
                    parsed_samples,
                    label_prefix=file,
                    file_metadata={"file": file, **file_meta}
                )
                all_matches.extend(match_results)
            except Exception as e:
                print(f"Error processing {file}: {e}")

        # Display standard results
        print_results_with_summary(all_matches, per_file=True, show_full=show_full)

        if perform_byte_analysis:
            # Phase 2: intensive byte-level analysis, capped at max_samples
            # samples and three files.
            print(f"\n\nPhase 2: Running byte-level contribution analysis (limit: {max_samples} samples)...")
            files_analyzed = 0
            total_samples_analyzed = 0

            for file in txt_files:
                if total_samples_analyzed >= max_samples or files_analyzed >= 3:
                    break
                full_path = os.path.join(path, file)
                try:
                    parsed_samples, file_meta = parse_input_file_lines(full_path)
                    if not parsed_samples:
                        print(f"āš ļø No valid samples found in {file}")
                        continue

                    # Positive here: the loop guard above already stopped the
                    # loop if the budget was exhausted.
                    samples_remaining = max_samples - total_samples_analyzed
                    if len(parsed_samples) > samples_remaining:
                        print(f"Limiting to {samples_remaining} samples from {file}")
                        samples_to_analyze = parsed_samples[:samples_remaining]
                    else:
                        print(f"Analyzing all {len(parsed_samples)} samples from {file}")
                        samples_to_analyze = parsed_samples

                    total_samples_analyzed += len(samples_to_analyze)
                    files_analyzed += 1

                    print(f"\nšŸ“„ Analyzing file: {file} ({len(samples_to_analyze)} samples)")
                    _, file_insights = evaluate_with_byte_analysis(
                        samples_to_analyze,
                        label_prefix=f"BYTE_ANALYSIS_{file}",
                        detailed=True
                    )

                    if not file_insights:
                        print(f"āš ļø No byte-level insights found for {file}")

                    # Insight keys are "sample_N_METHOD" and repeat from file
                    # to file; prefix them with the file name so results from
                    # different files do not overwrite each other.
                    for insight_key, insight in file_insights.items():
                        byte_insights[f"{file}:{insight_key}"] = insight
                except Exception as e:
                    print(f"āš ļø Error analyzing {file}: {e}")

            print(f"\nCompleted byte-level analysis on {total_samples_analyzed} samples from {files_analyzed} files")

            # Overall summary
            print("\n\n🧬 Byte Contribution Analysis Summary")
            print("═════════════════════════════════════")
            print(f"Total samples analyzed: {len(byte_insights)}")
            print("Methods with most influence on checksums:")

            # Per method, collect each insight's mean of per-byte max impacts.
            method_impacts = defaultdict(list)
            for data in byte_insights.values():
                if "contributions" in data:
                    impacts = [info["max_impact"] for info in data["contributions"]["byte_contributions"].values()]
                    if impacts:
                        avg_impact = sum(impacts) / len(impacts)
                        method_impacts[data["method"]].append(avg_impact)

            # Show average impact by method
            for method, impacts in method_impacts.items():
                if impacts:
                    avg = sum(impacts) / len(impacts)
                    print(f"{method:<15} → Avg impact: {avg:.1f}")

        if large_analysis:
            print(f"\n\nPerforming large-scale file analysis (limit: {max_samples} samples per file)...")
            files_analyzed = 0
            for file in txt_files:
                if files_analyzed >= 5:  # Limit to 5 files for performance
                    break
                full_path = os.path.join(path, file)
                try:
                    analysis = analyze_large_file(full_path, max_samples=max_samples)
                    print_large_file_analysis(analysis)
                    files_analyzed += 1
                except Exception as e:
                    print(f"āš ļø Error during large file analysis of {file}: {e}")

    elif os.path.isfile(path):
        parsed_samples, file_meta = parse_input_file_lines(path)
        file_name = os.path.basename(path)
        match_results = bruteforce_all_methods(
            parsed_samples,
            label_prefix=file_name,
            file_metadata={"file": file_name, **file_meta}
        )
        all_matches.extend(match_results)

        # Display results
        print_results_with_summary(all_matches, per_file=True, show_full=show_full)

        if perform_byte_analysis and parsed_samples:
            print("\nRunning byte-level contribution analysis...")
            try:
                _, file_insights = evaluate_with_byte_analysis(
                    parsed_samples,
                    label_prefix=f"BYTE_ANALYSIS_{file_name}",
                    detailed=True
                )

                # Print just the first sample's analysis as an example
                if file_insights:
                    key = next(iter(file_insights))
                    data = file_insights[key]
                    # Keys look like "sample_N_METHOD"; N is the 1-based sample number.
                    key_parts = key.split('_')
                    sample_id = key_parts[1] if len(key_parts) > 1 else "?"
                    method_name = data["method"]
                    range_str = data["range"]

                    # Recover the original bytes for that sample and range.
                    sample_number = int(sample_id)
                    if sample_number <= len(parsed_samples):
                        data_bytes, expected = parsed_samples[sample_number - 1]
                        start, end = map(int, range_str.split(':'))
                        sliced_data = data_bytes[start:end]
                        print(f"\nByte analysis for Sample {sample_id} using {method_name}[{range_str}]")
                        print_byte_analysis(sliced_data, data["contributions"], method_name)
            except Exception as e:
                print(f"āš ļø Error during byte analysis: {e}")

        if large_analysis:
            try:
                analysis = analyze_large_file(path, max_samples=max_samples)
                print_large_file_analysis(analysis)
            except Exception as e:
                print(f"āš ļø Error during large file analysis: {e}")

    else:
        # Previously a missing or unsupported path produced no output and a
        # zero exit status; fail loudly instead (usage message, exit code 2).
        parser.error(f"path is neither a file nor a directory: {path}")
def evaluate_with_byte_analysis(samples: List[Tuple[bytes, int]], label_prefix="", detailed=False, max_samples: int = 5) -> Tuple[List, Dict]:
    """Try a shortlist of checksum methods on fixed byte ranges of each sample.

    For every examined sample, each candidate range that fits inside the
    sample's data is sliced out and every shortlisted method is applied to
    the slice. A method whose result equals the expected checksum is recorded
    as a match. With ``detailed=True`` the first match of a sample is also
    passed to the byte-contribution helpers, printed, and stored; the sample
    is then considered done and its remaining methods/ranges are skipped.

    Args:
        samples: ``(data_bytes, expected_checksum)`` pairs.
        label_prefix: Label copied into every match tuple (typically a file
            name).
        detailed: When true, run and print the byte-level contribution
            analysis for the first match of each sample.
        max_samples: Only the first ``max_samples`` samples are examined
            (default 5, to keep the analysis fast). ``None`` examines all.

    Returns:
        ``(matches, byte_insights)``. ``matches`` is a list of
        ``(method_id, sample_number, expected, label_prefix)`` tuples, where
        ``method_id`` is ``"NAME[start:end]"`` and ``sample_number`` is
        1-based. ``byte_insights`` maps ``"sample_<n>_<NAME>"`` to a dict with
        the keys ``contributions``, ``optimal_changes``, ``method``,
        ``range`` and ``data``; it stays empty unless ``detailed`` is true.
    """
    if not samples:
        return [], {}

    # Shortlist of checksum functions to try on every slice.
    methods = [
        ("WEIGHTED_SUM", checksum_weighted_sum),
        ("ALT_SUM_XOR", checksum_alt_sum_xor),
        ("BIT_FLIP_SUM", checksum_bit_flip_sum),
        ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)),
        ("HYBRID_SUM_XOR(0.5)", lambda d: checksum_hybrid_sum_xor(d, 0.5)),
        ("BIT_FLIP_SUM(AA)", lambda d: checksum_adaptive_bit_flip_sum(d, 0xAA))
    ]
    # Fixed (start, end) byte ranges to test, in priority order.
    candidate_ranges = [(3, 30), (4, 31), (5, 8), (5, 9), (2, 11)]

    matches = []
    byte_insights = {}

    for sample_index, (data, expected) in enumerate(samples[:max_samples]):
        sample_number = sample_index + 1
        length = len(data)
        analyzed = False  # set once a detailed analysis is stored for this sample

        for start, end in candidate_ranges:
            # Skip ranges that do not fit entirely inside this sample.
            if not (start < end <= length):
                continue

            sliced = data[start:end]
            label = f"[{start}:{end}]"

            for name, func in methods:
                try:
                    result = func(sliced)
                except Exception:
                    # Best effort: a method that cannot process this slice is
                    # skipped so the remaining candidates still get a chance.
                    continue
                if result != expected:
                    continue

                method_id = f"{name}{label}"
                matches.append((method_id, sample_number, expected, label_prefix))
                if not detailed:
                    continue

                try:
                    print(f"Analyzing contributions for sample {sample_index+1}, method {method_id}...")
                    byte_contributions = analyze_byte_contributions(sliced, func, expected)
                    optimal_changes = find_optimal_byte_changes(sliced, func, expected)

                    byte_insights[f"sample_{sample_number}_{name}"] = {
                        "contributions": byte_contributions,
                        "optimal_changes": optimal_changes,
                        "method": name,
                        "range": f"{start}:{end}",
                        "data": sliced  # keep the slice itself for later inspection
                    }
                    analyzed = True

                    # Print immediately so the user gets feedback during collection.
                    print_byte_analysis(sliced, byte_contributions, method_id)
                    if optimal_changes:
                        print("\nSuggested byte changes:")
                        for pos, new_val in optimal_changes:
                            print(f" Change byte at position {pos} from 0x{sliced[pos]:02X} to 0x{new_val:02X}")
                except Exception as e:
                    # Report the failure instead of hiding it, then keep
                    # trying the remaining methods for this slice.
                    print(f"Byte analysis failed for sample {sample_number}, method {method_id}: {e}")
                    continue

                # One analyzed match per sample keeps the output manageable.
                break

            if analyzed:
                break

    return matches, byte_insights