#!/usr/bin/env python3
"""
Extract structured action items from Nessus Plugin 66334 (Patch Report)
This provides pre-packaged, actionable patch recommendations per host
"""

import argparse
import csv
import re
import xml.etree.ElementTree as ET
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Optional

# Plugin whose output carries the per-host patch recommendations.
PATCH_REPORT_PLUGIN_ID = '66334'

# "- KB5073724 (39 vulnerabilities)" or "- KB5049613"
_KB_LINE_RE = re.compile(
    r'-\s*(KB\d+)(?:\s*\((\d+)\s+vulnerabilit(?:y|ies)\))?', re.IGNORECASE
)
# "+ Action to take : Upgrade to ..." (start of each software action)
_ACTION_SPLIT_RE = re.compile(r'\n\s*\+\s*Action to take\s*:')
# "... will resolve 5 different vulnerabilities (CVEs)."
_IMPACT_RE = re.compile(
    r'resolve\s+(\d+)\s+different vulnerabilit(?:y|ies)', re.IGNORECASE
)
# "[ 7-Zip < 25.01 (249179) ]"
_SOFTWARE_HEADER_RE = re.compile(r'\[\s*([^\<\[]+?)\s*<\s*([\d.]+)')

# Patterns applied to the lower-cased action text.
_KB_NUMBER_RE = re.compile(r'kb(\d+)')
_RHEL_PACKAGE_RE = re.compile(r'rhel\s+([a-z0-9\-._]+)\s+package')
_RHSA_RE = re.compile(r'rhsa[-:](\d{4}:\d+)')

# Keyword found in the action/software name -> regexes to look for in the
# (lower-cased) vulnerability plugin name.
_SOFTWARE_KEYWORDS = {
    '7-zip': ['7-zip', '7zip'],
    'adobe air': ['adobe air', 'adobe.*air'],
    'curl': [r'\bcurl\b'],
    'edge': ['microsoft edge', 'edge.*chromium'],
    'log4j': ['log4j'],
    'java': [r'java se\b', r'oracle java\b', r'openjdk\b'],
}
_SOFTWARE_KEYWORD_PATTERNS = {
    key: [re.compile(pattern, re.IGNORECASE) for pattern in patterns]
    for key, patterns in _SOFTWARE_KEYWORDS.items()
}


@dataclass
class PatchAction:
    """Represents a patch action from Plugin 66334"""

    action: str                 # Human-readable action text
    system: str                 # FQDN / NetBIOS name, or the IP if unnamed
    ip_address: str
    vulnerability_count: int    # Count stated by the patch report itself
    software: str = ""
    version_needed: str = ""    # Target version, or the KB id for MS patches
    critical_count: int = 0     # Matched findings with severity 4
    high_count: int = 0         # Matched findings with severity 3
    medium_count: int = 0       # Matched findings with severity 2
    related_cves: Optional[List[str]] = field(default_factory=list)

    def __post_init__(self):
        # Older callers may still pass related_cves=None explicitly.
        if self.related_cves is None:
            self.related_cves = []


