mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-21 14:59:57 +00:00
Hive: Updated postgres query to get partition field (#19563)
This commit is contained in:
parent
ca3fa6dcea
commit
12c005af96
@ -45,21 +45,43 @@ class HiveMysqlMetaStoreDialect(HiveMetaStoreDialectMixin, MySQLDialect_pymysql)
|
||||
return [row[0] for row in connection.execute(query)]
|
||||
|
||||
def _get_table_columns(self, connection, table_name, schema):
|
||||
schema_join = (
|
||||
f"""
|
||||
JOIN DBS db on tbsl.DB_ID = db.DB_ID
|
||||
AND db.NAME = '{schema}'
|
||||
"""
|
||||
if schema
|
||||
else ""
|
||||
)
|
||||
|
||||
query = f"""
|
||||
WITH regular_columns AS (
|
||||
SELECT
|
||||
col.COLUMN_NAME,
|
||||
col.TYPE_NAME,
|
||||
col.COMMENT
|
||||
from
|
||||
COLUMNS_V2 col
|
||||
join CDS cds ON col.CD_ID = cds.CD_ID
|
||||
join SDS sds ON sds.CD_ID = cds.CD_ID
|
||||
join TBLS tbsl on sds.SD_ID = tbsl.SD_ID
|
||||
and tbsl.TBL_NAME = '{table_name}'
|
||||
FROM COLUMNS_V2 col
|
||||
JOIN CDS cds ON col.CD_ID = cds.CD_ID
|
||||
JOIN SDS sds ON sds.CD_ID = cds.CD_ID
|
||||
JOIN TBLS tbsl ON sds.SD_ID = tbsl.SD_ID
|
||||
AND tbsl.TBL_NAME = '{table_name}'
|
||||
{schema_join}
|
||||
),
|
||||
partition_columns AS (
|
||||
SELECT
|
||||
pk.PKEY_NAME as COLUMN_NAME,
|
||||
pk.PKEY_TYPE as TYPE_NAME,
|
||||
pk.PKEY_COMMENT as COMMENT
|
||||
FROM PARTITION_KEYS pk
|
||||
JOIN TBLS tbsl ON pk.TBL_ID = tbsl.TBL_ID
|
||||
AND tbsl.TBL_NAME = '{table_name}'
|
||||
{schema_join}
|
||||
)
|
||||
-- Combine regular and partition columns
|
||||
SELECT * FROM regular_columns
|
||||
UNION ALL
|
||||
SELECT * FROM partition_columns
|
||||
"""
|
||||
if schema:
|
||||
query += f""" join DBS db on tbsl.DB_ID = db.DB_ID
|
||||
and db.NAME = '{schema}'"""
|
||||
|
||||
return connection.execute(query).fetchall()
|
||||
|
||||
|
@ -46,22 +46,46 @@ class HivePostgresMetaStoreDialect(HiveMetaStoreDialectMixin, PGDialect_psycopg2
|
||||
return [row[0] for row in connection.execute(query)]
|
||||
|
||||
def _get_table_columns(self, connection, table_name, schema):
|
||||
# Build schema join clause if schema is provided
|
||||
schema_join = (
|
||||
f"""
|
||||
JOIN "DBS" db on tbsl."DB_ID" = db."DB_ID"
|
||||
AND db."NAME" = '{schema}'
|
||||
"""
|
||||
if schema
|
||||
else ""
|
||||
)
|
||||
|
||||
query = f"""
|
||||
WITH regular_columns AS (
|
||||
-- Get regular table columns from COLUMNS_V2
|
||||
SELECT
|
||||
col."COLUMN_NAME",
|
||||
col."TYPE_NAME",
|
||||
col."COMMENT"
|
||||
from
|
||||
"COLUMNS_V2" col
|
||||
join "CDS" cds ON col."CD_ID" = cds."CD_ID"
|
||||
join "SDS" sds ON sds."CD_ID" = cds."CD_ID"
|
||||
join "TBLS" tbsl on sds."SD_ID" = tbsl."SD_ID"
|
||||
and tbsl."TBL_NAME" = '{table_name}'
|
||||
FROM "COLUMNS_V2" col
|
||||
JOIN "CDS" cds ON col."CD_ID" = cds."CD_ID"
|
||||
JOIN "SDS" sds ON sds."CD_ID" = cds."CD_ID"
|
||||
JOIN "TBLS" tbsl ON sds."SD_ID" = tbsl."SD_ID"
|
||||
AND tbsl."TBL_NAME" = '{table_name}'
|
||||
{schema_join}
|
||||
),
|
||||
partition_columns AS (
|
||||
-- Get partition key columns from PARTITION_KEYS
|
||||
SELECT
|
||||
pk."PKEY_NAME" as "COLUMN_NAME",
|
||||
pk."PKEY_TYPE" as "TYPE_NAME",
|
||||
pk."PKEY_COMMENT" as "COMMENT"
|
||||
FROM "PARTITION_KEYS" pk
|
||||
JOIN "TBLS" tbsl ON pk."TBL_ID" = tbsl."TBL_ID"
|
||||
AND tbsl."TBL_NAME" = '{table_name}'
|
||||
{schema_join}
|
||||
)
|
||||
-- Combine regular and partition columns
|
||||
SELECT * FROM regular_columns
|
||||
UNION ALL
|
||||
SELECT * FROM partition_columns
|
||||
"""
|
||||
if schema:
|
||||
query += f""" join "DBS" db on tbsl."DB_ID" = db."DB_ID"
|
||||
and db."NAME" = '{schema}'"""
|
||||
|
||||
return connection.execute(query).fetchall()
|
||||
|
||||
def _get_table_names_base_query(self, schema=None):
|
||||
|
Loading…
x
Reference in New Issue
Block a user