Diffstat (limited to 'research/bad-bruteforcing/idiot.py')
-rw-r--r-- | research/bad-bruteforcing/idiot.py | 1324
1 file changed, 1324 insertions, 0 deletions
diff --git a/research/bad-bruteforcing/idiot.py b/research/bad-bruteforcing/idiot.py new file mode 100644 index 0000000..7ead36e --- /dev/null +++ b/research/bad-bruteforcing/idiot.py @@ -0,0 +1,1324 @@ +import re +import sys +import os +import argparse +from typing import List, Tuple, Callable, Dict, Generator, Optional +from collections import defaultdict, Counter +import json +import time +from itertools import islice +import math +import random + +# --- This is pure AI Slop --- +def checksum_sum(data: bytes) -> int: + return sum(data) % 256 + +def checksum_xor(data: bytes) -> int: + result = 0 + for b in data: + result ^= b + return result + +def checksum_sum_shifted(data: bytes, shift: int) -> int: + return sum((b << shift) & 0xFF for b in data) % 256 + +def checksum_xor_shifted(data: bytes, shift: int) -> int: + result = 0 + for b in data: + result ^= (b << shift) & 0xFF + return result + +def checksum_weighted_sum(data: bytes) -> int: + return sum((i + 1) * b for i, b in enumerate(data)) % 256 + +def checksum_alt_sum_xor(data: bytes) -> int: + s = sum(data) + x = 0 + for i, b in enumerate(data): + if i % 2 == 0: + x ^= b + else: + s ^= b + return (s + x) % 256 + +def checksum_bit_flip_sum(data: bytes) -> int: + return sum(b ^ 0xFF for b in data) % 256 + +# --- Input Parser --- +def parse_input_file_lines(filepath: str) -> Tuple[List[Tuple[bytes, int]], Dict]: + samples = [] + total_lines = 0 + with open(filepath, "r") as f: + for line in f: + total_lines += 1 + match = re.match(r'([0-9a-fA-F]+)\s*=\s*([0-9a-fA-F]{1,2})', line.strip()) + if match: + hex_data = bytes.fromhex(match.group(1)) + checksum = int(match.group(2), 16) + samples.append((hex_data, checksum)) + + # Return samples and metadata + return samples, {"total_lines": total_lines, "valid_samples": len(samples)} + +# --- Enhanced Input Parser for Large Files --- +def parse_input_file_lines_batched(filepath: str, batch_size: int = 1000) -> Generator[List[Tuple[bytes, int]], None, Dict]: + """ + Parse a large input file in batches to avoid memory issues. + Returns a generator that yields batches of samples. 
+ """ + samples = [] + total_lines = 0 + valid_samples = 0 + + try: + with open(filepath, "r") as f: + for line in f: + total_lines += 1 + match = re.match(r'([0-9a-fA-F]+)\s*=\s*([0-9a-fA-F]{1,2})', line.strip()) + if match: + hex_data = bytes.fromhex(match.group(1)) + checksum = int(match.group(2), 16) + samples.append((hex_data, checksum)) + valid_samples += 1 + + # Yield a batch when it reaches the batch size + if len(samples) >= batch_size: + yield samples + samples = [] + except Exception as e: + print(f"Error reading file: {e}") + + # Yield any remaining samples + if samples: + yield samples + + # Return metadata about the entire file + return {"total_lines": total_lines, "valid_samples": valid_samples} + +# --- Brute Force Evaluation --- +def bruteforce_all_methods(samples: List[Tuple[bytes, int]], label_prefix="", file_metadata=None) -> List[Tuple[str, int, int, str]]: + methods: List[Tuple[str, Callable[[bytes], int]]] = [ + ("SUM", checksum_sum), + ("XOR", checksum_xor), + ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)), + ("SUM<<2", lambda d: checksum_sum_shifted(d, 2)), + ("XOR<<1", lambda d: checksum_xor_shifted(d, 1)), + ("XOR<<2", lambda d: checksum_xor_shifted(d, 2)), + ("WEIGHTED_SUM", checksum_weighted_sum), + ("ALT_SUM_XOR", checksum_alt_sum_xor), + ("BIT_FLIP_SUM", checksum_bit_flip_sum) + ] + + seen = set() + matches = [] + sample_methods = defaultdict(list) # Track methods that work for each sample + + for sample_index, (data, expected) in enumerate(samples): + length = len(data) + sample_success = [] # Track successful methods for this sample + + for start in range(length): + for end in range(start + 1, length + 1): + sliced = data[start:end] + label = f"[{start}:{end}]" + for name, func in methods: + try: + result = func(sliced) + method_id = f"{name}{label}" + key = (sample_index, method_id, label_prefix) + if result == expected and key not in seen: + seen.add(key) + matches.append((method_id, sample_index + 1, expected, label_prefix)) + sample_success.append((name, start, end)) + except Exception: + continue + + # Store methods that work for this sample + if sample_success: + sample_methods[sample_index] = sample_success + + # Calculate consistency scores if we have enough samples + if len(samples) > 1 and sample_methods: + consistency_analysis = analyze_consistency(sample_methods, len(samples)) + matches.append(("CONSISTENCY_DATA", 0, 0, json.dumps(consistency_analysis))) + + # Add file metadata for reporting + if file_metadata: + file_name = file_metadata.get("file", "unknown") + matches.append(("FILE_METADATA", file_name, 0, json.dumps(file_metadata))) + + return matches + +# --- Consistency Analysis --- +def analyze_consistency(sample_methods: Dict[int, List[Tuple[str, int, int]]], total_samples: int) -> Dict: + """Analyze which methods work consistently across different samples.""" + method_consistency = defaultdict(int) + range_consistency = defaultdict(int) + method_range_consistency = defaultdict(int) + + # Count how many samples each method/range works for + for sample_idx, methods in sample_methods.items(): + seen_methods = set() + seen_ranges = set() + seen_method_ranges = set() + + for method, start, end in methods: + if method not in seen_methods: + seen_methods.add(method) + method_consistency[method] += 1 + + range_key = f"{start}:{end}" + if range_key not in seen_ranges: + seen_ranges.add(range_key) + range_consistency[range_key] += 1 + + method_range_key = f"{method}[{start}:{end}]" + if method_range_key not in seen_method_ranges: + 
seen_method_ranges.add(method_range_key) + method_range_consistency[method_range_key] += 1 + + # Calculate consistency percentages + method_scores = {method: count / total_samples * 100 for method, count in method_consistency.items()} + range_scores = {range_key: count / total_samples * 100 for range_key, count in range_consistency.items()} + method_range_scores = {mr: count / total_samples * 100 for mr, count in method_range_consistency.items()} + + # Find the most consistent options + best_methods = sorted(method_scores.items(), key=lambda x: x[1], reverse=True)[:5] + best_ranges = sorted(range_scores.items(), key=lambda x: x[1], reverse=True)[:5] + best_method_ranges = sorted(method_range_scores.items(), key=lambda x: x[1], reverse=True)[:5] + + return { + "best_methods": best_methods, + "best_ranges": best_ranges, + "best_method_ranges": best_method_ranges, + "total_samples": total_samples + } + +# --- Pattern Recognition --- +def analyze_patterns(matches: List[Tuple[str, int, int, str]]) -> Dict: + patterns = { + "methods": Counter(), + "ranges": Counter(), + "start_positions": Counter(), + "end_positions": Counter(), + "lengths": Counter() + } + + for method_id, _, _, _ in matches: + # Extract method name and range from method_id (e.g., "SUM[0:5]") + method_parts = re.match(r'([A-Z_<>0-9]+)\[(\d+):(\d+)\]', method_id) + if method_parts: + method_name, start, end = method_parts.groups() + start_pos, end_pos = int(start), int(end) + byte_range = f"[{start}:{end}]" + length = end_pos - start_pos + + patterns["methods"][method_name] += 1 + patterns["ranges"][byte_range] += 1 + patterns["start_positions"][start_pos] += 1 + patterns["end_positions"][end_pos] += 1 + patterns["lengths"][length] += 1 + + return patterns + +# --- Result Display --- +def print_results_with_summary(all_matches: List[Tuple[str, int, int, str]], per_file=False, insights=None, show_full=False): + """Print results with optional detailed analysis""" + # Extract consistency data and file metadata + consistency_data = {} + file_metadata = {} + filtered_matches = [] + + for match in all_matches: + if match[0] == "CONSISTENCY_DATA" and match[3]: + try: + file_data = match[3] + consistency_data[file_data] = json.loads(file_data) + except: + pass + elif match[0] == "FILE_METADATA" and match[3]: + try: + metadata = json.loads(match[3]) + file_name = match[1] # Use the file name stored in match[1] + file_metadata[file_name] = metadata + except Exception as e: + print(f"Error processing metadata: {e}") + else: + filtered_matches.append(match) + + all_matches = filtered_matches + + if not all_matches: + print("ā No matches found.") + return + + # Always organize by file + per_file_matches = defaultdict(list) + for match in all_matches: + per_file_matches[match[3]].append(match) + + # Per-file statistics and pattern analysis + for file, matches in per_file_matches.items(): + # Get file metadata if available + metadata = {} + for meta_file, meta_data in file_metadata.items(): + if isinstance(meta_file, str) and file in meta_file: # Ensure meta_file is a string + metadata = meta_data + break + + # Extract sample lines that matched successfully + matched_lines = set(line for _, line, _, _ in matches) + + # Print file summary with line counts + print(f"\n\nš Results for: {file}") + if metadata: + total_lines = metadata.get("total_lines", "?") + valid_samples = metadata.get("valid_samples", len(matched_lines)) + success_rate = (len(matched_lines)/valid_samples*100) if valid_samples > 0 else 0 + print(f"ā
Matches Found: {len(matched_lines)}/{valid_samples} samples " + + f"({success_rate:.1f}% success rate)") + print(f"Total file lines: {total_lines}, Valid samples: {valid_samples}") + else: + print(f"
Matches Found: {len(matches)}") + + # Only show individual matches if per_file flag is set AND full details are requested + if per_file and show_full: + for method_id, line, expected, _ in matches[:20]: # Show only first 20 to avoid flooding + print(f"Line {line:03d} | Method: {method_id:20s} | Expected: {expected:02X}") + if len(matches) > 20: + print(f"... and {len(matches) - 20} more matches") + elif per_file: + # In condensed mode, just show counts per line + line_counts = Counter(line for _, line, _, _ in matches) + print(f"Lines with matches: {', '.join(str(l) for l in sorted(line_counts.keys()))}") + if len(line_counts) > 10: + print(f"Total lines with matches: {len(line_counts)}") + + # Pattern analysis for this file + patterns = analyze_patterns(matches) + + # Print top methods for this file + print("\nš Most Successful Methods in this file:") + for method, count in patterns["methods"].most_common(5): + print(f"{method:<15} ā {count} matches") + + if show_full: + # Print top ranges for this file + print("\nš Most Common Byte Ranges:") + for range_str, count in patterns["ranges"].most_common(5): + print(f"{range_str:<10} ā {count} matches") + + # Print common start positions + print("\nš Common Start Positions:") + for pos, count in patterns["start_positions"].most_common(5): + print(f"Position {pos:<3} ā {count} matches") + + # Print common end positions + print("\nš Common End Positions:") + for pos, count in patterns["end_positions"].most_common(5): + print(f"Position {pos:<3} ā {count} matches") + + # Print common byte lengths + print("\nš Common Byte Lengths:") + for length, count in patterns["lengths"].most_common(5): + print(f"{length} bytes ā {count} matches") + + # Visual representation of match distribution + if patterns["start_positions"] and patterns["end_positions"]: + max_pos = max(max(patterns["end_positions"].keys()), + max(patterns["start_positions"].keys())) + print("\nš Match Distribution (frequency by position):") + scale = 30 # Reduced scale for more compact output + max_count = max(max(patterns["start_positions"].values()), + max(patterns["end_positions"].values())) + for pos in range(min(max_pos + 1, 40)): # Limit to first 40 positions + start_count = patterns["start_positions"].get(pos, 0) + end_count = patterns["end_positions"].get(pos, 0) + start_bar = 'ā' * int((start_count / max_count) * scale) if start_count else '' + end_bar = 'ā' * int((end_count / max_count) * scale) if end_count else '' + print(f"{pos:2d}: {start_bar}|{end_bar}") + print(" āāā = start positions, āāā = end positions") + + # Print byte-level insights for each sample if available + if insights and show_full: + file_insights = {k: v for k, v in insights.items() if k.startswith(f"sample_") and file in v.get("method", "")} + if file_insights: + print("\nš¬ Byte-Level Analysis:") + for key, data in file_insights.items(): + parts = key.split('_') + sample_id = parts[1] if len(parts) > 1 else "?" 
+ print(f"\nSample {sample_id} with {data['method']}[{data['range']}]:") + + # Show optimal byte changes + if data.get("optimal_changes"): + print("Optimal byte changes to achieve expected checksum:") + for pos, new_val in data["optimal_changes"]: + print(f" Change byte at position {pos} from 0x{data['contributions']['byte_contributions'][pos]['original_value']:02X} to 0x{new_val:02X}") + else: + print("No simple byte changes found to fix checksum") + + # Global summary (always show this part) + print("\n\nš Global Summary of Most Successful Methods:") + method_counts = defaultdict(int) + for method_id, _, _, _ in all_matches: + method_counts[method_id] += 1 + + sorted_methods = sorted(method_counts.items(), key=lambda x: x[1], reverse=True) + for method_id, count in sorted_methods[:5]: # Reduced to top 5 for conciseness + print(f"{method_id:<25} ā {count} matches") + + # Show more detailed global pattern summary only in full mode + if show_full: + all_patterns = analyze_patterns(all_matches) + print("\nš Global Pattern Summary:") + print(f"Total unique methods found: {len(all_patterns['methods'])}") + print(f"Total unique byte ranges: {len(all_patterns['ranges'])}") + print(f"Most common method: {all_patterns['methods'].most_common(1)[0][0]} with {all_patterns['methods'].most_common(1)[0][1]} matches") + + # Print global consensus analysis at the end + if consistency_data and show_full: + print("\n\nš§© Global Consensus Analysis") + print("āāāāāāāāāāāāāāāāāāāāāāāāāāā") + print("Methods that work across multiple files:") + + # Collect global statistics from all files + global_methods = Counter() + global_ranges = Counter() + global_method_ranges = Counter() + + for file_data in consistency_data.values(): + for method, score in file_data.get("best_methods", []): + global_methods[method] += 1 + for range_key, score in file_data.get("best_ranges", []): + global_ranges[range_key] += 1 + for mr, score in file_data.get("best_method_ranges", []): + global_method_ranges[mr] += 1 + + # Display methods that work across multiple files + num_files = len(consistency_data) + print(f"\nš Methods that work across multiple files (total files: {num_files}):") + for method, count in global_methods.most_common(5): + print(f"{method:<15} ā appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)") + + print(f"\nš Byte ranges that work across multiple files:") + for range_key, count in global_ranges.most_common(5): + print(f"[{range_key}] ā appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)") + + print(f"\nš Method+Range combinations that work across multiple files:") + for mr, count in global_method_ranges.most_common(5): + print(f"{mr:<20} ā appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)") + + # Generate a recommended approach + if global_method_ranges: + best_combo, count = global_method_ranges.most_common(1)[0] + if count >= num_files * 0.5: # If it works for at least half the files + print(f"\nā
Recommended global method: {best_combo}") + print(f" This combination works in top 5 for {count}/{num_files} files") + else: + print("\nā ļø No single method+range combination works reliably across most files") + print(f" Best option ({best_combo}) only works in top 5 for {count}/{num_files} files") + + # Try to find patterns in the most successful methods + if global_methods: + best_method, method_count = global_methods.most_common(1)[0] + print(f"\nš” Consider using {best_method} with file-specific byte ranges") + print(f" This algorithm appears in top 5 for {method_count}/{num_files} files") + +# --- Advanced Checksum Algorithms --- +def checksum_weighted_sum_parametric(data: bytes, weight_start: float = 1.0, weight_step: float = 1.0) -> int: + """Weighted sum with configurable starting weight and step""" + return sum(int((weight_start + i * weight_step) * b) % 256 for i, b in enumerate(data)) % 256 + +def checksum_hybrid_sum_xor(data: bytes, weight: float = 0.5) -> int: + """Hybrid checksum using weighted combination of sum and XOR""" + sum_result = sum(data) % 256 + xor_result = 0 + for b in data: + xor_result ^= b + return int((weight * sum_result + (1 - weight) * xor_result)) % 256 + +def checksum_adaptive_bit_flip_sum(data: bytes, flip_mask: int = 0xFF) -> int: + """Bit flip sum with configurable flip mask""" + return sum(b ^ flip_mask for b in data) % 256 + +def checksum_position_weighted_sum(data: bytes, position_weights: List[float] = None) -> int: + """Sum where each byte is weighted by its position in a specific pattern""" + if position_weights is None: + # Default to alternating weights + position_weights = [1.0, 0.5] + + result = 0 + for i, b in enumerate(data): + weight = position_weights[i % len(position_weights)] + result = (result + int(b * weight)) % 256 + return result + +def evaluate_targeted_algorithms(samples: List[Tuple[bytes, int]], label_prefix="") -> List[Tuple[str, int, int, str]]: + """Run a more focused test on the most promising algorithms with fine-tuned parameters""" + + # Based on consensus, focus testing on these methods with more parameter variations + matches = [] + seen = set() + + # Set up parameter variations for testing + bit_flip_masks = [0xFF, 0xF0, 0x0F, 0xCC, 0x55, 0xAA] + hybrid_weights = [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8] + weight_steps = [0.9, 1.0, 1.1, 1.2, 1.5] + pos_weight_patterns = [ + [1.0, 0.5], # Alternating + [1.0, 1.0, 0.5], # Every third byte gets half weight + [1.0, 0.75, 0.5, 0.25] # Descending weights + ] + + # Process each sample with focused algorithms + for sample_index, (data, expected) in enumerate(samples): + length = len(data) + + # Instead of trying every possible byte range, focus on the most promising ranges + # based on global patterns from previous analysis + + # Try more specific ranges based on insights + ranges_to_try = [] + + # Focus on common start positions from global analysis: 0-5 and specific ranges + for start in [0, 1, 2, 3, 4, 5]: + # Try full data range + ranges_to_try.append((start, length)) + + # Try common end points (from previous runs) + for end_offset in [0, 1, 2, 4, 8]: + if length - end_offset > start + 1: # Ensure valid range + ranges_to_try.append((start, length - end_offset)) + + # Add specific ranges that were successful in multiple files + specific_ranges = [(3, 30), (4, 31), (5, 8), (5, 9), (2, 11)] + for start, end in specific_ranges: + if start < length and end <= length and start < end: + ranges_to_try.append((start, end)) + + # Process the focused ranges with our most promising 
algorithms + for start, end in ranges_to_try: + sliced = data[start:end] + label = f"[{start}:{end}]" + + # Test standard checksum methods that showed promise + methods = [ + ("WEIGHTED_SUM", lambda d: checksum_weighted_sum(d)), + ("ALT_SUM_XOR", lambda d: checksum_alt_sum_xor(d)), + ("BIT_FLIP_SUM", lambda d: checksum_bit_flip_sum(d)), + ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)) + ] + + # Test the standard methods + for name, func in methods: + try: + result = func(sliced) + method_id = f"{name}{label}" + key = (sample_index, method_id, label_prefix) + if result == expected and key not in seen: + seen.add(key) + matches.append((method_id, sample_index + 1, expected, label_prefix)) + except Exception: + continue + + # Test advanced parametric methods + for mask in bit_flip_masks: + try: + result = checksum_adaptive_bit_flip_sum(sliced, mask) + method_id = f"BIT_FLIP_SUM({mask:02X}){label}" + key = (sample_index, method_id, label_prefix) + if result == expected and key not in seen: + seen.add(key) + matches.append((method_id, sample_index + 1, expected, label_prefix)) + except Exception: + continue + + for weight in hybrid_weights: + try: + result = checksum_hybrid_sum_xor(sliced, weight) + method_id = f"HYBRID_SUM_XOR({weight:.1f}){label}" + key = (sample_index, method_id, label_prefix) + if result == expected and key not in seen: + seen.add(key) + matches.append((method_id, sample_index + 1, expected, label_prefix)) + except Exception: + continue + + for step in weight_steps: + try: + result = checksum_weighted_sum_parametric(sliced, 1.0, step) + method_id = f"WEIGHTED_SUM_STEP({step:.1f}){label}" + key = (sample_index, method_id, label_prefix) + if result == expected and key not in seen: + seen.add(key) + matches.append((method_id, sample_index + 1, expected, label_prefix)) + except Exception: + continue + + for i, pattern in enumerate(pos_weight_patterns): + try: + result = checksum_position_weighted_sum(sliced, pattern) + method_id = f"POS_WEIGHT_{i+1}{label}" + key = (sample_index, method_id, label_prefix) + if result == expected and key not in seen: + seen.add(key) + matches.append((method_id, sample_index + 1, expected, label_prefix)) + except Exception: + continue + + return matches + +# --- Byte Change Correlation Analysis --- +def analyze_byte_value_correlations(samples: List[Tuple[bytes, int]], max_samples: int = 1000) -> Dict: + """ + Analyze how changing specific bytes correlates with changes in the checksum. + This helps understand the "sensitivity" of the checksum to specific byte positions. 
+ """ + # Sample if we have too many samples to process + if len(samples) > max_samples: + print(f"Sampling {max_samples} out of {len(samples)} for correlation analysis") + samples = random.sample(samples, max_samples) + + # Initialize data structures for correlation analysis + bytes_by_position = defaultdict(list) + checksums_by_position_value = defaultdict(list) + correlations = {} + position_weights = {} + + # Gather data by byte position + max_length = max(len(data) for data, _ in samples) + print(f"Analyzing correlations for {len(samples)} samples with max length {max_length}") + + # Track all byte values and checksums by position + for data, checksum in samples: + for pos, value in enumerate(data): + bytes_by_position[pos].append(value) + checksums_by_position_value[(pos, value)].append(checksum) + + # Calculate correlation strength for each position + for pos in range(max_length): + pos_values = bytes_by_position.get(pos, []) + if len(pos_values) <= 1: + continue + + # Create value-to-checksum mapping and analyze patterns + value_impact = {} + checksum_changes = [] + + # Group by unique values at this position + unique_values = set(pos_values) + if len(unique_values) <= 1: + continue + + # Analyze how changes in this position correlate with checksums + for val in unique_values: + checksums = checksums_by_position_value.get((pos, val), []) + if checksums: + avg_checksum = sum(checksums) / len(checksums) + value_impact[val] = avg_checksum + + # If we have enough data, calculate correlation metrics + if len(value_impact) >= 2: + # Look for linear relationships + xy_pairs = [(val, cs) for val, cs in value_impact.items()] + correlation = calculate_correlation_coefficient(xy_pairs) + + # Look for bit-level patterns (XOR, bit flips) + bit_patterns = analyze_bit_patterns(value_impact) + + correlations[pos] = { + "strength": abs(correlation), + "direction": "positive" if correlation >= 0 else "negative", + "unique_values": len(unique_values), + "sample_count": len(pos_values), + "bit_patterns": bit_patterns + } + + # Calculate a rough "weight" for this position in checksum calculations + pos_weight = abs(correlation) * (len(unique_values) / 256) + position_weights[pos] = pos_weight + + # Sort positions by correlation strength + sorted_positions = sorted(correlations.keys(), key=lambda p: correlations[p]["strength"], reverse=True) + significant_positions = sorted_positions[:10] # Most influential positions + + # Build response + return { + "significant_positions": significant_positions, + "position_correlations": {p: correlations[p] for p in significant_positions}, + "position_weights": {p: position_weights[p] for p in position_weights if p in significant_positions}, + "analyzed_samples": len(samples), + "max_length": max_length + } + +def calculate_correlation_coefficient(pairs: List[Tuple[int, int]]) -> float: + """Calculate Pearson's correlation coefficient between byte values and checksums.""" + if len(pairs) < 2: + return 0.0 + + x_vals = [p[0] for p in pairs] + y_vals = [p[1] for p in pairs] + + n = len(pairs) + + # Calculate means + x_mean = sum(x_vals) / n + y_mean = sum(y_vals) / n + + # Calculate correlation coefficient + numerator = sum((x - x_mean) * (y - y_mean) for x, y in zip(x_vals, y_vals)) + denominator_x = sum((x - x_mean) ** 2 for x in x_vals) + denominator_y = sum((y - y_mean) ** 2 for y in y_vals) + + if denominator_x == 0 or denominator_y == 0: + return 0.0 + + return numerator / math.sqrt(denominator_x * denominator_y) + +def analyze_bit_patterns(value_impact: 
Dict[int, float]) -> Dict: + """ + Analyze bit-level patterns in how byte changes affect checksums. + Identifies patterns like "flipping bit 3 adds 8 to checksum" etc. + """ + bit_influences = [0.0] * 8 # Influence of each bit position + + # Calculate average impact when each bit is set vs unset + bit_set_checksums = [[] for _ in range(8)] + bit_unset_checksums = [[] for _ in range(8)] + + for value, checksum in value_impact.items(): + # Analyze each bit + for bit_pos in range(8): + bit_mask = 1 << bit_pos + if value & bit_mask: # Bit is set + bit_set_checksums[bit_pos].append(checksum) + else: # Bit is unset + bit_unset_checksums[bit_pos].append(checksum) + + # Calculate average difference per bit + for bit_pos in range(8): + set_avg = sum(bit_set_checksums[bit_pos]) / len(bit_set_checksums[bit_pos]) if bit_set_checksums[bit_pos] else 0 + unset_avg = sum(bit_unset_checksums[bit_pos]) / len(bit_unset_checksums[bit_pos]) if bit_unset_checksums[bit_pos] else 0 + + if set_avg and unset_avg: + influence = set_avg - unset_avg + bit_influences[bit_pos] = influence + + # Determine the bit pattern type + pattern_types = { + "xor_like": all(abs(bit_influences[i]) >= 0.5 for i in range(8)), + "additive": all(bit_influences[i] >= 0 for i in range(8)), + "subtractive": all(bit_influences[i] <= 0 for i in range(8)), + "weighted": max(abs(b) for b in bit_influences) / (min(abs(b) for b in bit_influences) if min(abs(b) for b in bit_influences) else 1) > 3, + } + + return { + "bit_influences": {i: bit_influences[i] for i in range(8)}, + "pattern_type": next((ptype for ptype, matches in pattern_types.items() if matches), "mixed"), + "most_influential_bit": bit_influences.index(max(bit_influences, key=abs)) + } + +def find_optimal_byte_changes(data: bytes, checksum_func: Callable, expected: int) -> List[Tuple[int, int]]: + """ + Find the minimal set of byte changes needed to achieve the expected checksum. + Returns a list of (position, new_value) tuples. 
+ """ + base_checksum = checksum_func(data) + if base_checksum == expected: + return [] # No changes needed + + # Try changing bytes to match target checksum using sensitivity information + + # First try single byte changes - this is much faster and most likely case + for i in range(len(data)): + modified = bytearray(data) + target_diff = (expected - base_checksum) % 256 + + # Try calculating what value this position should have + if checksum_func == checksum_sum: + # For sum, we can directly calculate needed value + new_val = (data[i] + target_diff) % 256 + modified[i] = new_val + if checksum_func(bytes(modified)) == expected: + return [(i, new_val)] + elif checksum_func == checksum_xor: + # For XOR, direct calculation also works + new_val = data[i] ^ (base_checksum ^ expected) + modified[i] = new_val + if checksum_func(bytes(modified)) == expected: + return [(i, new_val)] + else: + # For other algorithms, try incremental changes or use binary search + best_value = None + best_diff = 256 + + # Check common values first, then do a smarter search if needed + for test_val in [0, 1, 0xFF, expected, data[i] ^ 0xFF]: + if test_val == data[i]: + continue + + modified[i] = test_val + new_checksum = checksum_func(bytes(modified)) + if new_checksum == expected: + return [(i, test_val)] + diff = abs((new_checksum - expected) % 256) + if diff < best_diff: + best_diff = diff + best_value = test_val + + # If we got close, try a more focused search around the promising value + if best_diff < 50 and best_value is not None: + for offset in range(-10, 11): + test_val = (best_value + offset) % 256 + if test_val == data[i]: + continue + + modified[i] = test_val + new_checksum = checksum_func(bytes(modified)) + if new_checksum == expected: + return [(i, test_val)] + + # If single byte changes don't work, try strategic two-byte changes + # For performance, we'll limit this to nearby byte combinations + for i in range(len(data)): + for j in range(i+1, min(i+8, len(data))): # Try up to 7 bytes ahead + for i_adj in [-1, 1]: + for j_adj in [-1, 1]: + modified = bytearray(data) + modified[i] = (data[i] + i_adj) % 256 + modified[j] = (data[j] + j_adj) % 256 + + if checksum_func(bytes(modified)) == expected: + return [(i, modified[i]), (j, modified[j])] + + return [] + +# --- Large-Scale File Analysis --- +def analyze_large_file(filepath: str, max_samples=1000) -> Dict: + """Analyze a large file efficiently by processing it in batches.""" + start_time = time.time() + print(f"Starting large-scale analysis of {filepath}...") + + # Process the file in batches to handle large files + batch_gen = parse_input_file_lines_batched(filepath, batch_size=1000) + + # First batch will be used for detailed analysis + first_batch = next(batch_gen, []) + if not first_batch: + print("No valid samples found in file.") + return {} + + # Collect metadata about the batch + batch_metadata = next(batch_gen, {"total_lines": 0, "valid_samples": 0}) + + # Perform initial algorithm identification on the first batch + print(f"Identifying potential checksum algorithms on first {len(first_batch)} samples...") + matches = bruteforce_all_methods(first_batch, label_prefix=os.path.basename(filepath)) + + # Extract the most promising algorithms and ranges + patterns = analyze_patterns([m for m in matches if m[0] != "CONSISTENCY_DATA"]) + top_methods = patterns["methods"].most_common(3) + top_ranges = patterns["ranges"].most_common(3) + + # Combining top methods with top ranges for focused analysis + focused_analysis = [] + method_func_map = { + "SUM": 
checksum_sum, + "XOR": checksum_xor, + "SUM<<1": lambda d: checksum_sum_shifted(d, 1), + "SUM<<2": lambda d: checksum_sum_shifted(d, 2), + "XOR<<1": lambda d: checksum_xor_shifted(d, 1), + "XOR<<2": lambda d: checksum_xor_shifted(d, 2), + "WEIGHTED_SUM": checksum_weighted_sum, + "ALT_SUM_XOR": checksum_alt_sum_xor, + "BIT_FLIP_SUM": checksum_bit_flip_sum + } + + # Collect a sample of data for correlation analysis + correlation_samples = first_batch.copy() + + # Check more batches if we need more samples for correlation analysis + batches_processed = 1 + while len(correlation_samples) < max_samples: + batch = next(batch_gen, None) + if batch is None: + break + correlation_samples.extend(batch[:max_samples - len(correlation_samples)]) + batches_processed += 1 + if batches_processed >= 10: # Limit to 10 batches for performance + break + + # Perform correlation analysis + print(f"Performing byte correlation analysis on {len(correlation_samples)} samples...") + correlations = analyze_byte_value_correlations(correlation_samples, max_samples=max_samples) + + # Test the most likely algorithms on the significant byte positions + print("Testing algorithm-position combinations...") + for method_name, _ in top_methods: + for range_str, _ in top_ranges: + range_parts = range_str.strip('[]').split(':') + if len(range_parts) == 2: + start, end = int(range_parts[0]), int(range_parts[1]) + method_func = method_func_map.get(method_name) + if method_func: + success_count = 0 + for data, expected in correlation_samples[:100]: # Test on first 100 samples + if len(data) >= end: + result = method_func(data[start:end]) + if result == expected: + success_count += 1 + + success_rate = success_count / min(100, len(correlation_samples)) + focused_analysis.append({ + "method": method_name, + "range": f"[{start}:{end}]", + "success_rate": success_rate, + "success_count": success_count + }) + + # Sort by success rate + focused_analysis.sort(key=lambda x: x["success_rate"], reverse=True) + + # Find byte positions that most strongly influence the checksum + influential_positions = correlations["significant_positions"][:5] + + elapsed_time = time.time() - start_time + + return { + "file_name": os.path.basename(filepath), + "samples_analyzed": len(correlation_samples), + "elapsed_time": elapsed_time, + "top_methods": [m[0] for m in top_methods], + "top_ranges": [r[0] for r in top_ranges], + "focused_analysis": focused_analysis[:5], + "influential_positions": influential_positions, + "position_correlations": {str(p): correlations["position_correlations"][p] for p in influential_positions}, + "byte_pattern_summary": summarize_byte_patterns(correlations), + } + +def summarize_byte_patterns(correlations: Dict) -> Dict: + """Summarize patterns in byte correlations to help understand the checksum algorithm.""" + if not correlations or "position_correlations" not in correlations: + return {} + + # Identify patterns in how byte positions affect the checksum + positions = correlations.get("significant_positions", []) + if not positions: + return {} + + # Count pattern types to identify algorithm characteristics + pattern_types = Counter() + for pos in positions: + if pos in correlations["position_correlations"]: + bit_patterns = correlations["position_correlations"][pos].get("bit_patterns", {}) + pattern_type = bit_patterns.get("pattern_type", "unknown") + pattern_types[pattern_type] += 1 + + # Algorithm characteristics based on patterns + primary_pattern = pattern_types.most_common(1)[0][0] if pattern_types else "unknown" + 
algorithm_characteristics = { + "xor_like": "XOR-based algorithm (position-independent)", + "additive": "Sum-based algorithm (position-independent)", + "subtractive": "Subtraction-based algorithm (unusual)", + "weighted": "Weighted algorithm (position-dependent)", + "mixed": "Mixed algorithm (complex checksum)" + } + + # Check position importance distribution + pos_weights = correlations.get("position_weights", {}) + weight_values = list(pos_weights.values()) + weight_variance = 0 + if weight_values: + mean_weight = sum(weight_values) / len(weight_values) + weight_variance = sum((w - mean_weight) ** 2 for w in weight_values) / len(weight_values) + + position_dependent = weight_variance > 0.05 + + return { + "dominant_pattern": primary_pattern, + "likely_algorithm_type": algorithm_characteristics.get(primary_pattern, "Unknown algorithm type"), + "position_dependent": position_dependent, + "weight_variance": weight_variance, + "recommendation": get_algorithm_recommendation(primary_pattern, position_dependent) + } + +def get_algorithm_recommendation(pattern_type: str, position_dependent: bool) -> str: + """Get a recommendation for checksum algorithm based on correlation analysis.""" + if pattern_type == "xor_like" and not position_dependent: + return "XOR-based checksum recommended" + elif pattern_type == "xor_like" and position_dependent: + return "Position-dependent XOR (shifted XOR) recommended" + elif pattern_type == "additive" and not position_dependent: + return "Simple sum checksum recommended" + elif pattern_type == "additive" and position_dependent: + return "Weighted sum checksum recommended" + elif pattern_type == "weighted": + return "Complex weighted checksum recommended" + else: + return "Mixed or complex algorithm recommended, try ALT_SUM_XOR or custom hybrid" + +def print_large_file_analysis(analysis: Dict): + """Print the results of large-file analysis in a readable format.""" + print("\nš Large File Analysis Results") + print("āāāāāāāāāāāāāāāāāāāāāāāāāāā") + print(f"File: {analysis.get('file_name', 'Unknown')}") + print(f"Samples analyzed: {analysis.get('samples_analyzed', 0)}") + print(f"Analysis time: {analysis.get('elapsed_time', 0):.2f} seconds") + + # Print the top methods and ranges + print("\nš Top Checksum Methods:") + for method in analysis.get('top_methods', []): + print(f" ⢠{method}") + + print("\nš Top Byte Ranges:") + for range_str in analysis.get('top_ranges', []): + print(f" ⢠{range_str}") + + # Print the focused analysis results + print("\nā
Best Method+Range Combinations:") + for combo in analysis.get('focused_analysis', []): + print(f" ⢠{combo['method']}{combo['range']} ā {combo['success_rate']*100:.1f}% success rate ({combo['success_count']} samples)") + + # Print the byte pattern summary + pattern_summary = analysis.get('byte_pattern_summary', {}) + if pattern_summary: + print("\nš§ Algorithm Characteristics:") + print(f" Dominant pattern: {pattern_summary.get('dominant_pattern', 'Unknown')}") + print(f" Likely algorithm: {pattern_summary.get('likely_algorithm_type', 'Unknown')}") + print(f" Position dependent: {'Yes' if pattern_summary.get('position_dependent', False) else 'No'}") + print(f"\nš” Recommendation: {pattern_summary.get('recommendation', 'Unknown')}") + + # Print influential byte positions + print("\nš¢ Most Influential Byte Positions:") + positions = analysis.get('influential_positions', []) + pos_correlations = analysis.get('position_correlations', {}) + + for pos in positions: + pos_str = str(pos) + if pos_str in pos_correlations: + info = pos_correlations[pos_str] + print(f" ⢠Position {pos}: {info['strength']:.3f} correlation strength, " + + f"{info['direction']} correlation, {info['unique_values']} unique values") + + # Print bit patterns if available + bit_patterns = info.get("bit_patterns", {}) + if bit_patterns: + most_influential_bit = bit_patterns.get("most_influential_bit", 0) + print(f" Most influential bit: {most_influential_bit} (bit {7-most_influential_bit} from left)") + +# --- Enhanced Folder Processing --- +def process_folder_with_limits(folder_path: str, max_total_samples: int = 1000) -> List[Tuple[bytes, int]]: + """ + Process files in a folder with a limit on total samples. + Returns a list of samples up to the specified limit. + """ + all_samples = [] + files_processed = 0 + samples_collected = 0 + + print(f"Processing folder with limit of {max_total_samples} samples...") + + for file in os.listdir(folder_path): + if file.endswith(".txt"): + full_path = os.path.join(folder_path, file) + try: + samples, file_meta = parse_input_file_lines(full_path) + + # Take only what we need to stay under max_total_samples + remaining = max_total_samples - len(all_samples) + if remaining <= 0: + break + + if len(samples) > remaining: + print(f"Taking {remaining} of {len(samples)} samples from {file}") + samples = samples[:remaining] + else: + print(f"Taking all {len(samples)} samples from {file}") + + all_samples.extend(samples) + files_processed += 1 + samples_collected += len(samples) + + # Stop if we've reached our limit + if len(all_samples) >= max_total_samples: + break + + except Exception as e: + print(f"Error processing {file}: {e}") + + print(f"Processed {files_processed} files, collected {samples_collected} samples") + return all_samples + +# --- Main --- +if __name__ == "__main__": + # Create argument parser + parser = argparse.ArgumentParser(description='Analyze checksum algorithms in files.') + parser.add_argument('path', help='Path to file or directory to analyze') + parser.add_argument('--full', action='store_true', help='Show detailed output with all analyses') + parser.add_argument('--byte-analysis', action='store_true', help='Perform byte-level contribution analysis') + parser.add_argument('--large', action='store_true', help='Perform large-scale analysis optimized for big files') + parser.add_argument('--max-samples', type=int, default=1000, + help='Maximum number of samples for intensive analyses (byte-level and large-scale)') + + args = parser.parse_args() + + path = args.path + 
show_full = args.full + perform_byte_analysis = args.byte_analysis + large_analysis = args.large + max_samples = args.max_samples + + all_matches = [] + byte_insights = {} + + if os.path.isdir(path): + # Standard brute force - process all samples without limits + print("Phase 1: Running standard brute force analysis...") + for file in os.listdir(path): + if file.endswith(".txt"): + full_path = os.path.join(path, file) + try: + parsed_samples, file_meta = parse_input_file_lines(full_path) + # Process all samples for standard analysis + match_results = bruteforce_all_methods( + parsed_samples, + label_prefix=file, + file_metadata={"file": file, **file_meta} + ) + all_matches.extend(match_results) + except Exception as e: + print(f"Error processing {file}: {e}") + + # Display standard results + print_results_with_summary(all_matches, per_file=True, show_full=show_full) + + if perform_byte_analysis: + # Limit to max_samples for the intensive byte-level analysis + print(f"\n\nPhase 2: Running byte-level contribution analysis (limit: {max_samples} samples)...") + files_analyzed = 0 + total_samples_analyzed = 0 + + for file in list(os.listdir(path)): + # Stop if we've hit our sample limit or analyzed enough files + if total_samples_analyzed >= max_samples or files_analyzed >= 3: + break + + if file.endswith(".txt"): + full_path = os.path.join(path, file) + try: + parsed_samples, file_meta = parse_input_file_lines(full_path) + if not parsed_samples: + print(f"ā ļø No valid samples found in {file}") + continue + + # Determine how many samples to take from this file + samples_remaining = max_samples - total_samples_analyzed + if samples_remaining <= 0: + break + + samples_to_analyze = parsed_samples + if len(parsed_samples) > samples_remaining: + print(f"Limiting to {samples_remaining} samples from {file}") + samples_to_analyze = parsed_samples[:samples_remaining] + else: + print(f"Analyzing all {len(parsed_samples)} samples from {file}") + + total_samples_analyzed += len(samples_to_analyze) + files_analyzed += 1 + + print(f"\nš Analyzing file: {file} ({len(samples_to_analyze)} samples)") + match_results, file_insights = evaluate_with_byte_analysis( + samples_to_analyze, + label_prefix=f"BYTE_ANALYSIS_{file}", + detailed=True + ) + + if not file_insights: + print(f"ā ļø No byte-level insights found for {file}") + + byte_insights.update(file_insights) + except Exception as e: + print(f"ā ļø Error analyzing {file}: {e}") + + print(f"\nCompleted byte-level analysis on {total_samples_analyzed} samples from {files_analyzed} files") + + # Overall summary + print("\n\n𧬠Byte Contribution Analysis Summary") + print("āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā") + print(f"Total samples analyzed: {len(byte_insights)}") + print(f"Methods with most influence on checksums:") + + # Collect statistics on which methods have highest average impact + method_impacts = defaultdict(list) + for key, data in byte_insights.items(): + if "contributions" in data: + # Get average of max impacts across all bytes + impacts = [info["max_impact"] for info in data["contributions"]["byte_contributions"].values()] + if impacts: + avg_impact = sum(impacts) / len(impacts) + method_impacts[data["method"]].append(avg_impact) + + # Show average impact by method + for method, impacts in method_impacts.items(): + if impacts: + avg = sum(impacts) / len(impacts) + print(f"{method:<15} ā Avg impact: {avg:.1f}") + + elif os.path.isfile(path): + parsed_samples, file_meta = parse_input_file_lines(path) + file_name = os.path.basename(path) + 
match_results = bruteforce_all_methods( + parsed_samples, + label_prefix=file_name, + file_metadata={"file": file_name, **file_meta} + ) + all_matches.extend(match_results) + + # Display results + print_results_with_summary(all_matches, per_file=True, show_full=show_full) + + if perform_byte_analysis and parsed_samples: + print("\nRunning byte-level contribution analysis...") + try: + match_results, file_insights = evaluate_with_byte_analysis( + parsed_samples, # Now correctly passing just the samples list + label_prefix=f"BYTE_ANALYSIS_{os.path.basename(path)}", + detailed=True + ) + + # Print just the first sample's analysis as an example + if file_insights: + key = next(iter(file_insights)) + data = file_insights[key] + sample_id = key.split('_')[1] if len(key.split('_')) > 1 else "?" + method_name = data["method"] + range_str = data["range"] + + # Get original sample data + if int(sample_id) <= len(parsed_samples): + data_bytes, expected = parsed_samples[int(sample_id)-1] + start, end = map(int, data["range"].split(':')) + sliced_data = data_bytes[start:end] + + print(f"\nByte analysis for Sample {sample_id} using {method_name}[{range_str}]") + print_byte_analysis(sliced_data, data["contributions"], method_name) + except Exception as e: + print(f"ā ļø Error during byte analysis: {e}") + + if os.path.isdir(path): + # ...existing code... + + if large_analysis: + print(f"\n\nPerforming large-scale file analysis (limit: {max_samples} samples per file)...") + files_analyzed = 0 + + for file in list(os.listdir(path)): + if files_analyzed >= 5: # Limit to 5 files for performance + break + + if file.endswith(".txt"): + full_path = os.path.join(path, file) + try: + analysis = analyze_large_file(full_path, max_samples=max_samples) + print_large_file_analysis(analysis) + files_analyzed += 1 + except Exception as e: + print(f"ā ļø Error during large file analysis of {file}: {e}") + + elif os.path.isfile(path): + # ...existing code... 
+ + if large_analysis: + try: + analysis = analyze_large_file(path, max_samples=max_samples) + print_large_file_analysis(analysis) + except Exception as e: + print(f"ā ļø Error during large file analysis: {e}") + +def evaluate_with_byte_analysis(samples: List[Tuple[bytes, int]], label_prefix="", detailed=False) -> Tuple[List, Dict]: + """Analyze which methods work and provide byte-level insights""" + matches = [] + seen = set() + byte_insights = {} + + # Most promising methods based on previous analysis + methods = [ + ("WEIGHTED_SUM", checksum_weighted_sum), + ("ALT_SUM_XOR", checksum_alt_sum_xor), + ("BIT_FLIP_SUM", checksum_bit_flip_sum), + ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)), + ("HYBRID_SUM_XOR(0.5)", lambda d: checksum_hybrid_sum_xor(d, 0.5)), + ("BIT_FLIP_SUM(AA)", lambda d: checksum_adaptive_bit_flip_sum(d, 0xAA)) + ] + + for sample_index, (data, expected) in enumerate(samples[:5]): # Limit to first 5 samples for performance + length = len(data) + + # Focus on the most promising ranges + ranges_to_try = [] + + # Add the specific ranges that were most successful in our analysis + specific_ranges = [(3, 30), (4, 31), (5, 8), (5, 9), (2, 11)] + for start, end in specific_ranges: + if start < length and end <= length and start < end: + ranges_to_try.append((start, end)) + + # Process each range with our methods + for start, end in ranges_to_try: + if end > start + 30: # Skip very large ranges to keep analysis fast + continue + + sliced = data[start:end] + label = f"[{start}:{end}]" + + for name, func in methods: + try: + result = func(sliced) + method_id = f"{name}{label}" + key = (sample_index, method_id, label_prefix) + + if result == expected and key not in seen: + seen.add(key) + matches.append((method_id, sample_index + 1, expected, label_prefix)) + + # For matching methods, perform byte contribution analysis + if detailed: + print(f"Analyzing contributions for sample {sample_index+1}, method {method_id}...") + byte_contributions = analyze_byte_contributions(sliced, func, expected) + optimal_changes = find_optimal_byte_changes(sliced, func, expected) + + # Store insights and also print them immediately + insights_key = f"sample_{sample_index+1}_{name}" + byte_insights[insights_key] = { + "contributions": byte_contributions, + "optimal_changes": optimal_changes, + "method": name, + "range": f"{start}:{end}", + "data": sliced # Store the data slice itself for easier analysis + } + + # Print analysis directly during collection for immediate feedback + print_byte_analysis(sliced, byte_contributions, method_id) + + # If we found compensation values, print them + if optimal_changes: + print("\nSuggested byte changes:") + for pos, new_val in optimal_changes: + print(f" Change byte at position {pos} from 0x{sliced[pos]:02X} to 0x{new_val:02X}") + + # Once we've found and analyzed one matching method for a sample, move on + # to keep the output manageable + break + except Exception as e: + continue + + # If we've already found and analyzed a method for this sample, move on + if any(k.startswith(f"sample_{sample_index+1}_") for k in byte_insights.keys()): + break + + # If we've already found and analyzed a method for this sample, move on + if any(k.startswith(f"sample_{sample_index+1}_") for k in byte_insights.keys()): + continue + + return matches, byte_insights |
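
The whole file boils down to one idea: for each sample, run a handful of candidate checksum functions over every byte slice and record which (method, range) pairs reproduce the recorded checksum. A minimal, self-contained sketch of that loop follows; the names (c_sum, c_xor, find_matches) and the demo samples are illustrative only and are not taken from idiot.py.

import re
from typing import Callable, Dict, List, Tuple

def c_sum(data: bytes) -> int:
    # Modular byte sum, the simplest candidate checksum.
    return sum(data) % 256

def c_xor(data: bytes) -> int:
    # Running XOR of all bytes.
    result = 0
    for b in data:
        result ^= b
    return result

CANDIDATES: Dict[str, Callable[[bytes], int]] = {"SUM": c_sum, "XOR": c_xor}

def find_matches(samples: List[Tuple[bytes, int]]) -> List[str]:
    """Return 'sample N: METHOD[start:end]' labels that reproduce the expected checksum."""
    hits: List[str] = []
    for idx, (data, expected) in enumerate(samples):
        for start in range(len(data)):
            for end in range(start + 1, len(data) + 1):
                for name, func in CANDIDATES.items():
                    if func(data[start:end]) == expected:
                        hits.append(f"sample {idx + 1}: {name}[{start}:{end}]")
    return hits

if __name__ == "__main__":
    # Hypothetical input in the same "hexdata = checksum" shape the parser expects.
    lines = ["0102030405 = 0f", "a0b0c0 = d0"]
    samples = []
    for line in lines:
        m = re.match(r"([0-9a-fA-F]+)\s*=\s*([0-9a-fA-F]{1,2})", line)
        if m:
            samples.append((bytes.fromhex(m.group(1)), int(m.group(2), 16)))
    for hit in find_matches(samples):
        print(hit)

On the two demo samples this prints SUM[0:5] for the first and XOR[0:3] for the second, which is exactly the kind of (method, range) evidence the consistency analysis in the script then aggregates across samples.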
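The other technique worth isolating is the byte/checksum correlation probe from analyze_byte_value_correlations: collect (value at position p, checksum) pairs across samples and compute Pearson's r per position to see which positions the checksum is sensitive to. A condensed sketch of that idea, again with made-up data:

import math
from collections import defaultdict
from typing import Dict, List, Tuple

def pearson(pairs: List[Tuple[int, int]]) -> float:
    # Pearson's correlation coefficient over (byte value, checksum) pairs.
    n = len(pairs)
    if n < 2:
        return 0.0
    xs = [p[0] for p in pairs]
    ys = [p[1] for p in pairs]
    mx, my = sum(xs) / n, sum(ys) / n
    num = sum((x - mx) * (y - my) for x, y in pairs)
    den = math.sqrt(sum((x - mx) ** 2 for x in xs) * sum((y - my) ** 2 for y in ys))
    return num / den if den else 0.0

def position_correlations(samples: List[Tuple[bytes, int]]) -> Dict[int, float]:
    """Correlation between the byte value at each position and the checksum."""
    by_pos: Dict[int, List[Tuple[int, int]]] = defaultdict(list)
    for data, checksum in samples:
        for pos, value in enumerate(data):
            by_pos[pos].append((value, checksum))
    return {pos: pearson(pairs) for pos, pairs in by_pos.items()}

if __name__ == "__main__":
    # Toy samples whose checksum is a plain byte sum, so the position with the
    # larger value swings should show the stronger positive correlation.
    demo = [(bytes([a, b]), (a + b) % 256) for a, b in [(1, 2), (10, 20), (30, 5), (7, 90)]]
    print(position_correlations(demo))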