class PatchReportParser:
    """Parses Plugin 66334 Patch Report data from .nessus files"""

    def __init__(self):
        self.patch_actions: List[PatchAction] = []
        self.all_vulnerabilities: Dict = {}  # Store all vulns for cross-reference
        self.host_vulnerabilities: Dict = {}  # Vulns per host for matching

    def parse_file(self, filepath: str):
        """Parse a single .nessus file for Plugin 66334 data.

        Every ``ReportHost`` contributes its patch-report actions to
        ``self.patch_actions`` and its non-informational findings to
        ``self.host_vulnerabilities``. Only the actions added by this file
        are then cross-referenced, so results from previously parsed files
        are not recomputed against a later scan of the same host.

        Errors are reported on stdout and do not propagate, so one bad file
        does not abort a multi-file run.
        """
        first_new_action = len(self.patch_actions)
        try:
            tree = ET.parse(filepath)
            root = tree.getroot()

            for report_host in root.findall('.//ReportHost'):
                host_ip = report_host.get('name')
                system_name = self._resolve_system_name(report_host, host_ip)

                # Store this host's vulnerabilities
                host_key = f"{system_name}_{host_ip}"
                host_vulns = []
                self.host_vulnerabilities[host_key] = host_vulns

                # Collect ALL vulnerabilities for this host (for cross-reference)
                for item in report_host.findall('ReportItem'):
                    plugin_id = item.get('pluginID')
                    severity = int(item.get('severity', 0))

                    # Handle Plugin 66334 specially (before skipping informational)
                    if plugin_id == PATCH_REPORT_PLUGIN_ID:
                        self._parse_patch_report_output(item, system_name, host_ip)
                        continue  # Don't add it to host_vulnerabilities

                    # Skip informational
                    if severity == 0:
                        continue

                    host_vulns.append({
                        'plugin_id': plugin_id,
                        'plugin_name': item.get('pluginName', ''),
                        'severity': severity,
                        'cves': [c.text for c in item.findall('cve') if c.text],
                    })
        except Exception as e:  # best-effort per file: report and move on
            print(f"✗ Error parsing {filepath}: {str(e)}")
        else:
            print(f"✓ Parsed: {filepath}")
        finally:
            # Cross-reference whatever this file contributed, even if parsing
            # stopped part-way through.
            self._match_patch_actions_to_vulnerabilities(
                self.patch_actions[first_new_action:]
            )

    @staticmethod
    def _resolve_system_name(report_host, host_ip):
        """Return host-fqdn if present, else netbios-name, else the IP."""
        system_name = host_ip
        host_properties = report_host.find('HostProperties')
        if host_properties is None:
            return system_name

        for tag in host_properties.findall('tag'):
            if not tag.text:
                continue  # an empty tag must not turn the name into None
            tag_name = tag.get('name')
            if tag_name == 'host-fqdn':
                return tag.text  # FQDN always wins
            if tag_name == 'netbios-name' and system_name == host_ip:
                system_name = tag.text
        return system_name

    def _parse_patch_report_output(self, report_item, system_name: str, host_ip: str):
        """Extract structured actions from the patch report output.

        Appends one ``PatchAction`` per Microsoft KB line and one per
        "+ Action to take :" block found in the item's ``plugin_output``.
        Does nothing when the item has no output text.
        """
        plugin_output = report_item.find('plugin_output')
        if plugin_output is None or not plugin_output.text:
            return

        output = plugin_output.text

        # Pattern 1: Microsoft KB patches
        # Example with count:    - KB5073724 (39 vulnerabilities)
        # Example without count: - KB5049613
        for kb, count in _KB_LINE_RE.findall(output):
            self.patch_actions.append(PatchAction(
                action=f"Install Microsoft patch {kb}",
                system=system_name,
                ip_address=host_ip,
                vulnerability_count=int(count) if count else 0,
                software="Microsoft Windows",
                version_needed=kb,
            ))

        # Pattern 2: Software upgrades with structured format
        # Example:
        # [ 7-Zip < 25.01 (249179) ]
        # + Action to take : Upgrade to 7-Zip version 25.01 or later.
        # +Impact : Taking this action will resolve 5 different vulnerabilities (CVEs).
        software_blocks = _ACTION_SPLIT_RE.split(output)
        for previous_block, block in zip(software_blocks, software_blocks[1:]):
            # The action description is the remainder of the split line.
            action_match = re.search(r'^([^\n]+)', block)
            if not action_match:
                continue
            action_text = action_match.group(1).strip()

            impact_match = _IMPACT_RE.search(block)
            vuln_count = int(impact_match.group(1)) if impact_match else 0

            # The "[ Software < version (...) ]" header sits just before the
            # action line, i.e. at the tail of the previous block. Use the
            # last header found so the nearest one wins.
            software_name = ""
            version_needed = ""
            headers = _SOFTWARE_HEADER_RE.findall(previous_block[-200:])
            if headers:
                software_name, version_needed = (part.strip() for part in headers[-1])

            self.patch_actions.append(PatchAction(
                action=action_text,
                system=system_name,
                ip_address=host_ip,
                vulnerability_count=vuln_count,
                software=software_name,
                version_needed=version_needed,
            ))

    @staticmethod
    def _build_vuln_matcher(action: PatchAction) -> Callable[[str], bool]:
        """Return a predicate telling whether a lower-cased plugin name
        belongs to *action*. Everything that depends only on the action is
        computed once here instead of once per vulnerability."""
        action_text = action.action.lower()
        software_name = action.software.lower()
        version_needed = action.version_needed

        kb_match = _KB_NUMBER_RE.search(action_text)
        kb_number = kb_match.group(1) if kb_match else None

        is_rhel = 'rhel' in action_text or 'rhsa' in action_text
        package_match = _RHEL_PACKAGE_RE.search(action_text) if is_rhel else None
        package_name = package_match.group(1) if package_match else None
        rhsa_match = _RHSA_RE.search(action_text) if is_rhel else None
        rhsa_number = rhsa_match.group(1) if rhsa_match else None

        keyword_patterns = [
            pattern
            for key, patterns in _SOFTWARE_KEYWORD_PATTERNS.items()
            if key in software_name or key in action_text
            for pattern in patterns
        ]

        def matches(plugin_name: str) -> bool:
            # PRIORITY 1: KB patches by KB number
            if kb_number and (
                f'kb{kb_number}' in plugin_name or f'kb {kb_number}' in plugin_name
            ):
                return True

            # PRIORITY 2: RHEL actions match only on the specific package
            # name or RHSA id; never fall through to the broad matching
            # below, which would tie every RHEL vuln to every RHEL action.
            if is_rhel:
                if package_name and package_name in plugin_name:
                    return True
                return bool(rhsa_number) and (
                    rhsa_number in plugin_name
                    or rhsa_number.replace(':', '-') in plugin_name
                )

            # PRIORITY 3: software name, validated by a version hint when
            # the action names a target version
            if software_name and software_name in plugin_name:
                if not version_needed:
                    return True
                if ('<' in plugin_name or 'unsupported' in plugin_name
                        or 'outdated' in plugin_name):
                    return True

            # PRIORITY 4: well-known software keywords. any() guarantees a
            # vulnerability is counted once even if several keywords hit.
            return any(pattern.search(plugin_name) for pattern in keyword_patterns)

        return matches

    def _match_patch_actions_to_vulnerabilities(
        self, actions: Optional[Iterable[PatchAction]] = None
    ):
        """Cross-reference patch actions with actual vulnerabilities to get severity counts.

        Args:
            actions: Actions to (re)compute. Defaults to every action in
                ``self.patch_actions``.

        For each action whose host is known, sets ``critical_count``,
        ``high_count``, ``medium_count`` and ``related_cves`` (unique, order
        unspecified) from the host findings the action matches.
        """
        if actions is None:
            actions = self.patch_actions

        for action in actions:
            host_key = f"{action.system}_{action.ip_address}"
            host_vulns = self.host_vulnerabilities.get(host_key)
            if host_vulns is None:
                continue

            matches = self._build_vuln_matcher(action)
            matched_vulns = [
                vuln for vuln in host_vulns if matches(vuln['plugin_name'].lower())
            ]

            # Nessus severities: 4 = critical, 3 = high, 2 = medium
            action.critical_count = sum(1 for v in matched_vulns if v['severity'] == 4)
            action.high_count = sum(1 for v in matched_vulns if v['severity'] == 3)
            action.medium_count = sum(1 for v in matched_vulns if v['severity'] == 2)
            action.related_cves = list(
                {cve for v in matched_vulns for cve in v['cves']}
            )  # Unique CVEs

    def parse_multiple_files(self, filepaths: List[str]):
        """Parse multiple .nessus files"""
        for filepath in filepaths:
            self.parse_file(filepath)

        print(f"\nTotal patch actions extracted: {len(self.patch_actions)}")

    def export_to_csv(self, output_file: str):
        """Export patch actions to CSV.

        Writes one row per action; the planning columns (risk, IFC,
        responsible party, ...) are emitted empty for manual completion.
        Prints a notice and writes nothing when there are no actions.
        """
        if not self.patch_actions:
            print("No patch actions found.")
            return

        fieldnames = [
            'Action', 'System', 'IP Address', 'CVE Reduction Count',
            'Critical Reduction', 'High Reduction', 'Medium Reduction',
            'Software', 'Version Needed', 'System Impact Risk',
            'Recommended IFC', '3rd Party Software', 'Responsible Party',
            'Documentation', 'Comments',
        ]
        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            for action in self.patch_actions:
                writer.writerow({
                    'Action': action.action,
                    'System': action.system,
                    'IP Address': action.ip_address,
                    'CVE Reduction Count': len(action.related_cves),
                    'Critical Reduction': action.critical_count,
                    'High Reduction': action.high_count,
                    'Medium Reduction': action.medium_count,
                    'Software': action.software,
                    'Version Needed': action.version_needed,
                    'System Impact Risk': '',
                    'Recommended IFC': '',
                    '3rd Party Software': '',
                    'Responsible Party': '',
                    'Documentation': '',
                    'Comments': '',
                })

        print(f"\n✓ Patch actions exported to: {output_file}")
        print(f" Total action items: {len(self.patch_actions)}")

    def export_summary_csv(self, output_file: str):
        """Export summary of patch actions aggregated by action type.

        Actions with identical text are merged across systems. Percentages
        are relative to the totals over all actions. Rows are sorted by
        critical, then high, then CVE count, descending.
        """
        if not self.patch_actions:
            print("No patch actions found.")
            return

        # Aggregate by action
        action_summary = defaultdict(lambda: {
            'systems': set(),
            'total_cves': set(),
            'critical': 0,
            'high': 0,
            'medium': 0,
        })

        for action in self.patch_actions:
            entry = action_summary[action.action]
            entry['systems'].add(action.system)
            entry['total_cves'].update(action.related_cves)
            entry['critical'] += action.critical_count
            entry['high'] += action.high_count
            entry['medium'] += action.medium_count

        total_critical = sum(a.critical_count for a in self.patch_actions)
        total_high = sum(a.high_count for a in self.patch_actions)
        total_cves = len({cve for a in self.patch_actions for cve in a.related_cves})

        def percent(part, whole):
            # Guard against empty totals instead of dividing by zero.
            return (part / whole * 100) if whole > 0 else 0

        summary_list = [
            {
                'action': action_text,
                'system_count': len(data['systems']),
                'cve_count': len(data['total_cves']),
                'cve_percent': percent(len(data['total_cves']), total_cves),
                'critical': data['critical'],
                'critical_percent': percent(data['critical'], total_critical),
                'high': data['high'],
                'high_percent': percent(data['high'], total_high),
            }
            for action_text, data in action_summary.items()
        ]

        # Sort by critical then high
        summary_list.sort(
            key=lambda x: (x['critical'], x['high'], x['cve_count']), reverse=True
        )

        # Write summary CSV
        fieldnames = [
            'Action', 'System Count', 'CVE Reduction', '% of Total CVE Reduction',
            'Critical Reduction', '% of Total Critical Reduction',
            'High Reduction', '% of Total High Reduction',
        ]
        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            for item in summary_list:
                writer.writerow({
                    'Action': item['action'],
                    'System Count': item['system_count'],
                    'CVE Reduction': item['cve_count'],
                    '% of Total CVE Reduction': f"{item['cve_percent']:.1f}%",
                    'Critical Reduction': item['critical'],
                    '% of Total Critical Reduction': f"{item['critical_percent']:.1f}%",
                    'High Reduction': item['high'],
                    '% of Total High Reduction': f"{item['high_percent']:.1f}%",
                })

        print(f"\n✓ Summary exported to: {output_file}")
        print(f" Total unique actions: {len(summary_list)}")
        print(f" Total CVEs: {total_cves}")
        print(f" Total Critical vulnerabilities: {total_critical}")
        print(f" Total High vulnerabilities: {total_high}")

    def print_summary(self):
        """Print a summary of patch actions (totals plus the top 15 by severity)."""
        if not self.patch_actions:
            print("No patch actions found.")
            return

        print("\n" + "="*80)
        print("PATCH REPORT ACTION PLAN SUMMARY (Plugin 66334)")
        print("="*80)

        total_cves = len({cve for a in self.patch_actions for cve in a.related_cves})
        total_critical = sum(a.critical_count for a in self.patch_actions)
        total_high = sum(a.high_count for a in self.patch_actions)
        unique_systems = len({a.system for a in self.patch_actions})

        print(f"\nTotal Actions: {len(self.patch_actions)}")
        print(f"Unique Systems: {unique_systems}")
        print(f"Total CVEs Addressed: {total_cves}")
        print(f"Total Critical Vulnerabilities: {total_critical}")
        print(f"Total High Vulnerabilities: {total_high}")

        print(f"\n{'Priority':<10} {'Action':<45} {'System':<15} {'Crit':<6} {'High':<6} {'CVEs':<6}")
        print("-"*95)

        # Sort by critical then high
        sorted_actions = sorted(
            self.patch_actions,
            key=lambda x: (x.critical_count, x.high_count, len(x.related_cves)),
            reverse=True
        )

        for i, action in enumerate(sorted_actions[:15], 1):
            # Truncate so the fixed-width columns stay aligned.
            action_short = action.action[:42] + "..." if len(action.action) > 45 else action.action
            system_short = action.system[:12] + "..." if len(action.system) > 15 else action.system
            cve_count = len(action.related_cves)
            print(f"{i:<10} {action_short:<45} {system_short:<15} {action.critical_count:<6} {action.high_count:<6} {cve_count:<6}")

        if len(sorted_actions) > 15:
            print(f"\n... and {len(sorted_actions) - 15} more actions")


