Part of #12998 - Prep Stored Procedures Skeleton for Snowflake (#13121)

* Prep Stored Procedures Skeleton for Snowflake

* Update pylint and add migrations

* Fix test

* Reuse source url computation
This commit is contained in:
Pere Miquel Brull 2023-09-12 14:25:42 +02:00 committed by GitHub
parent 1960c68355
commit f0995cbddc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 884 additions and 71 deletions

View File

@ -22,6 +22,7 @@ fail-under=6.0
init-hook='from pylint.config import find_pylintrc; import os, sys; sys.path.append(os.path.dirname(find_pylintrc()))'
extension-pkg-allow-list=pydantic
load-plugins=ingestion.plugins.print_checker
max-public-methods=25
[MESSAGES CONTROL]
disable=no-name-in-module,import-error,duplicate-code

View File

@ -83,3 +83,13 @@ CREATE TABLE IF NOT EXISTS table_entity_extension (
ALTER TABLE entity_relationship ADD INDEX from_entity_type_index(fromId, fromEntity), ADD INDEX to_entity_type_index(toId, toEntity);
ALTER TABLE tag DROP CONSTRAINT fqnHash, ADD CONSTRAINT UNIQUE(fqnHash), ADD PRIMARY KEY(id);
-- rename viewParsingTimeoutLimit for queryParsingTimeoutLimit
UPDATE ingestion_pipeline_entity
SET json = JSON_INSERT(
JSON_REMOVE(json, '$.sourceConfig.config.viewParsingTimeoutLimit'),
'$.sourceConfig.config.queryParsingTimeoutLimit',
JSON_EXTRACT(json, '$.sourceConfig.config.viewParsingTimeoutLimit')
)
WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata';

View File

@ -90,3 +90,14 @@ ALTER TABLE tag DROP CONSTRAINT IF EXISTS tag_fqnhash_key;
ALTER TABLE tag ADD CONSTRAINT unique_fqnHash UNIQUE (fqnHash);
ALTER TABLE tag ADD CONSTRAINT tag_pk PRIMARY KEY (id);
-- rename viewParsingTimeoutLimit for queryParsingTimeoutLimit
UPDATE ingestion_pipeline_entity
SET json = jsonb_set(
json::jsonb #- '{sourceConfig,config,viewParsingTimeoutLimit}',
'{sourceConfig,config,queryParsingTimeoutLimit}',
(json #> '{sourceConfig,config,viewParsingTimeoutLimit}')::jsonb,
true
)
WHERE json #>> '{pipelineType}' = 'metadata';

View File

@ -129,7 +129,6 @@ def build_datamodel_name(model_name: str, explore_name: str) -> str:
return clean_dashboard_name(model_name + "_" + explore_name)
# pylint: disable=too-many-public-methods
class LookerSource(DashboardServiceSource):
"""
Looker Source Class.

View File

@ -14,7 +14,7 @@ Generic source to build SQL connectors.
import traceback
from abc import ABC
from copy import deepcopy
from typing import Iterable, List, Optional, Tuple
from typing import Any, Iterable, List, Optional, Tuple
from pydantic import BaseModel
from sqlalchemy.engine import Connection
@ -26,6 +26,10 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import (
@ -49,7 +53,10 @@ from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.database_service import (
DatabaseServiceSource,
QueryByProcedure,
)
from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin
from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
from metadata.ingestion.source.models import TableView
@ -356,6 +363,27 @@ class CommonDbSourceService(
Each source should implement its own when needed
"""
def get_stored_procedures(self) -> Iterable[Any]:
"""Not implemented"""
def yield_stored_procedure(
self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Not implemented"""
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""Not Implemented"""
def yield_procedure_query(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
"""Not implemented"""
def yield_procedure_lineage(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
"""Not implemented"""
@calculate_execution_time_generator
def yield_table(
self, table_name_and_type: Tuple[str, str]
@ -452,7 +480,7 @@ class CommonDbSourceService(
metadata=self.metadata,
service_name=self.context.database_service.name.__root__,
connection_type=self.service_connection.type.value,
timeout_seconds=self.source_config.viewParsingTimeoutLimit,
timeout_seconds=self.source_config.queryParsingTimeoutLimit,
)
def _get_foreign_constraints(self, foreign_columns) -> List[TableConstraint]:

View File

@ -14,7 +14,7 @@ Common NoSQL source methods.
import traceback
from abc import ABC, abstractmethod
from typing import Dict, Iterable, List, Optional, Tuple, Union
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from pandas import json_normalize
@ -22,6 +22,10 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
@ -39,7 +43,10 @@ from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.database_service import (
DatabaseServiceSource,
QueryByProcedure,
)
from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
from metadata.utils import fqn
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR, DEFAULT_DATABASE
@ -246,6 +253,27 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC):
tags are not supported with NoSQL
"""
def get_stored_procedures(self) -> Iterable[Any]:
"""Not implemented"""
def yield_stored_procedure(
self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Not implemented"""
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""Not Implemented"""
def yield_procedure_query(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
"""Not implemented"""
def yield_procedure_lineage(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
"""Not implemented"""
def get_source_url(
self,
database_name: Optional[str] = None,

View File

@ -12,15 +12,20 @@
Base class for ingesting database services
"""
from abc import ABC, abstractmethod
from typing import Iterable, List, Optional, Set, Tuple
from datetime import datetime
from typing import Any, Iterable, List, Optional, Set, Tuple
from pydantic import BaseModel
from pydantic import BaseModel, Field
from sqlalchemy.engine import Inspector
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.services.createDatabaseService import (
@ -28,6 +33,8 @@ from metadata.generated.schema.api.services.createDatabaseService import (
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.generated.schema.entity.data.table import (
Column,
DataModel,
@ -77,6 +84,23 @@ class DataModelLink(BaseModel):
datamodel: DataModel
class QueryByProcedure(BaseModel):
"""
Query(ies) executed by each stored procedure
"""
procedure_id: str = Field(..., alias="PROCEDURE_ID")
query_id: str = Field(..., alias="QUERY_ID")
query_type: str = Field(..., alias="QUERY_TYPE")
procedure_text: str = Field(..., alias="PROCEDURE_TEXT")
procedure_start_time: datetime = Field(..., alias="PROCEDURE_START_TIME")
procedure_end_time: datetime = Field(..., alias="PROCEDURE_END_TIME")
query_start_time: datetime = Field(..., alias="QUERY_START_TIME")
query_duration: float = Field(..., alias="QUERY_DURATION")
query_text: str = Field(..., alias="QUERY_TEXT")
query_user_name: Optional[str] = Field(None, alias="QUERY_USER_NAME")
class DatabaseServiceTopology(ServiceTopology):
"""
Defines the hierarchy in Database Services.
@ -130,7 +154,7 @@ class DatabaseServiceTopology(ServiceTopology):
consumer=["database_service", "database"],
),
],
children=["table"],
children=["table", "stored_procedure"],
post_process=["mark_tables_as_deleted"],
)
table = TopologyNode(
@ -151,6 +175,36 @@ class DatabaseServiceTopology(ServiceTopology):
),
],
)
stored_procedure = TopologyNode(
producer="get_stored_procedures",
stages=[
NodeStage(
type_=StoredProcedure,
context="stored_procedure",
processor="yield_stored_procedure",
consumer=["database_service", "database", "database_schema"],
),
],
children=["stored_procedure_queries"],
)
stored_procedure_queries = TopologyNode(
producer="get_stored_procedure_queries",
stages=[
NodeStage(
type_=AddLineageRequest, # TODO: Fix context management for multiple types
processor="yield_procedure_lineage",
context="stored_procedure_query_lineage", # Used to flag if the query has had processed lineage
nullable=True,
ack_sink=False,
),
NodeStage(
type_=Query,
processor="yield_procedure_query",
nullable=True,
ack_sink=False,
),
],
)
class DatabaseServiceSource(
@ -274,6 +328,32 @@ class DatabaseServiceSource(
Also, update the self.inspector value to the current db.
"""
@abstractmethod
def get_stored_procedures(self) -> Iterable[Any]:
"""List stored procedures to process"""
@abstractmethod
def yield_stored_procedure(
self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Process the stored procedure information"""
@abstractmethod
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""List the queries associated to a stored procedure"""
@abstractmethod
def yield_procedure_query(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
"""Process the stored procedure query"""
@abstractmethod
def yield_procedure_lineage(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
"""Add procedure lineage from its query"""
def get_raw_database_schema_names(self) -> Iterable[str]:
"""
fetch all schema names without any filtering.

View File

@ -13,7 +13,7 @@ Databricks Unity Catalog Source source methods.
"""
import json
import traceback
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Tuple
from databricks.sdk.service.catalog import ColumnInfo
from databricks.sdk.service.catalog import TableConstraint as DBTableConstraint
@ -23,6 +23,10 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
@ -52,7 +56,10 @@ from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.database_service import (
DatabaseServiceSource,
QueryByProcedure,
)
from metadata.ingestion.source.database.databricks.connection import get_connection
from metadata.ingestion.source.database.databricks.models import (
ColumnJson,
@ -484,5 +491,26 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource):
) -> Iterable[Either[OMetaTagAndClassification]]:
"""No tags being processed"""
def get_stored_procedures(self) -> Iterable[Any]:
"""Not implemented"""
def yield_stored_procedure(
self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Not implemented"""
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""Not Implemented"""
def yield_procedure_query(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
"""Not implemented"""
def yield_procedure_lineage(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
"""Not implemented"""
def close(self):
"""Nothing to close"""

View File

@ -13,12 +13,16 @@
DataLake connector to fetch metadata from a files stored s3, gcs and Hdfs
"""
import traceback
from typing import Iterable, List, Optional, Tuple
from typing import Any, Iterable, List, Optional, Tuple
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
@ -55,7 +59,10 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.column_helpers import truncate_column_name
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.database_service import (
DatabaseServiceSource,
QueryByProcedure,
)
from metadata.ingestion.source.database.datalake.columns import clean_dataframe
from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
from metadata.readers.dataframe.reader_factory import SupportedTypes
@ -572,6 +579,27 @@ class DatalakeSource(DatabaseServiceSource):
) -> Iterable[Either[OMetaTagAndClassification]]:
"""We don't bring tag information"""
def get_stored_procedures(self) -> Iterable[Any]:
"""Not implemented"""
def yield_stored_procedure(
self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Not implemented"""
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""Not Implemented"""
def yield_procedure_query(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
"""Not implemented"""
def yield_procedure_lineage(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
"""Not implemented"""
def standardize_table_name(
self, schema: str, table: str # pylint: disable=unused-argument
) -> str:

View File

@ -22,6 +22,10 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
@ -44,7 +48,10 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.database_service import (
DatabaseServiceSource,
QueryByProcedure,
)
from metadata.utils import fqn
from metadata.utils.constants import DEFAULT_DATABASE
from metadata.utils.filters import filter_by_schema, filter_by_table
@ -399,5 +406,26 @@ class DeltalakeSource(DatabaseServiceSource):
) -> Iterable[Either[OMetaTagAndClassification]]:
"""We don't pick up tags from Delta"""
def get_stored_procedures(self) -> Iterable[Any]:
"""Not implemented"""
def yield_stored_procedure(
self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Not implemented"""
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""Not Implemented"""
def yield_procedure_query(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
"""Not implemented"""
def yield_procedure_lineage(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
"""Not implemented"""
def close(self):
"""No client to close"""

View File

@ -14,13 +14,17 @@ Domo Database source to extract metadata
"""
import traceback
from typing import Iterable, List, Optional, Tuple
from typing import Any, Iterable, List, Optional, Tuple
from metadata.clients.domo_client import DomoClient
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Column, Table, TableType
@ -42,7 +46,10 @@ from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.database_service import (
DatabaseServiceSource,
QueryByProcedure,
)
from metadata.ingestion.source.database.domodatabase.models import (
OutputDataset,
Owner,
@ -219,6 +226,27 @@ class DomodatabaseSource(DatabaseServiceSource):
) -> Iterable[Either[OMetaTagAndClassification]]:
"""No tags to send"""
def get_stored_procedures(self) -> Iterable[Any]:
"""Not implemented"""
def yield_stored_procedure(
self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Not implemented"""
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""Not Implemented"""
def yield_procedure_query(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
"""Not implemented"""
def yield_procedure_lineage(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
"""Not implemented"""
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
yield from []

View File

@ -12,12 +12,16 @@
Glue source methods.
"""
import traceback
from typing import Iterable, Optional, Tuple
from typing import Any, Iterable, Optional, Tuple
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
@ -42,7 +46,10 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.column_helpers import truncate_column_name
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.database_service import (
DatabaseServiceSource,
QueryByProcedure,
)
from metadata.ingestion.source.database.glue.models import Column as GlueColumn
from metadata.ingestion.source.database.glue.models import (
DatabasePage,
@ -349,6 +356,27 @@ class GlueSource(DatabaseServiceSource):
) -> Iterable[Either[OMetaTagAndClassification]]:
"""We don't pick up tags from Glue"""
def get_stored_procedures(self) -> Iterable[Any]:
"""Not implemented"""
def yield_stored_procedure(
self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Not implemented"""
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""Not Implemented"""
def yield_procedure_query(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
"""Not implemented"""
def yield_procedure_lineage(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
"""Not implemented"""
def get_source_url(
self,
database_name: Optional[str],

View File

@ -12,12 +12,16 @@
Salesforce source ingestion
"""
import traceback
from typing import Iterable, Optional, Tuple
from typing import Any, Iterable, Optional, Tuple
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import (
@ -44,7 +48,10 @@ from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.database_service import (
DatabaseServiceSource,
QueryByProcedure,
)
from metadata.utils import fqn
from metadata.utils.constants import DEFAULT_DATABASE
from metadata.utils.filters import filter_by_table
@ -270,6 +277,27 @@ class SalesforceSource(DatabaseServiceSource):
) -> Iterable[Either[OMetaTagAndClassification]]:
"""No tags to pick up"""
def get_stored_procedures(self) -> Iterable[Any]:
"""Not implemented"""
def yield_stored_procedure(
self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Not implemented"""
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""Not Implemented"""
def yield_procedure_query(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
"""Not implemented"""
def yield_procedure_lineage(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
"""Not implemented"""
def standardize_table_name( # pylint: disable=unused-argument
self, schema: str, table: str
) -> str:

View File

@ -12,16 +12,26 @@
Snowflake source module
"""
import json
import re
import traceback
from typing import Iterable, List, Optional, Tuple
from collections import defaultdict
from functools import lru_cache
from typing import Dict, Iterable, List, Optional, Tuple
import sqlparse
from requests.utils import quote
from snowflake.sqlalchemy.custom_types import VARIANT
from snowflake.sqlalchemy.snowdialect import SnowflakeDialect, ischema_names
from sqlalchemy.engine.reflection import Inspector
from sqlparse.sql import Function, Identifier
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.data.table import (
IntervalType,
TablePartition,
@ -36,9 +46,19 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import (
EntityName,
SourceUrl,
SqlQuery,
Timestamp,
)
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.lifeCycle import Created, Deleted
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.life_cycle import OMetaLifeCycleData
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
@ -46,9 +66,14 @@ from metadata.ingestion.source.database.common_db_source import (
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.database_service import QueryByProcedure
from metadata.ingestion.source.database.snowflake.constants import (
SNOWFLAKE_REGION_ID_MAP,
)
from metadata.ingestion.source.database.snowflake.models import (
STORED_PROC_LANGUAGE_MAP,
SnowflakeStoredProcedure,
)
from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_FETCH_ALL_TAGS,
SNOWFLAKE_GET_CLUSTER_KEY,
@ -57,6 +82,8 @@ from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_DATABASE_COMMENTS,
SNOWFLAKE_GET_DATABASES,
SNOWFLAKE_GET_SCHEMA_COMMENTS,
SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES,
SNOWFLAKE_GET_STORED_PROCEDURES,
SNOWFLAKE_LIFE_CYCLE_QUERY,
SNOWFLAKE_SESSION_TAG_QUERY,
)
@ -76,9 +103,11 @@ from metadata.ingestion.source.database.snowflake.utils import (
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.helpers import get_start_and_end
from metadata.utils.life_cycle_utils import init_empty_life_cycle_properties
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import get_all_table_comments
from metadata.utils.stored_procedures import get_procedure_name_from_call
from metadata.utils.tag_utils import get_ometa_tag_and_classification
ischema_names["VARIANT"] = VARIANT
@ -120,6 +149,9 @@ class SnowflakeSource(CommonDbSourceService):
self.schema_desc_map = {}
self.database_desc_map = {}
self._account: Optional[str] = None
self._region: Optional[str] = None
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@ -130,6 +162,35 @@ class SnowflakeSource(CommonDbSourceService):
)
return cls(config, metadata_config)
@property
def account(self) -> Optional[str]:
"""Query the account information"""
if self._account is None:
self._account = self._get_current_account()
return self._account
@property
def region(self) -> Optional[str]:
"""
Query the region information
Region id can be a vanilla id like "AWS_US_WEST_2"
and in case of multi region group it can be like "PUBLIC.AWS_US_WEST_2"
in such cases this method will extract vanilla region id and return the
region name from constant map SNOWFLAKE_REGION_ID_MAP
for more info checkout this doc:
https://docs.snowflake.com/en/sql-reference/functions/current_region
"""
if self._region is None:
raw_region = self._get_current_region()
if raw_region:
clean_region_id = raw_region.split(".")[-1]
self._region = SNOWFLAKE_REGION_ID_MAP.get(clean_region_id.lower())
return self._region
def set_session_query_tag(self) -> None:
"""
Method to set query tag for current session
@ -388,20 +449,17 @@ class SnowflakeSource(CommonDbSourceService):
logger.debug(f"Failed to fetch current account due to: {exc}")
return None
def _clean_region_name(self, region_id: Optional[str]) -> Optional[str]:
"""
Region id can be a vanilla id like "AWS_US_WEST_2"
and in case of multi region group it can be like "PUBLIC.AWS_US_WEST_2"
in such cases this method will extract vanilla region id and return the
region name from constant map SNOWFLAKE_REGION_ID_MAP
def _get_source_url_root(
self, database_name: Optional[str] = None, schema_name: Optional[str] = None
) -> str:
url = (
f"https://app.snowflake.com/{self.region.lower()}"
f"/{self.account.lower()}/#/data/databases/{database_name}"
)
if schema_name:
url = f"{url}/schemas/{schema_name}"
for more info checkout this doc:
https://docs.snowflake.com/en/sql-reference/functions/current_region
"""
if region_id:
clean_region_id = region_id.split(".")[-1]
return SNOWFLAKE_REGION_ID_MAP.get(clean_region_id.lower())
return None
return url
def get_source_url(
self,
@ -414,19 +472,13 @@ class SnowflakeSource(CommonDbSourceService):
Method to get the source url for snowflake
"""
try:
account = self._get_current_account()
region_id = self._get_current_region()
region_name = self._clean_region_name(region_id)
if account and region_name:
if self.account and self.region:
tab_type = "view" if table_type == TableType.View else "table"
url = (
f"https://app.snowflake.com/{region_name.lower()}"
f"/{account.lower()}/#/data/databases/{database_name}"
url = self._get_source_url_root(
database_name=database_name, schema_name=schema_name
)
if schema_name:
url = f"{url}/schemas/{schema_name}"
if table_name:
url = f"{url}/{tab_type}/{table_name}"
if table_name:
url = f"{url}/{tab_type}/{table_name}"
return url
except Exception as exc:
logger.debug(traceback.format_exc())
@ -437,8 +489,8 @@ class SnowflakeSource(CommonDbSourceService):
"""
Get the life cycle data of the table
"""
table = self.context.table
try:
table = self.context.table
results = self.engine.execute(
SNOWFLAKE_LIFE_CYCLE_QUERY.format(
database_name=table.database.name,
@ -457,7 +509,183 @@ class SnowflakeSource(CommonDbSourceService):
)
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Unable to get the table life cycle data for table {table.name.__root__}: {exc}"
yield Either(
left=StackTraceError(
name=table.name.__root__,
error=f"Unable to get the table life cycle data for table {table.name.__root__}: {exc}",
stack_trace=traceback.format_exc(),
)
)
def get_stored_procedures(self) -> Iterable[SnowflakeStoredProcedure]:
"""List Snowflake stored procedures"""
if self.source_config.includeStoredProcedures:
results = self.engine.execute(
SNOWFLAKE_GET_STORED_PROCEDURES.format(
database_name=self.context.database.name.__root__,
schema_name=self.context.database_schema.name.__root__,
)
).all()
for row in results:
stored_procedure = SnowflakeStoredProcedure.parse_obj(dict(row))
yield stored_procedure
def yield_stored_procedure(
self, stored_procedure: SnowflakeStoredProcedure
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Prepare the stored procedure payload"""
try:
yield Either(
right=CreateStoredProcedureRequest(
name=EntityName(__root__=stored_procedure.name),
description=stored_procedure.comment,
storedProcedureCode=StoredProcedureCode(
language=STORED_PROC_LANGUAGE_MAP.get(
stored_procedure.language
),
code=stored_procedure.definition,
),
databaseSchema=self.context.database_schema.fullyQualifiedName,
sourceUrl=SourceUrl(
__root__=self._get_source_url_root(
database_name=self.context.database.name.__root__,
schema_name=self.context.database_schema.name.__root__,
)
+ f"/{stored_procedure.name}{quote(stored_procedure.signature)}"
),
)
)
except Exception as exc:
yield Either(
left=StackTraceError(
name=stored_procedure.name,
error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]",
stack_trace=traceback.format_exc(),
)
)
@lru_cache
def procedure_queries_dict(
self, schema_name: str, database_name: str
) -> Dict[str, List[QueryByProcedure]]:
"""
Cache the queries ran for the stored procedures in the last `queryLogDuration` days.
We will run this for each different and db name.
The dictionary key will be the case-insensitive procedure name.
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
results = self.engine.execute(
SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
warehouse=self.service_connection.warehouse,
schema_name=schema_name,
database_name=database_name,
)
).all()
queries_dict = defaultdict(list)
for row in results:
try:
query_by_procedure = QueryByProcedure.parse_obj(dict(row))
procedure_name = get_procedure_name_from_call(
query_text=query_by_procedure.procedure_text,
schema_name=schema_name,
database_name=database_name,
)
queries_dict[procedure_name].append(query_by_procedure)
except Exception as exc:
self.status.failed(
StackTraceError(
name="Stored Procedure",
error=f"Error trying to get procedure name due to [{exc}]",
stack_trace=traceback.format_exc(),
)
)
return queries_dict
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""
Pick the stored procedure name from the context
and return the list of associated queries
"""
queries_dict = self.procedure_queries_dict(
schema_name=self.context.database_schema.name.__root__,
database_name=self.context.database.name.__root__,
)
for query_by_procedure in (
queries_dict.get(self.context.stored_procedure.name.__root__.lower()) or []
):
yield query_by_procedure
@staticmethod
def is_lineage_query(query_type: str, query_text: str) -> bool:
"""Check if it's worth it to parse the query for lineage"""
if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"):
return True
if query_type == "INSERT" and re.search(
"^.*insert.*into.*select.*$", query_text, re.IGNORECASE
):
return True
return False
def yield_procedure_lineage(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
"""Add procedure lineage from its query"""
self.update_context(key="stored_procedure_query_lineage", value=False)
if self.is_lineage_query(
query_type=query_by_procedure.query_type,
query_text=query_by_procedure.query_text,
):
self.update_context(key="stored_procedure_query_lineage", value=True)
for either_lineage in get_lineage_by_query(
self.metadata,
query=query_by_procedure.query_text,
service_name=self.context.database_service.name.__root__,
database_name=self.context.database.name.__root__,
schema_name=self.context.database_schema.name.__root__,
dialect=ConnectionTypeDialectMapper.dialect_of(
self.context.database_service.serviceType.value
),
timeout_seconds=self.source_config.queryParsingTimeoutLimit,
lineage_source=LineageSource.QueryLineage,
):
if either_lineage.right.edge.lineageDetails:
either_lineage.right.edge.lineageDetails.pipeline = EntityReference(
id=self.context.stored_procedure.id,
type="storedProcedure",
)
yield either_lineage
def yield_procedure_query(
self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
"""Check the queries triggered by the procedure and add their lineage, if any"""
yield Either(
right=CreateQueryRequest(
query=SqlQuery(__root__=query_by_procedure.query_text),
query_type=query_by_procedure.query_type,
duration=query_by_procedure.query_duration,
queryDate=Timestamp(
__root__=int(query_by_procedure.query_start_time.timestamp()) * 1000
),
triggeredBy=EntityReference(
id=self.context.stored_procedure.id,
type="storedProcedure",
),
processedLineage=bool(self.context.stored_procedure_query_lineage),
)
)

View File

@ -0,0 +1,36 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Snowflake models
"""
from typing import Optional
from pydantic import BaseModel, Field
from metadata.generated.schema.entity.data.storedProcedure import Language
STORED_PROC_LANGUAGE_MAP = {
"PYTHON": Language.Python,
"SQL": Language.SQL,
"JAVA": Language.Java,
"JAVASCRIPT": Language.JavaScript,
}
class SnowflakeStoredProcedure(BaseModel):
"""Snowflake stored procedure list query results"""
name: str = Field(..., alias="NAME")
owner: Optional[str] = Field(..., alias="OWNER")
language: str = Field(..., alias="LANGUAGE")
definition: str = Field(..., alias="DEFINITION")
signature: Optional[str] = Field(..., alias="SIGNATURE")
comment: Optional[str] = Field(..., alias="COMMENT")

View File

@ -156,3 +156,73 @@ and table_catalog = '{database_name}'
limit 1
"""
)
SNOWFLAKE_GET_STORED_PROCEDURES = textwrap.dedent(
"""
SELECT
PROCEDURE_NAME AS name,
PROCEDURE_OWNER AS owner,
PROCEDURE_LANGUAGE AS language,
PROCEDURE_DEFINITION AS definition,
ARGUMENT_SIGNATURE AS signature,
COMMENT as comment
FROM INFORMATION_SCHEMA.PROCEDURES
WHERE PROCEDURE_CATALOG = '{database_name}'
AND PROCEDURE_SCHEMA = '{schema_name}'
"""
)
SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
"""
WITH SP_HISTORY AS (
SELECT
QUERY_ID,
QUERY_TEXT,
SESSION_ID,
START_TIME,
END_TIME
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY SP
WHERE QUERY_TYPE = 'CALL'
AND START_TIME >= '{start_date}'
AND WAREHOUSE_NAME = '{warehouse}'
AND SCHEMA_NAME = '{schema_name}'
AND DATABASE_NAME = '{database_name}'
),
Q_HISTORY AS (
SELECT
QUERY_ID,
QUERY_TYPE,
QUERY_TEXT,
SESSION_ID,
START_TIME,
END_TIME,
TOTAL_ELAPSED_TIME/1000 AS DURATION,
USER_NAME
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY SP
WHERE QUERY_TYPE <> 'CALL'
AND START_TIME >= '{start_date}'
AND WAREHOUSE_NAME = '{warehouse}'
AND SCHEMA_NAME = '{schema_name}'
AND DATABASE_NAME = '{database_name}'
)
SELECT
SP.QUERY_ID AS PROCEDURE_ID,
Q.QUERY_ID AS QUERY_ID,
Q.QUERY_TYPE AS QUERY_TYPE,
SP.QUERY_TEXT AS PROCEDURE_TEXT,
SP.START_TIME AS PROCEDURE_START_TIME,
SP.END_TIME AS PROCEDURE_END_TIME,
Q.START_TIME AS QUERY_START_TIME,
Q.DURATION AS QUERY_DURATION,
Q.QUERY_TEXT AS QUERY_TEXT,
Q.USER_NAME AS QUERY_USER_NAME
FROM SP_HISTORY SP
JOIN Q_HISTORY Q
ON SP.SESSION_ID = Q.SESSION_ID
AND (
Q.START_TIME BETWEEN SP.START_TIME AND SP.END_TIME
OR Q.END_TIME BETWEEN SP.START_TIME AND SP.END_TIME
)
ORDER BY PROCEDURE_START_TIME DESC
"""
)

View File

@ -60,8 +60,6 @@ class MissingMetricException(Exception):
"""
# pylint: disable=too-many-public-methods
# Pylint error above indicates that this class needs to be refactored
class Profiler(Generic[TMetric]):
"""
Core Profiler.

View File

@ -10,7 +10,7 @@
# limitations under the License.
"""
Module for sqlalchmey dialect utils
Module for sqlalchemy dialect utils
"""
from typing import Dict, Optional, Tuple

View File

@ -0,0 +1,57 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Stored Procedures Utilities
"""
import re
from typing import Optional
from metadata.utils.logger import utils_logger
logger = utils_logger()
NAME_PATTERN = r"(?<=call)(.*)(?=\()"
def get_procedure_name_from_call(
query_text: str, schema_name: str, database_name: str, sensitive_match: bool = False
) -> Optional[str]:
"""
In the query text we'll have:
- `CALL db.schema.procedure_name(...)`,
- `CALL schema.procedure_name(...)`
- `CALL procedure_name(...)`.
We need to get the procedure name in these 3 cases.
We'll return the lowered procedure name
"""
res = re.search(
NAME_PATTERN, query_text, re.IGNORECASE if not sensitive_match else None
)
if not res:
return None
try:
return (
res.group(0) # Get the first match
.strip() # Remove whitespace
.lower() # Replace all the lowercase variants of the procedure name prefixes
.replace(f"{database_name.lower()}.", "")
.replace(f"{schema_name.lower()}.", "")
)
except Exception as exc:
logger.warning(
f"Error trying to get the procedure name in [{query_text}] due to [{exc}]"
)
return None

View File

@ -16,6 +16,7 @@ source:
markDeletedTables: true
includeTables: true
includeViews: true
includeStoredProcedures: false
type: DatabaseMetadata
schemaFilterPattern:
excludes:

View File

@ -16,7 +16,7 @@ snowflake unit tests
# pylint: disable=line-too-long
from unittest import TestCase
from unittest.mock import patch
from unittest.mock import PropertyMock, patch
from metadata.generated.schema.entity.data.table import TableType
from metadata.generated.schema.metadataIngestion.workflow import (
@ -90,7 +90,7 @@ class SnowflakeUnitTest(TestCase):
super().__init__(methodName)
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.parse_obj(mock_snowflake_config)
self.snowflake_source = SnowflakeSource.create(
self.snowflake_source: SnowflakeSource = SnowflakeSource.create(
mock_snowflake_config["source"],
self.config.workflowConfig.openMetadataServerConfig,
)
@ -128,23 +128,23 @@ class SnowflakeUnitTest(TestCase):
method to test source url
"""
with patch.object(
SnowflakeSource, "_get_current_account", return_value="random_account"
SnowflakeSource,
"account",
return_value="random_account",
new_callable=PropertyMock,
):
with patch.object(
SnowflakeSource, "_get_current_region", return_value="AWS_US_WEST_2"
SnowflakeSource,
"region",
return_value="us-west-2",
new_callable=PropertyMock,
):
self._assert_urls()
with patch.object(
SnowflakeSource,
"_get_current_region",
return_value="PUBLIC.AWS_US_WEST_2",
):
self._assert_urls()
with patch.object(
SnowflakeSource,
"_get_current_region",
"region",
new_callable=PropertyMock,
return_value=None,
):
self.assertIsNone(

View File

View File

@ -0,0 +1,66 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Stored Procedures Utils
"""
from unittest import TestCase
from metadata.utils.stored_procedures import get_procedure_name_from_call
class StoredProceduresTests(TestCase):
"""Group stored procedures tests"""
def test_get_procedure_name_from_call(self):
"""Check that we properly parse CALL queries"""
self.assertEquals(
get_procedure_name_from_call(
query_text="CALL db.schema.procedure_name(...)",
schema_name="schema",
database_name="db",
),
"procedure_name",
)
self.assertEquals(
get_procedure_name_from_call(
query_text="CALL schema.procedure_name(...)",
schema_name="schema",
database_name="db",
),
"procedure_name",
)
self.assertEquals(
get_procedure_name_from_call(
query_text="CALL procedure_name(...)",
schema_name="schema",
database_name="db",
),
"procedure_name",
)
self.assertEquals(
get_procedure_name_from_call(
query_text="CALL DB.SCHEMA.PROCEDURE_NAME(...)",
schema_name="SCHEMA",
database_name="DB",
),
"procedure_name",
)
self.assertIsNone(
get_procedure_name_from_call(
query_text="something very random",
schema_name="schema",
database_name="db",
)
)

View File

@ -513,6 +513,8 @@ public class QueryResource extends EntityResource<Query, QueryRepository> {
.withVotes(new Votes().withUpVotes(0).withDownVotes(0))
.withUsers(getEntityReferences(USER, create.getUsers()))
.withQueryUsedIn(EntityUtil.populateEntityReferences(create.getQueryUsedIn()))
.withQueryDate(create.getQueryDate());
.withQueryDate(create.getQueryDate())
.withTriggeredBy(create.getTriggeredBy())
.withProcessedLineage(create.getProcessedLineage());
}
}

View File

@ -72,6 +72,15 @@
"queryUsedIn": {
"description": "list of entities to which the query is joined.",
"$ref": "../../type/entityReferenceList.json"
},
"triggeredBy": {
"description": "Entity that triggered the query. E.g., a Stored Procedure or a Pipeline Task.",
"$ref": "../../type/entityReference.json"
},
"processedLineage": {
"description": "Flag if this query has already been successfully processed for lineage",
"type": "boolean",
"default": false
}
},
"required": ["query"],

View File

@ -110,6 +110,15 @@
"queryUsedIn": {
"description": "Entities that are using this query",
"$ref": "../../type/entityReferenceList.json"
},
"triggeredBy": {
"description": "Entity that triggered the query. E.g., a Stored Procedure or a Pipeline Task.",
"$ref": "../../type/entityReference.json"
},
"processedLineage": {
"description": "Flag if this query has already been successfully processed for lineage",
"type": "boolean",
"default": false
}
},
"required": ["name","query"],

View File

@ -49,18 +49,30 @@
"default": true,
"title": "Include Tags"
},
"includeStoredProcedures": {
"description": "Optional configuration to toggle the Stored Procedures ingestion.",
"type": "boolean",
"default": true,
"title": "Include Stored Procedures"
},
"queryLogDuration": {
"description": "Configuration to tune how far we want to look back in query logs to process Stored Procedures results.",
"type": "integer",
"default": 1,
"title": "Query Log Duration"
},
"queryParsingTimeoutLimit": {
"description": "Configuration to set the timeout for parsing the query in seconds.",
"type": "integer",
"default": 300,
"title": "Query Parsing Timeout Limit"
},
"useFqnForFiltering": {
"description": "Regex will be applied on fully qualified name (e.g service_name.db_name.schema_name.table_name) instead of raw name (e.g. table_name)",
"type": "boolean",
"default": false,
"title": "Use FQN For Filtering"
},
"viewParsingTimeoutLimit": {
"description": "Configuration to set the timeout for parsing view query in seconds.",
"type": "integer",
"default": 300,
"title": "View Parsing Timeout Limit (in sec.)"
},
"schemaFilterPattern": {
"description": "Regex to only fetch tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern",