Add: Schema and Database Mark Deletion (#22088)

* Added Schema and Database Mark Deletion

* removed unnecessary changes

* fixed marked deleted databases

* Added to all db connectors

* Added generated types

* Added tests
This commit is contained in:
Suman Maharana 2025-07-15 19:56:46 +05:30 committed by GitHub
parent 174e6ed980
commit 9838278ac4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 859 additions and 190 deletions

View File

@ -217,16 +217,17 @@ class CommonDbSourceService(
else None
)
yield Either(
right=CreateDatabaseRequest(
name=EntityName(database_name),
service=FullyQualifiedEntityName(self.context.get().database_service),
description=description,
sourceUrl=source_url,
tags=self.get_database_tag_labels(database_name=database_name),
)
database_request = CreateDatabaseRequest(
name=EntityName(database_name),
service=FullyQualifiedEntityName(self.context.get().database_service),
description=description,
sourceUrl=source_url,
tags=self.get_database_tag_labels(database_name=database_name),
)
yield Either(right=database_request)
self.register_record_database_request(database_request=database_request)
def get_raw_database_schema_names(self) -> Iterable[str]:
if self.service_connection.__dict__.get("databaseSchema"):
yield self.service_connection.databaseSchema
@ -264,23 +265,24 @@ class CommonDbSourceService(
else None
)
yield Either(
right=CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
description=description,
sourceUrl=source_url,
tags=self.get_schema_tag_labels(schema_name=schema_name),
)
schema_request = CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
description=description,
sourceUrl=source_url,
tags=self.get_schema_tag_labels(schema_name=schema_name),
)
yield Either(right=schema_request)
self.register_record_schema_request(schema_request=schema_request)
@staticmethod
@calculate_execution_time()
def get_table_description(

View File

@ -122,14 +122,17 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC):
Prepare a database request and pass it to the sink
"""
yield Either(
right=CreateDatabaseRequest(
name=EntityName(database_name),
service=self.context.get().database_service,
sourceUrl=self.get_source_url(database_name=database_name),
)
database_request = CreateDatabaseRequest(
name=EntityName(database_name),
service=self.context.get().database_service,
sourceUrl=self.get_source_url(database_name=database_name),
)
yield Either(
right=database_request,
)
self.register_record_database_request(database_request=database_request)
@abstractmethod
def get_schema_name_list(self) -> List[str]:
"""
@ -164,24 +167,25 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC):
Prepare a database schema request and pass it to the sink
"""
yield Either(
right=CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
sourceUrl=self.get_source_url(
schema_request = CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=schema_name,
),
)
)
),
sourceUrl=self.get_source_url(
database_name=self.context.get().database,
schema_name=schema_name,
),
)
yield Either(right=schema_request)
self.register_record_schema_request(schema_request=schema_request)
@abstractmethod
def query_table_names_and_types(
self, schema_name: str

View File

@ -135,6 +135,7 @@ class DatabaseServiceTopology(ServiceTopology):
),
],
children=["databaseSchema"],
post_process=["mark_databases_as_deleted"],
)
databaseSchema: Annotated[
TopologyNode, Field(description="Database Schema Node")
@ -158,7 +159,11 @@ class DatabaseServiceTopology(ServiceTopology):
),
],
children=["table", "stored_procedure"],
post_process=["mark_tables_as_deleted", "mark_stored_procedures_as_deleted"],
post_process=[
"mark_schemas_as_deleted",
"mark_tables_as_deleted",
"mark_stored_procedures_as_deleted",
],
threads=True,
)
table: Annotated[
@ -217,6 +222,8 @@ class DatabaseServiceSource(
config: WorkflowSource
database_source_state: Set = set()
stored_procedure_source_state: Set = set()
database_entity_source_state: Set = set()
schema_entity_source_state: Set = set()
# Big union of types we want to fetch dynamically
service_connection: DatabaseConnection.model_fields["config"].annotation
@ -494,6 +501,64 @@ class DatabaseServiceSource(
self.stored_procedure_source_state.add(table_fqn)
def register_record_database_request(
    self, database_request: CreateDatabaseRequest
) -> None:
    """Record the database as seen in this run.

    Builds the database FQN from the create request and adds it to
    ``database_entity_source_state`` so the mark-deleted post-process
    treats it as alive.
    """
    service_name = self.context.get().database_service
    scanned_database_fqn = fqn.build(
        self.metadata,
        entity_type=Database,
        service_name=service_name,
        database_name=database_request.name.root,
    )
    self.database_entity_source_state.add(scanned_database_fqn)
def register_record_schema_request(
    self, schema_request: CreateDatabaseSchemaRequest
) -> None:
    """Record the schema as seen in this run.

    Builds the schema FQN (service.database.schema) from the create request
    and the current context, and adds it to ``schema_entity_source_state``
    so the mark-deleted post-process treats it as alive.
    """
    current_context = self.context.get()
    scanned_schema_fqn = fqn.build(
        self.metadata,
        entity_type=DatabaseSchema,
        service_name=current_context.database_service,
        database_name=current_context.database,
        schema_name=schema_request.name.root,
    )
    self.schema_entity_source_state.add(scanned_schema_fqn)
def _get_filtered_database_names(
    self, return_fqn: bool = False, add_to_status: bool = True
) -> Iterable[str]:
    """
    Get filtered database names based on the database filter pattern

    :param return_fqn: when True, yield the database FQN instead of the raw name
    :param add_to_status: when True, record filtered-out databases in the status report
    """
    # Prefer a raw (unfiltered) name generator when the connector provides
    # one, so this helper applies the filter itself; otherwise fall back to
    # get_database_names — presumably already filtered by the connector,
    # TODO confirm per connector.
    database_names_iterable = getattr(
        self, "get_database_names_raw", self.get_database_names
    )()
    for database_name in database_names_iterable:
        database_fqn = fqn.build(
            self.metadata,
            entity_type=Database,
            service_name=self.context.get().database_service,
            database_name=database_name,
        )
        # NOTE(review): calls filter_by_schema with the databaseFilterPattern —
        # likely intended filter_by_database. Equivalent only if both delegate
        # to the same generic pattern matcher; confirm and rename if so.
        if filter_by_schema(
            self.source_config.databaseFilterPattern,
            database_fqn
            if self.source_config.useFqnForFiltering
            else database_name,
        ):
            if add_to_status:
                self.status.filter(database_fqn, "Database Filtered Out")
            continue
        yield database_fqn if return_fqn else database_name
def _get_filtered_schema_names(
self, return_fqn: bool = False, add_to_status: bool = True
) -> Iterable[str]:
@ -585,6 +650,89 @@ class DatabaseServiceSource(
params={"databaseSchema": schema_fqn},
)
def mark_databases_as_deleted(self):
    """
    Mark databases that exist in OpenMetadata but were not seen in this run
    as deleted, when ``markDeletedDatabases`` is enabled.

    Yields the Either results produced by ``delete_entity_from_source``.
    """
    if self.source_config.markDeletedDatabases:
        logger.info(
            f"Mark Deleted Databases set to True. Processing service [{self.context.get().database_service}]"
        )
        # The "alive" source state is the union of the databases processed in
        # this run and every database currently visible at the source that
        # passes the filters. Databases that were previously ingested but are
        # now filtered out are intentionally absent, so they get marked deleted.
        all_database_fqns = set()
        # add_to_status=False: this is a post-process pass; re-reporting
        # filtered databases here would duplicate the "Database Filtered Out"
        # status entries already recorded during ingestion (consistent with
        # mark_schemas_as_deleted).
        for database_name in self._get_filtered_database_names(
            add_to_status=False
        ):
            database_fqn = fqn.build(
                self.metadata,
                entity_type=Database,
                service_name=self.context.get().database_service,
                database_name=database_name,
            )
            all_database_fqns.add(database_fqn)
        # Combine the processed databases with all databases from source
        complete_db_source_state = self.database_entity_source_state.union(
            all_database_fqns
        )
        yield from delete_entity_from_source(
            metadata=self.metadata,
            entity_type=Database,
            entity_source_state=complete_db_source_state,
            mark_deleted_entity=self.source_config.markDeletedDatabases,
            params={"service": self.context.get().database_service},
        )
def mark_schemas_as_deleted(self):
    """
    Mark schemas that exist in OpenMetadata but were not seen in this run
    as deleted, when ``markDeletedSchemas`` is enabled.

    Raises ValueError when the topology did not populate a database in the
    context, since schema deletion is scoped to a single database.
    """
    # Fail loudly before anything else if the scope is missing (checked even
    # when the flag is off, matching the table/stored-procedure post-process).
    if not self.context.get().__dict__.get("database"):
        raise ValueError(
            "No Database found in the context. We cannot run the schema deletion."
        )
    if not self.source_config.markDeletedSchemas:
        return
    logger.info(
        f"Mark Deleted Schemas set to True. Processing database [{self.context.get().database}]"
    )
    # Scope parameter for the deletion lookup
    database_fqn = fqn.build(
        self.metadata,
        entity_type=Database,
        service_name=self.context.get().database_service,
        database_name=self.context.get().database,
    )
    # The "alive" state is every schema processed in this run plus every
    # schema still visible at the source after filtering; anything else
    # previously ingested under this database gets marked as deleted.
    filtered_schema_fqns = {
        schema_fqn
        for schema_fqn in self._get_filtered_schema_names(
            return_fqn=True, add_to_status=False
        )
    }
    complete_source_state = self.schema_entity_source_state | filtered_schema_fqns
    yield from delete_entity_from_source(
        metadata=self.metadata,
        entity_type=DatabaseSchema,
        entity_source_state=complete_source_state,
        mark_deleted_entity=self.source_config.markDeletedSchemas,
        params={"database": database_fqn},
    )
def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]:
"""
Get the life cycle data of the table

