#14630 added oracle stored procedures (#14641)

This commit is contained in:
NiharDoshi99 2024-01-15 18:28:27 +05:30 committed by GitHub
parent 709fa7415f
commit 54d34934c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 429 additions and 3 deletions

View File

@ -12,18 +12,31 @@
# pylint: disable=protected-access
"""Oracle source module"""
import traceback
from typing import Iterable, Optional
from typing import Dict, Iterable, List, Optional
from sqlalchemy.dialects.oracle.base import INTERVAL, OracleDialect, ischema_names
from sqlalchemy.engine import Inspector
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import (
Language,
StoredProcedureCode,
)
from metadata.generated.schema.entity.data.table import TableType
from metadata.generated.schema.entity.services.connections.database.oracleConnection import (
OracleConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import EntityName
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
@ -31,6 +44,14 @@ from metadata.ingestion.source.database.common_db_source import (
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.oracle.models import (
FetchProcedureList,
OracleStoredProcedure,
)
from metadata.ingestion.source.database.oracle.queries import (
ORACLE_GET_STORED_PROCEDURE_QUERIES,
ORACLE_GET_STORED_PROCEDURES,
)
from metadata.ingestion.source.database.oracle.utils import (
_get_col_type,
get_columns,
@ -41,6 +62,12 @@ from metadata.ingestion.source.database.oracle.utils import (
get_table_names,
get_view_definition,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_table_comments,
@ -70,7 +97,7 @@ Inspector.get_mview_definition = get_mview_definition
OracleDialect.get_mview_names = get_mview_names_dialect
class OracleSource(CommonDbSourceService):
class OracleSource(StoredProcedureMixin, CommonDbSourceService):
"""
Implements the necessary methods to extract
Database metadata from Oracle Source
@ -131,3 +158,82 @@ class OracleSource(CommonDbSourceService):
logger.debug(traceback.format_exc())
logger.warning(f"Failed to fetch view definition for {table_name}: {exc}")
return None
def process_result(self, data: FetchProcedureList):
"""Process data as per our stored procedure format"""
result_dict = {}
for row in data:
owner, name, line, text = row
key = (owner, name)
if key not in result_dict:
result_dict[key] = {"lines": [], "text": ""}
result_dict[key]["lines"].append(line)
result_dict[key]["text"] += text
# Return the concatenated text for each procedure name, ordered by line
return result_dict
def get_stored_procedures(self) -> Iterable[OracleStoredProcedure]:
"""List Oracle Stored Procedures"""
if self.source_config.includeStoredProcedures:
results: FetchProcedureList = self.engine.execute(
ORACLE_GET_STORED_PROCEDURES.format(
schema=self.context.database_schema.upper()
)
).all()
results = self.process_result(data=results)
for row in results.items():
stored_procedure = OracleStoredProcedure(
name=row[0][1], definition=row[1]["text"], owner=row[0][0]
)
yield stored_procedure
def yield_stored_procedure(
self, stored_procedure: OracleStoredProcedure
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Prepare the stored procedure payload"""
try:
stored_procedure_request = CreateStoredProcedureRequest(
name=EntityName(__root__=stored_procedure.name),
storedProcedureCode=StoredProcedureCode(
language=Language.SQL,
code=stored_procedure.definition,
),
owner=self.metadata.get_reference_by_name(
name=stored_procedure.owner.lower()
),
databaseSchema=fqn.build(
metadata=self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.database_service,
database_name=self.context.database,
schema_name=self.context.database_schema,
),
)
yield Either(right=stored_procedure_request)
self.register_record_stored_proc_request(stored_procedure_request)
except Exception as exc:
yield Either(
left=StackTraceError(
name=stored_procedure.name,
error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]",
stackTrace=traceback.format_exc(),
)
)
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = ORACLE_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
)
queries_dict = self.procedure_queries_dict(
query=query,
)
return queries_dict

View File

@ -0,0 +1,30 @@
"""
Oracle models
"""
from typing import List, Optional
from pydantic import BaseModel, Field
class OracleStoredProcedure(BaseModel):
"""Oracle Stored Procedure list query results"""
name: str
definition: str
language: Optional[str] = Field(
None, description="Will only be informed for non-SQL routines."
)
owner: str
class FetchProcedure(BaseModel):
"""Oracle Fetch Stored Procedure Raw Model"""
owner: Optional[str]
name: str
line: int
text: str
class FetchProcedureList(BaseModel):
__name__: List[FetchProcedure]

View File

@ -12,6 +12,8 @@
SQL Queries used during ingestion
"""
import textwrap
ORACLE_ALL_TABLE_COMMENTS = """
SELECT
comments table_comment,
@ -63,6 +65,72 @@ col.default_on_null,
) AS identity_options
"""
ORACLE_GET_STORED_PROCEDURES = """
SELECT
OWNER,
NAME,
LINE,
TEXT
FROM
ALL_SOURCE
WHERE
type = 'PROCEDURE' and owner = '{schema}'
"""
ORACLE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
"""
WITH SP_HISTORY AS (SELECT
SQL_ID,
sql_text AS query_text,
TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') AS start_time,
TO_TIMESTAMP(LAST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') + NUMTODSINTERVAL(ELAPSED_TIME / 1000, 'SECOND') AS end_time,
PARSING_SCHEMA_NAME as user_name
FROM gv$sql
WHERE sql_text LIKE 'CALL%%'
AND TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') >= TO_TIMESTAMP('{start_date}', 'YYYY-MM-DD HH24:MI:SS')
),
Q_HISTORY AS (SELECT
sql_id,
sql_text AS query_text,
CASE
WHEN UPPER(SQL_TEXT) LIKE 'INSERT%' THEN 'INSERT'
WHEN UPPER(SQL_TEXT) LIKE 'SELECT%' THEN 'SELECT'
ELSE 'OTHER'
END AS QUERY_TYPE,
TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') AS start_time,
TO_TIMESTAMP(LAST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS')
+ NUMTODSINTERVAL(ELAPSED_TIME / 1000, 'SECOND') AS end_time,
PARSING_SCHEMA_NAME AS user_name,
PARSING_SCHEMA_NAME AS SCHEMA_NAME,
NULL AS DATABASE_NAME
FROM gv$sql
WHERE sql_text NOT LIKE '%CALL%'
AND SQL_FULLTEXT NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
AND SQL_FULLTEXT NOT LIKE '/* {{"app": "dbt", %%}} */%%'
AND TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS')
>= TO_TIMESTAMP('{start_date}', 'YYYY-MM-DD HH24:MI:SS')
)
SELECT
SP.sql_id AS PROCEDURE_ID,
Q.sql_id AS QUERY_ID,
Q.QUERY_TYPE AS QUERY_TYPE,
Q.DATABASE_NAME AS QUERY_DATABASE_NAME,
Q.SCHEMA_NAME AS QUERY_SCHEMA_NAME,
SP.QUERY_TEXT AS PROCEDURE_TEXT,
SP.START_TIME AS PROCEDURE_START_TIME,
SP.END_TIME AS PROCEDURE_END_TIME,
Q.START_TIME AS QUERY_START_TIME,
Q.QUERY_TEXT AS QUERY_TEXT,
Q.USER_NAME AS QUERY_USER_NAME
FROM SP_HISTORY SP
JOIN Q_HISTORY Q
ON Q.start_time between SP.start_time and SP.end_time
AND Q.end_time between SP.start_time and SP.end_time
AND Q.user_name = SP.user_name
ORDER BY PROCEDURE_START_TIME DESC
"""
)
ORACLE_GET_COLUMNS = """
SELECT
col.column_name,

View File

@ -59,7 +59,7 @@ class QueryByProcedure(BaseModel):
procedure_text: str = Field(..., alias="PROCEDURE_TEXT")
procedure_start_time: datetime = Field(..., alias="PROCEDURE_START_TIME")
procedure_end_time: datetime = Field(..., alias="PROCEDURE_END_TIME")
query_start_time: datetime = Field(..., alias="QUERY_START_TIME")
query_start_time: Optional[datetime] = Field(..., alias="QUERY_START_TIME")
query_duration: Optional[float] = Field(None, alias="QUERY_DURATION")
query_text: str = Field(..., alias="QUERY_TEXT")
query_user_name: Optional[str] = Field(None, alias="QUERY_USER_NAME")

View File

@ -0,0 +1,222 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Oracle using the topology
"""
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.oracle.metadata import OracleSource
from metadata.ingestion.source.database.oracle.models import OracleStoredProcedure
mock_oracle_config = {
"source": {
"type": "oracle",
"serviceName": "test2",
"serviceConnection": {
"config": {
"type": "Oracle",
"oracleConnectionType": {"oracleServiceName": "TESTDB"},
"username": "username",
"password": "password",
"hostPort": "localhost:1466",
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
"iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
"2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
"iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
"r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
"d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
MOCK_DATABASE_SERVICE = DatabaseService(
id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
name="oracle_source_test",
connection=DatabaseConnection(),
serviceType=DatabaseServiceType.Oracle,
)
MOCK_DATABASE = Database(
id="a58b1856-729c-493b-bc87-6d2269b43ec0",
name="sample_database",
fullyQualifiedName="oracle_source_test.sample_database",
displayName="sample_database",
description="",
service=EntityReference(
id="85811038-099a-11ed-861d-0242ac120002", type="databaseService"
),
)
MOCK_DATABASE_SCHEMA = DatabaseSchema(
id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
name="sample_schema",
fullyQualifiedName="mssql_source_test.sample_database.sample_schema",
service=EntityReference(id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", type="database"),
database=EntityReference(
id="a58b1856-729c-493b-bc87-6d2269b43ec0",
type="database",
),
)
MOCK_STORED_PROCEDURE = OracleStoredProcedure(
name="sample_procedure",
definition="SAMPLE_SQL_TEXT",
owner="sample_stored_prcedure_owner",
)
EXPECTED_DATABASE = [
CreateDatabaseRequest(
name=EntityName(__root__="sample_database"),
displayName=None,
description=None,
tags=None,
owner=None,
service=FullyQualifiedEntityName(__root__="oracle_source_test"),
dataProducts=None,
default=False,
retentionPeriod=None,
extension=None,
sourceUrl=None,
domain=None,
lifeCycle=None,
sourceHash=None,
)
]
EXPECTED_DATABASE_SCHEMA = [
CreateDatabaseSchemaRequest(
name=EntityName(__root__="sample_schema"),
displayName=None,
description=None,
owner=None,
database=FullyQualifiedEntityName(
__root__="oracle_source_test.sample_database"
),
dataProducts=None,
tags=None,
retentionPeriod=None,
extension=None,
sourceUrl=None,
domain=None,
lifeCycle=None,
sourceHash=None,
)
]
EXPECTED_STORED_PROCEDURE = [
CreateStoredProcedureRequest(
name=EntityName(__root__="sample_procedure"),
displayName=None,
description=None,
owner=None,
tags=None,
storedProcedureCode=StoredProcedureCode(language="SQL", code="SAMPLE_SQL_TEXT"),
databaseSchema=FullyQualifiedEntityName(
__root__="oracle_source_test.sample_database.sample_schema"
),
extension=None,
dataProducts=None,
sourceUrl=None,
domain=None,
lifeCycle=None,
sourceHash=None,
)
]
class OracleUnitTest(TestCase):
"""
Implements the necessary methods to extract
Oracle Unit Test
"""
@patch(
"metadata.ingestion.source.database.common_db_source.CommonDbSourceService.test_connection"
)
def __init__(
self,
methodName,
test_connection,
) -> None:
super().__init__(methodName)
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.parse_obj(mock_oracle_config)
self.metadata = OpenMetadata(
OpenMetadataConnection.parse_obj(
mock_oracle_config["workflowConfig"]["openMetadataServerConfig"]
)
)
self.oracle = OracleSource.create(
mock_oracle_config["source"],
self.metadata,
)
self.oracle.context.__dict__[
"database_service"
] = MOCK_DATABASE_SERVICE.name.__root__
def test_yield_database(self):
assert EXPECTED_DATABASE == [
either.right for either in self.oracle.yield_database(MOCK_DATABASE.name)
]
self.oracle.context.__dict__["database"] = MOCK_DATABASE.name.__root__
def test_yield_schema(self):
assert EXPECTED_DATABASE_SCHEMA == [
either.right
for either in self.oracle.yield_database_schema(MOCK_DATABASE_SCHEMA.name)
]
self.oracle.context.__dict__[
"database_schema"
] = MOCK_DATABASE_SCHEMA.name.__root__
def test_yield_stored_procedure(self):
assert EXPECTED_STORED_PROCEDURE == [
either.right
for either in self.oracle.yield_stored_procedure(MOCK_STORED_PROCEDURE)
]