Fix #12783: Metadata Ingestion is dropping constraints just to add them again afterwards (#12784)

This commit is contained in:
Sriharsha Chintalapani 2023-08-08 01:21:10 -07:00 committed by GitHub
parent 20e87eae16
commit d4ed941371
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 88 deletions

View File

@ -41,7 +41,6 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica
from metadata.ingestion.models.ometa_topic_data import OMetaTopicSampleData
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.profile_data import OMetaTableProfileSampleData
from metadata.ingestion.models.table_metadata import OMetaTableConstraints
from metadata.ingestion.models.tests_data import (
OMetaLogicalTestSuiteSample,
OMetaTestCaseResultsSample,
@ -98,7 +97,6 @@ class MetadataRestSink(Sink[Entity]):
self.write_record.register(OMetaPipelineStatus, self.write_pipeline_status)
self.write_record.register(DataModelLink, self.write_datamodel)
self.write_record.register(DashboardUsage, self.write_dashboard_usage)
self.write_record.register(OMetaTableConstraints, self.write_table_constraints)
self.write_record.register(
OMetaTableProfileSampleData, self.write_profile_sample_data
)
@ -469,23 +467,5 @@ class MetadataRestSink(Sink[Entity]):
f"Unexpected error while ingesting sample data for topic [{record.topic.name.__root__}]: {exc}"
)
def write_table_constraints(self, record: OMetaTableConstraints):
"""
Patch table constraints
"""
try:
self.metadata.patch_table_constraints(
table=record.table,
constraints=record.constraints,
)
logger.debug(
f"Successfully ingested table constraints for table id {record.table.id}"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Unexpected error while ingesting table constraints for table id [{record.table.id}]: {exc}"
)
def close(self):
pass

View File

