datahub/metadata-ingestion/scripts/capability_summary.py

224 lines
7.6 KiB
Python

import dataclasses
import json
import logging
import pathlib
from datetime import datetime, timezone
from typing import Dict, Optional
import click
from docgen_types import Plugin
from utils import should_write_json_file
from datahub.ingestion.api.decorators import SupportStatus
from datahub.ingestion.source.source_registry import source_registry
# Module-level logger for this script.
logger = logging.getLogger(__name__)

# Plugins skipped when building the capability summary (filtered out in
# generate_capability_summary). NOTE(review): presumably these are internal
# query/summary variants rather than standalone sources — confirm.
DENY_LIST = {
    "snowflake-summary",
    "snowflake-queries",
    "bigquery-queries",
}
def load_plugin_capabilities(plugin_name: str) -> Optional[Plugin]:
    """Load plugin capabilities without generating full documentation.

    Returns a populated ``Plugin`` record, or ``None`` when the plugin fails
    to import, has no platform id, or raises during inspection. Failures are
    logged as warnings rather than propagated.
    """
    logger.debug(f"Loading capabilities for {plugin_name}")

    try:
        class_or_exception = source_registry._ensure_not_lazy(plugin_name)
        if isinstance(class_or_exception, Exception):
            # Log the specific error but don't re-raise it
            logger.warning(f"Plugin {plugin_name} failed to load: {class_or_exception}")
            return None

        source_type = source_registry.get(plugin_name)
        logger.debug(f"Source class is {source_type}")

        # Prefer the class-declared platform name; fall back to the plugin name.
        platform_name = (
            source_type.get_platform_name()
            if hasattr(source_type, "get_platform_name")
            else plugin_name.title()
        )

        platform_id = (
            source_type.get_platform_id()
            if hasattr(source_type, "get_platform_id")
            else None
        )
        if platform_id is None:
            logger.warning(f"Platform ID not found for {plugin_name}")
            return None

        plugin = Plugin(
            name=plugin_name,
            platform_id=platform_id,
            platform_name=platform_name,
            classname=".".join([source_type.__module__, source_type.__name__]),
        )

        if hasattr(source_type, "get_support_status"):
            plugin.support_status = source_type.get_support_status()

        if hasattr(source_type, "get_capabilities"):
            # Sort for stable output ordering; sorted([]) is just [].
            caps = sorted(
                source_type.get_capabilities(), key=lambda c: c.capability.value
            )
            if not caps:
                logger.debug(f"No capabilities defined for {plugin_name}")
            plugin.capabilities = caps
        else:
            logger.debug(f"No get_capabilities method for {plugin_name}")
            plugin.capabilities = []

        return plugin
    except Exception as e:
        # Best-effort: a broken plugin must not abort the whole report.
        logger.warning(f"Failed to load capabilities for {plugin_name}: {e}")
        return None
@dataclasses.dataclass
class CapabilitySummary:
    """Summary of capabilities across all plugins."""

    # plugin_name -> detailed info: platform_id/platform_name/classname/
    # support_status plus a "capabilities" list, as built by
    # generate_capability_summary().
    plugin_details: Dict[str, Dict]
def generate_capability_summary() -> CapabilitySummary:
    """Generate a comprehensive summary of capabilities across all plugins.

    Iterates the source registry (minus DENY_LIST entries), loads each
    plugin's capability metadata, and flattens it into plain dicts suitable
    for JSON serialization. Plugins that fail to load are skipped.
    """
    details: Dict[str, Dict] = {}

    for name in sorted(source_registry.mapping.keys()):
        if name in DENY_LIST:
            logger.info(f"Skipping {name} as it is on the deny list")
            continue

        plugin = load_plugin_capabilities(name)
        if plugin is None:
            continue

        status = plugin.support_status
        details[name] = {
            "platform_id": plugin.platform_id,
            "platform_name": plugin.platform_name,
            "classname": plugin.classname,
            # UNKNOWN is serialized as null rather than a status name.
            "support_status": None if status == SupportStatus.UNKNOWN else status.name,
            "capabilities": [
                {
                    "capability": cap.capability.name,
                    "supported": cap.supported,
                    "description": cap.description,
                    "subtype_modifier": (
                        list(cap.subtype_modifier) if cap.subtype_modifier else None
                    ),
                }
                for cap in (plugin.capabilities or [])
            ],
        }

    return CapabilitySummary(plugin_details=details)