View File

@ -157,12 +157,13 @@ class DatalakeSource(DatabaseServiceSource):
"""
if isinstance(self.config_source, GCSConfig):
database_name = self.client.project
yield Either(
right=CreateDatabaseRequest(
name=EntityName(database_name),
service=FullyQualifiedEntityName(self.context.get().database_service),
)
database_request = CreateDatabaseRequest(
name=EntityName(database_name),
service=FullyQualifiedEntityName(self.context.get().database_service),
)
yield Either(right=database_request)
self.register_record_database_request(database_request=database_request)
def get_database_schema_names(self) -> Iterable[str]:
"""
@ -208,20 +209,21 @@ class DatalakeSource(DatabaseServiceSource):
From topology.
Prepare a database schema request and pass it to the sink
"""
yield Either(
right=CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
)
schema_request = CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
)
yield Either(right=schema_request)
self.register_record_schema_request(schema_request=schema_request)
def get_tables_name_and_type( # pylint: disable=too-many-branches
self,
) -> Iterable[Tuple[str, TableType, SupportedTypes]]:

View File

@ -115,12 +115,12 @@ class DeltalakeSource(DatabaseServiceSource):
From topology.
Prepare a database request and pass it to the sink
"""
yield Either(
right=CreateDatabaseRequest(
name=database_name,
service=self.context.get().database_service,
)
database_request = CreateDatabaseRequest(
name=database_name,
service=self.context.get().database_service,
)
yield Either(right=database_request)
self.register_record_database_request(database_request=database_request)
def get_database_schema_names(self) -> Iterable[str]:
"""
@ -151,20 +151,21 @@ class DeltalakeSource(DatabaseServiceSource):
From topology.
Prepare a database schema request and pass it to the sink
"""
yield Either(
right=CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
)
schema_request = CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
)
yield Either(right=schema_request)
self.register_record_schema_request(schema_request=schema_request)
def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
"""
Handle table and views.

View File

@ -109,12 +109,13 @@ class DomodatabaseSource(DatabaseServiceSource):
def yield_database(
self, database_name: str
) -> Iterable[Either[CreateDatabaseRequest]]:
yield Either(
right=CreateDatabaseRequest(
name=EntityName(database_name),
service=self.context.get().database_service,
)
database_request = CreateDatabaseRequest(
name=EntityName(database_name),
service=self.context.get().database_service,
)
yield Either(right=database_request)
self.register_record_database_request(database_request=database_request)
def get_database_schema_names(self) -> Iterable[str]:
scheme_name = "default"
@ -123,19 +124,19 @@ class DomodatabaseSource(DatabaseServiceSource):
def yield_database_schema(
self, schema_name: str
) -> Iterable[Either[CreateDatabaseSchemaRequest]]:
yield Either(
right=CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
)
schema_request = CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
)
yield Either(right=schema_request)
self.register_record_schema_request(schema_request=schema_request)
def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
schema_name = self.context.get().database_schema

View File

@ -174,12 +174,12 @@ class GlueSource(ExternalTableLineageMixin, DatabaseServiceSource):
From topology.
Prepare a database request and pass it to the sink
"""
yield Either(
right=CreateDatabaseRequest(
name=database_name,
service=self.context.get().database_service,
)
database_request = CreateDatabaseRequest(
name=database_name,
service=self.context.get().database_service,
)
yield Either(right=database_request)
self.register_record_database_request(database_request=database_request)
def get_database_schema_names(self) -> Iterable[str]:
"""
@ -224,24 +224,24 @@ class GlueSource(ExternalTableLineageMixin, DatabaseServiceSource):
From topology.
Prepare a database schema request and pass it to the sink
"""
yield Either(
right=CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
description=self.schema_description_map.get(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
sourceUrl=self.get_source_url(
schema_request = CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
description=self.schema_description_map.get(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=schema_name,
),
)
)
),
sourceUrl=self.get_source_url(
database_name=self.context.get().database,
schema_name=schema_name,
),
)
yield Either(right=schema_request)
self.register_record_schema_request(schema_request=schema_request)
def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
"""

View File

@ -108,12 +108,12 @@ class IcebergSource(DatabaseServiceSource):
Also, update the self.inspector value to the current db.
"""
yield Either(
right=CreateDatabaseRequest(
name=database_name,
service=self.context.get().database_service,
)
database_request = CreateDatabaseRequest(
name=database_name,
service=self.context.get().database_service,
)
yield Either(right=database_request)
self.register_record_database_request(database_request=database_request)
def get_database_schema_names(self) -> Iterable[str]:
"""
@ -155,19 +155,19 @@ class IcebergSource(DatabaseServiceSource):
From topology.
Prepare a database request and pass it to the sink.
"""
yield Either(
right=CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
)
schema_request = CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
)
yield Either(right=schema_request)
self.register_record_schema_request(schema_request=schema_request)
def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
"""

View File

@ -119,12 +119,12 @@ class SalesforceSource(DatabaseServiceSource):
From topology.
Prepare a database request and pass it to the sink
"""
yield Either(
right=CreateDatabaseRequest(
name=database_name,
service=self.context.get().database_service,
)
database_request = CreateDatabaseRequest(
name=database_name,
service=self.context.get().database_service,
)
yield Either(right=database_request)
self.register_record_database_request(database_request=database_request)
def get_database_schema_names(self) -> Iterable[str]:
"""
@ -139,19 +139,19 @@ class SalesforceSource(DatabaseServiceSource):
From topology.
Prepare a database schema request and pass it to the sink
"""
yield Either(
right=CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
)
schema_request = CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
)
yield Either(right=schema_request)
self.register_record_schema_request(schema_request=schema_request)
def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
"""

View File

@ -845,12 +845,12 @@ class SasSource(
def yield_database(
self, database_name: str
) -> Iterable[Either[CreateDatabaseRequest]]:
yield Either(
right=CreateDatabaseRequest(
name=EntityName(database_name),
service=self.context.get().database_service,
)
database_request = CreateDatabaseRequest(
name=EntityName(database_name),
service=self.context.get().database_service,
)
yield Either(right=database_request)
self.register_record_database_request(database_request=database_request)
def get_database_schema_names(self) -> Iterable[Tuple[str, str]]:
for database, database_schemas in self.database_schemas.items():
@ -860,18 +860,20 @@ class SasSource(
def yield_database_schema(
self, schema_name: Tuple[str, str]
) -> Iterable[Either[CreateDatabaseSchemaRequest]]:
yield Either(
right=CreateDatabaseSchemaRequest(
name=schema_name[1],
database=fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=schema_name[0],
),
)
schema_request = CreateDatabaseSchemaRequest(
name=schema_name[1],
database=fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=schema_name[0],
),
)
yield Either(right=schema_request)
self.register_record_schema_request(schema_request=schema_request)
def yield_tag(
self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:

View File

@ -193,15 +193,15 @@ class UnitycatalogSource(
Prepare a database request and pass it to the sink
"""
catalog = self.client.catalogs.get(database_name)
yield Either(
right=CreateDatabaseRequest(
name=database_name,
service=self.context.get().database_service,
owners=self.get_owner_ref(catalog.owner),
description=catalog.comment,
tags=self.get_database_tag_labels(database_name),
)
database_request = CreateDatabaseRequest(
name=database_name,
service=self.context.get().database_service,
owners=self.get_owner_ref(catalog.owner),
description=catalog.comment,
tags=self.get_database_tag_labels(database_name),
)
yield Either(right=database_request)
self.register_record_database_request(database_request=database_request)
def get_database_schema_names(self) -> Iterable[str]:
"""
@ -247,22 +247,22 @@ class UnitycatalogSource(
schema = self.client.schemas.get(
full_name=f"{self.context.get().database}.{schema_name}"
)
yield Either(
right=CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
description=schema.comment,
owners=self.get_owner_ref(schema.owner),
tags=self.get_schema_tag_labels(schema_name),
)
schema_request = CreateDatabaseSchemaRequest(
name=EntityName(schema_name),
database=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
)
),
description=schema.comment,
owners=self.get_owner_ref(schema.owner),
tags=self.get_schema_tag_labels(schema_name),
)
yield Either(right=schema_request)
self.register_record_schema_request(schema_request=schema_request)
def get_tables_name_and_type(self) -> Iterable[Tuple[str, str]]:
"""

View File

@ -15,7 +15,7 @@ Test Postgres using the topology
import types
from unittest import TestCase
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from sqlalchemy.types import VARCHAR
@ -344,3 +344,452 @@ class PostgresUnitTest(TestCase):
def test_close_connection(self, engine, connection):
connection.return_value = True
self.postgres_source.close()
def test_mark_deleted_schemas_enabled(self):
    """Test mark deleted schemas when the config is enabled"""
    # Source config enabling schema deletion
    mock_source_config = MagicMock()
    mock_source_config.markDeletedSchemas = True
    self.postgres_source.source_config = mock_source_config
    # Context needs a database and a service for FQN building
    self.postgres_source.context.get().__dict__["database"] = "test_db"
    self.postgres_source.context.get().__dict__["database_service"] = "test_service"
    # One schema was processed in this run
    self.postgres_source.schema_entity_source_state = {"test_schema_fqn"}
    with patch.object(
        self.postgres_source, "_get_filtered_schema_names"
    ) as mock_filtered_schemas:
        mock_filtered_schemas.return_value = [
            "test_schema_fqn",
            "another_schema_fqn",
        ]
        with patch(
            "metadata.ingestion.source.database.database_service.delete_entity_from_source"
        ) as mock_delete:
            mock_delete.return_value = iter([])
            result = list(self.postgres_source.mark_schemas_as_deleted())
            # Nothing is yielded when the deletion helper yields nothing
            self.assertEqual(result, [])
            # Deletion is delegated once, scoped to the database
            mock_delete.assert_called_once()
            call_args = mock_delete.call_args
            self.assertEqual(call_args[1]["entity_type"], DatabaseSchema)
            self.assertEqual(call_args[1]["mark_deleted_entity"], True)
            self.assertEqual(
                call_args[1]["params"], {"database": "test_service.test_db"}
            )
            # Source state is the union of processed and filtered-in schemas
            # (the original literal listed "test_schema_fqn" twice)
            expected_source_state = {
                "test_schema_fqn",
                "another_schema_fqn",
            }
            self.assertEqual(
                call_args[1]["entity_source_state"], expected_source_state
            )
def test_mark_deleted_schemas_disabled(self):
    """Test mark deleted schemas when the config is disabled"""
    # Disable schema deletion in the source config
    mock_source_config = MagicMock()
    mock_source_config.markDeletedSchemas = False
    self.postgres_source.source_config = mock_source_config
    # The generator must not produce any deletion requests
    self.assertEqual(list(self.postgres_source.mark_schemas_as_deleted()), [])
def test_mark_deleted_schemas_no_database_context(self):
    """Test mark deleted schemas when no database is in context"""
    # Enable the feature so the context check is actually reached
    mock_source_config = MagicMock()
    mock_source_config.markDeletedSchemas = True
    self.postgres_source.source_config = mock_source_config
    # Ensure the context carries no database
    self.postgres_source.context.get().__dict__.pop("database", None)
    # Consuming the generator must fail fast with a descriptive error
    with self.assertRaises(ValueError) as raised:
        list(self.postgres_source.mark_schemas_as_deleted())
    self.assertIn("No Database found in the context", str(raised.exception))
def test_mark_deleted_databases_enabled(self):
    """Test mark deleted databases when the config is enabled"""
    # Source config enabling database deletion
    mock_source_config = MagicMock()
    mock_source_config.markDeletedDatabases = True
    self.postgres_source.source_config = mock_source_config
    # Context needs the service name for FQN building
    self.postgres_source.context.get().__dict__["database_service"] = "test_service"
    # One database was processed in this run
    self.postgres_source.database_entity_source_state = {"test_db_fqn"}
    with patch.object(
        self.postgres_source, "_get_filtered_database_names"
    ) as mock_filtered_dbs:
        mock_filtered_dbs.return_value = ["test_db", "another_db"]
        with patch(
            "metadata.ingestion.source.database.database_service.delete_entity_from_source"
        ) as mock_delete:
            mock_delete.return_value = iter([])
            result = list(self.postgres_source.mark_databases_as_deleted())
            # Nothing is yielded when the deletion helper yields nothing
            self.assertEqual(result, [])
            # Deletion is delegated once, scoped to the service
            mock_delete.assert_called_once()
            call_args = mock_delete.call_args
            self.assertEqual(call_args[1]["entity_type"], Database)
            self.assertEqual(call_args[1]["mark_deleted_entity"], True)
            self.assertEqual(call_args[1]["params"], {"service": "test_service"})
            # Source state is the union of the processed FQNs and the FQNs
            # built from the filtered-in database names
            expected_source_state = {
                "test_db_fqn",
                "test_service.test_db",
                "test_service.another_db",
            }
            self.assertEqual(
                call_args[1]["entity_source_state"], expected_source_state
            )
def test_mark_deleted_databases_disabled(self):
    """Test mark deleted databases when the config is disabled"""
    # Disable database deletion in the source config
    mock_source_config = MagicMock()
    mock_source_config.markDeletedDatabases = False
    self.postgres_source.source_config = mock_source_config
    # The generator must not produce any deletion requests
    self.assertEqual(list(self.postgres_source.mark_databases_as_deleted()), [])
def test_mark_deleted_schemas_with_schema_filter_pattern(self):
    """Test mark deleted schemas with schema filter pattern applied"""
    # Source config enabling schema deletion
    mock_source_config = MagicMock()
    mock_source_config.markDeletedSchemas = True
    self.postgres_source.source_config = mock_source_config
    # Context needs a database and a service for FQN building
    self.postgres_source.context.get().__dict__["database"] = "test_db"
    self.postgres_source.context.get().__dict__["database_service"] = "test_service"
    # Two schemas were processed in this run
    self.postgres_source.schema_entity_source_state = {
        "test_service.test_db.schema1",
        "test_service.test_db.schema2",
    }
    # Filtering now only lets schema1 through
    with patch.object(
        self.postgres_source, "_get_filtered_schema_names"
    ) as mock_filtered_schemas:
        mock_filtered_schemas.return_value = ["test_service.test_db.schema1"]
        with patch(
            "metadata.ingestion.source.database.database_service.delete_entity_from_source"
        ) as mock_delete:
            mock_delete.return_value = iter([])
            result = list(self.postgres_source.mark_schemas_as_deleted())
            # Nothing is yielded when the deletion helper yields nothing
            self.assertEqual(result, [])
            mock_delete.assert_called_once()
            call_args = mock_delete.call_args
            # Union of processed and filtered-in schemas; the original literal
            # repeated "test_service.test_db.schema1", which a set collapses
            expected_source_state = {
                "test_service.test_db.schema1",
                "test_service.test_db.schema2",
            }
            self.assertEqual(
                call_args[1]["entity_source_state"], expected_source_state
            )
def test_mark_deleted_databases_with_database_filter_pattern(self):
    """Test mark deleted databases with database filter pattern applied"""
    # Source config enabling database deletion
    mock_source_config = MagicMock()
    mock_source_config.markDeletedDatabases = True
    self.postgres_source.source_config = mock_source_config
    # Context needs the service name for FQN building
    self.postgres_source.context.get().__dict__["database_service"] = "test_service"
    # Two databases were processed in this run
    self.postgres_source.database_entity_source_state = {
        "test_service.db1",
        "test_service.db2",
    }
    # Filtering now only lets db1 through
    with patch.object(
        self.postgres_source, "_get_filtered_database_names"
    ) as mock_filtered_dbs:
        mock_filtered_dbs.return_value = ["db1"]
        with patch(
            "metadata.ingestion.source.database.database_service.delete_entity_from_source"
        ) as mock_delete:
            mock_delete.return_value = iter([])
            result = list(self.postgres_source.mark_databases_as_deleted())
            # Nothing is yielded when the deletion helper yields nothing
            self.assertEqual(result, [])
            mock_delete.assert_called_once()
            call_args = mock_delete.call_args
            # Union of processed and filtered-in databases; the original
            # literal repeated "test_service.db1", which a set collapses
            expected_source_state = {
                "test_service.db1",
                "test_service.db2",
            }
            self.assertEqual(
                call_args[1]["entity_source_state"], expected_source_state
            )
def test_mark_deleted_schemas_empty_source_state(self):
    """Test mark deleted schemas with empty source state"""
    # Source config enabling schema deletion
    mock_source_config = MagicMock()
    mock_source_config.markDeletedSchemas = True
    self.postgres_source.source_config = mock_source_config
    # Context needs a database and a service for FQN building
    self.postgres_source.context.get().__dict__["database"] = "test_db"
    self.postgres_source.context.get().__dict__["database_service"] = "test_service"
    # Nothing was processed in this run
    self.postgres_source.schema_entity_source_state = set()
    with patch.object(
        self.postgres_source, "_get_filtered_schema_names"
    ) as mock_filtered_schemas:
        mock_filtered_schemas.return_value = ["test_service.test_db.schema1"]
        with patch(
            "metadata.ingestion.source.database.database_service.delete_entity_from_source"
        ) as mock_delete:
            mock_delete.return_value = iter([])
            result = list(self.postgres_source.mark_schemas_as_deleted())
            # Nothing is yielded when the deletion helper yields nothing
            self.assertEqual(result, [])
            # Only the filtered-in schemas make up the source state
            mock_delete.assert_called_once()
            call_args = mock_delete.call_args
            expected_source_state = {"test_service.test_db.schema1"}
            self.assertEqual(
                call_args[1]["entity_source_state"], expected_source_state
            )
def test_mark_deleted_databases_empty_source_state(self):
"""Test mark deleted databases with empty source state"""
# Create a mock source config with the required attributes
mock_source_config = MagicMock()
mock_source_config.markDeletedDatabases = True
self.postgres_source.source_config = mock_source_config
# Mock the context to have database service information
self.postgres_source.context.get().__dict__["database_service"] = "test_service"
# Mock empty database entity source state
self.postgres_source.database_entity_source_state = set()
# Mock the _get_filtered_database_names method
with patch.object(
self.postgres_source, "_get_filtered_database_names"
) as mock_filtered_dbs:
mock_filtered_dbs.return_value = ["db1"]
# Mock the delete_entity_from_source function
with patch(
"metadata.ingestion.source.database.database_service.delete_entity_from_source"
) as mock_delete:
mock_delete.return_value = iter([])
# Call the method
result = list(self.postgres_source.mark_databases_as_deleted())
# Verify that delete_entity_from_source was called with only filtered databases
mock_delete.assert_called_once()
call_args = mock_delete.call_args
expected_source_state = {"test_service.db1"}
self.assertEqual(
call_args[1]["entity_source_state"], expected_source_state
)
def test_mark_deleted_schemas_exception_handling(self):
"""Test mark deleted schemas exception handling"""
# Create a mock source config with the required attributes
mock_source_config = MagicMock()
mock_source_config.markDeletedSchemas = True
self.postgres_source.source_config = mock_source_config
# Mock the context to have database information
self.postgres_source.context.get().__dict__["database"] = "test_db"
self.postgres_source.context.get().__dict__["database_service"] = "test_service"
# Mock the schema entity source state
self.postgres_source.schema_entity_source_state = {"test_schema_fqn"}
# Mock the _get_filtered_schema_names method to raise an exception
with patch.object(
self.postgres_source, "_get_filtered_schema_names"
) as mock_filtered_schemas:
mock_filtered_schemas.side_effect = Exception("Test exception")
# Call the method and expect it to handle the exception gracefully
with self.assertRaises(Exception):
list(self.postgres_source.mark_schemas_as_deleted())
def test_mark_deleted_databases_exception_handling(self):
"""Test mark deleted databases exception handling"""
# Create a mock source config with the required attributes
mock_source_config = MagicMock()
mock_source_config.markDeletedDatabases = True
self.postgres_source.source_config = mock_source_config
# Mock the context to have database service information
self.postgres_source.context.get().__dict__["database_service"] = "test_service"
# Mock the database entity source state
self.postgres_source.database_entity_source_state = {"test_db_fqn"}
# Mock the _get_filtered_database_names method to raise an exception
with patch.object(
self.postgres_source, "_get_filtered_database_names"
) as mock_filtered_dbs:
mock_filtered_dbs.side_effect = Exception("Test exception")
# Call the method and expect it to handle the exception gracefully
with self.assertRaises(Exception):
list(self.postgres_source.mark_databases_as_deleted())
def test_mark_deleted_schemas_with_multiple_schemas(self):
"""Test mark deleted schemas with multiple schemas in source state"""
# Create a mock source config with the required attributes
mock_source_config = MagicMock()
mock_source_config.markDeletedSchemas = True
self.postgres_source.source_config = mock_source_config
# Mock the context to have database information
self.postgres_source.context.get().__dict__["database"] = "test_db"
self.postgres_source.context.get().__dict__["database_service"] = "test_service"
# Mock the schema entity source state with multiple schemas
self.postgres_source.schema_entity_source_state = {
"test_service.test_db.schema1",
"test_service.test_db.schema2",
"test_service.test_db.schema3",
}
# Mock the _get_filtered_schema_names method
with patch.object(
self.postgres_source, "_get_filtered_schema_names"
) as mock_filtered_schemas:
mock_filtered_schemas.return_value = [
"test_service.test_db.schema1",
"test_service.test_db.schema2",
]
# Mock the delete_entity_from_source function
with patch(
"metadata.ingestion.source.database.database_service.delete_entity_from_source"
) as mock_delete:
mock_delete.return_value = iter([])
# Call the method
result = list(self.postgres_source.mark_schemas_as_deleted())
# Verify that delete_entity_from_source was called
mock_delete.assert_called_once()
call_args = mock_delete.call_args
# Verify the entity_source_state contains all schemas
expected_source_state = {
"test_service.test_db.schema1",
"test_service.test_db.schema2",
"test_service.test_db.schema3",
"test_service.test_db.schema1",
"test_service.test_db.schema2",
}
self.assertEqual(
call_args[1]["entity_source_state"], expected_source_state
)
def test_mark_deleted_databases_with_multiple_databases(self):
"""Test mark deleted databases with multiple databases in source state"""
# Create a mock source config with the required attributes
mock_source_config = MagicMock()
mock_source_config.markDeletedDatabases = True
self.postgres_source.source_config = mock_source_config
# Mock the context to have database service information
self.postgres_source.context.get().__dict__["database_service"] = "test_service"
# Mock the database entity source state with multiple databases
self.postgres_source.database_entity_source_state = {
"test_service.db1",
"test_service.db2",
"test_service.db3",
}
# Mock the _get_filtered_database_names method
with patch.object(
self.postgres_source, "_get_filtered_database_names"
) as mock_filtered_dbs:
mock_filtered_dbs.return_value = ["db1", "db2"]
# Mock the delete_entity_from_source function
with patch(
"metadata.ingestion.source.database.database_service.delete_entity_from_source"
) as mock_delete:
mock_delete.return_value = iter([])
# Call the method
result = list(self.postgres_source.mark_databases_as_deleted())
# Verify that delete_entity_from_source was called
mock_delete.assert_called_once()
call_args = mock_delete.call_args
# Verify the entity_source_state contains all databases
expected_source_state = {
"test_service.db1",
"test_service.db2",
"test_service.db3",
"test_service.db1",
"test_service.db2",
}
self.assertEqual(
call_args[1]["entity_source_state"], expected_source_state
)

View File

@ -60,6 +60,18 @@
"default": true,
"title": "Mark Deleted Stored Procedures"
},
"markDeletedSchemas": {
"description": "Optional configuration to soft delete schemas in OpenMetadata if the source schemas are deleted. Also, if the schema is deleted, all the associated entities like tables, views, stored procedures, lineage, etc., with that schema will be deleted",
"type": "boolean",
"default": true,
"title": "Mark Deleted Schemas"
},
"markDeletedDatabases": {
"description": "Optional configuration to soft delete databases in OpenMetadata if the source databases are deleted. Also, if the database is deleted, all the associated entities like schemas, tables, views, stored procedures, lineage, etc., with that database will be deleted",
"type": "boolean",
"default": true,
"title": "Mark Deleted Databases"
},
"includeTables": {
"description": "Optional configuration to turn off fetching metadata for tables.",
"type": "boolean",

View File

@ -307,6 +307,18 @@ export interface Pipeline {
* getting the changes from Audit tables on the supporting databases.
*/
incremental?: IncrementalMetadataExtractionConfiguration;
/**
* Optional configuration to soft delete databases in OpenMetadata if the source databases
* are deleted. Also, if the database is deleted, all the associated entities like schemas,
* tables, views, stored procedures, lineage, etc., with that database will be deleted
*/
markDeletedDatabases?: boolean;
/**
* Optional configuration to soft delete schemas in OpenMetadata if the source schemas are
* deleted. Also, if the schema is deleted, all the associated entities like tables, views,
* stored procedures, lineage, etc., with that schema will be deleted
*/
markDeletedSchemas?: boolean;
/**
* Optional configuration to soft delete stored procedures in OpenMetadata if the source
* stored procedures are deleted. Also, if the stored procedures is deleted, all the

View File

@ -868,6 +868,18 @@ export interface Pipeline {
* getting the changes from Audit tables on the supporting databases.
*/
incremental?: IncrementalMetadataExtractionConfiguration;
/**
* Optional configuration to soft delete databases in OpenMetadata if the source databases
* are deleted. Also, if the database is deleted, all the associated entities like schemas,
* tables, views, stored procedures, lineage, etc., with that database will be deleted
*/
markDeletedDatabases?: boolean;
/**
* Optional configuration to soft delete schemas in OpenMetadata if the source schemas are
* deleted. Also, if the schema is deleted, all the associated entities like tables, views,
* stored procedures, lineage, etc., with that schema will be deleted
*/
markDeletedSchemas?: boolean;
/**
* Optional configuration to soft delete stored procedures in OpenMetadata if the source
* stored procedures are deleted. Also, if the stored procedures is deleted, all the

View File

@ -50,6 +50,18 @@ export interface DatabaseServiceMetadataPipeline {
* getting the changes from Audit tables on the supporting databases.
*/
incremental?: IncrementalMetadataExtractionConfiguration;
/**
* Optional configuration to soft delete databases in OpenMetadata if the source databases
* are deleted. Also, if the database is deleted, all the associated entities like schemas,
* tables, views, stored procedures, lineage, etc., with that database will be deleted
*/
markDeletedDatabases?: boolean;
/**
* Optional configuration to soft delete schemas in OpenMetadata if the source schemas are
* deleted. Also, if the schema is deleted, all the associated entities like tables, views,
* stored procedures, lineage, etc., with that schema will be deleted
*/
markDeletedSchemas?: boolean;
/**
* Optional configuration to soft delete stored procedures in OpenMetadata if the source
* stored procedures are deleted. Also, if the stored procedures is deleted, all the

View File

@ -4050,6 +4050,18 @@ export interface Pipeline {
* getting the changes from Audit tables on the supporting databases.
*/
incremental?: IncrementalMetadataExtractionConfiguration;
/**
* Optional configuration to soft delete databases in OpenMetadata if the source databases
* are deleted. Also, if the database is deleted, all the associated entities like schemas,
* tables, views, stored procedures, lineage, etc., with that database will be deleted
*/
markDeletedDatabases?: boolean;
/**
* Optional configuration to soft delete schemas in OpenMetadata if the source schemas are
* deleted. Also, if the schema is deleted, all the associated entities like tables, views,
* stored procedures, lineage, etc., with that schema will be deleted
*/
markDeletedSchemas?: boolean;
/**
* Optional configuration to soft delete stored procedures in OpenMetadata if the source
* stored procedures are deleted. Also, if the stored procedures is deleted, all the