From c16b3df5471ff5c11286b50619b6f9a51a94fbb7 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Wed, 2 Apr 2025 15:30:24 +0530 Subject: [PATCH] MINOR: Fix public schema lieage for postgres (#20548) --- .../ingestion/source/database/postgres/lineage.py | 3 ++- ingestion/src/metadata/utils/db_utils.py | 12 +++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py b/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py index 1d0e3f2b7f0..5d153da7ba7 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py @@ -31,6 +31,7 @@ from metadata.ingestion.source.database.postgres.queries import POSTGRES_SQL_STA from metadata.ingestion.source.database.postgres.query_parser import ( PostgresQueryParserSource, ) +from metadata.utils.db_utils import PUBLIC_SCHEMA from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -89,7 +90,7 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource): aborted=self.get_aborted_status(row), databaseName=self.get_database_name(row), serviceName=self.config.serviceName, - databaseSchema=self.get_schema_name(row), + databaseSchema=self.get_schema_name(row) or PUBLIC_SCHEMA, duration=row.get("duration"), ) except Exception as err: diff --git a/ingestion/src/metadata/utils/db_utils.py b/ingestion/src/metadata/utils/db_utils.py index afd1e483294..5fd6249515c 100644 --- a/ingestion/src/metadata/utils/db_utils.py +++ b/ingestion/src/metadata/utils/db_utils.py @@ -18,6 +18,9 @@ from typing import Iterable from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseServiceType, +) from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.ingestion.api.models import Either from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper @@ -33,6 +36,8 @@ from metadata.utils.logger import utils_logger logger = utils_logger() +PUBLIC_SCHEMA = "public" + def get_host_from_host_port(uri: str) -> str: """ @@ -64,7 +69,7 @@ def get_view_lineage( schema_name=schema_name, table_name=table_name, ) - table_entity = metadata.get_by_name( + table_entity: Table = metadata.get_by_name( entity=Table, fqn=table_fqn, ) @@ -79,6 +84,11 @@ def get_view_lineage( lineage_parser = LineageParser( view_definition, dialect, timeout_seconds=timeout_seconds ) + + if table_entity.serviceType == DatabaseServiceType.Postgres: + # For Postgres, if schema is not defined, we need to use the public schema + schema_name = PUBLIC_SCHEMA + if lineage_parser.source_tables and lineage_parser.target_tables: yield from get_lineage_by_query( metadata,