Databricks Support Table Constraints (#12138)

* Databricks Support Table Constraints

* pylint fix

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Mayur Singal 2023-06-26 10:31:53 +05:30 committed by GitHub
parent e13a5de5ae
commit 18c8eb318e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 113 additions and 3 deletions

View File

@ -39,3 +39,9 @@ class LineageTableStreams(BaseModel):
class LineageColumnStreams(BaseModel):
    """
    Pair of optional column lists for one lineage lookup: the columns on the
    upstream side and the columns on the downstream side.
    """

    # Both lists default to empty, so either side may be omitted.
    upstream_cols: Optional[List[DatabricksColumn]] = []
    downstream_cols: Optional[List[DatabricksColumn]] = []
class ForeignConstrains(BaseModel):
    """
    Foreign key as reported for a table: the local (child) columns, the
    columns they point to on the parent side, and the parent table name.
    """

    # Column lists are optional and default to empty.
    child_columns: Optional[List[str]] = []
    parent_columns: Optional[List[str]] = []
    # Required: the table the foreign key refers to.
    parent_table: str

View File

@ -12,9 +12,11 @@
Databricks Unity Catalog Source source methods.
"""
import traceback
from typing import Iterable, List, Optional, Tuple
from typing import Dict, Iterable, List, Optional, Tuple
from databricks.sdk.service.catalog import ColumnInfo
from databricks.sdk.service.catalog import TableConstraint as DBTableConstraint
from databricks.sdk.service.catalog import TableConstraintList
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
@ -24,7 +26,13 @@ from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, Table, TableType
from metadata.generated.schema.entity.data.table import (
Column,
ConstraintType,
Table,
TableConstraint,
TableType,
)
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
@ -38,11 +46,14 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.table_metadata import OMetaTableConstraints
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.databricks.connection import get_connection
from metadata.ingestion.source.database.databricks.models import ForeignConstrains
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.db_utils import get_view_lineage
@ -51,6 +62,16 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
# pylint: disable=invalid-name,not-callable
@classmethod
def from_dict(cls, d: Optional[Iterable[Dict[str, object]]]) -> "TableConstraintList":
    """
    Build a ``TableConstraintList`` from a sequence of constraint payloads.

    This replaces ``TableConstraintList.from_dict`` (see the assignment
    below): the payload handled here is iterated directly, one constraint
    dictionary per element, and each element is parsed with
    ``DBTableConstraint.from_dict``.

    Args:
        d: iterable of constraint dictionaries. ``None`` or an empty
            iterable produces a list with no constraints.

    Returns:
        A ``TableConstraintList`` whose ``table_constraints`` holds the
        parsed constraints in input order.
    """
    return cls(
        table_constraints=[
            DBTableConstraint.from_dict(constraint) for constraint in d or []
        ]
    )


# Patch the SDK class so table payloads are parsed with the function above.
TableConstraintList.from_dict = from_dict
class DatabricksUnityCatalogSource(DatabaseServiceSource):
"""
@ -73,6 +94,7 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource):
)
self.client = get_connection(self.service_connection)
self.connection_obj = self.client
self.table_constraints = []
self.test_connection()
@classmethod
@ -233,7 +255,7 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource):
Prepare a table request and pass it to the sink
"""
table_name, table_type = table_name_and_type
table = self.context.table_data
table = self.client.tables.get(self.context.table_data.full_name)
schema_name = self.context.database_schema.name.__root__
db_name = self.context.database.name.__root__
table_constraints = None
@ -263,6 +285,7 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource):
)
)
self.add_table_constraint_to_context(table.table_constraints)
self.register_record(table_request=table_request)
except Exception as exc:
error = f"Unexpected exception to yield table [{table_name}]: {exc}"
@ -270,6 +293,87 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource):
logger.warning(error)
self.status.failed(table_name, error, traceback.format_exc())
def add_table_constraint_to_context(self, constraints: TableConstraintList) -> None:
    """
    Collect the primary and foreign keys found in ``constraints`` and queue
    them on ``self.table_constraints`` together with ``self.context.table``.

    Nothing is queued when ``constraints`` is empty or has no
    ``table_constraints`` entries.
    """
    if not (constraints and constraints.table_constraints):
        return

    primary_keys = []
    foreign_keys = []
    for entry in constraints.table_constraints:
        # The two checks are independent: an entry exposing both kinds of
        # key contributes to both lists.
        if entry.primary_key_constraint:
            primary_key = TableConstraint(
                constraintType=ConstraintType.PRIMARY_KEY,
                columns=entry.primary_key_constraint.child_columns,
            )
            primary_keys.append(primary_key)
        if entry.foreign_key_constraint:
            foreign_key = ForeignConstrains(
                child_columns=entry.foreign_key_constraint.child_columns,
                parent_columns=entry.foreign_key_constraint.parent_columns,
                parent_table=entry.foreign_key_constraint.parent_table,
            )
            foreign_keys.append(foreign_key)

    # Foreign keys are stored as collected here; the parent table lookup is
    # done later by _get_foreign_constraints.
    # NOTE(review): self.context.table is read at call time - confirm the
    # context already points at the table these constraints belong to.
    queued = OMetaTableConstraints(
        table=self.context.table,
        foreign_constraints=foreign_keys,
        constraints=primary_keys,
    )
    self.table_constraints.append(queued)
def _get_foreign_constraints(
    self, table_constraints: OMetaTableConstraints
) -> List[TableConstraint]:
    """
    Resolve the foreign keys queued for a table into FOREIGN_KEY constraints.

    For every queued foreign key the parent table is searched with
    ``fqn.search_table_from_es`` inside the current database service, and
    each parent column is translated to a column FQN with ``get_column_fqn``.

    Args:
        table_constraints: constraints queued for one table. Only its
            ``foreign_constraints`` entries are read, through the keys
            ``parent_table``, ``parent_columns`` and ``child_columns``.

    Returns:
        One ``TableConstraint`` per foreign key whose parent table could be
        found. Foreign keys whose parent name is malformed or whose parent
        table is not found are skipped with a warning instead of being
        emitted without referred columns.
    """
    foreign_constraints = []
    # foreign_constraints may be unset on the queued object; treat as empty.
    for constraint in table_constraints.foreign_constraints or []:
        ref_table_fqn = constraint["parent_table"]
        table_fqn_list = fqn.split(ref_table_fqn)
        # The lookup below needs database, schema and table parts.
        if len(table_fqn_list) < 3:
            logger.warning(
                f"Skipping foreign key, unexpected parent table name [{ref_table_fqn}]"
            )
            continue

        referred_table = fqn.search_table_from_es(
            metadata=self.metadata,
            table_name=table_fqn_list[2],
            schema_name=table_fqn_list[1],
            database_name=table_fqn_list[0],
            service_name=self.context.database_service.name.__root__,
        )
        if not referred_table:
            # Without the parent table there is nothing to refer to.
            logger.warning(
                f"Skipping foreign key, parent table [{ref_table_fqn}] not found"
            )
            continue

        referred_column_fqns = []
        for column in constraint["parent_columns"] or []:
            col_fqn = get_column_fqn(table_entity=referred_table, column=column)
            # Columns that cannot be resolved are left out.
            if col_fqn:
                referred_column_fqns.append(col_fqn)

        foreign_constraints.append(
            TableConstraint(
                constraintType=ConstraintType.FOREIGN_KEY,
                columns=constraint["child_columns"],
                referredColumns=referred_column_fqns,
            )
        )

    return foreign_constraints
def yield_table_constraints(self) -> Optional[Iterable[OMetaTableConstraints]]:
    """
    From topology.
    Walk every queued table constraint, merge in its resolved foreign keys
    and hand the entry over.
    """
    for pending in self.table_constraints:
        resolved_fks = self._get_foreign_constraints(pending)
        if resolved_fks and pending.constraints:
            # Extend in place so the already collected constraints are kept.
            pending.constraints.extend(resolved_fks)
        elif resolved_fks:
            pending.constraints = resolved_fks
        yield pending
def prepare(self):
pass