From c7c0c37621d77018fe4ec2fbf22f12303a2e59d7 Mon Sep 17 00:00:00 2001 From: allen Date: Thu, 5 Feb 2026 10:11:33 -0800 Subject: [PATCH] Update NessusActionPlanner.py --- NessusActionPlanner.py | 588 +++++++++++------------------------------ 1 file changed, 153 insertions(+), 435 deletions(-) diff --git a/NessusActionPlanner.py b/NessusActionPlanner.py index cba116c..c54115b 100644 --- a/NessusActionPlanner.py +++ b/NessusActionPlanner.py @@ -1,467 +1,185 @@ #!/usr/bin/env python3 """ -Nessus Vulnerability Action Plan Generator -Processes multiple .nessus files and creates actionable remediation plans +Create comprehensive test with Windows and RHEL Plugin 66334 formats """ import xml.etree.ElementTree as ET -from collections import defaultdict -from dataclasses import dataclass, field -from typing import List, Dict, Set -import csv -import argparse -from pathlib import Path +from xml.dom import minidom -@dataclass -class Vulnerability: - """Represents a vulnerability finding""" - plugin_id: str - plugin_name: str - severity: int # 0=Info, 1=Low, 2=Medium, 3=High, 4=Critical - cve_list: List[str] = field(default_factory=list) - hosts: Set[str] = field(default_factory=set) +def create_windows_host(report, host_ip, host_name): + """Create Windows host with vulnerabilities""" + report_host = ET.SubElement(report, 'ReportHost', name=host_ip) - @property - def severity_text(self): - severity_map = {0: "Info", 1: "Low", 2: "Medium", 3: "High", 4: "Critical"} - return severity_map.get(self.severity, "Unknown") + # Host properties + host_properties = ET.SubElement(report_host, 'HostProperties') + tag = ET.SubElement(host_properties, 'tag', name='netbios-name') + tag.text = host_name + + # Windows vulnerabilities + vulns = [ + {'plugin_id': '200001', 'name': 'MS KB5073724 Security Update', 'severity': '4', 'cves': ['CVE-2024-0001', 'CVE-2024-0002']}, + {'plugin_id': '200002', 'name': 'MS KB5049613 Critical Patch', 'severity': '4', 'cves': ['CVE-2024-0003']}, + {'plugin_id': '200003', 'name': 'Adobe AIR < 23.0.0.257 Multiple Vulnerabilities', 'severity': '4', 'cves': ['CVE-2024-0010', 'CVE-2024-0011']}, + {'plugin_id': '200004', 'name': 'Adobe AIR Unsupported Version', 'severity': '3', 'cves': ['CVE-2024-0012']}, + {'plugin_id': '200005', 'name': 'Curl Use-After-Free < 7.87', 'severity': '4', 'cves': ['CVE-2022-43552']}, + {'plugin_id': '200006', 'name': 'Microsoft Edge < 143.0.3650.139', 'severity': '3', 'cves': ['CVE-2026-0628']}, + ] + + for vuln in vulns: + report_item = ET.SubElement(report_host, 'ReportItem', + port='0', svc_name='general', protocol='tcp', + severity=vuln['severity'], pluginID=vuln['plugin_id'], pluginName=vuln['name']) + for cve in vuln['cves']: + cve_elem = ET.SubElement(report_item, 'cve') + cve_elem.text = cve + + # Plugin 66334 + patch_output = """ +. You need to take the following 18 actions : + ++ Install the following Microsoft patches : +- KB5073724 (39 vulnerabilities) +- KB5049613 +- KB5044023 +- KB5039893 +- KB5039884 +- KB5036608 +- KB5033909 +- KB5031988 + +[ Adobe AIR <= 22.0.0.153 Android Applications Runtime Analytics MitM (APSB16-31) (93523) ] + ++ Action to take : Upgrade to Adobe AIR version 23.0.0.257 or later. + ++Impact : Taking this action will resolve 564 different vulnerabilities (CVEs). -@dataclass -class ActionPlan: - """Represents a remediation action plan""" - action: str - systems: List[str] - ip_addresses: List[str] - cve_reduction_count: int - critical_reduction: int - high_reduction: int - system_impact_risk: str = "" - recommended_ifc: str = "" - third_party_software: str = "" - responsible_party: str = "" - documentation: str = "" - comments: str = "" +[ Curl Use-After-Free < 7.87 (CVE-2022-43552) (171859) ] + ++ Action to take : Upgrade Curl to version 7.87.0 or later -class NessusParser: - """Parses .nessus files and extracts vulnerability data""" +[ Microsoft Edge (Chromium) < 143.0.3650.139 (CVE-2026-0628) (282534) ] + ++ Action to take : Upgrade to Microsoft Edge version 143.0.3650.139 or later. + ++Impact : Taking this action will resolve 96 different vulnerabilities (CVEs). +""" - def __init__(self): - self.vulnerabilities: Dict[str, Vulnerability] = {} - - def parse_file(self, filepath: str): - """Parse a single .nessus file""" - try: - tree = ET.parse(filepath) - root = tree.getroot() - - # Iterate through all report hosts - for report_host in root.findall('.//ReportHost'): - host_ip = report_host.get('name') - - # Get host properties for system name - host_properties = report_host.find('HostProperties') - system_name = host_ip # Default to IP - - if host_properties is not None: - for tag in host_properties.findall('tag'): - if tag.get('name') == 'host-fqdn': - system_name = tag.text - break - elif tag.get('name') == 'netbios-name': - system_name = tag.text - - # Process each vulnerability item - for item in report_host.findall('ReportItem'): - plugin_id = item.get('pluginID') - severity = int(item.get('severity', 0)) - - # Skip informational findings - if severity == 0: - continue - - plugin_name = item.get('pluginName', 'Unknown') - - # Extract CVEs - cve_list = [] - cve_elem = item.find('cve') - if cve_elem is not None and cve_elem.text: - cve_list = [cve_elem.text] - - # Additional CVEs might be in multiple elements - for cve in item.findall('cve'): - if cve.text and cve.text not in cve_list: - cve_list.append(cve.text) - - # Add or update vulnerability - if plugin_id not in self.vulnerabilities: - self.vulnerabilities[plugin_id] = Vulnerability( - plugin_id=plugin_id, - plugin_name=plugin_name, - severity=severity, - cve_list=cve_list, - hosts={f"{system_name} ({host_ip})"} - ) - else: - self.vulnerabilities[plugin_id].hosts.add(f"{system_name} ({host_ip})") - # Update CVE list if new ones found - for cve in cve_list: - if cve not in self.vulnerabilities[plugin_id].cve_list: - self.vulnerabilities[plugin_id].cve_list.append(cve) - - print(f"✓ Parsed: {filepath}") - - except Exception as e: - print(f"✗ Error parsing {filepath}: {str(e)}") - - def parse_multiple_files(self, filepaths: List[str]): - """Parse multiple .nessus files""" - for filepath in filepaths: - self.parse_file(filepath) - - print(f"\nTotal unique vulnerabilities found: {len(self.vulnerabilities)}") + report_item = ET.SubElement(report_host, 'ReportItem', + port='0', svc_name='general', protocol='tcp', + severity='0', pluginID='66334', pluginName='Patch Report') + plugin_output = ET.SubElement(report_item, 'plugin_output') + plugin_output.text = patch_output -class ActionPlanGenerator: - """Generates actionable remediation plans from vulnerability data""" +def create_rhel_host(report, host_ip, host_name): + """Create RHEL host with vulnerabilities""" + report_host = ET.SubElement(report, 'ReportHost', name=host_ip) - def __init__(self, vulnerabilities: Dict[str, Vulnerability]): - self.vulnerabilities = vulnerabilities - self.action_plans: List[ActionPlan] = [] + # Host properties + host_properties = ET.SubElement(report_host, 'HostProperties') + tag = ET.SubElement(host_properties, 'tag', name='host-fqdn') + tag.text = f"{host_name}.company.local" - def generate_plans(self): - """Generate action plans - one row per action-system combination""" - # Group vulnerabilities by plugin name (similar remediation action) - action_groups = defaultdict(list) - - for vuln in self.vulnerabilities.values(): - # Use plugin name as the action descriptor - action_groups[vuln.plugin_name].append(vuln) - - # Create action plans - one per action-system combination - for action_name, vulns in action_groups.items(): - # Determine primary action description - action_desc = self._generate_action_description(action_name, vulns) - - # Get all unique CVEs for this action (used for all systems) - total_cves = set() - for vuln in vulns: - total_cves.update(vuln.cve_list) - - # Group by system - system_data = defaultdict(lambda: { - 'ips': set(), - 'critical_count': 0, - 'high_count': 0 - }) - - for vuln in vulns: - for host in vuln.hosts: - # Parse "system (ip)" format - if '(' in host and ')' in host: - system = host.split('(')[0].strip() - ip = host.split('(')[1].rstrip(')') - else: - system = host - ip = host - - system_data[system]['ips'].add(ip) - - # Count severity reductions per system - if vuln.severity == 4: # Critical - system_data[system]['critical_count'] += 1 - elif vuln.severity == 3: # High - system_data[system]['high_count'] += 1 - - # Create one action plan per system - for system, data in system_data.items(): - plan = ActionPlan( - action=action_desc, - systems=[system], - ip_addresses=sorted(list(data['ips'])), - cve_reduction_count=len(total_cves), - critical_reduction=data['critical_count'], - high_reduction=data['high_count'] - ) - - self.action_plans.append(plan) - - # Sort by priority (critical first, then high, then CVE count) - self.action_plans.sort( - key=lambda x: (x.critical_reduction, x.high_reduction, x.cve_reduction_count), - reverse=True - ) - - return self.action_plans + # RHEL vulnerabilities + vulns = [ + {'plugin_id': '300001', 'name': 'Apache Log4j 1.2 JMSAppender RCE', 'severity': '4', 'cves': ['CVE-2021-4104']}, + {'plugin_id': '300002', 'name': 'Oracle Java SE July 2022 CPU', 'severity': '4', 'cves': ['CVE-2024-0020', 'CVE-2024-0021']}, + {'plugin_id': '300003', 'name': 'RHEL 8 : java-1.8.0-openjdk (RHSA-2025:18815)', 'severity': '3', 'cves': ['CVE-2024-0030']}, + {'plugin_id': '300004', 'name': 'RHEL 8 : NetworkManager (RHSA-2025:0288)', 'severity': '2', 'cves': []}, + {'plugin_id': '300005', 'name': 'RHEL 8 : NetworkManager-libreswan (RHSA-2024:8353)', 'severity': '2', 'cves': []}, + {'plugin_id': '300006', 'name': 'RHEL 8 : bcc (RHSA-2024:8831)', 'severity': '2', 'cves': []}, + ] - def _generate_action_description(self, plugin_name: str, vulns: List[Vulnerability]) -> str: - """Generate a clear action description from plugin name""" - # Clean up common patterns in plugin names - action = plugin_name - - # Common patterns to make more actionable - if "unsupported" in action.lower() or "end of life" in action.lower(): - action = f"Upgrade or replace unsupported software: {plugin_name}" - elif "missing" in action.lower() and "patch" in action.lower(): - action = f"Apply missing patches: {plugin_name}" - elif "vulnerability" in action.lower() or "multiple vulnerabilities" in action.lower(): - action = f"Remediate: {plugin_name}" - elif "update" in action.lower(): - action = f"Apply updates: {plugin_name}" - - return action + for vuln in vulns: + report_item = ET.SubElement(report_host, 'ReportItem', + port='0', svc_name='general', protocol='tcp', + severity=vuln['severity'], pluginID=vuln['plugin_id'], pluginName=vuln['name']) + for cve in vuln['cves']: + cve_elem = ET.SubElement(report_item, 'cve') + cve_elem.text = cve - def export_to_csv(self, output_file: str): - """Export action plans to CSV file""" - if not self.action_plans: - print("No action plans to export. Run generate_plans() first.") - return - - with open(output_file, 'w', newline='', encoding='utf-8') as csvfile: - fieldnames = [ - 'Action', - 'System', - 'IP Address', - 'CVE Reduction Count', - 'Critical Reduction', - 'High Reduction', - 'System Impact Risk', - 'Recommended IFC', - '3rd Party Software', - 'Responsible Party', - 'Documentation', - 'Comments' - ] - - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() - - for plan in self.action_plans: - writer.writerow({ - 'Action': plan.action, - 'System': plan.systems[0] if plan.systems else '', - 'IP Address': '; '.join(plan.ip_addresses), - 'CVE Reduction Count': plan.cve_reduction_count, - 'Critical Reduction': plan.critical_reduction, - 'High Reduction': plan.high_reduction, - 'System Impact Risk': plan.system_impact_risk, - 'Recommended IFC': plan.recommended_ifc, - '3rd Party Software': plan.third_party_software, - 'Responsible Party': plan.responsible_party, - 'Documentation': plan.documentation, - 'Comments': plan.comments - }) - - print(f"\n✓ Action plan exported to: {output_file}") - print(f" Total action items: {len(self.action_plans)}") - - def export_summary_csv(self, output_file: str): - """Export summary CSV with actions aggregated across all systems""" - if not self.action_plans: - print("No action plans to export. Run generate_plans() first.") - return - - # Aggregate by action - action_summary = defaultdict(lambda: { - 'systems': set(), - 'cve_count': 0, - 'critical_total': 0, - 'high_total': 0 - }) - - for plan in self.action_plans: - action = plan.action - action_summary[action]['systems'].add(plan.systems[0] if plan.systems else 'Unknown') - action_summary[action]['cve_count'] = plan.cve_reduction_count # Same for all systems with this action - action_summary[action]['critical_total'] += plan.critical_reduction - action_summary[action]['high_total'] += plan.high_reduction - - # Calculate totals for percentages - total_cve = len(set(plan.cve_reduction_count for plan in self.action_plans if plan.cve_reduction_count > 0)) - # Better approach: get unique CVEs across ALL actions - all_unique_cves = set() - action_cve_map = {} - for plan in self.action_plans: - if plan.action not in action_cve_map: - action_cve_map[plan.action] = plan.cve_reduction_count - all_unique_cves.add(plan.action) # Use action as proxy since CVE count is per action - - # Recalculate: sum unique CVEs across all actions - total_cve = sum(set(action_cve_map.values())) - total_critical = sum(plan.critical_reduction for plan in self.action_plans) - total_high = sum(plan.high_reduction for plan in self.action_plans) - - # Create summary list - summary_list = [] - for action, data in action_summary.items(): - summary_list.append({ - 'action': action, - 'system_count': len(data['systems']), - 'cve_reduction': data['cve_count'], - 'cve_percent': (data['cve_count'] / total_cve * 100) if total_cve > 0 else 0, - 'critical_reduction': data['critical_total'], - 'critical_percent': (data['critical_total'] / total_critical * 100) if total_critical > 0 else 0, - 'high_reduction': data['high_total'], - 'high_percent': (data['high_total'] / total_high * 100) if total_high > 0 else 0 - }) - - # Sort by total impact (critical + high) - summary_list.sort( - key=lambda x: (x['critical_reduction'], x['high_reduction'], x['cve_reduction']), - reverse=True - ) - - # Write summary CSV - with open(output_file, 'w', newline='', encoding='utf-8') as csvfile: - fieldnames = [ - 'Action', - 'System Count', - 'CVE Reduction', - '% of Total CVE Reduction', - 'Critical Reduction', - '% of Total Critical Reduction', - 'High Reduction', - '% of Total High Reduction' - ] - - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() - - for item in summary_list: - writer.writerow({ - 'Action': item['action'], - 'System Count': item['system_count'], - 'CVE Reduction': item['cve_reduction'], - '% of Total CVE Reduction': f"{item['cve_percent']:.1f}%", - 'Critical Reduction': item['critical_reduction'], - '% of Total Critical Reduction': f"{item['critical_percent']:.1f}%", - 'High Reduction': item['high_reduction'], - '% of Total High Reduction': f"{item['high_percent']:.1f}%" - }) - - print(f"\n✓ Summary exported to: {output_file}") - print(f" Total unique actions: {len(summary_list)}") - print(f" Total CVEs addressed: {total_cve}") - print(f" Total Critical vulnerabilities: {total_critical}") - print(f" Total High vulnerabilities: {total_high}") + # Plugin 66334 + patch_output = """ +. You need to take the following 110 actions : + +[ Apache Log4j 1.2 JMSAppender Remote Code Execution (CVE-2021-4104) (156103) ] + ++ Action to take : Upgrade to Apache Log4j version 2.16.0 or later since 1.x is end of life. + +Upgrading to the latest versions for Apache Log4j is highly recommended as intermediate versions / patches have known high severity vulnerabilities. + + +[ Oracle Java SE Multiple Vulnerabilities (July 2022 CPU) (163304) ] + ++ Action to take : Apply the appropriate patch according to the July 2022 Oracle Critical Patch Update advisory. + ++Impact : Taking this action will resolve 348 different vulnerabilities (CVEs). + + +[ RHEL 8 / 9 : java-1.8.0-openjdk (RHSA-2025:18815) (271273) ] + ++ Action to take : Update the RHEL java-1.8.0-openjdk package based on the guidance in RHSA-2025:18815. + ++Impact : Taking this action will resolve 24 different vulnerabilities (CVEs). + + +[ RHEL 8 : Bug fix of NetworkManager (Moderate) (RHSA-2025:0288) (214070) ] + ++ Action to take : Update the affected packages. + + +[ RHEL 8 : NetworkManager-libreswan (RHSA-2024:8353) (209549) ] + ++ Action to take : Update the RHEL NetworkManager-libreswan package based on the guidance in RHSA-2024:8353. + + +[ RHEL 8 : bcc (RHSA-2024:8831) (210348) ] + ++ Action to take : Update the RHEL bcc package based on the guidance in RHSA-2024:8831. +""" - def print_summary(self): - """Print a summary of the action plans""" - if not self.action_plans: - print("No action plans generated.") - return - - print("\n" + "="*80) - print("VULNERABILITY REMEDIATION ACTION PLAN SUMMARY") - print("="*80) - - total_critical = sum(p.critical_reduction for p in self.action_plans) - total_high = sum(p.high_reduction for p in self.action_plans) - total_cves = sum(p.cve_reduction_count for p in self.action_plans) - - print(f"\nTotal Actions: {len(self.action_plans)}") - print(f"Total Critical Vulnerabilities: {total_critical}") - print(f"Total High Vulnerabilities: {total_high}") - print(f"Total Unique CVEs: {total_cves}") - - print(f"\n{'Priority':<10} {'Action':<50} {'Critical':<10} {'High':<10} {'CVEs':<10}") - print("-"*90) - - for i, plan in enumerate(self.action_plans[:10], 1): # Top 10 - action_short = plan.action[:47] + "..." if len(plan.action) > 50 else plan.action - print(f"{i:<10} {action_short:<50} {plan.critical_reduction:<10} {plan.high_reduction:<10} {plan.cve_reduction_count:<10}") - - if len(self.action_plans) > 10: - print(f"\n... and {len(self.action_plans) - 10} more actions") + report_item = ET.SubElement(report_host, 'ReportItem', + port='0', svc_name='general', protocol='tcp', + severity='0', pluginID='66334', pluginName='Patch Report') + plugin_output = ET.SubElement(report_item, 'plugin_output') + plugin_output.text = patch_output def main(): - parser = argparse.ArgumentParser( - description='Parse Nessus scan files and generate vulnerability action plans', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - %(prog)s scan1.nessus scan2.nessus -o action_plan.csv - %(prog)s *.nessus -o remediation_plan.csv - %(prog)s scan1.nessus --summary-only -o summary.csv - """ - ) + root = ET.Element('NessusClientData_v2') + report = ET.SubElement(root, 'Report', name='Comprehensive Patch Report Test') - parser.add_argument( - 'nessus_files', - nargs='+', - help='One or more .nessus files to process' - ) + # Create Windows host + create_windows_host(report, "169.254.33.107", "DESKTOP-UE5DFOC") - parser.add_argument( - '-o', '--output', - default='vulnerability_action_plan.csv', - help='Output CSV file (default: vulnerability_action_plan.csv)' - ) + # Create RHEL host + create_rhel_host(report, "192.168.1.50", "rhel-server01") - parser.add_argument( - '--summary', - action='store_true', - help='Also generate a summary CSV file (appends _summary to filename)' - ) + # Pretty print + xml_str = minidom.parseString(ET.tostring(root)).toprettyxml(indent=" ") - parser.add_argument( - '--summary-only', - action='store_true', - help='Generate only the summary CSV file, skip detailed plan' - ) + filename = 'comprehensive_patch_test.nessus' + with open(filename, 'w') as f: + f.write(xml_str) - args = parser.parse_args() - - # Validate input files - valid_files = [] - for filepath in args.nessus_files: - if Path(filepath).exists(): - valid_files.append(filepath) - else: - print(f"Warning: File not found: {filepath}") - - if not valid_files: - print("Error: No valid .nessus files provided") - return 1 - - print(f"\nProcessing {len(valid_files)} .nessus file(s)...\n") - - # Parse files - nessus_parser = NessusParser() - nessus_parser.parse_multiple_files(valid_files) - - # Generate action plans - plan_generator = ActionPlanGenerator(nessus_parser.vulnerabilities) - plan_generator.generate_plans() - - # Display summary - plan_generator.print_summary() - - # Determine what to export - if args.summary_only: - # Only export summary - plan_generator.export_summary_csv(args.output) - elif args.summary: - # Export both detailed and summary - plan_generator.export_to_csv(args.output) - - # Generate summary filename - output_path = Path(args.output) - summary_filename = output_path.stem + '_summary' + output_path.suffix - summary_path = output_path.parent / summary_filename - plan_generator.export_summary_csv(str(summary_path)) - else: - # Default: only export detailed plan - plan_generator.export_to_csv(args.output) - print("\nTip: Use --summary flag to also generate a summary CSV") - - print("\n✓ Complete!") - return 0 + print(f"✓ Created comprehensive test file: {filename}") + print("\nIncludes:") + print(" HOST 1 (Windows):") + print(" - KB patches with and without counts") + print(" - Adobe AIR upgrade") + print(" - Curl upgrade") + print(" - Microsoft Edge upgrade") + print("\n HOST 2 (RHEL 8):") + print(" - Apache Log4j upgrade") + print(" - Oracle Java patches") + print(" - RHEL package updates (java, NetworkManager, bcc)") + print("\nTest with:") + print(" python patch_report_extractor.py comprehensive_patch_test.nessus --summary -o result.csv") if __name__ == '__main__': - exit(main()) \ No newline at end of file + main() \ No newline at end of file