From 19570dbb48676496844a5a7862cbeca1ee44a783 Mon Sep 17 00:00:00 2001
From: Sigma-Ohio
Date: Mon, 9 Jun 2025 05:44:03 +0200
Subject: I AM SO SIGMA !!!IOEUFOASDUFSDJIOF

---
 research/bad-bruteforcing/idiot.py | 1324 ------------------------------------
 1 file changed, 1324 deletions(-)
 delete mode 100644 research/bad-bruteforcing/idiot.py

diff --git a/research/bad-bruteforcing/idiot.py b/research/bad-bruteforcing/idiot.py
deleted file mode 100644
index 7ead36e..0000000
--- a/research/bad-bruteforcing/idiot.py
+++ /dev/null
@@ -1,1324 +0,0 @@
-import re
-import sys
-import os
-import argparse
-from typing import List, Tuple, Callable, Dict, Generator, Optional
-from collections import defaultdict, Counter
-import json
-import time
-from itertools import islice
-import math
-import random
-
-# --- This is pure AI Slop ---
-def checksum_sum(data: bytes) -> int:
-    return sum(data) % 256
-
-def checksum_xor(data: bytes) -> int:
-    result = 0
-    for b in data:
-        result ^= b
-    return result
-
-def checksum_sum_shifted(data: bytes, shift: int) -> int:
-    return sum((b << shift) & 0xFF for b in data) % 256
-
-def checksum_xor_shifted(data: bytes, shift: int) -> int:
-    result = 0
-    for b in data:
-        result ^= (b << shift) & 0xFF
-    return result
-
-def checksum_weighted_sum(data: bytes) -> int:
-    return sum((i + 1) * b for i, b in enumerate(data)) % 256
-
-def checksum_alt_sum_xor(data: bytes) -> int:
-    s = sum(data)
-    x = 0
-    for i, b in enumerate(data):
-        if i % 2 == 0:
-            x ^= b
-        else:
-            s ^= b
-    return (s + x) % 256
-
-def checksum_bit_flip_sum(data: bytes) -> int:
-    return sum(b ^ 0xFF for b in data) % 256
-
-# --- Input Parser ---
-def parse_input_file_lines(filepath: str) -> Tuple[List[Tuple[bytes, int]], Dict]:
-    samples = []
-    total_lines = 0
-    with open(filepath, "r") as f:
-        for line in f:
-            total_lines += 1
-            match = re.match(r'([0-9a-fA-F]+)\s*=\s*([0-9a-fA-F]{1,2})', line.strip())
-            if match:
-                hex_data = bytes.fromhex(match.group(1))
-                checksum = int(match.group(2), 16)
-                samples.append((hex_data, checksum))
-
-    # Return samples and metadata
-    return samples, {"total_lines": total_lines, "valid_samples": len(samples)}
-
-# --- Enhanced Input Parser for Large Files ---
-def parse_input_file_lines_batched(filepath: str, batch_size: int = 1000) -> Generator[List[Tuple[bytes, int]], None, Dict]:
-    """
-    Parse a large input file in batches to avoid memory issues.
-    Returns a generator that yields batches of samples.
- """ - samples = [] - total_lines = 0 - valid_samples = 0 - - try: - with open(filepath, "r") as f: - for line in f: - total_lines += 1 - match = re.match(r'([0-9a-fA-F]+)\s*=\s*([0-9a-fA-F]{1,2})', line.strip()) - if match: - hex_data = bytes.fromhex(match.group(1)) - checksum = int(match.group(2), 16) - samples.append((hex_data, checksum)) - valid_samples += 1 - - # Yield a batch when it reaches the batch size - if len(samples) >= batch_size: - yield samples - samples = [] - except Exception as e: - print(f"Error reading file: {e}") - - # Yield any remaining samples - if samples: - yield samples - - # Return metadata about the entire file - return {"total_lines": total_lines, "valid_samples": valid_samples} - -# --- Brute Force Evaluation --- -def bruteforce_all_methods(samples: List[Tuple[bytes, int]], label_prefix="", file_metadata=None) -> List[Tuple[str, int, int, str]]: - methods: List[Tuple[str, Callable[[bytes], int]]] = [ - ("SUM", checksum_sum), - ("XOR", checksum_xor), - ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)), - ("SUM<<2", lambda d: checksum_sum_shifted(d, 2)), - ("XOR<<1", lambda d: checksum_xor_shifted(d, 1)), - ("XOR<<2", lambda d: checksum_xor_shifted(d, 2)), - ("WEIGHTED_SUM", checksum_weighted_sum), - ("ALT_SUM_XOR", checksum_alt_sum_xor), - ("BIT_FLIP_SUM", checksum_bit_flip_sum) - ] - - seen = set() - matches = [] - sample_methods = defaultdict(list) # Track methods that work for each sample - - for sample_index, (data, expected) in enumerate(samples): - length = len(data) - sample_success = [] # Track successful methods for this sample - - for start in range(length): - for end in range(start + 1, length + 1): - sliced = data[start:end] - label = f"[{start}:{end}]" - for name, func in methods: - try: - result = func(sliced) - method_id = f"{name}{label}" - key = (sample_index, method_id, label_prefix) - if result == expected and key not in seen: - seen.add(key) - matches.append((method_id, sample_index + 1, expected, label_prefix)) - sample_success.append((name, start, end)) - except Exception: - continue - - # Store methods that work for this sample - if sample_success: - sample_methods[sample_index] = sample_success - - # Calculate consistency scores if we have enough samples - if len(samples) > 1 and sample_methods: - consistency_analysis = analyze_consistency(sample_methods, len(samples)) - matches.append(("CONSISTENCY_DATA", 0, 0, json.dumps(consistency_analysis))) - - # Add file metadata for reporting - if file_metadata: - file_name = file_metadata.get("file", "unknown") - matches.append(("FILE_METADATA", file_name, 0, json.dumps(file_metadata))) - - return matches - -# --- Consistency Analysis --- -def analyze_consistency(sample_methods: Dict[int, List[Tuple[str, int, int]]], total_samples: int) -> Dict: - """Analyze which methods work consistently across different samples.""" - method_consistency = defaultdict(int) - range_consistency = defaultdict(int) - method_range_consistency = defaultdict(int) - - # Count how many samples each method/range works for - for sample_idx, methods in sample_methods.items(): - seen_methods = set() - seen_ranges = set() - seen_method_ranges = set() - - for method, start, end in methods: - if method not in seen_methods: - seen_methods.add(method) - method_consistency[method] += 1 - - range_key = f"{start}:{end}" - if range_key not in seen_ranges: - seen_ranges.add(range_key) - range_consistency[range_key] += 1 - - method_range_key = f"{method}[{start}:{end}]" - if method_range_key not in seen_method_ranges: - 
seen_method_ranges.add(method_range_key) - method_range_consistency[method_range_key] += 1 - - # Calculate consistency percentages - method_scores = {method: count / total_samples * 100 for method, count in method_consistency.items()} - range_scores = {range_key: count / total_samples * 100 for range_key, count in range_consistency.items()} - method_range_scores = {mr: count / total_samples * 100 for mr, count in method_range_consistency.items()} - - # Find the most consistent options - best_methods = sorted(method_scores.items(), key=lambda x: x[1], reverse=True)[:5] - best_ranges = sorted(range_scores.items(), key=lambda x: x[1], reverse=True)[:5] - best_method_ranges = sorted(method_range_scores.items(), key=lambda x: x[1], reverse=True)[:5] - - return { - "best_methods": best_methods, - "best_ranges": best_ranges, - "best_method_ranges": best_method_ranges, - "total_samples": total_samples - } - -# --- Pattern Recognition --- -def analyze_patterns(matches: List[Tuple[str, int, int, str]]) -> Dict: - patterns = { - "methods": Counter(), - "ranges": Counter(), - "start_positions": Counter(), - "end_positions": Counter(), - "lengths": Counter() - } - - for method_id, _, _, _ in matches: - # Extract method name and range from method_id (e.g., "SUM[0:5]") - method_parts = re.match(r'([A-Z_<>0-9]+)\[(\d+):(\d+)\]', method_id) - if method_parts: - method_name, start, end = method_parts.groups() - start_pos, end_pos = int(start), int(end) - byte_range = f"[{start}:{end}]" - length = end_pos - start_pos - - patterns["methods"][method_name] += 1 - patterns["ranges"][byte_range] += 1 - patterns["start_positions"][start_pos] += 1 - patterns["end_positions"][end_pos] += 1 - patterns["lengths"][length] += 1 - - return patterns - -# --- Result Display --- -def print_results_with_summary(all_matches: List[Tuple[str, int, int, str]], per_file=False, insights=None, show_full=False): - """Print results with optional detailed analysis""" - # Extract consistency data and file metadata - consistency_data = {} - file_metadata = {} - filtered_matches = [] - - for match in all_matches: - if match[0] == "CONSISTENCY_DATA" and match[3]: - try: - file_data = match[3] - consistency_data[file_data] = json.loads(file_data) - except: - pass - elif match[0] == "FILE_METADATA" and match[3]: - try: - metadata = json.loads(match[3]) - file_name = match[1] # Use the file name stored in match[1] - file_metadata[file_name] = metadata - except Exception as e: - print(f"Error processing metadata: {e}") - else: - filtered_matches.append(match) - - all_matches = filtered_matches - - if not all_matches: - print("āŒ No matches found.") - return - - # Always organize by file - per_file_matches = defaultdict(list) - for match in all_matches: - per_file_matches[match[3]].append(match) - - # Per-file statistics and pattern analysis - for file, matches in per_file_matches.items(): - # Get file metadata if available - metadata = {} - for meta_file, meta_data in file_metadata.items(): - if isinstance(meta_file, str) and file in meta_file: # Ensure meta_file is a string - metadata = meta_data - break - - # Extract sample lines that matched successfully - matched_lines = set(line for _, line, _, _ in matches) - - # Print file summary with line counts - print(f"\n\nšŸ“„ Results for: {file}") - if metadata: - total_lines = metadata.get("total_lines", "?") - valid_samples = metadata.get("valid_samples", len(matched_lines)) - success_rate = (len(matched_lines)/valid_samples*100) if valid_samples > 0 else 0 - print(f"āœ… Matches Found: 
{len(matched_lines)}/{valid_samples} samples " + - f"({success_rate:.1f}% success rate)") - print(f"šŸ“ Total file lines: {total_lines}, Valid samples: {valid_samples}") - else: - print(f"āœ… Matches Found: {len(matches)}") - - # Only show individual matches if per_file flag is set AND full details are requested - if per_file and show_full: - for method_id, line, expected, _ in matches[:20]: # Show only first 20 to avoid flooding - print(f"Line {line:03d} | Method: {method_id:20s} | Expected: {expected:02X}") - if len(matches) > 20: - print(f"... and {len(matches) - 20} more matches") - elif per_file: - # In condensed mode, just show counts per line - line_counts = Counter(line for _, line, _, _ in matches) - print(f"Lines with matches: {', '.join(str(l) for l in sorted(line_counts.keys()))}") - if len(line_counts) > 10: - print(f"Total lines with matches: {len(line_counts)}") - - # Pattern analysis for this file - patterns = analyze_patterns(matches) - - # Print top methods for this file - print("\nšŸ“Š Most Successful Methods in this file:") - for method, count in patterns["methods"].most_common(5): - print(f"{method:<15} → {count} matches") - - if show_full: - # Print top ranges for this file - print("\nšŸ“ Most Common Byte Ranges:") - for range_str, count in patterns["ranges"].most_common(5): - print(f"{range_str:<10} → {count} matches") - - # Print common start positions - print("\nšŸ” Common Start Positions:") - for pos, count in patterns["start_positions"].most_common(5): - print(f"Position {pos:<3} → {count} matches") - - # Print common end positions - print("\nšŸ”Ž Common End Positions:") - for pos, count in patterns["end_positions"].most_common(5): - print(f"Position {pos:<3} → {count} matches") - - # Print common byte lengths - print("\nšŸ“Š Common Byte Lengths:") - for length, count in patterns["lengths"].most_common(5): - print(f"{length} bytes → {count} matches") - - # Visual representation of match distribution - if patterns["start_positions"] and patterns["end_positions"]: - max_pos = max(max(patterns["end_positions"].keys()), - max(patterns["start_positions"].keys())) - print("\nšŸ“ˆ Match Distribution (frequency by position):") - scale = 30 # Reduced scale for more compact output - max_count = max(max(patterns["start_positions"].values()), - max(patterns["end_positions"].values())) - for pos in range(min(max_pos + 1, 40)): # Limit to first 40 positions - start_count = patterns["start_positions"].get(pos, 0) - end_count = patterns["end_positions"].get(pos, 0) - start_bar = 'ā–ˆ' * int((start_count / max_count) * scale) if start_count else '' - end_bar = 'ā–‘' * int((end_count / max_count) * scale) if end_count else '' - print(f"{pos:2d}: {start_bar}|{end_bar}") - print(" ā–ˆā–ˆā–ˆ = start positions, ā–‘ā–‘ā–‘ = end positions") - - # Print byte-level insights for each sample if available - if insights and show_full: - file_insights = {k: v for k, v in insights.items() if k.startswith(f"sample_") and file in v.get("method", "")} - if file_insights: - print("\nšŸ”¬ Byte-Level Analysis:") - for key, data in file_insights.items(): - parts = key.split('_') - sample_id = parts[1] if len(parts) > 1 else "?" 
- print(f"\nSample {sample_id} with {data['method']}[{data['range']}]:") - - # Show optimal byte changes - if data.get("optimal_changes"): - print("Optimal byte changes to achieve expected checksum:") - for pos, new_val in data["optimal_changes"]: - print(f" Change byte at position {pos} from 0x{data['contributions']['byte_contributions'][pos]['original_value']:02X} to 0x{new_val:02X}") - else: - print("No simple byte changes found to fix checksum") - - # Global summary (always show this part) - print("\n\nšŸ“Š Global Summary of Most Successful Methods:") - method_counts = defaultdict(int) - for method_id, _, _, _ in all_matches: - method_counts[method_id] += 1 - - sorted_methods = sorted(method_counts.items(), key=lambda x: x[1], reverse=True) - for method_id, count in sorted_methods[:5]: # Reduced to top 5 for conciseness - print(f"{method_id:<25} → {count} matches") - - # Show more detailed global pattern summary only in full mode - if show_full: - all_patterns = analyze_patterns(all_matches) - print("\nšŸ“ˆ Global Pattern Summary:") - print(f"Total unique methods found: {len(all_patterns['methods'])}") - print(f"Total unique byte ranges: {len(all_patterns['ranges'])}") - print(f"Most common method: {all_patterns['methods'].most_common(1)[0][0]} with {all_patterns['methods'].most_common(1)[0][1]} matches") - - # Print global consensus analysis at the end - if consistency_data and show_full: - print("\n\n🧩 Global Consensus Analysis") - print("═══════════════════════════") - print("Methods that work across multiple files:") - - # Collect global statistics from all files - global_methods = Counter() - global_ranges = Counter() - global_method_ranges = Counter() - - for file_data in consistency_data.values(): - for method, score in file_data.get("best_methods", []): - global_methods[method] += 1 - for range_key, score in file_data.get("best_ranges", []): - global_ranges[range_key] += 1 - for mr, score in file_data.get("best_method_ranges", []): - global_method_ranges[mr] += 1 - - # Display methods that work across multiple files - num_files = len(consistency_data) - print(f"\nšŸ“Š Methods that work across multiple files (total files: {num_files}):") - for method, count in global_methods.most_common(5): - print(f"{method:<15} → appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)") - - print(f"\nšŸ“ Byte ranges that work across multiple files:") - for range_key, count in global_ranges.most_common(5): - print(f"[{range_key}] → appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)") - - print(f"\nšŸ” Method+Range combinations that work across multiple files:") - for mr, count in global_method_ranges.most_common(5): - print(f"{mr:<20} → appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)") - - # Generate a recommended approach - if global_method_ranges: - best_combo, count = global_method_ranges.most_common(1)[0] - if count >= num_files * 0.5: # If it works for at least half the files - print(f"\nāœ… Recommended global method: {best_combo}") - print(f" This combination works in top 5 for {count}/{num_files} files") - else: - print("\nāš ļø No single method+range combination works reliably across most files") - print(f" Best option ({best_combo}) only works in top 5 for {count}/{num_files} files") - - # Try to find patterns in the most successful methods - if global_methods: - best_method, method_count = global_methods.most_common(1)[0] - print(f"\nšŸ’” Consider using {best_method} with file-specific byte ranges") - print(f" 
This algorithm appears in top 5 for {method_count}/{num_files} files") - -# --- Advanced Checksum Algorithms --- -def checksum_weighted_sum_parametric(data: bytes, weight_start: float = 1.0, weight_step: float = 1.0) -> int: - """Weighted sum with configurable starting weight and step""" - return sum(int((weight_start + i * weight_step) * b) % 256 for i, b in enumerate(data)) % 256 - -def checksum_hybrid_sum_xor(data: bytes, weight: float = 0.5) -> int: - """Hybrid checksum using weighted combination of sum and XOR""" - sum_result = sum(data) % 256 - xor_result = 0 - for b in data: - xor_result ^= b - return int((weight * sum_result + (1 - weight) * xor_result)) % 256 - -def checksum_adaptive_bit_flip_sum(data: bytes, flip_mask: int = 0xFF) -> int: - """Bit flip sum with configurable flip mask""" - return sum(b ^ flip_mask for b in data) % 256 - -def checksum_position_weighted_sum(data: bytes, position_weights: List[float] = None) -> int: - """Sum where each byte is weighted by its position in a specific pattern""" - if position_weights is None: - # Default to alternating weights - position_weights = [1.0, 0.5] - - result = 0 - for i, b in enumerate(data): - weight = position_weights[i % len(position_weights)] - result = (result + int(b * weight)) % 256 - return result - -def evaluate_targeted_algorithms(samples: List[Tuple[bytes, int]], label_prefix="") -> List[Tuple[str, int, int, str]]: - """Run a more focused test on the most promising algorithms with fine-tuned parameters""" - - # Based on consensus, focus testing on these methods with more parameter variations - matches = [] - seen = set() - - # Set up parameter variations for testing - bit_flip_masks = [0xFF, 0xF0, 0x0F, 0xCC, 0x55, 0xAA] - hybrid_weights = [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8] - weight_steps = [0.9, 1.0, 1.1, 1.2, 1.5] - pos_weight_patterns = [ - [1.0, 0.5], # Alternating - [1.0, 1.0, 0.5], # Every third byte gets half weight - [1.0, 0.75, 0.5, 0.25] # Descending weights - ] - - # Process each sample with focused algorithms - for sample_index, (data, expected) in enumerate(samples): - length = len(data) - - # Instead of trying every possible byte range, focus on the most promising ranges - # based on global patterns from previous analysis - - # Try more specific ranges based on insights - ranges_to_try = [] - - # Focus on common start positions from global analysis: 0-5 and specific ranges - for start in [0, 1, 2, 3, 4, 5]: - # Try full data range - ranges_to_try.append((start, length)) - - # Try common end points (from previous runs) - for end_offset in [0, 1, 2, 4, 8]: - if length - end_offset > start + 1: # Ensure valid range - ranges_to_try.append((start, length - end_offset)) - - # Add specific ranges that were successful in multiple files - specific_ranges = [(3, 30), (4, 31), (5, 8), (5, 9), (2, 11)] - for start, end in specific_ranges: - if start < length and end <= length and start < end: - ranges_to_try.append((start, end)) - - # Process the focused ranges with our most promising algorithms - for start, end in ranges_to_try: - sliced = data[start:end] - label = f"[{start}:{end}]" - - # Test standard checksum methods that showed promise - methods = [ - ("WEIGHTED_SUM", lambda d: checksum_weighted_sum(d)), - ("ALT_SUM_XOR", lambda d: checksum_alt_sum_xor(d)), - ("BIT_FLIP_SUM", lambda d: checksum_bit_flip_sum(d)), - ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)) - ] - - # Test the standard methods - for name, func in methods: - try: - result = func(sliced) - method_id = f"{name}{label}" - key = 
(sample_index, method_id, label_prefix) - if result == expected and key not in seen: - seen.add(key) - matches.append((method_id, sample_index + 1, expected, label_prefix)) - except Exception: - continue - - # Test advanced parametric methods - for mask in bit_flip_masks: - try: - result = checksum_adaptive_bit_flip_sum(sliced, mask) - method_id = f"BIT_FLIP_SUM({mask:02X}){label}" - key = (sample_index, method_id, label_prefix) - if result == expected and key not in seen: - seen.add(key) - matches.append((method_id, sample_index + 1, expected, label_prefix)) - except Exception: - continue - - for weight in hybrid_weights: - try: - result = checksum_hybrid_sum_xor(sliced, weight) - method_id = f"HYBRID_SUM_XOR({weight:.1f}){label}" - key = (sample_index, method_id, label_prefix) - if result == expected and key not in seen: - seen.add(key) - matches.append((method_id, sample_index + 1, expected, label_prefix)) - except Exception: - continue - - for step in weight_steps: - try: - result = checksum_weighted_sum_parametric(sliced, 1.0, step) - method_id = f"WEIGHTED_SUM_STEP({step:.1f}){label}" - key = (sample_index, method_id, label_prefix) - if result == expected and key not in seen: - seen.add(key) - matches.append((method_id, sample_index + 1, expected, label_prefix)) - except Exception: - continue - - for i, pattern in enumerate(pos_weight_patterns): - try: - result = checksum_position_weighted_sum(sliced, pattern) - method_id = f"POS_WEIGHT_{i+1}{label}" - key = (sample_index, method_id, label_prefix) - if result == expected and key not in seen: - seen.add(key) - matches.append((method_id, sample_index + 1, expected, label_prefix)) - except Exception: - continue - - return matches - -# --- Byte Change Correlation Analysis --- -def analyze_byte_value_correlations(samples: List[Tuple[bytes, int]], max_samples: int = 1000) -> Dict: - """ - Analyze how changing specific bytes correlates with changes in the checksum. - This helps understand the "sensitivity" of the checksum to specific byte positions. 
- """ - # Sample if we have too many samples to process - if len(samples) > max_samples: - print(f"Sampling {max_samples} out of {len(samples)} for correlation analysis") - samples = random.sample(samples, max_samples) - - # Initialize data structures for correlation analysis - bytes_by_position = defaultdict(list) - checksums_by_position_value = defaultdict(list) - correlations = {} - position_weights = {} - - # Gather data by byte position - max_length = max(len(data) for data, _ in samples) - print(f"Analyzing correlations for {len(samples)} samples with max length {max_length}") - - # Track all byte values and checksums by position - for data, checksum in samples: - for pos, value in enumerate(data): - bytes_by_position[pos].append(value) - checksums_by_position_value[(pos, value)].append(checksum) - - # Calculate correlation strength for each position - for pos in range(max_length): - pos_values = bytes_by_position.get(pos, []) - if len(pos_values) <= 1: - continue - - # Create value-to-checksum mapping and analyze patterns - value_impact = {} - checksum_changes = [] - - # Group by unique values at this position - unique_values = set(pos_values) - if len(unique_values) <= 1: - continue - - # Analyze how changes in this position correlate with checksums - for val in unique_values: - checksums = checksums_by_position_value.get((pos, val), []) - if checksums: - avg_checksum = sum(checksums) / len(checksums) - value_impact[val] = avg_checksum - - # If we have enough data, calculate correlation metrics - if len(value_impact) >= 2: - # Look for linear relationships - xy_pairs = [(val, cs) for val, cs in value_impact.items()] - correlation = calculate_correlation_coefficient(xy_pairs) - - # Look for bit-level patterns (XOR, bit flips) - bit_patterns = analyze_bit_patterns(value_impact) - - correlations[pos] = { - "strength": abs(correlation), - "direction": "positive" if correlation >= 0 else "negative", - "unique_values": len(unique_values), - "sample_count": len(pos_values), - "bit_patterns": bit_patterns - } - - # Calculate a rough "weight" for this position in checksum calculations - pos_weight = abs(correlation) * (len(unique_values) / 256) - position_weights[pos] = pos_weight - - # Sort positions by correlation strength - sorted_positions = sorted(correlations.keys(), key=lambda p: correlations[p]["strength"], reverse=True) - significant_positions = sorted_positions[:10] # Most influential positions - - # Build response - return { - "significant_positions": significant_positions, - "position_correlations": {p: correlations[p] for p in significant_positions}, - "position_weights": {p: position_weights[p] for p in position_weights if p in significant_positions}, - "analyzed_samples": len(samples), - "max_length": max_length - } - -def calculate_correlation_coefficient(pairs: List[Tuple[int, int]]) -> float: - """Calculate Pearson's correlation coefficient between byte values and checksums.""" - if len(pairs) < 2: - return 0.0 - - x_vals = [p[0] for p in pairs] - y_vals = [p[1] for p in pairs] - - n = len(pairs) - - # Calculate means - x_mean = sum(x_vals) / n - y_mean = sum(y_vals) / n - - # Calculate correlation coefficient - numerator = sum((x - x_mean) * (y - y_mean) for x, y in zip(x_vals, y_vals)) - denominator_x = sum((x - x_mean) ** 2 for x in x_vals) - denominator_y = sum((y - y_mean) ** 2 for y in y_vals) - - if denominator_x == 0 or denominator_y == 0: - return 0.0 - - return numerator / math.sqrt(denominator_x * denominator_y) - -def analyze_bit_patterns(value_impact: 
Dict[int, float]) -> Dict: - """ - Analyze bit-level patterns in how byte changes affect checksums. - Identifies patterns like "flipping bit 3 adds 8 to checksum" etc. - """ - bit_influences = [0.0] * 8 # Influence of each bit position - - # Calculate average impact when each bit is set vs unset - bit_set_checksums = [[] for _ in range(8)] - bit_unset_checksums = [[] for _ in range(8)] - - for value, checksum in value_impact.items(): - # Analyze each bit - for bit_pos in range(8): - bit_mask = 1 << bit_pos - if value & bit_mask: # Bit is set - bit_set_checksums[bit_pos].append(checksum) - else: # Bit is unset - bit_unset_checksums[bit_pos].append(checksum) - - # Calculate average difference per bit - for bit_pos in range(8): - set_avg = sum(bit_set_checksums[bit_pos]) / len(bit_set_checksums[bit_pos]) if bit_set_checksums[bit_pos] else 0 - unset_avg = sum(bit_unset_checksums[bit_pos]) / len(bit_unset_checksums[bit_pos]) if bit_unset_checksums[bit_pos] else 0 - - if set_avg and unset_avg: - influence = set_avg - unset_avg - bit_influences[bit_pos] = influence - - # Determine the bit pattern type - pattern_types = { - "xor_like": all(abs(bit_influences[i]) >= 0.5 for i in range(8)), - "additive": all(bit_influences[i] >= 0 for i in range(8)), - "subtractive": all(bit_influences[i] <= 0 for i in range(8)), - "weighted": max(abs(b) for b in bit_influences) / (min(abs(b) for b in bit_influences) if min(abs(b) for b in bit_influences) else 1) > 3, - } - - return { - "bit_influences": {i: bit_influences[i] for i in range(8)}, - "pattern_type": next((ptype for ptype, matches in pattern_types.items() if matches), "mixed"), - "most_influential_bit": bit_influences.index(max(bit_influences, key=abs)) - } - -def find_optimal_byte_changes(data: bytes, checksum_func: Callable, expected: int) -> List[Tuple[int, int]]: - """ - Find the minimal set of byte changes needed to achieve the expected checksum. - Returns a list of (position, new_value) tuples. 
- """ - base_checksum = checksum_func(data) - if base_checksum == expected: - return [] # No changes needed - - # Try changing bytes to match target checksum using sensitivity information - - # First try single byte changes - this is much faster and most likely case - for i in range(len(data)): - modified = bytearray(data) - target_diff = (expected - base_checksum) % 256 - - # Try calculating what value this position should have - if checksum_func == checksum_sum: - # For sum, we can directly calculate needed value - new_val = (data[i] + target_diff) % 256 - modified[i] = new_val - if checksum_func(bytes(modified)) == expected: - return [(i, new_val)] - elif checksum_func == checksum_xor: - # For XOR, direct calculation also works - new_val = data[i] ^ (base_checksum ^ expected) - modified[i] = new_val - if checksum_func(bytes(modified)) == expected: - return [(i, new_val)] - else: - # For other algorithms, try incremental changes or use binary search - best_value = None - best_diff = 256 - - # Check common values first, then do a smarter search if needed - for test_val in [0, 1, 0xFF, expected, data[i] ^ 0xFF]: - if test_val == data[i]: - continue - - modified[i] = test_val - new_checksum = checksum_func(bytes(modified)) - if new_checksum == expected: - return [(i, test_val)] - diff = abs((new_checksum - expected) % 256) - if diff < best_diff: - best_diff = diff - best_value = test_val - - # If we got close, try a more focused search around the promising value - if best_diff < 50 and best_value is not None: - for offset in range(-10, 11): - test_val = (best_value + offset) % 256 - if test_val == data[i]: - continue - - modified[i] = test_val - new_checksum = checksum_func(bytes(modified)) - if new_checksum == expected: - return [(i, test_val)] - - # If single byte changes don't work, try strategic two-byte changes - # For performance, we'll limit this to nearby byte combinations - for i in range(len(data)): - for j in range(i+1, min(i+8, len(data))): # Try up to 7 bytes ahead - for i_adj in [-1, 1]: - for j_adj in [-1, 1]: - modified = bytearray(data) - modified[i] = (data[i] + i_adj) % 256 - modified[j] = (data[j] + j_adj) % 256 - - if checksum_func(bytes(modified)) == expected: - return [(i, modified[i]), (j, modified[j])] - - return [] - -# --- Large-Scale File Analysis --- -def analyze_large_file(filepath: str, max_samples=1000) -> Dict: - """Analyze a large file efficiently by processing it in batches.""" - start_time = time.time() - print(f"Starting large-scale analysis of {filepath}...") - - # Process the file in batches to handle large files - batch_gen = parse_input_file_lines_batched(filepath, batch_size=1000) - - # First batch will be used for detailed analysis - first_batch = next(batch_gen, []) - if not first_batch: - print("No valid samples found in file.") - return {} - - # Collect metadata about the batch - batch_metadata = next(batch_gen, {"total_lines": 0, "valid_samples": 0}) - - # Perform initial algorithm identification on the first batch - print(f"Identifying potential checksum algorithms on first {len(first_batch)} samples...") - matches = bruteforce_all_methods(first_batch, label_prefix=os.path.basename(filepath)) - - # Extract the most promising algorithms and ranges - patterns = analyze_patterns([m for m in matches if m[0] != "CONSISTENCY_DATA"]) - top_methods = patterns["methods"].most_common(3) - top_ranges = patterns["ranges"].most_common(3) - - # Combining top methods with top ranges for focused analysis - focused_analysis = [] - method_func_map = { - "SUM": 
checksum_sum, - "XOR": checksum_xor, - "SUM<<1": lambda d: checksum_sum_shifted(d, 1), - "SUM<<2": lambda d: checksum_sum_shifted(d, 2), - "XOR<<1": lambda d: checksum_xor_shifted(d, 1), - "XOR<<2": lambda d: checksum_xor_shifted(d, 2), - "WEIGHTED_SUM": checksum_weighted_sum, - "ALT_SUM_XOR": checksum_alt_sum_xor, - "BIT_FLIP_SUM": checksum_bit_flip_sum - } - - # Collect a sample of data for correlation analysis - correlation_samples = first_batch.copy() - - # Check more batches if we need more samples for correlation analysis - batches_processed = 1 - while len(correlation_samples) < max_samples: - batch = next(batch_gen, None) - if batch is None: - break - correlation_samples.extend(batch[:max_samples - len(correlation_samples)]) - batches_processed += 1 - if batches_processed >= 10: # Limit to 10 batches for performance - break - - # Perform correlation analysis - print(f"Performing byte correlation analysis on {len(correlation_samples)} samples...") - correlations = analyze_byte_value_correlations(correlation_samples, max_samples=max_samples) - - # Test the most likely algorithms on the significant byte positions - print("Testing algorithm-position combinations...") - for method_name, _ in top_methods: - for range_str, _ in top_ranges: - range_parts = range_str.strip('[]').split(':') - if len(range_parts) == 2: - start, end = int(range_parts[0]), int(range_parts[1]) - method_func = method_func_map.get(method_name) - if method_func: - success_count = 0 - for data, expected in correlation_samples[:100]: # Test on first 100 samples - if len(data) >= end: - result = method_func(data[start:end]) - if result == expected: - success_count += 1 - - success_rate = success_count / min(100, len(correlation_samples)) - focused_analysis.append({ - "method": method_name, - "range": f"[{start}:{end}]", - "success_rate": success_rate, - "success_count": success_count - }) - - # Sort by success rate - focused_analysis.sort(key=lambda x: x["success_rate"], reverse=True) - - # Find byte positions that most strongly influence the checksum - influential_positions = correlations["significant_positions"][:5] - - elapsed_time = time.time() - start_time - - return { - "file_name": os.path.basename(filepath), - "samples_analyzed": len(correlation_samples), - "elapsed_time": elapsed_time, - "top_methods": [m[0] for m in top_methods], - "top_ranges": [r[0] for r in top_ranges], - "focused_analysis": focused_analysis[:5], - "influential_positions": influential_positions, - "position_correlations": {str(p): correlations["position_correlations"][p] for p in influential_positions}, - "byte_pattern_summary": summarize_byte_patterns(correlations), - } - -def summarize_byte_patterns(correlations: Dict) -> Dict: - """Summarize patterns in byte correlations to help understand the checksum algorithm.""" - if not correlations or "position_correlations" not in correlations: - return {} - - # Identify patterns in how byte positions affect the checksum - positions = correlations.get("significant_positions", []) - if not positions: - return {} - - # Count pattern types to identify algorithm characteristics - pattern_types = Counter() - for pos in positions: - if pos in correlations["position_correlations"]: - bit_patterns = correlations["position_correlations"][pos].get("bit_patterns", {}) - pattern_type = bit_patterns.get("pattern_type", "unknown") - pattern_types[pattern_type] += 1 - - # Algorithm characteristics based on patterns - primary_pattern = pattern_types.most_common(1)[0][0] if pattern_types else "unknown" - 
algorithm_characteristics = { - "xor_like": "XOR-based algorithm (position-independent)", - "additive": "Sum-based algorithm (position-independent)", - "subtractive": "Subtraction-based algorithm (unusual)", - "weighted": "Weighted algorithm (position-dependent)", - "mixed": "Mixed algorithm (complex checksum)" - } - - # Check position importance distribution - pos_weights = correlations.get("position_weights", {}) - weight_values = list(pos_weights.values()) - weight_variance = 0 - if weight_values: - mean_weight = sum(weight_values) / len(weight_values) - weight_variance = sum((w - mean_weight) ** 2 for w in weight_values) / len(weight_values) - - position_dependent = weight_variance > 0.05 - - return { - "dominant_pattern": primary_pattern, - "likely_algorithm_type": algorithm_characteristics.get(primary_pattern, "Unknown algorithm type"), - "position_dependent": position_dependent, - "weight_variance": weight_variance, - "recommendation": get_algorithm_recommendation(primary_pattern, position_dependent) - } - -def get_algorithm_recommendation(pattern_type: str, position_dependent: bool) -> str: - """Get a recommendation for checksum algorithm based on correlation analysis.""" - if pattern_type == "xor_like" and not position_dependent: - return "XOR-based checksum recommended" - elif pattern_type == "xor_like" and position_dependent: - return "Position-dependent XOR (shifted XOR) recommended" - elif pattern_type == "additive" and not position_dependent: - return "Simple sum checksum recommended" - elif pattern_type == "additive" and position_dependent: - return "Weighted sum checksum recommended" - elif pattern_type == "weighted": - return "Complex weighted checksum recommended" - else: - return "Mixed or complex algorithm recommended, try ALT_SUM_XOR or custom hybrid" - -def print_large_file_analysis(analysis: Dict): - """Print the results of large-file analysis in a readable format.""" - print("\nšŸ“Š Large File Analysis Results") - print("═══════════════════════════") - print(f"File: {analysis.get('file_name', 'Unknown')}") - print(f"Samples analyzed: {analysis.get('samples_analyzed', 0)}") - print(f"Analysis time: {analysis.get('elapsed_time', 0):.2f} seconds") - - # Print the top methods and ranges - print("\nšŸ” Top Checksum Methods:") - for method in analysis.get('top_methods', []): - print(f" • {method}") - - print("\nšŸ“ Top Byte Ranges:") - for range_str in analysis.get('top_ranges', []): - print(f" • {range_str}") - - # Print the focused analysis results - print("\nāœ… Best Method+Range Combinations:") - for combo in analysis.get('focused_analysis', []): - print(f" • {combo['method']}{combo['range']} → {combo['success_rate']*100:.1f}% success rate ({combo['success_count']} samples)") - - # Print the byte pattern summary - pattern_summary = analysis.get('byte_pattern_summary', {}) - if pattern_summary: - print("\n🧠 Algorithm Characteristics:") - print(f" Dominant pattern: {pattern_summary.get('dominant_pattern', 'Unknown')}") - print(f" Likely algorithm: {pattern_summary.get('likely_algorithm_type', 'Unknown')}") - print(f" Position dependent: {'Yes' if pattern_summary.get('position_dependent', False) else 'No'}") - print(f"\nšŸ’” Recommendation: {pattern_summary.get('recommendation', 'Unknown')}") - - # Print influential byte positions - print("\nšŸ”¢ Most Influential Byte Positions:") - positions = analysis.get('influential_positions', []) - pos_correlations = analysis.get('position_correlations', {}) - - for pos in positions: - pos_str = str(pos) - if pos_str in 
pos_correlations: - info = pos_correlations[pos_str] - print(f" • Position {pos}: {info['strength']:.3f} correlation strength, " + - f"{info['direction']} correlation, {info['unique_values']} unique values") - - # Print bit patterns if available - bit_patterns = info.get("bit_patterns", {}) - if bit_patterns: - most_influential_bit = bit_patterns.get("most_influential_bit", 0) - print(f" Most influential bit: {most_influential_bit} (bit {7-most_influential_bit} from left)") - -# --- Enhanced Folder Processing --- -def process_folder_with_limits(folder_path: str, max_total_samples: int = 1000) -> List[Tuple[bytes, int]]: - """ - Process files in a folder with a limit on total samples. - Returns a list of samples up to the specified limit. - """ - all_samples = [] - files_processed = 0 - samples_collected = 0 - - print(f"Processing folder with limit of {max_total_samples} samples...") - - for file in os.listdir(folder_path): - if file.endswith(".txt"): - full_path = os.path.join(folder_path, file) - try: - samples, file_meta = parse_input_file_lines(full_path) - - # Take only what we need to stay under max_total_samples - remaining = max_total_samples - len(all_samples) - if remaining <= 0: - break - - if len(samples) > remaining: - print(f"Taking {remaining} of {len(samples)} samples from {file}") - samples = samples[:remaining] - else: - print(f"Taking all {len(samples)} samples from {file}") - - all_samples.extend(samples) - files_processed += 1 - samples_collected += len(samples) - - # Stop if we've reached our limit - if len(all_samples) >= max_total_samples: - break - - except Exception as e: - print(f"Error processing {file}: {e}") - - print(f"Processed {files_processed} files, collected {samples_collected} samples") - return all_samples - -# --- Main --- -if __name__ == "__main__": - # Create argument parser - parser = argparse.ArgumentParser(description='Analyze checksum algorithms in files.') - parser.add_argument('path', help='Path to file or directory to analyze') - parser.add_argument('--full', action='store_true', help='Show detailed output with all analyses') - parser.add_argument('--byte-analysis', action='store_true', help='Perform byte-level contribution analysis') - parser.add_argument('--large', action='store_true', help='Perform large-scale analysis optimized for big files') - parser.add_argument('--max-samples', type=int, default=1000, - help='Maximum number of samples for intensive analyses (byte-level and large-scale)') - - args = parser.parse_args() - - path = args.path - show_full = args.full - perform_byte_analysis = args.byte_analysis - large_analysis = args.large - max_samples = args.max_samples - - all_matches = [] - byte_insights = {} - - if os.path.isdir(path): - # Standard brute force - process all samples without limits - print("Phase 1: Running standard brute force analysis...") - for file in os.listdir(path): - if file.endswith(".txt"): - full_path = os.path.join(path, file) - try: - parsed_samples, file_meta = parse_input_file_lines(full_path) - # Process all samples for standard analysis - match_results = bruteforce_all_methods( - parsed_samples, - label_prefix=file, - file_metadata={"file": file, **file_meta} - ) - all_matches.extend(match_results) - except Exception as e: - print(f"Error processing {file}: {e}") - - # Display standard results - print_results_with_summary(all_matches, per_file=True, show_full=show_full) - - if perform_byte_analysis: - # Limit to max_samples for the intensive byte-level analysis - print(f"\n\nPhase 2: Running byte-level 
contribution analysis (limit: {max_samples} samples)...") - files_analyzed = 0 - total_samples_analyzed = 0 - - for file in list(os.listdir(path)): - # Stop if we've hit our sample limit or analyzed enough files - if total_samples_analyzed >= max_samples or files_analyzed >= 3: - break - - if file.endswith(".txt"): - full_path = os.path.join(path, file) - try: - parsed_samples, file_meta = parse_input_file_lines(full_path) - if not parsed_samples: - print(f"āš ļø No valid samples found in {file}") - continue - - # Determine how many samples to take from this file - samples_remaining = max_samples - total_samples_analyzed - if samples_remaining <= 0: - break - - samples_to_analyze = parsed_samples - if len(parsed_samples) > samples_remaining: - print(f"Limiting to {samples_remaining} samples from {file}") - samples_to_analyze = parsed_samples[:samples_remaining] - else: - print(f"Analyzing all {len(parsed_samples)} samples from {file}") - - total_samples_analyzed += len(samples_to_analyze) - files_analyzed += 1 - - print(f"\nšŸ“„ Analyzing file: {file} ({len(samples_to_analyze)} samples)") - match_results, file_insights = evaluate_with_byte_analysis( - samples_to_analyze, - label_prefix=f"BYTE_ANALYSIS_{file}", - detailed=True - ) - - if not file_insights: - print(f"āš ļø No byte-level insights found for {file}") - - byte_insights.update(file_insights) - except Exception as e: - print(f"āš ļø Error analyzing {file}: {e}") - - print(f"\nCompleted byte-level analysis on {total_samples_analyzed} samples from {files_analyzed} files") - - # Overall summary - print("\n\n🧬 Byte Contribution Analysis Summary") - print("═════════════════════════════════════") - print(f"Total samples analyzed: {len(byte_insights)}") - print(f"Methods with most influence on checksums:") - - # Collect statistics on which methods have highest average impact - method_impacts = defaultdict(list) - for key, data in byte_insights.items(): - if "contributions" in data: - # Get average of max impacts across all bytes - impacts = [info["max_impact"] for info in data["contributions"]["byte_contributions"].values()] - if impacts: - avg_impact = sum(impacts) / len(impacts) - method_impacts[data["method"]].append(avg_impact) - - # Show average impact by method - for method, impacts in method_impacts.items(): - if impacts: - avg = sum(impacts) / len(impacts) - print(f"{method:<15} → Avg impact: {avg:.1f}") - - elif os.path.isfile(path): - parsed_samples, file_meta = parse_input_file_lines(path) - file_name = os.path.basename(path) - match_results = bruteforce_all_methods( - parsed_samples, - label_prefix=file_name, - file_metadata={"file": file_name, **file_meta} - ) - all_matches.extend(match_results) - - # Display results - print_results_with_summary(all_matches, per_file=True, show_full=show_full) - - if perform_byte_analysis and parsed_samples: - print("\nRunning byte-level contribution analysis...") - try: - match_results, file_insights = evaluate_with_byte_analysis( - parsed_samples, # Now correctly passing just the samples list - label_prefix=f"BYTE_ANALYSIS_{os.path.basename(path)}", - detailed=True - ) - - # Print just the first sample's analysis as an example - if file_insights: - key = next(iter(file_insights)) - data = file_insights[key] - sample_id = key.split('_')[1] if len(key.split('_')) > 1 else "?" 
- method_name = data["method"] - range_str = data["range"] - - # Get original sample data - if int(sample_id) <= len(parsed_samples): - data_bytes, expected = parsed_samples[int(sample_id)-1] - start, end = map(int, data["range"].split(':')) - sliced_data = data_bytes[start:end] - - print(f"\nByte analysis for Sample {sample_id} using {method_name}[{range_str}]") - print_byte_analysis(sliced_data, data["contributions"], method_name) - except Exception as e: - print(f"āš ļø Error during byte analysis: {e}") - - if os.path.isdir(path): - # ...existing code... - - if large_analysis: - print(f"\n\nPerforming large-scale file analysis (limit: {max_samples} samples per file)...") - files_analyzed = 0 - - for file in list(os.listdir(path)): - if files_analyzed >= 5: # Limit to 5 files for performance - break - - if file.endswith(".txt"): - full_path = os.path.join(path, file) - try: - analysis = analyze_large_file(full_path, max_samples=max_samples) - print_large_file_analysis(analysis) - files_analyzed += 1 - except Exception as e: - print(f"āš ļø Error during large file analysis of {file}: {e}") - - elif os.path.isfile(path): - # ...existing code... - - if large_analysis: - try: - analysis = analyze_large_file(path, max_samples=max_samples) - print_large_file_analysis(analysis) - except Exception as e: - print(f"āš ļø Error during large file analysis: {e}") - -def evaluate_with_byte_analysis(samples: List[Tuple[bytes, int]], label_prefix="", detailed=False) -> Tuple[List, Dict]: - """Analyze which methods work and provide byte-level insights""" - matches = [] - seen = set() - byte_insights = {} - - # Most promising methods based on previous analysis - methods = [ - ("WEIGHTED_SUM", checksum_weighted_sum), - ("ALT_SUM_XOR", checksum_alt_sum_xor), - ("BIT_FLIP_SUM", checksum_bit_flip_sum), - ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)), - ("HYBRID_SUM_XOR(0.5)", lambda d: checksum_hybrid_sum_xor(d, 0.5)), - ("BIT_FLIP_SUM(AA)", lambda d: checksum_adaptive_bit_flip_sum(d, 0xAA)) - ] - - for sample_index, (data, expected) in enumerate(samples[:5]): # Limit to first 5 samples for performance - length = len(data) - - # Focus on the most promising ranges - ranges_to_try = [] - - # Add the specific ranges that were most successful in our analysis - specific_ranges = [(3, 30), (4, 31), (5, 8), (5, 9), (2, 11)] - for start, end in specific_ranges: - if start < length and end <= length and start < end: - ranges_to_try.append((start, end)) - - # Process each range with our methods - for start, end in ranges_to_try: - if end > start + 30: # Skip very large ranges to keep analysis fast - continue - - sliced = data[start:end] - label = f"[{start}:{end}]" - - for name, func in methods: - try: - result = func(sliced) - method_id = f"{name}{label}" - key = (sample_index, method_id, label_prefix) - - if result == expected and key not in seen: - seen.add(key) - matches.append((method_id, sample_index + 1, expected, label_prefix)) - - # For matching methods, perform byte contribution analysis - if detailed: - print(f"Analyzing contributions for sample {sample_index+1}, method {method_id}...") - byte_contributions = analyze_byte_contributions(sliced, func, expected) - optimal_changes = find_optimal_byte_changes(sliced, func, expected) - - # Store insights and also print them immediately - insights_key = f"sample_{sample_index+1}_{name}" - byte_insights[insights_key] = { - "contributions": byte_contributions, - "optimal_changes": optimal_changes, - "method": name, - "range": f"{start}:{end}", - "data": sliced 
# Store the data slice itself for easier analysis - } - - # Print analysis directly during collection for immediate feedback - print_byte_analysis(sliced, byte_contributions, method_id) - - # If we found compensation values, print them - if optimal_changes: - print("\nSuggested byte changes:") - for pos, new_val in optimal_changes: - print(f" Change byte at position {pos} from 0x{sliced[pos]:02X} to 0x{new_val:02X}") - - # Once we've found and analyzed one matching method for a sample, move on - # to keep the output manageable - break - except Exception as e: - continue - - # If we've already found and analyzed a method for this sample, move on - if any(k.startswith(f"sample_{sample_index+1}_") for k in byte_insights.keys()): - break - - # If we've already found and analyzed a method for this sample, move on - if any(k.startswith(f"sample_{sample_index+1}_") for k in byte_insights.keys()): - continue - - return matches, byte_insights -- cgit v1.2.3-70-g09d2