feat(postgres): add support for stored procedures in postgres. (#14102)

Co-authored-by: root <root@Aarush-PC.localdomain>
Co-authored-by: Sergio Gómez Villamor <sgomezvillamor@gmail.com>
This commit is contained in:
Aarush 2025-07-22 10:31:42 -07:00 committed by GitHub
parent ee759731f7
commit 47e436fdfd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 625 additions and 2 deletions

View File

@ -36,6 +36,15 @@ from datahub.ingestion.source.sql.sql_common import (
register_custom_type,
)
from datahub.ingestion.source.sql.sql_config import BasicSQLAlchemyConfig
from datahub.ingestion.source.sql.sql_utils import (
gen_database_key,
gen_schema_key,
)
from datahub.ingestion.source.sql.stored_procedures.base import (
BaseProcedure,
generate_procedure_container_workunits,
generate_procedure_workunits,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayTypeClass,
BytesTypeClass,
@ -123,6 +132,15 @@ class PostgresConfig(BasePostgresConfig):
"Note: this is not used if `database` or `sqlalchemy_uri` are provided."
),
)
include_stored_procedures: bool = Field(
default=True,
description="Include ingest of stored procedures.",
)
procedure_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for stored procedures to filter in ingestion."
"Specify regex to match the entire procedure name in database.schema.procedure_name format. e.g. to match all procedures starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)
@platform_name("Postgres")
@ -135,7 +153,7 @@ class PostgresSource(SQLAlchemySource):
"""
This plugin extracts the following:
- Metadata for databases, schemas, views, and tables
- Metadata for databases, schemas, views, tables, and stored procedures
- Column types associated with each table
- Also supports PostGIS extensions
- Table, row, and column statistics via optional SQL profiling
@ -291,3 +309,174 @@ class PostgresSource(SQLAlchemySource):
] = row.table_size
except Exception as e:
logger.error(f"failed to fetch profile metadata: {e}")
def get_schema_level_workunits(
self,
inspector: Inspector,
schema: str,
database: str,
) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
yield from super().get_schema_level_workunits(
inspector=inspector,
schema=schema,
database=database,
)
if self.config.include_stored_procedures:
try:
yield from self.loop_stored_procedures(inspector, schema, self.config)
except Exception as e:
self.report.failure(
title="Failed to list stored procedures for schema",
message="An error occurred while listing procedures for the schema.",
context=f"{database}.{schema}",
exc=e,
)
def loop_stored_procedures(
self,
inspector: Inspector,
schema: str,
config: PostgresConfig,
) -> Iterable[MetadataWorkUnit]:
"""
Loop schema data for get stored procedures as dataJob-s.
"""
db_name = self.get_db_name(inspector)
procedures = self.fetch_procedures_for_schema(inspector, schema, db_name)
if procedures:
yield from self._process_procedures(procedures, db_name, schema)
def fetch_procedures_for_schema(
self, inspector: Inspector, schema: str, db_name: str
) -> List[BaseProcedure]:
try:
raw_procedures: List[BaseProcedure] = self.get_procedures_for_schema(
inspector, schema, db_name
)
procedures: List[BaseProcedure] = []
for procedure in raw_procedures:
procedure_qualified_name = self.get_identifier(
schema=schema,
entity=procedure.name,
inspector=inspector,
)
if not self.config.procedure_pattern.allowed(procedure_qualified_name):
self.report.report_dropped(procedure_qualified_name)
else:
procedures.append(procedure)
return procedures
except Exception as e:
self.report.warning(
title="Failed to get procedures for schema",
message="An error occurred while fetching procedures for the schema.",
context=f"{db_name}.{schema}",
exc=e,
)
return []
def get_procedures_for_schema(
self, inspector: Inspector, schema: str, db_name: str
) -> List[BaseProcedure]:
"""
Get stored procedures for a specific schema.
"""
base_procedures = []
with inspector.engine.connect() as conn:
procedures = conn.execute(
"""
SELECT
p.proname AS name,
l.lanname AS language,
pg_get_function_arguments(p.oid) AS arguments,
pg_get_functiondef(p.oid) AS definition,
obj_description(p.oid, 'pg_proc') AS comment
FROM
pg_proc p
JOIN
pg_namespace n ON n.oid = p.pronamespace
JOIN
pg_language l ON l.oid = p.prolang
WHERE
p.prokind = 'p'
AND n.nspname = '"""
+ schema
+ """';
"""
)
procedure_rows = list(procedures)
for row in procedure_rows:
base_procedures.append(
BaseProcedure(
name=row.name,
language=row.language,
argument_signature=row.arguments,
return_type=None,
procedure_definition=row.definition,
created=None,
last_altered=None,
comment=row.comment,
extra_properties=None,
)
)
return base_procedures
def _process_procedures(
self,
procedures: List[BaseProcedure],
db_name: str,
schema: str,
) -> Iterable[MetadataWorkUnit]:
if procedures:
yield from generate_procedure_container_workunits(
database_key=gen_database_key(
database=db_name,
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
),
schema_key=gen_schema_key(
db_name=db_name,
schema=schema,
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
),
)
for procedure in procedures:
yield from self._process_procedure(procedure, schema, db_name)
def _process_procedure(
self,
procedure: BaseProcedure,
schema: str,
db_name: str,
) -> Iterable[MetadataWorkUnit]:
try:
yield from generate_procedure_workunits(
procedure=procedure,
database_key=gen_database_key(
database=db_name,
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
),
schema_key=gen_schema_key(
db_name=db_name,
schema=schema,
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
),
schema_resolver=self.get_schema_resolver(),
)
except Exception as e:
self.report.warning(
title="Failed to emit stored procedure",
message="An error occurred while emitting stored procedure",
context=procedure.name,
exc=e,
)

