diff --git a/NessusActionPlanner.py b/NessusActionPlanner.py new file mode 100644 index 0000000..beb9d5d --- /dev/null +++ b/NessusActionPlanner.py @@ -0,0 +1,349 @@ +#!/usr/bin/env python3 +""" +Nessus Vulnerability Action Plan Generator +Processes multiple .nessus files and creates actionable remediation plans +""" + +import xml.etree.ElementTree as ET +from collections import defaultdict +from dataclasses import dataclass, field +from typing import List, Dict, Set +import csv +import argparse +from pathlib import Path + + +@dataclass +class Vulnerability: + """Represents a vulnerability finding""" + plugin_id: str + plugin_name: str + severity: int # 0=Info, 1=Low, 2=Medium, 3=High, 4=Critical + cve_list: List[str] = field(default_factory=list) + hosts: Set[str] = field(default_factory=set) + + @property + def severity_text(self): + severity_map = {0: "Info", 1: "Low", 2: "Medium", 3: "High", 4: "Critical"} + return severity_map.get(self.severity, "Unknown") + + +@dataclass +class ActionPlan: + """Represents a remediation action plan""" + action: str + systems: List[str] + ip_addresses: List[str] + cve_reduction_count: int + critical_reduction: int + high_reduction: int + system_impact_risk: str = "" + recommended_ifc: str = "" + third_party_software: str = "" + responsible_party: str = "" + documentation: str = "" + comments: str = "" + + +class NessusParser: + """Parses .nessus files and extracts vulnerability data""" + + def __init__(self): + self.vulnerabilities: Dict[str, Vulnerability] = {} + + def parse_file(self, filepath: str): + """Parse a single .nessus file""" + try: + tree = ET.parse(filepath) + root = tree.getroot() + + # Iterate through all report hosts + for report_host in root.findall('.//ReportHost'): + host_ip = report_host.get('name') + + # Get host properties for system name + host_properties = report_host.find('HostProperties') + system_name = host_ip # Default to IP + + if host_properties is not None: + for tag in host_properties.findall('tag'): + if tag.get('name') == 'host-fqdn': + system_name = tag.text + break + elif tag.get('name') == 'netbios-name': + system_name = tag.text + + # Process each vulnerability item + for item in report_host.findall('ReportItem'): + plugin_id = item.get('pluginID') + severity = int(item.get('severity', 0)) + + # Skip informational findings + if severity == 0: + continue + + plugin_name = item.get('pluginName', 'Unknown') + + # Extract CVEs + cve_list = [] + cve_elem = item.find('cve') + if cve_elem is not None and cve_elem.text: + cve_list = [cve_elem.text] + + # Additional CVEs might be in multiple elements + for cve in item.findall('cve'): + if cve.text and cve.text not in cve_list: + cve_list.append(cve.text) + + # Add or update vulnerability + if plugin_id not in self.vulnerabilities: + self.vulnerabilities[plugin_id] = Vulnerability( + plugin_id=plugin_id, + plugin_name=plugin_name, + severity=severity, + cve_list=cve_list, + hosts={f"{system_name} ({host_ip})"} + ) + else: + self.vulnerabilities[plugin_id].hosts.add(f"{system_name} ({host_ip})") + # Update CVE list if new ones found + for cve in cve_list: + if cve not in self.vulnerabilities[plugin_id].cve_list: + self.vulnerabilities[plugin_id].cve_list.append(cve) + + print(f"āœ“ Parsed: {filepath}") + + except Exception as e: + print(f"āœ— Error parsing {filepath}: {str(e)}") + + def parse_multiple_files(self, filepaths: List[str]): + """Parse multiple .nessus files""" + for filepath in filepaths: + self.parse_file(filepath) + + print(f"\nTotal unique vulnerabilities found: {len(self.vulnerabilities)}") + + +class ActionPlanGenerator: + """Generates actionable remediation plans from vulnerability data""" + + def __init__(self, vulnerabilities: Dict[str, Vulnerability]): + self.vulnerabilities = vulnerabilities + self.action_plans: List[ActionPlan] = [] + + def generate_plans(self): + """Generate action plans - one row per action-system combination""" + # Group vulnerabilities by plugin name (similar remediation action) + action_groups = defaultdict(list) + + for vuln in self.vulnerabilities.values(): + # Use plugin name as the action descriptor + action_groups[vuln.plugin_name].append(vuln) + + # Create action plans - one per action-system combination + for action_name, vulns in action_groups.items(): + # Determine primary action description + action_desc = self._generate_action_description(action_name, vulns) + + # Get all unique CVEs for this action (used for all systems) + total_cves = set() + for vuln in vulns: + total_cves.update(vuln.cve_list) + + # Group by system + system_data = defaultdict(lambda: { + 'ips': set(), + 'critical_count': 0, + 'high_count': 0 + }) + + for vuln in vulns: + for host in vuln.hosts: + # Parse "system (ip)" format + if '(' in host and ')' in host: + system = host.split('(')[0].strip() + ip = host.split('(')[1].rstrip(')') + else: + system = host + ip = host + + system_data[system]['ips'].add(ip) + + # Count severity reductions per system + if vuln.severity == 4: # Critical + system_data[system]['critical_count'] += 1 + elif vuln.severity == 3: # High + system_data[system]['high_count'] += 1 + + # Create one action plan per system + for system, data in system_data.items(): + plan = ActionPlan( + action=action_desc, + systems=[system], + ip_addresses=sorted(list(data['ips'])), + cve_reduction_count=len(total_cves), + critical_reduction=data['critical_count'], + high_reduction=data['high_count'] + ) + + self.action_plans.append(plan) + + # Sort by priority (critical first, then high, then CVE count) + self.action_plans.sort( + key=lambda x: (x.critical_reduction, x.high_reduction, x.cve_reduction_count), + reverse=True + ) + + return self.action_plans + + def _generate_action_description(self, plugin_name: str, vulns: List[Vulnerability]) -> str: + """Generate a clear action description from plugin name""" + # Clean up common patterns in plugin names + action = plugin_name + + # Common patterns to make more actionable + if "unsupported" in action.lower() or "end of life" in action.lower(): + action = f"Upgrade or replace unsupported software: {plugin_name}" + elif "missing" in action.lower() and "patch" in action.lower(): + action = f"Apply missing patches: {plugin_name}" + elif "vulnerability" in action.lower() or "multiple vulnerabilities" in action.lower(): + action = f"Remediate: {plugin_name}" + elif "update" in action.lower(): + action = f"Apply updates: {plugin_name}" + + return action + + def export_to_csv(self, output_file: str): + """Export action plans to CSV file""" + if not self.action_plans: + print("No action plans to export. Run generate_plans() first.") + return + + with open(output_file, 'w', newline='', encoding='utf-8') as csvfile: + fieldnames = [ + 'Action', + 'System', + 'IP Address', + 'CVE Reduction Count', + 'Critical Reduction', + 'High Reduction', + 'System Impact Risk', + 'Recommended IFC', + '3rd Party Software', + 'Responsible Party', + 'Documentation', + 'Comments' + ] + + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + + for plan in self.action_plans: + writer.writerow({ + 'Action': plan.action, + 'System': plan.systems[0] if plan.systems else '', + 'IP Address': '; '.join(plan.ip_addresses), + 'CVE Reduction Count': plan.cve_reduction_count, + 'Critical Reduction': plan.critical_reduction, + 'High Reduction': plan.high_reduction, + 'System Impact Risk': plan.system_impact_risk, + 'Recommended IFC': plan.recommended_ifc, + '3rd Party Software': plan.third_party_software, + 'Responsible Party': plan.responsible_party, + 'Documentation': plan.documentation, + 'Comments': plan.comments + }) + + print(f"\nāœ“ Action plan exported to: {output_file}") + print(f" Total action items: {len(self.action_plans)}") + + + def print_summary(self): + """Print a summary of the action plans""" + if not self.action_plans: + print("No action plans generated.") + return + + print("\n" + "="*80) + print("VULNERABILITY REMEDIATION ACTION PLAN SUMMARY") + print("="*80) + + total_critical = sum(p.critical_reduction for p in self.action_plans) + total_high = sum(p.high_reduction for p in self.action_plans) + total_cves = sum(p.cve_reduction_count for p in self.action_plans) + + print(f"\nTotal Actions: {len(self.action_plans)}") + print(f"Total Critical Vulnerabilities: {total_critical}") + print(f"Total High Vulnerabilities: {total_high}") + print(f"Total Unique CVEs: {total_cves}") + + print(f"\n{'Priority':<10} {'Action':<50} {'Critical':<10} {'High':<10} {'CVEs':<10}") + print("-"*90) + + for i, plan in enumerate(self.action_plans[:10], 1): # Top 10 + action_short = plan.action[:47] + "..." if len(plan.action) > 50 else plan.action + print(f"{i:<10} {action_short:<50} {plan.critical_reduction:<10} {plan.high_reduction:<10} {plan.cve_reduction_count:<10}") + + if len(self.action_plans) > 10: + print(f"\n... and {len(self.action_plans) - 10} more actions") + + +def main(): + parser = argparse.ArgumentParser( + description='Parse Nessus scan files and generate vulnerability action plans', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s scan1.nessus scan2.nessus -o action_plan.csv + %(prog)s *.nessus -o remediation_plan.csv + """ + ) + + parser.add_argument( + 'nessus_files', + nargs='+', + help='One or more .nessus files to process' + ) + + parser.add_argument( + '-o', '--output', + default='vulnerability_action_plan.csv', + help='Output CSV file (default: vulnerability_action_plan.csv)' + ) + + args = parser.parse_args() + + # Validate input files + valid_files = [] + for filepath in args.nessus_files: + if Path(filepath).exists(): + valid_files.append(filepath) + else: + print(f"Warning: File not found: {filepath}") + + if not valid_files: + print("Error: No valid .nessus files provided") + return 1 + + print(f"\nProcessing {len(valid_files)} .nessus file(s)...\n") + + # Parse files + nessus_parser = NessusParser() + nessus_parser.parse_multiple_files(valid_files) + + # Generate action plans + plan_generator = ActionPlanGenerator(nessus_parser.vulnerabilities) + plan_generator.generate_plans() + + # Display summary + plan_generator.print_summary() + + # Export to CSV + plan_generator.export_to_csv(args.output) + + print("\nāœ“ Complete!") + return 0 + + +if __name__ == '__main__': + exit(main())