mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-01 13:13:10 +00:00
parent
f32b4d4c47
commit
b8d0b3b6e4
@ -110,7 +110,7 @@ class DashboardServiceTopology(ServiceTopology):
|
||||
),
|
||||
],
|
||||
children=["bulk_data_model", "dashboard"],
|
||||
post_process=["mark_dashboards_as_deleted"],
|
||||
post_process=["mark_dashboards_as_deleted", "mark_datamodels_as_deleted"],
|
||||
)
|
||||
# Dashboard Services have very different approaches when
|
||||
# when dealing with data models. Tableau has the models
|
||||
@ -198,6 +198,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
|
||||
topology = DashboardServiceTopology()
|
||||
context = create_source_context(topology)
|
||||
dashboard_source_state: Set = set()
|
||||
datamodel_source_state: Set = set()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@ -371,6 +372,20 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
|
||||
params={"service": self.context.dashboard_service},
|
||||
)
|
||||
|
||||
def mark_datamodels_as_deleted(self) -> Iterable[Either[DeleteEntity]]:
|
||||
"""
|
||||
Method to mark the datamodels as deleted
|
||||
"""
|
||||
if self.source_config.markDeletedDataModels:
|
||||
logger.info("Mark Deleted Datamodels set to True")
|
||||
yield from delete_entity_from_source(
|
||||
metadata=self.metadata,
|
||||
entity_type=DashboardDataModel,
|
||||
entity_source_state=self.datamodel_source_state,
|
||||
mark_deleted_entity=self.source_config.markDeletedDataModels,
|
||||
params={"service": self.context.dashboard_service},
|
||||
)
|
||||
|
||||
def process_owner(self, dashboard_details):
|
||||
"""
|
||||
Method to process the dashboard onwers
|
||||
@ -413,6 +428,21 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
|
||||
|
||||
self.dashboard_source_state.add(dashboard_fqn)
|
||||
|
||||
def register_record_datamodel(
|
||||
self, datamodel_requst: CreateDashboardDataModelRequest
|
||||
) -> None:
|
||||
"""
|
||||
Mark the datamodel record as scanned and update the datamodel_source_state
|
||||
"""
|
||||
datamodel_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=DashboardDataModel,
|
||||
service_name=datamodel_requst.service.__root__,
|
||||
data_model_name=datamodel_requst.name.__root__,
|
||||
)
|
||||
|
||||
self.datamodel_source_state.add(datamodel_fqn)
|
||||
|
||||
def get_owner_details( # pylint: disable=useless-return
|
||||
self, dashboard_details # pylint: disable=unused-argument
|
||||
) -> Optional[EntityReference]:
|
||||
|
@ -403,6 +403,7 @@ class LookerSource(DashboardServiceSource):
|
||||
project=model.project_name,
|
||||
)
|
||||
yield Either(right=explore_datamodel)
|
||||
self.register_record_datamodel(datamodel_requst=explore_datamodel)
|
||||
|
||||
# build datamodel by our hand since ack_sink=False
|
||||
self.context.dataModel = self._build_data_model(datamodel_name)
|
||||
@ -487,24 +488,23 @@ class LookerSource(DashboardServiceSource):
|
||||
view: Optional[LookMlView] = project_parser.find_view(view_name=view_name)
|
||||
|
||||
if view:
|
||||
yield Either(
|
||||
right=CreateDashboardDataModelRequest(
|
||||
name=build_datamodel_name(explore.model_name, view.name),
|
||||
displayName=view.name,
|
||||
description=view.description,
|
||||
service=self.context.dashboard_service,
|
||||
dataModelType=DataModelType.LookMlView.value,
|
||||
serviceType=DashboardServiceType.Looker.value,
|
||||
columns=get_columns_from_model(view),
|
||||
sql=project_parser.parsed_files.get(Includes(view.source_file)),
|
||||
# In Looker, you need to create Explores and Views within a Project
|
||||
project=explore.project_name,
|
||||
)
|
||||
data_model_request = CreateDashboardDataModelRequest(
|
||||
name=build_datamodel_name(explore.model_name, view.name),
|
||||
displayName=view.name,
|
||||
description=view.description,
|
||||
service=self.context.dashboard_service,
|
||||
dataModelType=DataModelType.LookMlView.value,
|
||||
serviceType=DashboardServiceType.Looker.value,
|
||||
columns=get_columns_from_model(view),
|
||||
sql=project_parser.parsed_files.get(Includes(view.source_file)),
|
||||
# In Looker, you need to create Explores and Views within a Project
|
||||
project=explore.project_name,
|
||||
)
|
||||
yield Either(right=data_model_request)
|
||||
self._view_data_model = self._build_data_model(
|
||||
build_datamodel_name(explore.model_name, view.name)
|
||||
)
|
||||
|
||||
self.register_record_datamodel(datamodel_requst=data_model_request)
|
||||
yield from self.add_view_lineage(view, explore)
|
||||
else:
|
||||
yield Either(
|
||||
|
@ -324,6 +324,8 @@ class PowerbiSource(DashboardServiceSource):
|
||||
project=self._fetch_dataset_workspace(dataset_id=dataset.id),
|
||||
)
|
||||
yield Either(right=data_model_request)
|
||||
self.register_record_datamodel(datamodel_requst=data_model_request)
|
||||
|
||||
except Exception as exc:
|
||||
yield Either(
|
||||
left=StackTraceError(
|
||||
|
@ -221,6 +221,7 @@ class QliksenseSource(DashboardServiceSource):
|
||||
columns=self.get_column_info(data_model),
|
||||
)
|
||||
yield Either(right=data_model_request)
|
||||
self.register_record_datamodel(datamodel_requst=data_model_request)
|
||||
except Exception as exc:
|
||||
name = (
|
||||
data_model.tableName if data_model.tableName else data_model.id
|
||||
|
@ -206,6 +206,7 @@ class SupersetAPISource(SupersetSourceMixin):
|
||||
dataModelType=DataModelType.SupersetDataModel.value,
|
||||
)
|
||||
yield Either(right=data_model_request)
|
||||
self.register_record_datamodel(datamodel_requst=data_model_request)
|
||||
except Exception as exc:
|
||||
yield Either(
|
||||
left=StackTraceError(
|
||||
|
@ -200,6 +200,8 @@ class SupersetDBSource(SupersetSourceMixin):
|
||||
dataModelType=DataModelType.SupersetDataModel.value,
|
||||
)
|
||||
yield Either(right=data_model_request)
|
||||
self.register_record_datamodel(datamodel_requst=data_model_request)
|
||||
|
||||
except Exception as exc:
|
||||
yield Either(
|
||||
left=StackTraceError(
|
||||
|
@ -190,6 +190,8 @@ class TableauSource(DashboardServiceSource):
|
||||
columns=self.get_column_info(data_model),
|
||||
)
|
||||
yield Either(right=data_model_request)
|
||||
self.register_record_datamodel(datamodel_requst=data_model_request)
|
||||
|
||||
except Exception as exc:
|
||||
yield Either(
|
||||
left=StackTraceError(
|
||||
|
@ -9,7 +9,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
We require Taxonomy Admin permissions to fetch all Policy Tags
|
||||
Bigquery source module
|
||||
"""
|
||||
import os
|
||||
import traceback
|
||||
@ -604,32 +604,32 @@ class BigquerySource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource)
|
||||
"""Prepare the stored procedure payload"""
|
||||
|
||||
try:
|
||||
yield Either(
|
||||
right=CreateStoredProcedureRequest(
|
||||
name=EntityName(__root__=stored_procedure.name),
|
||||
storedProcedureCode=StoredProcedureCode(
|
||||
language=STORED_PROC_LANGUAGE_MAP.get(
|
||||
stored_procedure.language or "SQL",
|
||||
),
|
||||
code=stored_procedure.definition,
|
||||
stored_procedure_request = CreateStoredProcedureRequest(
|
||||
name=EntityName(__root__=stored_procedure.name),
|
||||
storedProcedureCode=StoredProcedureCode(
|
||||
language=STORED_PROC_LANGUAGE_MAP.get(
|
||||
stored_procedure.language or "SQL",
|
||||
),
|
||||
databaseSchema=fqn.build(
|
||||
metadata=self.metadata,
|
||||
entity_type=DatabaseSchema,
|
||||
service_name=self.context.database_service,
|
||||
code=stored_procedure.definition,
|
||||
),
|
||||
databaseSchema=fqn.build(
|
||||
metadata=self.metadata,
|
||||
entity_type=DatabaseSchema,
|
||||
service_name=self.context.database_service,
|
||||
database_name=self.context.database,
|
||||
schema_name=self.context.database_schema,
|
||||
),
|
||||
sourceUrl=SourceUrl(
|
||||
__root__=self.get_stored_procedure_url(
|
||||
database_name=self.context.database,
|
||||
schema_name=self.context.database_schema,
|
||||
),
|
||||
sourceUrl=SourceUrl(
|
||||
__root__=self.get_stored_procedure_url(
|
||||
database_name=self.context.database,
|
||||
schema_name=self.context.database_schema,
|
||||
# Follow the same building strategy as tables
|
||||
table_name=stored_procedure.name,
|
||||
)
|
||||
),
|
||||
)
|
||||
# Follow the same building strategy as tables
|
||||
table_name=stored_procedure.name,
|
||||
)
|
||||
),
|
||||
)
|
||||
yield Either(right=stored_procedure_request)
|
||||
self.register_record_stored_proc_request(stored_procedure_request)
|
||||
except Exception as exc:
|
||||
yield Either(
|
||||
left=StackTraceError(
|
||||
|
@ -142,7 +142,7 @@ class DatabaseServiceTopology(ServiceTopology):
|
||||
),
|
||||
],
|
||||
children=["table", "stored_procedure"],
|
||||
post_process=["mark_tables_as_deleted"],
|
||||
post_process=["mark_tables_as_deleted", "mark_stored_procedures_as_deleted"],
|
||||
)
|
||||
table = TopologyNode(
|
||||
producer="get_tables_name_and_type",
|
||||
@ -194,6 +194,7 @@ class DatabaseServiceSource(
|
||||
source_config: DatabaseServiceMetadataPipeline
|
||||
config: WorkflowSource
|
||||
database_source_state: Set = set()
|
||||
stored_procedure_source_state: Set = set()
|
||||
# Big union of types we want to fetch dynamically
|
||||
service_connection: DatabaseConnection.__fields__["config"].type_
|
||||
|
||||
@ -410,6 +411,23 @@ class DatabaseServiceSource(
|
||||
|
||||
self.database_source_state.add(table_fqn)
|
||||
|
||||
def register_record_stored_proc_request(
|
||||
self, stored_proc_request: CreateStoredProcedureRequest
|
||||
) -> None:
|
||||
"""
|
||||
Mark the table record as scanned and update the database_source_state
|
||||
"""
|
||||
table_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=StoredProcedure,
|
||||
service_name=self.context.database_service,
|
||||
database_name=self.context.database,
|
||||
schema_name=self.context.database_schema,
|
||||
procedure_name=stored_proc_request.name.__root__,
|
||||
)
|
||||
|
||||
self.stored_procedure_source_state.add(table_fqn)
|
||||
|
||||
def _get_filtered_schema_names(
|
||||
self, return_fqn: bool = False, add_to_status: bool = True
|
||||
) -> Iterable[str]:
|
||||
@ -451,16 +469,27 @@ class DatabaseServiceSource(
|
||||
params={"database": schema_fqn},
|
||||
)
|
||||
|
||||
def get_all_entities(self):
|
||||
def mark_stored_procedures_as_deleted(self):
|
||||
"""
|
||||
Get all the tables and cache them
|
||||
Use the current inspector to mark Stored Procedures as deleted
|
||||
"""
|
||||
all_table_entities = self.metadata.list_all_entities(
|
||||
entity=Table,
|
||||
params={"database": self.context.database_service},
|
||||
fields=["*"],
|
||||
)
|
||||
self.context.table_entities = list(all_table_entities)
|
||||
if self.source_config.markDeletedStoredProcedures:
|
||||
logger.info(
|
||||
f"Mark Deleted Stored Procedures Processing database [{self.context.database}]"
|
||||
)
|
||||
|
||||
schema_fqn_list = self._get_filtered_schema_names(
|
||||
return_fqn=True, add_to_status=False
|
||||
)
|
||||
|
||||
for schema_fqn in schema_fqn_list:
|
||||
yield from delete_entity_from_source(
|
||||
metadata=self.metadata,
|
||||
entity_type=StoredProcedure,
|
||||
entity_source_state=self.stored_procedure_source_state,
|
||||
mark_deleted_entity=self.source_config.markDeletedStoredProcedures,
|
||||
params={"databaseSchema": schema_fqn},
|
||||
)
|
||||
|
||||
def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]:
|
||||
"""
|
||||
|
@ -363,7 +363,7 @@ class DatalakeSource(DatabaseServiceSource):
|
||||
fileFormat=table_extension.value if table_extension else None,
|
||||
)
|
||||
yield Either(right=table_request)
|
||||
self.register_record(table_request)
|
||||
self.register_record(table_request=table_request)
|
||||
except Exception as exc:
|
||||
yield Either(
|
||||
left=StackTraceError(
|
||||
|
@ -259,22 +259,24 @@ class RedshiftSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource)
|
||||
"""Prepare the stored procedure payload"""
|
||||
|
||||
try:
|
||||
yield Either(
|
||||
right=CreateStoredProcedureRequest(
|
||||
name=EntityName(__root__=stored_procedure.name),
|
||||
storedProcedureCode=StoredProcedureCode(
|
||||
language=Language.SQL,
|
||||
code=stored_procedure.definition,
|
||||
),
|
||||
databaseSchema=fqn.build(
|
||||
metadata=self.metadata,
|
||||
entity_type=DatabaseSchema,
|
||||
service_name=self.context.database_service,
|
||||
database_name=self.context.database,
|
||||
schema_name=self.context.database_schema,
|
||||
),
|
||||
)
|
||||
stored_procedure_request = CreateStoredProcedureRequest(
|
||||
name=EntityName(__root__=stored_procedure.name),
|
||||
storedProcedureCode=StoredProcedureCode(
|
||||
language=Language.SQL,
|
||||
code=stored_procedure.definition,
|
||||
),
|
||||
databaseSchema=fqn.build(
|
||||
metadata=self.metadata,
|
||||
entity_type=DatabaseSchema,
|
||||
service_name=self.context.database_service,
|
||||
database_name=self.context.database,
|
||||
schema_name=self.context.database_schema,
|
||||
),
|
||||
)
|
||||
yield Either(right=stored_procedure_request)
|
||||
|
||||
self.register_record_stored_proc_request(stored_procedure_request)
|
||||
|
||||
except Exception as exc:
|
||||
yield Either(
|
||||
left=StackTraceError(
|
||||
|
@ -563,33 +563,32 @@ class SnowflakeSource(
|
||||
"""Prepare the stored procedure payload"""
|
||||
|
||||
try:
|
||||
yield Either(
|
||||
right=CreateStoredProcedureRequest(
|
||||
name=EntityName(__root__=stored_procedure.name),
|
||||
description=stored_procedure.comment,
|
||||
storedProcedureCode=StoredProcedureCode(
|
||||
language=STORED_PROC_LANGUAGE_MAP.get(
|
||||
stored_procedure.language
|
||||
),
|
||||
code=stored_procedure.definition,
|
||||
),
|
||||
databaseSchema=fqn.build(
|
||||
metadata=self.metadata,
|
||||
entity_type=DatabaseSchema,
|
||||
service_name=self.context.database_service,
|
||||
stored_procedure_request = CreateStoredProcedureRequest(
|
||||
name=EntityName(__root__=stored_procedure.name),
|
||||
description=stored_procedure.comment,
|
||||
storedProcedureCode=StoredProcedureCode(
|
||||
language=STORED_PROC_LANGUAGE_MAP.get(stored_procedure.language),
|
||||
code=stored_procedure.definition,
|
||||
),
|
||||
databaseSchema=fqn.build(
|
||||
metadata=self.metadata,
|
||||
entity_type=DatabaseSchema,
|
||||
service_name=self.context.database_service,
|
||||
database_name=self.context.database,
|
||||
schema_name=self.context.database_schema,
|
||||
),
|
||||
sourceUrl=SourceUrl(
|
||||
__root__=self._get_source_url_root(
|
||||
database_name=self.context.database,
|
||||
schema_name=self.context.database_schema,
|
||||
),
|
||||
sourceUrl=SourceUrl(
|
||||
__root__=self._get_source_url_root(
|
||||
database_name=self.context.database,
|
||||
schema_name=self.context.database_schema,
|
||||
)
|
||||
+ f"/procedure/{stored_procedure.name}"
|
||||
+ f"{stored_procedure.signature if stored_procedure.signature else ''}"
|
||||
),
|
||||
)
|
||||
)
|
||||
+ f"/procedure/{stored_procedure.name}"
|
||||
+ f"{stored_procedure.signature if stored_procedure.signature else ''}"
|
||||
),
|
||||
)
|
||||
yield Either(right=stored_procedure_request)
|
||||
self.register_record_stored_proc_request(stored_procedure_request)
|
||||
|
||||
except Exception as exc:
|
||||
yield Either(
|
||||
left=StackTraceError(
|
||||
|
@ -58,6 +58,12 @@
|
||||
"default": true,
|
||||
"title": "Mark Deleted Dashboards"
|
||||
},
|
||||
"markDeletedDataModels": {
|
||||
"description": "Optional configuration to soft delete data models in OpenMetadata if the source data models are deleted. Also, if the data models is deleted, all the associated entities like lineage, etc., with that data models will be deleted",
|
||||
"type": "boolean",
|
||||
"default": true,
|
||||
"title": "Mark Deleted Data Models"
|
||||
},
|
||||
"includeTags": {
|
||||
"description": "Optional configuration to toggle the tags ingestion.",
|
||||
"type": "boolean",
|
||||
|
@ -25,6 +25,12 @@
|
||||
"default": true,
|
||||
"title": "Mark Deleted Tables"
|
||||
},
|
||||
"markDeletedStoredProcedures": {
|
||||
"description": "Optional configuration to soft delete stored procedures in OpenMetadata if the source stored procedures are deleted. Also, if the stored procedures is deleted, all the associated entities like lineage, etc., with that stored procedures will be deleted",
|
||||
"type": "boolean",
|
||||
"default": true,
|
||||
"title": "Mark Deleted Stored Procedures"
|
||||
},
|
||||
"includeTables": {
|
||||
"description": "Optional configuration to turn off fetching metadata for tables.",
|
||||
"type": "boolean",
|
||||
|
@ -105,6 +105,27 @@ In such cases you may delete the table/schema manually from UI.
|
||||
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Mark Deleted Stored Procedures $(id="markDeletedStoredProcedures")
|
||||
This is an optional configuration for enabling **soft deletion** of stored procedures during the ingestion. When this option is enabled, only stored procedures that have been deleted from the source will be soft deleted, and this will apply ONLY to the schema that is currently being ingested via the pipeline.
|
||||
|
||||
Any related entities such as test suites or lineage information that were associated with those stored procedures will also be deleted.
|
||||
|
||||
Here are some examples of scenarios in which stored procedures will get soft deleted if this flag is enabled.
|
||||
|
||||
- If no filters were applied, but a stored procedure was deleted from the data source, then the same stored procedure will be soft deleted from OpenMetadata as well.
|
||||
- If you have applied a `Schema Filter Pattern` to include `SchemaA` then any stored procedure deleted from `SchemaA` will also be soft deleted from Openmetadata.
|
||||
- If `StoredProcedureA` was already ingested in OpenMetadata, then later you apply a `StoredProcedure Filter Pattern` to exclude `StoredProcedureA` then `StoredProcedureA` will get soft deleted from OpenMetadata.
|
||||
|
||||
Here are some examples of scenarios where stored procedures will **NOT** get soft deleted if this flag is enabled.
|
||||
|
||||
- If you already have `SchemaA` & `SchemaB` ingested in OpenMetadata, then later you apply a `Schema Filter Pattern` to exclude `SchemaB`, then no stored procedure from `SchemaB` will be deleted.
|
||||
- If you already have `SchemaA` & `SchemaB` ingested in OpenMetadata and for this ingestion pipeline you have applied a `Schema Filter Pattern` to include only `SchemaA`, then any stored procedure deleted from `SchemaB` will not be deleted (since it is ignored in the ingestion).
|
||||
|
||||
In such cases you may delete the stored procedure/schema manually from UI.
|
||||
|
||||
$$
|
||||
|
||||
$$section
|
||||
### View Definition Parsing Timeout Limit $(id="viewParsingTimeoutLimit")
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user