Add PatchReportExtractor.py

This commit is contained in:
2026-02-05 10:13:06 -08:00
parent c7c0c37621
commit f4127fb3f1

506
PatchReportExtractor.py Normal file
View File

@@ -0,0 +1,506 @@
#!/usr/bin/env python3
"""
Extract structured action items from Nessus Plugin 66334 (Patch Report)
This provides pre-packaged, actionable patch recommendations per host
"""
import xml.etree.ElementTree as ET
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Dict
import csv
import argparse
import re
from pathlib import Path
@dataclass
class PatchAction:
"""Represents a patch action from Plugin 66334"""
action: str
system: str
ip_address: str
vulnerability_count: int
software: str = ""
version_needed: str = ""
critical_count: int = 0
high_count: int = 0
medium_count: int = 0
related_cves: List[str] = None
def __post_init__(self):
if self.related_cves is None:
self.related_cves = []
class PatchReportParser:
"""Parses Plugin 66334 Patch Report data from .nessus files"""
def __init__(self):
self.patch_actions: List[PatchAction] = []
self.all_vulnerabilities: Dict = {} # Store all vulns for cross-reference
self.host_vulnerabilities: Dict = {} # Vulns per host for matching
def parse_file(self, filepath: str):
"""Parse a single .nessus file for Plugin 66334 data"""
try:
tree = ET.parse(filepath)
root = tree.getroot()
for report_host in root.findall('.//ReportHost'):
host_ip = report_host.get('name')
host_properties = report_host.find('HostProperties')
system_name = host_ip
if host_properties is not None:
for tag in host_properties.findall('tag'):
if tag.get('name') == 'host-fqdn':
system_name = tag.text
break
elif tag.get('name') == 'netbios-name' and not system_name != host_ip:
system_name = tag.text
# Store this host's vulnerabilities
host_key = f"{system_name}_{host_ip}"
self.host_vulnerabilities[host_key] = []
# Collect ALL vulnerabilities for this host (for cross-reference)
for item in report_host.findall('ReportItem'):
plugin_id = item.get('pluginID')
plugin_name = item.get('pluginName', '')
severity = int(item.get('severity', 0))
# Handle Plugin 66334 specially (before skipping informational)
if plugin_id == '66334':
self._parse_patch_report_output(item, system_name, host_ip)
continue # Don't add it to host_vulnerabilities
# Skip informational
if severity == 0:
continue
# Collect CVEs
cves = []
for cve in item.findall('cve'):
if cve.text:
cves.append(cve.text)
# Store vulnerability info
vuln_info = {
'plugin_id': plugin_id,
'plugin_name': plugin_name,
'severity': severity,
'cves': cves
}
self.host_vulnerabilities[host_key].append(vuln_info)
# Now cross-reference patch actions with vulnerabilities
self._match_patch_actions_to_vulnerabilities()
print(f"✓ Parsed: {filepath}")
except Exception as e:
print(f"✗ Error parsing {filepath}: {str(e)}")
def _parse_patch_report_output(self, report_item, system_name: str, host_ip: str):
"""Extract structured actions from the patch report output"""
plugin_output = report_item.find('plugin_output')
if plugin_output is None or not plugin_output.text:
return
output = plugin_output.text
# Pattern 1: Microsoft KB patches
# Example with count: - KB5073724 (39 vulnerabilities)
# Example without count: - KB5049613
kb_pattern = r'-\s*(KB\d+)(?:\s*\((\d+)\s+vulnerabilit(?:y|ies)\))?'
kb_matches = re.findall(kb_pattern, output, re.IGNORECASE)
if kb_matches:
for kb, count in kb_matches:
action = PatchAction(
action=f"Install Microsoft patch {kb}",
system=system_name,
ip_address=host_ip,
vulnerability_count=int(count) if count else 0,
software="Microsoft Windows",
version_needed=kb
)
self.patch_actions.append(action)
# Pattern 2: Software upgrades with structured format
# Example:
# [ 7-Zip < 25.01 (249179) ]
# + Action to take : Upgrade to 7-Zip version 25.01 or later.
# +Impact : Taking this action will resolve 5 different vulnerabilities (CVEs).
# Match the software/version line
software_blocks = re.split(r'\n\s*\+\s*Action to take\s*:', output)
for i in range(1, len(software_blocks)):
block = software_blocks[i]
# Extract action description
action_match = re.search(r'^([^\n]+)', block)
if not action_match:
continue
action_text = action_match.group(1).strip()
# Extract impact/vulnerability count
impact_match = re.search(r'resolve\s+(\d+)\s+different vulnerabilit(?:y|ies)', block, re.IGNORECASE)
vuln_count = int(impact_match.group(1)) if impact_match else 0
# Try to extract software name and version from the previous block
prev_block_start = max(0, i - 1)
prev_text = software_blocks[prev_block_start][-200:] # Last 200 chars
software_name = ""
version_needed = ""
# Pattern: [ Software < version (...) ]
sw_match = re.search(r'\[\s*([^\<\[]+?)\s*<\s*([\d.]+)', prev_text)
if sw_match:
software_name = sw_match.group(1).strip()
version_needed = sw_match.group(2).strip()
action = PatchAction(
action=action_text,
system=system_name,
ip_address=host_ip,
vulnerability_count=vuln_count,
software=software_name,
version_needed=version_needed
)
self.patch_actions.append(action)
def _match_patch_actions_to_vulnerabilities(self):
"""Cross-reference patch actions with actual vulnerabilities to get severity counts"""
for action in self.patch_actions:
host_key = f"{action.system}_{action.ip_address}"
if host_key not in self.host_vulnerabilities:
continue
host_vulns = self.host_vulnerabilities[host_key]
# Match vulnerabilities to this action based on:
# 1. KB number in action matches KB in vulnerability
# 2. Software name in action matches software in vulnerability plugin name
# 3. CVEs mentioned in patch report
matched_vulns = []
for vuln in host_vulns:
plugin_name = vuln['plugin_name'].lower()
action_text = action.action.lower()
software_name = action.software.lower()
# Match KB patches
if 'kb' in action_text:
kb_match = re.search(r'kb(\d+)', action_text)
if kb_match:
kb_num = kb_match.group(1)
if f'kb{kb_num}' in plugin_name or kb_num in plugin_name:
matched_vulns.append(vuln)
continue
# Match software name
if software_name and software_name in plugin_name:
# Further validate by checking version if available
if action.version_needed:
# If version is mentioned, check if this vuln relates to older versions
if '<' in plugin_name or 'unsupported' in plugin_name or 'outdated' in plugin_name:
matched_vulns.append(vuln)
else:
matched_vulns.append(vuln)
continue
# Match common software patterns
software_keywords = {
'7-zip': ['7-zip', '7zip'],
'apache': ['apache'],
'log4j': ['log4j'],
'openssh': ['openssh', 'ssh'],
'dell': ['dell'],
'microsoft': ['microsoft', 'windows', r'ms\d{2}-'],
'adobe': ['adobe'],
'air': ['adobe air', 'air'],
'curl': ['curl'],
'edge': ['edge', 'chromium'],
'oracle': ['oracle'],
'java': ['java', 'openjdk', 'jre', 'jdk'],
'rhel': ['rhel', 'red hat'],
'networkmanager': ['networkmanager'],
'bcc': ['bcc'],
}
for key, patterns in software_keywords.items():
if key in software_name or key in action_text:
for pattern in patterns:
if re.search(pattern, plugin_name, re.IGNORECASE):
matched_vulns.append(vuln)
break
# Calculate severity counts from matched vulnerabilities
critical = sum(1 for v in matched_vulns if v['severity'] == 4)
high = sum(1 for v in matched_vulns if v['severity'] == 3)
medium = sum(1 for v in matched_vulns if v['severity'] == 2)
# Collect CVEs
cves = []
for v in matched_vulns:
cves.extend(v['cves'])
# Update action with severity info
action.critical_count = critical
action.high_count = high
action.medium_count = medium
action.related_cves = list(set(cves)) # Unique CVEs
def parse_multiple_files(self, filepaths: List[str]):
"""Parse multiple .nessus files"""
for filepath in filepaths:
self.parse_file(filepath)
print(f"\nTotal patch actions extracted: {len(self.patch_actions)}")
def export_to_csv(self, output_file: str):
"""Export patch actions to CSV"""
if not self.patch_actions:
print("No patch actions found.")
return
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = [
'Action',
'System',
'IP Address',
'CVE Reduction Count',
'Critical Reduction',
'High Reduction',
'Medium Reduction',
'Software',
'Version Needed',
'System Impact Risk',
'Recommended IFC',
'3rd Party Software',
'Responsible Party',
'Documentation',
'Comments'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for action in self.patch_actions:
writer.writerow({
'Action': action.action,
'System': action.system,
'IP Address': action.ip_address,
'CVE Reduction Count': len(action.related_cves),
'Critical Reduction': action.critical_count,
'High Reduction': action.high_count,
'Medium Reduction': action.medium_count,
'Software': action.software,
'Version Needed': action.version_needed,
'System Impact Risk': '',
'Recommended IFC': '',
'3rd Party Software': '',
'Responsible Party': '',
'Documentation': '',
'Comments': ''
})
print(f"\n✓ Patch actions exported to: {output_file}")
print(f" Total action items: {len(self.patch_actions)}")
def export_summary_csv(self, output_file: str):
"""Export summary of patch actions aggregated by action type"""
if not self.patch_actions:
print("No patch actions found.")
return
# Aggregate by action
action_summary = defaultdict(lambda: {
'systems': set(),
'total_vulns': 0,
'total_cves': set(),
'critical': 0,
'high': 0,
'medium': 0
})
for action in self.patch_actions:
key = action.action
action_summary[key]['systems'].add(action.system)
action_summary[key]['total_vulns'] += action.vulnerability_count
action_summary[key]['total_cves'].update(action.related_cves)
action_summary[key]['critical'] += action.critical_count
action_summary[key]['high'] += action.high_count
action_summary[key]['medium'] += action.medium_count
# Create summary list
summary_list = []
total_critical = sum(a.critical_count for a in self.patch_actions)
total_high = sum(a.high_count for a in self.patch_actions)
total_cves = len(set(cve for a in self.patch_actions for cve in a.related_cves))
for action_text, data in action_summary.items():
summary_list.append({
'action': action_text,
'system_count': len(data['systems']),
'cve_count': len(data['total_cves']),
'cve_percent': (len(data['total_cves']) / total_cves * 100) if total_cves > 0 else 0,
'critical': data['critical'],
'critical_percent': (data['critical'] / total_critical * 100) if total_critical > 0 else 0,
'high': data['high'],
'high_percent': (data['high'] / total_high * 100) if total_high > 0 else 0
})
# Sort by critical then high
summary_list.sort(key=lambda x: (x['critical'], x['high'], x['cve_count']), reverse=True)
# Write summary CSV
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = [
'Action',
'System Count',
'CVE Reduction',
'% of Total CVE Reduction',
'Critical Reduction',
'% of Total Critical Reduction',
'High Reduction',
'% of Total High Reduction'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for item in summary_list:
writer.writerow({
'Action': item['action'],
'System Count': item['system_count'],
'CVE Reduction': item['cve_count'],
'% of Total CVE Reduction': f"{item['cve_percent']:.1f}%",
'Critical Reduction': item['critical'],
'% of Total Critical Reduction': f"{item['critical_percent']:.1f}%",
'High Reduction': item['high'],
'% of Total High Reduction': f"{item['high_percent']:.1f}%"
})
print(f"\n✓ Summary exported to: {output_file}")
print(f" Total unique actions: {len(summary_list)}")
print(f" Total CVEs: {total_cves}")
print(f" Total Critical vulnerabilities: {total_critical}")
print(f" Total High vulnerabilities: {total_high}")
def print_summary(self):
"""Print a summary of patch actions"""
if not self.patch_actions:
print("No patch actions found.")
return
print("\n" + "="*80)
print("PATCH REPORT ACTION PLAN SUMMARY (Plugin 66334)")
print("="*80)
total_cves = len(set(cve for a in self.patch_actions for cve in a.related_cves))
total_critical = sum(a.critical_count for a in self.patch_actions)
total_high = sum(a.high_count for a in self.patch_actions)
unique_systems = len(set(a.system for a in self.patch_actions))
print(f"\nTotal Actions: {len(self.patch_actions)}")
print(f"Unique Systems: {unique_systems}")
print(f"Total CVEs Addressed: {total_cves}")
print(f"Total Critical Vulnerabilities: {total_critical}")
print(f"Total High Vulnerabilities: {total_high}")
print(f"\n{'Priority':<10} {'Action':<45} {'System':<15} {'Crit':<6} {'High':<6} {'CVEs':<6}")
print("-"*95)
# Sort by critical then high
sorted_actions = sorted(
self.patch_actions,
key=lambda x: (x.critical_count, x.high_count, len(x.related_cves)),
reverse=True
)
for i, action in enumerate(sorted_actions[:15], 1):
action_short = action.action[:42] + "..." if len(action.action) > 45 else action.action
system_short = action.system[:12] + "..." if len(action.system) > 15 else action.system
cve_count = len(action.related_cves)
print(f"{i:<10} {action_short:<45} {system_short:<15} {action.critical_count:<6} {action.high_count:<6} {cve_count:<6}")
if len(sorted_actions) > 15:
print(f"\n... and {len(sorted_actions) - 15} more actions")
def main():
parser = argparse.ArgumentParser(
description='Extract structured patch actions from Nessus Plugin 66334 (Patch Report)',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s scan1.nessus scan2.nessus -o patch_actions.csv
%(prog)s *.nessus --summary -o patches.csv
"""
)
parser.add_argument(
'nessus_files',
nargs='+',
help='One or more .nessus files to process'
)
parser.add_argument(
'-o', '--output',
default='patch_report_actions.csv',
help='Output CSV file (default: patch_report_actions.csv)'
)
parser.add_argument(
'--summary',
action='store_true',
help='Also generate a summary CSV file'
)
args = parser.parse_args()
# Validate input files
valid_files = []
for filepath in args.nessus_files:
if Path(filepath).exists():
valid_files.append(filepath)
else:
print(f"Warning: File not found: {filepath}")
if not valid_files:
print("Error: No valid .nessus files provided")
return 1
print(f"\nExtracting Patch Report data from {len(valid_files)} .nessus file(s)...\n")
# Parse files
parser_obj = PatchReportParser()
parser_obj.parse_multiple_files(valid_files)
# Display summary
parser_obj.print_summary()
# Export detailed actions
parser_obj.export_to_csv(args.output)
# Optionally export summary
if args.summary:
output_path = Path(args.output)
summary_filename = output_path.stem + '_summary' + output_path.suffix
summary_path = output_path.parent / summary_filename
parser_obj.export_summary_csv(str(summary_path))
print("\n✓ Complete!")
return 0
if __name__ == '__main__':
exit(main())