diff --git a/PatchReportExtractor.py b/PatchReportExtractor.py deleted file mode 100644 index 14149e3..0000000 --- a/PatchReportExtractor.py +++ /dev/null @@ -1,529 +0,0 @@ -#!/usr/bin/env python3 -""" -Extract structured action items from Nessus Plugin 66334 (Patch Report) -This provides pre-packaged, actionable patch recommendations per host -""" - -import xml.etree.ElementTree as ET -from collections import defaultdict -from dataclasses import dataclass -from typing import List, Dict -import csv -import argparse -import re -from pathlib import Path - - -@dataclass -class PatchAction: - """Represents a patch action from Plugin 66334""" - action: str - system: str - ip_address: str - vulnerability_count: int - software: str = "" - version_needed: str = "" - critical_count: int = 0 - high_count: int = 0 - medium_count: int = 0 - related_cves: List[str] = None - - def __post_init__(self): - if self.related_cves is None: - self.related_cves = [] - - -class PatchReportParser: - """Parses Plugin 66334 Patch Report data from .nessus files""" - - def __init__(self): - self.patch_actions: List[PatchAction] = [] - self.all_vulnerabilities: Dict = {} # Store all vulns for cross-reference - self.host_vulnerabilities: Dict = {} # Vulns per host for matching - - def parse_file(self, filepath: str): - """Parse a single .nessus file for Plugin 66334 data""" - try: - tree = ET.parse(filepath) - root = tree.getroot() - - for report_host in root.findall('.//ReportHost'): - host_ip = report_host.get('name') - host_properties = report_host.find('HostProperties') - system_name = host_ip - - if host_properties is not None: - for tag in host_properties.findall('tag'): - if tag.get('name') == 'host-fqdn': - system_name = tag.text - break - elif tag.get('name') == 'netbios-name' and not system_name != host_ip: - system_name = tag.text - - # Store this host's vulnerabilities - host_key = f"{system_name}_{host_ip}" - self.host_vulnerabilities[host_key] = [] - - # Collect ALL vulnerabilities for this host (for cross-reference) - for item in report_host.findall('ReportItem'): - plugin_id = item.get('pluginID') - plugin_name = item.get('pluginName', '') - severity = int(item.get('severity', 0)) - - # Handle Plugin 66334 specially (before skipping informational) - if plugin_id == '66334': - self._parse_patch_report_output(item, system_name, host_ip) - continue # Don't add it to host_vulnerabilities - - # Skip informational - if severity == 0: - continue - - # Collect CVEs - cves = [] - for cve in item.findall('cve'): - if cve.text: - cves.append(cve.text) - - # Store vulnerability info - vuln_info = { - 'plugin_id': plugin_id, - 'plugin_name': plugin_name, - 'severity': severity, - 'cves': cves - } - self.host_vulnerabilities[host_key].append(vuln_info) - - # Now cross-reference patch actions with vulnerabilities - self._match_patch_actions_to_vulnerabilities() - - print(f"✓ Parsed: {filepath}") - - except Exception as e: - print(f"✗ Error parsing {filepath}: {str(e)}") - - def _parse_patch_report_output(self, report_item, system_name: str, host_ip: str): - """Extract structured actions from the patch report output""" - plugin_output = report_item.find('plugin_output') - if plugin_output is None or not plugin_output.text: - return - - output = plugin_output.text - - # Pattern 1: Microsoft KB patches - # Example with count: - KB5073724 (39 vulnerabilities) - # Example without count: - KB5049613 - kb_pattern = r'-\s*(KB\d+)(?:\s*\((\d+)\s+vulnerabilit(?:y|ies)\))?' - kb_matches = re.findall(kb_pattern, output, re.IGNORECASE) - - if kb_matches: - for kb, count in kb_matches: - action = PatchAction( - action=f"Install Microsoft patch {kb}", - system=system_name, - ip_address=host_ip, - vulnerability_count=int(count) if count else 0, - software="Microsoft Windows", - version_needed=kb - ) - self.patch_actions.append(action) - - # Pattern 2: Software upgrades with structured format - # Example: - # [ 7-Zip < 25.01 (249179) ] - # + Action to take : Upgrade to 7-Zip version 25.01 or later. - # +Impact : Taking this action will resolve 5 different vulnerabilities (CVEs). - - # Match the software/version line - software_blocks = re.split(r'\n\s*\+\s*Action to take\s*:', output) - - for i in range(1, len(software_blocks)): - block = software_blocks[i] - - # Extract action description - action_match = re.search(r'^([^\n]+)', block) - if not action_match: - continue - - action_text = action_match.group(1).strip() - - # Extract impact/vulnerability count - impact_match = re.search(r'resolve\s+(\d+)\s+different vulnerabilit(?:y|ies)', block, re.IGNORECASE) - vuln_count = int(impact_match.group(1)) if impact_match else 0 - - # Try to extract software name and version from the previous block - prev_block_start = max(0, i - 1) - prev_text = software_blocks[prev_block_start][-200:] # Last 200 chars - - software_name = "" - version_needed = "" - - # Pattern: [ Software < version (...) ] - sw_match = re.search(r'\[\s*([^\<\[]+?)\s*<\s*([\d.]+)', prev_text) - if sw_match: - software_name = sw_match.group(1).strip() - version_needed = sw_match.group(2).strip() - - action = PatchAction( - action=action_text, - system=system_name, - ip_address=host_ip, - vulnerability_count=vuln_count, - software=software_name, - version_needed=version_needed - ) - self.patch_actions.append(action) - - def _match_patch_actions_to_vulnerabilities(self): - """Cross-reference patch actions with actual vulnerabilities to get severity counts""" - for action in self.patch_actions: - host_key = f"{action.system}_{action.ip_address}" - - if host_key not in self.host_vulnerabilities: - continue - - host_vulns = self.host_vulnerabilities[host_key] - - # Match vulnerabilities to this action based on: - # 1. KB number in action matches KB in vulnerability - # 2. Specific package name for RHEL (e.g., "java-1.8.0-openjdk", "firefox") - # 3. Software name and version for general software - # 4. CVEs mentioned in patch report - - matched_vulns = [] - - action_text_lower = action.action.lower() - software_name_lower = action.software.lower() - - for vuln in host_vulns: - plugin_name = vuln['plugin_name'].lower() - - # PRIORITY 1: Match KB patches by KB number - if 'kb' in action_text_lower: - kb_match = re.search(r'kb(\d+)', action_text_lower) - if kb_match: - kb_num = kb_match.group(1) - if f'kb{kb_num}' in plugin_name or f'kb {kb_num}' in plugin_name: - matched_vulns.append(vuln) - continue - - # PRIORITY 2: Match RHEL packages by specific package name - # Extract package name from action like "Update the RHEL firefox package..." - if 'rhel' in action_text_lower or 'rhsa' in action_text_lower: - # Extract package name from patterns like: - # "Update the RHEL firefox package based on..." - # "Update the RHEL java-1.8.0-openjdk package..." - package_match = re.search(r'rhel\s+([a-z0-9\-._]+)\s+package', action_text_lower) - if package_match: - package_name = package_match.group(1) - # Match if the package name appears in the vulnerability plugin name - if package_name in plugin_name: - matched_vulns.append(vuln) - continue - - # Also try to extract RHSA number for exact matching - rhsa_match = re.search(r'rhsa[-:](\d{4}:\d+)', action_text_lower) - if rhsa_match: - rhsa_num = rhsa_match.group(1) - if rhsa_num in plugin_name or rhsa_num.replace(':', '-') in plugin_name: - matched_vulns.append(vuln) - continue - - # If no specific package found, skip broad RHEL matching - # This prevents matching all RHEL vulns to every RHEL action - continue - - # PRIORITY 3: Match by specific software name with version validation - if software_name_lower and software_name_lower in plugin_name: - # Further validate by checking version if available - if action.version_needed: - # If version is mentioned, check if this vuln relates to older versions - if '<' in plugin_name or '<=' in plugin_name or 'unsupported' in plugin_name or 'outdated' in plugin_name: - matched_vulns.append(vuln) - continue - else: - # No version requirement, but software name matches - matched_vulns.append(vuln) - continue - - # PRIORITY 4: Match common software patterns (only for non-RHEL) - # This should be specific, not catch-all - if 'rhel' not in action_text_lower and 'rhsa' not in action_text_lower: - software_keywords = { - '7-zip': ['7-zip', '7zip'], - 'adobe air': ['adobe air', 'adobe.*air'], - 'curl': [r'\bcurl\b'], - 'edge': ['microsoft edge', 'edge.*chromium'], - 'log4j': ['log4j'], - 'java': [r'java se\b', r'oracle java\b', r'openjdk\b'], - } - - for key, patterns in software_keywords.items(): - if key in software_name_lower or key in action_text_lower: - for pattern in patterns: - if re.search(pattern, plugin_name, re.IGNORECASE): - matched_vulns.append(vuln) - break - - # Calculate severity counts from matched vulnerabilities - critical = sum(1 for v in matched_vulns if v['severity'] == 4) - high = sum(1 for v in matched_vulns if v['severity'] == 3) - medium = sum(1 for v in matched_vulns if v['severity'] == 2) - - # Collect CVEs - cves = [] - for v in matched_vulns: - cves.extend(v['cves']) - - # Update action with severity info - action.critical_count = critical - action.high_count = high - action.medium_count = medium - action.related_cves = list(set(cves)) # Unique CVEs - - def parse_multiple_files(self, filepaths: List[str]): - """Parse multiple .nessus files""" - for filepath in filepaths: - self.parse_file(filepath) - - print(f"\nTotal patch actions extracted: {len(self.patch_actions)}") - - def export_to_csv(self, output_file: str): - """Export patch actions to CSV""" - if not self.patch_actions: - print("No patch actions found.") - return - - with open(output_file, 'w', newline='', encoding='utf-8') as csvfile: - fieldnames = [ - 'Action', - 'System', - 'IP Address', - 'CVE Reduction Count', - 'Critical Reduction', - 'High Reduction', - 'Medium Reduction', - 'Software', - 'Version Needed', - 'System Impact Risk', - 'Recommended IFC', - '3rd Party Software', - 'Responsible Party', - 'Documentation', - 'Comments' - ] - - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() - - for action in self.patch_actions: - writer.writerow({ - 'Action': action.action, - 'System': action.system, - 'IP Address': action.ip_address, - 'CVE Reduction Count': len(action.related_cves), - 'Critical Reduction': action.critical_count, - 'High Reduction': action.high_count, - 'Medium Reduction': action.medium_count, - 'Software': action.software, - 'Version Needed': action.version_needed, - 'System Impact Risk': '', - 'Recommended IFC': '', - '3rd Party Software': '', - 'Responsible Party': '', - 'Documentation': '', - 'Comments': '' - }) - - print(f"\n✓ Patch actions exported to: {output_file}") - print(f" Total action items: {len(self.patch_actions)}") - - def export_summary_csv(self, output_file: str): - """Export summary of patch actions aggregated by action type""" - if not self.patch_actions: - print("No patch actions found.") - return - - # Aggregate by action - action_summary = defaultdict(lambda: { - 'systems': set(), - 'total_vulns': 0, - 'total_cves': set(), - 'critical': 0, - 'high': 0, - 'medium': 0 - }) - - for action in self.patch_actions: - key = action.action - action_summary[key]['systems'].add(action.system) - action_summary[key]['total_vulns'] += action.vulnerability_count - action_summary[key]['total_cves'].update(action.related_cves) - action_summary[key]['critical'] += action.critical_count - action_summary[key]['high'] += action.high_count - action_summary[key]['medium'] += action.medium_count - - # Create summary list - summary_list = [] - total_critical = sum(a.critical_count for a in self.patch_actions) - total_high = sum(a.high_count for a in self.patch_actions) - total_cves = len(set(cve for a in self.patch_actions for cve in a.related_cves)) - - for action_text, data in action_summary.items(): - summary_list.append({ - 'action': action_text, - 'system_count': len(data['systems']), - 'cve_count': len(data['total_cves']), - 'cve_percent': (len(data['total_cves']) / total_cves * 100) if total_cves > 0 else 0, - 'critical': data['critical'], - 'critical_percent': (data['critical'] / total_critical * 100) if total_critical > 0 else 0, - 'high': data['high'], - 'high_percent': (data['high'] / total_high * 100) if total_high > 0 else 0 - }) - - # Sort by critical then high - summary_list.sort(key=lambda x: (x['critical'], x['high'], x['cve_count']), reverse=True) - - # Write summary CSV - with open(output_file, 'w', newline='', encoding='utf-8') as csvfile: - fieldnames = [ - 'Action', - 'System Count', - 'CVE Reduction', - '% of Total CVE Reduction', - 'Critical Reduction', - '% of Total Critical Reduction', - 'High Reduction', - '% of Total High Reduction' - ] - - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() - - for item in summary_list: - writer.writerow({ - 'Action': item['action'], - 'System Count': item['system_count'], - 'CVE Reduction': item['cve_count'], - '% of Total CVE Reduction': f"{item['cve_percent']:.1f}%", - 'Critical Reduction': item['critical'], - '% of Total Critical Reduction': f"{item['critical_percent']:.1f}%", - 'High Reduction': item['high'], - '% of Total High Reduction': f"{item['high_percent']:.1f}%" - }) - - print(f"\n✓ Summary exported to: {output_file}") - print(f" Total unique actions: {len(summary_list)}") - print(f" Total CVEs: {total_cves}") - print(f" Total Critical vulnerabilities: {total_critical}") - print(f" Total High vulnerabilities: {total_high}") - - def print_summary(self): - """Print a summary of patch actions""" - if not self.patch_actions: - print("No patch actions found.") - return - - print("\n" + "="*80) - print("PATCH REPORT ACTION PLAN SUMMARY (Plugin 66334)") - print("="*80) - - total_cves = len(set(cve for a in self.patch_actions for cve in a.related_cves)) - total_critical = sum(a.critical_count for a in self.patch_actions) - total_high = sum(a.high_count for a in self.patch_actions) - unique_systems = len(set(a.system for a in self.patch_actions)) - - print(f"\nTotal Actions: {len(self.patch_actions)}") - print(f"Unique Systems: {unique_systems}") - print(f"Total CVEs Addressed: {total_cves}") - print(f"Total Critical Vulnerabilities: {total_critical}") - print(f"Total High Vulnerabilities: {total_high}") - - print(f"\n{'Priority':<10} {'Action':<45} {'System':<15} {'Crit':<6} {'High':<6} {'CVEs':<6}") - print("-"*95) - - # Sort by critical then high - sorted_actions = sorted( - self.patch_actions, - key=lambda x: (x.critical_count, x.high_count, len(x.related_cves)), - reverse=True - ) - - for i, action in enumerate(sorted_actions[:15], 1): - action_short = action.action[:42] + "..." if len(action.action) > 45 else action.action - system_short = action.system[:12] + "..." if len(action.system) > 15 else action.system - cve_count = len(action.related_cves) - print(f"{i:<10} {action_short:<45} {system_short:<15} {action.critical_count:<6} {action.high_count:<6} {cve_count:<6}") - - if len(sorted_actions) > 15: - print(f"\n... and {len(sorted_actions) - 15} more actions") - - -def main(): - parser = argparse.ArgumentParser( - description='Extract structured patch actions from Nessus Plugin 66334 (Patch Report)', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - %(prog)s scan1.nessus scan2.nessus -o patch_actions.csv - %(prog)s *.nessus --summary -o patches.csv - """ - ) - - parser.add_argument( - 'nessus_files', - nargs='+', - help='One or more .nessus files to process' - ) - - parser.add_argument( - '-o', '--output', - default='patch_report_actions.csv', - help='Output CSV file (default: patch_report_actions.csv)' - ) - - parser.add_argument( - '--summary', - action='store_true', - help='Also generate a summary CSV file' - ) - - args = parser.parse_args() - - # Validate input files - valid_files = [] - for filepath in args.nessus_files: - if Path(filepath).exists(): - valid_files.append(filepath) - else: - print(f"Warning: File not found: {filepath}") - - if not valid_files: - print("Error: No valid .nessus files provided") - return 1 - - print(f"\nExtracting Patch Report data from {len(valid_files)} .nessus file(s)...\n") - - # Parse files - parser_obj = PatchReportParser() - parser_obj.parse_multiple_files(valid_files) - - # Display summary - parser_obj.print_summary() - - # Export detailed actions - parser_obj.export_to_csv(args.output) - - # Optionally export summary - if args.summary: - output_path = Path(args.output) - summary_filename = output_path.stem + '_summary' + output_path.suffix - summary_path = output_path.parent / summary_filename - parser_obj.export_summary_csv(str(summary_path)) - - print("\n✓ Complete!") - return 0 - - -if __name__ == '__main__': - exit(main()) \ No newline at end of file