diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py index 7e646e8e03c..76b89f8593b 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py @@ -13,14 +13,19 @@ Postgres source module """ import traceback from collections import namedtuple -from typing import Iterable, Optional, Tuple +from typing import Dict, Iterable, List, Optional, Tuple from sqlalchemy import String as SqlAlchemyString from sqlalchemy import sql from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names from sqlalchemy.engine import Inspector +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode from metadata.generated.schema.entity.data.table import ( PartitionColumnDetails, PartitionIntervalTypes, @@ -36,7 +41,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.type.basic import FullyQualifiedEntityName +from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification @@ -46,10 +51,13 @@ from metadata.ingestion.source.database.common_db_source import ( CommonDbSourceService, TableNameAndType, ) +from metadata.ingestion.source.database.mssql.models import STORED_PROC_LANGUAGE_MAP from 
def get_stored_procedures(self) -> Iterable[PostgresStoredProcedure]:
    """
    List the stored procedures that belong to the schema being processed.

    Runs POSTGRES_GET_STORED_PROCEDURES and validates every row into a
    PostgresStoredProcedure. Nothing is yielded unless
    `includeStoredProcedures` is enabled in the source config.

    Rows that cannot be validated are logged, reported through
    `self.status.failed` and skipped, so one bad row does not stop
    the listing.
    """
    if not self.source_config.includeStoredProcedures:
        return

    # The query is not restricted to a schema, while yield_stored_procedure
    # always attaches the request to the schema held in the context. Only
    # emit the rows of that schema so procedures are not registered under
    # schemas they do not belong to.
    current_schema = self.context.get().database_schema

    results = self.engine.execute(POSTGRES_GET_STORED_PROCEDURES).all()
    for row in results:
        try:
            stored_procedure = PostgresStoredProcedure.model_validate(
                dict(row._mapping)
            )
        except Exception as exc:
            # The query aliases the name column as `procedure_name`
            procedure_name = str(getattr(row, "procedure_name", "UNKNOWN"))
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Error parsing Stored Procedure payload [{procedure_name}]: {exc}"
            )
            self.status.failed(
                error=StackTraceError(
                    name=procedure_name,
                    error=f"Error parsing Stored Procedure payload: {exc}",
                    stackTrace=traceback.format_exc(),
                )
            )
            continue

        if stored_procedure.schema == current_schema:
            yield stored_procedure


def yield_stored_procedure(
    self, stored_procedure: PostgresStoredProcedure
) -> Iterable[Either[CreateStoredProcedureRequest]]:
    """
    Prepare the stored procedure payload.

    Yields an Either holding the CreateStoredProcedureRequest on the right,
    or a StackTraceError on the left if the request could not be built.
    The request is attached to the database schema currently held in the
    context.
    """

    try:
        stored_procedure_request = CreateStoredProcedureRequest(
            name=EntityName(stored_procedure.name),
            description=None,
            storedProcedureCode=StoredProcedureCode(
                # Languages missing from the map resolve to None
                language=STORED_PROC_LANGUAGE_MAP.get(stored_procedure.language),
                code=stored_procedure.definition,
            ),
            databaseSchema=fqn.build(
                metadata=self.metadata,
                entity_type=DatabaseSchema,
                service_name=self.context.get().database_service,
                database_name=self.context.get().database,
                schema_name=self.context.get().database_schema,
            ),
        )
        yield Either(right=stored_procedure_request)
        # Runs once the consumer resumes the generator after the yield
        self.register_record_stored_proc_request(stored_procedure_request)

    except Exception as exc:
        yield Either(
            left=StackTraceError(
                name=stored_procedure.name,
                error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]",
                stackTrace=traceback.format_exc(),
            )
        )


def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
    """
    Return the dictionary associating stored procedures to the
    queries they triggered.

    Always empty for Postgres: no procedure-to-query association is
    collected here.
    """
    return {}
# Row model for the stored procedure listing query (postgres/models.py)
class PostgresStoredProcedure(BaseModel):
    """Postgres stored procedure list query results

    Field aliases match the column aliases selected by
    POSTGRES_GET_STORED_PROCEDURES, so a row mapping can be validated
    directly into this model.
    """

    # Procedure name, read from the `procedure_name` key
    name: str = Field(alias="procedure_name")
    # Schema owning the procedure, read from the `schema_name` key
    # NOTE(review): `schema` is also the name of a BaseModel attribute;
    # pydantic may warn about the shadowing - confirm it is acceptable
    schema: str = Field(alias="schema_name")
    # Procedure source text, read from the `definition` key
    definition: str
    # Optional; stays None when the input has no `language` key
    # (POSTGRES_GET_STORED_PROCEDURES does not select one)
    language: Optional[str] = None


# Query constants (postgres/queries.py)

# Template with `{table_name}` and `{column_name}` placeholders
# (presumably filled with str.format - confirm against the caller).
# It expands the top-level keys of a JSON column with json_each and builds,
# per key, an object with 'title', 'type' and 'properties'. For keys of
# type 'object', two further nesting levels are described; the innermost
# level gets empty 'properties'. The result is a single JSON object titled
# after the column.
POSTGRES_GET_JSON_FIELDS = """
    WITH RECURSIVE json_hierarchy AS (
        SELECT
            key AS path,
            json_typeof(value) AS type,
            value,
            json_build_object() AS properties,
            key AS title
        FROM
            {table_name} tbd,
            LATERAL json_each({column_name}::json)
    ),
    build_hierarchy AS (
        SELECT
            path,
            type,
            title,
            CASE
                WHEN type = 'object' THEN
                    json_build_object(
                        'title', title,
                        'type', 'object',
                        'properties', (
                            SELECT json_object_agg(
                                key,
                                json_build_object(
                                    'title', key,
                                    'type', json_typeof(value),
                                    'properties', (
                                        CASE
                                            WHEN json_typeof(value) = 'object' THEN
                                                (
                                                    SELECT json_object_agg(
                                                        key,
                                                        json_build_object(
                                                            'title', key,
                                                            'type', json_typeof(value),
                                                            'properties',
                                                            json_build_object()
                                                        )
                                                    )
                                                    FROM json_each(value::json) AS sub_key_value
                                                )
                                            ELSE json_build_object()
                                        END
                                    )
                                )
                            )
                            FROM json_each(value::json) AS key_value
                        )
                    )
                WHEN type = 'array' THEN
                    json_build_object(
                        'title', title,
                        'type', 'array',
                        'properties', json_build_object()
                    )
                ELSE
                    json_build_object(
                        'title', title,
                        'type', type
                    )
            END AS hierarchy
        FROM
            json_hierarchy
    ),
    aggregate_hierarchy AS (
        select
            json_build_object(
            'title','{column_name}',
            'type','object',
            'properties',
            json_object_agg(
                path,
                hierarchy
            )) AS result
        FROM
            build_hierarchy
    )
    SELECT
        result
    FROM
        aggregate_hierarchy;
"""

# Lists the pg_proc entries whose prokind is 'p', joined with pg_namespace
# to expose the schema name. The query is not restricted to a single schema
# and does not select a language column.
POSTGRES_GET_STORED_PROCEDURES = """
    SELECT proname AS procedure_name,
        nspname AS schema_name,
        proargtypes AS argument_types,
        prorettype::regtype AS return_type,
        prosrc AS definition
    FROM pg_proc
    JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid
    WHERE prokind = 'p';
"""