MINOR: Handle udf definition fetch exceptions (#21188)

(cherry picked from commit 7abbb73ae23ce280c9818b110da10c325d8e6cae)
This commit is contained in:
Mayur Singal 2025-05-15 00:01:03 +05:30 committed by OpenMetadata Release Bot
parent c7c0abd58b
commit 6ad2f3dcc7
2 changed files with 66 additions and 35 deletions

View File

@ -71,14 +71,22 @@ class TopologyRunnerMixin(Generic[C]):
cache = defaultdict(dict)
queue = Queue()
def _run_node_producer(self, node: TopologyNode) -> Iterable[Entity]:
    """Run the node producer without letting its failures escape.

    Looks up the method named by ``node.producer`` on ``self``, calls it
    with no arguments and yields every item it produces. A producer that
    returns ``None`` (or any other falsy value) yields nothing.

    This is a generator, so nothing runs until it is iterated. Items yielded
    before a failure are still delivered to the caller; the failure only ends
    the stream early and is reported through the module logger.

    Args:
        node: Topology node whose ``producer`` attribute names the method
            to call on ``self``.

    Yields:
        The items produced by the node producer, in order.
    """
    try:
        node_producer = getattr(self, node.producer)
        yield from node_producer() or []
    except Exception as exc:
        # Name the failing producer: the exception text alone does not tell
        # which node of the topology broke. `getattr` with a default keeps
        # the handler itself from raising if `node` is malformed.
        producer_name = getattr(node, "producer", None)
        logger.debug(traceback.format_exc())
        logger.error(f"Error running node producer [{producer_name}]: {exc}")
def _multithread_process_node(
self, node: TopologyNode, threads: int
) -> Iterable[Entity]:
"""Multithread Processing of a Node"""
node_producer = getattr(self, node.producer)
child_nodes = self._get_child_nodes(node)
node_entities = list(node_producer() or [])
node_entities = list(self._run_node_producer(node) or [])
node_entities_length = len(node_entities)
if node_entities_length == 0:
@ -120,10 +128,9 @@ class TopologyRunnerMixin(Generic[C]):
def _process_node(self, node: TopologyNode) -> Iterable[Entity]:
"""Processing of a Node in a single thread."""
node_producer = getattr(self, node.producer)
child_nodes = self._get_child_nodes(node)
for node_entity in node_producer() or []:
for node_entity in self._run_node_producer(node) or []:
for stage in node.stages:
yield from self._process_stage(
stage=stage, node_entity=node_entity, child_nodes=child_nodes
@ -217,6 +224,17 @@ class TopologyRunnerMixin(Generic[C]):
else []
)
def _run_stage_processor(
    self, stage: NodeStage, node_entity: Any
) -> Iterable[Entity]:
    """Run the stage processor without letting its failures escape.

    Looks up the method named by ``stage.processor`` on ``self``, calls it
    with ``node_entity`` and yields every item it produces. A processor that
    returns ``None`` (or any other falsy value) yields nothing.

    This is a generator, so nothing runs until it is iterated. Items yielded
    before a failure are still delivered to the caller; the failure only ends
    the stream early and is reported through the module logger.

    Args:
        stage: Stage whose ``processor`` attribute names the method to call
            on ``self``.
        node_entity: The value handed to the stage processor as its only
            argument.

    Yields:
        The items produced by the stage processor, in order.
    """
    try:
        stage_fn = getattr(self, stage.processor)
        yield from stage_fn(node_entity) or []
    except Exception as exc:
        # Name the failing processor so the error can be attributed without
        # enabling debug logs. `getattr` with a default keeps the handler
        # itself from raising if `stage` is malformed.
        processor_name = getattr(stage, "processor", None)
        logger.debug(traceback.format_exc())
        logger.error(f"Error running stage processor [{processor_name}]: {exc}")
def _process_stage(
self, stage: NodeStage, node_entity: Any, child_nodes: List[TopologyNode]
) -> Iterable[Entity]:
@ -230,8 +248,9 @@ class TopologyRunnerMixin(Generic[C]):
"""
logger.debug(f"Processing stage: {stage}")
stage_fn = getattr(self, stage.processor)
for entity_request in stage_fn(node_entity) or []:
for entity_request in (
self._run_stage_processor(stage=stage, node_entity=node_entity) or []
):
try:
# yield and make sure the data is updated
yield from self.sink_request(stage=stage, entity_request=entity_request)

View File

@ -683,24 +683,28 @@ class SnowflakeSource(
def _get_stored_procedures_internal(
self, query: str
) -> Iterable[SnowflakeStoredProcedure]:
"""Run ``query`` and yield one ``SnowflakeStoredProcedure`` per result row.

``query`` is formatted with the database and schema taken from
``self.context.get()`` and the account usage schema taken from
``self.service_connection``, then executed through ``self.engine``.
Each row is validated into a ``SnowflakeStoredProcedure``; when its
``definition`` is ``None`` the definition is filled in from
``describe_procedure_definition``.

In the post-change version, any exception raised while querying or
iterating is logged (traceback at debug level, message at error level)
and ends the iteration instead of propagating.
"""
# NOTE(review): this span is a rendered diff without +/- markers, so the
# pre-change body (no try/except) and the post-change body are interleaved.
results = self.engine.execute(
query.format(
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
account_usage=self.service_connection.accountUsageSchema,
)
).all()
for row in results:
stored_procedure = SnowflakeStoredProcedure.model_validate(dict(row))
if stored_procedure.definition is None:
logger.debug(
f"Missing ownership permissions on procedure {stored_procedure.name}."
" Trying to fetch description via DESCRIBE."
try:
results = self.engine.execute(
query.format(
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
account_usage=self.service_connection.accountUsageSchema,
)
stored_procedure.definition = self.describe_procedure_definition(
stored_procedure
)
yield stored_procedure
).all()
for row in results:
stored_procedure = SnowflakeStoredProcedure.model_validate(dict(row))
if stored_procedure.definition is None:
logger.debug(
f"Missing ownership permissions on procedure {stored_procedure.name}."
" Trying to fetch description via DESCRIBE."
)
stored_procedure.definition = self.describe_procedure_definition(
stored_procedure
)
yield stored_procedure
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Error fetching stored procedures: {exc}")
def get_stored_procedures(self) -> Iterable[SnowflakeStoredProcedure]:
"""List Snowflake stored procedures"""
@ -720,19 +724,27 @@ class SnowflakeSource(
Then, if the procedure is created with `EXECUTE AS CALLER`, we can still try to
get the definition with a DESCRIBE.
"""
if stored_procedure.procedure_type == StoredProcedureType.StoredProcedure.value:
query = SNOWFLAKE_DESC_STORED_PROCEDURE
else:
query = SNOWFLAKE_DESC_FUNCTION
res = self.engine.execute(
query.format(
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
procedure_name=stored_procedure.name,
procedure_signature=stored_procedure.unquote_signature(),
try:
if (
stored_procedure.procedure_type
== StoredProcedureType.StoredProcedure.value
):
query = SNOWFLAKE_DESC_STORED_PROCEDURE
else:
query = SNOWFLAKE_DESC_FUNCTION
res = self.engine.execute(
query.format(
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
procedure_name=stored_procedure.name,
procedure_signature=stored_procedure.unquote_signature(),
)
)
)
return dict(res.all()).get("body", "")
return dict(res.all()).get("body", "")
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Error fetching stored procedure definition: {exc}")
return ""
def yield_stored_procedure(
self, stored_procedure: SnowflakeStoredProcedure