Fixes #19688 - Added Postgres Functions Ingestion (#20188)

This commit is contained in:
Keshav Mohta 2025-03-15 11:54:38 +05:30 committed by GitHub
parent 4ec19e56a9
commit b21c60304c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 54 additions and 19 deletions

View File

@ -60,6 +60,7 @@ from metadata.ingestion.source.database.postgres.models import PostgresStoredPro
from metadata.ingestion.source.database.postgres.queries import ( from metadata.ingestion.source.database.postgres.queries import (
POSTGRES_GET_ALL_TABLE_PG_POLICY, POSTGRES_GET_ALL_TABLE_PG_POLICY,
POSTGRES_GET_DB_NAMES, POSTGRES_GET_DB_NAMES,
POSTGRES_GET_FUNCTIONS,
POSTGRES_GET_STORED_PROCEDURES, POSTGRES_GET_STORED_PROCEDURES,
POSTGRES_GET_TABLE_NAMES, POSTGRES_GET_TABLE_NAMES,
POSTGRES_PARTITION_DETAILS, POSTGRES_PARTITION_DETAILS,
@ -273,25 +274,33 @@ class PostgresSource(CommonDbSourceService, MultiDBSource):
) )
) )
def _get_stored_procedures_internal(
    self, query: str
) -> Iterable[PostgresStoredProcedure]:
    """Run ``query`` and yield one validated model per returned row.

    A row that cannot be parsed is logged and reported through
    ``self.status.failed`` instead of interrupting the remaining rows.

    Args:
        query: SQL text handed to ``self.engine.execute``.

    Yields:
        A ``PostgresStoredProcedure`` for every row that validates.
    """
    for row in self.engine.execute(query).all():
        # Populated inside the ``try`` so the handler can still name the row
        # when it is the validation step that fails.
        row_data = {}
        try:
            row_data = dict(row._mapping)
            stored_procedure = PostgresStoredProcedure.model_validate(row_data)
        except Exception as exc:
            # The routine name column is aliased as ``procedure_name`` in the
            # SQL; ``name`` is kept as a fallback for backward compatibility.
            failed_name = (
                row_data.get("procedure_name") or row_data.get("name") or "UNKNOWN"
            )
            message = f"Error parsing Stored Procedure payload: {exc}"
            stack_trace = traceback.format_exc()
            logger.debug(stack_trace)
            # ``Logger.error`` requires a message; calling it without one
            # raises TypeError from inside this handler.
            logger.error(message)
            self.status.failed(
                error=StackTraceError(
                    name=str(failed_name),
                    error=message,
                    stackTrace=stack_trace,
                )
            )
        else:
            # Yield outside the guarded region so only parsing errors are
            # attributed to the row.
            yield stored_procedure
def get_stored_procedures(self) -> Iterable[PostgresStoredProcedure]: def get_stored_procedures(self) -> Iterable[PostgresStoredProcedure]:
"""List stored procedures""" """List stored procedures"""
if self.source_config.includeStoredProcedures: if self.source_config.includeStoredProcedures:
results = self.engine.execute(POSTGRES_GET_STORED_PROCEDURES).all() yield from self._get_stored_procedures_internal(
for row in results: POSTGRES_GET_STORED_PROCEDURES
try: )
stored_procedure = PostgresStoredProcedure.model_validate( yield from self._get_stored_procedures_internal(POSTGRES_GET_FUNCTIONS)
dict(row._mapping)
)
yield stored_procedure
except Exception as exc:
logger.error()
self.status.failed(
error=StackTraceError(
name=dict(row).get("name", "UNKNOWN"),
error=f"Error parsing Stored Procedure payload: {exc}",
stackTrace=traceback.format_exc(),
)
)
def yield_stored_procedure( def yield_stored_procedure(
self, stored_procedure self, stored_procedure
@ -312,6 +321,7 @@ class PostgresSource(CommonDbSourceService, MultiDBSource):
database_name=self.context.get().database, database_name=self.context.get().database,
schema_name=self.context.get().database_schema, schema_name=self.context.get().database_schema,
), ),
storedProcedureType=stored_procedure.procedure_type,
) )
yield Either(right=stored_procedure_request) yield Either(right=stored_procedure_request)
self.register_record_stored_proc_request(stored_procedure_request) self.register_record_stored_proc_request(stored_procedure_request)

View File

@ -23,3 +23,4 @@ class PostgresStoredProcedure(BaseModel):
schema: str = Field(alias="schema_name") schema: str = Field(alias="schema_name")
definition: str definition: str
language: Optional[str] = None language: Optional[str] = None
procedure_type: Optional[str] = Field(None, alias="procedure_type")

View File

@ -217,8 +217,25 @@ POSTGRES_GET_STORED_PROCEDURES = """
nspname AS schema_name, nspname AS schema_name,
proargtypes AS argument_types, proargtypes AS argument_types,
prorettype::regtype AS return_type, prorettype::regtype AS return_type,
prosrc AS definition prosrc AS definition,
'StoredProcedure' as procedure_type
FROM pg_proc FROM pg_proc
JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid
WHERE prokind = 'p'; WHERE prokind = 'p';
""" """
# Lists routines from pg_proc whose prokind is 'f', skipping the pg_catalog
# and information_schema namespaces. Every row carries the constant
# procedure_type 'Function' alongside procedure_name, schema_name,
# argument_types, return_type and definition (the prosrc column).
POSTGRES_GET_FUNCTIONS = """
SELECT
proname AS procedure_name,
nspname AS schema_name,
proargtypes AS argument_types,
prorettype :: regtype AS return_type,
prosrc AS definition,
'Function' as procedure_type
FROM
pg_proc
JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid
WHERE
prokind = 'f'
and pg_namespace.nspname NOT IN ('pg_catalog', 'information_schema');
"""

View File

@ -6,7 +6,9 @@
"description": "A `StoredProcedure` entity that contains the set of code statements with an assigned name and is defined in a `Database Schema`.\"", "description": "A `StoredProcedure` entity that contains the set of code statements with an assigned name and is defined in a `Database Schema`.\"",
"type": "object", "type": "object",
"javaType": "org.openmetadata.schema.entity.data.StoredProcedure", "javaType": "org.openmetadata.schema.entity.data.StoredProcedure",
"javaInterfaces": ["org.openmetadata.schema.EntityInterface"], "javaInterfaces": [
"org.openmetadata.schema.EntityInterface"
],
"definitions": { "definitions": {
"storedProcedureType": { "storedProcedureType": {
"javaType": "org.openmetadata.schema.type.StoredProcedureType", "javaType": "org.openmetadata.schema.type.StoredProcedureType",
@ -16,7 +18,8 @@
"enum": [ "enum": [
"StoredProcedure", "StoredProcedure",
"UDF", "UDF",
"StoredPackage" "StoredPackage",
"Function"
], ],
"javaEnums": [ "javaEnums": [
{ {
@ -27,6 +30,9 @@
}, },
{ {
"name": "StoredPackage" "name": "StoredPackage"
},
{
"name": "Function"
} }
] ]
}, },

View File

@ -501,6 +501,7 @@ export enum Language {
* This schema defines the type of the type of Procedures * This schema defines the type of the type of Procedures
*/ */
export enum StoredProcedureType { export enum StoredProcedureType {
Function = "Function",
StoredPackage = "StoredPackage", StoredPackage = "StoredPackage",
StoredProcedure = "StoredProcedure", StoredProcedure = "StoredProcedure",
Udf = "UDF", Udf = "UDF",