# datahub/metadata-ingestion/scripts/capability_summary.py

import dataclasses
import json
import logging
import pathlib
from datetime import datetime, timezone
from typing import Dict, Optional

import click
from docgen_types import Plugin
from utils import should_write_json_file

from datahub.ingestion.api.decorators import SupportStatus
from datahub.ingestion.source.source_registry import source_registry

logger = logging.getLogger(__name__)
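
# Source plugins intentionally excluded from the capability report.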
DENY_LIST = {
    "snowflake-summary",
    "snowflake-queries",
    "bigquery-queries",
}


def load_plugin_capabilities(plugin_name: str) -> Optional[Plugin]:
    """Load plugin capabilities without generating full documentation."""
    logger.debug(f"Loading capabilities for {plugin_name}")
    try:
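        # The registry holds lazy references; forcing the import here lets a
        # broken plugin come back as an Exception value instead of raising.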
        class_or_exception = source_registry._ensure_not_lazy(plugin_name)
        if isinstance(class_or_exception, Exception):
            # Log the specific error but don't re-raise it.
            logger.warning(
                f"Plugin {plugin_name} failed to load: {class_or_exception}"
            )
            return None

        source_type = source_registry.get(plugin_name)
        logger.debug(f"Source class is {source_type}")

        if hasattr(source_type, "get_platform_name"):
            platform_name = source_type.get_platform_name()
        else:
            platform_name = plugin_name.title()

        platform_id = None
        if hasattr(source_type, "get_platform_id"):
            platform_id = source_type.get_platform_id()
        if platform_id is None:
            logger.warning(f"Platform ID not found for {plugin_name}")
            return None

        plugin = Plugin(
            name=plugin_name,
            platform_id=platform_id,
            platform_name=platform_name,
            classname=".".join([source_type.__module__, source_type.__name__]),
        )

        if hasattr(source_type, "get_support_status"):
            plugin.support_status = source_type.get_support_status()

        if hasattr(source_type, "get_capabilities"):
            capabilities = list(source_type.get_capabilities())
            if capabilities:
                capabilities.sort(key=lambda x: x.capability.value)
                plugin.capabilities = capabilities
            else:
                logger.debug(f"No capabilities defined for {plugin_name}")
                plugin.capabilities = []
        else:
            logger.debug(f"No get_capabilities method for {plugin_name}")
            plugin.capabilities = []

        return plugin
    except Exception as e:
        logger.warning(f"Failed to load capabilities for {plugin_name}: {e}")
        return None
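

# Example usage (a sketch; "snowflake" is just an illustrative plugin name and
# needs the corresponding source's dependencies installed for the import to
# succeed):
#
#   plugin = load_plugin_capabilities("snowflake")
#   if plugin is not None:
#       print(plugin.platform_id, len(plugin.capabilities or []))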


@dataclasses.dataclass
class CapabilitySummary:
    """Summary of capabilities across all plugins."""

    plugin_details: Dict[str, Dict]  # plugin_name -> detailed info


def generate_capability_summary() -> CapabilitySummary:
    """Generate a comprehensive summary of capabilities across all plugins."""
    plugin_details: Dict[str, Dict] = {}

    for plugin_name in sorted(source_registry.mapping.keys()):
        if plugin_name in DENY_LIST:
            logger.info(f"Skipping {plugin_name} as it is on the deny list")
            continue

        plugin = load_plugin_capabilities(plugin_name)
        if plugin is None:
            continue

        plugin_details[plugin_name] = {
            "platform_id": plugin.platform_id,
            "platform_name": plugin.platform_name,
            "classname": plugin.classname,
            "support_status": plugin.support_status.name
            if plugin.support_status != SupportStatus.UNKNOWN
            else None,
            "capabilities": [],
        }

        if plugin.capabilities:
            for cap_setting in plugin.capabilities:
                capability_name = cap_setting.capability.name
                plugin_details[plugin_name]["capabilities"].append(
                    {
                        "capability": capability_name,
                        "supported": cap_setting.supported,
                        "description": cap_setting.description,
                    }
                )

    return CapabilitySummary(
        plugin_details=plugin_details,
    )
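

# Illustrative shape of the serialized report (field values are examples;
# "generated_at" and "generated_by" are added in save_capability_report below):
#
#   {
#     "generated_at": "2024-01-01T00:00:00+00:00",
#     "generated_by": "metadata-ingestion/scripts/capability_summary.py",
#     "plugin_details": {
#       "<plugin_name>": {
#         "platform_id": "...",
#         "platform_name": "...",
#         "classname": "...",
#         "support_status": "CERTIFIED",
#         "capabilities": [
#           {"capability": "...", "supported": true, "description": "..."}
#         ]
#       }
#     }
#   }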


def save_capability_report(summary: CapabilitySummary, output_dir: str) -> None:
    """Save the capability summary as JSON files, but only write if contents have changed."""
    output_path = pathlib.Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    existing_capabilities = {}
    existing_summary_file = pathlib.Path(
        "./src/datahub/ingestion/autogenerated/capability_summary.json"
    )
    if existing_summary_file.exists():
        try:
            with open(existing_summary_file, "r") as f:
                existing_data = json.load(f)
                existing_capabilities = existing_data.get("plugin_details", {})
                logger.info(
                    f"Loaded existing capability data for {len(existing_capabilities)} plugins"
                )
        except Exception as e:
            logger.warning(f"Failed to load existing capability data: {e}")
    missing_plugins = set(existing_capabilities.keys()) - set(
        summary.plugin_details.keys()
    )
    for plugin_name in missing_plugins:
        logger.warning(
            f"Plugin {plugin_name} failed to load, using existing capability data as fallback. "
            "Manually remove from capability_summary.json if you want to remove it from the report."
        )
        summary.plugin_details[plugin_name] = existing_capabilities[plugin_name]

    summary_dict = dataclasses.asdict(summary)
    summary_dict["generated_by"] = "metadata-ingestion/scripts/capability_summary.py"
    summary_dict["generated_at"] = datetime.now(timezone.utc).isoformat()
    summary_json = json.dumps(summary_dict, indent=2, sort_keys=True)

    summary_file = output_path / "capability_summary.json"
    write_file = should_write_json_file(
        summary_file, summary_dict, "capability summary file"
    )
    if write_file:
        with open(summary_file, "w") as f:
            f.write(summary_json)
        logger.info(f"Capability summary saved to {summary_file}")


@click.command()
@click.option(
    "--output-dir",
    type=str,
    default="./autogenerated",
    help="Output directory for capability reports",
)
@click.option(
    "--source",
    type=str,
    required=False,
    help="Generate report for specific source only",
)
def generate_capability_report(output_dir: str, source: Optional[str] = None) -> None:
    """Generate a comprehensive capability report for all ingestion sources."""
    logger.info("Starting capability report generation...")

    if source:
        if source not in source_registry.mapping:
            logger.error(f"Source '{source}' not found in registry")
            return
        original_mapping = source_registry.mapping.copy()
        source_registry.mapping = {source: original_mapping[source]}
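        # The registry now holds only the requested source; the full mapping is
        # restored in the finally block below.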
    try:
        summary = generate_capability_summary()
        save_capability_report(summary, output_dir)

        # Count plugins that actually declared at least one capability, rather
        # than repeating the total plugin count.
        plugins_with_capabilities = sum(
            1
            for details in summary.plugin_details.values()
            if details.get("capabilities")
        )
        print("Capability Report Generation Complete")
        print("=====================================")
        print(f"Total plugins processed: {len(summary.plugin_details)}")
        print(f"Plugins with capabilities: {plugins_with_capabilities}")
        print(f"Output directory: {output_dir}")
    finally:
        if source:
            source_registry.mapping = original_mapping


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="[%(asctime)s %(levelname)-8s {%(name)s:%(lineno)d}] - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S %Z",
    )
    generate_capability_report()
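
# Example invocations (a sketch; assumes the script is run from the
# metadata-ingestion directory with the datahub package importable, and that
# "snowflake" is an installed source plugin):
#
#   python scripts/capability_summary.py
#   python scripts/capability_summary.py --output-dir ./autogenerated
#   python scripts/capability_summary.py --source snowflake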