diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 2cf429b8a38..4902653e1ba 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -217,16 +217,17 @@ class CommonDbSourceService( else None ) - yield Either( - right=CreateDatabaseRequest( - name=EntityName(database_name), - service=FullyQualifiedEntityName(self.context.get().database_service), - description=description, - sourceUrl=source_url, - tags=self.get_database_tag_labels(database_name=database_name), - ) + database_request = CreateDatabaseRequest( + name=EntityName(database_name), + service=FullyQualifiedEntityName(self.context.get().database_service), + description=description, + sourceUrl=source_url, + tags=self.get_database_tag_labels(database_name=database_name), ) + yield Either(right=database_request) + self.register_record_database_request(database_request=database_request) + def get_raw_database_schema_names(self) -> Iterable[str]: if self.service_connection.__dict__.get("databaseSchema"): yield self.service_connection.databaseSchema @@ -264,23 +265,24 @@ class CommonDbSourceService( else None ) - yield Either( - right=CreateDatabaseSchemaRequest( - name=EntityName(schema_name), - database=FullyQualifiedEntityName( - fqn.build( - metadata=self.metadata, - entity_type=Database, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - ) - ), - description=description, - sourceUrl=source_url, - tags=self.get_schema_tag_labels(schema_name=schema_name), - ) + schema_request = CreateDatabaseSchemaRequest( + name=EntityName(schema_name), + database=FullyQualifiedEntityName( + fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + ) + ), + description=description, + sourceUrl=source_url, + tags=self.get_schema_tag_labels(schema_name=schema_name), ) + yield Either(right=schema_request) + self.register_record_schema_request(schema_request=schema_request) + @staticmethod @calculate_execution_time() def get_table_description( diff --git a/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py b/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py index 52a7af91e74..47d44f6092f 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py @@ -122,14 +122,17 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC): Prepare a database request and pass it to the sink """ - yield Either( - right=CreateDatabaseRequest( - name=EntityName(database_name), - service=self.context.get().database_service, - sourceUrl=self.get_source_url(database_name=database_name), - ) + database_request = CreateDatabaseRequest( + name=EntityName(database_name), + service=self.context.get().database_service, + sourceUrl=self.get_source_url(database_name=database_name), ) + yield Either( + right=database_request, + ) + self.register_record_database_request(database_request=database_request) + @abstractmethod def get_schema_name_list(self) -> List[str]: """ @@ -164,24 +167,25 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC): Prepare a database schema request and pass it to the sink """ - yield Either( - right=CreateDatabaseSchemaRequest( - name=EntityName(schema_name), - 
database=FullyQualifiedEntityName( - fqn.build( - metadata=self.metadata, - entity_type=Database, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - ) - ), - sourceUrl=self.get_source_url( + schema_request = CreateDatabaseSchemaRequest( + name=EntityName(schema_name), + database=FullyQualifiedEntityName( + fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, database_name=self.context.get().database, - schema_name=schema_name, - ), - ) + ) + ), + sourceUrl=self.get_source_url( + database_name=self.context.get().database, + schema_name=schema_name, + ), ) + yield Either(right=schema_request) + self.register_record_schema_request(schema_request=schema_request) + @abstractmethod def query_table_names_and_types( self, schema_name: str diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 56f8e9a8cfb..1366ccf5e2a 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -135,6 +135,7 @@ class DatabaseServiceTopology(ServiceTopology): ), ], children=["databaseSchema"], + post_process=["mark_databases_as_deleted"], ) databaseSchema: Annotated[ TopologyNode, Field(description="Database Schema Node") @@ -158,7 +159,11 @@ class DatabaseServiceTopology(ServiceTopology): ), ], children=["table", "stored_procedure"], - post_process=["mark_tables_as_deleted", "mark_stored_procedures_as_deleted"], + post_process=[ + "mark_schemas_as_deleted", + "mark_tables_as_deleted", + "mark_stored_procedures_as_deleted", + ], threads=True, ) table: Annotated[ @@ -217,6 +222,8 @@ class DatabaseServiceSource( config: WorkflowSource database_source_state: Set = set() stored_procedure_source_state: Set = set() + database_entity_source_state: Set = set() + schema_entity_source_state: Set = set() # Big union of types we want to fetch dynamically service_connection: DatabaseConnection.model_fields["config"].annotation @@ -494,6 +501,64 @@ class DatabaseServiceSource( self.stored_procedure_source_state.add(table_fqn) + def register_record_database_request( + self, database_request: CreateDatabaseRequest + ) -> None: + """ + Mark the database record as scanned and update the database_entity_source_state + """ + database_fqn = fqn.build( + self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=database_request.name.root, + ) + + self.database_entity_source_state.add(database_fqn) + + def register_record_schema_request( + self, schema_request: CreateDatabaseSchemaRequest + ) -> None: + """ + Mark the schema record as scanned and update the schema_entity_source_state + """ + schema_fqn = fqn.build( + self.metadata, + entity_type=DatabaseSchema, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + schema_name=schema_request.name.root, + ) + + self.schema_entity_source_state.add(schema_fqn) + + def _get_filtered_database_names( + self, return_fqn: bool = False, add_to_status: bool = True + ) -> Iterable[str]: + """ + Get filtered database names based on the database filter pattern + """ + database_names_iterable = getattr( + self, "get_database_names_raw", self.get_database_names + )() + for database_name in database_names_iterable: + database_fqn = fqn.build( + self.metadata, + entity_type=Database, + 
service_name=self.context.get().database_service,
+                database_name=database_name,
+            )
+            if filter_by_schema(
+                self.source_config.databaseFilterPattern,
+                database_fqn
+                if self.source_config.useFqnForFiltering
+                else database_name,
+            ):
+                if add_to_status:
+                    self.status.filter(database_fqn, "Database Filtered Out")
+                continue
+            yield database_fqn if return_fqn else database_name
+
     def _get_filtered_schema_names(
         self, return_fqn: bool = False, add_to_status: bool = True
     ) -> Iterable[str]:
@@ -585,6 +650,93 @@
                 params={"databaseSchema": schema_fqn},
             )
 
+    def mark_databases_as_deleted(self):
+        """
+        Use the current inspector to mark databases as deleted
+        """
+        if self.source_config.markDeletedDatabases:
+            logger.info(
+                f"Mark Deleted Databases set to True. Processing service [{self.context.get().database_service}]"
+            )
+
+            # Build the complete source state for this service: every database that
+            # passes the current filter pattern, plus every database processed in
+            # this run. Any database previously ingested but missing from this
+            # state (dropped at the source or now filtered out) is marked deleted.
+            all_database_fqns = set()
+
+            # Collect the currently filtered-in databases from the source.
+            # add_to_status=False avoids double-counting filtered databases
+            # in the status report during this deletion pass.
+            for database_name in self._get_filtered_database_names(
+                add_to_status=False
+            ):
+                database_fqn = fqn.build(
+                    self.metadata,
+                    entity_type=Database,
+                    service_name=self.context.get().database_service,
+                    database_name=database_name,
+                )
+                all_database_fqns.add(database_fqn)
+
+            # Combine the processed databases with the filtered-in databases
+            complete_db_source_state = self.database_entity_source_state.union(
+                all_database_fqns
+            )
+
+            yield from delete_entity_from_source(
+                metadata=self.metadata,
+                entity_type=Database,
+                entity_source_state=complete_db_source_state,
+                mark_deleted_entity=self.source_config.markDeletedDatabases,
+                params={"service": self.context.get().database_service},
+            )
+
+    def mark_schemas_as_deleted(self):
+        """
+        Use the current inspector to mark schemas as deleted
+        """
+        if not self.context.get().__dict__.get("database"):
+            raise ValueError(
+                "No Database found in the context. We cannot run the schema deletion."
+            )
+
+        if self.source_config.markDeletedSchemas:
+            logger.info(
+                f"Mark Deleted Schemas set to True. 
Processing database [{self.context.get().database}]" + ) + + # Build the database FQN to use as parameter + database_fqn = fqn.build( + self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + ) + + # Get all filtered-in schema FQNs to create a complete source state + # We need to include both processed schemas and filtered schemas in the source state + # to ensure we mark as deleted any schemas that were previously ingested but are now + # filtered out, as well as any schemas that were processed in this run + filtered_schema_fqns = set() + for schema_name in self._get_filtered_schema_names( + return_fqn=True, add_to_status=False + ): + filtered_schema_fqns.add(schema_name) + + # Combine the processed schemas with filtered schemas + complete_source_state = self.schema_entity_source_state.union( + filtered_schema_fqns + ) + + yield from delete_entity_from_source( + metadata=self.metadata, + entity_type=DatabaseSchema, + entity_source_state=complete_source_state, + mark_deleted_entity=self.source_config.markDeletedSchemas, + params={"database": database_fqn}, + ) + def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]: """ Get the life cycle data of the table diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index d91cd3e6247..00ed5d91ee3 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -157,12 +157,13 @@ class DatalakeSource(DatabaseServiceSource): """ if isinstance(self.config_source, GCSConfig): database_name = self.client.project - yield Either( - right=CreateDatabaseRequest( - name=EntityName(database_name), - service=FullyQualifiedEntityName(self.context.get().database_service), - ) + + database_request = CreateDatabaseRequest( + name=EntityName(database_name), + service=FullyQualifiedEntityName(self.context.get().database_service), ) + yield Either(right=database_request) + self.register_record_database_request(database_request=database_request) def get_database_schema_names(self) -> Iterable[str]: """ @@ -208,20 +209,21 @@ class DatalakeSource(DatabaseServiceSource): From topology. 
Prepare a database schema request and pass it to the sink """ - yield Either( - right=CreateDatabaseSchemaRequest( - name=EntityName(schema_name), - database=FullyQualifiedEntityName( - fqn.build( - metadata=self.metadata, - entity_type=Database, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - ) - ), - ) + schema_request = CreateDatabaseSchemaRequest( + name=EntityName(schema_name), + database=FullyQualifiedEntityName( + fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + ) + ), ) + yield Either(right=schema_request) + self.register_record_schema_request(schema_request=schema_request) + def get_tables_name_and_type( # pylint: disable=too-many-branches self, ) -> Iterable[Tuple[str, TableType, SupportedTypes]]: diff --git a/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py index bec4db42c81..c2e9fa447bc 100644 --- a/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py @@ -115,12 +115,12 @@ class DeltalakeSource(DatabaseServiceSource): From topology. Prepare a database request and pass it to the sink """ - yield Either( - right=CreateDatabaseRequest( - name=database_name, - service=self.context.get().database_service, - ) + database_request = CreateDatabaseRequest( + name=database_name, + service=self.context.get().database_service, ) + yield Either(right=database_request) + self.register_record_database_request(database_request=database_request) def get_database_schema_names(self) -> Iterable[str]: """ @@ -151,20 +151,21 @@ class DeltalakeSource(DatabaseServiceSource): From topology. Prepare a database schema request and pass it to the sink """ - yield Either( - right=CreateDatabaseSchemaRequest( - name=EntityName(schema_name), - database=FullyQualifiedEntityName( - fqn.build( - metadata=self.metadata, - entity_type=Database, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - ) - ), - ) + schema_request = CreateDatabaseSchemaRequest( + name=EntityName(schema_name), + database=FullyQualifiedEntityName( + fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + ) + ), ) + yield Either(right=schema_request) + self.register_record_schema_request(schema_request=schema_request) + def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: """ Handle table and views. 
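Every connector above applies the same refactor: build the `Create*Request` once, yield it wrapped in `Either`, and register its FQN into the run's source state. The post-process steps then union that state with the filtered-in names and soft-delete whatever the backend still holds that is absent from the union. A minimal, self-contained sketch of that reconciliation (toy names and data, not OpenMetadata's actual `delete_entity_from_source`):

```python
# Toy model of the mark_*_as_deleted reconciliation; all names here are
# illustrative assumptions, not OpenMetadata internals.
from typing import Iterable, Set


def entities_to_soft_delete(
    backend_fqns: Set[str], entity_source_state: Set[str]
) -> Iterable[str]:
    """Yield FQNs the backend knows but this run's source state does not."""
    yield from sorted(backend_fqns - entity_source_state)


# Schemas OpenMetadata already holds from earlier runs
backend = {"svc.db.s1", "svc.db.s2", "svc.db.s3"}

# This run: s1 was processed (register_record_schema_request), s2 still
# passes the filter pattern, and s3 disappeared at the source (or is now
# filtered out), so only s3 should be soft-deleted
processed = {"svc.db.s1"}
filtered_in = {"svc.db.s1", "svc.db.s2"}

complete_source_state = processed | filtered_in
assert list(entities_to_soft_delete(backend, complete_source_state)) == ["svc.db.s3"]
```
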
diff --git a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py index 0ddd8b37ab6..a6ebf7e2e48 100644 --- a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py @@ -109,12 +109,13 @@ class DomodatabaseSource(DatabaseServiceSource): def yield_database( self, database_name: str ) -> Iterable[Either[CreateDatabaseRequest]]: - yield Either( - right=CreateDatabaseRequest( - name=EntityName(database_name), - service=self.context.get().database_service, - ) + + database_request = CreateDatabaseRequest( + name=EntityName(database_name), + service=self.context.get().database_service, ) + yield Either(right=database_request) + self.register_record_database_request(database_request=database_request) def get_database_schema_names(self) -> Iterable[str]: scheme_name = "default" @@ -123,19 +124,19 @@ class DomodatabaseSource(DatabaseServiceSource): def yield_database_schema( self, schema_name: str ) -> Iterable[Either[CreateDatabaseSchemaRequest]]: - yield Either( - right=CreateDatabaseSchemaRequest( - name=EntityName(schema_name), - database=FullyQualifiedEntityName( - fqn.build( - metadata=self.metadata, - entity_type=Database, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - ) - ), - ) + schema_request = CreateDatabaseSchemaRequest( + name=EntityName(schema_name), + database=FullyQualifiedEntityName( + fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + ) + ), ) + yield Either(right=schema_request) + self.register_record_schema_request(schema_request=schema_request) def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: schema_name = self.context.get().database_schema diff --git a/ingestion/src/metadata/ingestion/source/database/glue/metadata.py b/ingestion/src/metadata/ingestion/source/database/glue/metadata.py index baf4178dd21..390dc2a2883 100755 --- a/ingestion/src/metadata/ingestion/source/database/glue/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/glue/metadata.py @@ -174,12 +174,12 @@ class GlueSource(ExternalTableLineageMixin, DatabaseServiceSource): From topology. Prepare a database request and pass it to the sink """ - yield Either( - right=CreateDatabaseRequest( - name=database_name, - service=self.context.get().database_service, - ) + database_request = CreateDatabaseRequest( + name=database_name, + service=self.context.get().database_service, ) + yield Either(right=database_request) + self.register_record_database_request(database_request=database_request) def get_database_schema_names(self) -> Iterable[str]: """ @@ -224,24 +224,24 @@ class GlueSource(ExternalTableLineageMixin, DatabaseServiceSource): From topology. 
Prepare a database schema request and pass it to the sink """ - yield Either( - right=CreateDatabaseSchemaRequest( - name=EntityName(schema_name), - description=self.schema_description_map.get(schema_name), - database=FullyQualifiedEntityName( - fqn.build( - metadata=self.metadata, - entity_type=Database, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - ) - ), - sourceUrl=self.get_source_url( + schema_request = CreateDatabaseSchemaRequest( + name=EntityName(schema_name), + description=self.schema_description_map.get(schema_name), + database=FullyQualifiedEntityName( + fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, database_name=self.context.get().database, - schema_name=schema_name, - ), - ) + ) + ), + sourceUrl=self.get_source_url( + database_name=self.context.get().database, + schema_name=schema_name, + ), ) + yield Either(right=schema_request) + self.register_record_schema_request(schema_request=schema_request) def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: """ diff --git a/ingestion/src/metadata/ingestion/source/database/iceberg/metadata.py b/ingestion/src/metadata/ingestion/source/database/iceberg/metadata.py index c087c704ded..166ddd40c2f 100644 --- a/ingestion/src/metadata/ingestion/source/database/iceberg/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/iceberg/metadata.py @@ -108,12 +108,12 @@ class IcebergSource(DatabaseServiceSource): Also, update the self.inspector value to the current db. """ - yield Either( - right=CreateDatabaseRequest( - name=database_name, - service=self.context.get().database_service, - ) + database_request = CreateDatabaseRequest( + name=database_name, + service=self.context.get().database_service, ) + yield Either(right=database_request) + self.register_record_database_request(database_request=database_request) def get_database_schema_names(self) -> Iterable[str]: """ @@ -155,19 +155,19 @@ class IcebergSource(DatabaseServiceSource): From topology. Prepare a database request and pass it to the sink. """ - yield Either( - right=CreateDatabaseSchemaRequest( - name=EntityName(schema_name), - database=FullyQualifiedEntityName( - fqn.build( - metadata=self.metadata, - entity_type=Database, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - ) - ), - ) + schema_request = CreateDatabaseSchemaRequest( + name=EntityName(schema_name), + database=FullyQualifiedEntityName( + fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + ) + ), ) + yield Either(right=schema_request) + self.register_record_schema_request(schema_request=schema_request) def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: """ diff --git a/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py b/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py index 6350f40cbd8..3730e806a6b 100644 --- a/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py @@ -119,12 +119,12 @@ class SalesforceSource(DatabaseServiceSource): From topology. 
Prepare a database request and pass it to the sink """ - yield Either( - right=CreateDatabaseRequest( - name=database_name, - service=self.context.get().database_service, - ) + database_request = CreateDatabaseRequest( + name=database_name, + service=self.context.get().database_service, ) + yield Either(right=database_request) + self.register_record_database_request(database_request=database_request) def get_database_schema_names(self) -> Iterable[str]: """ @@ -139,19 +139,19 @@ class SalesforceSource(DatabaseServiceSource): From topology. Prepare a database schema request and pass it to the sink """ - yield Either( - right=CreateDatabaseSchemaRequest( - name=EntityName(schema_name), - database=FullyQualifiedEntityName( - fqn.build( - metadata=self.metadata, - entity_type=Database, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - ) - ), - ) + schema_request = CreateDatabaseSchemaRequest( + name=EntityName(schema_name), + database=FullyQualifiedEntityName( + fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + ) + ), ) + yield Either(right=schema_request) + self.register_record_schema_request(schema_request=schema_request) def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: """ diff --git a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py index cc018705fc1..3cf7344d873 100644 --- a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py @@ -845,12 +845,12 @@ class SasSource( def yield_database( self, database_name: str ) -> Iterable[Either[CreateDatabaseRequest]]: - yield Either( - right=CreateDatabaseRequest( - name=EntityName(database_name), - service=self.context.get().database_service, - ) + database_request = CreateDatabaseRequest( + name=EntityName(database_name), + service=self.context.get().database_service, ) + yield Either(right=database_request) + self.register_record_database_request(database_request=database_request) def get_database_schema_names(self) -> Iterable[Tuple[str, str]]: for database, database_schemas in self.database_schemas.items(): @@ -860,18 +860,20 @@ class SasSource( def yield_database_schema( self, schema_name: Tuple[str, str] ) -> Iterable[Either[CreateDatabaseSchemaRequest]]: - yield Either( - right=CreateDatabaseSchemaRequest( - name=schema_name[1], - database=fqn.build( - metadata=self.metadata, - entity_type=Database, - service_name=self.context.get().database_service, - database_name=schema_name[0], - ), - ) + + schema_request = CreateDatabaseSchemaRequest( + name=schema_name[1], + database=fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=schema_name[0], + ), ) + yield Either(right=schema_request) + self.register_record_schema_request(schema_request=schema_request) + def yield_tag( self, schema_name: str ) -> Iterable[Either[OMetaTagAndClassification]]: diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index c5f705f0132..3166fb42ad0 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -193,15 +193,15 @@ class 
UnitycatalogSource( Prepare a database request and pass it to the sink """ catalog = self.client.catalogs.get(database_name) - yield Either( - right=CreateDatabaseRequest( - name=database_name, - service=self.context.get().database_service, - owners=self.get_owner_ref(catalog.owner), - description=catalog.comment, - tags=self.get_database_tag_labels(database_name), - ) + database_request = CreateDatabaseRequest( + name=database_name, + service=self.context.get().database_service, + owners=self.get_owner_ref(catalog.owner), + description=catalog.comment, + tags=self.get_database_tag_labels(database_name), ) + yield Either(right=database_request) + self.register_record_database_request(database_request=database_request) def get_database_schema_names(self) -> Iterable[str]: """ @@ -247,22 +247,22 @@ class UnitycatalogSource( schema = self.client.schemas.get( full_name=f"{self.context.get().database}.{schema_name}" ) - yield Either( - right=CreateDatabaseSchemaRequest( - name=EntityName(schema_name), - database=FullyQualifiedEntityName( - fqn.build( - metadata=self.metadata, - entity_type=Database, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - ) - ), - description=schema.comment, - owners=self.get_owner_ref(schema.owner), - tags=self.get_schema_tag_labels(schema_name), - ) + schema_request = CreateDatabaseSchemaRequest( + name=EntityName(schema_name), + database=FullyQualifiedEntityName( + fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + ) + ), + description=schema.comment, + owners=self.get_owner_ref(schema.owner), + tags=self.get_schema_tag_labels(schema_name), ) + yield Either(right=schema_request) + self.register_record_schema_request(schema_request=schema_request) def get_tables_name_and_type(self) -> Iterable[Tuple[str, str]]: """ diff --git a/ingestion/tests/unit/topology/database/test_postgres.py b/ingestion/tests/unit/topology/database/test_postgres.py index 5e3594e6d04..65f9f3fa9ad 100644 --- a/ingestion/tests/unit/topology/database/test_postgres.py +++ b/ingestion/tests/unit/topology/database/test_postgres.py @@ -15,7 +15,7 @@ Test Postgres using the topology import types from unittest import TestCase -from unittest.mock import patch +from unittest.mock import MagicMock, patch from sqlalchemy.types import VARCHAR @@ -344,3 +344,452 @@ class PostgresUnitTest(TestCase): def test_close_connection(self, engine, connection): connection.return_value = True self.postgres_source.close() + + def test_mark_deleted_schemas_enabled(self): + """Test mark deleted schemas when the config is enabled""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedSchemas = True + self.postgres_source.source_config = mock_source_config + + # Mock the context to have database information + self.postgres_source.context.get().__dict__["database"] = "test_db" + self.postgres_source.context.get().__dict__["database_service"] = "test_service" + + # Mock the schema entity source state + self.postgres_source.schema_entity_source_state = {"test_schema_fqn"} + + # Mock the _get_filtered_schema_names method + with patch.object( + self.postgres_source, "_get_filtered_schema_names" + ) as mock_filtered_schemas: + mock_filtered_schemas.return_value = [ + "test_schema_fqn", + "another_schema_fqn", + ] + + # Mock the delete_entity_from_source function + with patch( + 
"metadata.ingestion.source.database.database_service.delete_entity_from_source" + ) as mock_delete: + mock_delete.return_value = iter([]) + + # Call the method + result = list(self.postgres_source.mark_schemas_as_deleted()) + + # Verify that delete_entity_from_source was called with correct parameters + mock_delete.assert_called_once() + call_args = mock_delete.call_args + self.assertEqual(call_args[1]["entity_type"], DatabaseSchema) + self.assertEqual(call_args[1]["mark_deleted_entity"], True) + self.assertEqual( + call_args[1]["params"], {"database": "test_service.test_db"} + ) + + # Verify the entity_source_state contains both processed and filtered schemas + expected_source_state = { + "test_schema_fqn", + "test_schema_fqn", + "another_schema_fqn", + } + self.assertEqual( + call_args[1]["entity_source_state"], expected_source_state + ) + + def test_mark_deleted_schemas_disabled(self): + """Test mark deleted schemas when the config is disabled""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedSchemas = False + self.postgres_source.source_config = mock_source_config + + # Call the method + result = list(self.postgres_source.mark_schemas_as_deleted()) + + # Verify that no deletion operations are performed + self.assertEqual(result, []) + + def test_mark_deleted_schemas_no_database_context(self): + """Test mark deleted schemas when no database is in context""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedSchemas = True + self.postgres_source.source_config = mock_source_config + + # Remove database from context + self.postgres_source.context.get().__dict__.pop("database", None) + + # Call the method and expect ValueError + with self.assertRaises(ValueError) as context: + list(self.postgres_source.mark_schemas_as_deleted()) + + self.assertIn("No Database found in the context", str(context.exception)) + + def test_mark_deleted_databases_enabled(self): + """Test mark deleted databases when the config is enabled""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedDatabases = True + self.postgres_source.source_config = mock_source_config + + # Mock the context to have database service information + self.postgres_source.context.get().__dict__["database_service"] = "test_service" + + # Mock the database entity source state + self.postgres_source.database_entity_source_state = {"test_db_fqn"} + + # Mock the _get_filtered_database_names method + with patch.object( + self.postgres_source, "_get_filtered_database_names" + ) as mock_filtered_dbs: + mock_filtered_dbs.return_value = ["test_db", "another_db"] + + # Mock the delete_entity_from_source function + with patch( + "metadata.ingestion.source.database.database_service.delete_entity_from_source" + ) as mock_delete: + mock_delete.return_value = iter([]) + + # Call the method + result = list(self.postgres_source.mark_databases_as_deleted()) + + # Verify that delete_entity_from_source was called with correct parameters + mock_delete.assert_called_once() + call_args = mock_delete.call_args + self.assertEqual(call_args[1]["entity_type"], Database) + self.assertEqual(call_args[1]["mark_deleted_entity"], True) + self.assertEqual(call_args[1]["params"], {"service": "test_service"}) + + # Verify the entity_source_state contains both processed and filtered databases + expected_source_state = { + "test_db_fqn", + 
"test_service.test_db", + "test_service.another_db", + } + self.assertEqual( + call_args[1]["entity_source_state"], expected_source_state + ) + + def test_mark_deleted_databases_disabled(self): + """Test mark deleted databases when the config is disabled""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedDatabases = False + self.postgres_source.source_config = mock_source_config + + # Call the method + result = list(self.postgres_source.mark_databases_as_deleted()) + + # Verify that no deletion operations are performed + self.assertEqual(result, []) + + def test_mark_deleted_schemas_with_schema_filter_pattern(self): + """Test mark deleted schemas with schema filter pattern applied""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedSchemas = True + self.postgres_source.source_config = mock_source_config + + # Mock the context to have database information + self.postgres_source.context.get().__dict__["database"] = "test_db" + self.postgres_source.context.get().__dict__["database_service"] = "test_service" + + # Mock the schema entity source state with some existing schemas + self.postgres_source.schema_entity_source_state = { + "test_service.test_db.schema1", + "test_service.test_db.schema2", + } + + # Mock the _get_filtered_schema_names method to return filtered schemas + with patch.object( + self.postgres_source, "_get_filtered_schema_names" + ) as mock_filtered_schemas: + mock_filtered_schemas.return_value = ["test_service.test_db.schema1"] + + # Mock the delete_entity_from_source function + with patch( + "metadata.ingestion.source.database.database_service.delete_entity_from_source" + ) as mock_delete: + mock_delete.return_value = iter([]) + + # Call the method + result = list(self.postgres_source.mark_schemas_as_deleted()) + + # Verify that delete_entity_from_source was called + mock_delete.assert_called_once() + call_args = mock_delete.call_args + + # Verify the entity_source_state contains both processed and filtered schemas + expected_source_state = { + "test_service.test_db.schema1", + "test_service.test_db.schema2", + "test_service.test_db.schema1", + } + self.assertEqual( + call_args[1]["entity_source_state"], expected_source_state + ) + + def test_mark_deleted_databases_with_database_filter_pattern(self): + """Test mark deleted databases with database filter pattern applied""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedDatabases = True + self.postgres_source.source_config = mock_source_config + + # Mock the context to have database service information + self.postgres_source.context.get().__dict__["database_service"] = "test_service" + + # Mock the database entity source state with some existing databases + self.postgres_source.database_entity_source_state = { + "test_service.db1", + "test_service.db2", + } + + # Mock the _get_filtered_database_names method to return filtered databases + with patch.object( + self.postgres_source, "_get_filtered_database_names" + ) as mock_filtered_dbs: + mock_filtered_dbs.return_value = ["db1"] + + # Mock the delete_entity_from_source function + with patch( + "metadata.ingestion.source.database.database_service.delete_entity_from_source" + ) as mock_delete: + mock_delete.return_value = iter([]) + + # Call the method + result = list(self.postgres_source.mark_databases_as_deleted()) + + # Verify that 
delete_entity_from_source was called + mock_delete.assert_called_once() + call_args = mock_delete.call_args + + # Verify the entity_source_state contains both processed and filtered databases + expected_source_state = { + "test_service.db1", + "test_service.db2", + "test_service.db1", + } + self.assertEqual( + call_args[1]["entity_source_state"], expected_source_state + ) + + def test_mark_deleted_schemas_empty_source_state(self): + """Test mark deleted schemas with empty source state""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedSchemas = True + self.postgres_source.source_config = mock_source_config + + # Mock the context to have database information + self.postgres_source.context.get().__dict__["database"] = "test_db" + self.postgres_source.context.get().__dict__["database_service"] = "test_service" + + # Mock empty schema entity source state + self.postgres_source.schema_entity_source_state = set() + + # Mock the _get_filtered_schema_names method + with patch.object( + self.postgres_source, "_get_filtered_schema_names" + ) as mock_filtered_schemas: + mock_filtered_schemas.return_value = ["test_service.test_db.schema1"] + + # Mock the delete_entity_from_source function + with patch( + "metadata.ingestion.source.database.database_service.delete_entity_from_source" + ) as mock_delete: + mock_delete.return_value = iter([]) + + # Call the method + result = list(self.postgres_source.mark_schemas_as_deleted()) + + # Verify that delete_entity_from_source was called with only filtered schemas + mock_delete.assert_called_once() + call_args = mock_delete.call_args + expected_source_state = {"test_service.test_db.schema1"} + self.assertEqual( + call_args[1]["entity_source_state"], expected_source_state + ) + + def test_mark_deleted_databases_empty_source_state(self): + """Test mark deleted databases with empty source state""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedDatabases = True + self.postgres_source.source_config = mock_source_config + + # Mock the context to have database service information + self.postgres_source.context.get().__dict__["database_service"] = "test_service" + + # Mock empty database entity source state + self.postgres_source.database_entity_source_state = set() + + # Mock the _get_filtered_database_names method + with patch.object( + self.postgres_source, "_get_filtered_database_names" + ) as mock_filtered_dbs: + mock_filtered_dbs.return_value = ["db1"] + + # Mock the delete_entity_from_source function + with patch( + "metadata.ingestion.source.database.database_service.delete_entity_from_source" + ) as mock_delete: + mock_delete.return_value = iter([]) + + # Call the method + result = list(self.postgres_source.mark_databases_as_deleted()) + + # Verify that delete_entity_from_source was called with only filtered databases + mock_delete.assert_called_once() + call_args = mock_delete.call_args + expected_source_state = {"test_service.db1"} + self.assertEqual( + call_args[1]["entity_source_state"], expected_source_state + ) + + def test_mark_deleted_schemas_exception_handling(self): + """Test mark deleted schemas exception handling""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedSchemas = True + self.postgres_source.source_config = mock_source_config + + # Mock the context to have database information + 
self.postgres_source.context.get().__dict__["database"] = "test_db" + self.postgres_source.context.get().__dict__["database_service"] = "test_service" + + # Mock the schema entity source state + self.postgres_source.schema_entity_source_state = {"test_schema_fqn"} + + # Mock the _get_filtered_schema_names method to raise an exception + with patch.object( + self.postgres_source, "_get_filtered_schema_names" + ) as mock_filtered_schemas: + mock_filtered_schemas.side_effect = Exception("Test exception") + + # Call the method and expect it to handle the exception gracefully + with self.assertRaises(Exception): + list(self.postgres_source.mark_schemas_as_deleted()) + + def test_mark_deleted_databases_exception_handling(self): + """Test mark deleted databases exception handling""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedDatabases = True + self.postgres_source.source_config = mock_source_config + + # Mock the context to have database service information + self.postgres_source.context.get().__dict__["database_service"] = "test_service" + + # Mock the database entity source state + self.postgres_source.database_entity_source_state = {"test_db_fqn"} + + # Mock the _get_filtered_database_names method to raise an exception + with patch.object( + self.postgres_source, "_get_filtered_database_names" + ) as mock_filtered_dbs: + mock_filtered_dbs.side_effect = Exception("Test exception") + + # Call the method and expect it to handle the exception gracefully + with self.assertRaises(Exception): + list(self.postgres_source.mark_databases_as_deleted()) + + def test_mark_deleted_schemas_with_multiple_schemas(self): + """Test mark deleted schemas with multiple schemas in source state""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedSchemas = True + self.postgres_source.source_config = mock_source_config + + # Mock the context to have database information + self.postgres_source.context.get().__dict__["database"] = "test_db" + self.postgres_source.context.get().__dict__["database_service"] = "test_service" + + # Mock the schema entity source state with multiple schemas + self.postgres_source.schema_entity_source_state = { + "test_service.test_db.schema1", + "test_service.test_db.schema2", + "test_service.test_db.schema3", + } + + # Mock the _get_filtered_schema_names method + with patch.object( + self.postgres_source, "_get_filtered_schema_names" + ) as mock_filtered_schemas: + mock_filtered_schemas.return_value = [ + "test_service.test_db.schema1", + "test_service.test_db.schema2", + ] + + # Mock the delete_entity_from_source function + with patch( + "metadata.ingestion.source.database.database_service.delete_entity_from_source" + ) as mock_delete: + mock_delete.return_value = iter([]) + + # Call the method + result = list(self.postgres_source.mark_schemas_as_deleted()) + + # Verify that delete_entity_from_source was called + mock_delete.assert_called_once() + call_args = mock_delete.call_args + + # Verify the entity_source_state contains all schemas + expected_source_state = { + "test_service.test_db.schema1", + "test_service.test_db.schema2", + "test_service.test_db.schema3", + "test_service.test_db.schema1", + "test_service.test_db.schema2", + } + self.assertEqual( + call_args[1]["entity_source_state"], expected_source_state + ) + + def test_mark_deleted_databases_with_multiple_databases(self): + """Test mark deleted databases with multiple 
databases in source state""" + # Create a mock source config with the required attributes + mock_source_config = MagicMock() + mock_source_config.markDeletedDatabases = True + self.postgres_source.source_config = mock_source_config + + # Mock the context to have database service information + self.postgres_source.context.get().__dict__["database_service"] = "test_service" + + # Mock the database entity source state with multiple databases + self.postgres_source.database_entity_source_state = { + "test_service.db1", + "test_service.db2", + "test_service.db3", + } + + # Mock the _get_filtered_database_names method + with patch.object( + self.postgres_source, "_get_filtered_database_names" + ) as mock_filtered_dbs: + mock_filtered_dbs.return_value = ["db1", "db2"] + + # Mock the delete_entity_from_source function + with patch( + "metadata.ingestion.source.database.database_service.delete_entity_from_source" + ) as mock_delete: + mock_delete.return_value = iter([]) + + # Call the method + result = list(self.postgres_source.mark_databases_as_deleted()) + + # Verify that delete_entity_from_source was called + mock_delete.assert_called_once() + call_args = mock_delete.call_args + + # Verify the entity_source_state contains all databases + expected_source_state = { + "test_service.db1", + "test_service.db2", + "test_service.db3", + "test_service.db1", + "test_service.db2", + } + self.assertEqual( + call_args[1]["entity_source_state"], expected_source_state + ) diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json index 1230e20d89c..34cb6ecf835 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json @@ -60,6 +60,18 @@ "default": true, "title": "Mark Deleted Stored Procedures" }, + "markDeletedSchemas": { + "description": "Optional configuration to soft delete schemas in OpenMetadata if the source schemas are deleted. Also, if the schema is deleted, all the associated entities like tables, views, stored procedures, lineage, etc., with that schema will be deleted", + "type": "boolean", + "default": true, + "title": "Mark Deleted Schemas" + }, + "markDeletedDatabases": { + "description": "Optional configuration to soft delete databases in OpenMetadata if the source databases are deleted. Also, if the database is deleted, all the associated entities like schemas, tables, views, stored procedures, lineage, etc., with that database will be deleted", + "type": "boolean", + "default": true, + "title": "Mark Deleted Databases" + }, "includeTables": { "description": "Optional configuration to turn off fetching metadata for tables.", "type": "boolean", diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts index 4347a6c9e69..b4bb20faeba 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts @@ -307,6 +307,18 @@ export interface Pipeline { * getting the changes from Audit tables on the supporting databases. 
*/ incremental?: IncrementalMetadataExtractionConfiguration; + /** + * Optional configuration to soft delete databases in OpenMetadata if the source databases + * are deleted. Also, if the database is deleted, all the associated entities like schemas, + * tables, views, stored procedures, lineage, etc., with that database will be deleted + */ + markDeletedDatabases?: boolean; + /** + * Optional configuration to soft delete schemas in OpenMetadata if the source schemas are + * deleted. Also, if the schema is deleted, all the associated entities like tables, views, + * stored procedures, lineage, etc., with that schema will be deleted + */ + markDeletedSchemas?: boolean; /** * Optional configuration to soft delete stored procedures in OpenMetadata if the source * stored procedures are deleted. Also, if the stored procedures is deleted, all the diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts index 3f55d0c22ba..3217999ca02 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts @@ -868,6 +868,18 @@ export interface Pipeline { * getting the changes from Audit tables on the supporting databases. */ incremental?: IncrementalMetadataExtractionConfiguration; + /** + * Optional configuration to soft delete databases in OpenMetadata if the source databases + * are deleted. Also, if the database is deleted, all the associated entities like schemas, + * tables, views, stored procedures, lineage, etc., with that database will be deleted + */ + markDeletedDatabases?: boolean; + /** + * Optional configuration to soft delete schemas in OpenMetadata if the source schemas are + * deleted. Also, if the schema is deleted, all the associated entities like tables, views, + * stored procedures, lineage, etc., with that schema will be deleted + */ + markDeletedSchemas?: boolean; /** * Optional configuration to soft delete stored procedures in OpenMetadata if the source * stored procedures are deleted. Also, if the stored procedures is deleted, all the diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts index cd81c271567..2b9c4feeeeb 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts @@ -50,6 +50,18 @@ export interface DatabaseServiceMetadataPipeline { * getting the changes from Audit tables on the supporting databases. */ incremental?: IncrementalMetadataExtractionConfiguration; + /** + * Optional configuration to soft delete databases in OpenMetadata if the source databases + * are deleted. Also, if the database is deleted, all the associated entities like schemas, + * tables, views, stored procedures, lineage, etc., with that database will be deleted + */ + markDeletedDatabases?: boolean; + /** + * Optional configuration to soft delete schemas in OpenMetadata if the source schemas are + * deleted. 
Also, if the schema is deleted, all the associated entities like tables, views, + * stored procedures, lineage, etc., with that schema will be deleted + */ + markDeletedSchemas?: boolean; /** * Optional configuration to soft delete stored procedures in OpenMetadata if the source * stored procedures are deleted. Also, if the stored procedures is deleted, all the diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts index 5896eabedbb..4f2886d5dcd 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts @@ -4050,6 +4050,18 @@ export interface Pipeline { * getting the changes from Audit tables on the supporting databases. */ incremental?: IncrementalMetadataExtractionConfiguration; + /** + * Optional configuration to soft delete databases in OpenMetadata if the source databases + * are deleted. Also, if the database is deleted, all the associated entities like schemas, + * tables, views, stored procedures, lineage, etc., with that database will be deleted + */ + markDeletedDatabases?: boolean; + /** + * Optional configuration to soft delete schemas in OpenMetadata if the source schemas are + * deleted. Also, if the schema is deleted, all the associated entities like tables, views, + * stored procedures, lineage, etc., with that schema will be deleted + */ + markDeletedSchemas?: boolean; /** * Optional configuration to soft delete stored procedures in OpenMetadata if the source * stored procedures are deleted. Also, if the stored procedures is deleted, all the
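
For reference, the two new flags sit next to the existing `markDeleted*` options in the pipeline's `sourceConfig`. A hypothetical excerpt, written as a Python dict for illustration (real workflows define this in YAML/JSON; both new flags default to `true` per the schema above, and `databaseFilterPattern` is only an example value):

```python
# Hypothetical sourceConfig excerpt; field names come from
# databaseServiceMetadataPipeline.json, values are examples only.
source_config = {
    "type": "DatabaseMetadata",
    "markDeletedTables": True,
    "markDeletedStoredProcedures": True,
    # New in this change: reconcile schemas and databases the same way
    "markDeletedSchemas": True,
    "markDeletedDatabases": True,
    # Databases filtered out here are soft-deleted when
    # markDeletedDatabases is enabled
    "databaseFilterPattern": {"includes": ["prod_.*"]},
}
```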