Fix Stored Procedures - Don't get queries without context (#13257)

This commit is contained in:
Pere Miquel Brull 2023-09-19 12:02:29 +02:00 committed by GitHub
parent b07dac8c1d
commit 7d711dc680
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 62 additions and 44 deletions

View File

@ -630,18 +630,21 @@ class BigquerySource(StoredProcedureMixin, CommonDbSourceService):
Pick the stored procedure name from the context
and return the list of associated queries
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = BIGQUERY_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
region=self.service_connection.usageLocation,
)
queries_dict = self.procedure_queries_dict(
query=query,
schema_name=self.context.database_schema.name.__root__,
database_name=self.context.database.name.__root__,
)
# Only process if we actually have yield a stored procedure
if self.context.stored_procedure:
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = BIGQUERY_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
region=self.service_connection.usageLocation,
)
queries_dict = self.procedure_queries_dict(
query=query,
schema_name=self.context.database_schema.name.__root__,
database_name=self.context.database.name.__root__,
)
for query_by_procedure in (
queries_dict.get(self.context.stored_procedure.name.__root__.lower()) or []
):
yield query_by_procedure
for query_by_procedure in (
queries_dict.get(self.context.stored_procedure.name.__root__.lower())
or []
):
yield query_by_procedure

View File

@ -278,19 +278,22 @@ class RedshiftSource(StoredProcedureMixin, CommonDbSourceService):
Pick the stored procedure name from the context
and return the list of associated queries
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = REDSHIFT_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
database_name=self.context.database.name.__root__,
)
# Only process if we actually have yield a stored procedure
if self.context.stored_procedure:
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = REDSHIFT_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
database_name=self.context.database.name.__root__,
)
queries_dict = self.procedure_queries_dict(
query=query,
schema_name=self.context.database_schema.name.__root__,
database_name=self.context.database.name.__root__,
)
queries_dict = self.procedure_queries_dict(
query=query,
schema_name=self.context.database_schema.name.__root__,
database_name=self.context.database.name.__root__,
)
for query_by_procedure in (
queries_dict.get(self.context.stored_procedure.name.__root__.lower()) or []
):
yield query_by_procedure
for query_by_procedure in (
queries_dict.get(self.context.stored_procedure.name.__root__.lower())
or []
):
yield query_by_procedure

View File

@ -570,21 +570,24 @@ class SnowflakeSource(LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceS
Pick the stored procedure name from the context
and return the list of associated queries
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
warehouse=self.service_connection.warehouse,
schema_name=self.context.database_schema.name.__root__,
database_name=self.context.database.name.__root__,
)
# Only process if we actually have yield a stored procedure
if self.context.stored_procedure:
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
warehouse=self.service_connection.warehouse,
schema_name=self.context.database_schema.name.__root__,
database_name=self.context.database.name.__root__,
)
queries_dict = self.procedure_queries_dict(
query=query,
schema_name=self.context.database_schema.name.__root__,
database_name=self.context.database.name.__root__,
)
queries_dict = self.procedure_queries_dict(
query=query,
schema_name=self.context.database_schema.name.__root__,
database_name=self.context.database.name.__root__,
)
for query_by_procedure in (
queries_dict.get(self.context.stored_procedure.name.__root__.lower()) or []
):
yield query_by_procedure
for query_by_procedure in (
queries_dict.get(self.context.stored_procedure.name.__root__.lower())
or []
):
yield query_by_procedure

View File

@ -21,6 +21,7 @@ from typing import Optional, Union
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.ingestion.api.models import Entity
from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.life_cycle import OMetaLifeCycleData
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
METADATA_LOGGER = "metadata"
@ -216,3 +217,11 @@ def _(record: DeleteEntity) -> str:
Capture information about the deleted Entity
"""
return f"{type(record.entity).__name__} [{record.entity.name.__root__}]"
@get_log_name.register
def _(record: OMetaLifeCycleData) -> str:
"""
Capture the lifecycle changes of an Entity
"""
return f"{type(record.entity).__name__} Lifecycle [{record.entity.name.__root__}]"