diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py b/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py index e9e56b3afe8..19318974ebc 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py @@ -12,18 +12,31 @@ # pylint: disable=protected-access """Oracle source module""" import traceback -from typing import Iterable, Optional +from typing import Dict, Iterable, List, Optional from sqlalchemy.dialects.oracle.base import INTERVAL, OracleDialect, ischema_names from sqlalchemy.engine import Inspector +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema +from metadata.generated.schema.entity.data.storedProcedure import ( + Language, + StoredProcedureCode, +) from metadata.generated.schema.entity.data.table import TableType from metadata.generated.schema.entity.services.connections.database.oracleConnection import ( OracleConnection, ) +from metadata.generated.schema.entity.services.ingestionPipelines.status import ( + StackTraceError, +) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.generated.schema.type.basic import EntityName +from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type @@ -31,6 +44,14 @@ from metadata.ingestion.source.database.common_db_source import ( CommonDbSourceService, TableNameAndType, ) +from metadata.ingestion.source.database.oracle.models import ( + FetchProcedureList, + OracleStoredProcedure, +) +from metadata.ingestion.source.database.oracle.queries import ( + ORACLE_GET_STORED_PROCEDURE_QUERIES, + 
ORACLE_GET_STORED_PROCEDURES,
+)
 from metadata.ingestion.source.database.oracle.utils import (
     _get_col_type,
     get_columns,
@@ -41,6 +62,12 @@ from metadata.ingestion.source.database.oracle.utils import (
     get_table_names,
     get_view_definition,
 )
+from metadata.ingestion.source.database.stored_procedures_mixin import (
+    QueryByProcedure,
+    StoredProcedureMixin,
+)
+from metadata.utils import fqn
+from metadata.utils.helpers import get_start_and_end
 from metadata.utils.logger import ingestion_logger
 from metadata.utils.sqlalchemy_utils import (
     get_all_table_comments,
@@ -70,7 +97,7 @@ Inspector.get_mview_definition = get_mview_definition
 OracleDialect.get_mview_names = get_mview_names_dialect
 
 
-class OracleSource(CommonDbSourceService):
+class OracleSource(StoredProcedureMixin, CommonDbSourceService):
     """
     Implements the necessary methods to extract
     Database metadata from Oracle Source
@@ -131,3 +158,102 @@ class OracleSource(CommonDbSourceService):
             logger.debug(traceback.format_exc())
             logger.warning(f"Failed to fetch view definition for {table_name}: {exc}")
             return None
+
+    def process_result(self, data: FetchProcedureList):
+        """Rebuild each stored procedure body from raw ``ALL_SOURCE`` rows.
+
+        Args:
+            data: iterable of ``(owner, name, line, text)`` rows, one row per
+                source line of a procedure.
+
+        Returns:
+            Dict keyed by ``(owner, name)``. Each value holds ``"lines"``, the
+            line numbers in ascending order, and ``"text"``, the source lines
+            concatenated in that same order.
+        """
+        grouped = {}
+        for owner, name, line, text in data:
+            grouped.setdefault((owner, name), []).append((line, text))
+
+        result_dict = {}
+        for key, numbered_lines in grouped.items():
+            # Rows are not guaranteed to arrive in LINE order, so sort before
+            # stitching the body together; otherwise the code gets scrambled.
+            numbered_lines.sort(key=lambda pair: pair[0])
+            result_dict[key] = {
+                "lines": [line for line, _ in numbered_lines],
+                "text": "".join(text for _, text in numbered_lines),
+            }
+
+        return result_dict
+
+    def get_stored_procedures(self) -> Iterable[OracleStoredProcedure]:
+        """List the stored procedures of the schema currently being processed.
+
+        Nothing is yielded unless ``includeStoredProcedures`` is enabled in
+        the source config.
+        """
+        if self.source_config.includeStoredProcedures:
+            results: FetchProcedureList = self.engine.execute(
+                ORACLE_GET_STORED_PROCEDURES.format(
+                    schema=self.context.database_schema.upper()
+                )
+            ).all()
+            results = self.process_result(data=results)
+            for (owner, name), procedure in results.items():
+                yield OracleStoredProcedure(
+                    name=name, definition=procedure["text"], owner=owner
+                )
+
+    def yield_stored_procedure(
+        self, stored_procedure: OracleStoredProcedure
+    ) -> Iterable[Either[CreateStoredProcedureRequest]]:
+        """Prepare the stored procedure payload.
+
+        Yields an ``Either`` holding the create request on success, or a
+        ``StackTraceError`` describing the failure otherwise.
+        """
+
+        try:
+            stored_procedure_request = CreateStoredProcedureRequest(
+                name=EntityName(__root__=stored_procedure.name),
+                storedProcedureCode=StoredProcedureCode(
+                    language=Language.SQL,
+                    code=stored_procedure.definition,
+                ),
+                owner=self.metadata.get_reference_by_name(
+                    name=stored_procedure.owner.lower()
+                ),
+                databaseSchema=fqn.build(
+                    metadata=self.metadata,
+                    entity_type=DatabaseSchema,
+                    service_name=self.context.database_service,
+                    database_name=self.context.database,
+                    schema_name=self.context.database_schema,
+                ),
+            )
+            yield Either(right=stored_procedure_request)
+            self.register_record_stored_proc_request(stored_procedure_request)
+        except Exception as exc:
+            yield Either(
+                left=StackTraceError(
+                    name=stored_procedure.name,
+                    error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]",
+                    stackTrace=traceback.format_exc(),
+                )
+            )
+
+    def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
+        """
+        Return the dictionary associating stored procedures to the
+        queries they triggered
+        """
+        start, _ = get_start_and_end(self.source_config.queryLogDuration)
+        query = ORACLE_GET_STORED_PROCEDURE_QUERIES.format(
+            start_date=start,
+        )
+        queries_dict = self.procedure_queries_dict(
+            query=query,
+        )
+
+        return queries_dict
diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/models.py b/ingestion/src/metadata/ingestion/source/database/oracle/models.py
new file mode 100644
index 00000000000..8f3c4392704
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/database/oracle/models.py
@@ -0,0 +1,30 @@
+"""
+Oracle models
+"""
+from typing import List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class OracleStoredProcedure(BaseModel):
+    """Oracle Stored Procedure list query results"""
+
+    name: str
+    definition: str
+    language: Optional[str] = Field(
+        None, description="Will only be informed for non-SQL routines."
+    )
+    owner: str
+
+
+class FetchProcedure(BaseModel):
+    """Oracle Fetch Stored Procedure Raw Model"""
+
+    owner: Optional[str]
+    name: str
+    line: int
+    text: str
+
+
+class FetchProcedureList(BaseModel):
+    __root__: List[FetchProcedure]
diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/queries.py b/ingestion/src/metadata/ingestion/source/database/oracle/queries.py
index 06c4884cec9..5c6c4bcb88d 100644
--- a/ingestion/src/metadata/ingestion/source/database/oracle/queries.py
+++ b/ingestion/src/metadata/ingestion/source/database/oracle/queries.py
@@ -12,6 +12,8 @@
 SQL Queries used during ingestion
 """
 
+import textwrap
+
 ORACLE_ALL_TABLE_COMMENTS = """
 SELECT
     comments table_comment,
@@ -63,6 +65,76 @@ col.default_on_null,
 ) AS identity_options
 """
 
+ORACLE_GET_STORED_PROCEDURES = """
+SELECT
+    OWNER,
+    NAME,
+    LINE,
+    TEXT
+FROM
+    ALL_SOURCE
+WHERE
+    type = 'PROCEDURE' and owner = '{schema}'
+ORDER BY
+    OWNER, NAME, LINE
+"""
+
+# gv$sql reports ELAPSED_TIME in microseconds, hence the division by
+# 1000000 before turning it into a NUMTODSINTERVAL of seconds.
+ORACLE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
+    """
+WITH SP_HISTORY AS (SELECT
+    SQL_ID,
+    sql_text AS query_text,
+    TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') AS start_time,
+    TO_TIMESTAMP(LAST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') + NUMTODSINTERVAL(ELAPSED_TIME / 1000000, 'SECOND') AS end_time,
+    PARSING_SCHEMA_NAME as user_name
+  FROM gv$sql
+  WHERE sql_text LIKE 'CALL%%'
+    AND TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') >= TO_TIMESTAMP('{start_date}', 'YYYY-MM-DD HH24:MI:SS')
+ ),
+ Q_HISTORY AS (SELECT
+    sql_id,
+    sql_text AS query_text,
+    CASE
+        WHEN UPPER(SQL_TEXT) LIKE 'INSERT%' THEN 'INSERT'
+        WHEN UPPER(SQL_TEXT) LIKE 'SELECT%' THEN 'SELECT'
+        ELSE 'OTHER'
+    END AS QUERY_TYPE,
+    TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') AS start_time,
+    TO_TIMESTAMP(LAST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') +
+    NUMTODSINTERVAL(ELAPSED_TIME / 1000000, 'SECOND') AS end_time,
+    PARSING_SCHEMA_NAME AS user_name,
+    PARSING_SCHEMA_NAME AS SCHEMA_NAME,
+    NULL AS DATABASE_NAME
+  FROM gv$sql
+  WHERE sql_text NOT LIKE '%CALL%'
+    AND SQL_FULLTEXT NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
+    AND SQL_FULLTEXT NOT LIKE '/* {{"app": "dbt", %%}} */%%'
+    AND TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS')
+    >= TO_TIMESTAMP('{start_date}', 'YYYY-MM-DD HH24:MI:SS')
+)
+SELECT
+    SP.sql_id AS PROCEDURE_ID,
+    Q.sql_id AS QUERY_ID,
+    Q.QUERY_TYPE AS QUERY_TYPE,
+    Q.DATABASE_NAME AS QUERY_DATABASE_NAME,
+    Q.SCHEMA_NAME AS QUERY_SCHEMA_NAME,
+    SP.QUERY_TEXT AS PROCEDURE_TEXT,
+    SP.START_TIME AS PROCEDURE_START_TIME,
+    SP.END_TIME AS PROCEDURE_END_TIME,
+    Q.START_TIME AS QUERY_START_TIME,
+    Q.QUERY_TEXT AS QUERY_TEXT,
+    Q.USER_NAME AS QUERY_USER_NAME
+FROM SP_HISTORY SP
+JOIN Q_HISTORY Q
+  ON Q.start_time between SP.start_time and SP.end_time
+  AND Q.end_time between SP.start_time and SP.end_time
+  AND Q.user_name = SP.user_name
+ORDER BY PROCEDURE_START_TIME DESC
+"""
+)
+
 ORACLE_GET_COLUMNS = """
         SELECT
             col.column_name,
diff --git a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py
index 9854fa649c4..cbc93f73755 100644
--- a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py
+++ b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py
@@ -59,7 +59,7 @@ class QueryByProcedure(BaseModel):
     procedure_text: str = Field(..., alias="PROCEDURE_TEXT")
     procedure_start_time: datetime = Field(..., alias="PROCEDURE_START_TIME")
     procedure_end_time: datetime = Field(..., alias="PROCEDURE_END_TIME")
-    query_start_time: datetime = Field(..., alias="QUERY_START_TIME")
+    query_start_time: Optional[datetime] = Field(..., alias="QUERY_START_TIME")
     query_duration: Optional[float] = Field(None, alias="QUERY_DURATION")
     query_text: str = Field(..., alias="QUERY_TEXT")
     query_user_name:
Optional[str] = Field(None, alias="QUERY_USER_NAME") diff --git a/ingestion/tests/unit/topology/database/test_oracle.py b/ingestion/tests/unit/topology/database/test_oracle.py new file mode 100644 index 00000000000..8f2b151761f --- /dev/null +++ b/ingestion/tests/unit/topology/database/test_oracle.py @@ -0,0 +1,222 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test Oracle using the topology +""" + +from unittest import TestCase +from unittest.mock import patch + +from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest +from metadata.generated.schema.api.data.createDatabaseSchema import ( + CreateDatabaseSchemaRequest, +) +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseConnection, + DatabaseService, + DatabaseServiceType, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName +from 
metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.database.oracle.metadata import OracleSource +from metadata.ingestion.source.database.oracle.models import OracleStoredProcedure + +mock_oracle_config = { + "source": { + "type": "oracle", + "serviceName": "test2", + "serviceConnection": { + "config": { + "type": "Oracle", + "oracleConnectionType": {"oracleServiceName": "TESTDB"}, + "username": "username", + "password": "password", + "hostPort": "localhost:1466", + } + }, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc" + "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE" + "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB" + "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN" + "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u" + "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } + }, +} + +MOCK_DATABASE_SERVICE = DatabaseService( + id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", + name="oracle_source_test", + connection=DatabaseConnection(), + serviceType=DatabaseServiceType.Oracle, +) + +MOCK_DATABASE = Database( + id="a58b1856-729c-493b-bc87-6d2269b43ec0", + name="sample_database", + fullyQualifiedName="oracle_source_test.sample_database", + displayName="sample_database", + description="", + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="databaseService" + 
),
+)
+
+MOCK_DATABASE_SCHEMA = DatabaseSchema(
+    id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
+    name="sample_schema",
+    fullyQualifiedName="oracle_source_test.sample_database.sample_schema",
+    service=EntityReference(id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", type="database"),
+    database=EntityReference(
+        id="a58b1856-729c-493b-bc87-6d2269b43ec0",
+        type="database",
+    ),
+)
+
+MOCK_STORED_PROCEDURE = OracleStoredProcedure(
+    name="sample_procedure",
+    definition="SAMPLE_SQL_TEXT",
+    owner="sample_stored_prcedure_owner",
+)
+
+EXPECTED_DATABASE = [
+    CreateDatabaseRequest(
+        name=EntityName(__root__="sample_database"),
+        displayName=None,
+        description=None,
+        tags=None,
+        owner=None,
+        service=FullyQualifiedEntityName(__root__="oracle_source_test"),
+        dataProducts=None,
+        default=False,
+        retentionPeriod=None,
+        extension=None,
+        sourceUrl=None,
+        domain=None,
+        lifeCycle=None,
+        sourceHash=None,
+    )
+]
+
+EXPECTED_DATABASE_SCHEMA = [
+    CreateDatabaseSchemaRequest(
+        name=EntityName(__root__="sample_schema"),
+        displayName=None,
+        description=None,
+        owner=None,
+        database=FullyQualifiedEntityName(
+            __root__="oracle_source_test.sample_database"
+        ),
+        dataProducts=None,
+        tags=None,
+        retentionPeriod=None,
+        extension=None,
+        sourceUrl=None,
+        domain=None,
+        lifeCycle=None,
+        sourceHash=None,
+    )
+]
+
+EXPECTED_STORED_PROCEDURE = [
+    CreateStoredProcedureRequest(
+        name=EntityName(__root__="sample_procedure"),
+        displayName=None,
+        description=None,
+        owner=None,
+        tags=None,
+        storedProcedureCode=StoredProcedureCode(language="SQL", code="SAMPLE_SQL_TEXT"),
+        databaseSchema=FullyQualifiedEntityName(
+            __root__="oracle_source_test.sample_database.sample_schema"
+        ),
+        extension=None,
+        dataProducts=None,
+        sourceUrl=None,
+        domain=None,
+        lifeCycle=None,
+        sourceHash=None,
+    )
+]
+
+
+class OracleUnitTest(TestCase):
+    """
+    Unit tests for the Oracle source: checks the create requests
+    yielded for databases, schemas and stored procedures
+    """
+
+    @patch(
+        "metadata.ingestion.source.database.common_db_source.CommonDbSourceService.test_connection"
+    )
+    def __init__(
+        self,
+        methodName,
+        test_connection,
+    ) -> None:
+        super().__init__(methodName)
+        test_connection.return_value = False
+        self.config = OpenMetadataWorkflowConfig.parse_obj(mock_oracle_config)
+        self.metadata = OpenMetadata(
+            OpenMetadataConnection.parse_obj(
+                mock_oracle_config["workflowConfig"]["openMetadataServerConfig"]
+            )
+        )
+        self.oracle = OracleSource.create(
+            mock_oracle_config["source"],
+            self.metadata,
+        )
+        self.oracle.context.__dict__[
+            "database_service"
+        ] = MOCK_DATABASE_SERVICE.name.__root__
+
+    def test_yield_database(self):
+        assert EXPECTED_DATABASE == [
+            either.right for either in self.oracle.yield_database(MOCK_DATABASE.name)
+        ]
+
+        self.oracle.context.__dict__["database"] = MOCK_DATABASE.name.__root__
+
+    def test_yield_schema(self):
+        assert EXPECTED_DATABASE_SCHEMA == [
+            either.right
+            for either in self.oracle.yield_database_schema(MOCK_DATABASE_SCHEMA.name)
+        ]
+        self.oracle.context.__dict__[
+            "database_schema"
+        ] = MOCK_DATABASE_SCHEMA.name.__root__
+
+    def test_yield_stored_procedure(self):
+        assert EXPECTED_STORED_PROCEDURE == [
+            either.right
+            for either in self.oracle.yield_stored_procedure(MOCK_STORED_PROCEDURE)
+        ]