GEN-1552: Postgres stored procedures support (#18083)

This commit is contained in:
harshsoni2024 2024-10-07 14:28:03 +05:30 committed by harshsoni2024
parent a9a37cfd0b
commit 83bfda5229
3 changed files with 195 additions and 3 deletions

View File

@ -13,14 +13,19 @@ Postgres source module
"""
import traceback
from collections import namedtuple
from typing import Iterable, Optional, Tuple
from typing import Dict, Iterable, List, Optional, Tuple
from sqlalchemy import String as SqlAlchemyString
from sqlalchemy import sql
from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
from sqlalchemy.engine import Inspector
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.data.table import (
PartitionColumnDetails,
PartitionIntervalTypes,
@ -36,7 +41,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
@ -46,10 +51,13 @@ from metadata.ingestion.source.database.common_db_source import (
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.mssql.models import STORED_PROC_LANGUAGE_MAP
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.postgres.models import PostgresStoredProcedure
from metadata.ingestion.source.database.postgres.queries import (
POSTGRES_GET_ALL_TABLE_PG_POLICY,
POSTGRES_GET_DB_NAMES,
POSTGRES_GET_STORED_PROCEDURES,
POSTGRES_GET_TABLE_NAMES,
POSTGRES_PARTITION_DETAILS,
POSTGRES_SCHEMA_COMMENTS,
@ -63,6 +71,10 @@ from metadata.ingestion.source.database.postgres.utils import (
get_table_owner,
get_view_definition,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.importer import import_side_effects
@ -142,7 +154,7 @@ Inspector.get_table_owner = get_etable_owner
PGDialect.get_foreign_keys = get_foreign_keys
class PostgresSource(CommonDbSourceService, MultiDBSource):
class PostgresSource(CommonDbSourceService, MultiDBSource, StoredProcedureMixin):
"""
Implements the necessary methods to extract
Database metadata from Postgres Source
@ -298,3 +310,62 @@ class PostgresSource(CommonDbSourceService, MultiDBSource):
stackTrace=traceback.format_exc(),
)
)
def get_stored_procedures(self) -> Iterable[PostgresStoredProcedure]:
"""List stored procedures"""
if self.source_config.includeStoredProcedures:
results = self.engine.execute(POSTGRES_GET_STORED_PROCEDURES).all()
for row in results:
try:
stored_procedure = PostgresStoredProcedure.model_validate(
dict(row._mapping)
)
yield stored_procedure
except Exception as exc:
logger.error()
self.status.failed(
error=StackTraceError(
name=dict(row).get("name", "UNKNOWN"),
error=f"Error parsing Stored Procedure payload: {exc}",
stackTrace=traceback.format_exc(),
)
)
def yield_stored_procedure(
self, stored_procedure
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Prepare the stored procedure payload"""
try:
stored_procedure_request = CreateStoredProcedureRequest(
name=EntityName(stored_procedure.name),
description=None,
storedProcedureCode=StoredProcedureCode(
language=STORED_PROC_LANGUAGE_MAP.get(stored_procedure.language),
code=stored_procedure.definition,
),
databaseSchema=fqn.build(
metadata=self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
),
)
yield Either(right=stored_procedure_request)
self.register_record_stored_proc_request(stored_procedure_request)
except Exception as exc:
yield Either(
left=StackTraceError(
name=stored_procedure.name,
error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]",
stackTrace=traceback.format_exc(),
)
)
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
return {}

View File

@ -0,0 +1,25 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Postgres models
"""
from typing import Optional
from pydantic import BaseModel, Field
class PostgresStoredProcedure(BaseModel):
"""Postgres stored procedure list query results"""
name: str = Field(alias="procedure_name")
schema: str = Field(alias="schema_name")
definition: str
language: Optional[str] = None

View File

@ -211,3 +211,99 @@ POSTGRES_FETCH_FK = """
n.oid = c.relnamespace
ORDER BY 1
"""
POSTGRES_GET_JSON_FIELDS = """
WITH RECURSIVE json_hierarchy AS (
SELECT
key AS path,
json_typeof(value) AS type,
value,
json_build_object() AS properties,
key AS title
FROM
{table_name} tbd,
LATERAL json_each({column_name}::json)
),
build_hierarchy AS (
SELECT
path,
type,
title,
CASE
WHEN type = 'object' THEN
json_build_object(
'title', title,
'type', 'object',
'properties', (
SELECT json_object_agg(
key,
json_build_object(
'title', key,
'type', json_typeof(value),
'properties', (
CASE
WHEN json_typeof(value) = 'object' THEN
(
SELECT json_object_agg(
key,
json_build_object(
'title', key,
'type', json_typeof(value),
'properties',
json_build_object()
)
)
FROM json_each(value::json) AS sub_key_value
)
ELSE json_build_object()
END
)
)
)
FROM json_each(value::json) AS key_value
)
)
WHEN type = 'array' THEN
json_build_object(
'title', title,
'type', 'array',
'properties', json_build_object()
)
ELSE
json_build_object(
'title', title,
'type', type
)
END AS hierarchy
FROM
json_hierarchy
),
aggregate_hierarchy AS (
select
json_build_object(
'title','{column_name}',
'type','object',
'properties',
json_object_agg(
path,
hierarchy
)) AS result
FROM
build_hierarchy
)
SELECT
result
FROM
aggregate_hierarchy;
"""
POSTGRES_GET_STORED_PROCEDURES = """
SELECT proname AS procedure_name,
nspname AS schema_name,
proargtypes AS argument_types,
prorettype::regtype AS return_type,
prosrc AS definition
FROM pg_proc
JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid
WHERE prokind = 'p';
"""