added complex desc (#12478)

This commit is contained in:
Onkar Ravgan 2023-07-19 12:01:12 +05:30 committed by GitHub
parent 3534913b96
commit ceef44205b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 76 additions and 3 deletions

View File

@ -13,7 +13,7 @@
Databricks Source Model module Databricks Source Model module
""" """
from typing import List, Optional from typing import List, Optional, Union
from pydantic import BaseModel from pydantic import BaseModel
@ -45,3 +45,27 @@ class ForeignConstrains(BaseModel):
child_columns: Optional[List[str]] = [] child_columns: Optional[List[str]] = []
parent_columns: Optional[List[str]] = [] parent_columns: Optional[List[str]] = []
parent_table: str parent_table: str
class Metadata(BaseModel):
    """
    Metadata attached to a column node of the parsed column type JSON
    """

    # Comment of the column/field. Explicit ``None`` default so the field is
    # optional regardless of how the pydantic version treats a bare Optional.
    comment: Optional[str] = None
class ColumnJson(BaseModel):
    """
    One column (or nested field) node of the parsed column type JSON
    """

    # All fields default to None explicitly so a partial JSON node still parses.
    name: Optional[str] = None
    # Either a plain type name (str) or a nested ``Type`` node. "Type" is a
    # forward reference: the class is declared further down in this module.
    type: Optional[Union["Type", str]] = None
    metadata: Optional[Metadata] = None
class ElementType(BaseModel):
    """
    Element type node of an array column in the parsed column type JSON
    """

    # Explicit None defaults keep both keys optional when parsing.
    type: Optional[str] = None
    # Nested fields of the element, when the element itself has fields.
    fields: Optional[List[ColumnJson]] = None
class Type(BaseModel):
    """
    Complex type node of a column in the parsed column type JSON
    """

    # Explicit None defaults keep every key optional when parsing.
    type: Optional[str] = None
    # Key name mirrors the JSON payload, hence the camelCase. Holds either a
    # nested ``ElementType`` node or a plain type name.
    elementType: Optional[Union[ElementType, str]] = None
    fields: Optional[List[ColumnJson]] = None
# ColumnJson.type is annotated with the string "Type" because Type is declared
# after ColumnJson; resolve that forward reference now that Type exists.
ColumnJson.update_forward_refs()

View File

@ -11,6 +11,7 @@
""" """
Databricks Unity Catalog Source source methods. Databricks Unity Catalog Source source methods.
""" """
import json
import traceback import traceback
from typing import Dict, Iterable, List, Optional, Tuple from typing import Dict, Iterable, List, Optional, Tuple
@ -53,7 +54,12 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DatabaseServiceSource from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.databricks.connection import get_connection from metadata.ingestion.source.database.databricks.connection import get_connection
from metadata.ingestion.source.database.databricks.models import ForeignConstrains from metadata.ingestion.source.database.databricks.models import (
ColumnJson,
ElementType,
ForeignConstrains,
Type,
)
from metadata.ingestion.source.models import TableView from metadata.ingestion.source.models import TableView
from metadata.utils import fqn from metadata.utils import fqn
from metadata.utils.db_utils import get_view_lineage from metadata.utils.db_utils import get_view_lineage
@ -377,6 +383,44 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource):
def prepare(self): def prepare(self):
pass pass
def add_complex_datatype_descriptions(
    self, column: Column, column_json: ColumnJson
) -> None:
    """
    Method to add descriptions to complex datatypes

    Copies the comment stored in ``column_json.metadata`` onto ``column`` and
    then recurses into ``column.children``, pairing each child positionally
    with the matching nested field of ``column_json.type`` (its ``fields``,
    and for arrays the ``fields`` of its ``elementType``).

    Args:
        column: parsed column, updated in place (including its children).
        column_json: parsed type JSON node describing the same column.

    This is best-effort: any unexpected error is logged and swallowed so a
    malformed type JSON never interrupts column processing.
    """
    try:
        # The column's own comment does not depend on its children, so it is
        # applied once up front (also covering an empty children list).
        if column_json.metadata:
            column.description = column_json.metadata.comment

        if not column.children:
            return

        json_type = column_json.type
        if not isinstance(json_type, Type):
            # Plain string type (or missing): no nested fields to walk.
            return

        nested_field_lists = []
        if json_type.fields:
            nested_field_lists.append(json_type.fields)

        element_type = json_type.elementType
        if (
            # ``type`` is Optional: guard against None before lower-casing.
            (json_type.type or "").lower() == "array"
            and isinstance(element_type, ElementType)
            and element_type.fields
        ):
            nested_field_lists.append(element_type.fields)

        for nested_fields in nested_field_lists:
            # zip stops at the shorter side, so a mismatch between parsed
            # children and JSON fields cannot raise an IndexError.
            for child, child_json in zip(column.children, nested_fields):
                self.add_complex_datatype_descriptions(child, child_json)
    except Exception as exc:
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Unable to add description to complex datatypes for column [{column.name}]: {exc}"
        )
def get_columns(self, column_data: List[ColumnInfo]) -> Optional[Iterable[Column]]: def get_columns(self, column_data: List[ColumnInfo]) -> Optional[Iterable[Column]]:
# process table regular columns info # process table regular columns info
@ -389,7 +433,12 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource):
parsed_string["name"] = column.name[:64] parsed_string["name"] = column.name[:64]
parsed_string["dataLength"] = parsed_string.get("dataLength", 1) parsed_string["dataLength"] = parsed_string.get("dataLength", 1)
parsed_string["description"] = column.comment parsed_string["description"] = column.comment
yield Column(**parsed_string) parsed_column = Column(**parsed_string)
self.add_complex_datatype_descriptions(
column=parsed_column,
column_json=ColumnJson.parse_obj(json.loads(column.type_json)),
)
yield parsed_column
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
logger.info("Processing Lineage for Views") logger.info("Processing Lineage for Views")