#9269 - Add MSSQL Stored Procedure Support (#14739)

* Remove unnecessary field

* Remove unnecessary field

* Support query comments in MSSQL

* Remove unnecessary field

* Format

* Add external type

* Add MSSQL SP support
This commit is contained in:
Pere Miquel Brull 2024-01-17 06:37:27 +01:00 committed by GitHub
parent cfbb94aa32
commit eadda0e3f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 230 additions and 24 deletions

View File

@ -159,3 +159,8 @@ CREATE TABLE IF NOT EXISTS consumers_dlq (
timestamp BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.timestamp') NOT NULL, timestamp BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.timestamp') NOT NULL,
UNIQUE(id, extension) UNIQUE(id, extension)
); );
-- Set connection.config.supportsQueryComment = true on every stored MSSQL service
UPDATE dbservice_entity
SET json = JSON_SET(json, '$.connection.config.supportsQueryComment', true)
WHERE serviceType = 'Mssql';

View File

@ -172,3 +172,7 @@ CREATE TABLE IF NOT EXISTS consumers_dlq (
UNIQUE(id, extension) UNIQUE(id, extension)
); );
-- Set connection.config.supportsQueryComment = true on every stored MSSQL service
UPDATE dbservice_entity
SET json = jsonb_set(json::jsonb, '{connection,config,supportsQueryComment}', 'true', true)
WHERE serviceType = 'Mssql';

View File

