diff --git a/PatchReportExtractor.py b/PatchReportExtractor.py new file mode 100644 index 0000000..cb123cc --- /dev/null +++ b/PatchReportExtractor.py @@ -0,0 +1,506 @@ +#!/usr/bin/env python3 +""" +Extract structured action items from Nessus Plugin 66334 (Patch Report) +This provides pre-packaged, actionable patch recommendations per host +""" + +import xml.etree.ElementTree as ET +from collections import defaultdict +from dataclasses import dataclass +from typing import List, Dict +import csv +import argparse +import re +from pathlib import Path + + +@dataclass +class PatchAction: + """Represents a patch action from Plugin 66334""" + action: str + system: str + ip_address: str + vulnerability_count: int + software: str = "" + version_needed: str = "" + critical_count: int = 0 + high_count: int = 0 + medium_count: int = 0 + related_cves: List[str] = None + + def __post_init__(self): + if self.related_cves is None: + self.related_cves = [] + + +class PatchReportParser: + """Parses Plugin 66334 Patch Report data from .nessus files""" + + def __init__(self): + self.patch_actions: List[PatchAction] = [] + self.all_vulnerabilities: Dict = {} # Store all vulns for cross-reference + self.host_vulnerabilities: Dict = {} # Vulns per host for matching + + def parse_file(self, filepath: str): + """Parse a single .nessus file for Plugin 66334 data""" + try: + tree = ET.parse(filepath) + root = tree.getroot() + + for report_host in root.findall('.//ReportHost'): + host_ip = report_host.get('name') + host_properties = report_host.find('HostProperties') + system_name = host_ip + + if host_properties is not None: + for tag in host_properties.findall('tag'): + if tag.get('name') == 'host-fqdn': + system_name = tag.text + break + elif tag.get('name') == 'netbios-name' and not system_name != host_ip: + system_name = tag.text + + # Store this host's vulnerabilities + host_key = f"{system_name}_{host_ip}" + self.host_vulnerabilities[host_key] = [] + + # Collect ALL vulnerabilities for this host (for cross-reference) + for item in report_host.findall('ReportItem'): + plugin_id = item.get('pluginID') + plugin_name = item.get('pluginName', '') + severity = int(item.get('severity', 0)) + + # Handle Plugin 66334 specially (before skipping informational) + if plugin_id == '66334': + self._parse_patch_report_output(item, system_name, host_ip) + continue # Don't add it to host_vulnerabilities + + # Skip informational + if severity == 0: + continue + + # Collect CVEs + cves = [] + for cve in item.findall('cve'): + if cve.text: + cves.append(cve.text) + + # Store vulnerability info + vuln_info = { + 'plugin_id': plugin_id, + 'plugin_name': plugin_name, + 'severity': severity, + 'cves': cves + } + self.host_vulnerabilities[host_key].append(vuln_info) + + # Now cross-reference patch actions with vulnerabilities + self._match_patch_actions_to_vulnerabilities() + + print(f"✓ Parsed: {filepath}") + + except Exception as e: + print(f"✗ Error parsing {filepath}: {str(e)}") + + def _parse_patch_report_output(self, report_item, system_name: str, host_ip: str): + """Extract structured actions from the patch report output""" + plugin_output = report_item.find('plugin_output') + if plugin_output is None or not plugin_output.text: + return + + output = plugin_output.text + + # Pattern 1: Microsoft KB patches + # Example with count: - KB5073724 (39 vulnerabilities) + # Example without count: - KB5049613 + kb_pattern = r'-\s*(KB\d+)(?:\s*\((\d+)\s+vulnerabilit(?:y|ies)\))?' + kb_matches = re.findall(kb_pattern, output, re.IGNORECASE) + + if kb_matches: + for kb, count in kb_matches: + action = PatchAction( + action=f"Install Microsoft patch {kb}", + system=system_name, + ip_address=host_ip, + vulnerability_count=int(count) if count else 0, + software="Microsoft Windows", + version_needed=kb + ) + self.patch_actions.append(action) + + # Pattern 2: Software upgrades with structured format + # Example: + # [ 7-Zip < 25.01 (249179) ] + # + Action to take : Upgrade to 7-Zip version 25.01 or later. + # +Impact : Taking this action will resolve 5 different vulnerabilities (CVEs). + + # Match the software/version line + software_blocks = re.split(r'\n\s*\+\s*Action to take\s*:', output) + + for i in range(1, len(software_blocks)): + block = software_blocks[i] + + # Extract action description + action_match = re.search(r'^([^\n]+)', block) + if not action_match: + continue + + action_text = action_match.group(1).strip() + + # Extract impact/vulnerability count + impact_match = re.search(r'resolve\s+(\d+)\s+different vulnerabilit(?:y|ies)', block, re.IGNORECASE) + vuln_count = int(impact_match.group(1)) if impact_match else 0 + + # Try to extract software name and version from the previous block + prev_block_start = max(0, i - 1) + prev_text = software_blocks[prev_block_start][-200:] # Last 200 chars + + software_name = "" + version_needed = "" + + # Pattern: [ Software < version (...) ] + sw_match = re.search(r'\[\s*([^\<\[]+?)\s*<\s*([\d.]+)', prev_text) + if sw_match: + software_name = sw_match.group(1).strip() + version_needed = sw_match.group(2).strip() + + action = PatchAction( + action=action_text, + system=system_name, + ip_address=host_ip, + vulnerability_count=vuln_count, + software=software_name, + version_needed=version_needed + ) + self.patch_actions.append(action) + + def _match_patch_actions_to_vulnerabilities(self): + """Cross-reference patch actions with actual vulnerabilities to get severity counts""" + for action in self.patch_actions: + host_key = f"{action.system}_{action.ip_address}" + + if host_key not in self.host_vulnerabilities: + continue + + host_vulns = self.host_vulnerabilities[host_key] + + # Match vulnerabilities to this action based on: + # 1. KB number in action matches KB in vulnerability + # 2. Software name in action matches software in vulnerability plugin name + # 3. CVEs mentioned in patch report + + matched_vulns = [] + + for vuln in host_vulns: + plugin_name = vuln['plugin_name'].lower() + action_text = action.action.lower() + software_name = action.software.lower() + + # Match KB patches + if 'kb' in action_text: + kb_match = re.search(r'kb(\d+)', action_text) + if kb_match: + kb_num = kb_match.group(1) + if f'kb{kb_num}' in plugin_name or kb_num in plugin_name: + matched_vulns.append(vuln) + continue + + # Match software name + if software_name and software_name in plugin_name: + # Further validate by checking version if available + if action.version_needed: + # If version is mentioned, check if this vuln relates to older versions + if '<' in plugin_name or 'unsupported' in plugin_name or 'outdated' in plugin_name: + matched_vulns.append(vuln) + else: + matched_vulns.append(vuln) + continue + + # Match common software patterns + software_keywords = { + '7-zip': ['7-zip', '7zip'], + 'apache': ['apache'], + 'log4j': ['log4j'], + 'openssh': ['openssh', 'ssh'], + 'dell': ['dell'], + 'microsoft': ['microsoft', 'windows', r'ms\d{2}-'], + 'adobe': ['adobe'], + 'air': ['adobe air', 'air'], + 'curl': ['curl'], + 'edge': ['edge', 'chromium'], + 'oracle': ['oracle'], + 'java': ['java', 'openjdk', 'jre', 'jdk'], + 'rhel': ['rhel', 'red hat'], + 'networkmanager': ['networkmanager'], + 'bcc': ['bcc'], + } + + for key, patterns in software_keywords.items(): + if key in software_name or key in action_text: + for pattern in patterns: + if re.search(pattern, plugin_name, re.IGNORECASE): + matched_vulns.append(vuln) + break + + # Calculate severity counts from matched vulnerabilities + critical = sum(1 for v in matched_vulns if v['severity'] == 4) + high = sum(1 for v in matched_vulns if v['severity'] == 3) + medium = sum(1 for v in matched_vulns if v['severity'] == 2) + + # Collect CVEs + cves = [] + for v in matched_vulns: + cves.extend(v['cves']) + + # Update action with severity info + action.critical_count = critical + action.high_count = high + action.medium_count = medium + action.related_cves = list(set(cves)) # Unique CVEs + + def parse_multiple_files(self, filepaths: List[str]): + """Parse multiple .nessus files""" + for filepath in filepaths: + self.parse_file(filepath) + + print(f"\nTotal patch actions extracted: {len(self.patch_actions)}") + + def export_to_csv(self, output_file: str): + """Export patch actions to CSV""" + if not self.patch_actions: + print("No patch actions found.") + return + + with open(output_file, 'w', newline='', encoding='utf-8') as csvfile: + fieldnames = [ + 'Action', + 'System', + 'IP Address', + 'CVE Reduction Count', + 'Critical Reduction', + 'High Reduction', + 'Medium Reduction', + 'Software', + 'Version Needed', + 'System Impact Risk', + 'Recommended IFC', + '3rd Party Software', + 'Responsible Party', + 'Documentation', + 'Comments' + ] + + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + + for action in self.patch_actions: + writer.writerow({ + 'Action': action.action, + 'System': action.system, + 'IP Address': action.ip_address, + 'CVE Reduction Count': len(action.related_cves), + 'Critical Reduction': action.critical_count, + 'High Reduction': action.high_count, + 'Medium Reduction': action.medium_count, + 'Software': action.software, + 'Version Needed': action.version_needed, + 'System Impact Risk': '', + 'Recommended IFC': '', + '3rd Party Software': '', + 'Responsible Party': '', + 'Documentation': '', + 'Comments': '' + }) + + print(f"\n✓ Patch actions exported to: {output_file}") + print(f" Total action items: {len(self.patch_actions)}") + + def export_summary_csv(self, output_file: str): + """Export summary of patch actions aggregated by action type""" + if not self.patch_actions: + print("No patch actions found.") + return + + # Aggregate by action + action_summary = defaultdict(lambda: { + 'systems': set(), + 'total_vulns': 0, + 'total_cves': set(), + 'critical': 0, + 'high': 0, + 'medium': 0 + }) + + for action in self.patch_actions: + key = action.action + action_summary[key]['systems'].add(action.system) + action_summary[key]['total_vulns'] += action.vulnerability_count + action_summary[key]['total_cves'].update(action.related_cves) + action_summary[key]['critical'] += action.critical_count + action_summary[key]['high'] += action.high_count + action_summary[key]['medium'] += action.medium_count + + # Create summary list + summary_list = [] + total_critical = sum(a.critical_count for a in self.patch_actions) + total_high = sum(a.high_count for a in self.patch_actions) + total_cves = len(set(cve for a in self.patch_actions for cve in a.related_cves)) + + for action_text, data in action_summary.items(): + summary_list.append({ + 'action': action_text, + 'system_count': len(data['systems']), + 'cve_count': len(data['total_cves']), + 'cve_percent': (len(data['total_cves']) / total_cves * 100) if total_cves > 0 else 0, + 'critical': data['critical'], + 'critical_percent': (data['critical'] / total_critical * 100) if total_critical > 0 else 0, + 'high': data['high'], + 'high_percent': (data['high'] / total_high * 100) if total_high > 0 else 0 + }) + + # Sort by critical then high + summary_list.sort(key=lambda x: (x['critical'], x['high'], x['cve_count']), reverse=True) + + # Write summary CSV + with open(output_file, 'w', newline='', encoding='utf-8') as csvfile: + fieldnames = [ + 'Action', + 'System Count', + 'CVE Reduction', + '% of Total CVE Reduction', + 'Critical Reduction', + '% of Total Critical Reduction', + 'High Reduction', + '% of Total High Reduction' + ] + + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + + for item in summary_list: + writer.writerow({ + 'Action': item['action'], + 'System Count': item['system_count'], + 'CVE Reduction': item['cve_count'], + '% of Total CVE Reduction': f"{item['cve_percent']:.1f}%", + 'Critical Reduction': item['critical'], + '% of Total Critical Reduction': f"{item['critical_percent']:.1f}%", + 'High Reduction': item['high'], + '% of Total High Reduction': f"{item['high_percent']:.1f}%" + }) + + print(f"\n✓ Summary exported to: {output_file}") + print(f" Total unique actions: {len(summary_list)}") + print(f" Total CVEs: {total_cves}") + print(f" Total Critical vulnerabilities: {total_critical}") + print(f" Total High vulnerabilities: {total_high}") + + def print_summary(self): + """Print a summary of patch actions""" + if not self.patch_actions: + print("No patch actions found.") + return + + print("\n" + "="*80) + print("PATCH REPORT ACTION PLAN SUMMARY (Plugin 66334)") + print("="*80) + + total_cves = len(set(cve for a in self.patch_actions for cve in a.related_cves)) + total_critical = sum(a.critical_count for a in self.patch_actions) + total_high = sum(a.high_count for a in self.patch_actions) + unique_systems = len(set(a.system for a in self.patch_actions)) + + print(f"\nTotal Actions: {len(self.patch_actions)}") + print(f"Unique Systems: {unique_systems}") + print(f"Total CVEs Addressed: {total_cves}") + print(f"Total Critical Vulnerabilities: {total_critical}") + print(f"Total High Vulnerabilities: {total_high}") + + print(f"\n{'Priority':<10} {'Action':<45} {'System':<15} {'Crit':<6} {'High':<6} {'CVEs':<6}") + print("-"*95) + + # Sort by critical then high + sorted_actions = sorted( + self.patch_actions, + key=lambda x: (x.critical_count, x.high_count, len(x.related_cves)), + reverse=True + ) + + for i, action in enumerate(sorted_actions[:15], 1): + action_short = action.action[:42] + "..." if len(action.action) > 45 else action.action + system_short = action.system[:12] + "..." if len(action.system) > 15 else action.system + cve_count = len(action.related_cves) + print(f"{i:<10} {action_short:<45} {system_short:<15} {action.critical_count:<6} {action.high_count:<6} {cve_count:<6}") + + if len(sorted_actions) > 15: + print(f"\n... and {len(sorted_actions) - 15} more actions") + + +def main(): + parser = argparse.ArgumentParser( + description='Extract structured patch actions from Nessus Plugin 66334 (Patch Report)', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s scan1.nessus scan2.nessus -o patch_actions.csv + %(prog)s *.nessus --summary -o patches.csv + """ + ) + + parser.add_argument( + 'nessus_files', + nargs='+', + help='One or more .nessus files to process' + ) + + parser.add_argument( + '-o', '--output', + default='patch_report_actions.csv', + help='Output CSV file (default: patch_report_actions.csv)' + ) + + parser.add_argument( + '--summary', + action='store_true', + help='Also generate a summary CSV file' + ) + + args = parser.parse_args() + + # Validate input files + valid_files = [] + for filepath in args.nessus_files: + if Path(filepath).exists(): + valid_files.append(filepath) + else: + print(f"Warning: File not found: {filepath}") + + if not valid_files: + print("Error: No valid .nessus files provided") + return 1 + + print(f"\nExtracting Patch Report data from {len(valid_files)} .nessus file(s)...\n") + + # Parse files + parser_obj = PatchReportParser() + parser_obj.parse_multiple_files(valid_files) + + # Display summary + parser_obj.print_summary() + + # Export detailed actions + parser_obj.export_to_csv(args.output) + + # Optionally export summary + if args.summary: + output_path = Path(args.output) + summary_filename = output_path.stem + '_summary' + output_path.suffix + summary_path = output_path.parent / summary_filename + parser_obj.export_summary_csv(str(summary_path)) + + print("\n✓ Complete!") + return 0 + + +if __name__ == '__main__': + exit(main()) \ No newline at end of file