From 42393f5de3bdf86e6d8d134af9ae7ffde9702f65 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 13 Sep 2023 08:47:45 +0200 Subject: [PATCH] Part of #12998 - Add Stored Procedures support in BigQuery (#13160) * Fix snowflake SP source url * Update sourceUrl on PUT * Add BigQuery Stored Procedures support * Linting * Linting * lint * Linting --- .../source/database/bigquery/metadata.py | 232 +++++++++++++++++- .../source/database/bigquery/models.py | 36 +++ .../source/database/bigquery/queries.py | 73 +++++- .../source/database/database_service.py | 5 +- .../source/database/snowflake/metadata.py | 3 +- .../source/database/snowflake/models.py | 35 ++- .../jdbi3/StoredProcedureRepository.java | 5 +- .../service/jdbi3/TableRepository.java | 2 +- 8 files changed, 377 insertions(+), 14 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/source/database/bigquery/models.py diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index 4519ad3253f..a5a764f38b7 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -12,8 +12,11 @@ We require Taxonomy Admin permissions to fetch all Policy Tags """ import os +import re import traceback -from typing import Iterable, List, Optional, Tuple +from collections import defaultdict +from functools import lru_cache +from typing import Dict, Iterable, List, Optional, Tuple from google import auth from google.cloud.datacatalog_v1 import PolicyTagManagerClient @@ -27,7 +30,13 @@ from sqlalchemy_bigquery._types import _get_sqla_column_type from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.database import Database, EntityName +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode from metadata.generated.schema.entity.data.table import ( IntervalType, TablePartition, @@ -47,11 +56,22 @@ from metadata.generated.schema.security.credentials.gcpValues import ( MultipleProjectId, SingleProjectId, ) +from metadata.generated.schema.type.basic import SourceUrl, SqlQuery, Timestamp +from metadata.generated.schema.type.entityLineage import Source as LineageSource +from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.tagLabel import TagLabel from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper +from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification +from metadata.ingestion.source.database.bigquery.models import ( + STORED_PROC_LANGUAGE_MAP, + BigQueryStoredProcedure, +) from metadata.ingestion.source.database.bigquery.queries import ( + BIGQUERY_GET_STORED_PROCEDURE_QUERIES, + BIGQUERY_GET_STORED_PROCEDURES, BIGQUERY_SCHEMA_DESCRIPTION, BIGQUERY_TABLE_AND_TYPE, ) @@ -60,12 +80,15 @@ from metadata.ingestion.source.database.common_db_source import ( CommonDbSourceService, TableNameAndType, ) +from metadata.ingestion.source.database.database_service import QueryByProcedure from metadata.utils import fqn from metadata.utils.bigquery_utils import get_bigquery_client from metadata.utils.credentials import GOOGLE_CREDENTIALS from metadata.utils.filters import filter_by_database +from metadata.utils.helpers import get_start_and_end from metadata.utils.logger import ingestion_logger from metadata.utils.sqlalchemy_utils import is_complex_type +from metadata.utils.stored_procedures import get_procedure_name_from_call from metadata.utils.tag_utils import ( get_ometa_tag_and_classification, get_tag_label, @@ -483,12 +506,12 @@ class BigquerySource(CommonDbSourceService): os.remove(tmp_credentials_file) del os.environ[GOOGLE_CREDENTIALS] - def get_source_url( + def _get_source_url( self, database_name: Optional[str] = None, schema_name: Optional[str] = None, table_name: Optional[str] = None, - table_type: Optional[TableType] = None, + type_infix: str = "4m3", ) -> Optional[str]: """ Method to get the source url for bigquery @@ -502,7 +525,7 @@ class BigquerySource(CommonDbSourceService): schema_table_url = f"&ws=!1m4!1m3!3m2!1s{database_name}!2s{schema_name}" if table_name: schema_table_url = ( - f"&ws=!1m5!1m4!4m3!1s{database_name}" + f"&ws=!1m5!1m4!{type_infix}!1s{database_name}" f"!2s{schema_name}!3s{table_name}" ) if schema_table_url: @@ -512,3 +535,202 @@ class BigquerySource(CommonDbSourceService): logger.debug(traceback.format_exc()) logger.warning(f"Unable to get source url: {exc}") return None + + def get_source_url( + self, + database_name: Optional[str] = None, + schema_name: Optional[str] = None, + table_name: Optional[str] = None, + table_type: Optional[TableType] = None, + ) -> Optional[str]: + return self._get_source_url( + database_name=database_name, + schema_name=schema_name, + table_name=table_name, + # This infix identifies tables in the URL + type_infix="4m3", + ) + + def get_stored_procedure_url( + self, + database_name: Optional[str] = None, + schema_name: Optional[str] = None, + table_name: Optional[str] = None, + ) -> Optional[str]: + return self._get_source_url( + database_name=database_name, + schema_name=schema_name, + table_name=table_name, + # This infix identifies Stored Procedures in the URL + type_infix="6m3", + ) + + def get_stored_procedures(self) -> Iterable[BigQueryStoredProcedure]: + """List BigQuery Stored Procedures""" + if self.source_config.includeStoredProcedures: + results = self.engine.execute( + BIGQUERY_GET_STORED_PROCEDURES.format( + database_name=self.context.database.name.__root__, + schema_name=self.context.database_schema.name.__root__, + ) + ).all() + for row in results: + stored_procedure = BigQueryStoredProcedure.parse_obj(dict(row)) + yield stored_procedure + + def yield_stored_procedure( + self, stored_procedure: BigQueryStoredProcedure + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Prepare the stored procedure payload""" + + try: + yield Either( + right=CreateStoredProcedureRequest( + name=EntityName(__root__=stored_procedure.name), + storedProcedureCode=StoredProcedureCode( + language=STORED_PROC_LANGUAGE_MAP.get( + stored_procedure.language or "SQL", + ), + code=stored_procedure.definition, + ), + databaseSchema=self.context.database_schema.fullyQualifiedName, + sourceUrl=SourceUrl( + __root__=self.get_stored_procedure_url( + database_name=self.context.database.name.__root__, + schema_name=self.context.database_schema.name.__root__, + # Follow the same building strategy as tables + table_name=stored_procedure.name, + ) + ), + ) + ) + except Exception as exc: + yield Either( + left=StackTraceError( + name=stored_procedure.name, + error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]", + stack_trace=traceback.format_exc(), + ) + ) + + @lru_cache + def procedure_queries_dict( + self, schema_name: str, database_name: str + ) -> Dict[str, List[QueryByProcedure]]: + """ + Cache the queries ran for the stored procedures in the last `queryLogDuration` days. + + We will run this for each different and db name. + + The dictionary key will be the case-insensitive procedure name. + """ + start, _ = get_start_and_end(self.source_config.queryLogDuration) + results = self.engine.execute( + BIGQUERY_GET_STORED_PROCEDURE_QUERIES.format( + start_date=start, + ) + ).all() + + queries_dict = defaultdict(list) + + for row in results: + try: + query_by_procedure = QueryByProcedure.parse_obj(dict(row)) + procedure_name = get_procedure_name_from_call( + query_text=query_by_procedure.procedure_text, + schema_name=schema_name, + database_name=database_name, + ) + queries_dict[procedure_name].append(query_by_procedure) + except Exception as exc: + self.status.failed( + StackTraceError( + name="Stored Procedure", + error=f"Error trying to get procedure name due to [{exc}]", + stack_trace=traceback.format_exc(), + ) + ) + + return queries_dict + + @staticmethod + def is_lineage_query(query_type: str, query_text: str) -> bool: + """Check if it's worth it to parse the query for lineage""" + + if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"): + return True + + if query_type == "INSERT" and re.search( + "^.*insert.*into.*select.*$", query_text, re.IGNORECASE + ): + return True + + return False + + def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: + """ + Pick the stored procedure name from the context + and return the list of associated queries + """ + queries_dict = self.procedure_queries_dict( + schema_name=self.context.database_schema.name.__root__, + database_name=self.context.database.name.__root__, + ) + + for query_by_procedure in ( + queries_dict.get(self.context.stored_procedure.name.__root__.lower()) or [] + ): + yield query_by_procedure + + def yield_procedure_lineage( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[AddLineageRequest]]: + """Add procedure lineage from its query""" + + self.update_context(key="stored_procedure_query_lineage", value=False) + if self.is_lineage_query( + query_type=query_by_procedure.query_type, + query_text=query_by_procedure.query_text, + ): + self.update_context(key="stored_procedure_query_lineage", value=True) + + for either_lineage in get_lineage_by_query( + self.metadata, + query=query_by_procedure.query_text, + service_name=self.context.database_service.name.__root__, + database_name=self.context.database.name.__root__, + schema_name=self.context.database_schema.name.__root__, + dialect=ConnectionTypeDialectMapper.dialect_of( + self.context.database_service.serviceType.value + ), + timeout_seconds=self.source_config.queryParsingTimeoutLimit, + lineage_source=LineageSource.QueryLineage, + ): + if either_lineage.right.edge.lineageDetails: + either_lineage.right.edge.lineageDetails.pipeline = EntityReference( + id=self.context.stored_procedure.id, + type="storedProcedure", + ) + + yield either_lineage + + def yield_procedure_query( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[CreateQueryRequest]]: + """Check the queries triggered by the procedure and add their lineage, if any""" + + yield Either( + right=CreateQueryRequest( + query=SqlQuery(__root__=query_by_procedure.query_text), + query_type=query_by_procedure.query_type, + duration=query_by_procedure.query_duration, + queryDate=Timestamp( + __root__=int(query_by_procedure.query_start_time.timestamp()) * 1000 + ), + triggeredBy=EntityReference( + id=self.context.stored_procedure.id, + type="storedProcedure", + ), + processedLineage=bool(self.context.stored_procedure_query_lineage), + ) + ) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/models.py b/ingestion/src/metadata/ingestion/source/database/bigquery/models.py new file mode 100644 index 00000000000..e2191254d8d --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/models.py @@ -0,0 +1,36 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +BigQuery models +""" +from typing import Optional + +from pydantic import BaseModel, Field + +from metadata.generated.schema.entity.data.storedProcedure import Language +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + +STORED_PROC_LANGUAGE_MAP = { + "SQL": Language.SQL, + "JAVASCRIPT": Language.JavaScript, +} + + +class BigQueryStoredProcedure(BaseModel): + """BigQuery Stored Procedure list query results""" + + name: str + definition: str + language: Optional[str] = Field( + None, description="Will only be informed for non-SQL routines." + ) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py b/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py index 6b33dc0e146..5efce555427 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py @@ -30,8 +30,8 @@ WHERE creation_time BETWEEN "{start_time}" AND "{end_time}" {filters} AND job_type = "QUERY" AND state = "DONE" - AND IFNULL(statement_type, "NO") not in ("NO", "DROP_TABLE", "CREATE_TABLE") - AND query NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' + AND IFNULL(statement_type, "NO") not in ("NO", "DROP_TABLE") + AND query NOT LIKE '/*%%{"app": "OpenMetadata", %%}%%*/%%' AND query NOT LIKE '/* {{"app": "dbt", %%}} */%%' LIMIT {result_limit} """ @@ -56,3 +56,72 @@ BIGQUERY_TABLE_AND_TYPE = textwrap.dedent( select table_name, table_type from {}.INFORMATION_SCHEMA.TABLES where table_type != 'VIEW' """ ) + +BIGQUERY_GET_STORED_PROCEDURES = textwrap.dedent( + """ +SELECT + routine_name as name, + routine_definition as definition, + external_language as language +FROM test_omd.INFORMATION_SCHEMA.ROUTINES +WHERE routine_type in ('PROCEDURE', 'TABLE FUNCTION') + AND routine_catalog = '{database_name}' + AND routine_schema = '{schema_name}' + """ +) + +BIGQUERY_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent( + """ +WITH SP_HISTORY AS ( + SELECT + job_id, + query AS query_text, + start_time, + end_time, + user_email as user_name + FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT + WHERE statement_type = 'SCRIPT' + AND start_time >= '{start_date}' + AND job_type = "QUERY" + AND state = "DONE" + AND error_result is NULL + AND query LIKE 'CALL%%' +), +Q_HISTORY AS ( + SELECT + job_id, + project_id as database_name, + user_email as user_name, + statement_type as query_type, + start_time, + end_time, + query as query_text, + null as schema_name, + total_slot_ms/1000 as duration + FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT + WHERE statement_type <> 'SCRIPT' + AND start_time >= '{start_date}' + AND job_type = "QUERY" + AND state = "DONE" + AND error_result is NULL +) +SELECT + SP.job_id as procedure_id, + Q.job_id as query_id, + Q.query_type as query_type, + SP.query_text as procedure_text, + Q.query_text as query_text, + SP.start_time as procedure_start_time, + SP.end_time as procedure_end_time, + Q.start_time as query_start_time, + Q.end_time as query_end_time, + Q.duration as query_duration, + Q.user_name as query_user_name +FROM SP_HISTORY SP +JOIN Q_HISTORY Q + ON Q.start_time between SP.start_time and SP.end_time + AND Q.end_time between SP.start_time and SP.end_time + AND Q.user_name = SP.user_name +ORDER BY procedure_start_time DESC +""" +) diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index cd616fe7466..b9172cf1c72 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -96,10 +96,13 @@ class QueryByProcedure(BaseModel): procedure_start_time: datetime = Field(..., alias="PROCEDURE_START_TIME") procedure_end_time: datetime = Field(..., alias="PROCEDURE_END_TIME") query_start_time: datetime = Field(..., alias="QUERY_START_TIME") - query_duration: float = Field(..., alias="QUERY_DURATION") + query_duration: Optional[float] = Field(None, alias="QUERY_DURATION") query_text: str = Field(..., alias="QUERY_TEXT") query_user_name: Optional[str] = Field(None, alias="QUERY_USER_NAME") + class Config: + allow_population_by_field_name = True + class DatabaseServiceTopology(ServiceTopology): """ diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index 59031ca4e0c..0235b7d16aa 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -557,7 +557,8 @@ class SnowflakeSource(CommonDbSourceService): database_name=self.context.database.name.__root__, schema_name=self.context.database_schema.name.__root__, ) - + f"/{stored_procedure.name}{quote(stored_procedure.signature)}" + + f"/procedure/{stored_procedure.name}" + + f"{quote(stored_procedure.signature) if stored_procedure.signature else ''}" ), ) ) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py index ab7492ee9f1..da003704cd5 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py @@ -13,9 +13,12 @@ Snowflake models """ from typing import Optional -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, validator from metadata.generated.schema.entity.data.storedProcedure import Language +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() STORED_PROC_LANGUAGE_MAP = { "PYTHON": Language.Python, @@ -32,5 +35,33 @@ class SnowflakeStoredProcedure(BaseModel): owner: Optional[str] = Field(..., alias="OWNER") language: str = Field(..., alias="LANGUAGE") definition: str = Field(..., alias="DEFINITION") - signature: Optional[str] = Field(..., alias="SIGNATURE") + signature: Optional[str] = Field( + ..., alias="SIGNATURE", description="Used to build the source URL" + ) comment: Optional[str] = Field(..., alias="COMMENT") + + # Update the signature to clean it up on read + @validator("signature") + def clean_signature( # pylint: disable=no-self-argument + cls, signature + ) -> Optional[str]: + """ + pylint: keeping the approach from pydantic docs + + A signature may look like `(TABLE_NAME VARCHAR, NAME VARCHAR)` + We want it to keep only `(VARCHAR, VARCHAR). + + This is needed to build the source URL of the procedure + """ + try: + clean_signature = signature.replace("(", "").replace(")", "") + if not clean_signature: + return None + + signature_list = clean_signature.split(",") + clean_signature_list = [elem.split(" ")[-1] for elem in signature_list] + + return f"({','.join(clean_signature_list)})" + except Exception as exc: + logger.warning(f"Error cleaning up Stored Procedure signature - [{exc}]") + return signature diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java index 9b5fec5f8e9..caa9b77bef6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java @@ -15,8 +15,8 @@ import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.FullyQualifiedName; public class StoredProcedureRepository extends EntityRepository { - static final String PATCH_FIELDS = "storedProcedureCode"; - static final String UPDATE_FIELDS = "storedProcedureCode"; + static final String PATCH_FIELDS = "storedProcedureCode,sourceUrl"; + static final String UPDATE_FIELDS = "storedProcedureCode,sourceUrl"; public StoredProcedureRepository(CollectionDAO dao) { super( @@ -112,6 +112,7 @@ public class StoredProcedureRepository extends EntityRepository @Override public void entitySpecificUpdate() { recordChange("storedProcedureCode", original.getStoredProcedureCode(), updated.getStoredProcedureCode()); + recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java index 91f76cff647..9823faad408 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java @@ -91,7 +91,7 @@ public class TableRepository extends EntityRepository { // Table fields that can be patched in a PATCH request static final String PATCH_FIELDS = "tableConstraints,tablePartition"; // Table fields that can be updated in a PUT request - static final String UPDATE_FIELDS = "tableConstraints,tablePartition,dataModel"; + static final String UPDATE_FIELDS = "tableConstraints,tablePartition,dataModel,sourceUrl"; public static final String FIELD_RELATION_COLUMN_TYPE = "table.columns.column"; public static final String FIELD_RELATION_TABLE_TYPE = "table";