From 47e436fdfd56eb4b8a5bf7f203a74bb5ded391b6 Mon Sep 17 00:00:00 2001 From: Aarush <60299379+EmmetAVS@users.noreply.github.com> Date: Tue, 22 Jul 2025 10:31:42 -0700 Subject: [PATCH] feat(postgres): add support for stored procedures in postgres. (#14102) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: root Co-authored-by: Sergio Gómez Villamor --- .../datahub/ingestion/source/sql/postgres.py | 191 +++++++++++++++- .../postgres_all_db_mces_with_db_golden.json | 206 ++++++++++++++++++ .../postgres_mces_with_db_golden.json | 206 ++++++++++++++++++ .../integration/postgres/setup/setup.sql | 24 +- 4 files changed, 625 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py b/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py index 6af722d1fe..5095f2e2dc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py @@ -36,6 +36,15 @@ from datahub.ingestion.source.sql.sql_common import ( register_custom_type, ) from datahub.ingestion.source.sql.sql_config import BasicSQLAlchemyConfig +from datahub.ingestion.source.sql.sql_utils import ( + gen_database_key, + gen_schema_key, +) +from datahub.ingestion.source.sql.stored_procedures.base import ( + BaseProcedure, + generate_procedure_container_workunits, + generate_procedure_workunits, +) from datahub.metadata.com.linkedin.pegasus2avro.schema import ( ArrayTypeClass, BytesTypeClass, @@ -123,6 +132,15 @@ class PostgresConfig(BasePostgresConfig): "Note: this is not used if `database` or `sqlalchemy_uri` are provided." ), ) + include_stored_procedures: bool = Field( + default=True, + description="Include ingest of stored procedures.", + ) + procedure_pattern: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description="Regex patterns for stored procedures to filter in ingestion." + "Specify regex to match the entire procedure name in database.schema.procedure_name format. e.g. to match all procedures starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'", + ) @platform_name("Postgres") @@ -135,7 +153,7 @@ class PostgresSource(SQLAlchemySource): """ This plugin extracts the following: - - Metadata for databases, schemas, views, and tables + - Metadata for databases, schemas, views, tables, and stored procedures - Column types associated with each table - Also supports PostGIS extensions - Table, row, and column statistics via optional SQL profiling @@ -291,3 +309,174 @@ class PostgresSource(SQLAlchemySource): ] = row.table_size except Exception as e: logger.error(f"failed to fetch profile metadata: {e}") + + def get_schema_level_workunits( + self, + inspector: Inspector, + schema: str, + database: str, + ) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: + yield from super().get_schema_level_workunits( + inspector=inspector, + schema=schema, + database=database, + ) + + if self.config.include_stored_procedures: + try: + yield from self.loop_stored_procedures(inspector, schema, self.config) + except Exception as e: + self.report.failure( + title="Failed to list stored procedures for schema", + message="An error occurred while listing procedures for the schema.", + context=f"{database}.{schema}", + exc=e, + ) + + def loop_stored_procedures( + self, + inspector: Inspector, + schema: str, + config: PostgresConfig, + ) -> Iterable[MetadataWorkUnit]: + """ + Loop schema data for get stored procedures as dataJob-s. + """ + db_name = self.get_db_name(inspector) + + procedures = self.fetch_procedures_for_schema(inspector, schema, db_name) + if procedures: + yield from self._process_procedures(procedures, db_name, schema) + + def fetch_procedures_for_schema( + self, inspector: Inspector, schema: str, db_name: str + ) -> List[BaseProcedure]: + try: + raw_procedures: List[BaseProcedure] = self.get_procedures_for_schema( + inspector, schema, db_name + ) + procedures: List[BaseProcedure] = [] + for procedure in raw_procedures: + procedure_qualified_name = self.get_identifier( + schema=schema, + entity=procedure.name, + inspector=inspector, + ) + + if not self.config.procedure_pattern.allowed(procedure_qualified_name): + self.report.report_dropped(procedure_qualified_name) + else: + procedures.append(procedure) + return procedures + except Exception as e: + self.report.warning( + title="Failed to get procedures for schema", + message="An error occurred while fetching procedures for the schema.", + context=f"{db_name}.{schema}", + exc=e, + ) + return [] + + def get_procedures_for_schema( + self, inspector: Inspector, schema: str, db_name: str + ) -> List[BaseProcedure]: + """ + Get stored procedures for a specific schema. + """ + base_procedures = [] + with inspector.engine.connect() as conn: + procedures = conn.execute( + """ + SELECT + p.proname AS name, + l.lanname AS language, + pg_get_function_arguments(p.oid) AS arguments, + pg_get_functiondef(p.oid) AS definition, + obj_description(p.oid, 'pg_proc') AS comment + FROM + pg_proc p + JOIN + pg_namespace n ON n.oid = p.pronamespace + JOIN + pg_language l ON l.oid = p.prolang + WHERE + p.prokind = 'p' + AND n.nspname = '""" + + schema + + """'; + """ + ) + + procedure_rows = list(procedures) + for row in procedure_rows: + base_procedures.append( + BaseProcedure( + name=row.name, + language=row.language, + argument_signature=row.arguments, + return_type=None, + procedure_definition=row.definition, + created=None, + last_altered=None, + comment=row.comment, + extra_properties=None, + ) + ) + return base_procedures + + def _process_procedures( + self, + procedures: List[BaseProcedure], + db_name: str, + schema: str, + ) -> Iterable[MetadataWorkUnit]: + if procedures: + yield from generate_procedure_container_workunits( + database_key=gen_database_key( + database=db_name, + platform=self.platform, + platform_instance=self.config.platform_instance, + env=self.config.env, + ), + schema_key=gen_schema_key( + db_name=db_name, + schema=schema, + platform=self.platform, + platform_instance=self.config.platform_instance, + env=self.config.env, + ), + ) + for procedure in procedures: + yield from self._process_procedure(procedure, schema, db_name) + + def _process_procedure( + self, + procedure: BaseProcedure, + schema: str, + db_name: str, + ) -> Iterable[MetadataWorkUnit]: + try: + yield from generate_procedure_workunits( + procedure=procedure, + database_key=gen_database_key( + database=db_name, + platform=self.platform, + platform_instance=self.config.platform_instance, + env=self.config.env, + ), + schema_key=gen_schema_key( + db_name=db_name, + schema=schema, + platform=self.platform, + platform_instance=self.config.platform_instance, + env=self.config.env, + ), + schema_resolver=self.get_schema_resolver(), + ) + except Exception as e: + self.report.warning( + title="Failed to emit stored procedure", + message="An error occurred while emitting stored procedure", + context=procedure.name, + exc=e, + ) diff --git a/metadata-ingestion/tests/integration/postgres/postgres_all_db_mces_with_db_golden.json b/metadata-ingestion/tests/integration/postgres/postgres_all_db_mces_with_db_golden.json index 2c5cf02c43..c43ad5c3ba 100644 --- a/metadata-ingestion/tests/integration/postgres/postgres_all_db_mces_with_db_golden.json +++ b/metadata-ingestion/tests/integration/postgres/postgres_all_db_mces_with_db_golden.json @@ -764,6 +764,180 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "postgrestest.public.stored_procedures" + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Procedures Container" + ] + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:a6097853edba03be190d99ece4b307ff" + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:a6097853edba03be190d99ece4b307ff", + "urn": "urn:li:container:a6097853edba03be190d99ece4b307ff" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "add_row_to_metadata_aspect_v2", + "type": { + "string": "Stored Procedure" + } + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Stored Procedure" + ] + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:51904fc8cd5cc729bc630decff284525" + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)", + "changeType": "UPSERT", + "aspectName": "dataTransformLogic", + "aspect": { + "json": { + "transforms": [ + { + "queryStatement": { + "value": "CREATE OR REPLACE PROCEDURE public.add_row_to_metadata_aspect_v2(IN urn character varying, IN aspect character varying, IN version bigint, IN metadata text, IN createdon timestamp without time zone, IN createdby character varying, IN metadata_json json)\n LANGUAGE sql\nAS $procedure$\n insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby, metadata_json) values(\n urn,\n aspect,\n version,\n metadata,\n createdon,\n createdby,\n metadata_json\n )\n$procedure$\n", + "language": "SQL" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:a6097853edba03be190d99ece4b307ff", + "urn": "urn:li:container:a6097853edba03be190d99ece4b307ff" + }, + { + "id": "urn:li:container:51904fc8cd5cc729bc630decff284525", + "urn": "urn:li:container:51904fc8cd5cc729bc630decff284525" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgrestest.public.metadata_aspect_v2,PROD)", @@ -984,6 +1158,38 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Apostgres%2Cpostgrestest.public.metadata_aspect_view%2CPROD%29", diff --git a/metadata-ingestion/tests/integration/postgres/postgres_mces_with_db_golden.json b/metadata-ingestion/tests/integration/postgres/postgres_mces_with_db_golden.json index 4a30cbfc45..29a2953d8c 100644 --- a/metadata-ingestion/tests/integration/postgres/postgres_mces_with_db_golden.json +++ b/metadata-ingestion/tests/integration/postgres/postgres_mces_with_db_golden.json @@ -588,6 +588,180 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "postgrestest.public.stored_procedures" + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Procedures Container" + ] + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:a6097853edba03be190d99ece4b307ff" + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:a6097853edba03be190d99ece4b307ff", + "urn": "urn:li:container:a6097853edba03be190d99ece4b307ff" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "add_row_to_metadata_aspect_v2", + "type": { + "string": "Stored Procedure" + } + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Stored Procedure" + ] + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:51904fc8cd5cc729bc630decff284525" + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)", + "changeType": "UPSERT", + "aspectName": "dataTransformLogic", + "aspect": { + "json": { + "transforms": [ + { + "queryStatement": { + "value": "CREATE OR REPLACE PROCEDURE public.add_row_to_metadata_aspect_v2(IN urn character varying, IN aspect character varying, IN version bigint, IN metadata text, IN createdon timestamp without time zone, IN createdby character varying, IN metadata_json json)\n LANGUAGE sql\nAS $procedure$\n insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby, metadata_json) values(\n urn,\n aspect,\n version,\n metadata,\n createdon,\n createdby,\n metadata_json\n )\n$procedure$\n", + "language": "SQL" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:a6097853edba03be190d99ece4b307ff", + "urn": "urn:li:container:a6097853edba03be190d99ece4b307ff" + }, + { + "id": "urn:li:container:51904fc8cd5cc729bc630decff284525", + "urn": "urn:li:container:51904fc8cd5cc729bc630decff284525" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgrestest.public.metadata_aspect_v2,PROD)", @@ -750,6 +924,38 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1646575200000, + "runId": "postgres-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Apostgres%2Cpostgrestest.public.metadata_aspect_view%2CPROD%29", diff --git a/metadata-ingestion/tests/integration/postgres/setup/setup.sql b/metadata-ingestion/tests/integration/postgres/setup/setup.sql index 66ef7942ad..3ac0135a59 100644 --- a/metadata-ingestion/tests/integration/postgres/setup/setup.sql +++ b/metadata-ingestion/tests/integration/postgres/setup/setup.sql @@ -38,4 +38,26 @@ insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, creat create view metadata_aspect_view as select urn, aspect from metadata_aspect_v2 where version=0; -- To get estimate counts of table rows after analyze -ANALYZE +ANALYZE; + +CREATE PROCEDURE add_row_to_metadata_aspect_v2( + urn varchar(500), + aspect varchar(200), + version bigint, + metadata text, + createdon timestamp, + createdby varchar(255), + metadata_json json +) +LANGUAGE SQL +AS $$ + insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby, metadata_json) values( + urn, + aspect, + version, + metadata, + createdon, + createdby, + metadata_json + ) +$$;