@ -46,7 +46,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.table_metadata import OMetaTableConstraints
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
@ -367,7 +366,9 @@ class CommonDbSourceService(
schema_name=schema_name,
inspector=self.inspector,
)
table_constraints = self.update_table_constraints(
table_constraints, foreign_columns
)
table_request = CreateTableRequest(
name=table_name,
tableType=table_type,
@ -377,6 +378,7 @@ class CommonDbSourceService(
inspector=self.inspector,
),
columns=columns,
tableConstraints=table_constraints,
viewDefinition=view_definition,
databaseSchema=self.context.database_schema.fullyQualifiedName,
tags=self.get_tag_labels(
@ -411,15 +413,6 @@ class CommonDbSourceService(
)
self.context.table_views.append(table_view)
if table_constraints or foreign_columns:
self.context.table_constrains.append(
OMetaTableConstraints(
foreign_constraints=foreign_columns,
constraints=table_constraints,
table=self.context.table,
)
)
except Exception as exc:
error = f"Unexpected exception to yield table [{table_name}]: {exc}"
logger.debug(traceback.format_exc())
@ -439,52 +432,56 @@ class CommonDbSourceService(
timeout_seconds=self.source_config.viewParsingTimeoutLimit,
)
def _get_foreign_constraints(
self, table_constraints: OMetaTableConstraints
) -> List[TableConstraint]:
def _get_foreign_constraints(self, foreign_columns) -> List[TableConstraint]:
"""
Search the referred table for foreign constraints
and get referred column fqn
"""
foreign_constraints = []
for constraint in table_constraints.foreign_constraints:
for column in foreign_columns:
referred_column_fqns = []
referred_table = fqn.search_table_from_es(
metadata=self.metadata,
table_name=constraint.get("referred_table"),
schema_name=constraint.get("referred_schema"),
table_name=column.get("referred_table"),
schema_name=column.get("referred_schema"),
database_name=None,
service_name=self.context.database_service.name.__root__,
)
if referred_table:
for column in constraint.get("referred_columns"):
col_fqn = get_column_fqn(table_entity=referred_table, column=column)
for referred_column in column.get("referred_columns"):
col_fqn = get_column_fqn(
table_entity=referred_table, column=referred_column
)
if col_fqn:
referred_column_fqns.append(col_fqn)
else:
# do not build partial foreign constraint. It will updated in next run.
continue
foreign_constraints.append(
TableConstraint(
constraintType=ConstraintType.FOREIGN_KEY,
columns=constraint.get("constrained_columns"),
columns=column.get("constrained_columns"),
referredColumns=referred_column_fqns,
)
)
return foreign_constraints
def yield_table_constraints(self) -> Optional[Iterable[OMetaTableConstraints]]:
def update_table_constraints(
self, table_constraints, foreign_columns
) -> List[TableConstraint]:
"""
From topology.
process the table constraints of all tables
"""
for table_constraints in self.context.table_constrains:
foreign_constraints = self._get_foreign_constraints(table_constraints)
if foreign_constraints:
if table_constraints.constraints:
table_constraints.constraints.extend(foreign_constraints)
else:
table_constraints.constraints = foreign_constraints
yield table_constraints
foreign_table_constraints = self._get_foreign_constraints(foreign_columns)
if foreign_table_constraints:
if table_constraints:
table_constraints.extend(foreign_table_constraints)
else:
table_constraints = foreign_table_constraints
return table_constraints
@property
def connection(self) -> Connection:

View File

@ -29,6 +29,7 @@ from metadata.generated.schema.entity.data.table import (
Column,
DataModel,
Table,
TableConstraint,
TableType,
)
from metadata.generated.schema.entity.services.databaseService import (
@ -46,7 +47,6 @@ from metadata.ingestion.api.source import Source
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.delete_entity import delete_entity_from_source
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.table_metadata import OMetaTableConstraints
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
@ -92,7 +92,7 @@ class DatabaseServiceTopology(ServiceTopology):
),
],
children=["database"],
post_process=["yield_view_lineage", "yield_table_constraints"],
post_process=["yield_view_lineage"],
)
database = TopologyNode(
producer="get_database_names",
@ -234,14 +234,13 @@ class DatabaseServiceSource(
Parses view definition to get lineage information
"""
def yield_table_constraints(self) -> Optional[Iterable[OMetaTableConstraints]]:
def update_table_constraints(
self, table_constraints: List[TableConstraint], foreign_columns: []
) -> List[TableConstraint]:
"""
From topology.
process the table constraints of all tables
by default no need to process table constraints
specially for non SQA sources
transform SQLAlchemy returned foreign_columns into list of TableConstraint.
"""
yield from []
@abstractmethod
def yield_table(

View File

@ -49,7 +49,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.table_metadata import OMetaTableConstraints
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
@ -68,6 +67,7 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
# pylint: disable=invalid-name,not-callable
@classmethod
def from_dict(cls, d: Dict[str, any]) -> "TableConstraintList":
@ -267,6 +267,14 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource):
table_constraints = None
try:
columns = self.get_columns(table.columns)
(
primary_constraints,
foreign_constraints,
) = self.get_table_constraints(table.table_constraints)
table_constraints = self.update_table_constraints(
primary_constraints, foreign_constraints
)
table_request = CreateTableRequest(
name=table_name,
@ -291,7 +299,6 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource):
)
)
self.add_table_constraint_to_context(table.table_constraints)
self.register_record(table_request=table_request)
except Exception as exc:
error = f"Unexpected exception to yield table [{table_name}]: {exc}"
@ -299,13 +306,16 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource):
logger.warning(error)
self.status.failed(table_name, error, traceback.format_exc())
def add_table_constraint_to_context(self, constraints: TableConstraintList) -> None:
def get_table_constraints(
self, constraints: TableConstraintList
) -> Tuple[List[TableConstraint], List[ForeignConstrains]]:
"""
Function to handle table constraint for the current table and add it to context
"""
primary_constraints = []
foreign_constraints = []
if constraints and constraints.table_constraints:
primary_constraints = []
foreign_constraints = []
for constraint in constraints.table_constraints:
if constraint.primary_key_constraint:
primary_constraints.append(
@ -322,26 +332,18 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource):
parent_table=constraint.foreign_key_constraint.parent_table,
)
)
self.table_constraints.append(
OMetaTableConstraints(
table=self.context.table,
foreign_constraints=foreign_constraints,
constraints=primary_constraints,
)
)
return primary_constraints, foreign_constraints
def _get_foreign_constraints(
self, table_constraints: OMetaTableConstraints
) -> List[TableConstraint]:
def _get_foreign_constraints(self, foreign_columns) -> List[TableConstraint]:
"""
Search the referred table for foreign constraints
and get referred column fqn
"""
foreign_constraints = []
for constraint in table_constraints.foreign_constraints:
table_constraints = []
for column in foreign_columns:
referred_column_fqns = []
ref_table_fqn = constraint["parent_table"]
ref_table_fqn = column.parent_table
table_fqn_list = fqn.split(ref_table_fqn)
referred_table = fqn.search_table_from_es(
@ -352,33 +354,39 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource):
service_name=self.context.database_service.name.__root__,
)
if referred_table:
for column in constraint["parent_columns"]:
col_fqn = get_column_fqn(table_entity=referred_table, column=column)
for parent_column in column.parent_columns:
col_fqn = get_column_fqn(
table_entity=referred_table, column=parent_column
)
if col_fqn:
referred_column_fqns.append(col_fqn)
foreign_constraints.append(
else:
continue
table_constraints.append(
TableConstraint(
constraintType=ConstraintType.FOREIGN_KEY,
columns=constraint["child_columns"],
columns=column.child_columns,
referredColumns=referred_column_fqns,
)
)
return foreign_constraints
return table_constraints
def yield_table_constraints(self) -> Optional[Iterable[OMetaTableConstraints]]:
def update_table_constraints(
self, table_constraints, foreign_columns
) -> List[TableConstraint]:
"""
From topology.
process the table constraints of all tables
"""
for table_constraints in self.table_constraints:
foreign_constraints = self._get_foreign_constraints(table_constraints)
if foreign_constraints:
if table_constraints.constraints:
table_constraints.constraints.extend(foreign_constraints)
else:
table_constraints.constraints = foreign_constraints
yield table_constraints
foreign_table_constraints = self._get_foreign_constraints(foreign_columns)
if foreign_table_constraints:
if table_constraints:
table_constraints.extend(foreign_table_constraints)
else:
table_constraints = foreign_table_constraints
return table_constraints
def prepare(self):
pass