class ForeignConstrains(BaseModel):
    """
    Unresolved foreign key constraint collected for a Unity Catalog table.

    Only plain names are stored here. The referred table and the fully
    qualified names of its columns are looked up later, when the
    collected constraints are processed.
    """

    # Columns of the table that declares the foreign key
    child_columns: Optional[List[str]] = []
    # Columns of the referred (parent) table
    parent_columns: Optional[List[str]] = []
    # Name of the referred table; consumers split it into database,
    # schema and table parts
    parent_table: str
""" import traceback -from typing import Iterable, List, Optional, Tuple +from typing import Dict, Iterable, List, Optional, Tuple from databricks.sdk.service.catalog import ColumnInfo +from databricks.sdk.service.catalog import TableConstraint as DBTableConstraint +from databricks.sdk.service.catalog import TableConstraintList from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( @@ -24,7 +26,13 @@ from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema -from metadata.generated.schema.entity.data.table import Column, Table, TableType +from metadata.generated.schema.entity.data.table import ( + Column, + ConstraintType, + Table, + TableConstraint, + TableType, +) from metadata.generated.schema.entity.services.connections.database.databricksConnection import ( DatabricksConnection, ) @@ -38,11 +46,14 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification +from metadata.ingestion.models.table_metadata import OMetaTableConstraints from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.ingestion.source.database.database_service import DatabaseServiceSource from metadata.ingestion.source.database.databricks.connection import get_connection +from metadata.ingestion.source.database.databricks.models import ForeignConstrains from metadata.ingestion.source.models import 
# Replacement for `TableConstraintList.from_dict`.
# The payload handed to this method is iterated as a sequence of constraint
# dicts, not read as a mapping. NOTE(review): presumably the stock SDK
# implementation expects the constraints wrapped under a `table_constraints`
# key - confirm against the installed databricks-sdk version.
# pylint: disable=invalid-name,not-callable
@classmethod
def from_dict(cls, d: Iterable[Dict[str, object]]) -> "TableConstraintList":
    """
    Build a TableConstraintList from the raw `table_constraints` payload.

    Args:
        d: iterable of constraint dicts. A mapping that wraps the list under
            the `table_constraints` key is accepted as well, so callers
            using the wrapped shape keep working after the patch below.

    Returns:
        TableConstraintList holding one DBTableConstraint per payload entry.
    """
    if isinstance(d, dict):
        # Iterating a mapping would yield its keys; unwrap the list instead
        d = d.get("table_constraints") or []
    return cls(
        table_constraints=[
            DBTableConstraint.from_dict(constraint) for constraint in d or []
        ]
    )


# The override is global: every TableConstraintList deserialization done by
# the SDK in this process goes through the function above.
TableConstraintList.from_dict = from_dict
+ primary_constraints.append( + TableConstraint( + constraintType=ConstraintType.PRIMARY_KEY, + columns=constraint.primary_key_constraint.child_columns, + ) + ) + if constraint.foreign_key_constraint: + foreign_constraints.append( + ForeignConstrains( + child_columns=constraint.foreign_key_constraint.child_columns, + parent_columns=constraint.foreign_key_constraint.parent_columns, + parent_table=constraint.foreign_key_constraint.parent_table, + ) + ) + self.table_constraints.append( + OMetaTableConstraints( + table=self.context.table, + foreign_constraints=foreign_constraints, + constraints=primary_constraints, + ) + ) + + def _get_foreign_constraints( + self, table_constraints: OMetaTableConstraints + ) -> List[TableConstraint]: + """ + Search the referred table for foreign constraints + and get referred column fqn + """ + + foreign_constraints = [] + for constraint in table_constraints.foreign_constraints: + referred_column_fqns = [] + ref_table_fqn = constraint["parent_table"] + table_fqn_list = fqn.split(ref_table_fqn) + + referred_table = fqn.search_table_from_es( + metadata=self.metadata, + table_name=table_fqn_list[2], + schema_name=table_fqn_list[1], + database_name=table_fqn_list[0], + service_name=self.context.database_service.name.__root__, + ) + if referred_table: + for column in constraint["parent_columns"]: + col_fqn = get_column_fqn(table_entity=referred_table, column=column) + if col_fqn: + referred_column_fqns.append(col_fqn) + foreign_constraints.append( + TableConstraint( + constraintType=ConstraintType.FOREIGN_KEY, + columns=constraint["child_columns"], + referredColumns=referred_column_fqns, + ) + ) + + return foreign_constraints + + def yield_table_constraints(self) -> Optional[Iterable[OMetaTableConstraints]]: + """ + From topology. 
+ process the table constraints of all tables + """ + for table_constraints in self.table_constraints: + foreign_constraints = self._get_foreign_constraints(table_constraints) + if foreign_constraints: + if table_constraints.constraints: + table_constraints.constraints.extend(foreign_constraints) + else: + table_constraints.constraints = foreign_constraints + yield table_constraints + def prepare(self): pass