diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/models.py b/ingestion/src/metadata/ingestion/source/database/databricks/models.py index 05f46ad5f57..b9b0b227909 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/models.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/models.py @@ -13,7 +13,7 @@ Databricks Source Model module """ -from typing import List, Optional +from typing import List, Optional, Union from pydantic import BaseModel @@ -45,3 +45,27 @@ class ForeignConstrains(BaseModel): child_columns: Optional[List[str]] = [] parent_columns: Optional[List[str]] = [] parent_table: str + + +class Metadata(BaseModel): + comment: Optional[str] + + +class ColumnJson(BaseModel): + name: Optional[str] + type: Optional[Union["Type", str]] + metadata: Optional[Metadata] + + +class ElementType(BaseModel): + type: Optional[str] + fields: Optional[List[ColumnJson]] + + +class Type(BaseModel): + type: Optional[str] + elementType: Optional[Union[ElementType, str]] + fields: Optional[List[ColumnJson]] + + +ColumnJson.update_forward_refs() diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py index d809d2c8412..6e6922bed10 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py @@ -11,6 +11,7 @@ """ Databricks Unity Catalog Source source methods. """ +import json import traceback from typing import Dict, Iterable, List, Optional, Tuple @@ -53,7 +54,12 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.ingestion.source.database.database_service import DatabaseServiceSource from metadata.ingestion.source.database.databricks.connection import get_connection -from metadata.ingestion.source.database.databricks.models import ForeignConstrains +from metadata.ingestion.source.database.databricks.models import ( + ColumnJson, + ElementType, + ForeignConstrains, + Type, +) from metadata.ingestion.source.models import TableView from metadata.utils import fqn from metadata.utils.db_utils import get_view_lineage @@ -377,6 +383,44 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource): def prepare(self): pass + def add_complex_datatype_descriptions( + self, column: Column, column_json: ColumnJson + ): + """ + Method to add descriptions to complex datatypes + """ + try: + if column.children is None: + if column_json.metadata: + column.description = column_json.metadata.comment + else: + for i, child in enumerate(column.children): + if column_json.metadata: + column.description = column_json.metadata.comment + if ( + column_json.type + and isinstance(column_json.type, Type) + and column_json.type.fields + ): + self.add_complex_datatype_descriptions( + child, column_json.type.fields[i] + ) + if ( + column_json.type + and isinstance(column_json.type, Type) + and column_json.type.type.lower() == "array" + and isinstance(column_json.type.elementType, ElementType) + ): + self.add_complex_datatype_descriptions( + child, + column_json.type.elementType.fields[i], + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Unable to add description to complex datatypes for column [{column.name}]: {exc}" + ) + def get_columns(self, column_data: List[ColumnInfo]) -> Optional[Iterable[Column]]: # process table regular columns info @@ -389,7 +433,12 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource): parsed_string["name"] = column.name[:64] parsed_string["dataLength"] = parsed_string.get("dataLength", 1) parsed_string["description"] = column.comment - yield Column(**parsed_string) + parsed_column = Column(**parsed_string) + self.add_complex_datatype_descriptions( + column=parsed_column, + column_json=ColumnJson.parse_obj(json.loads(column.type_json)), + ) + yield parsed_column def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: logger.info("Processing Lineage for Views")