Add NessusActionPlanner.py
This commit is contained in:
349
NessusActionPlanner.py
Normal file
349
NessusActionPlanner.py
Normal file
@@ -0,0 +1,349 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Nessus Vulnerability Action Plan Generator
|
||||
Processes multiple .nessus files and creates actionable remediation plans
|
||||
"""
|
||||
|
||||
import xml.etree.ElementTree as ET
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Dict, Set
|
||||
import csv
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
@dataclass
|
||||
class Vulnerability:
|
||||
"""Represents a vulnerability finding"""
|
||||
plugin_id: str
|
||||
plugin_name: str
|
||||
severity: int # 0=Info, 1=Low, 2=Medium, 3=High, 4=Critical
|
||||
cve_list: List[str] = field(default_factory=list)
|
||||
hosts: Set[str] = field(default_factory=set)
|
||||
|
||||
@property
|
||||
def severity_text(self):
|
||||
severity_map = {0: "Info", 1: "Low", 2: "Medium", 3: "High", 4: "Critical"}
|
||||
return severity_map.get(self.severity, "Unknown")
|
||||
|
||||
|
||||
@dataclass
|
||||
class ActionPlan:
|
||||
"""Represents a remediation action plan"""
|
||||
action: str
|
||||
systems: List[str]
|
||||
ip_addresses: List[str]
|
||||
cve_reduction_count: int
|
||||
critical_reduction: int
|
||||
high_reduction: int
|
||||
system_impact_risk: str = ""
|
||||
recommended_ifc: str = ""
|
||||
third_party_software: str = ""
|
||||
responsible_party: str = ""
|
||||
documentation: str = ""
|
||||
comments: str = ""
|
||||
|
||||
|
||||
class NessusParser:
|
||||
"""Parses .nessus files and extracts vulnerability data"""
|
||||
|
||||
def __init__(self):
|
||||
self.vulnerabilities: Dict[str, Vulnerability] = {}
|
||||
|
||||
def parse_file(self, filepath: str):
|
||||
"""Parse a single .nessus file"""
|
||||
try:
|
||||
tree = ET.parse(filepath)
|
||||
root = tree.getroot()
|
||||
|
||||
# Iterate through all report hosts
|
||||
for report_host in root.findall('.//ReportHost'):
|
||||
host_ip = report_host.get('name')
|
||||
|
||||
# Get host properties for system name
|
||||
host_properties = report_host.find('HostProperties')
|
||||
system_name = host_ip # Default to IP
|
||||
|
||||
if host_properties is not None:
|
||||
for tag in host_properties.findall('tag'):
|
||||
if tag.get('name') == 'host-fqdn':
|
||||
system_name = tag.text
|
||||
break
|
||||
elif tag.get('name') == 'netbios-name':
|
||||
system_name = tag.text
|
||||
|
||||
# Process each vulnerability item
|
||||
for item in report_host.findall('ReportItem'):
|
||||
plugin_id = item.get('pluginID')
|
||||
severity = int(item.get('severity', 0))
|
||||
|
||||
# Skip informational findings
|
||||
if severity == 0:
|
||||
continue
|
||||
|
||||
plugin_name = item.get('pluginName', 'Unknown')
|
||||
|
||||
# Extract CVEs
|
||||
cve_list = []
|
||||
cve_elem = item.find('cve')
|
||||
if cve_elem is not None and cve_elem.text:
|
||||
cve_list = [cve_elem.text]
|
||||
|
||||
# Additional CVEs might be in multiple elements
|
||||
for cve in item.findall('cve'):
|
||||
if cve.text and cve.text not in cve_list:
|
||||
cve_list.append(cve.text)
|
||||
|
||||
# Add or update vulnerability
|
||||
if plugin_id not in self.vulnerabilities:
|
||||
self.vulnerabilities[plugin_id] = Vulnerability(
|
||||
plugin_id=plugin_id,
|
||||
plugin_name=plugin_name,
|
||||
severity=severity,
|
||||
cve_list=cve_list,
|
||||
hosts={f"{system_name} ({host_ip})"}
|
||||
)
|
||||
else:
|
||||
self.vulnerabilities[plugin_id].hosts.add(f"{system_name} ({host_ip})")
|
||||
# Update CVE list if new ones found
|
||||
for cve in cve_list:
|
||||
if cve not in self.vulnerabilities[plugin_id].cve_list:
|
||||
self.vulnerabilities[plugin_id].cve_list.append(cve)
|
||||
|
||||
print(f"✓ Parsed: {filepath}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"✗ Error parsing {filepath}: {str(e)}")
|
||||
|
||||
def parse_multiple_files(self, filepaths: List[str]):
|
||||
"""Parse multiple .nessus files"""
|
||||
for filepath in filepaths:
|
||||
self.parse_file(filepath)
|
||||
|
||||
print(f"\nTotal unique vulnerabilities found: {len(self.vulnerabilities)}")
|
||||
|
||||
|
||||
class ActionPlanGenerator:
|
||||
"""Generates actionable remediation plans from vulnerability data"""
|
||||
|
||||
def __init__(self, vulnerabilities: Dict[str, Vulnerability]):
|
||||
self.vulnerabilities = vulnerabilities
|
||||
self.action_plans: List[ActionPlan] = []
|
||||
|
||||
def generate_plans(self):
|
||||
"""Generate action plans - one row per action-system combination"""
|
||||
# Group vulnerabilities by plugin name (similar remediation action)
|
||||
action_groups = defaultdict(list)
|
||||
|
||||
for vuln in self.vulnerabilities.values():
|
||||
# Use plugin name as the action descriptor
|
||||
action_groups[vuln.plugin_name].append(vuln)
|
||||
|
||||
# Create action plans - one per action-system combination
|
||||
for action_name, vulns in action_groups.items():
|
||||
# Determine primary action description
|
||||
action_desc = self._generate_action_description(action_name, vulns)
|
||||
|
||||
# Get all unique CVEs for this action (used for all systems)
|
||||
total_cves = set()
|
||||
for vuln in vulns:
|
||||
total_cves.update(vuln.cve_list)
|
||||
|
||||
# Group by system
|
||||
system_data = defaultdict(lambda: {
|
||||
'ips': set(),
|
||||
'critical_count': 0,
|
||||
'high_count': 0
|
||||
})
|
||||
|
||||
for vuln in vulns:
|
||||
for host in vuln.hosts:
|
||||
# Parse "system (ip)" format
|
||||
if '(' in host and ')' in host:
|
||||
system = host.split('(')[0].strip()
|
||||
ip = host.split('(')[1].rstrip(')')
|
||||
else:
|
||||
system = host
|
||||
ip = host
|
||||
|
||||
system_data[system]['ips'].add(ip)
|
||||
|
||||
# Count severity reductions per system
|
||||
if vuln.severity == 4: # Critical
|
||||
system_data[system]['critical_count'] += 1
|
||||
elif vuln.severity == 3: # High
|
||||
system_data[system]['high_count'] += 1
|
||||
|
||||
# Create one action plan per system
|
||||
for system, data in system_data.items():
|
||||
plan = ActionPlan(
|
||||
action=action_desc,
|
||||
systems=[system],
|
||||
ip_addresses=sorted(list(data['ips'])),
|
||||
cve_reduction_count=len(total_cves),
|
||||
critical_reduction=data['critical_count'],
|
||||
high_reduction=data['high_count']
|
||||
)
|
||||
|
||||
self.action_plans.append(plan)
|
||||
|
||||
# Sort by priority (critical first, then high, then CVE count)
|
||||
self.action_plans.sort(
|
||||
key=lambda x: (x.critical_reduction, x.high_reduction, x.cve_reduction_count),
|
||||
reverse=True
|
||||
)
|
||||
|
||||
return self.action_plans
|
||||
|
||||
def _generate_action_description(self, plugin_name: str, vulns: List[Vulnerability]) -> str:
|
||||
"""Generate a clear action description from plugin name"""
|
||||
# Clean up common patterns in plugin names
|
||||
action = plugin_name
|
||||
|
||||
# Common patterns to make more actionable
|
||||
if "unsupported" in action.lower() or "end of life" in action.lower():
|
||||
action = f"Upgrade or replace unsupported software: {plugin_name}"
|
||||
elif "missing" in action.lower() and "patch" in action.lower():
|
||||
action = f"Apply missing patches: {plugin_name}"
|
||||
elif "vulnerability" in action.lower() or "multiple vulnerabilities" in action.lower():
|
||||
action = f"Remediate: {plugin_name}"
|
||||
elif "update" in action.lower():
|
||||
action = f"Apply updates: {plugin_name}"
|
||||
|
||||
return action
|
||||
|
||||
def export_to_csv(self, output_file: str):
|
||||
"""Export action plans to CSV file"""
|
||||
if not self.action_plans:
|
||||
print("No action plans to export. Run generate_plans() first.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
|
||||
fieldnames = [
|
||||
'Action',
|
||||
'System',
|
||||
'IP Address',
|
||||
'CVE Reduction Count',
|
||||
'Critical Reduction',
|
||||
'High Reduction',
|
||||
'System Impact Risk',
|
||||
'Recommended IFC',
|
||||
'3rd Party Software',
|
||||
'Responsible Party',
|
||||
'Documentation',
|
||||
'Comments'
|
||||
]
|
||||
|
||||
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
|
||||
for plan in self.action_plans:
|
||||
writer.writerow({
|
||||
'Action': plan.action,
|
||||
'System': plan.systems[0] if plan.systems else '',
|
||||
'IP Address': '; '.join(plan.ip_addresses),
|
||||
'CVE Reduction Count': plan.cve_reduction_count,
|
||||
'Critical Reduction': plan.critical_reduction,
|
||||
'High Reduction': plan.high_reduction,
|
||||
'System Impact Risk': plan.system_impact_risk,
|
||||
'Recommended IFC': plan.recommended_ifc,
|
||||
'3rd Party Software': plan.third_party_software,
|
||||
'Responsible Party': plan.responsible_party,
|
||||
'Documentation': plan.documentation,
|
||||
'Comments': plan.comments
|
||||
})
|
||||
|
||||
print(f"\n✓ Action plan exported to: {output_file}")
|
||||
print(f" Total action items: {len(self.action_plans)}")
|
||||
|
||||
|
||||
def print_summary(self):
|
||||
"""Print a summary of the action plans"""
|
||||
if not self.action_plans:
|
||||
print("No action plans generated.")
|
||||
return
|
||||
|
||||
print("\n" + "="*80)
|
||||
print("VULNERABILITY REMEDIATION ACTION PLAN SUMMARY")
|
||||
print("="*80)
|
||||
|
||||
total_critical = sum(p.critical_reduction for p in self.action_plans)
|
||||
total_high = sum(p.high_reduction for p in self.action_plans)
|
||||
total_cves = sum(p.cve_reduction_count for p in self.action_plans)
|
||||
|
||||
print(f"\nTotal Actions: {len(self.action_plans)}")
|
||||
print(f"Total Critical Vulnerabilities: {total_critical}")
|
||||
print(f"Total High Vulnerabilities: {total_high}")
|
||||
print(f"Total Unique CVEs: {total_cves}")
|
||||
|
||||
print(f"\n{'Priority':<10} {'Action':<50} {'Critical':<10} {'High':<10} {'CVEs':<10}")
|
||||
print("-"*90)
|
||||
|
||||
for i, plan in enumerate(self.action_plans[:10], 1): # Top 10
|
||||
action_short = plan.action[:47] + "..." if len(plan.action) > 50 else plan.action
|
||||
print(f"{i:<10} {action_short:<50} {plan.critical_reduction:<10} {plan.high_reduction:<10} {plan.cve_reduction_count:<10}")
|
||||
|
||||
if len(self.action_plans) > 10:
|
||||
print(f"\n... and {len(self.action_plans) - 10} more actions")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Parse Nessus scan files and generate vulnerability action plans',
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
Examples:
|
||||
%(prog)s scan1.nessus scan2.nessus -o action_plan.csv
|
||||
%(prog)s *.nessus -o remediation_plan.csv
|
||||
"""
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'nessus_files',
|
||||
nargs='+',
|
||||
help='One or more .nessus files to process'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-o', '--output',
|
||||
default='vulnerability_action_plan.csv',
|
||||
help='Output CSV file (default: vulnerability_action_plan.csv)'
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Validate input files
|
||||
valid_files = []
|
||||
for filepath in args.nessus_files:
|
||||
if Path(filepath).exists():
|
||||
valid_files.append(filepath)
|
||||
else:
|
||||
print(f"Warning: File not found: {filepath}")
|
||||
|
||||
if not valid_files:
|
||||
print("Error: No valid .nessus files provided")
|
||||
return 1
|
||||
|
||||
print(f"\nProcessing {len(valid_files)} .nessus file(s)...\n")
|
||||
|
||||
# Parse files
|
||||
nessus_parser = NessusParser()
|
||||
nessus_parser.parse_multiple_files(valid_files)
|
||||
|
||||
# Generate action plans
|
||||
plan_generator = ActionPlanGenerator(nessus_parser.vulnerabilities)
|
||||
plan_generator.generate_plans()
|
||||
|
||||
# Display summary
|
||||
plan_generator.print_summary()
|
||||
|
||||
# Export to CSV
|
||||
plan_generator.export_to_csv(args.output)
|
||||
|
||||
print("\n✓ Complete!")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
exit(main())
|
||||
Reference in New Issue
Block a user