@ -75,7 +75,6 @@ BIGQUERY_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
""" """
WITH SP_HISTORY AS ( WITH SP_HISTORY AS (
SELECT SELECT
job_id,
query AS query_text, query AS query_text,
start_time, start_time,
end_time, end_time,
@ -90,7 +89,6 @@ WITH SP_HISTORY AS (
), ),
Q_HISTORY AS ( Q_HISTORY AS (
SELECT SELECT
job_id,
project_id as database_name, project_id as database_name,
user_email as user_name, user_email as user_name,
statement_type as query_type, statement_type as query_type,
@ -109,8 +107,6 @@ Q_HISTORY AS (
AND error_result is NULL AND error_result is NULL
) )
SELECT SELECT
SP.job_id as procedure_id,
Q.job_id as query_id,
Q.query_type as query_type, Q.query_type as query_type,
SP.query_text as procedure_text, SP.query_text as procedure_text,
Q.query_text as query_text, Q.query_text as query_text,

View File

@ -10,21 +10,39 @@
# limitations under the License. # limitations under the License.
"""MSSQL source module""" """MSSQL source module"""
import traceback import traceback
from typing import Iterable, Optional from typing import Dict, Iterable, List, Optional
from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import ( from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
MssqlConnection, MssqlConnection,
) )
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.generated.schema.type.basic import EntityName
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.mssql.queries import MSSQL_GET_DATABASE from metadata.ingestion.source.database.mssql.models import (
STORED_PROC_LANGUAGE_MAP,
MssqlStoredProcedure,
)
from metadata.ingestion.source.database.mssql.queries import (
MSSQL_GET_DATABASE,
MSSQL_GET_STORED_PROCEDURE_QUERIES,
MSSQL_GET_STORED_PROCEDURES,
)
from metadata.ingestion.source.database.mssql.utils import ( from metadata.ingestion.source.database.mssql.utils import (
get_columns, get_columns,
get_foreign_keys, get_foreign_keys,
@ -36,8 +54,13 @@ from metadata.ingestion.source.database.mssql.utils import (
get_view_names, get_view_names,
) )
from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn from metadata.utils import fqn
from metadata.utils.filters import filter_by_database from metadata.utils.filters import filter_by_database
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
from metadata.utils.sqa_utils import update_mssql_ischema_names from metadata.utils.sqa_utils import update_mssql_ischema_names
from metadata.utils.sqlalchemy_utils import ( from metadata.utils.sqlalchemy_utils import (
@ -65,7 +88,7 @@ MSDialect.get_table_names = get_table_names
MSDialect.get_view_names = get_view_names MSDialect.get_view_names = get_view_names
class MssqlSource(CommonDbSourceService, MultiDBSource): class MssqlSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource):
""" """
Implements the necessary methods to extract Implements the necessary methods to extract
Database metadata from MSSQL Source Database metadata from MSSQL Source
@ -122,3 +145,75 @@ class MssqlSource(CommonDbSourceService, MultiDBSource):
logger.error( logger.error(
f"Error trying to connect to database {new_database}: {exc}" f"Error trying to connect to database {new_database}: {exc}"
) )
def get_stored_procedures(self) -> Iterable[MssqlStoredProcedure]:
    """List the MSSQL stored procedures of the schema currently in context.

    Runs ``MSSQL_GET_STORED_PROCEDURES`` for ``self.context.database`` and
    ``self.context.database_schema`` and yields one ``MssqlStoredProcedure``
    per row that parses correctly. Rows that fail to parse are logged and
    reported through ``self.status.failed`` without stopping the iteration.

    Nothing is yielded when ``source_config.includeStoredProcedures`` is falsy.
    """
    if not self.source_config.includeStoredProcedures:
        return

    results = self.engine.execute(
        MSSQL_GET_STORED_PROCEDURES.format(
            database_name=self.context.database,
            schema_name=self.context.database_schema,
        )
    ).all()

    for row in results:
        row_dict = dict(row)
        try:
            # Keep only the parsing inside the try so that errors raised by
            # the consumer of this generator are not misreported as parse errors.
            stored_procedure = MssqlStoredProcedure.parse_obj(row_dict)
        except Exception as exc:
            # logger.error() requires a message; calling it without one raises
            # TypeError from inside the handler and hides the original failure.
            logger.debug(traceback.format_exc())
            logger.error(f"Error parsing Stored Procedure payload: {exc}")
            self.status.failed(
                error=StackTraceError(
                    name=row_dict.get("name", "UNKNOWN"),
                    error=f"Error parsing Stored Procedure payload: {exc}",
                    stackTrace=traceback.format_exc(),
                )
            )
            continue
        yield stored_procedure
def yield_stored_procedure(
    self, stored_procedure: MssqlStoredProcedure
) -> Iterable[Either[CreateStoredProcedureRequest]]:
    """Build and yield the create request for a single stored procedure.

    On success the request is yielded wrapped in ``Either(right=...)`` and then
    registered via ``register_record_stored_proc_request``. Any exception raised
    while building, yielding or registering is converted into an
    ``Either(left=StackTraceError(...))``.
    """
    try:
        entity_name = EntityName(__root__=stored_procedure.name)
        procedure_code = StoredProcedureCode(
            language=STORED_PROC_LANGUAGE_MAP.get(stored_procedure.language),
            code=stored_procedure.definition,
        )
        # Fully qualified name of the schema that owns the procedure
        schema_fqn = fqn.build(
            metadata=self.metadata,
            entity_type=DatabaseSchema,
            service_name=self.context.database_service,
            database_name=self.context.database,
            schema_name=self.context.database_schema,
        )
        request = CreateStoredProcedureRequest(
            name=entity_name,
            description=None,
            storedProcedureCode=procedure_code,
            databaseSchema=schema_fqn,
        )
        yield Either(right=request)
        self.register_record_stored_proc_request(request)
    except Exception as exc:
        failure = StackTraceError(
            name=stored_procedure.name,
            error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]",
            stackTrace=traceback.format_exc(),
        )
        yield Either(left=failure)
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
    """
    Build the mapping from stored procedures to the queries they triggered.

    The query window starts at the first value returned by
    ``get_start_and_end(self.source_config.queryLogDuration)``; the rendered
    ``MSSQL_GET_STORED_PROCEDURE_QUERIES`` statement is handed to
    ``procedure_queries_dict`` and its result is returned unchanged.
    """
    window_start, _window_end = get_start_and_end(
        self.source_config.queryLogDuration
    )
    rendered_query = MSSQL_GET_STORED_PROCEDURE_QUERIES.format(
        start_date=window_start,
    )
    return self.procedure_queries_dict(query=rendered_query)

View File

@ -0,0 +1,30 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MSSQL models"""
from typing import Optional
from pydantic import BaseModel, Field
from metadata.generated.schema.entity.data.storedProcedure import Language
# Translates the `language` value returned by the stored procedure listing
# query into the Language enum used by the StoredProcedureCode entity.
STORED_PROC_LANGUAGE_MAP = dict(
    SQL=Language.SQL,
    EXTERNAL=Language.External,
)
class MssqlStoredProcedure(BaseModel):
    """MSSQL stored procedure list query results"""

    # Procedure name; the only mandatory column.
    name: str = Field(...)
    # Owner of the procedure, when available.
    owner: Optional[str] = Field(None)
    # Raw language string. The default is the plain string value (not the enum
    # member) so that it resolves through STORED_PROC_LANGUAGE_MAP, which is
    # keyed by strings.
    language: str = Field(Language.SQL.value)
    # Procedure source text; annotated Optional to match its None default.
    definition: Optional[str] = Field(None)

View File

@ -186,3 +186,81 @@ index_info AS (
ORDER BY fk_info.constraint_schema, fk_info.constraint_name, ORDER BY fk_info.constraint_schema, fk_info.constraint_name,
fk_info.ordinal_position fk_info.ordinal_position
""" """
# Lists the stored procedures of one schema from INFORMATION_SCHEMA.ROUTINES.
# Placeholders filled with str.format: {database_name}, {schema_name}.
# Output columns: name, owner (always NULL), language (ROUTINE_BODY) and
# definition (ROUTINE_DEFINITION). Routines whose name starts with
# 'sp_', 'xp_' or 'ms_' are excluded.
# NOTE(review): ROUTINE_DEFINITION is presumably truncated by SQL Server for
# long procedures - confirm against the INFORMATION_SCHEMA.ROUTINES docs.
MSSQL_GET_STORED_PROCEDURES = textwrap.dedent(
"""
SELECT
ROUTINE_NAME AS name,
NULL AS owner,
ROUTINE_BODY AS language,
ROUTINE_DEFINITION AS definition
FROM INFORMATION_SCHEMA.ROUTINES
WHERE ROUTINE_TYPE = 'PROCEDURE'
AND ROUTINE_CATALOG = '{database_name}'
AND ROUTINE_SCHEMA = '{schema_name}'
AND LEFT(ROUTINE_NAME, 3) NOT IN ('sp_', 'xp_', 'ms_')
"""
)
# Associates stored procedure executions with the queries that ran while the
# procedure was executing.
#   SP_HISTORY: procedure executions from sys.dm_exec_procedure_stats.
#   Q_HISTORY:  cached query executions from sys.dm_exec_query_stats, excluding
#               statements tagged as issued by OpenMetadata or dbt and plans of
#               type 'Prepared' / 'Proc'.
# The two are joined when the query start or end time falls inside the
# procedure execution window. The only str.format placeholder is {start_date};
# the doubled braces and percent signs are escapes and must be kept.
# A leftover fixed date range ('2024-01-13' .. '2024-01-20') was removed from
# Q_HISTORY so the window is driven by {start_date} alone.
# NOTE(review): total_elapsed_time is presumably reported in microseconds, so
# total_elapsed_time/1000 would be milliseconds while DATEADD(s, ...) adds
# seconds - confirm the intended unit against the DMV documentation.
MSSQL_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
    """
WITH SP_HISTORY (start_time, end_time, procedure_name, query_text) AS (
  select
    s.last_execution_time start_time,
    DATEADD(s, s.total_elapsed_time/1000, s.last_execution_time) end_time,
    OBJECT_NAME(object_id, database_id) as procedure_name,
    text as query_text
  from sys.dm_exec_procedure_stats s
  CROSS APPLY sys.dm_exec_sql_text(s.plan_handle)
  WHERE OBJECT_NAME(object_id, database_id) IS NOT NULL
    AND s.last_execution_time > '{start_date}'
),
Q_HISTORY (database_name, query_text, start_time, end_time, duration,query_type, schema_name, user_name) AS (
  select
    db.NAME database_name,
    t.text query_text,
    s.last_execution_time start_time,
    DATEADD(s, s.total_elapsed_time/1000, s.last_execution_time) end_time,
    s.total_elapsed_time/1000 duration,
    case
        when t.text LIKE '%%MERGE%%' then 'MERGE'
        when t.text LIKE '%%UPDATE%%' then 'UPDATE'
        when t.text LIKE '%%SELECT%%INTO%%' then 'CREATE_TABLE_AS_SELECT'
        when t.text LIKE '%%INSERT%%' then 'INSERT'
    else 'UNKNOWN' end query_type,
    NULL schema_name,
    NULL user_name
  FROM sys.dm_exec_cached_plans AS p
  INNER JOIN sys.dm_exec_query_stats AS s
    ON p.plan_handle = s.plan_handle
  CROSS APPLY sys.dm_exec_sql_text(p.plan_handle) AS t
  INNER JOIN sys.databases db
    ON db.database_id = t.dbid
  WHERE t.text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
    AND t.text NOT LIKE '/* {{"app": "dbt", %%}} */%%'
    AND p.objtype NOT IN ('Prepared', 'Proc')
    AND s.last_execution_time > '{start_date}'
)
select
  Q.query_type AS QUERY_TYPE,
  Q.database_name AS QUERY_DATABASE_NAME,
  Q.schema_name AS QUERY_SCHEMA_NAME,
  Q.query_text AS QUERY_TEXT,
  Q.user_name AS QUERY_USER_NAME,
  Q.start_time AS QUERY_START_TIME,
  Q.duration AS QUERY_DURATION,
  SP.procedure_name AS PROCEDURE_NAME,
  SP.query_text AS PROCEDURE_TEXT,
  SP.start_time AS PROCEDURE_START_TIME,
  SP.end_time AS PROCEDURE_END_TIME
from SP_HISTORY SP
JOIN Q_HISTORY Q
  ON (
    Q.start_time BETWEEN SP.start_time and SP.end_time
    OR Q.end_time BETWEEN SP.start_time and SP.end_time
  )
order by PROCEDURE_START_TIME desc
;
"""
)

View File

@ -80,7 +80,6 @@ WHERE
ORACLE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent( ORACLE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
""" """
WITH SP_HISTORY AS (SELECT WITH SP_HISTORY AS (SELECT
SQL_ID,
sql_text AS query_text, sql_text AS query_text,
TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') AS start_time, TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') AS start_time,
TO_TIMESTAMP(LAST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') + NUMTODSINTERVAL(ELAPSED_TIME / 1000, 'SECOND') AS end_time, TO_TIMESTAMP(LAST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') + NUMTODSINTERVAL(ELAPSED_TIME / 1000, 'SECOND') AS end_time,
@ -90,7 +89,6 @@ WITH SP_HISTORY AS (SELECT
AND TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') >= TO_TIMESTAMP('{start_date}', 'YYYY-MM-DD HH24:MI:SS') AND TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') >= TO_TIMESTAMP('{start_date}', 'YYYY-MM-DD HH24:MI:SS')
), ),
Q_HISTORY AS (SELECT Q_HISTORY AS (SELECT
sql_id,
sql_text AS query_text, sql_text AS query_text,
CASE CASE
WHEN UPPER(SQL_TEXT) LIKE 'INSERT%' THEN 'INSERT' WHEN UPPER(SQL_TEXT) LIKE 'INSERT%' THEN 'INSERT'
@ -111,8 +109,6 @@ WITH SP_HISTORY AS (SELECT
>= TO_TIMESTAMP('{start_date}', 'YYYY-MM-DD HH24:MI:SS') >= TO_TIMESTAMP('{start_date}', 'YYYY-MM-DD HH24:MI:SS')
) )
SELECT SELECT
SP.sql_id AS PROCEDURE_ID,
Q.sql_id AS QUERY_ID,
Q.QUERY_TYPE AS QUERY_TYPE, Q.QUERY_TYPE AS QUERY_TYPE,
Q.DATABASE_NAME AS QUERY_DATABASE_NAME, Q.DATABASE_NAME AS QUERY_DATABASE_NAME,
Q.SCHEMA_NAME AS QUERY_SCHEMA_NAME, Q.SCHEMA_NAME AS QUERY_SCHEMA_NAME,

View File

@ -300,7 +300,6 @@ REDSHIFT_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
""" """
with SP_HISTORY as ( with SP_HISTORY as (
select select
query as procedure_id,
querytxt as procedure_text, querytxt as procedure_text,
starttime as procedure_start_time, starttime as procedure_start_time,
endtime as procedure_end_time, endtime as procedure_end_time,
@ -311,7 +310,6 @@ with SP_HISTORY as (
), ),
Q_HISTORY as ( Q_HISTORY as (
select select
query as query_id,
querytxt as query_text, querytxt as query_text,
case case
when querytxt ilike '%%MERGE%%' then 'MERGE' when querytxt ilike '%%MERGE%%' then 'MERGE'
@ -334,11 +332,9 @@ Q_HISTORY as (
and userid <> 1 and userid <> 1
) )
select select
sp.procedure_id,
sp.procedure_text, sp.procedure_text,
sp.procedure_start_time, sp.procedure_start_time,
sp.procedure_end_time, sp.procedure_end_time,
q.query_id,
q.query_text, q.query_text,
q.query_type, q.query_type,
q.query_database_name, q.query_database_name,

View File

@ -187,7 +187,6 @@ SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
""" """
WITH SP_HISTORY AS ( WITH SP_HISTORY AS (
SELECT SELECT
QUERY_ID,
QUERY_TEXT, QUERY_TEXT,
SESSION_ID, SESSION_ID,
START_TIME, START_TIME,
@ -198,7 +197,6 @@ WITH SP_HISTORY AS (
), ),
Q_HISTORY AS ( Q_HISTORY AS (
SELECT SELECT
QUERY_ID,
QUERY_TYPE, QUERY_TYPE,
QUERY_TEXT, QUERY_TEXT,
SESSION_ID, SESSION_ID,
@ -215,8 +213,6 @@ Q_HISTORY AS (
AND START_TIME >= '{start_date}' AND START_TIME >= '{start_date}'
) )
SELECT SELECT
SP.QUERY_ID AS PROCEDURE_ID,
Q.QUERY_ID AS QUERY_ID,
Q.QUERY_TYPE AS QUERY_TYPE, Q.QUERY_TYPE AS QUERY_TYPE,
Q.DATABASE_NAME AS QUERY_DATABASE_NAME, Q.DATABASE_NAME AS QUERY_DATABASE_NAME,
Q.SCHEMA_NAME AS QUERY_SCHEMA_NAME, Q.SCHEMA_NAME AS QUERY_SCHEMA_NAME,

View File

@ -51,8 +51,7 @@ class QueryByProcedure(BaseModel):
Query(ies) executed by each stored procedure Query(ies) executed by each stored procedure
""" """
procedure_id: str = Field(..., alias="PROCEDURE_ID") procedure_name: str = Field(None, alias="PROCEDURE_NAME")
query_id: str = Field(..., alias="QUERY_ID")
query_type: str = Field(..., alias="QUERY_TYPE") query_type: str = Field(..., alias="QUERY_TYPE")
query_database_name: str = Field(None, alias="QUERY_DATABASE_NAME") query_database_name: str = Field(None, alias="QUERY_DATABASE_NAME")
query_schema_name: str = Field(None, alias="QUERY_SCHEMA_NAME") query_schema_name: str = Field(None, alias="QUERY_SCHEMA_NAME")
@ -109,8 +108,11 @@ class StoredProcedureMixin(ABC):
for row in results: for row in results:
try: try:
query_by_procedure = QueryByProcedure.parse_obj(dict(row)) query_by_procedure = QueryByProcedure.parse_obj(dict(row))
procedure_name = get_procedure_name_from_call( procedure_name = (
query_text=query_by_procedure.procedure_text, query_by_procedure.procedure_name
or get_procedure_name_from_call(
query_text=query_by_procedure.procedure_text,
)
) )
queries_dict[procedure_name].append(query_by_procedure) queries_dict[procedure_name].append(query_by_procedure)
except Exception as exc: except Exception as exc:

View File

@ -18,7 +18,8 @@
"SQL", "SQL",
"Java", "Java",
"JavaScript", "JavaScript",
"Python" "Python",
"External"
], ],
"javaEnums": [ "javaEnums": [
{ {
@ -32,6 +33,9 @@
}, },
{ {
"name": "Python" "name": "Python"
},
{
"name": "External"
} }
] ]
}, },

View File

@ -97,6 +97,10 @@
"sampleDataStorageConfig": { "sampleDataStorageConfig": {
"title": "Storage Config for Sample Data", "title": "Storage Config for Sample Data",
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig" "$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
},
"supportsQueryComment": {
"title": "Supports Query Comment",
"$ref": "../connectionBasicType.json#/definitions/supportsQueryComment"
} }
}, },
"additionalProperties": false, "additionalProperties": false,