fixes: #15742 Oracle stored package feature (#18852)

This commit is contained in:
Akash Verma 2024-12-16 19:35:20 +05:30 committed by GitHub
parent 9ff575fd6f
commit 69557e8716
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 144 additions and 36 deletions

View File

@ -41,7 +41,11 @@ from metadata.ingestion.connections.builders import (
) )
from metadata.ingestion.connections.test_connections import test_connection_db_common from metadata.ingestion.connections.test_connections import test_connection_db_common
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.oracle.queries import CHECK_ACCESS_TO_ALL from metadata.ingestion.source.database.oracle.queries import (
CHECK_ACCESS_TO_ALL,
ORACLE_GET_SCHEMA,
ORACLE_GET_STORED_PACKAGES,
)
from metadata.utils.constants import THREE_MIN from metadata.utils.constants import THREE_MIN
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
@ -131,6 +135,12 @@ def get_connection(connection: OracleConnection) -> Engine:
) )
class OraclePackageAccessError(Exception):
"""
Raised when unable to access Oracle stored packages
"""
def test_connection( def test_connection(
metadata: OpenMetadata, metadata: OpenMetadata,
engine: Engine, engine: Engine,
@ -143,7 +153,19 @@ def test_connection(
of a metadata workflow or during an Automation Workflow of a metadata workflow or during an Automation Workflow
""" """
test_conn_queries = {"CheckAccess": CHECK_ACCESS_TO_ALL} def test_oracle_package_access(engine):
try:
schema_name = engine.execute(ORACLE_GET_SCHEMA).scalar()
return ORACLE_GET_STORED_PACKAGES.format(schema=schema_name)
except Exception as e:
raise OraclePackageAccessError(
f"Failed to access Oracle stored packages: {e}"
)
test_conn_queries = {
"CheckAccess": CHECK_ACCESS_TO_ALL,
"PackageAccess": test_oracle_package_access(engine),
}
return test_connection_db_common( return test_connection_db_common(
metadata=metadata, metadata=metadata,

View File

@ -24,6 +24,7 @@ from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import ( from metadata.generated.schema.entity.data.storedProcedure import (
Language, Language,
StoredProcedureCode, StoredProcedureCode,
StoredProcedureType,
) )
from metadata.generated.schema.entity.data.table import TableType from metadata.generated.schema.entity.data.table import TableType
from metadata.generated.schema.entity.services.connections.database.oracleConnection import ( from metadata.generated.schema.entity.services.connections.database.oracleConnection import (
@ -45,10 +46,11 @@ from metadata.ingestion.source.database.common_db_source import (
TableNameAndType, TableNameAndType,
) )
from metadata.ingestion.source.database.oracle.models import ( from metadata.ingestion.source.database.oracle.models import (
FetchProcedureList, FetchObjectList,
OracleStoredProcedure, OracleStoredObject,
) )
from metadata.ingestion.source.database.oracle.queries import ( from metadata.ingestion.source.database.oracle.queries import (
ORACLE_GET_STORED_PACKAGES,
ORACLE_GET_STORED_PROCEDURES, ORACLE_GET_STORED_PROCEDURES,
) )
from metadata.ingestion.source.database.oracle.utils import ( from metadata.ingestion.source.database.oracle.utils import (
@ -181,41 +183,51 @@ class OracleSource(CommonDbSourceService):
logger.warning(f"Failed to fetch Schema definition for {table_name}: {exc}") logger.warning(f"Failed to fetch Schema definition for {table_name}: {exc}")
return None return None
def process_result(self, data: FetchProcedureList): def process_result(self, data: FetchObjectList):
"""Process data as per our stored procedure format""" """Process data as per our stored procedure format"""
result_dict = {} result_dict = {}
for row in data: for row in data:
owner, name, line, text = row
owner, name, line, text, procedure_type = row
key = (owner, name) key = (owner, name)
if key not in result_dict: if key not in result_dict:
result_dict[key] = {"lines": [], "text": ""} result_dict[key] = {"lines": [], "text": "", "procedure_type": ""}
result_dict[key]["lines"].append(line) result_dict[key]["lines"].append(line)
result_dict[key]["text"] += text result_dict[key]["text"] += text
result_dict[key]["procedure_type"] = procedure_type
# Return the concatenated text for each procedure name, ordered by line # Return the concatenated text for each procedure name, ordered by line
return result_dict return result_dict
def get_stored_procedures(self) -> Iterable[OracleStoredProcedure]: def _get_stored_procedures_internal(
self, query: str
) -> Iterable[OracleStoredObject]:
results: FetchObjectList = self.engine.execute(
query.format(schema=self.context.get().database_schema.upper())
).all()
results = self.process_result(data=results)
for row in results.items():
stored_procedure = OracleStoredObject(
name=row[0][1],
definition=row[1]["text"],
owner=row[0][0],
procedure_type=row[1]["procedure_type"],
)
yield stored_procedure
def get_stored_procedures(self) -> Iterable[OracleStoredObject]:
"""List Oracle Stored Procedures""" """List Oracle Stored Procedures"""
if self.source_config.includeStoredProcedures: if self.source_config.includeStoredProcedures:
results: FetchProcedureList = self.engine.execute( yield from self._get_stored_procedures_internal(
ORACLE_GET_STORED_PROCEDURES.format( ORACLE_GET_STORED_PROCEDURES
schema=self.context.get().database_schema.upper() )
) yield from self._get_stored_procedures_internal(ORACLE_GET_STORED_PACKAGES)
).all()
results = self.process_result(data=results)
for row in results.items():
stored_procedure = OracleStoredProcedure(
name=row[0][1], definition=row[1]["text"], owner=row[0][0]
)
yield stored_procedure
def yield_stored_procedure( def yield_stored_procedure(
self, stored_procedure: OracleStoredProcedure self, stored_procedure: OracleStoredObject
) -> Iterable[Either[CreateStoredProcedureRequest]]: ) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Prepare the stored procedure payload""" """Prepare the stored procedure payload"""
try: try:
stored_procedure_request = CreateStoredProcedureRequest( stored_procedure_request = CreateStoredProcedureRequest(
name=EntityName(stored_procedure.name), name=EntityName(stored_procedure.name),
@ -223,6 +235,11 @@ class OracleSource(CommonDbSourceService):
language=Language.SQL, language=Language.SQL,
code=stored_procedure.definition, code=stored_procedure.definition,
), ),
storedProcedureType=(
StoredProcedureType.StoredPackage
if stored_procedure.procedure_type == "StoredPackage"
else StoredProcedureType.StoredProcedure
),
owners=self.metadata.get_reference_by_name( owners=self.metadata.get_reference_by_name(
name=stored_procedure.owner.lower(), is_owner=True name=stored_procedure.owner.lower(), is_owner=True
), ),

View File

@ -6,7 +6,7 @@ from typing import List, Optional
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
class OracleStoredProcedure(BaseModel): class OracleStoredObject(BaseModel):
"""Oracle Stored Procedure list query results""" """Oracle Stored Procedure list query results"""
name: str name: str
@ -15,9 +15,10 @@ class OracleStoredProcedure(BaseModel):
None, description="Will only be informed for non-SQL routines." None, description="Will only be informed for non-SQL routines."
) )
owner: str owner: str
procedure_type: Optional[str] = Field(None, alias="procedure_type")
class FetchProcedure(BaseModel): class FetchObject(BaseModel):
"""Oracle Fetch Stored Procedure Raw Model""" """Oracle Fetch Stored Procedure Raw Model"""
owner: Optional[str] = None owner: Optional[str] = None
@ -26,5 +27,5 @@ class FetchProcedure(BaseModel):
text: str text: str
class FetchProcedureList(BaseModel): class FetchObjectList(BaseModel):
__name__: List[FetchProcedure] __name__: List[FetchObject]

View File

@ -87,13 +87,34 @@ SELECT
OWNER, OWNER,
NAME, NAME,
LINE, LINE,
TEXT TEXT,
'StoredProcedure' as procedure_type
FROM FROM
DBA_SOURCE DBA_SOURCE
WHERE WHERE
type = 'PROCEDURE' and owner = '{schema}' type = 'PROCEDURE' and owner = '{schema}'
""" """
) )
ORACLE_GET_SCHEMA = """
SELECT USERNAME AS SCHEMA_NAME
FROM ALL_USERS
WHERE ROWNUM = 1
ORDER BY USERNAME
"""
ORACLE_GET_STORED_PACKAGES = textwrap.dedent(
"""
SELECT
OWNER,
NAME,
LINE,
TEXT,
'StoredPackage' as procedure_type
FROM
DBA_SOURCE
WHERE TYPE IN ('PACKAGE', 'PACKAGE BODY') AND owner = '{schema}'
"""
)
CHECK_ACCESS_TO_ALL = "SELECT table_name FROM DBA_TABLES where ROWNUM < 2" CHECK_ACCESS_TO_ALL = "SELECT table_name FROM DBA_TABLES where ROWNUM < 2"
ORACLE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent( ORACLE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
""" """

View File

@ -24,7 +24,10 @@ from metadata.generated.schema.api.data.createStoredProcedure import (
) )
from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode from metadata.generated.schema.entity.data.storedProcedure import (
StoredProcedureCode,
StoredProcedureType,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection, OpenMetadataConnection,
) )
@ -40,7 +43,7 @@ from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntit
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.oracle.metadata import OracleSource from metadata.ingestion.source.database.oracle.metadata import OracleSource
from metadata.ingestion.source.database.oracle.models import OracleStoredProcedure from metadata.ingestion.source.database.oracle.models import OracleStoredObject
mock_oracle_config = { mock_oracle_config = {
"source": { "source": {
@ -103,10 +106,18 @@ MOCK_DATABASE_SCHEMA = DatabaseSchema(
), ),
) )
MOCK_STORED_PROCEDURE = OracleStoredProcedure( MOCK_STORED_PROCEDURE = OracleStoredObject(
name="sample_procedure", name="sample_procedure",
definition="SAMPLE_SQL_TEXT", definition="SAMPLE_SQL_TEXT",
owner="sample_stored_prcedure_owner", owner="sample_stored_prcedure_owner",
procedure_type="StoredProcedure",
)
MOCK_STORED_PACKAGE = OracleStoredObject(
name="sample_package",
definition="SAMPLE_SQL_TEXT",
owner="sample_stored_package_owner",
procedure_type="StoredPackage",
) )
EXPECTED_DATABASE = [ EXPECTED_DATABASE = [
@ -154,6 +165,28 @@ EXPECTED_STORED_PROCEDURE = [
owners=None, owners=None,
tags=None, tags=None,
storedProcedureCode=StoredProcedureCode(language="SQL", code="SAMPLE_SQL_TEXT"), storedProcedureCode=StoredProcedureCode(language="SQL", code="SAMPLE_SQL_TEXT"),
storedProcedureType=StoredProcedureType.StoredProcedure,
databaseSchema=FullyQualifiedEntityName(
"oracle_source_test.sample_database.sample_schema"
),
extension=None,
dataProducts=None,
sourceUrl=None,
domain=None,
lifeCycle=None,
sourceHash=None,
)
]
EXPECTED_STORED_PACKAGE = [
CreateStoredProcedureRequest(
name=EntityName("sample_package"),
displayName=None,
description=None,
owners=None,
tags=None,
storedProcedureCode=StoredProcedureCode(language="SQL", code="SAMPLE_SQL_TEXT"),
storedProcedureType=StoredProcedureType.StoredPackage,
databaseSchema=FullyQualifiedEntityName( databaseSchema=FullyQualifiedEntityName(
"oracle_source_test.sample_database.sample_schema" "oracle_source_test.sample_database.sample_schema"
), ),
@ -221,3 +254,9 @@ class OracleUnitTest(TestCase):
either.right either.right
for either in self.oracle.yield_stored_procedure(MOCK_STORED_PROCEDURE) for either in self.oracle.yield_stored_procedure(MOCK_STORED_PROCEDURE)
] ]
def test_yield_stored_package(self):
assert EXPECTED_STORED_PACKAGE == [
either.right
for either in self.oracle.yield_stored_procedure(MOCK_STORED_PACKAGE)
]

View File

@ -4,12 +4,18 @@
"description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.", "description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.",
"steps": [ "steps": [
{ {
"name": "CheckAccess", "name": "CheckAccess",
"description": "Validate that we can properly reach the database and authenticate with the given credentials.", "description": "Validate that we can properly reach the database and authenticate with the given credentials.",
"errorMessage": "Failed to connect to oracle, please validate if the user has relevant permissions, if not, please provide the necessary permissions. For more details, please refer https://docs.open-metadata.org/connectors/database/oracle.", "errorMessage": "Failed to connect to oracle, please validate if the user has relevant permissions, if not, please provide the necessary permissions. For more details, please refer https://docs.open-metadata.org/connectors/database/oracle.",
"shortCircuit": true, "shortCircuit": true,
"mandatory": true "mandatory": true
}, },
{
"name": "PackageAccess",
"description": "Validate that we can access Oracle stored packages.",
"errorMessage": "Failed to access Oracle stored packages. Please verify the user has the necessary permissions to access Oracle packages.",
"mandatory": false
},
{ {
"name": "GetSchemas", "name": "GetSchemas",
"description": "List all the schemas available to the user.", "description": "List all the schemas available to the user.",
@ -17,18 +23,16 @@
"mandatory": true "mandatory": true
}, },
{ {
"name": "GetTables", "name": "GetTables",
"description": "From a given schema, list the tables belonging to that schema. If no schema is specified, we'll list the tables of a random schema.", "description": "From a given schema, list the tables belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
"errorMessage": "Failed to fetch tables, please validate if the user has enough privilege to fetch tables.", "errorMessage": "Failed to fetch tables, please validate if the user has enough privilege to fetch tables.",
"mandatory": true "mandatory": true
}, },
{ {
"name": "GetViews", "name": "GetViews",
"description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the tables of a random schema.", "description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
"errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.", "errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.",
"mandatory": false "mandatory": false
} }
] ]
} }

View File

@ -15,7 +15,8 @@
"default": "StoredProcedure", "default": "StoredProcedure",
"enum": [ "enum": [
"StoredProcedure", "StoredProcedure",
"UDF" "UDF",
"StoredPackage"
], ],
"javaEnums": [ "javaEnums": [
{ {
@ -23,6 +24,9 @@
}, },
{ {
"name": "UDF" "name": "UDF"
},
{
"name": "StoredPackage"
} }
] ]
}, },