Update NessusActionPlanner.py

This commit is contained in:
2026-02-05 10:57:35 -08:00
parent 9adb12991c
commit 913f424305

View File

@@ -1,185 +1,540 @@
#!/usr/bin/env python3
"""
Create comprehensive test with Windows and RHEL Plugin 66334 formats
Nessus Vulnerability Action Plan Generator
Processes multiple .nessus files and creates actionable remediation plans
"""
import xml.etree.ElementTree as ET
from xml.dom import minidom
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict, Set
import csv
import argparse
from pathlib import Path
def create_windows_host(report, host_ip, host_name):
"""Create Windows host with vulnerabilities"""
report_host = ET.SubElement(report, 'ReportHost', name=host_ip)
@dataclass
class Vulnerability:
"""Represents a vulnerability finding"""
plugin_id: str
plugin_name: str
severity: int # 0=Info, 1=Low, 2=Medium, 3=High, 4=Critical
cve_list: List[str] = field(default_factory=list)
hosts: Set[str] = field(default_factory=set)
# Host properties
host_properties = ET.SubElement(report_host, 'HostProperties')
tag = ET.SubElement(host_properties, 'tag', name='netbios-name')
tag.text = host_name
@property
def severity_text(self):
severity_map = {0: "Info", 1: "Low", 2: "Medium", 3: "High", 4: "Critical"}
return severity_map.get(self.severity, "Unknown")
# Windows vulnerabilities
vulns = [
{'plugin_id': '200001', 'name': 'MS KB5073724 Security Update', 'severity': '4', 'cves': ['CVE-2024-0001', 'CVE-2024-0002']},
{'plugin_id': '200002', 'name': 'MS KB5049613 Critical Patch', 'severity': '4', 'cves': ['CVE-2024-0003']},
{'plugin_id': '200003', 'name': 'Adobe AIR < 23.0.0.257 Multiple Vulnerabilities', 'severity': '4', 'cves': ['CVE-2024-0010', 'CVE-2024-0011']},
{'plugin_id': '200004', 'name': 'Adobe AIR Unsupported Version', 'severity': '3', 'cves': ['CVE-2024-0012']},
{'plugin_id': '200005', 'name': 'Curl Use-After-Free < 7.87', 'severity': '4', 'cves': ['CVE-2022-43552']},
{'plugin_id': '200006', 'name': 'Microsoft Edge < 143.0.3650.139', 'severity': '3', 'cves': ['CVE-2026-0628']},
]
@dataclass
class ActionPlan:
"""Represents a remediation action plan"""
action: str
systems: List[str]
ip_addresses: List[str]
cve_reduction_count: int
critical_reduction: int
high_reduction: int
system_impact_risk: str = ""
recommended_ifc: str = ""
third_party_software: str = ""
responsible_party: str = ""
documentation: str = ""
comments: str = ""
class NessusParser:
"""Parses .nessus files and extracts vulnerability data"""
def __init__(self):
self.vulnerabilities: Dict[str, Vulnerability] = {}
def parse_file(self, filepath: str):
"""Parse a single .nessus file"""
try:
tree = ET.parse(filepath)
root = tree.getroot()
# Iterate through all report hosts
for report_host in root.findall('.//ReportHost'):
host_ip = report_host.get('name')
# Get host properties for system name
host_properties = report_host.find('HostProperties')
system_name = host_ip # Default to IP
if host_properties is not None:
for tag in host_properties.findall('tag'):
if tag.get('name') == 'host-fqdn':
system_name = tag.text
break
elif tag.get('name') == 'netbios-name':
system_name = tag.text
# Process each vulnerability item
for item in report_host.findall('ReportItem'):
plugin_id = item.get('pluginID')
severity = int(item.get('severity', 0))
# Skip informational findings
if severity == 0:
continue
plugin_name = item.get('pluginName', 'Unknown')
# Extract CVEs
cve_list = []
cve_elem = item.find('cve')
if cve_elem is not None and cve_elem.text:
cve_list = [cve_elem.text]
# Additional CVEs might be in multiple elements
for cve in item.findall('cve'):
if cve.text and cve.text not in cve_list:
cve_list.append(cve.text)
# Add or update vulnerability
if plugin_id not in self.vulnerabilities:
self.vulnerabilities[plugin_id] = Vulnerability(
plugin_id=plugin_id,
plugin_name=plugin_name,
severity=severity,
cve_list=cve_list,
hosts={f"{system_name} ({host_ip})"}
)
else:
self.vulnerabilities[plugin_id].hosts.add(f"{system_name} ({host_ip})")
# Update CVE list if new ones found
for cve in cve_list:
if cve not in self.vulnerabilities[plugin_id].cve_list:
self.vulnerabilities[plugin_id].cve_list.append(cve)
print(f"✓ Parsed: {filepath}")
except Exception as e:
print(f"✗ Error parsing {filepath}: {str(e)}")
def parse_multiple_files(self, filepaths: List[str]):
"""Parse multiple .nessus files"""
for filepath in filepaths:
self.parse_file(filepath)
print(f"\nTotal unique vulnerabilities found: {len(self.vulnerabilities)}")
class ActionPlanGenerator:
"""Generates actionable remediation plans from vulnerability data"""
def __init__(self, vulnerabilities: Dict[str, Vulnerability], consolidate_versions: bool = False):
self.vulnerabilities = vulnerabilities
self.action_plans: List[ActionPlan] = []
self.consolidate_versions = consolidate_versions
def generate_plans(self):
"""Generate action plans - one row per action-system combination"""
# Group vulnerabilities by plugin name (similar remediation action)
action_groups = defaultdict(list)
for vuln in self.vulnerabilities.values():
# Use plugin name as the action descriptor
action_groups[vuln.plugin_name].append(vuln)
# Optionally consolidate version-based vulnerabilities (e.g., Adobe AIR updates)
if self.consolidate_versions:
action_groups = self._consolidate_version_vulns(action_groups)
# Create action plans - one per action-system combination
for action_name, vulns in action_groups.items():
# Determine primary action description
action_desc = self._generate_action_description(action_name, vulns)
# Get all unique CVEs for this action (used for all systems)
total_cves = set()
for vuln in vulns:
total_cves.update(vuln.cve_list)
# Group by system
system_data = defaultdict(lambda: {
'ips': set(),
'critical_count': 0,
'high_count': 0
})
for vuln in vulns:
report_item = ET.SubElement(report_host, 'ReportItem',
port='0', svc_name='general', protocol='tcp',
severity=vuln['severity'], pluginID=vuln['plugin_id'], pluginName=vuln['name'])
for cve in vuln['cves']:
cve_elem = ET.SubElement(report_item, 'cve')
cve_elem.text = cve
for host in vuln.hosts:
# Parse "system (ip)" format
if '(' in host and ')' in host:
system = host.split('(')[0].strip()
ip = host.split('(')[1].rstrip(')')
else:
system = host
ip = host
# Plugin 66334
patch_output = """
. You need to take the following 18 actions :
system_data[system]['ips'].add(ip)
+ Install the following Microsoft patches :
- KB5073724 (39 vulnerabilities)
- KB5049613
- KB5044023
- KB5039893
- KB5039884
- KB5036608
- KB5033909
- KB5031988
# Count severity reductions per system
if vuln.severity == 4: # Critical
system_data[system]['critical_count'] += 1
elif vuln.severity == 3: # High
system_data[system]['high_count'] += 1
[ Adobe AIR <= 22.0.0.153 Android Applications Runtime Analytics MitM (APSB16-31) (93523) ]
# Create one action plan per system
for system, data in system_data.items():
plan = ActionPlan(
action=action_desc,
systems=[system],
ip_addresses=sorted(list(data['ips'])),
cve_reduction_count=len(total_cves),
critical_reduction=data['critical_count'],
high_reduction=data['high_count']
)
+ Action to take : Upgrade to Adobe AIR version 23.0.0.257 or later.
self.action_plans.append(plan)
+Impact : Taking this action will resolve 564 different vulnerabilities (CVEs).
# Sort by priority (critical first, then high, then CVE count)
self.action_plans.sort(
key=lambda x: (x.critical_reduction, x.high_reduction, x.cve_reduction_count),
reverse=True
)
return self.action_plans
[ Curl Use-After-Free < 7.87 (CVE-2022-43552) (171859) ]
def _consolidate_version_vulns(self, action_groups: Dict) -> Dict:
"""Consolidate vulnerabilities that are different versions of the same software"""
import re
+ Action to take : Upgrade Curl to version 7.87.0 or later
# Group by software base name (without version numbers)
software_groups = defaultdict(list)
for action_name in list(action_groups.keys()):
# Extract base software name by removing version patterns
# Patterns like: "Adobe AIR <= 19.0.0.241", "Apache 2.4.49", "OpenSSL 1.1.1"
[ Microsoft Edge (Chromium) < 143.0.3650.139 (CVE-2026-0628) (282534) ]
# Remove common version patterns
base_name = action_name
+ Action to take : Upgrade to Microsoft Edge version 143.0.3650.139 or later.
+Impact : Taking this action will resolve 96 different vulnerabilities (CVEs).
"""
report_item = ET.SubElement(report_host, 'ReportItem',
port='0', svc_name='general', protocol='tcp',
severity='0', pluginID='66334', pluginName='Patch Report')
plugin_output = ET.SubElement(report_item, 'plugin_output')
plugin_output.text = patch_output
def create_rhel_host(report, host_ip, host_name):
"""Create RHEL host with vulnerabilities"""
report_host = ET.SubElement(report, 'ReportHost', name=host_ip)
# Host properties
host_properties = ET.SubElement(report_host, 'HostProperties')
tag = ET.SubElement(host_properties, 'tag', name='host-fqdn')
tag.text = f"{host_name}.company.local"
# RHEL vulnerabilities
vulns = [
{'plugin_id': '300001', 'name': 'Apache Log4j 1.2 JMSAppender RCE', 'severity': '4', 'cves': ['CVE-2021-4104']},
{'plugin_id': '300002', 'name': 'Oracle Java SE July 2022 CPU', 'severity': '4', 'cves': ['CVE-2024-0020', 'CVE-2024-0021']},
{'plugin_id': '300003', 'name': 'RHEL 8 : java-1.8.0-openjdk (RHSA-2025:18815)', 'severity': '3', 'cves': ['CVE-2024-0030']},
{'plugin_id': '300004', 'name': 'RHEL 8 : NetworkManager (RHSA-2025:0288)', 'severity': '2', 'cves': []},
{'plugin_id': '300005', 'name': 'RHEL 8 : NetworkManager-libreswan (RHSA-2024:8353)', 'severity': '2', 'cves': []},
{'plugin_id': '300006', 'name': 'RHEL 8 : bcc (RHSA-2024:8831)', 'severity': '2', 'cves': []},
# Remove patterns like "<= version", "< version", "version.number.number", bulletin codes
patterns = [
r'\s*[<>=]+\s*\d+\.[\d.]+', # <= 19.0.0.241
r'\(APSB[\w-]+\)', # (APSB15-32)
r'\(KB\d+\)', # (KB5001234)
r'\(CVE-[\d-]+\)', # (CVE-2024-0001)
r'\s+v?\d+\.[\d.]+\s*', # version 2.4.49 or v2.4.49
r'\s*<\s*\d+\.[\d.]+', # < 3.0
r'\s*>\s*\d+\.[\d.]+', # > 2.0
]
for vuln in vulns:
report_item = ET.SubElement(report_host, 'ReportItem',
port='0', svc_name='general', protocol='tcp',
severity=vuln['severity'], pluginID=vuln['plugin_id'], pluginName=vuln['name'])
for cve in vuln['cves']:
cve_elem = ET.SubElement(report_item, 'cve')
cve_elem.text = cve
for pattern in patterns:
base_name = re.sub(pattern, '', base_name)
# Plugin 66334
patch_output = """
. You need to take the following 110 actions :
# Remove common suffixes after version removal
base_name = re.sub(r'\s+(Multiple\s+)?Vulnerabilities?.*$', '', base_name, flags=re.IGNORECASE)
# Clean up extra whitespace and punctuation at the end
base_name = re.sub(r'\s+', ' ', base_name).strip()
base_name = re.sub(r'[,\s]+$', '', base_name)
# Group by base name
software_groups[base_name].append(action_name)
# Consolidate groups that have multiple versions
consolidated = {}
for base_name, action_list in software_groups.items():
if len(action_list) > 1:
# Multiple versions found - consolidate them
# Merge all vulnerabilities under the base name
merged_vulns = []
for action_name in action_list:
merged_vulns.extend(action_groups[action_name])
# Use a consolidated action name
consolidated_name = f"Update to latest version: {base_name}"
consolidated[consolidated_name] = merged_vulns
else:
# Single version - keep as is
action_name = action_list[0]
consolidated[action_name] = action_groups[action_name]
return consolidated
def _generate_action_description(self, plugin_name: str, vulns: List[Vulnerability]) -> str:
"""Generate a clear action description from plugin name"""
# Clean up common patterns in plugin names
action = plugin_name
# Common patterns to make more actionable
if "unsupported" in action.lower() or "end of life" in action.lower():
action = f"Upgrade or replace unsupported software: {plugin_name}"
elif "missing" in action.lower() and "patch" in action.lower():
action = f"Apply missing patches: {plugin_name}"
elif "vulnerability" in action.lower() or "multiple vulnerabilities" in action.lower():
action = f"Remediate: {plugin_name}"
elif "update" in action.lower():
action = f"Apply updates: {plugin_name}"
return action
def export_to_csv(self, output_file: str):
"""Export action plans to CSV file"""
if not self.action_plans:
print("No action plans to export. Run generate_plans() first.")
return
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = [
'Action',
'System',
'IP Address',
'CVE Reduction Count',
'Critical Reduction',
'High Reduction',
'System Impact Risk',
'Recommended IFC',
'3rd Party Software',
'Responsible Party',
'Documentation',
'Comments'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for plan in self.action_plans:
writer.writerow({
'Action': plan.action,
'System': plan.systems[0] if plan.systems else '',
'IP Address': '; '.join(plan.ip_addresses),
'CVE Reduction Count': plan.cve_reduction_count,
'Critical Reduction': plan.critical_reduction,
'High Reduction': plan.high_reduction,
'System Impact Risk': plan.system_impact_risk,
'Recommended IFC': plan.recommended_ifc,
'3rd Party Software': plan.third_party_software,
'Responsible Party': plan.responsible_party,
'Documentation': plan.documentation,
'Comments': plan.comments
})
print(f"\n✓ Action plan exported to: {output_file}")
print(f" Total action items: {len(self.action_plans)}")
def export_summary_csv(self, output_file: str):
"""Export summary CSV with actions aggregated across all systems"""
if not self.action_plans:
print("No action plans to export. Run generate_plans() first.")
return
# Aggregate by action
action_summary = defaultdict(lambda: {
'systems': set(),
'cve_count': 0,
'critical_total': 0,
'high_total': 0
})
for plan in self.action_plans:
action = plan.action
action_summary[action]['systems'].add(plan.systems[0] if plan.systems else 'Unknown')
action_summary[action]['cve_count'] = plan.cve_reduction_count # Same for all systems with this action
action_summary[action]['critical_total'] += plan.critical_reduction
action_summary[action]['high_total'] += plan.high_reduction
# Calculate totals for percentages
total_cve = len(set(plan.cve_reduction_count for plan in self.action_plans if plan.cve_reduction_count > 0))
# Better approach: get unique CVEs across ALL actions
all_unique_cves = set()
action_cve_map = {}
for plan in self.action_plans:
if plan.action not in action_cve_map:
action_cve_map[plan.action] = plan.cve_reduction_count
all_unique_cves.add(plan.action) # Use action as proxy since CVE count is per action
# Recalculate: sum unique CVEs across all actions
total_cve = sum(set(action_cve_map.values()))
total_critical = sum(plan.critical_reduction for plan in self.action_plans)
total_high = sum(plan.high_reduction for plan in self.action_plans)
# Create summary list
summary_list = []
for action, data in action_summary.items():
summary_list.append({
'action': action,
'system_count': len(data['systems']),
'cve_reduction': data['cve_count'],
'cve_percent': (data['cve_count'] / total_cve * 100) if total_cve > 0 else 0,
'critical_reduction': data['critical_total'],
'critical_percent': (data['critical_total'] / total_critical * 100) if total_critical > 0 else 0,
'high_reduction': data['high_total'],
'high_percent': (data['high_total'] / total_high * 100) if total_high > 0 else 0
})
# Sort by total impact (critical + high)
summary_list.sort(
key=lambda x: (x['critical_reduction'], x['high_reduction'], x['cve_reduction']),
reverse=True
)
# Write summary CSV
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = [
'Action',
'System Count',
'CVE Reduction',
'% of Total CVE Reduction',
'Critical Reduction',
'% of Total Critical Reduction',
'High Reduction',
'% of Total High Reduction'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for item in summary_list:
writer.writerow({
'Action': item['action'],
'System Count': item['system_count'],
'CVE Reduction': item['cve_reduction'],
'% of Total CVE Reduction': f"{item['cve_percent']:.1f}%",
'Critical Reduction': item['critical_reduction'],
'% of Total Critical Reduction': f"{item['critical_percent']:.1f}%",
'High Reduction': item['high_reduction'],
'% of Total High Reduction': f"{item['high_percent']:.1f}%"
})
print(f"\n✓ Summary exported to: {output_file}")
print(f" Total unique actions: {len(summary_list)}")
print(f" Total CVEs addressed: {total_cve}")
print(f" Total Critical vulnerabilities: {total_critical}")
print(f" Total High vulnerabilities: {total_high}")
[ Apache Log4j 1.2 JMSAppender Remote Code Execution (CVE-2021-4104) (156103) ]
def print_summary(self):
"""Print a summary of the action plans"""
if not self.action_plans:
print("No action plans generated.")
return
+ Action to take : Upgrade to Apache Log4j version 2.16.0 or later since 1.x is end of life.
print("\n" + "="*80)
print("VULNERABILITY REMEDIATION ACTION PLAN SUMMARY")
print("="*80)
Upgrading to the latest versions for Apache Log4j is highly recommended as intermediate versions / patches have known high severity vulnerabilities.
total_critical = sum(p.critical_reduction for p in self.action_plans)
total_high = sum(p.high_reduction for p in self.action_plans)
total_cves = sum(p.cve_reduction_count for p in self.action_plans)
print(f"\nTotal Actions: {len(self.action_plans)}")
print(f"Total Critical Vulnerabilities: {total_critical}")
print(f"Total High Vulnerabilities: {total_high}")
print(f"Total Unique CVEs: {total_cves}")
[ Oracle Java SE Multiple Vulnerabilities (July 2022 CPU) (163304) ]
print(f"\n{'Priority':<10} {'Action':<50} {'Critical':<10} {'High':<10} {'CVEs':<10}")
print("-"*90)
+ Action to take : Apply the appropriate patch according to the July 2022 Oracle Critical Patch Update advisory.
for i, plan in enumerate(self.action_plans[:10], 1): # Top 10
action_short = plan.action[:47] + "..." if len(plan.action) > 50 else plan.action
print(f"{i:<10} {action_short:<50} {plan.critical_reduction:<10} {plan.high_reduction:<10} {plan.cve_reduction_count:<10}")
+Impact : Taking this action will resolve 348 different vulnerabilities (CVEs).
[ RHEL 8 / 9 : java-1.8.0-openjdk (RHSA-2025:18815) (271273) ]
+ Action to take : Update the RHEL java-1.8.0-openjdk package based on the guidance in RHSA-2025:18815.
+Impact : Taking this action will resolve 24 different vulnerabilities (CVEs).
[ RHEL 8 : Bug fix of NetworkManager (Moderate) (RHSA-2025:0288) (214070) ]
+ Action to take : Update the affected packages.
[ RHEL 8 : NetworkManager-libreswan (RHSA-2024:8353) (209549) ]
+ Action to take : Update the RHEL NetworkManager-libreswan package based on the guidance in RHSA-2024:8353.
[ RHEL 8 : bcc (RHSA-2024:8831) (210348) ]
+ Action to take : Update the RHEL bcc package based on the guidance in RHSA-2024:8831.
"""
report_item = ET.SubElement(report_host, 'ReportItem',
port='0', svc_name='general', protocol='tcp',
severity='0', pluginID='66334', pluginName='Patch Report')
plugin_output = ET.SubElement(report_item, 'plugin_output')
plugin_output.text = patch_output
if len(self.action_plans) > 10:
print(f"\n... and {len(self.action_plans) - 10} more actions")
def main():
root = ET.Element('NessusClientData_v2')
report = ET.SubElement(root, 'Report', name='Comprehensive Patch Report Test')
parser = argparse.ArgumentParser(
description='Parse Nessus scan files and generate vulnerability action plans',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s scan1.nessus scan2.nessus -o action_plan.csv
%(prog)s *.nessus -o remediation_plan.csv
%(prog)s scan1.nessus --summary-only -o summary.csv
"""
)
# Create Windows host
create_windows_host(report, "169.254.33.107", "DESKTOP-UE5DFOC")
parser.add_argument(
'nessus_files',
nargs='+',
help='One or more .nessus files to process'
)
# Create RHEL host
create_rhel_host(report, "192.168.1.50", "rhel-server01")
parser.add_argument(
'-o', '--output',
default='vulnerability_action_plan.csv',
help='Output CSV file (default: vulnerability_action_plan.csv)'
)
# Pretty print
xml_str = minidom.parseString(ET.tostring(root)).toprettyxml(indent=" ")
parser.add_argument(
'--summary',
action='store_true',
help='Also generate a summary CSV file (appends _summary to filename)'
)
filename = 'comprehensive_patch_test.nessus'
with open(filename, 'w') as f:
f.write(xml_str)
parser.add_argument(
'--summary-only',
action='store_true',
help='Generate only the summary CSV file, skip detailed plan'
)
print(f"✓ Created comprehensive test file: {filename}")
print("\nIncludes:")
print(" HOST 1 (Windows):")
print(" - KB patches with and without counts")
print(" - Adobe AIR upgrade")
print(" - Curl upgrade")
print(" - Microsoft Edge upgrade")
print("\n HOST 2 (RHEL 8):")
print(" - Apache Log4j upgrade")
print(" - Oracle Java patches")
print(" - RHEL package updates (java, NetworkManager, bcc)")
print("\nTest with:")
print(" python patch_report_extractor.py comprehensive_patch_test.nessus --summary -o result.csv")
parser.add_argument(
'--consolidate',
action='store_true',
help='Consolidate multiple versions of same software into single action (e.g., Adobe AIR updates)'
)
args = parser.parse_args()
# Validate input files
valid_files = []
for filepath in args.nessus_files:
if Path(filepath).exists():
valid_files.append(filepath)
else:
print(f"Warning: File not found: {filepath}")
if not valid_files:
print("Error: No valid .nessus files provided")
return 1
print(f"\nProcessing {len(valid_files)} .nessus file(s)...\n")
# Parse files
nessus_parser = NessusParser()
nessus_parser.parse_multiple_files(valid_files)
# Generate action plans
plan_generator = ActionPlanGenerator(
nessus_parser.vulnerabilities,
consolidate_versions=args.consolidate
)
plan_generator.generate_plans()
# Display summary
plan_generator.print_summary()
# Determine what to export
if args.summary_only:
# Only export summary
plan_generator.export_summary_csv(args.output)
elif args.summary:
# Export both detailed and summary
plan_generator.export_to_csv(args.output)
# Generate summary filename
output_path = Path(args.output)
summary_filename = output_path.stem + '_summary' + output_path.suffix
summary_path = output_path.parent / summary_filename
plan_generator.export_summary_csv(str(summary_path))
else:
# Default: only export detailed plan
plan_generator.export_to_csv(args.output)
print("\nTip: Use --summary flag to also generate a summary CSV")
print("\n✓ Complete!")
return 0
if __name__ == '__main__':
main()
exit(main())