#!/usr/bin/env python3
"""
Nessus Vulnerability Action Plan Generator
Processes multiple .nessus files and creates actionable remediation plans
"""

import xml.etree.ElementTree as ET
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict, Set
import csv
import argparse
from pathlib import Path


# Numeric severity as stored in the ReportItem "severity" attribute -> label.
_SEVERITY_LABELS = {0: "Info", 1: "Low", 2: "Medium", 3: "High", 4: "Critical"}


@dataclass
class Vulnerability:
    """Represents a vulnerability finding, merged across every host it was seen on."""

    plugin_id: str
    plugin_name: str
    severity: int  # 0=Info, 1=Low, 2=Medium, 3=High, 4=Critical
    cve_list: List[str] = field(default_factory=list)
    # Host labels in the form "system_name (ip)".
    hosts: Set[str] = field(default_factory=set)

    @property
    def severity_text(self):
        """Return the human-readable severity label, or "Unknown" if unmapped."""
        return _SEVERITY_LABELS.get(self.severity, "Unknown")


@dataclass
class ActionPlan:
    """Represents a remediation action plan (one row of the detailed CSV)."""

    action: str
    systems: List[str]
    ip_addresses: List[str]
    cve_reduction_count: int
    critical_reduction: int
    high_reduction: int
    # The remaining fields are never populated by this script; they are
    # written to the CSV as empty columns.
    system_impact_risk: str = ""
    recommended_ifc: str = ""
    third_party_software: str = ""
    responsible_party: str = ""
    documentation: str = ""
    comments: str = ""


class NessusParser:
    """Parses .nessus files and extracts vulnerability data"""

    def __init__(self):
        # Keyed by plugin ID; findings from all parsed files are merged here.
        self.vulnerabilities: Dict[str, Vulnerability] = {}

    def parse_file(self, filepath: str):
        """Parse a single .nessus file and merge its findings.

        Parse, I/O and malformed-severity errors are reported on stdout and
        do not propagate, so one bad file does not abort a multi-file run.
        Findings ingested before the error are kept.
        """
        try:
            root = ET.parse(filepath).getroot()
            for report_host in root.findall('.//ReportHost'):
                self._ingest_host(report_host)
        except (ET.ParseError, OSError, ValueError) as e:
            print(f"✗ Error parsing {filepath}: {str(e)}")
        else:
            print(f"✓ Parsed: {filepath}")

    def parse_multiple_files(self, filepaths: List[str]):
        """Parse multiple .nessus files"""
        for filepath in filepaths:
            self.parse_file(filepath)

        print(f"\nTotal unique vulnerabilities found: {len(self.vulnerabilities)}")

    def _ingest_host(self, report_host):
        """Merge every non-informational ReportItem of one ReportHost element."""
        host_ip = report_host.get('name') or 'Unknown'
        system_name = self._resolve_system_name(report_host, host_ip)
        host_label = f"{system_name} ({host_ip})"

        for item in report_host.findall('ReportItem'):
            severity = int(item.get('severity', 0))

            # Skip informational findings
            if severity == 0:
                continue

            plugin_id = item.get('pluginID')
            cve_list = self._extract_cves(item)

            known = self.vulnerabilities.get(plugin_id)
            if known is None:
                self.vulnerabilities[plugin_id] = Vulnerability(
                    plugin_id=plugin_id,
                    plugin_name=item.get('pluginName', 'Unknown'),
                    severity=severity,
                    cve_list=cve_list,
                    hosts={host_label},
                )
                continue

            # Already seen this plugin: record the extra host and any new CVEs.
            known.hosts.add(host_label)
            for cve in cve_list:
                if cve not in known.cve_list:
                    known.cve_list.append(cve)

    @staticmethod
    def _resolve_system_name(report_host, host_ip):
        """Pick a display name: host-fqdn, else netbios-name, else the IP.

        Tags with empty text are ignored so the name never becomes ``None``.
        """
        host_properties = report_host.find('HostProperties')
        if host_properties is None:
            return host_ip

        fqdn = None
        netbios = None
        for tag in host_properties.findall('tag'):
            text = (tag.text or '').strip()
            if not text:
                continue
            tag_name = tag.get('name')
            if tag_name == 'host-fqdn' and fqdn is None:
                fqdn = text
            elif tag_name == 'netbios-name' and netbios is None:
                netbios = text

        return fqdn or netbios or host_ip

    @staticmethod
    def _extract_cves(item):
        """Return the distinct, non-empty <cve> values of a ReportItem in document order."""
        cves = []
        for cve_elem in item.findall('cve'):
            text = (cve_elem.text or '').strip()
            if text and text not in cves:
                cves.append(text)
        return cves


