From b8d0b3b6e431e2c789548069d15bc0502ac1d4f7 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Wed, 6 Dec 2023 08:57:41 +0530 Subject: [PATCH] #14168: Mark SP and Data model deletion (#14219) --- .../source/dashboard/dashboard_service.py | 32 ++++++++++++- .../source/dashboard/looker/metadata.py | 28 +++++------ .../source/dashboard/powerbi/metadata.py | 2 + .../source/dashboard/qliksense/metadata.py | 1 + .../source/dashboard/superset/api_source.py | 1 + .../source/dashboard/superset/db_source.py | 2 + .../source/dashboard/tableau/metadata.py | 2 + .../source/database/bigquery/metadata.py | 46 +++++++++--------- .../source/database/database_service.py | 47 +++++++++++++++---- .../source/database/datalake/metadata.py | 2 +- .../source/database/redshift/metadata.py | 32 +++++++------ .../source/database/snowflake/metadata.py | 47 +++++++++---------- .../dashboardServiceMetadataPipeline.json | 6 +++ .../databaseServiceMetadataPipeline.json | 6 +++ .../en-US/Database/workflows/metadata.md | 21 +++++++++ 15 files changed, 188 insertions(+), 87 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 3c27e0cd818..46f10c180d4 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -110,7 +110,7 @@ class DashboardServiceTopology(ServiceTopology): ), ], children=["bulk_data_model", "dashboard"], - post_process=["mark_dashboards_as_deleted"], + post_process=["mark_dashboards_as_deleted", "mark_datamodels_as_deleted"], ) # Dashboard Services have very different approaches when # when dealing with data models. Tableau has the models @@ -198,6 +198,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): topology = DashboardServiceTopology() context = create_source_context(topology) dashboard_source_state: Set = set() + datamodel_source_state: Set = set() def __init__( self, @@ -371,6 +372,20 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): params={"service": self.context.dashboard_service}, ) + def mark_datamodels_as_deleted(self) -> Iterable[Either[DeleteEntity]]: + """ + Method to mark the datamodels as deleted + """ + if self.source_config.markDeletedDataModels: + logger.info("Mark Deleted Datamodels set to True") + yield from delete_entity_from_source( + metadata=self.metadata, + entity_type=DashboardDataModel, + entity_source_state=self.datamodel_source_state, + mark_deleted_entity=self.source_config.markDeletedDataModels, + params={"service": self.context.dashboard_service}, + ) + def process_owner(self, dashboard_details): """ Method to process the dashboard onwers @@ -413,6 +428,21 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): self.dashboard_source_state.add(dashboard_fqn) + def register_record_datamodel( + self, datamodel_requst: CreateDashboardDataModelRequest + ) -> None: + """ + Mark the datamodel record as scanned and update the datamodel_source_state + """ + datamodel_fqn = fqn.build( + self.metadata, + entity_type=DashboardDataModel, + service_name=datamodel_requst.service.__root__, + data_model_name=datamodel_requst.name.__root__, + ) + + self.datamodel_source_state.add(datamodel_fqn) + def get_owner_details( # pylint: disable=useless-return self, dashboard_details # pylint: disable=unused-argument ) -> Optional[EntityReference]: diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py 
b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py index a1c420801d3..584368beed2 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py @@ -403,6 +403,7 @@ class LookerSource(DashboardServiceSource): project=model.project_name, ) yield Either(right=explore_datamodel) + self.register_record_datamodel(datamodel_requst=explore_datamodel) # build datamodel by our hand since ack_sink=False self.context.dataModel = self._build_data_model(datamodel_name) @@ -487,24 +488,23 @@ class LookerSource(DashboardServiceSource): view: Optional[LookMlView] = project_parser.find_view(view_name=view_name) if view: - yield Either( - right=CreateDashboardDataModelRequest( - name=build_datamodel_name(explore.model_name, view.name), - displayName=view.name, - description=view.description, - service=self.context.dashboard_service, - dataModelType=DataModelType.LookMlView.value, - serviceType=DashboardServiceType.Looker.value, - columns=get_columns_from_model(view), - sql=project_parser.parsed_files.get(Includes(view.source_file)), - # In Looker, you need to create Explores and Views within a Project - project=explore.project_name, - ) + data_model_request = CreateDashboardDataModelRequest( + name=build_datamodel_name(explore.model_name, view.name), + displayName=view.name, + description=view.description, + service=self.context.dashboard_service, + dataModelType=DataModelType.LookMlView.value, + serviceType=DashboardServiceType.Looker.value, + columns=get_columns_from_model(view), + sql=project_parser.parsed_files.get(Includes(view.source_file)), + # In Looker, you need to create Explores and Views within a Project + project=explore.project_name, ) + yield Either(right=data_model_request) self._view_data_model = self._build_data_model( build_datamodel_name(explore.model_name, view.name) ) - + self.register_record_datamodel(datamodel_requst=data_model_request) yield from self.add_view_lineage(view, explore) else: yield Either( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py index 8de8dc2fe24..c403066af4b 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py @@ -324,6 +324,8 @@ class PowerbiSource(DashboardServiceSource): project=self._fetch_dataset_workspace(dataset_id=dataset.id), ) yield Either(right=data_model_request) + self.register_record_datamodel(datamodel_requst=data_model_request) + except Exception as exc: yield Either( left=StackTraceError( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py index 8731cfb921f..7b30bb1e59e 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py @@ -221,6 +221,7 @@ class QliksenseSource(DashboardServiceSource): columns=self.get_column_info(data_model), ) yield Either(right=data_model_request) + self.register_record_datamodel(datamodel_requst=data_model_request) except Exception as exc: name = ( data_model.tableName if data_model.tableName else data_model.id diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py index 
9431c6c5988..66b06a46046 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py @@ -206,6 +206,7 @@ class SupersetAPISource(SupersetSourceMixin): dataModelType=DataModelType.SupersetDataModel.value, ) yield Either(right=data_model_request) + self.register_record_datamodel(datamodel_requst=data_model_request) except Exception as exc: yield Either( left=StackTraceError( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py index d01d8d66512..49e10dca33a 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py @@ -200,6 +200,8 @@ class SupersetDBSource(SupersetSourceMixin): dataModelType=DataModelType.SupersetDataModel.value, ) yield Either(right=data_model_request) + self.register_record_datamodel(datamodel_requst=data_model_request) + except Exception as exc: yield Either( left=StackTraceError( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py index 5d358df1cd1..24f1e9bafb5 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py @@ -190,6 +190,8 @@ class TableauSource(DashboardServiceSource): columns=self.get_column_info(data_model), ) yield Either(right=data_model_request) + self.register_record_datamodel(datamodel_requst=data_model_request) + except Exception as exc: yield Either( left=StackTraceError( diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index b88af7a55a6..6929a485f6b 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -9,7 +9,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
""" -We require Taxonomy Admin permissions to fetch all Policy Tags +Bigquery source module """ import os import traceback @@ -604,32 +604,32 @@ class BigquerySource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource) """Prepare the stored procedure payload""" try: - yield Either( - right=CreateStoredProcedureRequest( - name=EntityName(__root__=stored_procedure.name), - storedProcedureCode=StoredProcedureCode( - language=STORED_PROC_LANGUAGE_MAP.get( - stored_procedure.language or "SQL", - ), - code=stored_procedure.definition, + stored_procedure_request = CreateStoredProcedureRequest( + name=EntityName(__root__=stored_procedure.name), + storedProcedureCode=StoredProcedureCode( + language=STORED_PROC_LANGUAGE_MAP.get( + stored_procedure.language or "SQL", ), - databaseSchema=fqn.build( - metadata=self.metadata, - entity_type=DatabaseSchema, - service_name=self.context.database_service, + code=stored_procedure.definition, + ), + databaseSchema=fqn.build( + metadata=self.metadata, + entity_type=DatabaseSchema, + service_name=self.context.database_service, + database_name=self.context.database, + schema_name=self.context.database_schema, + ), + sourceUrl=SourceUrl( + __root__=self.get_stored_procedure_url( database_name=self.context.database, schema_name=self.context.database_schema, - ), - sourceUrl=SourceUrl( - __root__=self.get_stored_procedure_url( - database_name=self.context.database, - schema_name=self.context.database_schema, - # Follow the same building strategy as tables - table_name=stored_procedure.name, - ) - ), - ) + # Follow the same building strategy as tables + table_name=stored_procedure.name, + ) + ), ) + yield Either(right=stored_procedure_request) + self.register_record_stored_proc_request(stored_procedure_request) except Exception as exc: yield Either( left=StackTraceError( diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 639c2dabf1d..6fa697501df 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -142,7 +142,7 @@ class DatabaseServiceTopology(ServiceTopology): ), ], children=["table", "stored_procedure"], - post_process=["mark_tables_as_deleted"], + post_process=["mark_tables_as_deleted", "mark_stored_procedures_as_deleted"], ) table = TopologyNode( producer="get_tables_name_and_type", @@ -194,6 +194,7 @@ class DatabaseServiceSource( source_config: DatabaseServiceMetadataPipeline config: WorkflowSource database_source_state: Set = set() + stored_procedure_source_state: Set = set() # Big union of types we want to fetch dynamically service_connection: DatabaseConnection.__fields__["config"].type_ @@ -410,6 +411,23 @@ class DatabaseServiceSource( self.database_source_state.add(table_fqn) + def register_record_stored_proc_request( + self, stored_proc_request: CreateStoredProcedureRequest + ) -> None: + """ + Mark the table record as scanned and update the database_source_state + """ + table_fqn = fqn.build( + self.metadata, + entity_type=StoredProcedure, + service_name=self.context.database_service, + database_name=self.context.database, + schema_name=self.context.database_schema, + procedure_name=stored_proc_request.name.__root__, + ) + + self.stored_procedure_source_state.add(table_fqn) + def _get_filtered_schema_names( self, return_fqn: bool = False, add_to_status: bool = True ) -> Iterable[str]: @@ -451,16 +469,27 @@ class DatabaseServiceSource( 
params={"database": schema_fqn}, ) - def get_all_entities(self): + def mark_stored_procedures_as_deleted(self): """ - Get all the tables and cache them + Use the current inspector to mark Stored Procedures as deleted """ - all_table_entities = self.metadata.list_all_entities( - entity=Table, - params={"database": self.context.database_service}, - fields=["*"], - ) - self.context.table_entities = list(all_table_entities) + if self.source_config.markDeletedStoredProcedures: + logger.info( + f"Mark Deleted Stored Procedures Processing database [{self.context.database}]" + ) + + schema_fqn_list = self._get_filtered_schema_names( + return_fqn=True, add_to_status=False + ) + + for schema_fqn in schema_fqn_list: + yield from delete_entity_from_source( + metadata=self.metadata, + entity_type=StoredProcedure, + entity_source_state=self.stored_procedure_source_state, + mark_deleted_entity=self.source_config.markDeletedStoredProcedures, + params={"databaseSchema": schema_fqn}, + ) def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]: """ diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index d8b8d7e3f44..65de5fa7373 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -363,7 +363,7 @@ class DatalakeSource(DatabaseServiceSource): fileFormat=table_extension.value if table_extension else None, ) yield Either(right=table_request) - self.register_record(table_request) + self.register_record(table_request=table_request) except Exception as exc: yield Either( left=StackTraceError( diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py index f4254f7cede..ff8a959dc56 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py @@ -259,22 +259,24 @@ class RedshiftSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource) """Prepare the stored procedure payload""" try: - yield Either( - right=CreateStoredProcedureRequest( - name=EntityName(__root__=stored_procedure.name), - storedProcedureCode=StoredProcedureCode( - language=Language.SQL, - code=stored_procedure.definition, - ), - databaseSchema=fqn.build( - metadata=self.metadata, - entity_type=DatabaseSchema, - service_name=self.context.database_service, - database_name=self.context.database, - schema_name=self.context.database_schema, - ), - ) + stored_procedure_request = CreateStoredProcedureRequest( + name=EntityName(__root__=stored_procedure.name), + storedProcedureCode=StoredProcedureCode( + language=Language.SQL, + code=stored_procedure.definition, + ), + databaseSchema=fqn.build( + metadata=self.metadata, + entity_type=DatabaseSchema, + service_name=self.context.database_service, + database_name=self.context.database, + schema_name=self.context.database_schema, + ), ) + yield Either(right=stored_procedure_request) + + self.register_record_stored_proc_request(stored_procedure_request) + except Exception as exc: yield Either( left=StackTraceError( diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index 0507177c380..3967bc9fcae 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ 
b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -563,33 +563,32 @@ class SnowflakeSource( """Prepare the stored procedure payload""" try: - yield Either( - right=CreateStoredProcedureRequest( - name=EntityName(__root__=stored_procedure.name), - description=stored_procedure.comment, - storedProcedureCode=StoredProcedureCode( - language=STORED_PROC_LANGUAGE_MAP.get( - stored_procedure.language - ), - code=stored_procedure.definition, - ), - databaseSchema=fqn.build( - metadata=self.metadata, - entity_type=DatabaseSchema, - service_name=self.context.database_service, + stored_procedure_request = CreateStoredProcedureRequest( + name=EntityName(__root__=stored_procedure.name), + description=stored_procedure.comment, + storedProcedureCode=StoredProcedureCode( + language=STORED_PROC_LANGUAGE_MAP.get(stored_procedure.language), + code=stored_procedure.definition, + ), + databaseSchema=fqn.build( + metadata=self.metadata, + entity_type=DatabaseSchema, + service_name=self.context.database_service, + database_name=self.context.database, + schema_name=self.context.database_schema, + ), + sourceUrl=SourceUrl( + __root__=self._get_source_url_root( database_name=self.context.database, schema_name=self.context.database_schema, - ), - sourceUrl=SourceUrl( - __root__=self._get_source_url_root( - database_name=self.context.database, - schema_name=self.context.database_schema, - ) - + f"/procedure/{stored_procedure.name}" - + f"{stored_procedure.signature if stored_procedure.signature else ''}" - ), - ) + ) + + f"/procedure/{stored_procedure.name}" + + f"{stored_procedure.signature if stored_procedure.signature else ''}" + ), ) + yield Either(right=stored_procedure_request) + self.register_record_stored_proc_request(stored_procedure_request) + except Exception as exc: yield Either( left=StackTraceError( diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dashboardServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dashboardServiceMetadataPipeline.json index a2788bac058..e0613734d60 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dashboardServiceMetadataPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dashboardServiceMetadataPipeline.json @@ -58,6 +58,12 @@ "default": true, "title": "Mark Deleted Dashboards" }, + "markDeletedDataModels": { + "description": "Optional configuration to soft delete data models in OpenMetadata if the source data models are deleted. 
Also, if a data model is deleted, all of its associated entities, such as lineage, will be deleted as well.", "type": "boolean", "default": true, "title": "Mark Deleted Data Models" }, "includeTags": { "description": "Optional configuration to toggle the tags ingestion.", "type": "boolean", diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json index bf52b2f2348..492dfc20f96 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json @@ -25,6 +25,12 @@ "default": true, "title": "Mark Deleted Tables" }, + "markDeletedStoredProcedures": { + "description": "Optional configuration to soft delete stored procedures in OpenMetadata if the source stored procedures are deleted. Also, if a stored procedure is deleted, all of its associated entities, such as lineage, will be deleted as well.", + "type": "boolean", + "default": true, + "title": "Mark Deleted Stored Procedures" + }, "includeTables": { "description": "Optional configuration to turn off fetching metadata for tables.", "type": "boolean", diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/workflows/metadata.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/workflows/metadata.md index 38de3fa5ab6..ce7a9c8d376 100644 --- a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/workflows/metadata.md +++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/workflows/metadata.md @@ -105,6 +105,27 @@ In such cases you may delete the table/schema manually from UI. $$ +$$section +### Mark Deleted Stored Procedures $(id="markDeletedStoredProcedures") +This is an optional configuration for enabling **soft deletion** of stored procedures during the ingestion. When this option is enabled, only stored procedures that have been deleted from the source will be soft deleted, and this will apply ONLY to the schema that is currently being ingested via the pipeline. + +Any related entities such as test suites or lineage information that were associated with those stored procedures will also be deleted. + +Here are some examples of scenarios in which stored procedures will get soft deleted if this flag is enabled. + +- If no filters were applied, but a stored procedure was deleted from the data source, then the same stored procedure will be soft deleted from OpenMetadata as well. +- If you have applied a `Schema Filter Pattern` to include `SchemaA`, then any stored procedure deleted from `SchemaA` will also be soft deleted from OpenMetadata. +- If `StoredProcedureA` was already ingested in OpenMetadata, and you later apply a `Stored Procedure Filter Pattern` to exclude `StoredProcedureA`, then `StoredProcedureA` will get soft deleted from OpenMetadata. + +Here are some examples of scenarios where stored procedures will **NOT** get soft deleted if this flag is enabled. + +- If you already have `SchemaA` & `SchemaB` ingested in OpenMetadata, then later you apply a `Schema Filter Pattern` to exclude `SchemaB`, then no stored procedure from `SchemaB` will be deleted. 
+- If you already have `SchemaA` & `SchemaB` ingested in OpenMetadata and for this ingestion pipeline you have applied a `Schema Filter Pattern` to include only `SchemaA`, then any stored procedure deleted from `SchemaB` will not be deleted (since it is ignored in the ingestion).
+
+In such cases you may delete the stored procedure/schema manually from the UI.
+
+$$
+
 $$section
### View Definition Parsing Timeout Limit $(id="viewParsingTimeoutLimit")
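
For anyone trying the new flags out, here is a minimal sketch of how they would appear in an ingestion pipeline's `sourceConfig`, assuming the standard `sourceConfig.config` layout used by metadata workflows. Only the flag names, types, and defaults come from the schemas changed above; the surrounding structure is shown purely for illustration:

```json
{
  "sourceConfig": {
    "config": {
      "type": "DatabaseMetadata",
      "markDeletedTables": true,
      "markDeletedStoredProcedures": true
    }
  }
}
```

The dashboard metadata pipeline exposes `markDeletedDataModels` in the same way under its own `sourceConfig`. Both new flags default to `true` in their schemas, so existing pipelines should pick up the soft-deletion behavior without any configuration change.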