From 2092a52048c7ee5cc8ab9688e3cd58292cb4c55e Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Thu, 30 Nov 2023 02:00:37 +0530 Subject: [PATCH] Minor: Fix E2E Tests failure (#14171) --- .../metadata/ingestion/api/topology_runner.py | 48 +++++++++++----- .../source/database/snowflake/metadata.py | 57 +++++++++++-------- .../database/stored_procedures_mixin.py | 26 +++++---- ingestion/src/metadata/utils/fqn.py | 13 +++++ 4 files changed, 95 insertions(+), 49 deletions(-) diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index bdf860d2265..3e59a2453ae 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -14,12 +14,15 @@ generate the _run based on their topology. """ import traceback from functools import singledispatchmethod -from typing import Any, Generic, Iterable, List, TypeVar +from typing import Any, Generic, Iterable, List, TypeVar, Union from pydantic import BaseModel +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.classification.tag import Tag +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure from metadata.ingestion.api.models import Either, Entity from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.topology import ( @@ -177,15 +180,17 @@ class TopologyRunnerMixin(Generic[C]): *context_names, entity_request.name.__root__ ) - def update_context(self, stage: NodeStage, entity_name: str): + def update_context( + self, stage: NodeStage, context: Union[str, OMetaTagAndClassification] + ): """Append or update context""" # We'll store the entity_name in the topology context instead of the 
entity_fqn # and build the fqn on the fly wherever required. - # This is mainly because we need the entity_name in other places + # This is mainly because we need the context in other places if stage.context and not stage.cache_all: - self._replace_context(key=stage.context, value=entity_name) + self._replace_context(key=stage.context, value=context) if stage.context and stage.cache_all: - self._append_context(key=stage.context, value=entity_name) + self._append_context(key=stage.context, value=context) @singledispatchmethod def yield_and_update_context( @@ -236,7 +241,7 @@ class TopologyRunnerMixin(Generic[C]): "for the service connection." ) - self.update_context(stage=stage, entity_name=entity_name) + self.update_context(stage=stage, context=entity_name) @yield_and_update_context.register def _( @@ -252,9 +257,7 @@ class TopologyRunnerMixin(Generic[C]): lineage has been properly drawn. We'll skip the process for now. """ yield entity_request - self.update_context( - stage=stage, entity_name=right.edge.fromEntity.name.__root__ - ) + self.update_context(stage=stage, context=right.edge.fromEntity.name.__root__) @yield_and_update_context.register def _( @@ -266,15 +269,30 @@ class TopologyRunnerMixin(Generic[C]): """Tag implementation for the context information""" yield entity_request - tag_fqn = fqn.build( + # We'll keep the tag request in the context and use it if required + self.update_context(stage=stage, context=right) + + @yield_and_update_context.register + def _( + self, + right: CreateStoredProcedureRequest, + stage: NodeStage, + entity_request: Either[C], + ) -> Iterable[Either[Entity]]: + """Stored Procedure implementation for the context information""" + yield entity_request + + procedure_fqn = fqn.build( metadata=self.metadata, - entity_type=Tag, - classification_name=right.tag_request.classification.__root__, - tag_name=right.tag_request.name.__root__, + entity_type=StoredProcedure, + service_name=self.context.database_service, + database_name=self.context.database, + 
schema_name=self.context.database_schema, + procedure_name=right.name.__root__, ) # We'll keep the tag fqn in the context and use if required - self.update_context(stage=stage, entity_name=tag_fqn) + self.update_context(stage=stage, context=procedure_fqn) def sink_request( self, stage: NodeStage, entity_request: Either[C] diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index 7338270204f..0507177c380 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -30,6 +30,7 @@ from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode from metadata.generated.schema.entity.data.table import ( IntervalType, + Table, TablePartition, TableType, ) @@ -477,33 +478,43 @@ class SnowflakeSource( """ Get the life cycle data of the table """ - table = self.context.table - try: - life_cycle_data = self.life_cycle_query_dict( - query=SNOWFLAKE_LIFE_CYCLE_QUERY.format( - database_name=table.database.name, - schema_name=table.databaseSchema.name, - ) - ).get(table.name.__root__) - if life_cycle_data: - life_cycle = LifeCycle( - created=AccessDetails( - timestamp=convert_timestamp_to_milliseconds( - life_cycle_data.created_at.timestamp() + table_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=self.context.database_service, + database_name=self.context.database, + schema_name=self.context.database_schema, + table_name=self.context.table, + skip_es_search=True, + ) + table = self.metadata.get_by_name(entity=Table, fqn=table_fqn) + if table: + try: + life_cycle_data = self.life_cycle_query_dict( + query=SNOWFLAKE_LIFE_CYCLE_QUERY.format( + database_name=table.database.name, + schema_name=table.databaseSchema.name, + ) + ).get(table.name.__root__) + if 
life_cycle_data: + life_cycle = LifeCycle( + created=AccessDetails( + timestamp=convert_timestamp_to_milliseconds( + life_cycle_data.created_at.timestamp() + ) ) ) - ) + yield Either( + right=OMetaLifeCycleData(entity=table, life_cycle=life_cycle) + ) + except Exception as exc: yield Either( - right=OMetaLifeCycleData(entity=table, life_cycle=life_cycle) + left=StackTraceError( + name=table.name.__root__, + error=f"Unable to get the table life cycle data for table {table.name.__root__}: {exc}", + stack_trace=traceback.format_exc(), + ) ) - except Exception as exc: - yield Either( - left=StackTraceError( - name=table.name.__root__, - error=f"Unable to get the table life cycle data for table {table.name.__root__}: {exc}", - stack_trace=traceback.format_exc(), - ) - ) def get_stored_procedures(self) -> Iterable[SnowflakeStoredProcedure]: """List Snowflake stored procedures""" diff --git a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py index 16873ff859f..e7f7eb0bd8b 100644 --- a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py +++ b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py @@ -203,14 +203,18 @@ class StoredProcedureMixin(ABC): # First, get all the query history queries_dict = self.get_stored_procedure_queries_dict() # Then for each procedure, iterate over all its queries - for procedure in self.context.stored_procedures: - logger.debug(f"Processing Lineage for [{procedure.name}]") - for query_by_procedure in ( - queries_dict.get(procedure.name.__root__.lower()) or [] - ): - yield from self.yield_procedure_lineage( - query_by_procedure=query_by_procedure, procedure=procedure - ) - yield from self.yield_procedure_query( - query_by_procedure=query_by_procedure, procedure=procedure - ) + for procedure_fqn in self.context.stored_procedures: + procedure = self.metadata.get_by_name( + entity=StoredProcedure, 
fqn=procedure_fqn + ) + if procedure: + logger.debug(f"Processing Lineage for [{procedure.name}]") + for query_by_procedure in ( + queries_dict.get(procedure.name.__root__.lower()) or [] + ): + yield from self.yield_procedure_lineage( + query_by_procedure=query_by_procedure, procedure=procedure + ) + yield from self.yield_procedure_query( + query_by_procedure=query_by_procedure, procedure=procedure + ) diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index 669db098baa..07a45095cf7 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -36,6 +36,7 @@ from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.query import Query from metadata.generated.schema.entity.data.searchIndex import SearchIndex +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure from metadata.generated.schema.entity.data.table import Column, DataModel, Table from metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.entity.teams.team import Team @@ -323,6 +324,18 @@ def _( return _build(service_name, database_name, schema_name, model_name) +@fqn_build_registry.add(StoredProcedure) +def _( + _: Optional[OpenMetadata], + *, + service_name: str, + database_name: str, + schema_name: str, + procedure_name: str, +) -> str: + return _build(service_name, database_name, schema_name, procedure_name) + + @fqn_build_registry.add(Pipeline) def _( _: Optional[OpenMetadata],