diff --git a/.pylintrc b/.pylintrc index 004c2064c0c..7508666b5ee 100644 --- a/.pylintrc +++ b/.pylintrc @@ -22,6 +22,7 @@ fail-under=6.0 init-hook='from pylint.config import find_pylintrc; import os, sys; sys.path.append(os.path.dirname(find_pylintrc()))' extension-pkg-allow-list=pydantic load-plugins=ingestion.plugins.print_checker +max-public-methods=25 [MESSAGES CONTROL] disable=no-name-in-module,import-error,duplicate-code diff --git a/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql index 126eb63bc33..0cb82c11672 100644 --- a/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql @@ -83,3 +83,13 @@ CREATE TABLE IF NOT EXISTS table_entity_extension ( ALTER TABLE entity_relationship ADD INDEX from_entity_type_index(fromId, fromEntity), ADD INDEX to_entity_type_index(toId, toEntity); ALTER TABLE tag DROP CONSTRAINT fqnHash, ADD CONSTRAINT UNIQUE(fqnHash), ADD PRIMARY KEY(id); + + +-- rename viewParsingTimeoutLimit for queryParsingTimeoutLimit +UPDATE ingestion_pipeline_entity +SET json = JSON_INSERT( + JSON_REMOVE(json, '$.sourceConfig.config.viewParsingTimeoutLimit'), + '$.sourceConfig.config.queryParsingTimeoutLimit', + JSON_EXTRACT(json, '$.sourceConfig.config.viewParsingTimeoutLimit') +) +WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata'; diff --git a/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql index ecd7d82d720..9f42077c610 100644 --- a/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql @@ -90,3 +90,14 @@ ALTER TABLE tag DROP CONSTRAINT IF EXISTS tag_fqnhash_key; ALTER TABLE tag ADD CONSTRAINT unique_fqnHash UNIQUE (fqnHash); ALTER TABLE tag ADD CONSTRAINT tag_pk PRIMARY KEY (id); + + +-- rename viewParsingTimeoutLimit for queryParsingTimeoutLimit +UPDATE ingestion_pipeline_entity +SET json = jsonb_set( + json::jsonb #- '{sourceConfig,config,viewParsingTimeoutLimit}', + '{sourceConfig,config,queryParsingTimeoutLimit}', + (json #> '{sourceConfig,config,viewParsingTimeoutLimit}')::jsonb, + true +) +WHERE json #>> '{pipelineType}' = 'metadata'; diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py index e5e8351b845..9f59af55495 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py @@ -129,7 +129,6 @@ def build_datamodel_name(model_name: str, explore_name: str) -> str: return clean_dashboard_name(model_name + "_" + explore_name) -# pylint: disable=too-many-public-methods class LookerSource(DashboardServiceSource): """ Looker Source Class. diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 6d90fea9f9f..b991977fd4a 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -14,7 +14,7 @@ Generic source to build SQL connectors. import traceback from abc import ABC from copy import deepcopy -from typing import Iterable, List, Optional, Tuple +from typing import Any, Iterable, List, Optional, Tuple from pydantic import BaseModel from sqlalchemy.engine import Connection @@ -26,6 +26,10 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import ( @@ -49,7 +53,10 @@ from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection -from metadata.ingestion.source.database.database_service import DatabaseServiceSource +from metadata.ingestion.source.database.database_service import ( + DatabaseServiceSource, + QueryByProcedure, +) from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource from metadata.ingestion.source.models import TableView @@ -356,6 +363,27 @@ class CommonDbSourceService( Each source should implement its own when needed """ + def get_stored_procedures(self) -> Iterable[Any]: + """Not implemented""" + + def yield_stored_procedure( + self, stored_procedure: Any + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Not implemented""" + + def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: + """Not Implemented""" + + def yield_procedure_query( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[CreateQueryRequest]]: + """Not implemented""" + + def yield_procedure_lineage( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[AddLineageRequest]]: + """Not implemented""" + @calculate_execution_time_generator def yield_table( self, table_name_and_type: Tuple[str, str] @@ -452,7 +480,7 @@ class CommonDbSourceService( metadata=self.metadata, service_name=self.context.database_service.name.__root__, connection_type=self.service_connection.type.value, - timeout_seconds=self.source_config.viewParsingTimeoutLimit, + timeout_seconds=self.source_config.queryParsingTimeoutLimit, ) def _get_foreign_constraints(self, foreign_columns) -> List[TableConstraint]: diff --git a/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py b/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py index 4fc7ce38cd9..56cfc066471 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py @@ -14,7 +14,7 @@ Common NoSQL source methods. import traceback from abc import ABC, abstractmethod -from typing import Dict, Iterable, List, Optional, Tuple, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from pandas import json_normalize @@ -22,6 +22,10 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema @@ -39,7 +43,10 @@ from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection -from metadata.ingestion.source.database.database_service import DatabaseServiceSource +from metadata.ingestion.source.database.database_service import ( + DatabaseServiceSource, + QueryByProcedure, +) from metadata.ingestion.source.database.datalake.metadata import DatalakeSource from metadata.utils import fqn from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR, DEFAULT_DATABASE @@ -246,6 +253,27 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC): tags are not supported with NoSQL """ + def get_stored_procedures(self) -> Iterable[Any]: + """Not implemented""" + + def yield_stored_procedure( + self, stored_procedure: Any + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Not implemented""" + + def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: + """Not Implemented""" + + def yield_procedure_query( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[CreateQueryRequest]]: + """Not implemented""" + + def yield_procedure_lineage( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[AddLineageRequest]]: + """Not implemented""" + def get_source_url( self, database_name: Optional[str] = None, diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index bde7b89605a..cd616fe7466 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -12,15 +12,20 @@ Base class for ingesting database services """ from abc import ABC, abstractmethod -from typing import Iterable, List, Optional, Set, Tuple +from datetime import datetime +from typing import Any, Iterable, List, Optional, Set, Tuple -from pydantic import BaseModel +from pydantic import BaseModel, Field from sqlalchemy.engine import Inspector from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.services.createDatabaseService import ( @@ -28,6 +33,8 @@ from metadata.generated.schema.api.services.createDatabaseService import ( ) from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema +from metadata.generated.schema.entity.data.query import Query +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure from metadata.generated.schema.entity.data.table import ( Column, DataModel, @@ -77,6 +84,23 @@ class DataModelLink(BaseModel): datamodel: DataModel +class QueryByProcedure(BaseModel): + """ + Query(ies) executed by each stored procedure + """ + + procedure_id: str = Field(..., alias="PROCEDURE_ID") + query_id: str = Field(..., alias="QUERY_ID") + query_type: str = Field(..., alias="QUERY_TYPE") + procedure_text: str = Field(..., alias="PROCEDURE_TEXT") + procedure_start_time: datetime = Field(..., alias="PROCEDURE_START_TIME") + procedure_end_time: datetime = Field(..., alias="PROCEDURE_END_TIME") + query_start_time: datetime = Field(..., alias="QUERY_START_TIME") + query_duration: float = Field(..., alias="QUERY_DURATION") + query_text: str = Field(..., alias="QUERY_TEXT") + query_user_name: Optional[str] = Field(None, alias="QUERY_USER_NAME") + + class DatabaseServiceTopology(ServiceTopology): """ Defines the hierarchy in Database Services. @@ -130,7 +154,7 @@ class DatabaseServiceTopology(ServiceTopology): consumer=["database_service", "database"], ), ], - children=["table"], + children=["table", "stored_procedure"], post_process=["mark_tables_as_deleted"], ) table = TopologyNode( @@ -151,6 +175,36 @@ class DatabaseServiceTopology(ServiceTopology): ), ], ) + stored_procedure = TopologyNode( + producer="get_stored_procedures", + stages=[ + NodeStage( + type_=StoredProcedure, + context="stored_procedure", + processor="yield_stored_procedure", + consumer=["database_service", "database", "database_schema"], + ), + ], + children=["stored_procedure_queries"], + ) + stored_procedure_queries = TopologyNode( + producer="get_stored_procedure_queries", + stages=[ + NodeStage( + type_=AddLineageRequest, # TODO: Fix context management for multiple types + processor="yield_procedure_lineage", + context="stored_procedure_query_lineage", # Used to flag if the query has had processed lineage + nullable=True, + ack_sink=False, + ), + NodeStage( + type_=Query, + processor="yield_procedure_query", + nullable=True, + ack_sink=False, + ), + ], + ) class DatabaseServiceSource( @@ -274,6 +328,32 @@ class DatabaseServiceSource( Also, update the self.inspector value to the current db. """ + @abstractmethod + def get_stored_procedures(self) -> Iterable[Any]: + """List stored procedures to process""" + + @abstractmethod + def yield_stored_procedure( + self, stored_procedure: Any + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Process the stored procedure information""" + + @abstractmethod + def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: + """List the queries associated to a stored procedure""" + + @abstractmethod + def yield_procedure_query( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[CreateQueryRequest]]: + """Process the stored procedure query""" + + @abstractmethod + def yield_procedure_lineage( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[AddLineageRequest]]: + """Add procedure lineage from its query""" + def get_raw_database_schema_names(self) -> Iterable[str]: """ fetch all schema names without any filtering. diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py index c1255d5f5ec..34521794472 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py @@ -13,7 +13,7 @@ Databricks Unity Catalog Source source methods. """ import json import traceback -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Any, Dict, Iterable, List, Optional, Tuple from databricks.sdk.service.catalog import ColumnInfo from databricks.sdk.service.catalog import TableConstraint as DBTableConstraint @@ -23,6 +23,10 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database @@ -52,7 +56,10 @@ from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser -from metadata.ingestion.source.database.database_service import DatabaseServiceSource +from metadata.ingestion.source.database.database_service import ( + DatabaseServiceSource, + QueryByProcedure, +) from metadata.ingestion.source.database.databricks.connection import get_connection from metadata.ingestion.source.database.databricks.models import ( ColumnJson, @@ -484,5 +491,26 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource): ) -> Iterable[Either[OMetaTagAndClassification]]: """No tags being processed""" + def get_stored_procedures(self) -> Iterable[Any]: + """Not implemented""" + + def yield_stored_procedure( + self, stored_procedure: Any + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Not implemented""" + + def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: + """Not Implemented""" + + def yield_procedure_query( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[CreateQueryRequest]]: + """Not implemented""" + + def yield_procedure_lineage( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[AddLineageRequest]]: + """Not implemented""" + def close(self): """Nothing to close""" diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index 50d3690903c..498e2887603 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -13,12 +13,16 @@ DataLake connector to fetch metadata from a files stored s3, gcs and Hdfs """ import traceback -from typing import Iterable, List, Optional, Tuple +from typing import Any, Iterable, List, Optional, Tuple from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema @@ -55,7 +59,10 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.database.column_helpers import truncate_column_name -from metadata.ingestion.source.database.database_service import DatabaseServiceSource +from metadata.ingestion.source.database.database_service import ( + DatabaseServiceSource, + QueryByProcedure, +) from metadata.ingestion.source.database.datalake.columns import clean_dataframe from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper from metadata.readers.dataframe.reader_factory import SupportedTypes @@ -572,6 +579,27 @@ class DatalakeSource(DatabaseServiceSource): ) -> Iterable[Either[OMetaTagAndClassification]]: """We don't bring tag information""" + def get_stored_procedures(self) -> Iterable[Any]: + """Not implemented""" + + def yield_stored_procedure( + self, stored_procedure: Any + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Not implemented""" + + def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: + """Not Implemented""" + + def yield_procedure_query( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[CreateQueryRequest]]: + """Not implemented""" + + def yield_procedure_lineage( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[AddLineageRequest]]: + """Not implemented""" + def standardize_table_name( self, schema: str, table: str # pylint: disable=unused-argument ) -> str: diff --git a/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py index 2b80c7605a6..d96d8b3a538 100644 --- a/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py @@ -22,6 +22,10 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema @@ -44,7 +48,10 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser -from metadata.ingestion.source.database.database_service import DatabaseServiceSource +from metadata.ingestion.source.database.database_service import ( + DatabaseServiceSource, + QueryByProcedure, +) from metadata.utils import fqn from metadata.utils.constants import DEFAULT_DATABASE from metadata.utils.filters import filter_by_schema, filter_by_table @@ -399,5 +406,26 @@ class DeltalakeSource(DatabaseServiceSource): ) -> Iterable[Either[OMetaTagAndClassification]]: """We don't pick up tags from Delta""" + def get_stored_procedures(self) -> Iterable[Any]: + """Not implemented""" + + def yield_stored_procedure( + self, stored_procedure: Any + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Not implemented""" + + def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: + """Not Implemented""" + + def yield_procedure_query( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[CreateQueryRequest]]: + """Not implemented""" + + def yield_procedure_lineage( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[AddLineageRequest]]: + """Not implemented""" + def close(self): """No client to close""" diff --git a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py index a1d2a9f091c..c93a8687c91 100644 --- a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py @@ -14,13 +14,17 @@ Domo Database source to extract metadata """ import traceback -from typing import Iterable, List, Optional, Tuple +from typing import Any, Iterable, List, Optional, Tuple from metadata.clients.domo_client import DomoClient from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import Column, Table, TableType @@ -42,7 +46,10 @@ from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection -from metadata.ingestion.source.database.database_service import DatabaseServiceSource +from metadata.ingestion.source.database.database_service import ( + DatabaseServiceSource, + QueryByProcedure, +) from metadata.ingestion.source.database.domodatabase.models import ( OutputDataset, Owner, @@ -219,6 +226,27 @@ class DomodatabaseSource(DatabaseServiceSource): ) -> Iterable[Either[OMetaTagAndClassification]]: """No tags to send""" + def get_stored_procedures(self) -> Iterable[Any]: + """Not implemented""" + + def yield_stored_procedure( + self, stored_procedure: Any + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Not implemented""" + + def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: + """Not Implemented""" + + def yield_procedure_query( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[CreateQueryRequest]]: + """Not implemented""" + + def yield_procedure_lineage( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[AddLineageRequest]]: + """Not implemented""" + def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: yield from [] diff --git a/ingestion/src/metadata/ingestion/source/database/glue/metadata.py b/ingestion/src/metadata/ingestion/source/database/glue/metadata.py index 1d40957e0fc..2328964ada2 100755 --- a/ingestion/src/metadata/ingestion/source/database/glue/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/glue/metadata.py @@ -12,12 +12,16 @@ Glue source methods. """ import traceback -from typing import Iterable, Optional, Tuple +from typing import Any, Iterable, Optional, Tuple from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database @@ -42,7 +46,10 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.database.column_helpers import truncate_column_name from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser -from metadata.ingestion.source.database.database_service import DatabaseServiceSource +from metadata.ingestion.source.database.database_service import ( + DatabaseServiceSource, + QueryByProcedure, +) from metadata.ingestion.source.database.glue.models import Column as GlueColumn from metadata.ingestion.source.database.glue.models import ( DatabasePage, @@ -349,6 +356,27 @@ class GlueSource(DatabaseServiceSource): ) -> Iterable[Either[OMetaTagAndClassification]]: """We don't pick up tags from Glue""" + def get_stored_procedures(self) -> Iterable[Any]: + """Not implemented""" + + def yield_stored_procedure( + self, stored_procedure: Any + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Not implemented""" + + def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: + """Not Implemented""" + + def yield_procedure_query( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[CreateQueryRequest]]: + """Not implemented""" + + def yield_procedure_lineage( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[AddLineageRequest]]: + """Not implemented""" + def get_source_url( self, database_name: Optional[str], diff --git a/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py b/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py index c29a359e347..3213375efe4 100644 --- a/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py @@ -12,12 +12,16 @@ Salesforce source ingestion """ import traceback -from typing import Iterable, Optional, Tuple +from typing import Any, Iterable, Optional, Tuple from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import ( @@ -44,7 +48,10 @@ from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn -from metadata.ingestion.source.database.database_service import DatabaseServiceSource +from metadata.ingestion.source.database.database_service import ( + DatabaseServiceSource, + QueryByProcedure, +) from metadata.utils import fqn from metadata.utils.constants import DEFAULT_DATABASE from metadata.utils.filters import filter_by_table @@ -270,6 +277,27 @@ class SalesforceSource(DatabaseServiceSource): ) -> Iterable[Either[OMetaTagAndClassification]]: """No tags to pick up""" + def get_stored_procedures(self) -> Iterable[Any]: + """Not implemented""" + + def yield_stored_procedure( + self, stored_procedure: Any + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Not implemented""" + + def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: + """Not Implemented""" + + def yield_procedure_query( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[CreateQueryRequest]]: + """Not implemented""" + + def yield_procedure_lineage( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[AddLineageRequest]]: + """Not implemented""" + def standardize_table_name( # pylint: disable=unused-argument self, schema: str, table: str ) -> str: diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index f337f0f8f3f..a153d030cd9 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -12,16 +12,26 @@ Snowflake source module """ import json +import re import traceback -from typing import Iterable, List, Optional, Tuple +from collections import defaultdict +from functools import lru_cache +from typing import Dict, Iterable, List, Optional, Tuple import sqlparse +from requests.utils import quote from snowflake.sqlalchemy.custom_types import VARIANT from snowflake.sqlalchemy.snowdialect import SnowflakeDialect, ischema_names from sqlalchemy.engine.reflection import Inspector from sqlparse.sql import Function, Identifier +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode from metadata.generated.schema.entity.data.table import ( IntervalType, TablePartition, @@ -36,9 +46,19 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.generated.schema.type.basic import ( + EntityName, + SourceUrl, + SqlQuery, + Timestamp, +) +from metadata.generated.schema.type.entityLineage import Source as LineageSource +from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.lifeCycle import Created, Deleted from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper +from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query from metadata.ingestion.models.life_cycle import OMetaLifeCycleData from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type @@ -46,9 +66,14 @@ from metadata.ingestion.source.database.common_db_source import ( CommonDbSourceService, TableNameAndType, ) +from metadata.ingestion.source.database.database_service import QueryByProcedure from metadata.ingestion.source.database.snowflake.constants import ( SNOWFLAKE_REGION_ID_MAP, ) +from metadata.ingestion.source.database.snowflake.models import ( + STORED_PROC_LANGUAGE_MAP, + SnowflakeStoredProcedure, +) from metadata.ingestion.source.database.snowflake.queries import ( SNOWFLAKE_FETCH_ALL_TAGS, SNOWFLAKE_GET_CLUSTER_KEY, @@ -57,6 +82,8 @@ from metadata.ingestion.source.database.snowflake.queries import ( SNOWFLAKE_GET_DATABASE_COMMENTS, SNOWFLAKE_GET_DATABASES, SNOWFLAKE_GET_SCHEMA_COMMENTS, + SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES, + SNOWFLAKE_GET_STORED_PROCEDURES, SNOWFLAKE_LIFE_CYCLE_QUERY, SNOWFLAKE_SESSION_TAG_QUERY, ) @@ -76,9 +103,11 @@ from metadata.ingestion.source.database.snowflake.utils import ( ) from metadata.utils import fqn from metadata.utils.filters import filter_by_database +from metadata.utils.helpers import get_start_and_end from metadata.utils.life_cycle_utils import init_empty_life_cycle_properties from metadata.utils.logger import ingestion_logger from metadata.utils.sqlalchemy_utils import get_all_table_comments +from metadata.utils.stored_procedures import get_procedure_name_from_call from metadata.utils.tag_utils import get_ometa_tag_and_classification ischema_names["VARIANT"] = VARIANT @@ -120,6 +149,9 @@ class SnowflakeSource(CommonDbSourceService): self.schema_desc_map = {} self.database_desc_map = {} + self._account: Optional[str] = None + self._region: Optional[str] = None + @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): config: WorkflowSource = WorkflowSource.parse_obj(config_dict) @@ -130,6 +162,35 @@ class SnowflakeSource(CommonDbSourceService): ) return cls(config, metadata_config) + @property + def account(self) -> Optional[str]: + """Query the account information""" + if self._account is None: + self._account = self._get_current_account() + + return self._account + + @property + def region(self) -> Optional[str]: + """ + Query the region information + + Region id can be a vanilla id like "AWS_US_WEST_2" + and in case of multi region group it can be like "PUBLIC.AWS_US_WEST_2" + in such cases this method will extract vanilla region id and return the + region name from constant map SNOWFLAKE_REGION_ID_MAP + + for more info checkout this doc: + https://docs.snowflake.com/en/sql-reference/functions/current_region + """ + if self._region is None: + raw_region = self._get_current_region() + if raw_region: + clean_region_id = raw_region.split(".")[-1] + self._region = SNOWFLAKE_REGION_ID_MAP.get(clean_region_id.lower()) + + return self._region + def set_session_query_tag(self) -> None: """ Method to set query tag for current session @@ -388,20 +449,17 @@ class SnowflakeSource(CommonDbSourceService): logger.debug(f"Failed to fetch current account due to: {exc}") return None - def _clean_region_name(self, region_id: Optional[str]) -> Optional[str]: - """ - Region id can be a vanilla id like "AWS_US_WEST_2" - and in case of multi region group it can be like "PUBLIC.AWS_US_WEST_2" - in such cases this method will extract vanilla region id and return the - region name from constant map SNOWFLAKE_REGION_ID_MAP + def _get_source_url_root( + self, database_name: Optional[str] = None, schema_name: Optional[str] = None + ) -> str: + url = ( + f"https://app.snowflake.com/{self.region.lower()}" + f"/{self.account.lower()}/#/data/databases/{database_name}" + ) + if schema_name: + url = f"{url}/schemas/{schema_name}" - for more info checkout this doc: - https://docs.snowflake.com/en/sql-reference/functions/current_region - """ - if region_id: - clean_region_id = region_id.split(".")[-1] - return SNOWFLAKE_REGION_ID_MAP.get(clean_region_id.lower()) - return None + return url def get_source_url( self, @@ -414,19 +472,13 @@ class SnowflakeSource(CommonDbSourceService): Method to get the source url for snowflake """ try: - account = self._get_current_account() - region_id = self._get_current_region() - region_name = self._clean_region_name(region_id) - if account and region_name: + if self.account and self.region: tab_type = "view" if table_type == TableType.View else "table" - url = ( - f"https://app.snowflake.com/{region_name.lower()}" - f"/{account.lower()}/#/data/databases/{database_name}" + url = self._get_source_url_root( + database_name=database_name, schema_name=schema_name ) - if schema_name: - url = f"{url}/schemas/{schema_name}" - if table_name: - url = f"{url}/{tab_type}/{table_name}" + if table_name: + url = f"{url}/{tab_type}/{table_name}" return url except Exception as exc: logger.debug(traceback.format_exc()) @@ -437,8 +489,8 @@ class SnowflakeSource(CommonDbSourceService): """ Get the life cycle data of the table """ + table = self.context.table try: - table = self.context.table results = self.engine.execute( SNOWFLAKE_LIFE_CYCLE_QUERY.format( database_name=table.database.name, @@ -457,7 +509,183 @@ class SnowflakeSource(CommonDbSourceService): ) ) except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error( - f"Unable to get the table life cycle data for table {table.name.__root__}: {exc}" + yield Either( + left=StackTraceError( + name=table.name.__root__, + error=f"Unable to get the table life cycle data for table {table.name.__root__}: {exc}", + stack_trace=traceback.format_exc(), + ) ) + + def get_stored_procedures(self) -> Iterable[SnowflakeStoredProcedure]: + """List Snowflake stored procedures""" + if self.source_config.includeStoredProcedures: + results = self.engine.execute( + SNOWFLAKE_GET_STORED_PROCEDURES.format( + database_name=self.context.database.name.__root__, + schema_name=self.context.database_schema.name.__root__, + ) + ).all() + for row in results: + stored_procedure = SnowflakeStoredProcedure.parse_obj(dict(row)) + yield stored_procedure + + def yield_stored_procedure( + self, stored_procedure: SnowflakeStoredProcedure + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Prepare the stored procedure payload""" + + try: + yield Either( + right=CreateStoredProcedureRequest( + name=EntityName(__root__=stored_procedure.name), + description=stored_procedure.comment, + storedProcedureCode=StoredProcedureCode( + language=STORED_PROC_LANGUAGE_MAP.get( + stored_procedure.language + ), + code=stored_procedure.definition, + ), + databaseSchema=self.context.database_schema.fullyQualifiedName, + sourceUrl=SourceUrl( + __root__=self._get_source_url_root( + database_name=self.context.database.name.__root__, + schema_name=self.context.database_schema.name.__root__, + ) + + f"/{stored_procedure.name}{quote(stored_procedure.signature)}" + ), + ) + ) + except Exception as exc: + yield Either( + left=StackTraceError( + name=stored_procedure.name, + error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]", + stack_trace=traceback.format_exc(), + ) + ) + + @lru_cache + def procedure_queries_dict( + self, schema_name: str, database_name: str + ) -> Dict[str, List[QueryByProcedure]]: + """ + Cache the queries ran for the stored procedures in the last `queryLogDuration` days. + + We will run this for each different and db name. + + The dictionary key will be the case-insensitive procedure name. + """ + start, _ = get_start_and_end(self.source_config.queryLogDuration) + results = self.engine.execute( + SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES.format( + start_date=start, + warehouse=self.service_connection.warehouse, + schema_name=schema_name, + database_name=database_name, + ) + ).all() + + queries_dict = defaultdict(list) + + for row in results: + try: + query_by_procedure = QueryByProcedure.parse_obj(dict(row)) + procedure_name = get_procedure_name_from_call( + query_text=query_by_procedure.procedure_text, + schema_name=schema_name, + database_name=database_name, + ) + queries_dict[procedure_name].append(query_by_procedure) + except Exception as exc: + self.status.failed( + StackTraceError( + name="Stored Procedure", + error=f"Error trying to get procedure name due to [{exc}]", + stack_trace=traceback.format_exc(), + ) + ) + + return queries_dict + + def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: + """ + Pick the stored procedure name from the context + and return the list of associated queries + """ + queries_dict = self.procedure_queries_dict( + schema_name=self.context.database_schema.name.__root__, + database_name=self.context.database.name.__root__, + ) + + for query_by_procedure in ( + queries_dict.get(self.context.stored_procedure.name.__root__.lower()) or [] + ): + yield query_by_procedure + + @staticmethod + def is_lineage_query(query_type: str, query_text: str) -> bool: + """Check if it's worth it to parse the query for lineage""" + + if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"): + return True + + if query_type == "INSERT" and re.search( + "^.*insert.*into.*select.*$", query_text, re.IGNORECASE + ): + return True + + return False + + def yield_procedure_lineage( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[AddLineageRequest]]: + """Add procedure lineage from its query""" + + self.update_context(key="stored_procedure_query_lineage", value=False) + if self.is_lineage_query( + query_type=query_by_procedure.query_type, + query_text=query_by_procedure.query_text, + ): + self.update_context(key="stored_procedure_query_lineage", value=True) + + for either_lineage in get_lineage_by_query( + self.metadata, + query=query_by_procedure.query_text, + service_name=self.context.database_service.name.__root__, + database_name=self.context.database.name.__root__, + schema_name=self.context.database_schema.name.__root__, + dialect=ConnectionTypeDialectMapper.dialect_of( + self.context.database_service.serviceType.value + ), + timeout_seconds=self.source_config.queryParsingTimeoutLimit, + lineage_source=LineageSource.QueryLineage, + ): + if either_lineage.right.edge.lineageDetails: + either_lineage.right.edge.lineageDetails.pipeline = EntityReference( + id=self.context.stored_procedure.id, + type="storedProcedure", + ) + + yield either_lineage + + def yield_procedure_query( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[CreateQueryRequest]]: + """Check the queries triggered by the procedure and add their lineage, if any""" + + yield Either( + right=CreateQueryRequest( + query=SqlQuery(__root__=query_by_procedure.query_text), + query_type=query_by_procedure.query_type, + duration=query_by_procedure.query_duration, + queryDate=Timestamp( + __root__=int(query_by_procedure.query_start_time.timestamp()) * 1000 + ), + triggeredBy=EntityReference( + id=self.context.stored_procedure.id, + type="storedProcedure", + ), + processedLineage=bool(self.context.stored_procedure_query_lineage), + ) + ) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py new file mode 100644 index 00000000000..ab7492ee9f1 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py @@ -0,0 +1,36 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Snowflake models +""" +from typing import Optional + +from pydantic import BaseModel, Field + +from metadata.generated.schema.entity.data.storedProcedure import Language + +STORED_PROC_LANGUAGE_MAP = { + "PYTHON": Language.Python, + "SQL": Language.SQL, + "JAVA": Language.Java, + "JAVASCRIPT": Language.JavaScript, +} + + +class SnowflakeStoredProcedure(BaseModel): + """Snowflake stored procedure list query results""" + + name: str = Field(..., alias="NAME") + owner: Optional[str] = Field(..., alias="OWNER") + language: str = Field(..., alias="LANGUAGE") + definition: str = Field(..., alias="DEFINITION") + signature: Optional[str] = Field(..., alias="SIGNATURE") + comment: Optional[str] = Field(..., alias="COMMENT") diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py index d79a7f78b76..39cc082c3a6 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py @@ -156,3 +156,73 @@ and table_catalog = '{database_name}' limit 1 """ ) + +SNOWFLAKE_GET_STORED_PROCEDURES = textwrap.dedent( + """ +SELECT + PROCEDURE_NAME AS name, + PROCEDURE_OWNER AS owner, + PROCEDURE_LANGUAGE AS language, + PROCEDURE_DEFINITION AS definition, + ARGUMENT_SIGNATURE AS signature, + COMMENT as comment +FROM INFORMATION_SCHEMA.PROCEDURES +WHERE PROCEDURE_CATALOG = '{database_name}' + AND PROCEDURE_SCHEMA = '{schema_name}' + """ +) + +SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent( + """ +WITH SP_HISTORY AS ( + SELECT + QUERY_ID, + QUERY_TEXT, + SESSION_ID, + START_TIME, + END_TIME + FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY SP + WHERE QUERY_TYPE = 'CALL' + AND START_TIME >= '{start_date}' + AND WAREHOUSE_NAME = '{warehouse}' + AND SCHEMA_NAME = '{schema_name}' + AND DATABASE_NAME = '{database_name}' +), +Q_HISTORY AS ( + SELECT + QUERY_ID, + QUERY_TYPE, + QUERY_TEXT, + SESSION_ID, + START_TIME, + END_TIME, + TOTAL_ELAPSED_TIME/1000 AS DURATION, + USER_NAME + FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY SP + WHERE QUERY_TYPE <> 'CALL' + AND START_TIME >= '{start_date}' + AND WAREHOUSE_NAME = '{warehouse}' + AND SCHEMA_NAME = '{schema_name}' + AND DATABASE_NAME = '{database_name}' +) +SELECT + SP.QUERY_ID AS PROCEDURE_ID, + Q.QUERY_ID AS QUERY_ID, + Q.QUERY_TYPE AS QUERY_TYPE, + SP.QUERY_TEXT AS PROCEDURE_TEXT, + SP.START_TIME AS PROCEDURE_START_TIME, + SP.END_TIME AS PROCEDURE_END_TIME, + Q.START_TIME AS QUERY_START_TIME, + Q.DURATION AS QUERY_DURATION, + Q.QUERY_TEXT AS QUERY_TEXT, + Q.USER_NAME AS QUERY_USER_NAME +FROM SP_HISTORY SP +JOIN Q_HISTORY Q + ON SP.SESSION_ID = Q.SESSION_ID + AND ( + Q.START_TIME BETWEEN SP.START_TIME AND SP.END_TIME + OR Q.END_TIME BETWEEN SP.START_TIME AND SP.END_TIME + ) +ORDER BY PROCEDURE_START_TIME DESC + """ +) diff --git a/ingestion/src/metadata/profiler/processor/core.py b/ingestion/src/metadata/profiler/processor/core.py index 87a6386b4a0..aeefcda8556 100644 --- a/ingestion/src/metadata/profiler/processor/core.py +++ b/ingestion/src/metadata/profiler/processor/core.py @@ -60,8 +60,6 @@ class MissingMetricException(Exception): """ -# pylint: disable=too-many-public-methods -# Pylint error above indicates that this class needs to be refactored class Profiler(Generic[TMetric]): """ Core Profiler. diff --git a/ingestion/src/metadata/utils/sqlalchemy_utils.py b/ingestion/src/metadata/utils/sqlalchemy_utils.py index 73cc3916a30..1f36ab8cc81 100644 --- a/ingestion/src/metadata/utils/sqlalchemy_utils.py +++ b/ingestion/src/metadata/utils/sqlalchemy_utils.py @@ -10,7 +10,7 @@ # limitations under the License. """ -Module for sqlalchmey dialect utils +Module for sqlalchemy dialect utils """ from typing import Dict, Optional, Tuple diff --git a/ingestion/src/metadata/utils/stored_procedures.py b/ingestion/src/metadata/utils/stored_procedures.py new file mode 100644 index 00000000000..d0bc353bb44 --- /dev/null +++ b/ingestion/src/metadata/utils/stored_procedures.py @@ -0,0 +1,57 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Stored Procedures Utilities +""" + +import re +from typing import Optional + +from metadata.utils.logger import utils_logger + +logger = utils_logger() + +NAME_PATTERN = r"(?<=call)(.*)(?=\()" + + +def get_procedure_name_from_call( + query_text: str, schema_name: str, database_name: str, sensitive_match: bool = False +) -> Optional[str]: + """ + In the query text we'll have: + - `CALL db.schema.procedure_name(...)`, + - `CALL schema.procedure_name(...)` + - `CALL procedure_name(...)`. + + We need to get the procedure name in these 3 cases. + + We'll return the lowered procedure name + """ + + res = re.search( + NAME_PATTERN, query_text, re.IGNORECASE if not sensitive_match else None + ) + if not res: + return None + + try: + return ( + res.group(0) # Get the first match + .strip() # Remove whitespace + .lower() # Replace all the lowercase variants of the procedure name prefixes + .replace(f"{database_name.lower()}.", "") + .replace(f"{schema_name.lower()}.", "") + ) + except Exception as exc: + logger.warning( + f"Error trying to get the procedure name in [{query_text}] due to [{exc}]" + ) + return None diff --git a/ingestion/tests/cli_e2e/database/snowflake/snowflake.yaml b/ingestion/tests/cli_e2e/database/snowflake/snowflake.yaml index 82cdec0763e..1a4f8c7f319 100644 --- a/ingestion/tests/cli_e2e/database/snowflake/snowflake.yaml +++ b/ingestion/tests/cli_e2e/database/snowflake/snowflake.yaml @@ -16,6 +16,7 @@ source: markDeletedTables: true includeTables: true includeViews: true + includeStoredProcedures: false type: DatabaseMetadata schemaFilterPattern: excludes: diff --git a/ingestion/tests/unit/topology/database/test_snowflake.py b/ingestion/tests/unit/topology/database/test_snowflake.py index f2d38d6c830..67a382407a4 100644 --- a/ingestion/tests/unit/topology/database/test_snowflake.py +++ b/ingestion/tests/unit/topology/database/test_snowflake.py @@ -16,7 +16,7 @@ snowflake unit tests # pylint: disable=line-too-long from unittest import TestCase -from unittest.mock import patch +from unittest.mock import PropertyMock, patch from metadata.generated.schema.entity.data.table import TableType from metadata.generated.schema.metadataIngestion.workflow import ( @@ -90,7 +90,7 @@ class SnowflakeUnitTest(TestCase): super().__init__(methodName) test_connection.return_value = False self.config = OpenMetadataWorkflowConfig.parse_obj(mock_snowflake_config) - self.snowflake_source = SnowflakeSource.create( + self.snowflake_source: SnowflakeSource = SnowflakeSource.create( mock_snowflake_config["source"], self.config.workflowConfig.openMetadataServerConfig, ) @@ -128,23 +128,23 @@ class SnowflakeUnitTest(TestCase): method to test source url """ with patch.object( - SnowflakeSource, "_get_current_account", return_value="random_account" + SnowflakeSource, + "account", + return_value="random_account", + new_callable=PropertyMock, ): with patch.object( - SnowflakeSource, "_get_current_region", return_value="AWS_US_WEST_2" + SnowflakeSource, + "region", + return_value="us-west-2", + new_callable=PropertyMock, ): self._assert_urls() with patch.object( SnowflakeSource, - "_get_current_region", - return_value="PUBLIC.AWS_US_WEST_2", - ): - self._assert_urls() - - with patch.object( - SnowflakeSource, - "_get_current_region", + "region", + new_callable=PropertyMock, return_value=None, ): self.assertIsNone( diff --git a/ingestion/tests/unit/utils/__init__.py b/ingestion/tests/unit/utils/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/tests/unit/utils/test_stored_procedures.py b/ingestion/tests/unit/utils/test_stored_procedures.py new file mode 100644 index 00000000000..7d5970eabf6 --- /dev/null +++ b/ingestion/tests/unit/utils/test_stored_procedures.py @@ -0,0 +1,66 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test Stored Procedures Utils +""" +from unittest import TestCase + +from metadata.utils.stored_procedures import get_procedure_name_from_call + + +class StoredProceduresTests(TestCase): + """Group stored procedures tests""" + + def test_get_procedure_name_from_call(self): + """Check that we properly parse CALL queries""" + self.assertEquals( + get_procedure_name_from_call( + query_text="CALL db.schema.procedure_name(...)", + schema_name="schema", + database_name="db", + ), + "procedure_name", + ) + + self.assertEquals( + get_procedure_name_from_call( + query_text="CALL schema.procedure_name(...)", + schema_name="schema", + database_name="db", + ), + "procedure_name", + ) + + self.assertEquals( + get_procedure_name_from_call( + query_text="CALL procedure_name(...)", + schema_name="schema", + database_name="db", + ), + "procedure_name", + ) + + self.assertEquals( + get_procedure_name_from_call( + query_text="CALL DB.SCHEMA.PROCEDURE_NAME(...)", + schema_name="SCHEMA", + database_name="DB", + ), + "procedure_name", + ) + + self.assertIsNone( + get_procedure_name_from_call( + query_text="something very random", + schema_name="schema", + database_name="db", + ) + ) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryResource.java index bb7d5734cc0..f7625eea4ba 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryResource.java @@ -513,6 +513,8 @@ public class QueryResource extends EntityResource { .withVotes(new Votes().withUpVotes(0).withDownVotes(0)) .withUsers(getEntityReferences(USER, create.getUsers())) .withQueryUsedIn(EntityUtil.populateEntityReferences(create.getQueryUsedIn())) - .withQueryDate(create.getQueryDate()); + .withQueryDate(create.getQueryDate()) + .withTriggeredBy(create.getTriggeredBy()) + .withProcessedLineage(create.getProcessedLineage()); } } diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createQuery.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createQuery.json index ec3df1770b6..de343720ca8 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createQuery.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createQuery.json @@ -72,6 +72,15 @@ "queryUsedIn": { "description": "list of entities to which the query is joined.", "$ref": "../../type/entityReferenceList.json" + }, + "triggeredBy": { + "description": "Entity that triggered the query. E.g., a Stored Procedure or a Pipeline Task.", + "$ref": "../../type/entityReference.json" + }, + "processedLineage": { + "description": "Flag if this query has already been successfully processed for lineage", + "type": "boolean", + "default": false } }, "required": ["query"], diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/query.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/query.json index d069849fe0c..011f2192d54 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/query.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/query.json @@ -110,6 +110,15 @@ "queryUsedIn": { "description": "Entities that are using this query", "$ref": "../../type/entityReferenceList.json" + }, + "triggeredBy": { + "description": "Entity that triggered the query. E.g., a Stored Procedure or a Pipeline Task.", + "$ref": "../../type/entityReference.json" + }, + "processedLineage": { + "description": "Flag if this query has already been successfully processed for lineage", + "type": "boolean", + "default": false } }, "required": ["name","query"], diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json index c97ceff98e9..e7e22720bcd 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json @@ -49,18 +49,30 @@ "default": true, "title": "Include Tags" }, + "includeStoredProcedures": { + "description": "Optional configuration to toggle the Stored Procedures ingestion.", + "type": "boolean", + "default": true, + "title": "Include Stored Procedures" + }, + "queryLogDuration": { + "description": "Configuration to tune how far we want to look back in query logs to process Stored Procedures results.", + "type": "integer", + "default": 1, + "title": "Query Log Duration" + }, + "queryParsingTimeoutLimit": { + "description": "Configuration to set the timeout for parsing the query in seconds.", + "type": "integer", + "default": 300, + "title": "Query Parsing Timeout Limit" + }, "useFqnForFiltering": { "description": "Regex will be applied on fully qualified name (e.g service_name.db_name.schema_name.table_name) instead of raw name (e.g. table_name)", "type": "boolean", "default": false, "title": "Use FQN For Filtering" }, - "viewParsingTimeoutLimit": { - "description": "Configuration to set the timeout for parsing view query in seconds.", - "type": "integer", - "default": 300, - "title": "View Parsing Timeout Limit (in sec.)" - }, "schemaFilterPattern": { "description": "Regex to only fetch tables or databases that matches the pattern.", "$ref": "../type/filterPattern.json#/definitions/filterPattern",