diff --git a/NessusActionPlanner.py b/NessusActionPlanner.py deleted file mode 100644 index 9596e92..0000000 --- a/NessusActionPlanner.py +++ /dev/null @@ -1,540 +0,0 @@ -#!/usr/bin/env python3 -""" -Nessus Vulnerability Action Plan Generator -Processes multiple .nessus files and creates actionable remediation plans -""" - -import xml.etree.ElementTree as ET -from collections import defaultdict -from dataclasses import dataclass, field -from typing import List, Dict, Set -import csv -import argparse -from pathlib import Path - - -@dataclass -class Vulnerability: - """Represents a vulnerability finding""" - plugin_id: str - plugin_name: str - severity: int # 0=Info, 1=Low, 2=Medium, 3=High, 4=Critical - cve_list: List[str] = field(default_factory=list) - hosts: Set[str] = field(default_factory=set) - - @property - def severity_text(self): - severity_map = {0: "Info", 1: "Low", 2: "Medium", 3: "High", 4: "Critical"} - return severity_map.get(self.severity, "Unknown") - - -@dataclass -class ActionPlan: - """Represents a remediation action plan""" - action: str - systems: List[str] - ip_addresses: List[str] - cve_reduction_count: int - critical_reduction: int - high_reduction: int - system_impact_risk: str = "" - recommended_ifc: str = "" - third_party_software: str = "" - responsible_party: str = "" - documentation: str = "" - comments: str = "" - - -class NessusParser: - """Parses .nessus files and extracts vulnerability data""" - - def __init__(self): - self.vulnerabilities: Dict[str, Vulnerability] = {} - - def parse_file(self, filepath: str): - """Parse a single .nessus file""" - try: - tree = ET.parse(filepath) - root = tree.getroot() - - # Iterate through all report hosts - for report_host in root.findall('.//ReportHost'): - host_ip = report_host.get('name') - - # Get host properties for system name - host_properties = report_host.find('HostProperties') - system_name = host_ip # Default to IP - - if host_properties is not None: - for tag in host_properties.findall('tag'): - if tag.get('name') == 'host-fqdn': - system_name = tag.text - break - elif tag.get('name') == 'netbios-name': - system_name = tag.text - - # Process each vulnerability item - for item in report_host.findall('ReportItem'): - plugin_id = item.get('pluginID') - severity = int(item.get('severity', 0)) - - # Skip informational findings - if severity == 0: - continue - - plugin_name = item.get('pluginName', 'Unknown') - - # Extract CVEs - cve_list = [] - cve_elem = item.find('cve') - if cve_elem is not None and cve_elem.text: - cve_list = [cve_elem.text] - - # Additional CVEs might be in multiple elements - for cve in item.findall('cve'): - if cve.text and cve.text not in cve_list: - cve_list.append(cve.text) - - # Add or update vulnerability - if plugin_id not in self.vulnerabilities: - self.vulnerabilities[plugin_id] = Vulnerability( - plugin_id=plugin_id, - plugin_name=plugin_name, - severity=severity, - cve_list=cve_list, - hosts={f"{system_name} ({host_ip})"} - ) - else: - self.vulnerabilities[plugin_id].hosts.add(f"{system_name} ({host_ip})") - # Update CVE list if new ones found - for cve in cve_list: - if cve not in self.vulnerabilities[plugin_id].cve_list: - self.vulnerabilities[plugin_id].cve_list.append(cve) - - print(f"✓ Parsed: {filepath}") - - except Exception as e: - print(f"✗ Error parsing {filepath}: {str(e)}") - - def parse_multiple_files(self, filepaths: List[str]): - """Parse multiple .nessus files""" - for filepath in filepaths: - self.parse_file(filepath) - - print(f"\nTotal unique vulnerabilities found: {len(self.vulnerabilities)}") - - -class ActionPlanGenerator: - """Generates actionable remediation plans from vulnerability data""" - - def __init__(self, vulnerabilities: Dict[str, Vulnerability], consolidate_versions: bool = False): - self.vulnerabilities = vulnerabilities - self.action_plans: List[ActionPlan] = [] - self.consolidate_versions = consolidate_versions - - def generate_plans(self): - """Generate action plans - one row per action-system combination""" - # Group vulnerabilities by plugin name (similar remediation action) - action_groups = defaultdict(list) - - for vuln in self.vulnerabilities.values(): - # Use plugin name as the action descriptor - action_groups[vuln.plugin_name].append(vuln) - - # Optionally consolidate version-based vulnerabilities (e.g., Adobe AIR updates) - if self.consolidate_versions: - action_groups = self._consolidate_version_vulns(action_groups) - - # Create action plans - one per action-system combination - for action_name, vulns in action_groups.items(): - # Determine primary action description - action_desc = self._generate_action_description(action_name, vulns) - - # Get all unique CVEs for this action (used for all systems) - total_cves = set() - for vuln in vulns: - total_cves.update(vuln.cve_list) - - # Group by system - system_data = defaultdict(lambda: { - 'ips': set(), - 'critical_count': 0, - 'high_count': 0 - }) - - for vuln in vulns: - for host in vuln.hosts: - # Parse "system (ip)" format - if '(' in host and ')' in host: - system = host.split('(')[0].strip() - ip = host.split('(')[1].rstrip(')') - else: - system = host - ip = host - - system_data[system]['ips'].add(ip) - - # Count severity reductions per system - if vuln.severity == 4: # Critical - system_data[system]['critical_count'] += 1 - elif vuln.severity == 3: # High - system_data[system]['high_count'] += 1 - - # Create one action plan per system - for system, data in system_data.items(): - plan = ActionPlan( - action=action_desc, - systems=[system], - ip_addresses=sorted(list(data['ips'])), - cve_reduction_count=len(total_cves), - critical_reduction=data['critical_count'], - high_reduction=data['high_count'] - ) - - self.action_plans.append(plan) - - # Sort by priority (critical first, then high, then CVE count) - self.action_plans.sort( - key=lambda x: (x.critical_reduction, x.high_reduction, x.cve_reduction_count), - reverse=True - ) - - return self.action_plans - - def _consolidate_version_vulns(self, action_groups: Dict) -> Dict: - """Consolidate vulnerabilities that are different versions of the same software""" - import re - - # Group by software base name (without version numbers) - software_groups = defaultdict(list) - - for action_name in list(action_groups.keys()): - # Extract base software name by removing version patterns - # Patterns like: "Adobe AIR <= 19.0.0.241", "Apache 2.4.49", "OpenSSL 1.1.1" - - # Remove common version patterns - base_name = action_name - - # Remove patterns like "<= version", "< version", "version.number.number", bulletin codes - patterns = [ - r'\s*[<>=]+\s*\d+\.[\d.]+', # <= 19.0.0.241 - r'\(APSB[\w-]+\)', # (APSB15-32) - r'\(KB\d+\)', # (KB5001234) - r'\(CVE-[\d-]+\)', # (CVE-2024-0001) - r'\s+v?\d+\.[\d.]+\s*', # version 2.4.49 or v2.4.49 - r'\s*<\s*\d+\.[\d.]+', # < 3.0 - r'\s*>\s*\d+\.[\d.]+', # > 2.0 - ] - - for pattern in patterns: - base_name = re.sub(pattern, '', base_name) - - # Remove common suffixes after version removal - base_name = re.sub(r'\s+(Multiple\s+)?Vulnerabilities?.*$', '', base_name, flags=re.IGNORECASE) - - # Clean up extra whitespace and punctuation at the end - base_name = re.sub(r'\s+', ' ', base_name).strip() - base_name = re.sub(r'[,\s]+$', '', base_name) - - # Group by base name - software_groups[base_name].append(action_name) - - # Consolidate groups that have multiple versions - consolidated = {} - - for base_name, action_list in software_groups.items(): - if len(action_list) > 1: - # Multiple versions found - consolidate them - # Merge all vulnerabilities under the base name - merged_vulns = [] - for action_name in action_list: - merged_vulns.extend(action_groups[action_name]) - - # Use a consolidated action name - consolidated_name = f"Update to latest version: {base_name}" - consolidated[consolidated_name] = merged_vulns - else: - # Single version - keep as is - action_name = action_list[0] - consolidated[action_name] = action_groups[action_name] - - return consolidated - - def _generate_action_description(self, plugin_name: str, vulns: List[Vulnerability]) -> str: - """Generate a clear action description from plugin name""" - # Clean up common patterns in plugin names - action = plugin_name - - # Common patterns to make more actionable - if "unsupported" in action.lower() or "end of life" in action.lower(): - action = f"Upgrade or replace unsupported software: {plugin_name}" - elif "missing" in action.lower() and "patch" in action.lower(): - action = f"Apply missing patches: {plugin_name}" - elif "vulnerability" in action.lower() or "multiple vulnerabilities" in action.lower(): - action = f"Remediate: {plugin_name}" - elif "update" in action.lower(): - action = f"Apply updates: {plugin_name}" - - return action - - def export_to_csv(self, output_file: str): - """Export action plans to CSV file""" - if not self.action_plans: - print("No action plans to export. Run generate_plans() first.") - return - - with open(output_file, 'w', newline='', encoding='utf-8') as csvfile: - fieldnames = [ - 'Action', - 'System', - 'IP Address', - 'CVE Reduction Count', - 'Critical Reduction', - 'High Reduction', - 'System Impact Risk', - 'Recommended IFC', - '3rd Party Software', - 'Responsible Party', - 'Documentation', - 'Comments' - ] - - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() - - for plan in self.action_plans: - writer.writerow({ - 'Action': plan.action, - 'System': plan.systems[0] if plan.systems else '', - 'IP Address': '; '.join(plan.ip_addresses), - 'CVE Reduction Count': plan.cve_reduction_count, - 'Critical Reduction': plan.critical_reduction, - 'High Reduction': plan.high_reduction, - 'System Impact Risk': plan.system_impact_risk, - 'Recommended IFC': plan.recommended_ifc, - '3rd Party Software': plan.third_party_software, - 'Responsible Party': plan.responsible_party, - 'Documentation': plan.documentation, - 'Comments': plan.comments - }) - - print(f"\n✓ Action plan exported to: {output_file}") - print(f" Total action items: {len(self.action_plans)}") - - def export_summary_csv(self, output_file: str): - """Export summary CSV with actions aggregated across all systems""" - if not self.action_plans: - print("No action plans to export. Run generate_plans() first.") - return - - # Aggregate by action - action_summary = defaultdict(lambda: { - 'systems': set(), - 'cve_count': 0, - 'critical_total': 0, - 'high_total': 0 - }) - - for plan in self.action_plans: - action = plan.action - action_summary[action]['systems'].add(plan.systems[0] if plan.systems else 'Unknown') - action_summary[action]['cve_count'] = plan.cve_reduction_count # Same for all systems with this action - action_summary[action]['critical_total'] += plan.critical_reduction - action_summary[action]['high_total'] += plan.high_reduction - - # Calculate totals for percentages - total_cve = len(set(plan.cve_reduction_count for plan in self.action_plans if plan.cve_reduction_count > 0)) - # Better approach: get unique CVEs across ALL actions - all_unique_cves = set() - action_cve_map = {} - for plan in self.action_plans: - if plan.action not in action_cve_map: - action_cve_map[plan.action] = plan.cve_reduction_count - all_unique_cves.add(plan.action) # Use action as proxy since CVE count is per action - - # Recalculate: sum unique CVEs across all actions - total_cve = sum(set(action_cve_map.values())) - total_critical = sum(plan.critical_reduction for plan in self.action_plans) - total_high = sum(plan.high_reduction for plan in self.action_plans) - - # Create summary list - summary_list = [] - for action, data in action_summary.items(): - summary_list.append({ - 'action': action, - 'system_count': len(data['systems']), - 'cve_reduction': data['cve_count'], - 'cve_percent': (data['cve_count'] / total_cve * 100) if total_cve > 0 else 0, - 'critical_reduction': data['critical_total'], - 'critical_percent': (data['critical_total'] / total_critical * 100) if total_critical > 0 else 0, - 'high_reduction': data['high_total'], - 'high_percent': (data['high_total'] / total_high * 100) if total_high > 0 else 0 - }) - - # Sort by total impact (critical + high) - summary_list.sort( - key=lambda x: (x['critical_reduction'], x['high_reduction'], x['cve_reduction']), - reverse=True - ) - - # Write summary CSV - with open(output_file, 'w', newline='', encoding='utf-8') as csvfile: - fieldnames = [ - 'Action', - 'System Count', - 'CVE Reduction', - '% of Total CVE Reduction', - 'Critical Reduction', - '% of Total Critical Reduction', - 'High Reduction', - '% of Total High Reduction' - ] - - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() - - for item in summary_list: - writer.writerow({ - 'Action': item['action'], - 'System Count': item['system_count'], - 'CVE Reduction': item['cve_reduction'], - '% of Total CVE Reduction': f"{item['cve_percent']:.1f}%", - 'Critical Reduction': item['critical_reduction'], - '% of Total Critical Reduction': f"{item['critical_percent']:.1f}%", - 'High Reduction': item['high_reduction'], - '% of Total High Reduction': f"{item['high_percent']:.1f}%" - }) - - print(f"\n✓ Summary exported to: {output_file}") - print(f" Total unique actions: {len(summary_list)}") - print(f" Total CVEs addressed: {total_cve}") - print(f" Total Critical vulnerabilities: {total_critical}") - print(f" Total High vulnerabilities: {total_high}") - - - def print_summary(self): - """Print a summary of the action plans""" - if not self.action_plans: - print("No action plans generated.") - return - - print("\n" + "="*80) - print("VULNERABILITY REMEDIATION ACTION PLAN SUMMARY") - print("="*80) - - total_critical = sum(p.critical_reduction for p in self.action_plans) - total_high = sum(p.high_reduction for p in self.action_plans) - total_cves = sum(p.cve_reduction_count for p in self.action_plans) - - print(f"\nTotal Actions: {len(self.action_plans)}") - print(f"Total Critical Vulnerabilities: {total_critical}") - print(f"Total High Vulnerabilities: {total_high}") - print(f"Total Unique CVEs: {total_cves}") - - print(f"\n{'Priority':<10} {'Action':<50} {'Critical':<10} {'High':<10} {'CVEs':<10}") - print("-"*90) - - for i, plan in enumerate(self.action_plans[:10], 1): # Top 10 - action_short = plan.action[:47] + "..." if len(plan.action) > 50 else plan.action - print(f"{i:<10} {action_short:<50} {plan.critical_reduction:<10} {plan.high_reduction:<10} {plan.cve_reduction_count:<10}") - - if len(self.action_plans) > 10: - print(f"\n... and {len(self.action_plans) - 10} more actions") - - -def main(): - parser = argparse.ArgumentParser( - description='Parse Nessus scan files and generate vulnerability action plans', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - %(prog)s scan1.nessus scan2.nessus -o action_plan.csv - %(prog)s *.nessus -o remediation_plan.csv - %(prog)s scan1.nessus --summary-only -o summary.csv - """ - ) - - parser.add_argument( - 'nessus_files', - nargs='+', - help='One or more .nessus files to process' - ) - - parser.add_argument( - '-o', '--output', - default='vulnerability_action_plan.csv', - help='Output CSV file (default: vulnerability_action_plan.csv)' - ) - - parser.add_argument( - '--summary', - action='store_true', - help='Also generate a summary CSV file (appends _summary to filename)' - ) - - parser.add_argument( - '--summary-only', - action='store_true', - help='Generate only the summary CSV file, skip detailed plan' - ) - - parser.add_argument( - '--consolidate', - action='store_true', - help='Consolidate multiple versions of same software into single action (e.g., Adobe AIR updates)' - ) - - args = parser.parse_args() - - # Validate input files - valid_files = [] - for filepath in args.nessus_files: - if Path(filepath).exists(): - valid_files.append(filepath) - else: - print(f"Warning: File not found: {filepath}") - - if not valid_files: - print("Error: No valid .nessus files provided") - return 1 - - print(f"\nProcessing {len(valid_files)} .nessus file(s)...\n") - - # Parse files - nessus_parser = NessusParser() - nessus_parser.parse_multiple_files(valid_files) - - # Generate action plans - plan_generator = ActionPlanGenerator( - nessus_parser.vulnerabilities, - consolidate_versions=args.consolidate - ) - plan_generator.generate_plans() - - # Display summary - plan_generator.print_summary() - - # Determine what to export - if args.summary_only: - # Only export summary - plan_generator.export_summary_csv(args.output) - elif args.summary: - # Export both detailed and summary - plan_generator.export_to_csv(args.output) - - # Generate summary filename - output_path = Path(args.output) - summary_filename = output_path.stem + '_summary' + output_path.suffix - summary_path = output_path.parent / summary_filename - plan_generator.export_summary_csv(str(summary_path)) - else: - # Default: only export detailed plan - plan_generator.export_to_csv(args.output) - print("\nTip: Use --summary flag to also generate a summary CSV") - - print("\n✓ Complete!") - return 0 - - -if __name__ == '__main__': - exit(main()) \ No newline at end of file