From 12c005af96004a7ca14c4e1863ff5728550ce890 Mon Sep 17 00:00:00 2001 From: Akash Verma <138790903+akashverma0786@users.noreply.github.com> Date: Mon, 3 Feb 2025 10:23:48 +0530 Subject: [PATCH] Hive: Updated postgres query to get partition field (#19563) --- .../hive/metastore_dialects/mysql/dialect.py | 50 +++++++++++++----- .../metastore_dialects/postgres/dialect.py | 52 ++++++++++++++----- 2 files changed, 74 insertions(+), 28 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/hive/metastore_dialects/mysql/dialect.py b/ingestion/src/metadata/ingestion/source/database/hive/metastore_dialects/mysql/dialect.py index aba759090a9..050c2efe116 100644 --- a/ingestion/src/metadata/ingestion/source/database/hive/metastore_dialects/mysql/dialect.py +++ b/ingestion/src/metadata/ingestion/source/database/hive/metastore_dialects/mysql/dialect.py @@ -45,21 +45,43 @@ class HiveMysqlMetaStoreDialect(HiveMetaStoreDialectMixin, MySQLDialect_pymysql) return [row[0] for row in connection.execute(query)] def _get_table_columns(self, connection, table_name, schema): - query = f""" - SELECT - col.COLUMN_NAME, - col.TYPE_NAME, - col.COMMENT - from - COLUMNS_V2 col - join CDS cds ON col.CD_ID = cds.CD_ID - join SDS sds ON sds.CD_ID = cds.CD_ID - join TBLS tbsl on sds.SD_ID = tbsl.SD_ID - and tbsl.TBL_NAME = '{table_name}' + schema_join = ( + f""" + JOIN DBS db on tbsl.DB_ID = db.DB_ID + AND db.NAME = '{schema}' + """ + if schema + else "" + ) + + query = f""" + WITH regular_columns AS ( + SELECT + col.COLUMN_NAME, + col.TYPE_NAME, + col.COMMENT + FROM COLUMNS_V2 col + JOIN CDS cds ON col.CD_ID = cds.CD_ID + JOIN SDS sds ON sds.CD_ID = cds.CD_ID + JOIN TBLS tbsl ON sds.SD_ID = tbsl.SD_ID + AND tbsl.TBL_NAME = '{table_name}' + {schema_join} + ), + partition_columns AS ( + SELECT + pk.PKEY_NAME as COLUMN_NAME, + pk.PKEY_TYPE as TYPE_NAME, + pk.PKEY_COMMENT as COMMENT + FROM PARTITION_KEYS pk + JOIN TBLS tbsl ON pk.TBL_ID = tbsl.TBL_ID + AND tbsl.TBL_NAME = '{table_name}' + {schema_join} + ) + -- Combine regular and partition columns + SELECT * FROM regular_columns + UNION ALL + SELECT * FROM partition_columns """ - if schema: - query += f""" join DBS db on tbsl.DB_ID = db.DB_ID - and db.NAME = '{schema}'""" return connection.execute(query).fetchall() diff --git a/ingestion/src/metadata/ingestion/source/database/hive/metastore_dialects/postgres/dialect.py b/ingestion/src/metadata/ingestion/source/database/hive/metastore_dialects/postgres/dialect.py index adebc71e70d..d25a7bf0335 100644 --- a/ingestion/src/metadata/ingestion/source/database/hive/metastore_dialects/postgres/dialect.py +++ b/ingestion/src/metadata/ingestion/source/database/hive/metastore_dialects/postgres/dialect.py @@ -46,22 +46,46 @@ class HivePostgresMetaStoreDialect(HiveMetaStoreDialectMixin, PGDialect_psycopg2 return [row[0] for row in connection.execute(query)] def _get_table_columns(self, connection, table_name, schema): - query = f""" - SELECT - col."COLUMN_NAME", - col."TYPE_NAME", - col."COMMENT" - from - "COLUMNS_V2" col - join "CDS" cds ON col."CD_ID" = cds."CD_ID" - join "SDS" sds ON sds."CD_ID" = cds."CD_ID" - join "TBLS" tbsl on sds."SD_ID" = tbsl."SD_ID" - and tbsl."TBL_NAME" = '{table_name}' + # Build schema join clause if schema is provided + schema_join = ( + f""" + JOIN "DBS" db on tbsl."DB_ID" = db."DB_ID" + AND db."NAME" = '{schema}' """ - if schema: - query += f""" join "DBS" db on tbsl."DB_ID" = db."DB_ID" - and db."NAME" = '{schema}'""" + if schema + else "" + ) + query = f""" + WITH regular_columns AS ( + -- Get regular table columns from COLUMNS_V2 + SELECT + col."COLUMN_NAME", + col."TYPE_NAME", + col."COMMENT" + FROM "COLUMNS_V2" col + JOIN "CDS" cds ON col."CD_ID" = cds."CD_ID" + JOIN "SDS" sds ON sds."CD_ID" = cds."CD_ID" + JOIN "TBLS" tbsl ON sds."SD_ID" = tbsl."SD_ID" + AND tbsl."TBL_NAME" = '{table_name}' + {schema_join} + ), + partition_columns AS ( + -- Get partition key columns from PARTITION_KEYS + SELECT + pk."PKEY_NAME" as "COLUMN_NAME", + pk."PKEY_TYPE" as "TYPE_NAME", + pk."PKEY_COMMENT" as "COMMENT" + FROM "PARTITION_KEYS" pk + JOIN "TBLS" tbsl ON pk."TBL_ID" = tbsl."TBL_ID" + AND tbsl."TBL_NAME" = '{table_name}' + {schema_join} + ) + -- Combine regular and partition columns + SELECT * FROM regular_columns + UNION ALL + SELECT * FROM partition_columns + """ return connection.execute(query).fetchall() def _get_table_names_base_query(self, schema=None):