def save_capability_report(summary: CapabilitySummary, output_dir: str) -> None:
    """Save the capability summary as JSON files, but only write if contents have changed.

    Plugins present in the previously committed summary but missing from the
    new one (i.e. they failed to load this run) are carried over as a
    fallback, with a warning. NOTE: this mutates ``summary.plugin_details``.
    """
    out_dir = pathlib.Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Load the previously generated summary (fixed in-repo path, independent
    # of output_dir) so we can fall back for plugins that failed to load.
    prior: Dict[str, Dict] = {}
    prior_file = pathlib.Path(
        "./src/datahub/ingestion/autogenerated/capability_summary.json"
    )
    if prior_file.exists():
        try:
            existing_data = json.loads(prior_file.read_text())
            prior = existing_data.get("plugin_details", {})
            logger.info(
                f"Loaded existing capability data for {len(prior)} plugins"
            )
        except Exception as e:
            logger.warning(f"Failed to load existing capability data: {e}")

    # Carry over entries that disappeared this run, warning for each.
    for name in set(prior.keys()) - set(summary.plugin_details.keys()):
        logger.warning(
            f"Plugin {name} failed to load, using existing capability data as fallback. Manually remove from capability_summary.json if you want to remove it from the report."
        )
        summary.plugin_details[name] = prior[name]

    payload = dataclasses.asdict(summary)
    payload["generated_by"] = "metadata-ingestion/scripts/capability_summary.py"
    payload["generated_at"] = datetime.now(timezone.utc).isoformat()

    target = out_dir / "capability_summary.json"
    # Skip the write when content is unchanged, to avoid dirtying the tree.
    if should_write_json_file(target, payload, "capability summary file"):
        target.write_text(json.dumps(payload, indent=2, sort_keys=True))
        logger.info(f"Capability summary saved to {target}")
@click.command()
@click.option(
    "--output-dir",
    type=str,
    default="./autogenerated",
    help="Output directory for capability reports",
)
@click.option(
    "--source",
    type=str,
    required=False,
    help="Generate report for specific source only",
)
def generate_capability_report(output_dir: str, source: Optional[str] = None) -> None:
    """Generate a comprehensive capability report for all ingestion sources.

    When --source is given, the registry mapping is temporarily narrowed to
    that single source and restored afterwards (even if generation fails).
    """
    logger.info("Starting capability report generation...")

    if source:
        if source not in source_registry.mapping:
            logger.error(f"Source '{source}' not found in registry")
            return
        # Temporarily restrict the registry to the requested source.
        original_mapping = source_registry.mapping.copy()
        source_registry.mapping = {source: original_mapping[source]}

    try:
        summary = generate_capability_summary()
        save_capability_report(summary, output_dir)

        # Fix: previously both lines below printed len(plugin_details).
        # Count only plugins that declare at least one capability; .get()
        # guards fallback entries loaded from an older JSON schema.
        plugins_with_capabilities = sum(
            1
            for details in summary.plugin_details.values()
            if details.get("capabilities")
        )

        print("Capability Report Generation Complete")
        print("=====================================")
        print(f"Total plugins processed: {len(summary.plugin_details)}")
        print(f"Plugins with capabilities: {plugins_with_capabilities}")
        print(f"Output directory: {output_dir}")
    finally:
        # Restore the full registry if we narrowed it above.
        if source:
            source_registry.mapping = original_mapping
if __name__ == "__main__":
    # Configure root logging only when run as a script, not on import.
    logging.basicConfig(
        level=logging.INFO,
        format="[%(asctime)s %(levelname)-8s {%(name)s:%(lineno)d}] - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S %Z",
    )
    # Click parses CLI args (--output-dir, --source) and invokes the command.
    generate_capability_report()