import dataclasses
import json
import logging
import pathlib
from datetime import datetime, timezone
from typing import Dict, Optional

import click
from docgen_types import Plugin

from datahub.ingestion.api.decorators import SupportStatus
from datahub.ingestion.source.source_registry import source_registry

logger = logging.getLogger(__name__)

DENY_LIST = {
    "snowflake-summary",
    "snowflake-queries",
    "bigquery-queries",
}


def load_plugin_capabilities(plugin_name: str) -> Optional[Plugin]:
    """Load plugin capabilities without generating full documentation."""
    logger.debug(f"Loading capabilities for {plugin_name}")

    try:
        class_or_exception = source_registry._ensure_not_lazy(plugin_name)
        if isinstance(class_or_exception, Exception):
            # Log the specific error but don't re-raise it.
            logger.warning(f"Plugin {plugin_name} failed to load: {class_or_exception}")
            return None

        source_type = source_registry.get(plugin_name)
        logger.debug(f"Source class is {source_type}")

        if hasattr(source_type, "get_platform_name"):
            platform_name = source_type.get_platform_name()
        else:
            platform_name = plugin_name.title()

        platform_id = None
        if hasattr(source_type, "get_platform_id"):
            platform_id = source_type.get_platform_id()
        if platform_id is None:
            logger.warning(f"Platform ID not found for {plugin_name}")
            return None

        plugin = Plugin(
            name=plugin_name,
            platform_id=platform_id,
            platform_name=platform_name,
            classname=".".join([source_type.__module__, source_type.__name__]),
        )

        if hasattr(source_type, "get_support_status"):
            plugin.support_status = source_type.get_support_status()

        if hasattr(source_type, "get_capabilities"):
            capabilities = list(source_type.get_capabilities())
            if capabilities:
                capabilities.sort(key=lambda x: x.capability.value)
                plugin.capabilities = capabilities
            else:
                logger.debug(f"No capabilities defined for {plugin_name}")
                plugin.capabilities = []
        else:
            logger.debug(f"No get_capabilities method for {plugin_name}")
            plugin.capabilities = []

        return plugin

    except Exception as e:
        logger.warning(f"Failed to load capabilities for {plugin_name}: {e}")
        return None


@dataclasses.dataclass
class CapabilitySummary:
    """Summary of capabilities across all plugins."""

    plugin_details: Dict[str, Dict]  # plugin_name -> detailed info


def generate_capability_summary() -> CapabilitySummary:
    """Generate a comprehensive summary of capabilities across all plugins."""
    plugin_details: Dict[str, Dict] = {}

    for plugin_name in sorted(source_registry.mapping.keys()):
        if plugin_name in DENY_LIST:
            logger.info(f"Skipping {plugin_name} as it is on the deny list")
            continue

        plugin = load_plugin_capabilities(plugin_name)
        if plugin is None:
            continue

        plugin_details[plugin_name] = {
            "platform_id": plugin.platform_id,
            "platform_name": plugin.platform_name,
            "classname": plugin.classname,
            "support_status": plugin.support_status.name
            if plugin.support_status != SupportStatus.UNKNOWN
            else None,
            "capabilities": [],
        }

        if plugin.capabilities:
            for cap_setting in plugin.capabilities:
                capability_name = cap_setting.capability.name
                plugin_details[plugin_name]["capabilities"].append(
                    {
                        "capability": capability_name,
                        "supported": cap_setting.supported,
                        "description": cap_setting.description,
                    }
                )

    return CapabilitySummary(
        plugin_details=plugin_details,
    )


def save_capability_report(summary: CapabilitySummary, output_dir: str) -> None:
    """Save the capability summary as JSON files, but only write if contents have changed."""
    output_path = pathlib.Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    existing_capabilities = {}
    existing_summary_file = pathlib.Path(
        "./src/datahub/ingestion/autogenerated/capability_summary.json"
    )
    if existing_summary_file.exists():
        try:
            with open(existing_summary_file, "r") as f:
                existing_data = json.load(f)
            existing_capabilities = existing_data.get("plugin_details", {})
            logger.info(
                f"Loaded existing capability data for {len(existing_capabilities)} plugins"
            )
        except Exception as e:
            logger.warning(f"Failed to load existing capability data: {e}")

    # For plugins that failed to load in this run, fall back to the data from the
    # previous summary so they are not silently dropped from the report.
    missing_plugins = set(existing_capabilities.keys()) - set(
        summary.plugin_details.keys()
    )
    for plugin_name in missing_plugins:
        logger.warning(
            f"Plugin {plugin_name} failed to load, using existing capability data as fallback. "
            "Manually remove from capability_summary.json if you want to remove it from the report."
        )
        summary.plugin_details[plugin_name] = existing_capabilities[plugin_name]

    summary_dict = dataclasses.asdict(summary)
    summary_dict["generated_by"] = "metadata-ingestion/scripts/capability_summary.py"
    summary_dict["generated_at"] = datetime.now(timezone.utc).isoformat()

    summary_json = json.dumps(summary_dict, indent=2, sort_keys=True)
    summary_file = output_path / "capability_summary.json"

    write_file = True
    if summary_file.exists():
        try:
            with open(summary_file, "r") as f:
                existing_data = json.load(f)

            # Compare copies without generated_at so an otherwise unchanged
            # report does not trigger a rewrite just because of the timestamp.
            existing_for_comparison = existing_data.copy()
            new_for_comparison = summary_dict.copy()
            existing_for_comparison.pop("generated_at", None)
            new_for_comparison.pop("generated_at", None)

            if json.dumps(
                existing_for_comparison, indent=2, sort_keys=True
            ) == json.dumps(new_for_comparison, indent=2, sort_keys=True):
                logger.info(f"No changes detected in {summary_file}, skipping write.")
                write_file = False
        except Exception as e:
            logger.warning(f"Could not read existing summary file: {e}")

    if write_file:
        with open(summary_file, "w") as f:
            f.write(summary_json)
        logger.info(f"Capability summary saved to {summary_file}")


@click.command()
@click.option(
    "--output-dir",
    type=str,
    default="./autogenerated",
    help="Output directory for capability reports",
)
@click.option(
    "--source",
    type=str,
    required=False,
    help="Generate report for specific source only",
)
def generate_capability_report(output_dir: str, source: Optional[str] = None) -> None:
    """Generate a comprehensive capability report for all ingestion sources."""
    logger.info("Starting capability report generation...")

    if source:
        if source not in source_registry.mapping:
            logger.error(f"Source '{source}' not found in registry")
            return
        # Temporarily restrict the registry to the requested source.
        original_mapping = source_registry.mapping.copy()
        source_registry.mapping = {source: original_mapping[source]}

    try:
        summary = generate_capability_summary()
        save_capability_report(summary, output_dir)

        plugins_with_capabilities = sum(
            1 for details in summary.plugin_details.values() if details["capabilities"]
        )
        print("Capability Report Generation Complete")
        print("=====================================")
        print(f"Total plugins processed: {len(summary.plugin_details)}")
        print(f"Plugins with capabilities: {plugins_with_capabilities}")
        print(f"Output directory: {output_dir}")
    finally:
        if source:
            source_registry.mapping = original_mapping


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="[%(asctime)s %(levelname)-8s {%(name)s:%(lineno)d}] - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S %Z",
    )
    generate_capability_report()