class ActionPlanGenerator:
    """Generates actionable remediation plans from vulnerability data"""

    def __init__(self, vulnerabilities: Dict[str, Vulnerability]):
        self.vulnerabilities = vulnerabilities
        self.action_plans: List[ActionPlan] = []

    def generate_plans(self):
        """Generate action plans - one row per action-system combination.

        Vulnerabilities are grouped by plugin name; each group yields one
        plan per affected system. The plan list is rebuilt from scratch on
        every call, so calling this twice does not duplicate rows.

        Returns:
            The list of plans, sorted by critical reduction, then high
            reduction, then CVE count (all descending).
        """
        self.action_plans = []

        # Group vulnerabilities by plugin name (similar remediation action)
        action_groups = defaultdict(list)
        for vuln in self.vulnerabilities.values():
            action_groups[vuln.plugin_name].append(vuln)

        for action_name, vulns in action_groups.items():
            action_desc = self._generate_action_description(action_name, vulns)

            # Unique CVEs for this action; the same count is reported on
            # every system row of the action.
            total_cves = set()
            for vuln in vulns:
                total_cves.update(vuln.cve_list)

            system_data = defaultdict(lambda: {
                'ips': set(),
                'critical_count': 0,
                'high_count': 0,
            })

            for vuln in vulns:
                for host in vuln.hosts:
                    system, ip = self._split_host_label(host)
                    entry = system_data[system]
                    entry['ips'].add(ip)

                    # Count severity reductions per system
                    if vuln.severity == 4:  # Critical
                        entry['critical_count'] += 1
                    elif vuln.severity == 3:  # High
                        entry['high_count'] += 1

            for system, data in system_data.items():
                self.action_plans.append(ActionPlan(
                    action=action_desc,
                    systems=[system],
                    ip_addresses=sorted(data['ips']),
                    cve_reduction_count=len(total_cves),
                    critical_reduction=data['critical_count'],
                    high_reduction=data['high_count'],
                ))

        # Sort by priority (critical first, then high, then CVE count)
        self.action_plans.sort(
            key=lambda x: (x.critical_reduction, x.high_reduction, x.cve_reduction_count),
            reverse=True,
        )

        return self.action_plans

    @staticmethod
    def _split_host_label(host):
        """Split a "system (ip)" label into ``(system, ip)``.

        The split happens at the LAST "(" so that system names which
        themselves contain parentheses stay intact. A label that does not
        end in ")" is used unchanged as both system and IP.
        """
        if host.endswith(')'):
            system, sep, ip = host[:-1].rpartition('(')
            if sep:
                return system.strip(), ip
        return host, host

    def _generate_action_description(self, plugin_name: str, vulns: List[Vulnerability]) -> str:
        """Generate a clear action description from plugin name.

        The first matching keyword rule decides the prefix; a name that
        matches no rule is returned unchanged. ``vulns`` is accepted for
        interface compatibility and is not used.
        """
        lowered = plugin_name.lower()

        if "unsupported" in lowered or "end of life" in lowered:
            return f"Upgrade or replace unsupported software: {plugin_name}"
        if "missing" in lowered and "patch" in lowered:
            return f"Apply missing patches: {plugin_name}"
        if "vulnerability" in lowered or "multiple vulnerabilities" in lowered:
            return f"Remediate: {plugin_name}"
        if "update" in lowered:
            return f"Apply updates: {plugin_name}"
        return plugin_name

    def _cve_count_by_action(self) -> Dict[str, int]:
        """Map each distinct action to its CVE count, counted once per action.

        Every system row of an action carries the same CVE count, so summing
        over rows would multiply it by the number of systems.
        """
        counts: Dict[str, int] = {}
        for plan in self.action_plans:
            counts.setdefault(plan.action, plan.cve_reduction_count)
        return counts

    def export_to_csv(self, output_file: str):
        """Export action plans to CSV file (one row per action-system pair)."""
        if not self.action_plans:
            print("No action plans to export. Run generate_plans() first.")
            return

        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = [
                'Action',
                'System',
                'IP Address',
                'CVE Reduction Count',
                'Critical Reduction',
                'High Reduction',
                'System Impact Risk',
                'Recommended IFC',
                '3rd Party Software',
                'Responsible Party',
                'Documentation',
                'Comments',
            ]

            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            for plan in self.action_plans:
                writer.writerow({
                    'Action': plan.action,
                    'System': plan.systems[0] if plan.systems else '',
                    'IP Address': '; '.join(plan.ip_addresses),
                    'CVE Reduction Count': plan.cve_reduction_count,
                    'Critical Reduction': plan.critical_reduction,
                    'High Reduction': plan.high_reduction,
                    'System Impact Risk': plan.system_impact_risk,
                    'Recommended IFC': plan.recommended_ifc,
                    '3rd Party Software': plan.third_party_software,
                    'Responsible Party': plan.responsible_party,
                    'Documentation': plan.documentation,
                    'Comments': plan.comments,
                })

        print(f"\n✓ Action plan exported to: {output_file}")
        print(f"  Total action items: {len(self.action_plans)}")

    def export_summary_csv(self, output_file: str):
        """Export summary CSV with actions aggregated across all systems.

        Percentages are relative to the totals over all actions. The CVE
        total is the sum of the per-action CVE counts, each action counted
        once; a CVE reported under two different actions is counted under
        both.
        """
        if not self.action_plans:
            print("No action plans to export. Run generate_plans() first.")
            return

        # Aggregate by action
        action_summary = defaultdict(lambda: {
            'systems': set(),
            'cve_count': 0,
            'critical_total': 0,
            'high_total': 0,
        })

        for plan in self.action_plans:
            entry = action_summary[plan.action]
            entry['systems'].add(plan.systems[0] if plan.systems else 'Unknown')
            entry['cve_count'] = plan.cve_reduction_count  # Same for all systems with this action
            entry['critical_total'] += plan.critical_reduction
            entry['high_total'] += plan.high_reduction

        # Totals used as percentage denominators.
        total_cve = sum(self._cve_count_by_action().values())
        total_critical = sum(plan.critical_reduction for plan in self.action_plans)
        total_high = sum(plan.high_reduction for plan in self.action_plans)

        def percent(part, whole):
            return (part / whole * 100) if whole > 0 else 0

        summary_list = [
            {
                'action': action,
                'system_count': len(data['systems']),
                'cve_reduction': data['cve_count'],
                'cve_percent': percent(data['cve_count'], total_cve),
                'critical_reduction': data['critical_total'],
                'critical_percent': percent(data['critical_total'], total_critical),
                'high_reduction': data['high_total'],
                'high_percent': percent(data['high_total'], total_high),
            }
            for action, data in action_summary.items()
        ]

        # Sort by total impact (critical, then high, then CVE count)
        summary_list.sort(
            key=lambda x: (x['critical_reduction'], x['high_reduction'], x['cve_reduction']),
            reverse=True,
        )

        # Write summary CSV
        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = [
                'Action',
                'System Count',
                'CVE Reduction',
                '% of Total CVE Reduction',
                'Critical Reduction',
                '% of Total Critical Reduction',
                'High Reduction',
                '% of Total High Reduction',
            ]

            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            for item in summary_list:
                writer.writerow({
                    'Action': item['action'],
                    'System Count': item['system_count'],
                    'CVE Reduction': item['cve_reduction'],
                    '% of Total CVE Reduction': f"{item['cve_percent']:.1f}%",
                    'Critical Reduction': item['critical_reduction'],
                    '% of Total Critical Reduction': f"{item['critical_percent']:.1f}%",
                    'High Reduction': item['high_reduction'],
                    '% of Total High Reduction': f"{item['high_percent']:.1f}%",
                })

        print(f"\n✓ Summary exported to: {output_file}")
        print(f"  Total unique actions: {len(summary_list)}")
        print(f"  Total CVEs addressed: {total_cve}")
        print(f"  Total Critical vulnerabilities: {total_critical}")
        print(f"  Total High vulnerabilities: {total_high}")

    def print_summary(self):
        """Print a summary of the action plans (totals plus the top 10 rows)."""
        if not self.action_plans:
            print("No action plans generated.")
            return

        print("\n" + "="*80)
        print("VULNERABILITY REMEDIATION ACTION PLAN SUMMARY")
        print("="*80)

        total_critical = sum(p.critical_reduction for p in self.action_plans)
        total_high = sum(p.high_reduction for p in self.action_plans)
        # Count each action's CVEs once, not once per affected system.
        total_cves = sum(self._cve_count_by_action().values())

        print(f"\nTotal Actions: {len(self.action_plans)}")
        print(f"Total Critical Vulnerabilities: {total_critical}")
        print(f"Total High Vulnerabilities: {total_high}")
        print(f"Total Unique CVEs: {total_cves}")

        print(f"\n{'Priority':<10} {'Action':<50} {'Critical':<10} {'High':<10} {'CVEs':<10}")
        print("-"*90)

        for i, plan in enumerate(self.action_plans[:10], 1):  # Top 10
            action_short = plan.action[:47] + "..." if len(plan.action) > 50 else plan.action
            print(f"{i:<10} {action_short:<50} {plan.critical_reduction:<10} {plan.high_reduction:<10} {plan.cve_reduction_count:<10}")

        if len(self.action_plans) > 10:
            print(f"\n... and {len(self.action_plans) - 10} more actions")


