Files
Tips-Tricks/PatchReportExtractor.py
2026-02-05 10:25:51 -08:00

529 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Extract structured action items from Nessus Plugin 66334 (Patch Report)
This provides pre-packaged, actionable patch recommendations per host
"""
import xml.etree.ElementTree as ET
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Dict
import csv
import argparse
import re
from pathlib import Path
@dataclass
class PatchAction:
"""Represents a patch action from Plugin 66334"""
action: str
system: str
ip_address: str
vulnerability_count: int
software: str = ""
version_needed: str = ""
critical_count: int = 0
high_count: int = 0
medium_count: int = 0
related_cves: List[str] = None
def __post_init__(self):
if self.related_cves is None:
self.related_cves = []
class PatchReportParser:
"""Parses Plugin 66334 Patch Report data from .nessus files"""
def __init__(self):
self.patch_actions: List[PatchAction] = []
self.all_vulnerabilities: Dict = {} # Store all vulns for cross-reference
self.host_vulnerabilities: Dict = {} # Vulns per host for matching
def parse_file(self, filepath: str):
"""Parse a single .nessus file for Plugin 66334 data"""
try:
tree = ET.parse(filepath)
root = tree.getroot()
for report_host in root.findall('.//ReportHost'):
host_ip = report_host.get('name')
host_properties = report_host.find('HostProperties')
system_name = host_ip
if host_properties is not None:
for tag in host_properties.findall('tag'):
if tag.get('name') == 'host-fqdn':
system_name = tag.text
break
elif tag.get('name') == 'netbios-name' and not system_name != host_ip:
system_name = tag.text
# Store this host's vulnerabilities
host_key = f"{system_name}_{host_ip}"
self.host_vulnerabilities[host_key] = []
# Collect ALL vulnerabilities for this host (for cross-reference)
for item in report_host.findall('ReportItem'):
plugin_id = item.get('pluginID')
plugin_name = item.get('pluginName', '')
severity = int(item.get('severity', 0))
# Handle Plugin 66334 specially (before skipping informational)
if plugin_id == '66334':
self._parse_patch_report_output(item, system_name, host_ip)
continue # Don't add it to host_vulnerabilities
# Skip informational
if severity == 0:
continue
# Collect CVEs
cves = []
for cve in item.findall('cve'):
if cve.text:
cves.append(cve.text)
# Store vulnerability info
vuln_info = {
'plugin_id': plugin_id,
'plugin_name': plugin_name,
'severity': severity,
'cves': cves
}
self.host_vulnerabilities[host_key].append(vuln_info)
# Now cross-reference patch actions with vulnerabilities
self._match_patch_actions_to_vulnerabilities()
print(f"✓ Parsed: {filepath}")
except Exception as e:
print(f"✗ Error parsing {filepath}: {str(e)}")
def _parse_patch_report_output(self, report_item, system_name: str, host_ip: str):
"""Extract structured actions from the patch report output"""
plugin_output = report_item.find('plugin_output')
if plugin_output is None or not plugin_output.text:
return
output = plugin_output.text
# Pattern 1: Microsoft KB patches
# Example with count: - KB5073724 (39 vulnerabilities)
# Example without count: - KB5049613
kb_pattern = r'-\s*(KB\d+)(?:\s*\((\d+)\s+vulnerabilit(?:y|ies)\))?'
kb_matches = re.findall(kb_pattern, output, re.IGNORECASE)
if kb_matches:
for kb, count in kb_matches:
action = PatchAction(
action=f"Install Microsoft patch {kb}",
system=system_name,
ip_address=host_ip,
vulnerability_count=int(count) if count else 0,
software="Microsoft Windows",
version_needed=kb
)
self.patch_actions.append(action)
# Pattern 2: Software upgrades with structured format
# Example:
# [ 7-Zip < 25.01 (249179) ]
# + Action to take : Upgrade to 7-Zip version 25.01 or later.
# +Impact : Taking this action will resolve 5 different vulnerabilities (CVEs).
# Match the software/version line
software_blocks = re.split(r'\n\s*\+\s*Action to take\s*:', output)
for i in range(1, len(software_blocks)):
block = software_blocks[i]
# Extract action description
action_match = re.search(r'^([^\n]+)', block)
if not action_match:
continue
action_text = action_match.group(1).strip()
# Extract impact/vulnerability count
impact_match = re.search(r'resolve\s+(\d+)\s+different vulnerabilit(?:y|ies)', block, re.IGNORECASE)
vuln_count = int(impact_match.group(1)) if impact_match else 0
# Try to extract software name and version from the previous block
prev_block_start = max(0, i - 1)
prev_text = software_blocks[prev_block_start][-200:] # Last 200 chars
software_name = ""
version_needed = ""
# Pattern: [ Software < version (...) ]
sw_match = re.search(r'\[\s*([^\<\[]+?)\s*<\s*([\d.]+)', prev_text)
if sw_match:
software_name = sw_match.group(1).strip()
version_needed = sw_match.group(2).strip()
action = PatchAction(
action=action_text,
system=system_name,
ip_address=host_ip,
vulnerability_count=vuln_count,
software=software_name,
version_needed=version_needed
)
self.patch_actions.append(action)
def _match_patch_actions_to_vulnerabilities(self):
"""Cross-reference patch actions with actual vulnerabilities to get severity counts"""
for action in self.patch_actions:
host_key = f"{action.system}_{action.ip_address}"
if host_key not in self.host_vulnerabilities:
continue
host_vulns = self.host_vulnerabilities[host_key]
# Match vulnerabilities to this action based on:
# 1. KB number in action matches KB in vulnerability
# 2. Specific package name for RHEL (e.g., "java-1.8.0-openjdk", "firefox")
# 3. Software name and version for general software
# 4. CVEs mentioned in patch report
matched_vulns = []
action_text_lower = action.action.lower()
software_name_lower = action.software.lower()
for vuln in host_vulns:
plugin_name = vuln['plugin_name'].lower()
# PRIORITY 1: Match KB patches by KB number
if 'kb' in action_text_lower:
kb_match = re.search(r'kb(\d+)', action_text_lower)
if kb_match:
kb_num = kb_match.group(1)
if f'kb{kb_num}' in plugin_name or f'kb {kb_num}' in plugin_name:
matched_vulns.append(vuln)
continue
# PRIORITY 2: Match RHEL packages by specific package name
# Extract package name from action like "Update the RHEL firefox package..."
if 'rhel' in action_text_lower or 'rhsa' in action_text_lower:
# Extract package name from patterns like:
# "Update the RHEL firefox package based on..."
# "Update the RHEL java-1.8.0-openjdk package..."
package_match = re.search(r'rhel\s+([a-z0-9\-._]+)\s+package', action_text_lower)
if package_match:
package_name = package_match.group(1)
# Match if the package name appears in the vulnerability plugin name
if package_name in plugin_name:
matched_vulns.append(vuln)
continue
# Also try to extract RHSA number for exact matching
rhsa_match = re.search(r'rhsa[-:](\d{4}:\d+)', action_text_lower)
if rhsa_match:
rhsa_num = rhsa_match.group(1)
if rhsa_num in plugin_name or rhsa_num.replace(':', '-') in plugin_name:
matched_vulns.append(vuln)
continue
# If no specific package found, skip broad RHEL matching
# This prevents matching all RHEL vulns to every RHEL action
continue
# PRIORITY 3: Match by specific software name with version validation
if software_name_lower and software_name_lower in plugin_name:
# Further validate by checking version if available
if action.version_needed:
# If version is mentioned, check if this vuln relates to older versions
if '<' in plugin_name or '<=' in plugin_name or 'unsupported' in plugin_name or 'outdated' in plugin_name:
matched_vulns.append(vuln)
continue
else:
# No version requirement, but software name matches
matched_vulns.append(vuln)
continue
# PRIORITY 4: Match common software patterns (only for non-RHEL)
# This should be specific, not catch-all
if 'rhel' not in action_text_lower and 'rhsa' not in action_text_lower:
software_keywords = {
'7-zip': ['7-zip', '7zip'],
'adobe air': ['adobe air', 'adobe.*air'],
'curl': [r'\bcurl\b'],
'edge': ['microsoft edge', 'edge.*chromium'],
'log4j': ['log4j'],
'java': [r'java se\b', r'oracle java\b', r'openjdk\b'],
}
for key, patterns in software_keywords.items():
if key in software_name_lower or key in action_text_lower:
for pattern in patterns:
if re.search(pattern, plugin_name, re.IGNORECASE):
matched_vulns.append(vuln)
break
# Calculate severity counts from matched vulnerabilities
critical = sum(1 for v in matched_vulns if v['severity'] == 4)
high = sum(1 for v in matched_vulns if v['severity'] == 3)
medium = sum(1 for v in matched_vulns if v['severity'] == 2)
# Collect CVEs
cves = []
for v in matched_vulns:
cves.extend(v['cves'])
# Update action with severity info
action.critical_count = critical
action.high_count = high
action.medium_count = medium
action.related_cves = list(set(cves)) # Unique CVEs
def parse_multiple_files(self, filepaths: List[str]):
"""Parse multiple .nessus files"""
for filepath in filepaths:
self.parse_file(filepath)
print(f"\nTotal patch actions extracted: {len(self.patch_actions)}")
def export_to_csv(self, output_file: str):
"""Export patch actions to CSV"""
if not self.patch_actions:
print("No patch actions found.")
return
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = [
'Action',
'System',
'IP Address',
'CVE Reduction Count',
'Critical Reduction',
'High Reduction',
'Medium Reduction',
'Software',
'Version Needed',
'System Impact Risk',
'Recommended IFC',
'3rd Party Software',
'Responsible Party',
'Documentation',
'Comments'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for action in self.patch_actions:
writer.writerow({
'Action': action.action,
'System': action.system,
'IP Address': action.ip_address,
'CVE Reduction Count': len(action.related_cves),
'Critical Reduction': action.critical_count,
'High Reduction': action.high_count,
'Medium Reduction': action.medium_count,
'Software': action.software,
'Version Needed': action.version_needed,
'System Impact Risk': '',
'Recommended IFC': '',
'3rd Party Software': '',
'Responsible Party': '',
'Documentation': '',
'Comments': ''
})
print(f"\n✓ Patch actions exported to: {output_file}")
print(f" Total action items: {len(self.patch_actions)}")
def export_summary_csv(self, output_file: str):
"""Export summary of patch actions aggregated by action type"""
if not self.patch_actions:
print("No patch actions found.")
return
# Aggregate by action
action_summary = defaultdict(lambda: {
'systems': set(),
'total_vulns': 0,
'total_cves': set(),
'critical': 0,
'high': 0,
'medium': 0
})
for action in self.patch_actions:
key = action.action
action_summary[key]['systems'].add(action.system)
action_summary[key]['total_vulns'] += action.vulnerability_count
action_summary[key]['total_cves'].update(action.related_cves)
action_summary[key]['critical'] += action.critical_count
action_summary[key]['high'] += action.high_count
action_summary[key]['medium'] += action.medium_count
# Create summary list
summary_list = []
total_critical = sum(a.critical_count for a in self.patch_actions)
total_high = sum(a.high_count for a in self.patch_actions)
total_cves = len(set(cve for a in self.patch_actions for cve in a.related_cves))
for action_text, data in action_summary.items():
summary_list.append({
'action': action_text,
'system_count': len(data['systems']),
'cve_count': len(data['total_cves']),
'cve_percent': (len(data['total_cves']) / total_cves * 100) if total_cves > 0 else 0,
'critical': data['critical'],
'critical_percent': (data['critical'] / total_critical * 100) if total_critical > 0 else 0,
'high': data['high'],
'high_percent': (data['high'] / total_high * 100) if total_high > 0 else 0
})
# Sort by critical then high
summary_list.sort(key=lambda x: (x['critical'], x['high'], x['cve_count']), reverse=True)
# Write summary CSV
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = [
'Action',
'System Count',
'CVE Reduction',
'% of Total CVE Reduction',
'Critical Reduction',
'% of Total Critical Reduction',
'High Reduction',
'% of Total High Reduction'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for item in summary_list:
writer.writerow({
'Action': item['action'],
'System Count': item['system_count'],
'CVE Reduction': item['cve_count'],
'% of Total CVE Reduction': f"{item['cve_percent']:.1f}%",
'Critical Reduction': item['critical'],
'% of Total Critical Reduction': f"{item['critical_percent']:.1f}%",
'High Reduction': item['high'],
'% of Total High Reduction': f"{item['high_percent']:.1f}%"
})
print(f"\n✓ Summary exported to: {output_file}")
print(f" Total unique actions: {len(summary_list)}")
print(f" Total CVEs: {total_cves}")
print(f" Total Critical vulnerabilities: {total_critical}")
print(f" Total High vulnerabilities: {total_high}")
def print_summary(self):
"""Print a summary of patch actions"""
if not self.patch_actions:
print("No patch actions found.")
return
print("\n" + "="*80)
print("PATCH REPORT ACTION PLAN SUMMARY (Plugin 66334)")
print("="*80)
total_cves = len(set(cve for a in self.patch_actions for cve in a.related_cves))
total_critical = sum(a.critical_count for a in self.patch_actions)
total_high = sum(a.high_count for a in self.patch_actions)
unique_systems = len(set(a.system for a in self.patch_actions))
print(f"\nTotal Actions: {len(self.patch_actions)}")
print(f"Unique Systems: {unique_systems}")
print(f"Total CVEs Addressed: {total_cves}")
print(f"Total Critical Vulnerabilities: {total_critical}")
print(f"Total High Vulnerabilities: {total_high}")
print(f"\n{'Priority':<10} {'Action':<45} {'System':<15} {'Crit':<6} {'High':<6} {'CVEs':<6}")
print("-"*95)
# Sort by critical then high
sorted_actions = sorted(
self.patch_actions,
key=lambda x: (x.critical_count, x.high_count, len(x.related_cves)),
reverse=True
)
for i, action in enumerate(sorted_actions[:15], 1):
action_short = action.action[:42] + "..." if len(action.action) > 45 else action.action
system_short = action.system[:12] + "..." if len(action.system) > 15 else action.system
cve_count = len(action.related_cves)
print(f"{i:<10} {action_short:<45} {system_short:<15} {action.critical_count:<6} {action.high_count:<6} {cve_count:<6}")
if len(sorted_actions) > 15:
print(f"\n... and {len(sorted_actions) - 15} more actions")
def main():
parser = argparse.ArgumentParser(
description='Extract structured patch actions from Nessus Plugin 66334 (Patch Report)',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s scan1.nessus scan2.nessus -o patch_actions.csv
%(prog)s *.nessus --summary -o patches.csv
"""
)
parser.add_argument(
'nessus_files',
nargs='+',
help='One or more .nessus files to process'
)
parser.add_argument(
'-o', '--output',
default='patch_report_actions.csv',
help='Output CSV file (default: patch_report_actions.csv)'
)
parser.add_argument(
'--summary',
action='store_true',
help='Also generate a summary CSV file'
)
args = parser.parse_args()
# Validate input files
valid_files = []
for filepath in args.nessus_files:
if Path(filepath).exists():
valid_files.append(filepath)
else:
print(f"Warning: File not found: {filepath}")
if not valid_files:
print("Error: No valid .nessus files provided")
return 1
print(f"\nExtracting Patch Report data from {len(valid_files)} .nessus file(s)...\n")
# Parse files
parser_obj = PatchReportParser()
parser_obj.parse_multiple_files(valid_files)
# Display summary
parser_obj.print_summary()
# Export detailed actions
parser_obj.export_to_csv(args.output)
# Optionally export summary
if args.summary:
output_path = Path(args.output)
summary_filename = output_path.stem + '_summary' + output_path.suffix
summary_path = output_path.parent / summary_filename
parser_obj.export_summary_csv(str(summary_path))
print("\n✓ Complete!")
return 0
if __name__ == '__main__':
exit(main())