Minor: Fix E2E Tests failure (#14171)

This commit is contained in:
Mayur Singal 2023-11-30 02:00:37 +05:30 committed by GitHub
parent 21e789b5a6
commit 2092a52048
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 49 deletions

View File

@ -14,12 +14,15 @@ generate the _run based on their topology.
"""
import traceback
from functools import singledispatchmethod
from typing import Any, Generic, Iterable, List, TypeVar
from typing import Any, Generic, Iterable, List, TypeVar, Union
from pydantic import BaseModel
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.ingestion.api.models import Either, Entity
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.topology import (
@ -177,15 +180,17 @@ class TopologyRunnerMixin(Generic[C]):
*context_names, entity_request.name.__root__
)
def update_context(self, stage: NodeStage, entity_name: str):
def update_context(
self, stage: NodeStage, context: Union[str, OMetaTagAndClassification]
):
"""
Append or update context.

When `stage.context` is set, the given value is stored under that key:
it replaces the current value if `stage.cache_all` is falsy, and it is
appended otherwise. Nothing is stored when `stage.context` is not set.
"""
# We'll store the entity_name in the topology context instead of the entity_fqn
# and build the fqn on the fly wherever required.
# This is mainly because we need the entity_name in other places
# This is mainly because we need the context in other places
if stage.context and not stage.cache_all:
self._replace_context(key=stage.context, value=entity_name)
self._replace_context(key=stage.context, value=context)
if stage.context and stage.cache_all:
self._append_context(key=stage.context, value=entity_name)
self._append_context(key=stage.context, value=context)
@singledispatchmethod
def yield_and_update_context(
@ -236,7 +241,7 @@ class TopologyRunnerMixin(Generic[C]):
"for the service connection."
)
self.update_context(stage=stage, entity_name=entity_name)
self.update_context(stage=stage, context=entity_name)
@yield_and_update_context.register
def _(
@ -252,9 +257,7 @@ class TopologyRunnerMixin(Generic[C]):
lineage has been properly drawn. We'll skip the process for now.
"""
yield entity_request
self.update_context(
stage=stage, entity_name=right.edge.fromEntity.name.__root__
)
self.update_context(stage=stage, context=right.edge.fromEntity.name.__root__)
@yield_and_update_context.register
def _(
@ -266,15 +269,30 @@ class TopologyRunnerMixin(Generic[C]):
"""Tag implementation for the context information"""
yield entity_request
tag_fqn = fqn.build(
# We'll keep the whole tag-and-classification object (not only its fqn) in the context and use it if required
self.update_context(stage=stage, context=right)
@yield_and_update_context.register
def _(
self,
right: CreateStoredProcedureRequest,
stage: NodeStage,
entity_request: Either[C],
) -> Iterable[Either[Entity]]:
"""
Stored procedure implementation for the context information.

Yields the entity request as-is, then builds the stored procedure FQN
from the service, database and schema held in the context plus the
request name, and stores that FQN in the context for the given stage.
"""
yield entity_request
procedure_fqn = fqn.build(
metadata=self.metadata,
entity_type=Tag,
classification_name=right.tag_request.classification.__root__,
tag_name=right.tag_request.name.__root__,
entity_type=StoredProcedure,
service_name=self.context.database_service,
database_name=self.context.database,
schema_name=self.context.database_schema,
procedure_name=right.name.__root__,
)
# We'll keep the fqn in the context and use it if required
self.update_context(stage=stage, entity_name=tag_fqn)
self.update_context(stage=stage, context=procedure_fqn)
def sink_request(
self, stage: NodeStage, entity_request: Either[C]

View File

@ -30,6 +30,7 @@ from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.data.table import (
IntervalType,
Table,
TablePartition,
TableType,
)
@ -477,33 +478,43 @@ class SnowflakeSource(
"""
Get the life cycle data of the table
"""
table = self.context.table
try:
life_cycle_data = self.life_cycle_query_dict(
query=SNOWFLAKE_LIFE_CYCLE_QUERY.format(
database_name=table.database.name,
schema_name=table.databaseSchema.name,
)
).get(table.name.__root__)
if life_cycle_data:
life_cycle = LifeCycle(
created=AccessDetails(
timestamp=convert_timestamp_to_milliseconds(
life_cycle_data.created_at.timestamp()
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.context.database_service,
database_name=self.context.database,
schema_name=self.context.database_schema,
table_name=self.context.table,
skip_es_search=True,
)
table = self.metadata.get_by_name(entity=Table, fqn=table_fqn)
if table:
try:
life_cycle_data = self.life_cycle_query_dict(
query=SNOWFLAKE_LIFE_CYCLE_QUERY.format(
database_name=table.database.name,
schema_name=table.databaseSchema.name,
)
).get(table.name.__root__)
if life_cycle_data:
life_cycle = LifeCycle(
created=AccessDetails(
timestamp=convert_timestamp_to_milliseconds(
life_cycle_data.created_at.timestamp()
)
)
)
)
yield Either(
right=OMetaLifeCycleData(entity=table, life_cycle=life_cycle)
)
except Exception as exc:
yield Either(
right=OMetaLifeCycleData(entity=table, life_cycle=life_cycle)
left=StackTraceError(
name=table.name.__root__,
error=f"Unable to get the table life cycle data for table {table.name.__root__}: {exc}",
stack_trace=traceback.format_exc(),
)
)
except Exception as exc:
yield Either(
left=StackTraceError(
name=table.name.__root__,
error=f"Unable to get the table life cycle data for table {table.name.__root__}: {exc}",
stack_trace=traceback.format_exc(),
)
)
def get_stored_procedures(self) -> Iterable[SnowflakeStoredProcedure]:
"""List Snowflake stored procedures"""

View File

@ -203,14 +203,18 @@ class StoredProcedureMixin(ABC):
# First, get all the query history
queries_dict = self.get_stored_procedure_queries_dict()
# Then for each procedure, iterate over all its queries
for procedure in self.context.stored_procedures:
logger.debug(f"Processing Lineage for [{procedure.name}]")
for query_by_procedure in (
queries_dict.get(procedure.name.__root__.lower()) or []
):
yield from self.yield_procedure_lineage(
query_by_procedure=query_by_procedure, procedure=procedure
)
yield from self.yield_procedure_query(
query_by_procedure=query_by_procedure, procedure=procedure
)
for procedure_fqn in self.context.stored_procedures:
procedure = self.metadata.get_by_name(
entity=StoredProcedure, fqn=procedure_fqn
)
if procedure:
logger.debug(f"Processing Lineage for [{procedure.name}]")
for query_by_procedure in (
queries_dict.get(procedure.name.__root__.lower()) or []
):
yield from self.yield_procedure_lineage(
query_by_procedure=query_by_procedure, procedure=procedure
)
yield from self.yield_procedure_query(
query_by_procedure=query_by_procedure, procedure=procedure
)

View File

@ -36,6 +36,7 @@ from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.searchIndex import SearchIndex
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.generated.schema.entity.data.table import Column, DataModel, Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.teams.team import Team
@ -323,6 +324,18 @@ def _(
return _build(service_name, database_name, schema_name, model_name)
@fqn_build_registry.add(StoredProcedure)
def _(
    _: Optional[OpenMetadata],
    *,
    service_name: str,
    database_name: str,
    schema_name: str,
    procedure_name: str,
) -> str:
    """
    Builder registered for `StoredProcedure` in `fqn_build_registry`.

    The first positional argument (the metadata client) is not used. The
    service, database, schema and procedure names are handed to `_build`
    in that order and its result is returned.
    """
    name_parts = (service_name, database_name, schema_name, procedure_name)
    return _build(*name_parts)
@fqn_build_registry.add(Pipeline)
def _(
_: Optional[OpenMetadata],