def main():
    """Command-line entry point.

    Returns:
        0 on success, 1 when none of the given paths is an existing file.
    """
    parser = argparse.ArgumentParser(
        description='Parse Nessus scan files and generate vulnerability action plans',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s scan1.nessus scan2.nessus -o action_plan.csv
  %(prog)s *.nessus -o remediation_plan.csv
  %(prog)s scan1.nessus --summary-only -o summary.csv
        """
    )

    parser.add_argument(
        'nessus_files',
        nargs='+',
        help='One or more .nessus files to process'
    )

    parser.add_argument(
        '-o', '--output',
        default='vulnerability_action_plan.csv',
        help='Output CSV file (default: vulnerability_action_plan.csv)'
    )

    parser.add_argument(
        '--summary',
        action='store_true',
        help='Also generate a summary CSV file (appends _summary to filename)'
    )

    parser.add_argument(
        '--summary-only',
        action='store_true',
        help='Generate only the summary CSV file, skip detailed plan'
    )

    args = parser.parse_args()

    # Validate input files. is_file() (rather than exists()) also rejects
    # directories, which could never be parsed as a .nessus file.
    valid_files = []
    for filepath in args.nessus_files:
        if Path(filepath).is_file():
            valid_files.append(filepath)
        else:
            print(f"Warning: File not found: {filepath}")

    if not valid_files:
        print("Error: No valid .nessus files provided")
        return 1

    print(f"\nProcessing {len(valid_files)} .nessus file(s)...\n")

    # Parse files
    nessus_parser = NessusParser()
    nessus_parser.parse_multiple_files(valid_files)

    # Generate action plans
    plan_generator = ActionPlanGenerator(nessus_parser.vulnerabilities)
    plan_generator.generate_plans()

    # Display summary
    plan_generator.print_summary()

    # Determine what to export
    if args.summary_only:
        # Only export summary
        plan_generator.export_summary_csv(args.output)
    elif args.summary:
        # Export both detailed and summary
        plan_generator.export_to_csv(args.output)

        # Generate summary filename next to the detailed file
        output_path = Path(args.output)
        summary_filename = output_path.stem + '_summary' + output_path.suffix
        summary_path = output_path.parent / summary_filename
        plan_generator.export_summary_csv(str(summary_path))
    else:
        # Default: only export detailed plan
        plan_generator.export_to_csv(args.output)
        print("\nTip: Use --summary flag to also generate a summary CSV")

    print("\n✓ Complete!")
    return 0


if __name__ == '__main__':
    # SystemExit works even where the site-provided exit() helper is absent.
    raise SystemExit(main())