def main():
    """Command-line entry point.

    Returns:
        0 on success, 1 when none of the given files exist.
    """
    parser = argparse.ArgumentParser(
        description='Extract structured patch actions from Nessus Plugin 66334 (Patch Report)',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s scan1.nessus scan2.nessus -o patch_actions.csv
  %(prog)s *.nessus --summary -o patches.csv
        """
    )

    parser.add_argument(
        'nessus_files',
        nargs='+',
        help='One or more .nessus files to process'
    )
    parser.add_argument(
        '-o', '--output',
        default='patch_report_actions.csv',
        help='Output CSV file (default: patch_report_actions.csv)'
    )
    parser.add_argument(
        '--summary',
        action='store_true',
        help='Also generate a summary CSV file'
    )

    args = parser.parse_args()

    # Validate input files
    valid_files = []
    for filepath in args.nessus_files:
        if Path(filepath).exists():
            valid_files.append(filepath)
        else:
            print(f"Warning: File not found: {filepath}")

    if not valid_files:
        print("Error: No valid .nessus files provided")
        return 1

    print(f"\nExtracting Patch Report data from {len(valid_files)} .nessus file(s)...\n")

    # Parse files
    parser_obj = PatchReportParser()
    parser_obj.parse_multiple_files(valid_files)

    # Display summary
    parser_obj.print_summary()

    # Export detailed actions
    parser_obj.export_to_csv(args.output)

    # Optionally export summary next to the detailed file
    if args.summary:
        output_path = Path(args.output)
        summary_filename = output_path.stem + '_summary' + output_path.suffix
        summary_path = output_path.parent / summary_filename
        parser_obj.export_summary_csv(str(summary_path))

    print("\n✓ Complete!")
    return 0


if __name__ == '__main__':
    # SystemExit works even where the site-provided exit() builtin is absent.
    raise SystemExit(main())