View File

@ -764,6 +764,180 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "postgrestest.public.stored_procedures"
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Procedures Container"
]
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:a6097853edba03be190d99ece4b307ff"
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:a6097853edba03be190d99ece4b307ff",
"urn": "urn:li:container:a6097853edba03be190d99ece4b307ff"
}
]
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "add_row_to_metadata_aspect_v2",
"type": {
"string": "Stored Procedure"
}
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Stored Procedure"
]
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:51904fc8cd5cc729bc630decff284525"
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)",
"changeType": "UPSERT",
"aspectName": "dataTransformLogic",
"aspect": {
"json": {
"transforms": [
{
"queryStatement": {
"value": "CREATE OR REPLACE PROCEDURE public.add_row_to_metadata_aspect_v2(IN urn character varying, IN aspect character varying, IN version bigint, IN metadata text, IN createdon timestamp without time zone, IN createdby character varying, IN metadata_json json)\n LANGUAGE sql\nAS $procedure$\n insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby, metadata_json) values(\n urn,\n aspect,\n version,\n metadata,\n createdon,\n createdby,\n metadata_json\n )\n$procedure$\n",
"language": "SQL"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:a6097853edba03be190d99ece4b307ff",
"urn": "urn:li:container:a6097853edba03be190d99ece4b307ff"
},
{
"id": "urn:li:container:51904fc8cd5cc729bc630decff284525",
"urn": "urn:li:container:51904fc8cd5cc729bc630decff284525"
}
]
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgrestest.public.metadata_aspect_v2,PROD)",
@ -984,6 +1158,38 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Apostgres%2Cpostgrestest.public.metadata_aspect_view%2CPROD%29",

View File

@ -588,6 +588,180 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "postgrestest.public.stored_procedures"
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Procedures Container"
]
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:a6097853edba03be190d99ece4b307ff"
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:a6097853edba03be190d99ece4b307ff",
"urn": "urn:li:container:a6097853edba03be190d99ece4b307ff"
}
]
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "add_row_to_metadata_aspect_v2",
"type": {
"string": "Stored Procedure"
}
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Stored Procedure"
]
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:51904fc8cd5cc729bc630decff284525"
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)",
"changeType": "UPSERT",
"aspectName": "dataTransformLogic",
"aspect": {
"json": {
"transforms": [
{
"queryStatement": {
"value": "CREATE OR REPLACE PROCEDURE public.add_row_to_metadata_aspect_v2(IN urn character varying, IN aspect character varying, IN version bigint, IN metadata text, IN createdon timestamp without time zone, IN createdby character varying, IN metadata_json json)\n LANGUAGE sql\nAS $procedure$\n insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby, metadata_json) values(\n urn,\n aspect,\n version,\n metadata,\n createdon,\n createdby,\n metadata_json\n )\n$procedure$\n",
"language": "SQL"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:a6097853edba03be190d99ece4b307ff",
"urn": "urn:li:container:a6097853edba03be190d99ece4b307ff"
},
{
"id": "urn:li:container:51904fc8cd5cc729bc630decff284525",
"urn": "urn:li:container:51904fc8cd5cc729bc630decff284525"
}
]
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgrestest.public.metadata_aspect_v2,PROD)",
@ -750,6 +924,38 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(postgres,postgrestest.public.stored_procedures,PROD),add_row_to_metadata_aspect_v2_22b7c21680c08595bac1344d1d357044)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1646575200000,
"runId": "postgres-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:view_urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Apostgres%2Cpostgrestest.public.metadata_aspect_view%2CPROD%29",

View File

@ -38,4 +38,26 @@ insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, creat
create view metadata_aspect_view as select urn, aspect from metadata_aspect_v2 where version=0;
-- To get estimate counts of table rows after analyze
ANALYZE
ANALYZE;
CREATE PROCEDURE add_row_to_metadata_aspect_v2(
urn varchar(500),
aspect varchar(200),
version bigint,
metadata text,
createdon timestamp,
createdby varchar(255),
metadata_json json
)
LANGUAGE SQL
AS $$
insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby, metadata_json) values(
urn,
aspect,
version,
metadata,
createdon,
createdby,
metadata_json
)
$$;