diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 011e5cf309d..10c047be6dc 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -41,7 +41,6 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica from metadata.ingestion.models.ometa_topic_data import OMetaTopicSampleData from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.models.profile_data import OMetaTableProfileSampleData -from metadata.ingestion.models.table_metadata import OMetaTableConstraints from metadata.ingestion.models.tests_data import ( OMetaLogicalTestSuiteSample, OMetaTestCaseResultsSample, @@ -98,7 +97,6 @@ class MetadataRestSink(Sink[Entity]): self.write_record.register(OMetaPipelineStatus, self.write_pipeline_status) self.write_record.register(DataModelLink, self.write_datamodel) self.write_record.register(DashboardUsage, self.write_dashboard_usage) - self.write_record.register(OMetaTableConstraints, self.write_table_constraints) self.write_record.register( OMetaTableProfileSampleData, self.write_profile_sample_data ) @@ -469,23 +467,5 @@ class MetadataRestSink(Sink[Entity]): f"Unexpected error while ingesting sample data for topic [{record.topic.name.__root__}]: {exc}" ) - def write_table_constraints(self, record: OMetaTableConstraints): - """ - Patch table constraints - """ - try: - self.metadata.patch_table_constraints( - table=record.table, - constraints=record.constraints, - ) - logger.debug( - f"Successfully ingested table constraints for table id {record.table.id}" - ) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error( - f"Unexpected error while ingesting table constraints for table id [{record.table.id}]: {exc}" - ) - def close(self): pass diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 8c9304f07d5..ad4fabf1534 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -46,7 +46,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification -from metadata.ingestion.models.table_metadata import OMetaTableConstraints from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.database.database_service import DatabaseServiceSource @@ -367,7 +366,9 @@ class CommonDbSourceService( schema_name=schema_name, inspector=self.inspector, ) - + table_constraints = self.update_table_constraints( + table_constraints, foreign_columns + ) table_request = CreateTableRequest( name=table_name, tableType=table_type, @@ -377,6 +378,7 @@ class CommonDbSourceService( inspector=self.inspector, ), columns=columns, + tableConstraints=table_constraints, viewDefinition=view_definition, databaseSchema=self.context.database_schema.fullyQualifiedName, tags=self.get_tag_labels( @@ -411,15 +413,6 @@ class CommonDbSourceService( ) self.context.table_views.append(table_view) - if table_constraints or foreign_columns: - self.context.table_constrains.append( - OMetaTableConstraints( - foreign_constraints=foreign_columns, - constraints=table_constraints, - table=self.context.table, - ) - ) - except Exception as exc: error = f"Unexpected exception to yield table [{table_name}]: {exc}" logger.debug(traceback.format_exc()) @@ -439,52 +432,56 @@ class CommonDbSourceService( timeout_seconds=self.source_config.viewParsingTimeoutLimit, ) - def _get_foreign_constraints( - self, table_constraints: OMetaTableConstraints - ) -> List[TableConstraint]: + def _get_foreign_constraints(self, foreign_columns) -> List[TableConstraint]: """ Search the referred table for foreign constraints and get referred column fqn """ foreign_constraints = [] - for constraint in table_constraints.foreign_constraints: + for column in foreign_columns: referred_column_fqns = [] referred_table = fqn.search_table_from_es( metadata=self.metadata, - table_name=constraint.get("referred_table"), - schema_name=constraint.get("referred_schema"), + table_name=column.get("referred_table"), + schema_name=column.get("referred_schema"), database_name=None, service_name=self.context.database_service.name.__root__, ) if referred_table: - for column in constraint.get("referred_columns"): - col_fqn = get_column_fqn(table_entity=referred_table, column=column) + for referred_column in column.get("referred_columns"): + col_fqn = get_column_fqn( + table_entity=referred_table, column=referred_column + ) if col_fqn: referred_column_fqns.append(col_fqn) + else: + # do not build partial foreign constraint. It will updated in next run. + continue foreign_constraints.append( TableConstraint( constraintType=ConstraintType.FOREIGN_KEY, - columns=constraint.get("constrained_columns"), + columns=column.get("constrained_columns"), referredColumns=referred_column_fqns, ) ) return foreign_constraints - def yield_table_constraints(self) -> Optional[Iterable[OMetaTableConstraints]]: + def update_table_constraints( + self, table_constraints, foreign_columns + ) -> List[TableConstraint]: """ From topology. process the table constraints of all tables """ - for table_constraints in self.context.table_constrains: - foreign_constraints = self._get_foreign_constraints(table_constraints) - if foreign_constraints: - if table_constraints.constraints: - table_constraints.constraints.extend(foreign_constraints) - else: - table_constraints.constraints = foreign_constraints - yield table_constraints + foreign_table_constraints = self._get_foreign_constraints(foreign_columns) + if foreign_table_constraints: + if table_constraints: + table_constraints.extend(foreign_table_constraints) + else: + table_constraints = foreign_table_constraints + return table_constraints @property def connection(self) -> Connection: diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 89a6ed65a8a..736de690516 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -29,6 +29,7 @@ from metadata.generated.schema.entity.data.table import ( Column, DataModel, Table, + TableConstraint, TableType, ) from metadata.generated.schema.entity.services.databaseService import ( @@ -46,7 +47,6 @@ from metadata.ingestion.api.source import Source from metadata.ingestion.api.topology_runner import TopologyRunnerMixin from metadata.ingestion.models.delete_entity import delete_entity_from_source from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification -from metadata.ingestion.models.table_metadata import OMetaTableConstraints from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, @@ -92,7 +92,7 @@ class DatabaseServiceTopology(ServiceTopology): ), ], children=["database"], - post_process=["yield_view_lineage", "yield_table_constraints"], + post_process=["yield_view_lineage"], ) database = TopologyNode( producer="get_database_names", @@ -234,14 +234,13 @@ class DatabaseServiceSource( Parses view definition to get lineage information """ - def yield_table_constraints(self) -> Optional[Iterable[OMetaTableConstraints]]: + def update_table_constraints( + self, table_constraints: List[TableConstraint], foreign_columns: [] + ) -> List[TableConstraint]: """ - From topology. process the table constraints of all tables - by default no need to process table constraints - specially for non SQA sources + transform SQLAlchemy returned foreign_columns into list of TableConstraint. """ - yield from [] @abstractmethod def yield_table( diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py index 6e6922bed10..9f6a2b11d22 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py @@ -49,7 +49,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification -from metadata.ingestion.models.table_metadata import OMetaTableConstraints from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.ingestion.source.database.database_service import DatabaseServiceSource @@ -68,6 +67,7 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() + # pylint: disable=invalid-name,not-callable @classmethod def from_dict(cls, d: Dict[str, any]) -> "TableConstraintList": @@ -267,6 +267,14 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource): table_constraints = None try: columns = self.get_columns(table.columns) + ( + primary_constraints, + foreign_constraints, + ) = self.get_table_constraints(table.table_constraints) + + table_constraints = self.update_table_constraints( + primary_constraints, foreign_constraints + ) table_request = CreateTableRequest( name=table_name, @@ -291,7 +299,6 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource): ) ) - self.add_table_constraint_to_context(table.table_constraints) self.register_record(table_request=table_request) except Exception as exc: error = f"Unexpected exception to yield table [{table_name}]: {exc}" @@ -299,13 +306,16 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource): logger.warning(error) self.status.failed(table_name, error, traceback.format_exc()) - def add_table_constraint_to_context(self, constraints: TableConstraintList) -> None: + def get_table_constraints( + self, constraints: TableConstraintList + ) -> Tuple[List[TableConstraint], List[ForeignConstrains]]: """ Function to handle table constraint for the current table and add it to context """ + + primary_constraints = [] + foreign_constraints = [] if constraints and constraints.table_constraints: - primary_constraints = [] - foreign_constraints = [] for constraint in constraints.table_constraints: if constraint.primary_key_constraint: primary_constraints.append( @@ -322,26 +332,18 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource): parent_table=constraint.foreign_key_constraint.parent_table, ) ) - self.table_constraints.append( - OMetaTableConstraints( - table=self.context.table, - foreign_constraints=foreign_constraints, - constraints=primary_constraints, - ) - ) + return primary_constraints, foreign_constraints - def _get_foreign_constraints( - self, table_constraints: OMetaTableConstraints - ) -> List[TableConstraint]: + def _get_foreign_constraints(self, foreign_columns) -> List[TableConstraint]: """ Search the referred table for foreign constraints and get referred column fqn """ - foreign_constraints = [] - for constraint in table_constraints.foreign_constraints: + table_constraints = [] + for column in foreign_columns: referred_column_fqns = [] - ref_table_fqn = constraint["parent_table"] + ref_table_fqn = column.parent_table table_fqn_list = fqn.split(ref_table_fqn) referred_table = fqn.search_table_from_es( @@ -352,33 +354,39 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource): service_name=self.context.database_service.name.__root__, ) if referred_table: - for column in constraint["parent_columns"]: - col_fqn = get_column_fqn(table_entity=referred_table, column=column) + for parent_column in column.parent_columns: + col_fqn = get_column_fqn( + table_entity=referred_table, column=parent_column + ) if col_fqn: referred_column_fqns.append(col_fqn) - foreign_constraints.append( + else: + continue + + table_constraints.append( TableConstraint( constraintType=ConstraintType.FOREIGN_KEY, - columns=constraint["child_columns"], + columns=column.child_columns, referredColumns=referred_column_fqns, ) ) - return foreign_constraints + return table_constraints - def yield_table_constraints(self) -> Optional[Iterable[OMetaTableConstraints]]: + def update_table_constraints( + self, table_constraints, foreign_columns + ) -> List[TableConstraint]: """ From topology. process the table constraints of all tables """ - for table_constraints in self.table_constraints: - foreign_constraints = self._get_foreign_constraints(table_constraints) - if foreign_constraints: - if table_constraints.constraints: - table_constraints.constraints.extend(foreign_constraints) - else: - table_constraints.constraints = foreign_constraints - yield table_constraints + foreign_table_constraints = self._get_foreign_constraints(foreign_columns) + if foreign_table_constraints: + if table_constraints: + table_constraints.extend(foreign_table_constraints) + else: + table_constraints = foreign_table_constraints + return table_constraints def prepare(self): pass