mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-01 05:03:10 +00:00
* Fix #14012 - Snowflake procedure with empty definition * Add debugs
This commit is contained in:
parent
4a2a81c34b
commit
f847c1cf10
@ -13,6 +13,7 @@ Snowflake source module
|
|||||||
"""
|
"""
|
||||||
import json
|
import json
|
||||||
import traceback
|
import traceback
|
||||||
|
import urllib
|
||||||
from typing import Dict, Iterable, List, Optional, Tuple
|
from typing import Dict, Iterable, List, Optional, Tuple
|
||||||
|
|
||||||
import sqlparse
|
import sqlparse
|
||||||
@ -58,6 +59,7 @@ from metadata.ingestion.source.database.snowflake.models import (
|
|||||||
SnowflakeStoredProcedure,
|
SnowflakeStoredProcedure,
|
||||||
)
|
)
|
||||||
from metadata.ingestion.source.database.snowflake.queries import (
|
from metadata.ingestion.source.database.snowflake.queries import (
|
||||||
|
SNOWFLAKE_DESC_STORED_PROCEDURE,
|
||||||
SNOWFLAKE_FETCH_ALL_TAGS,
|
SNOWFLAKE_FETCH_ALL_TAGS,
|
||||||
SNOWFLAKE_GET_CLUSTER_KEY,
|
SNOWFLAKE_GET_CLUSTER_KEY,
|
||||||
SNOWFLAKE_GET_CURRENT_ACCOUNT,
|
SNOWFLAKE_GET_CURRENT_ACCOUNT,
|
||||||
@ -515,8 +517,36 @@ class SnowflakeSource(
|
|||||||
).all()
|
).all()
|
||||||
for row in results:
|
for row in results:
|
||||||
stored_procedure = SnowflakeStoredProcedure.parse_obj(dict(row))
|
stored_procedure = SnowflakeStoredProcedure.parse_obj(dict(row))
|
||||||
|
if stored_procedure.definition is None:
|
||||||
|
logger.debug(
|
||||||
|
f"Missing ownership permissions on procedure {stored_procedure.name}."
|
||||||
|
" Trying to fetch description via DESCRIBE."
|
||||||
|
)
|
||||||
|
stored_procedure.definition = self.describe_procedure_definition(
|
||||||
|
stored_procedure
|
||||||
|
)
|
||||||
yield stored_procedure
|
yield stored_procedure
|
||||||
|
|
||||||
|
def describe_procedure_definition(
|
||||||
|
self, stored_procedure: SnowflakeStoredProcedure
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
We can only get the SP definition via the INFORMATION_SCHEMA.PROCEDURES if the
|
||||||
|
user has OWNERSHIP grants, which will not always be the case.
|
||||||
|
|
||||||
|
Then, if the procedure is created with `EXECUTE AS CALLER`, we can still try to
|
||||||
|
get the definition with a DESCRIBE.
|
||||||
|
"""
|
||||||
|
res = self.engine.execute(
|
||||||
|
SNOWFLAKE_DESC_STORED_PROCEDURE.format(
|
||||||
|
database_name=self.context.database.name.__root__,
|
||||||
|
schema_name=self.context.database_schema.name.__root__,
|
||||||
|
procedure_name=stored_procedure.name,
|
||||||
|
procedure_signature=urllib.parse.unquote(stored_procedure.signature),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return dict(res.all()).get("body", "")
|
||||||
|
|
||||||
def yield_stored_procedure(
|
def yield_stored_procedure(
|
||||||
self, stored_procedure: SnowflakeStoredProcedure
|
self, stored_procedure: SnowflakeStoredProcedure
|
||||||
) -> Iterable[Either[CreateStoredProcedureRequest]]:
|
) -> Iterable[Either[CreateStoredProcedureRequest]]:
|
||||||
|
@ -35,7 +35,7 @@ class SnowflakeStoredProcedure(BaseModel):
|
|||||||
name: str = Field(..., alias="NAME")
|
name: str = Field(..., alias="NAME")
|
||||||
owner: Optional[str] = Field(..., alias="OWNER")
|
owner: Optional[str] = Field(..., alias="OWNER")
|
||||||
language: str = Field(..., alias="LANGUAGE")
|
language: str = Field(..., alias="LANGUAGE")
|
||||||
definition: str = Field(..., alias="DEFINITION")
|
definition: str = Field(None, alias="DEFINITION")
|
||||||
signature: Optional[str] = Field(
|
signature: Optional[str] = Field(
|
||||||
..., alias="SIGNATURE", description="Used to build the source URL"
|
..., alias="SIGNATURE", description="Used to build the source URL"
|
||||||
)
|
)
|
||||||
|
@ -173,6 +173,10 @@ WHERE PROCEDURE_CATALOG = '{database_name}'
|
|||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
|
SNOWFLAKE_DESC_STORED_PROCEDURE = (
|
||||||
|
"DESC PROCEDURE {database_name}.{schema_name}.{procedure_name}{procedure_signature}"
|
||||||
|
)
|
||||||
|
|
||||||
SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
|
SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
|
||||||
"""
|
"""
|
||||||
WITH SP_HISTORY AS (
|
WITH SP_HISTORY AS (
|
||||||
|
@ -125,6 +125,10 @@ class StoredProcedureMixin(ABC):
|
|||||||
def is_lineage_query(query_type: str, query_text: str) -> bool:
|
def is_lineage_query(query_type: str, query_text: str) -> bool:
|
||||||
"""Check if it's worth it to parse the query for lineage"""
|
"""Check if it's worth it to parse the query for lineage"""
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
f"Validating query lineage for type [{query_type}] and text [{query_text}]"
|
||||||
|
)
|
||||||
|
|
||||||
if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"):
|
if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@ -139,7 +143,6 @@ class StoredProcedureMixin(ABC):
|
|||||||
self, query_by_procedure: QueryByProcedure, procedure: StoredProcedure
|
self, query_by_procedure: QueryByProcedure, procedure: StoredProcedure
|
||||||
) -> Iterable[Either[AddLineageRequest]]:
|
) -> Iterable[Either[AddLineageRequest]]:
|
||||||
"""Add procedure lineage from its query"""
|
"""Add procedure lineage from its query"""
|
||||||
|
|
||||||
self.context.stored_procedure_query_lineage = False
|
self.context.stored_procedure_query_lineage = False
|
||||||
if self.is_lineage_query(
|
if self.is_lineage_query(
|
||||||
query_type=query_by_procedure.query_type,
|
query_type=query_by_procedure.query_type,
|
||||||
@ -201,6 +204,7 @@ class StoredProcedureMixin(ABC):
|
|||||||
queries_dict = self.get_stored_procedure_queries_dict()
|
queries_dict = self.get_stored_procedure_queries_dict()
|
||||||
# Then for each procedure, iterate over all its queries
|
# Then for each procedure, iterate over all its queries
|
||||||
for procedure in self.context.stored_procedures:
|
for procedure in self.context.stored_procedures:
|
||||||
|
logger.debug(f"Processing Lineage for [{procedure.name}]")
|
||||||
for query_by_procedure in (
|
for query_by_procedure in (
|
||||||
queries_dict.get(procedure.name.__root__.lower()) or []
|
queries_dict.get(procedure.name.__root__.lower()) or []
|
||||||
):
|
):
|
||||||
|
@ -89,6 +89,14 @@ GRANT IMPORTED PRIVILEGES ON ALL SCHEMAS IN DATABASE SNOWFLAKE TO ROLE NEW_ROLE;
|
|||||||
|
|
||||||
You can find more information about the `account_usage` schema [here](https://docs.snowflake.com/en/sql-reference/account-usage).
|
You can find more information about the `account_usage` schema [here](https://docs.snowflake.com/en/sql-reference/account-usage).
|
||||||
|
|
||||||
|
Regarding Stored Procedures:
|
||||||
|
1. Snowflake only allows the grant of `USAGE` or `OWNERSHIP`
|
||||||
|
2. A user can only see the definition of the procedure in 2 situations:
|
||||||
|
1. If it has the `OWNERSHIP` grant,
|
||||||
|
2. If it has the `USAGE` grant and the procedure is created with `EXECUTE AS CALLER`.
|
||||||
|
|
||||||
|
Make sure to add the `GRANT <USAGE|OWNERSHIP> ON PROCEDURE <NAME>(<SIGNATURE>) to NEW_ROLE`, e.g., `GRANT USAGE ON PROCEDURE CLEAN_DATA(varchar, varchar) to NEW_ROLE`.
|
||||||
|
|
||||||
## Metadata Ingestion
|
## Metadata Ingestion
|
||||||
|
|
||||||
{% partial
|
{% partial
|
||||||
|
Loading…
x
Reference in New Issue
Block a user