path: root/research/bad-bruteforcing/idiot.py
author    Sigma-Ohio <crt@teleco.ch>  2025-06-09 03:35:52 +0200
committer Sigma-Ohio <crt@teleco.ch>  2025-06-09 03:35:52 +0200
commit    ce4acec8d9d67f1c03ec8b55e1b2453503069cee (patch)
tree      a3eaa0c2805633441976eaa096aaec92e7a8eb0f /research/bad-bruteforcing/idiot.py
parent    c5adf47c63b541be63bcf15fe93a2f392d12f3c2 (diff)
went insane
Diffstat (limited to 'research/bad-bruteforcing/idiot.py')
-rw-r--r--  research/bad-bruteforcing/idiot.py  1324
1 files changed, 1324 insertions, 0 deletions
diff --git a/research/bad-bruteforcing/idiot.py b/research/bad-bruteforcing/idiot.py
new file mode 100644
index 0000000..7ead36e
--- /dev/null
+++ b/research/bad-bruteforcing/idiot.py
@@ -0,0 +1,1324 @@
+import re
+import sys
+import os
+import argparse
+from typing import List, Tuple, Callable, Dict, Generator, Optional
+from collections import defaultdict, Counter
+import json
+import time
+from itertools import islice
+import math
+import random
+
+# --- Basic Checksum Algorithms (pure AI slop) ---
+def checksum_sum(data: bytes) -> int:
+ return sum(data) % 256
+
+def checksum_xor(data: bytes) -> int:
+ result = 0
+ for b in data:
+ result ^= b
+ return result
+
+def checksum_sum_shifted(data: bytes, shift: int) -> int:
+ return sum((b << shift) & 0xFF for b in data) % 256
+
+def checksum_xor_shifted(data: bytes, shift: int) -> int:
+ result = 0
+ for b in data:
+ result ^= (b << shift) & 0xFF
+ return result
+
+def checksum_weighted_sum(data: bytes) -> int:
+ return sum((i + 1) * b for i, b in enumerate(data)) % 256
+
+def checksum_alt_sum_xor(data: bytes) -> int:
+ s = sum(data)
+ x = 0
+ for i, b in enumerate(data):
+ if i % 2 == 0:
+ x ^= b
+ else:
+ s ^= b
+ return (s + x) % 256
+
+def checksum_bit_flip_sum(data: bytes) -> int:
+ return sum(b ^ 0xFF for b in data) % 256
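+
+# Worked example (illustrative only, not used by the pipeline): for
+# data = b"\x01\x02\x03", checksum_sum(data) == 6, checksum_xor(data) == 0
+# (1 ^ 2 ^ 3), and checksum_bit_flip_sum(data) == (0xFE + 0xFD + 0xFC) % 256 == 0xF7.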
+
+# --- Input Parser ---
+def parse_input_file_lines(filepath: str) -> Tuple[List[Tuple[bytes, int]], Dict]:
+ samples = []
+ total_lines = 0
+ with open(filepath, "r") as f:
+ for line in f:
+ total_lines += 1
+ match = re.match(r'([0-9a-fA-F]+)\s*=\s*([0-9a-fA-F]{1,2})', line.strip())
+ if match:
+ hex_data = bytes.fromhex(match.group(1))
+ checksum = int(match.group(2), 16)
+ samples.append((hex_data, checksum))
+
+ # Return samples and metadata
+ return samples, {"total_lines": total_lines, "valid_samples": len(samples)}
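+
+# Expected input line format (assumed from the regex above): a hex payload,
+# '=', then a one- or two-digit hex checksum, e.g.
+#   0a1b2c3d = 4f
+# which parses to (b"\x0a\x1b\x2c\x3d", 0x4F).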
+
+# --- Enhanced Input Parser for Large Files ---
+def parse_input_file_lines_batched(filepath: str, batch_size: int = 1000) -> Generator[List[Tuple[bytes, int]], None, Dict]:
+ """
+ Parse a large input file in batches to avoid memory issues.
+ Returns a generator that yields batches of samples.
+ """
+ samples = []
+ total_lines = 0
+ valid_samples = 0
+
+ try:
+ with open(filepath, "r") as f:
+ for line in f:
+ total_lines += 1
+ match = re.match(r'([0-9a-fA-F]+)\s*=\s*([0-9a-fA-F]{1,2})', line.strip())
+ if match:
+ hex_data = bytes.fromhex(match.group(1))
+ checksum = int(match.group(2), 16)
+ samples.append((hex_data, checksum))
+ valid_samples += 1
+
+ # Yield a batch when it reaches the batch size
+ if len(samples) >= batch_size:
+ yield samples
+ samples = []
+ except Exception as e:
+ print(f"Error reading file: {e}")
+
+ # Yield any remaining samples
+ if samples:
+ yield samples
+
+ # Return metadata about the entire file
+ return {"total_lines": total_lines, "valid_samples": valid_samples}
+
+# --- Brute Force Evaluation ---
+def bruteforce_all_methods(samples: List[Tuple[bytes, int]], label_prefix="", file_metadata=None) -> List[Tuple[str, int, int, str]]:
+ methods: List[Tuple[str, Callable[[bytes], int]]] = [
+ ("SUM", checksum_sum),
+ ("XOR", checksum_xor),
+ ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)),
+ ("SUM<<2", lambda d: checksum_sum_shifted(d, 2)),
+ ("XOR<<1", lambda d: checksum_xor_shifted(d, 1)),
+ ("XOR<<2", lambda d: checksum_xor_shifted(d, 2)),
+ ("WEIGHTED_SUM", checksum_weighted_sum),
+ ("ALT_SUM_XOR", checksum_alt_sum_xor),
+ ("BIT_FLIP_SUM", checksum_bit_flip_sum)
+ ]
+
+ seen = set()
+ matches = []
+ sample_methods = defaultdict(list) # Track methods that work for each sample
+
+ for sample_index, (data, expected) in enumerate(samples):
+ length = len(data)
+ sample_success = [] # Track successful methods for this sample
+
+ for start in range(length):
+ for end in range(start + 1, length + 1):
+ sliced = data[start:end]
+ label = f"[{start}:{end}]"
+ for name, func in methods:
+ try:
+ result = func(sliced)
+ method_id = f"{name}{label}"
+ key = (sample_index, method_id, label_prefix)
+ if result == expected and key not in seen:
+ seen.add(key)
+ matches.append((method_id, sample_index + 1, expected, label_prefix))
+ sample_success.append((name, start, end))
+ except Exception:
+ continue
+
+ # Store methods that work for this sample
+ if sample_success:
+ sample_methods[sample_index] = sample_success
+
+ # Calculate consistency scores if we have enough samples
+ if len(samples) > 1 and sample_methods:
+ consistency_analysis = analyze_consistency(sample_methods, len(samples))
+ matches.append(("CONSISTENCY_DATA", 0, 0, json.dumps(consistency_analysis)))
+
+ # Add file metadata for reporting
+ if file_metadata:
+ file_name = file_metadata.get("file", "unknown")
+ matches.append(("FILE_METADATA", file_name, 0, json.dumps(file_metadata)))
+
+ return matches
+
+# --- Consistency Analysis ---
+def analyze_consistency(sample_methods: Dict[int, List[Tuple[str, int, int]]], total_samples: int) -> Dict:
+ """Analyze which methods work consistently across different samples."""
+ method_consistency = defaultdict(int)
+ range_consistency = defaultdict(int)
+ method_range_consistency = defaultdict(int)
+
+ # Count how many samples each method/range works for
+ for sample_idx, methods in sample_methods.items():
+ seen_methods = set()
+ seen_ranges = set()
+ seen_method_ranges = set()
+
+ for method, start, end in methods:
+ if method not in seen_methods:
+ seen_methods.add(method)
+ method_consistency[method] += 1
+
+ range_key = f"{start}:{end}"
+ if range_key not in seen_ranges:
+ seen_ranges.add(range_key)
+ range_consistency[range_key] += 1
+
+ method_range_key = f"{method}[{start}:{end}]"
+ if method_range_key not in seen_method_ranges:
+ seen_method_ranges.add(method_range_key)
+ method_range_consistency[method_range_key] += 1
+
+ # Calculate consistency percentages
+ method_scores = {method: count / total_samples * 100 for method, count in method_consistency.items()}
+ range_scores = {range_key: count / total_samples * 100 for range_key, count in range_consistency.items()}
+ method_range_scores = {mr: count / total_samples * 100 for mr, count in method_range_consistency.items()}
+
+ # Find the most consistent options
+ best_methods = sorted(method_scores.items(), key=lambda x: x[1], reverse=True)[:5]
+ best_ranges = sorted(range_scores.items(), key=lambda x: x[1], reverse=True)[:5]
+ best_method_ranges = sorted(method_range_scores.items(), key=lambda x: x[1], reverse=True)[:5]
+
+ return {
+ "best_methods": best_methods,
+ "best_ranges": best_ranges,
+ "best_method_ranges": best_method_ranges,
+ "total_samples": total_samples
+ }
+
+# --- Pattern Recognition ---
+def analyze_patterns(matches: List[Tuple[str, int, int, str]]) -> Dict:
+ patterns = {
+ "methods": Counter(),
+ "ranges": Counter(),
+ "start_positions": Counter(),
+ "end_positions": Counter(),
+ "lengths": Counter()
+ }
+
+ for method_id, _, _, _ in matches:
+ # Extract method name and range from method_id (e.g., "SUM[0:5]")
+ method_parts = re.match(r'([A-Z_<>0-9]+)\[(\d+):(\d+)\]', method_id)
+ if method_parts:
+ method_name, start, end = method_parts.groups()
+ start_pos, end_pos = int(start), int(end)
+ byte_range = f"[{start}:{end}]"
+ length = end_pos - start_pos
+
+ patterns["methods"][method_name] += 1
+ patterns["ranges"][byte_range] += 1
+ patterns["start_positions"][start_pos] += 1
+ patterns["end_positions"][end_pos] += 1
+ patterns["lengths"][length] += 1
+
+ return patterns
+
+# --- Result Display ---
+def print_results_with_summary(all_matches: List[Tuple[str, int, int, str]], per_file=False, insights=None, show_full=False):
+ """Print results with optional detailed analysis"""
+ # Extract consistency data and file metadata
+ consistency_data = {}
+ file_metadata = {}
+ filtered_matches = []
+
+ for match in all_matches:
+ if match[0] == "CONSISTENCY_DATA" and match[3]:
+ try:
+ file_data = match[3]
+ consistency_data[file_data] = json.loads(file_data)
+            except json.JSONDecodeError:
+                pass
+ elif match[0] == "FILE_METADATA" and match[3]:
+ try:
+ metadata = json.loads(match[3])
+ file_name = match[1] # Use the file name stored in match[1]
+ file_metadata[file_name] = metadata
+ except Exception as e:
+ print(f"Error processing metadata: {e}")
+ else:
+ filtered_matches.append(match)
+
+ all_matches = filtered_matches
+
+ if not all_matches:
+ print("āŒ No matches found.")
+ return
+
+ # Always organize by file
+ per_file_matches = defaultdict(list)
+ for match in all_matches:
+ per_file_matches[match[3]].append(match)
+
+ # Per-file statistics and pattern analysis
+ for file, matches in per_file_matches.items():
+ # Get file metadata if available
+ metadata = {}
+ for meta_file, meta_data in file_metadata.items():
+ if isinstance(meta_file, str) and file in meta_file: # Ensure meta_file is a string
+ metadata = meta_data
+ break
+
+ # Extract sample lines that matched successfully
+ matched_lines = set(line for _, line, _, _ in matches)
+
+ # Print file summary with line counts
+ print(f"\n\nšŸ“„ Results for: {file}")
+ if metadata:
+ total_lines = metadata.get("total_lines", "?")
+ valid_samples = metadata.get("valid_samples", len(matched_lines))
+ success_rate = (len(matched_lines)/valid_samples*100) if valid_samples > 0 else 0
+ print(f"āœ… Matches Found: {len(matched_lines)}/{valid_samples} samples " +
+ f"({success_rate:.1f}% success rate)")
+ print(f"šŸ“ Total file lines: {total_lines}, Valid samples: {valid_samples}")
+ else:
+ print(f"āœ… Matches Found: {len(matches)}")
+
+ # Only show individual matches if per_file flag is set AND full details are requested
+ if per_file and show_full:
+ for method_id, line, expected, _ in matches[:20]: # Show only first 20 to avoid flooding
+ print(f"Line {line:03d} | Method: {method_id:20s} | Expected: {expected:02X}")
+ if len(matches) > 20:
+ print(f"... and {len(matches) - 20} more matches")
+ elif per_file:
+ # In condensed mode, just show counts per line
+ line_counts = Counter(line for _, line, _, _ in matches)
+ print(f"Lines with matches: {', '.join(str(l) for l in sorted(line_counts.keys()))}")
+ if len(line_counts) > 10:
+ print(f"Total lines with matches: {len(line_counts)}")
+
+ # Pattern analysis for this file
+ patterns = analyze_patterns(matches)
+
+ # Print top methods for this file
+ print("\nšŸ“Š Most Successful Methods in this file:")
+ for method, count in patterns["methods"].most_common(5):
+ print(f"{method:<15} → {count} matches")
+
+ if show_full:
+ # Print top ranges for this file
+ print("\nšŸ“ Most Common Byte Ranges:")
+ for range_str, count in patterns["ranges"].most_common(5):
+ print(f"{range_str:<10} → {count} matches")
+
+ # Print common start positions
+ print("\nšŸ” Common Start Positions:")
+ for pos, count in patterns["start_positions"].most_common(5):
+ print(f"Position {pos:<3} → {count} matches")
+
+ # Print common end positions
+ print("\nšŸ”Ž Common End Positions:")
+ for pos, count in patterns["end_positions"].most_common(5):
+ print(f"Position {pos:<3} → {count} matches")
+
+ # Print common byte lengths
+ print("\nšŸ“Š Common Byte Lengths:")
+ for length, count in patterns["lengths"].most_common(5):
+ print(f"{length} bytes → {count} matches")
+
+ # Visual representation of match distribution
+ if patterns["start_positions"] and patterns["end_positions"]:
+ max_pos = max(max(patterns["end_positions"].keys()),
+ max(patterns["start_positions"].keys()))
+ print("\nšŸ“ˆ Match Distribution (frequency by position):")
+ scale = 30 # Reduced scale for more compact output
+ max_count = max(max(patterns["start_positions"].values()),
+ max(patterns["end_positions"].values()))
+ for pos in range(min(max_pos + 1, 40)): # Limit to first 40 positions
+ start_count = patterns["start_positions"].get(pos, 0)
+ end_count = patterns["end_positions"].get(pos, 0)
+ start_bar = 'ā–ˆ' * int((start_count / max_count) * scale) if start_count else ''
+ end_bar = 'ā–‘' * int((end_count / max_count) * scale) if end_count else ''
+ print(f"{pos:2d}: {start_bar}|{end_bar}")
+ print(" ā–ˆā–ˆā–ˆ = start positions, ā–‘ā–‘ā–‘ = end positions")
+
+ # Print byte-level insights for each sample if available
+ if insights and show_full:
+        # Insight entries are keyed "sample_<n>_<method>" and carry no file name,
+        # so the best we can do here is show all of them.
+        file_insights = {k: v for k, v in insights.items() if k.startswith("sample_")}
+ if file_insights:
+ print("\nšŸ”¬ Byte-Level Analysis:")
+ for key, data in file_insights.items():
+ parts = key.split('_')
+ sample_id = parts[1] if len(parts) > 1 else "?"
+ print(f"\nSample {sample_id} with {data['method']}[{data['range']}]:")
+
+ # Show optimal byte changes
+ if data.get("optimal_changes"):
+ print("Optimal byte changes to achieve expected checksum:")
+ for pos, new_val in data["optimal_changes"]:
+ print(f" Change byte at position {pos} from 0x{data['contributions']['byte_contributions'][pos]['original_value']:02X} to 0x{new_val:02X}")
+ else:
+ print("No simple byte changes found to fix checksum")
+
+ # Global summary (always show this part)
+ print("\n\nšŸ“Š Global Summary of Most Successful Methods:")
+ method_counts = defaultdict(int)
+ for method_id, _, _, _ in all_matches:
+ method_counts[method_id] += 1
+
+ sorted_methods = sorted(method_counts.items(), key=lambda x: x[1], reverse=True)
+ for method_id, count in sorted_methods[:5]: # Reduced to top 5 for conciseness
+ print(f"{method_id:<25} → {count} matches")
+
+ # Show more detailed global pattern summary only in full mode
+ if show_full:
+ all_patterns = analyze_patterns(all_matches)
+ print("\nšŸ“ˆ Global Pattern Summary:")
+ print(f"Total unique methods found: {len(all_patterns['methods'])}")
+ print(f"Total unique byte ranges: {len(all_patterns['ranges'])}")
+ print(f"Most common method: {all_patterns['methods'].most_common(1)[0][0]} with {all_patterns['methods'].most_common(1)[0][1]} matches")
+
+ # Print global consensus analysis at the end
+ if consistency_data and show_full:
+ print("\n\n🧩 Global Consensus Analysis")
+ print("═══════════════════════════")
+ print("Methods that work across multiple files:")
+
+ # Collect global statistics from all files
+ global_methods = Counter()
+ global_ranges = Counter()
+ global_method_ranges = Counter()
+
+ for file_data in consistency_data.values():
+ for method, score in file_data.get("best_methods", []):
+ global_methods[method] += 1
+ for range_key, score in file_data.get("best_ranges", []):
+ global_ranges[range_key] += 1
+ for mr, score in file_data.get("best_method_ranges", []):
+ global_method_ranges[mr] += 1
+
+ # Display methods that work across multiple files
+ num_files = len(consistency_data)
+ print(f"\nšŸ“Š Methods that work across multiple files (total files: {num_files}):")
+ for method, count in global_methods.most_common(5):
+ print(f"{method:<15} → appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)")
+
+ print(f"\nšŸ“ Byte ranges that work across multiple files:")
+ for range_key, count in global_ranges.most_common(5):
+ print(f"[{range_key}] → appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)")
+
+ print(f"\nšŸ” Method+Range combinations that work across multiple files:")
+ for mr, count in global_method_ranges.most_common(5):
+ print(f"{mr:<20} → appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)")
+
+ # Generate a recommended approach
+ if global_method_ranges:
+ best_combo, count = global_method_ranges.most_common(1)[0]
+ if count >= num_files * 0.5: # If it works for at least half the files
+ print(f"\nāœ… Recommended global method: {best_combo}")
+ print(f" This combination works in top 5 for {count}/{num_files} files")
+ else:
+ print("\nāš ļø No single method+range combination works reliably across most files")
+ print(f" Best option ({best_combo}) only works in top 5 for {count}/{num_files} files")
+
+ # Try to find patterns in the most successful methods
+ if global_methods:
+ best_method, method_count = global_methods.most_common(1)[0]
+ print(f"\nšŸ’” Consider using {best_method} with file-specific byte ranges")
+ print(f" This algorithm appears in top 5 for {method_count}/{num_files} files")
+
+# --- Advanced Checksum Algorithms ---
+def checksum_weighted_sum_parametric(data: bytes, weight_start: float = 1.0, weight_step: float = 1.0) -> int:
+ """Weighted sum with configurable starting weight and step"""
+ return sum(int((weight_start + i * weight_step) * b) % 256 for i, b in enumerate(data)) % 256
+
+def checksum_hybrid_sum_xor(data: bytes, weight: float = 0.5) -> int:
+ """Hybrid checksum using weighted combination of sum and XOR"""
+ sum_result = sum(data) % 256
+ xor_result = 0
+ for b in data:
+ xor_result ^= b
+ return int((weight * sum_result + (1 - weight) * xor_result)) % 256
+
+def checksum_adaptive_bit_flip_sum(data: bytes, flip_mask: int = 0xFF) -> int:
+ """Bit flip sum with configurable flip mask"""
+ return sum(b ^ flip_mask for b in data) % 256
+
+def checksum_position_weighted_sum(data: bytes, position_weights: Optional[List[float]] = None) -> int:
+ """Sum where each byte is weighted by its position in a specific pattern"""
+ if position_weights is None:
+ # Default to alternating weights
+ position_weights = [1.0, 0.5]
+
+ result = 0
+ for i, b in enumerate(data):
+ weight = position_weights[i % len(position_weights)]
+ result = (result + int(b * weight)) % 256
+ return result
+
+def evaluate_targeted_algorithms(samples: List[Tuple[bytes, int]], label_prefix="") -> List[Tuple[str, int, int, str]]:
+ """Run a more focused test on the most promising algorithms with fine-tuned parameters"""
+
+ # Based on consensus, focus testing on these methods with more parameter variations
+ matches = []
+ seen = set()
+
+ # Set up parameter variations for testing
+ bit_flip_masks = [0xFF, 0xF0, 0x0F, 0xCC, 0x55, 0xAA]
+ hybrid_weights = [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
+ weight_steps = [0.9, 1.0, 1.1, 1.2, 1.5]
+ pos_weight_patterns = [
+ [1.0, 0.5], # Alternating
+ [1.0, 1.0, 0.5], # Every third byte gets half weight
+ [1.0, 0.75, 0.5, 0.25] # Descending weights
+ ]
+
+ # Process each sample with focused algorithms
+ for sample_index, (data, expected) in enumerate(samples):
+ length = len(data)
+
+ # Instead of trying every possible byte range, focus on the most promising ranges
+ # based on global patterns from previous analysis
+
+ # Try more specific ranges based on insights
+ ranges_to_try = []
+
+ # Focus on common start positions from global analysis: 0-5 and specific ranges
+ for start in [0, 1, 2, 3, 4, 5]:
+ # Try full data range
+ ranges_to_try.append((start, length))
+
+ # Try common end points (from previous runs)
+ for end_offset in [0, 1, 2, 4, 8]:
+ if length - end_offset > start + 1: # Ensure valid range
+ ranges_to_try.append((start, length - end_offset))
+
+ # Add specific ranges that were successful in multiple files
+ specific_ranges = [(3, 30), (4, 31), (5, 8), (5, 9), (2, 11)]
+ for start, end in specific_ranges:
+ if start < length and end <= length and start < end:
+ ranges_to_try.append((start, end))
+
+ # Process the focused ranges with our most promising algorithms
+ for start, end in ranges_to_try:
+ sliced = data[start:end]
+ label = f"[{start}:{end}]"
+
+ # Test standard checksum methods that showed promise
+ methods = [
+ ("WEIGHTED_SUM", lambda d: checksum_weighted_sum(d)),
+ ("ALT_SUM_XOR", lambda d: checksum_alt_sum_xor(d)),
+ ("BIT_FLIP_SUM", lambda d: checksum_bit_flip_sum(d)),
+ ("SUM<<1", lambda d: checksum_sum_shifted(d, 1))
+ ]
+
+ # Test the standard methods
+ for name, func in methods:
+ try:
+ result = func(sliced)
+ method_id = f"{name}{label}"
+ key = (sample_index, method_id, label_prefix)
+ if result == expected and key not in seen:
+ seen.add(key)
+ matches.append((method_id, sample_index + 1, expected, label_prefix))
+ except Exception:
+ continue
+
+ # Test advanced parametric methods
+ for mask in bit_flip_masks:
+ try:
+ result = checksum_adaptive_bit_flip_sum(sliced, mask)
+ method_id = f"BIT_FLIP_SUM({mask:02X}){label}"
+ key = (sample_index, method_id, label_prefix)
+ if result == expected and key not in seen:
+ seen.add(key)
+ matches.append((method_id, sample_index + 1, expected, label_prefix))
+ except Exception:
+ continue
+
+ for weight in hybrid_weights:
+ try:
+ result = checksum_hybrid_sum_xor(sliced, weight)
+ method_id = f"HYBRID_SUM_XOR({weight:.1f}){label}"
+ key = (sample_index, method_id, label_prefix)
+ if result == expected and key not in seen:
+ seen.add(key)
+ matches.append((method_id, sample_index + 1, expected, label_prefix))
+ except Exception:
+ continue
+
+ for step in weight_steps:
+ try:
+ result = checksum_weighted_sum_parametric(sliced, 1.0, step)
+ method_id = f"WEIGHTED_SUM_STEP({step:.1f}){label}"
+ key = (sample_index, method_id, label_prefix)
+ if result == expected and key not in seen:
+ seen.add(key)
+ matches.append((method_id, sample_index + 1, expected, label_prefix))
+ except Exception:
+ continue
+
+ for i, pattern in enumerate(pos_weight_patterns):
+ try:
+ result = checksum_position_weighted_sum(sliced, pattern)
+ method_id = f"POS_WEIGHT_{i+1}{label}"
+ key = (sample_index, method_id, label_prefix)
+ if result == expected and key not in seen:
+ seen.add(key)
+ matches.append((method_id, sample_index + 1, expected, label_prefix))
+ except Exception:
+ continue
+
+ return matches
+
+# --- Byte Change Correlation Analysis ---
+def analyze_byte_value_correlations(samples: List[Tuple[bytes, int]], max_samples: int = 1000) -> Dict:
+ """
+ Analyze how changing specific bytes correlates with changes in the checksum.
+ This helps understand the "sensitivity" of the checksum to specific byte positions.
+ """
+ # Sample if we have too many samples to process
+ if len(samples) > max_samples:
+ print(f"Sampling {max_samples} out of {len(samples)} for correlation analysis")
+ samples = random.sample(samples, max_samples)
+
+ # Initialize data structures for correlation analysis
+ bytes_by_position = defaultdict(list)
+ checksums_by_position_value = defaultdict(list)
+ correlations = {}
+ position_weights = {}
+
+ # Gather data by byte position
+ max_length = max(len(data) for data, _ in samples)
+ print(f"Analyzing correlations for {len(samples)} samples with max length {max_length}")
+
+ # Track all byte values and checksums by position
+ for data, checksum in samples:
+ for pos, value in enumerate(data):
+ bytes_by_position[pos].append(value)
+ checksums_by_position_value[(pos, value)].append(checksum)
+
+ # Calculate correlation strength for each position
+ for pos in range(max_length):
+ pos_values = bytes_by_position.get(pos, [])
+ if len(pos_values) <= 1:
+ continue
+
+ # Create value-to-checksum mapping and analyze patterns
+ value_impact = {}
+ checksum_changes = []
+
+ # Group by unique values at this position
+ unique_values = set(pos_values)
+ if len(unique_values) <= 1:
+ continue
+
+ # Analyze how changes in this position correlate with checksums
+ for val in unique_values:
+ checksums = checksums_by_position_value.get((pos, val), [])
+ if checksums:
+ avg_checksum = sum(checksums) / len(checksums)
+ value_impact[val] = avg_checksum
+
+ # If we have enough data, calculate correlation metrics
+ if len(value_impact) >= 2:
+ # Look for linear relationships
+ xy_pairs = [(val, cs) for val, cs in value_impact.items()]
+ correlation = calculate_correlation_coefficient(xy_pairs)
+
+ # Look for bit-level patterns (XOR, bit flips)
+ bit_patterns = analyze_bit_patterns(value_impact)
+
+ correlations[pos] = {
+ "strength": abs(correlation),
+ "direction": "positive" if correlation >= 0 else "negative",
+ "unique_values": len(unique_values),
+ "sample_count": len(pos_values),
+ "bit_patterns": bit_patterns
+ }
+
+ # Calculate a rough "weight" for this position in checksum calculations
+ pos_weight = abs(correlation) * (len(unique_values) / 256)
+ position_weights[pos] = pos_weight
+
+ # Sort positions by correlation strength
+ sorted_positions = sorted(correlations.keys(), key=lambda p: correlations[p]["strength"], reverse=True)
+ significant_positions = sorted_positions[:10] # Most influential positions
+
+ # Build response
+ return {
+ "significant_positions": significant_positions,
+ "position_correlations": {p: correlations[p] for p in significant_positions},
+ "position_weights": {p: position_weights[p] for p in position_weights if p in significant_positions},
+ "analyzed_samples": len(samples),
+ "max_length": max_length
+ }
+
+def calculate_correlation_coefficient(pairs: List[Tuple[int, int]]) -> float:
+ """Calculate Pearson's correlation coefficient between byte values and checksums."""
+ if len(pairs) < 2:
+ return 0.0
+
+ x_vals = [p[0] for p in pairs]
+ y_vals = [p[1] for p in pairs]
+
+ n = len(pairs)
+
+ # Calculate means
+ x_mean = sum(x_vals) / n
+ y_mean = sum(y_vals) / n
+
+ # Calculate correlation coefficient
+ numerator = sum((x - x_mean) * (y - y_mean) for x, y in zip(x_vals, y_vals))
+ denominator_x = sum((x - x_mean) ** 2 for x in x_vals)
+ denominator_y = sum((y - y_mean) ** 2 for y in y_vals)
+
+ if denominator_x == 0 or denominator_y == 0:
+ return 0.0
+
+ return numerator / math.sqrt(denominator_x * denominator_y)
+
+def analyze_bit_patterns(value_impact: Dict[int, float]) -> Dict:
+ """
+ Analyze bit-level patterns in how byte changes affect checksums.
+ Identifies patterns like "flipping bit 3 adds 8 to checksum" etc.
+ """
+ bit_influences = [0.0] * 8 # Influence of each bit position
+
+ # Calculate average impact when each bit is set vs unset
+ bit_set_checksums = [[] for _ in range(8)]
+ bit_unset_checksums = [[] for _ in range(8)]
+
+ for value, checksum in value_impact.items():
+ # Analyze each bit
+ for bit_pos in range(8):
+ bit_mask = 1 << bit_pos
+ if value & bit_mask: # Bit is set
+ bit_set_checksums[bit_pos].append(checksum)
+ else: # Bit is unset
+ bit_unset_checksums[bit_pos].append(checksum)
+
+ # Calculate average difference per bit
+ for bit_pos in range(8):
+ set_avg = sum(bit_set_checksums[bit_pos]) / len(bit_set_checksums[bit_pos]) if bit_set_checksums[bit_pos] else 0
+ unset_avg = sum(bit_unset_checksums[bit_pos]) / len(bit_unset_checksums[bit_pos]) if bit_unset_checksums[bit_pos] else 0
+
+ if set_avg and unset_avg:
+ influence = set_avg - unset_avg
+ bit_influences[bit_pos] = influence
+
+ # Determine the bit pattern type
+ pattern_types = {
+ "xor_like": all(abs(bit_influences[i]) >= 0.5 for i in range(8)),
+ "additive": all(bit_influences[i] >= 0 for i in range(8)),
+ "subtractive": all(bit_influences[i] <= 0 for i in range(8)),
+ "weighted": max(abs(b) for b in bit_influences) / (min(abs(b) for b in bit_influences) if min(abs(b) for b in bit_influences) else 1) > 3,
+ }
+
+ return {
+ "bit_influences": {i: bit_influences[i] for i in range(8)},
+ "pattern_type": next((ptype for ptype, matches in pattern_types.items() if matches), "mixed"),
+ "most_influential_bit": bit_influences.index(max(bit_influences, key=abs))
+ }
+
+def find_optimal_byte_changes(data: bytes, checksum_func: Callable, expected: int) -> List[Tuple[int, int]]:
+ """
+ Find the minimal set of byte changes needed to achieve the expected checksum.
+ Returns a list of (position, new_value) tuples.
+ """
+ base_checksum = checksum_func(data)
+ if base_checksum == expected:
+ return [] # No changes needed
+
+ # Try changing bytes to match target checksum using sensitivity information
+
+ # First try single byte changes - this is much faster and most likely case
+ for i in range(len(data)):
+ modified = bytearray(data)
+ target_diff = (expected - base_checksum) % 256
+
+ # Try calculating what value this position should have
+ if checksum_func == checksum_sum:
+ # For sum, we can directly calculate needed value
+ new_val = (data[i] + target_diff) % 256
+ modified[i] = new_val
+ if checksum_func(bytes(modified)) == expected:
+ return [(i, new_val)]
+ elif checksum_func == checksum_xor:
+ # For XOR, direct calculation also works
+ new_val = data[i] ^ (base_checksum ^ expected)
+ modified[i] = new_val
+ if checksum_func(bytes(modified)) == expected:
+ return [(i, new_val)]
+ else:
+ # For other algorithms, try incremental changes or use binary search
+ best_value = None
+ best_diff = 256
+
+ # Check common values first, then do a smarter search if needed
+ for test_val in [0, 1, 0xFF, expected, data[i] ^ 0xFF]:
+ if test_val == data[i]:
+ continue
+
+ modified[i] = test_val
+ new_checksum = checksum_func(bytes(modified))
+ if new_checksum == expected:
+ return [(i, test_val)]
+ diff = abs((new_checksum - expected) % 256)
+ if diff < best_diff:
+ best_diff = diff
+ best_value = test_val
+
+ # If we got close, try a more focused search around the promising value
+ if best_diff < 50 and best_value is not None:
+ for offset in range(-10, 11):
+ test_val = (best_value + offset) % 256
+ if test_val == data[i]:
+ continue
+
+ modified[i] = test_val
+ new_checksum = checksum_func(bytes(modified))
+ if new_checksum == expected:
+ return [(i, test_val)]
+
+ # If single byte changes don't work, try strategic two-byte changes
+ # For performance, we'll limit this to nearby byte combinations
+ for i in range(len(data)):
+ for j in range(i+1, min(i+8, len(data))): # Try up to 7 bytes ahead
+ for i_adj in [-1, 1]:
+ for j_adj in [-1, 1]:
+ modified = bytearray(data)
+ modified[i] = (data[i] + i_adj) % 256
+ modified[j] = (data[j] + j_adj) % 256
+
+ if checksum_func(bytes(modified)) == expected:
+ return [(i, modified[i]), (j, modified[j])]
+
+ return []
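+
+# NOTE: analyze_byte_contributions() and print_byte_analysis() are called further
+# down (in evaluate_with_byte_analysis and the byte-analysis branches of main)
+# but were never defined in this file. The two functions below are minimal
+# sketches reconstructed from how their results are consumed elsewhere
+# (contributions["byte_contributions"][pos]["original_value"] / ["max_impact"]);
+# the original intent is an assumption.
+def analyze_byte_contributions(data: bytes, checksum_func: Callable[[bytes], int], expected: int) -> Dict:
+    """Estimate how strongly each byte position can move the checksum (sketch)."""
+    base = checksum_func(data)
+    byte_contributions = {}
+    for pos in range(len(data)):
+        max_impact = 0
+        # Probe a few substitute values instead of all 256 to keep this cheap.
+        for test_val in (0x00, 0xFF, data[pos] ^ 0xFF, (data[pos] + 1) % 256):
+            if test_val == data[pos]:
+                continue
+            modified = bytearray(data)
+            modified[pos] = test_val
+            delta = (checksum_func(bytes(modified)) - base) % 256
+            impact = min(delta, 256 - delta)  # circular distance on the 0..255 ring
+            max_impact = max(max_impact, impact)
+        byte_contributions[pos] = {"original_value": data[pos], "max_impact": max_impact}
+    return {"byte_contributions": byte_contributions, "base_checksum": base, "expected": expected}
+
+def print_byte_analysis(data: bytes, contributions: Dict, method_name: str):
+    """Print a compact per-byte impact table for one sample (sketch)."""
+    print(f"Byte contribution analysis ({method_name}), {len(data)} bytes:")
+    for pos, info in sorted(contributions.get("byte_contributions", {}).items()):
+        bar = 'ā–ˆ' * (info["max_impact"] * 20 // 128)  # scale impact 0..128 to 0..20 chars
+        print(f"  [{pos:2d}] 0x{info['original_value']:02X}  impact {info['max_impact']:3d}  {bar}")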
+
+# --- Large-Scale File Analysis ---
+def analyze_large_file(filepath: str, max_samples=1000) -> Dict:
+ """Analyze a large file efficiently by processing it in batches."""
+ start_time = time.time()
+ print(f"Starting large-scale analysis of {filepath}...")
+
+ # Process the file in batches to handle large files
+ batch_gen = parse_input_file_lines_batched(filepath, batch_size=1000)
+
+ # First batch will be used for detailed analysis
+ first_batch = next(batch_gen, [])
+ if not first_batch:
+ print("No valid samples found in file.")
+ return {}
+
+    # Note: calling next() again here would consume the *second batch of samples*,
+    # not the file metadata; the metadata is the generator's return value and only
+    # becomes available via StopIteration once the generator is exhausted.
+
+ # Perform initial algorithm identification on the first batch
+ print(f"Identifying potential checksum algorithms on first {len(first_batch)} samples...")
+ matches = bruteforce_all_methods(first_batch, label_prefix=os.path.basename(filepath))
+
+ # Extract the most promising algorithms and ranges
+ patterns = analyze_patterns([m for m in matches if m[0] != "CONSISTENCY_DATA"])
+ top_methods = patterns["methods"].most_common(3)
+ top_ranges = patterns["ranges"].most_common(3)
+
+ # Combining top methods with top ranges for focused analysis
+ focused_analysis = []
+ method_func_map = {
+ "SUM": checksum_sum,
+ "XOR": checksum_xor,
+ "SUM<<1": lambda d: checksum_sum_shifted(d, 1),
+ "SUM<<2": lambda d: checksum_sum_shifted(d, 2),
+ "XOR<<1": lambda d: checksum_xor_shifted(d, 1),
+ "XOR<<2": lambda d: checksum_xor_shifted(d, 2),
+ "WEIGHTED_SUM": checksum_weighted_sum,
+ "ALT_SUM_XOR": checksum_alt_sum_xor,
+ "BIT_FLIP_SUM": checksum_bit_flip_sum
+ }
+
+ # Collect a sample of data for correlation analysis
+ correlation_samples = first_batch.copy()
+
+ # Check more batches if we need more samples for correlation analysis
+ batches_processed = 1
+ while len(correlation_samples) < max_samples:
+ batch = next(batch_gen, None)
+ if batch is None:
+ break
+ correlation_samples.extend(batch[:max_samples - len(correlation_samples)])
+ batches_processed += 1
+ if batches_processed >= 10: # Limit to 10 batches for performance
+ break
+
+ # Perform correlation analysis
+ print(f"Performing byte correlation analysis on {len(correlation_samples)} samples...")
+ correlations = analyze_byte_value_correlations(correlation_samples, max_samples=max_samples)
+
+ # Test the most likely algorithms on the significant byte positions
+ print("Testing algorithm-position combinations...")
+ for method_name, _ in top_methods:
+ for range_str, _ in top_ranges:
+ range_parts = range_str.strip('[]').split(':')
+ if len(range_parts) == 2:
+ start, end = int(range_parts[0]), int(range_parts[1])
+ method_func = method_func_map.get(method_name)
+ if method_func:
+ success_count = 0
+ for data, expected in correlation_samples[:100]: # Test on first 100 samples
+ if len(data) >= end:
+ result = method_func(data[start:end])
+ if result == expected:
+ success_count += 1
+
+ success_rate = success_count / min(100, len(correlation_samples))
+ focused_analysis.append({
+ "method": method_name,
+ "range": f"[{start}:{end}]",
+ "success_rate": success_rate,
+ "success_count": success_count
+ })
+
+ # Sort by success rate
+ focused_analysis.sort(key=lambda x: x["success_rate"], reverse=True)
+
+ # Find byte positions that most strongly influence the checksum
+ influential_positions = correlations["significant_positions"][:5]
+
+ elapsed_time = time.time() - start_time
+
+ return {
+ "file_name": os.path.basename(filepath),
+ "samples_analyzed": len(correlation_samples),
+ "elapsed_time": elapsed_time,
+ "top_methods": [m[0] for m in top_methods],
+ "top_ranges": [r[0] for r in top_ranges],
+ "focused_analysis": focused_analysis[:5],
+ "influential_positions": influential_positions,
+ "position_correlations": {str(p): correlations["position_correlations"][p] for p in influential_positions},
+ "byte_pattern_summary": summarize_byte_patterns(correlations),
+ }
+
+def summarize_byte_patterns(correlations: Dict) -> Dict:
+ """Summarize patterns in byte correlations to help understand the checksum algorithm."""
+ if not correlations or "position_correlations" not in correlations:
+ return {}
+
+ # Identify patterns in how byte positions affect the checksum
+ positions = correlations.get("significant_positions", [])
+ if not positions:
+ return {}
+
+ # Count pattern types to identify algorithm characteristics
+ pattern_types = Counter()
+ for pos in positions:
+ if pos in correlations["position_correlations"]:
+ bit_patterns = correlations["position_correlations"][pos].get("bit_patterns", {})
+ pattern_type = bit_patterns.get("pattern_type", "unknown")
+ pattern_types[pattern_type] += 1
+
+ # Algorithm characteristics based on patterns
+ primary_pattern = pattern_types.most_common(1)[0][0] if pattern_types else "unknown"
+ algorithm_characteristics = {
+ "xor_like": "XOR-based algorithm (position-independent)",
+ "additive": "Sum-based algorithm (position-independent)",
+ "subtractive": "Subtraction-based algorithm (unusual)",
+ "weighted": "Weighted algorithm (position-dependent)",
+ "mixed": "Mixed algorithm (complex checksum)"
+ }
+
+ # Check position importance distribution
+ pos_weights = correlations.get("position_weights", {})
+ weight_values = list(pos_weights.values())
+ weight_variance = 0
+ if weight_values:
+ mean_weight = sum(weight_values) / len(weight_values)
+ weight_variance = sum((w - mean_weight) ** 2 for w in weight_values) / len(weight_values)
+
+ position_dependent = weight_variance > 0.05
+
+ return {
+ "dominant_pattern": primary_pattern,
+ "likely_algorithm_type": algorithm_characteristics.get(primary_pattern, "Unknown algorithm type"),
+ "position_dependent": position_dependent,
+ "weight_variance": weight_variance,
+ "recommendation": get_algorithm_recommendation(primary_pattern, position_dependent)
+ }
+
+def get_algorithm_recommendation(pattern_type: str, position_dependent: bool) -> str:
+ """Get a recommendation for checksum algorithm based on correlation analysis."""
+ if pattern_type == "xor_like" and not position_dependent:
+ return "XOR-based checksum recommended"
+ elif pattern_type == "xor_like" and position_dependent:
+ return "Position-dependent XOR (shifted XOR) recommended"
+ elif pattern_type == "additive" and not position_dependent:
+ return "Simple sum checksum recommended"
+ elif pattern_type == "additive" and position_dependent:
+ return "Weighted sum checksum recommended"
+ elif pattern_type == "weighted":
+ return "Complex weighted checksum recommended"
+ else:
+ return "Mixed or complex algorithm recommended, try ALT_SUM_XOR or custom hybrid"
+
+def print_large_file_analysis(analysis: Dict):
+ """Print the results of large-file analysis in a readable format."""
+ print("\nšŸ“Š Large File Analysis Results")
+ print("═══════════════════════════")
+ print(f"File: {analysis.get('file_name', 'Unknown')}")
+ print(f"Samples analyzed: {analysis.get('samples_analyzed', 0)}")
+ print(f"Analysis time: {analysis.get('elapsed_time', 0):.2f} seconds")
+
+ # Print the top methods and ranges
+ print("\nšŸ” Top Checksum Methods:")
+ for method in analysis.get('top_methods', []):
+ print(f" • {method}")
+
+ print("\nšŸ“ Top Byte Ranges:")
+ for range_str in analysis.get('top_ranges', []):
+ print(f" • {range_str}")
+
+ # Print the focused analysis results
+ print("\nāœ… Best Method+Range Combinations:")
+ for combo in analysis.get('focused_analysis', []):
+ print(f" • {combo['method']}{combo['range']} → {combo['success_rate']*100:.1f}% success rate ({combo['success_count']} samples)")
+
+ # Print the byte pattern summary
+ pattern_summary = analysis.get('byte_pattern_summary', {})
+ if pattern_summary:
+ print("\n🧠 Algorithm Characteristics:")
+ print(f" Dominant pattern: {pattern_summary.get('dominant_pattern', 'Unknown')}")
+ print(f" Likely algorithm: {pattern_summary.get('likely_algorithm_type', 'Unknown')}")
+ print(f" Position dependent: {'Yes' if pattern_summary.get('position_dependent', False) else 'No'}")
+ print(f"\nšŸ’” Recommendation: {pattern_summary.get('recommendation', 'Unknown')}")
+
+ # Print influential byte positions
+ print("\nšŸ”¢ Most Influential Byte Positions:")
+ positions = analysis.get('influential_positions', [])
+ pos_correlations = analysis.get('position_correlations', {})
+
+ for pos in positions:
+ pos_str = str(pos)
+ if pos_str in pos_correlations:
+ info = pos_correlations[pos_str]
+ print(f" • Position {pos}: {info['strength']:.3f} correlation strength, " +
+ f"{info['direction']} correlation, {info['unique_values']} unique values")
+
+ # Print bit patterns if available
+ bit_patterns = info.get("bit_patterns", {})
+ if bit_patterns:
+ most_influential_bit = bit_patterns.get("most_influential_bit", 0)
+ print(f" Most influential bit: {most_influential_bit} (bit {7-most_influential_bit} from left)")
+
+# --- Enhanced Folder Processing ---
+def process_folder_with_limits(folder_path: str, max_total_samples: int = 1000) -> List[Tuple[bytes, int]]:
+ """
+ Process files in a folder with a limit on total samples.
+ Returns a list of samples up to the specified limit.
+ """
+ all_samples = []
+ files_processed = 0
+ samples_collected = 0
+
+ print(f"Processing folder with limit of {max_total_samples} samples...")
+
+ for file in os.listdir(folder_path):
+ if file.endswith(".txt"):
+ full_path = os.path.join(folder_path, file)
+ try:
+ samples, file_meta = parse_input_file_lines(full_path)
+
+ # Take only what we need to stay under max_total_samples
+ remaining = max_total_samples - len(all_samples)
+ if remaining <= 0:
+ break
+
+ if len(samples) > remaining:
+ print(f"Taking {remaining} of {len(samples)} samples from {file}")
+ samples = samples[:remaining]
+ else:
+ print(f"Taking all {len(samples)} samples from {file}")
+
+ all_samples.extend(samples)
+ files_processed += 1
+ samples_collected += len(samples)
+
+ # Stop if we've reached our limit
+ if len(all_samples) >= max_total_samples:
+ break
+
+ except Exception as e:
+ print(f"Error processing {file}: {e}")
+
+ print(f"Processed {files_processed} files, collected {samples_collected} samples")
+ return all_samples
+
+# --- Main ---
+def main():
+ # Create argument parser
+ parser = argparse.ArgumentParser(description='Analyze checksum algorithms in files.')
+ parser.add_argument('path', help='Path to file or directory to analyze')
+ parser.add_argument('--full', action='store_true', help='Show detailed output with all analyses')
+ parser.add_argument('--byte-analysis', action='store_true', help='Perform byte-level contribution analysis')
+ parser.add_argument('--large', action='store_true', help='Perform large-scale analysis optimized for big files')
+ parser.add_argument('--max-samples', type=int, default=1000,
+ help='Maximum number of samples for intensive analyses (byte-level and large-scale)')
+
+ args = parser.parse_args()
+
+ path = args.path
+ show_full = args.full
+ perform_byte_analysis = args.byte_analysis
+ large_analysis = args.large
+ max_samples = args.max_samples
+
+ all_matches = []
+ byte_insights = {}
+
+ if os.path.isdir(path):
+ # Standard brute force - process all samples without limits
+ print("Phase 1: Running standard brute force analysis...")
+ for file in os.listdir(path):
+ if file.endswith(".txt"):
+ full_path = os.path.join(path, file)
+ try:
+ parsed_samples, file_meta = parse_input_file_lines(full_path)
+ # Process all samples for standard analysis
+ match_results = bruteforce_all_methods(
+ parsed_samples,
+ label_prefix=file,
+ file_metadata={"file": file, **file_meta}
+ )
+ all_matches.extend(match_results)
+ except Exception as e:
+ print(f"Error processing {file}: {e}")
+
+ # Display standard results
+ print_results_with_summary(all_matches, per_file=True, show_full=show_full)
+
+ if perform_byte_analysis:
+ # Limit to max_samples for the intensive byte-level analysis
+ print(f"\n\nPhase 2: Running byte-level contribution analysis (limit: {max_samples} samples)...")
+ files_analyzed = 0
+ total_samples_analyzed = 0
+
+ for file in list(os.listdir(path)):
+ # Stop if we've hit our sample limit or analyzed enough files
+ if total_samples_analyzed >= max_samples or files_analyzed >= 3:
+ break
+
+ if file.endswith(".txt"):
+ full_path = os.path.join(path, file)
+ try:
+ parsed_samples, file_meta = parse_input_file_lines(full_path)
+ if not parsed_samples:
+ print(f"āš ļø No valid samples found in {file}")
+ continue
+
+ # Determine how many samples to take from this file
+ samples_remaining = max_samples - total_samples_analyzed
+ if samples_remaining <= 0:
+ break
+
+ samples_to_analyze = parsed_samples
+ if len(parsed_samples) > samples_remaining:
+ print(f"Limiting to {samples_remaining} samples from {file}")
+ samples_to_analyze = parsed_samples[:samples_remaining]
+ else:
+ print(f"Analyzing all {len(parsed_samples)} samples from {file}")
+
+ total_samples_analyzed += len(samples_to_analyze)
+ files_analyzed += 1
+
+ print(f"\nšŸ“„ Analyzing file: {file} ({len(samples_to_analyze)} samples)")
+ match_results, file_insights = evaluate_with_byte_analysis(
+ samples_to_analyze,
+ label_prefix=f"BYTE_ANALYSIS_{file}",
+ detailed=True
+ )
+
+ if not file_insights:
+ print(f"āš ļø No byte-level insights found for {file}")
+
+ byte_insights.update(file_insights)
+ except Exception as e:
+ print(f"āš ļø Error analyzing {file}: {e}")
+
+ print(f"\nCompleted byte-level analysis on {total_samples_analyzed} samples from {files_analyzed} files")
+
+ # Overall summary
+ print("\n\n🧬 Byte Contribution Analysis Summary")
+ print("═════════════════════════════════════")
+ print(f"Total samples analyzed: {len(byte_insights)}")
+ print(f"Methods with most influence on checksums:")
+
+ # Collect statistics on which methods have highest average impact
+ method_impacts = defaultdict(list)
+ for key, data in byte_insights.items():
+ if "contributions" in data:
+ # Get average of max impacts across all bytes
+ impacts = [info["max_impact"] for info in data["contributions"]["byte_contributions"].values()]
+ if impacts:
+ avg_impact = sum(impacts) / len(impacts)
+ method_impacts[data["method"]].append(avg_impact)
+
+ # Show average impact by method
+ for method, impacts in method_impacts.items():
+ if impacts:
+ avg = sum(impacts) / len(impacts)
+ print(f"{method:<15} → Avg impact: {avg:.1f}")
+
+ elif os.path.isfile(path):
+ parsed_samples, file_meta = parse_input_file_lines(path)
+ file_name = os.path.basename(path)
+ match_results = bruteforce_all_methods(
+ parsed_samples,
+ label_prefix=file_name,
+ file_metadata={"file": file_name, **file_meta}
+ )
+ all_matches.extend(match_results)
+
+ # Display results
+ print_results_with_summary(all_matches, per_file=True, show_full=show_full)
+
+ if perform_byte_analysis and parsed_samples:
+ print("\nRunning byte-level contribution analysis...")
+ try:
+ match_results, file_insights = evaluate_with_byte_analysis(
+ parsed_samples, # Now correctly passing just the samples list
+ label_prefix=f"BYTE_ANALYSIS_{os.path.basename(path)}",
+ detailed=True
+ )
+
+ # Print just the first sample's analysis as an example
+ if file_insights:
+ key = next(iter(file_insights))
+ data = file_insights[key]
+ sample_id = key.split('_')[1] if len(key.split('_')) > 1 else "?"
+ method_name = data["method"]
+ range_str = data["range"]
+
+ # Get original sample data
+ if int(sample_id) <= len(parsed_samples):
+ data_bytes, expected = parsed_samples[int(sample_id)-1]
+ start, end = map(int, data["range"].split(':'))
+ sliced_data = data_bytes[start:end]
+
+ print(f"\nByte analysis for Sample {sample_id} using {method_name}[{range_str}]")
+ print_byte_analysis(sliced_data, data["contributions"], method_name)
+ except Exception as e:
+ print(f"āš ļø Error during byte analysis: {e}")
+
+ if os.path.isdir(path):
+        # Standard and byte-level analysis for this directory was handled above;
+        # this second pass only runs the optional large-scale analysis.
+
+ if large_analysis:
+ print(f"\n\nPerforming large-scale file analysis (limit: {max_samples} samples per file)...")
+ files_analyzed = 0
+
+ for file in list(os.listdir(path)):
+ if files_analyzed >= 5: # Limit to 5 files for performance
+ break
+
+ if file.endswith(".txt"):
+ full_path = os.path.join(path, file)
+ try:
+ analysis = analyze_large_file(full_path, max_samples=max_samples)
+ print_large_file_analysis(analysis)
+ files_analyzed += 1
+ except Exception as e:
+ print(f"āš ļø Error during large file analysis of {file}: {e}")
+
+ elif os.path.isfile(path):
+        # Standard and byte-level analysis for this file was handled above;
+        # only the optional large-scale analysis remains.
+
+ if large_analysis:
+ try:
+ analysis = analyze_large_file(path, max_samples=max_samples)
+ print_large_file_analysis(analysis)
+ except Exception as e:
+ print(f"āš ļø Error during large file analysis: {e}")
+
+def evaluate_with_byte_analysis(samples: List[Tuple[bytes, int]], label_prefix="", detailed=False) -> Tuple[List, Dict]:
+ """Analyze which methods work and provide byte-level insights"""
+ matches = []
+ seen = set()
+ byte_insights = {}
+
+ # Most promising methods based on previous analysis
+ methods = [
+ ("WEIGHTED_SUM", checksum_weighted_sum),
+ ("ALT_SUM_XOR", checksum_alt_sum_xor),
+ ("BIT_FLIP_SUM", checksum_bit_flip_sum),
+ ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)),
+ ("HYBRID_SUM_XOR(0.5)", lambda d: checksum_hybrid_sum_xor(d, 0.5)),
+ ("BIT_FLIP_SUM(AA)", lambda d: checksum_adaptive_bit_flip_sum(d, 0xAA))
+ ]
+
+ for sample_index, (data, expected) in enumerate(samples[:5]): # Limit to first 5 samples for performance
+ length = len(data)
+
+ # Focus on the most promising ranges
+ ranges_to_try = []
+
+ # Add the specific ranges that were most successful in our analysis
+ specific_ranges = [(3, 30), (4, 31), (5, 8), (5, 9), (2, 11)]
+ for start, end in specific_ranges:
+ if start < length and end <= length and start < end:
+ ranges_to_try.append((start, end))
+
+ # Process each range with our methods
+ for start, end in ranges_to_try:
+ if end > start + 30: # Skip very large ranges to keep analysis fast
+ continue
+
+ sliced = data[start:end]
+ label = f"[{start}:{end}]"
+
+ for name, func in methods:
+ try:
+ result = func(sliced)
+ method_id = f"{name}{label}"
+ key = (sample_index, method_id, label_prefix)
+
+ if result == expected and key not in seen:
+ seen.add(key)
+ matches.append((method_id, sample_index + 1, expected, label_prefix))
+
+ # For matching methods, perform byte contribution analysis
+ if detailed:
+ print(f"Analyzing contributions for sample {sample_index+1}, method {method_id}...")
+ byte_contributions = analyze_byte_contributions(sliced, func, expected)
+ optimal_changes = find_optimal_byte_changes(sliced, func, expected)
+
+ # Store insights and also print them immediately
+ insights_key = f"sample_{sample_index+1}_{name}"
+ byte_insights[insights_key] = {
+ "contributions": byte_contributions,
+ "optimal_changes": optimal_changes,
+ "method": name,
+ "range": f"{start}:{end}",
+ "data": sliced # Store the data slice itself for easier analysis
+ }
+
+ # Print analysis directly during collection for immediate feedback
+ print_byte_analysis(sliced, byte_contributions, method_id)
+
+ # If we found compensation values, print them
+ if optimal_changes:
+ print("\nSuggested byte changes:")
+ for pos, new_val in optimal_changes:
+ print(f" Change byte at position {pos} from 0x{sliced[pos]:02X} to 0x{new_val:02X}")
+
+ # Once we've found and analyzed one matching method for a sample, move on
+ # to keep the output manageable
+ break
+ except Exception as e:
+ continue
+
+ # If we've already found and analyzed a method for this sample, move on
+ if any(k.startswith(f"sample_{sample_index+1}_") for k in byte_insights.keys()):
+ break
+
+ # If we've already found and analyzed a method for this sample, move on
+ if any(k.startswith(f"sample_{sample_index+1}_") for k in byte_insights.keys()):
+ continue
+
+ return matches, byte_insights
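+
+# Entry point. main() is invoked here, after every helper it uses (including
+# evaluate_with_byte_analysis above) has been defined, which avoids NameError
+# on helpers that appear later in the file than the CLI code.
+if __name__ == "__main__":
+    main()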