mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-26 17:34:41 +00:00
Fix :postgres parse json schemas (#19487)
This commit is contained in:
parent
9a167c7c3f
commit
f13eb4e273
@ -70,7 +70,6 @@ from metadata.ingestion.source.database.postgres.utils import (
|
|||||||
get_columns,
|
get_columns,
|
||||||
get_etable_owner,
|
get_etable_owner,
|
||||||
get_foreign_keys,
|
get_foreign_keys,
|
||||||
get_json_fields_and_type,
|
|
||||||
get_table_comment,
|
get_table_comment,
|
||||||
get_table_owner,
|
get_table_owner,
|
||||||
get_view_definition,
|
get_view_definition,
|
||||||
@ -113,7 +112,6 @@ PGDialect.ischema_names = ischema_names
|
|||||||
Inspector.get_all_table_ddls = get_all_table_ddls
|
Inspector.get_all_table_ddls = get_all_table_ddls
|
||||||
Inspector.get_table_ddl = get_table_ddl
|
Inspector.get_table_ddl = get_table_ddl
|
||||||
Inspector.get_table_owner = get_etable_owner
|
Inspector.get_table_owner = get_etable_owner
|
||||||
Inspector.get_json_fields_and_type = get_json_fields_and_type
|
|
||||||
|
|
||||||
PGDialect.get_foreign_keys = get_foreign_keys
|
PGDialect.get_foreign_keys = get_foreign_keys
|
||||||
|
|
||||||
|
@ -212,91 +212,6 @@ POSTGRES_FETCH_FK = """
|
|||||||
ORDER BY 1
|
ORDER BY 1
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# SQL template that derives a JSON-Schema-like description (title / type /
# properties, up to three levels deep) from the values stored in a JSON column.
#
# The template is rendered with ``str.format`` and takes two placeholders:
#   * ``{table_name}``  - the table to scan (aliased as ``tbd``)
#   * ``{column_name}`` - the JSON column; it is used both as an identifier and
#     as the string literal for the top-level ``title``
#
# ``json_each`` raises when handed a JSON array or scalar, so the top-level
# call is wrapped in a CASE that substitutes an empty object for any row whose
# value is not a JSON object. The literal braces are doubled (``{{}}``) so that
# ``str.format`` emits ``{}`` instead of treating them as a placeholder.
POSTGRES_GET_JSON_FIELDS = """
    WITH RECURSIVE json_hierarchy AS (
        SELECT
            key AS path,
            json_typeof(value) AS type,
            value,
            json_build_object() AS properties,
            key AS title
        FROM
            {table_name} tbd,
            LATERAL json_each(
                CASE
                    WHEN json_typeof({column_name}::json) = 'object'
                        THEN {column_name}::json
                    ELSE '{{}}'::json
                END
            )
    ),
    build_hierarchy AS (
        SELECT
            path,
            type,
            title,
            CASE
                WHEN type = 'object' THEN
                    json_build_object(
                        'title', title,
                        'type', 'object',
                        'properties', (
                            SELECT json_object_agg(
                                key,
                                json_build_object(
                                    'title', key,
                                    'type', json_typeof(value),
                                    'properties', (
                                        CASE
                                            WHEN json_typeof(value) = 'object' THEN
                                                (
                                                    SELECT json_object_agg(
                                                        key,
                                                        json_build_object(
                                                            'title', key,
                                                            'type', json_typeof(value),
                                                            'properties',
                                                            json_build_object()
                                                        )
                                                    )
                                                    FROM json_each(value::json) AS sub_key_value
                                                )
                                            ELSE json_build_object()
                                        END
                                    )
                                )
                            )
                            FROM json_each(value::json) AS key_value
                        )
                    )
                WHEN type = 'array' THEN
                    json_build_object(
                        'title', title,
                        'type', 'array',
                        'properties', json_build_object()
                    )
                ELSE
                    json_build_object(
                        'title', title,
                        'type', type
                    )
            END AS hierarchy
        FROM
            json_hierarchy
    ),
    aggregate_hierarchy AS (
        select
            json_build_object(
            'title','{column_name}',
            'type','object',
            'properties',
            json_object_agg(
                path,
                hierarchy
            )) AS result
        FROM
            build_hierarchy
    )
    SELECT
        result
    FROM
        aggregate_hierarchy;
"""
|
|
||||||
|
|
||||||
POSTGRES_GET_STORED_PROCEDURES = """
|
POSTGRES_GET_STORED_PROCEDURES = """
|
||||||
SELECT proname AS procedure_name,
|
SELECT proname AS procedure_name,
|
||||||
nspname AS schema_name,
|
nspname AS schema_name,
|
||||||
|
@ -13,7 +13,6 @@
|
|||||||
"""
|
"""
|
||||||
Postgres SQLAlchemy util methods
|
Postgres SQLAlchemy util methods
|
||||||
"""
|
"""
|
||||||
import json
|
|
||||||
import re
|
import re
|
||||||
import traceback
|
import traceback
|
||||||
from typing import Dict, Optional, Tuple
|
from typing import Dict, Optional, Tuple
|
||||||
@ -24,18 +23,15 @@ from sqlalchemy.dialects.postgresql.base import ENUM
|
|||||||
from sqlalchemy.engine import reflection
|
from sqlalchemy.engine import reflection
|
||||||
from sqlalchemy.sql import sqltypes
|
from sqlalchemy.sql import sqltypes
|
||||||
|
|
||||||
from metadata.generated.schema.entity.data.table import Column
|
|
||||||
from metadata.ingestion.source.database.postgres.queries import (
|
from metadata.ingestion.source.database.postgres.queries import (
|
||||||
POSTGRES_COL_IDENTITY,
|
POSTGRES_COL_IDENTITY,
|
||||||
POSTGRES_FETCH_FK,
|
POSTGRES_FETCH_FK,
|
||||||
POSTGRES_GET_JSON_FIELDS,
|
|
||||||
POSTGRES_GET_SERVER_VERSION,
|
POSTGRES_GET_SERVER_VERSION,
|
||||||
POSTGRES_SQL_COLUMNS,
|
POSTGRES_SQL_COLUMNS,
|
||||||
POSTGRES_TABLE_COMMENTS,
|
POSTGRES_TABLE_COMMENTS,
|
||||||
POSTGRES_TABLE_OWNERS,
|
POSTGRES_TABLE_OWNERS,
|
||||||
POSTGRES_VIEW_DEFINITIONS,
|
POSTGRES_VIEW_DEFINITIONS,
|
||||||
)
|
)
|
||||||
from metadata.parsers.json_schema_parser import parse_json_schema
|
|
||||||
from metadata.utils.logger import utils_logger
|
from metadata.utils.logger import utils_logger
|
||||||
from metadata.utils.sqlalchemy_utils import (
|
from metadata.utils.sqlalchemy_utils import (
|
||||||
get_table_comment_wrapper,
|
get_table_comment_wrapper,
|
||||||
@ -190,28 +186,6 @@ def get_table_comment(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@reflection.cache
def get_json_fields_and_type(
    self, table_name, column_name, schema=None, **kw
):  # pylint: disable=unused-argument
    """
    Infer the nested fields of a JSON column from the data stored in it.

    Renders POSTGRES_GET_JSON_FIELDS for the given table/column, executes it
    on ``self.engine`` and feeds the first column of the first row (dumped
    back to a JSON string) to ``parse_json_schema``.

    Args:
        table_name: name of the table holding the JSON column
        column_name: name of the JSON column to inspect
        schema: optional schema of the table; when given, the table reference
            in the query is schema-qualified so it does not depend on the
            connection's search_path
        **kw: unused here (accepted for the ``reflection.cache`` decorator)

    Returns:
        The ``children`` of the first parsed column, or ``None`` when the
        query yields no row, nothing could be parsed, or any error occurred
        (errors are logged, never raised).
    """
    try:
        # Quote identifiers through the dialect so mixed-case / reserved names
        # work, and qualify with the schema instead of relying on search_path.
        preparer = self.dialect.identifier_preparer
        qualified_table = preparer.quote(table_name)
        if schema:
            qualified_table = f"{preparer.quote_schema(schema)}.{qualified_table}"

        # column_name is passed through untouched: the template also uses it
        # as a string literal for the top-level title.
        query = POSTGRES_GET_JSON_FIELDS.format(
            table_name=qualified_table, column_name=column_name
        )
        cursor = self.engine.execute(query)
        try:
            result = cursor.fetchone()
        finally:
            # Only one row is read; release the cursor/connection explicitly.
            cursor.close()

        if result:
            parsed_column = parse_json_schema(json.dumps(result[0]), Column)
            if parsed_column:
                return parsed_column[0].children
    except Exception as err:
        # Best effort: JSON field discovery must not break column ingestion.
        logger.warning(
            f"Unable to parse the json fields for {table_name}.{column_name} - {err}"
        )
        logger.debug(traceback.format_exc())
    return None
|
|
||||||
|
|
||||||
|
|
||||||
@reflection.cache
|
@reflection.cache
|
||||||
def get_columns( # pylint: disable=too-many-locals
|
def get_columns( # pylint: disable=too-many-locals
|
||||||
self, connection, table_name, schema=None, **kw
|
self, connection, table_name, schema=None, **kw
|
||||||
|
@ -202,26 +202,6 @@ class SqlColumnHandlerMixin:
|
|||||||
]
|
]
|
||||||
return Column(**parsed_string)
|
return Column(**parsed_string)
|
||||||
|
|
||||||
@calculate_execution_time()
def process_json_type_column_fields(  # pylint: disable=too-many-locals
    self, schema_name: str, table_name: str, column_name: str, inspector: Inspector
) -> Optional[List[Column]]:
    """
    Parse fields column with json data types

    Delegates to ``inspector.get_json_fields_and_type`` when the inspector
    exposes that method.

    Args:
        schema_name: schema of the table
        table_name: table holding the JSON column
        column_name: JSON column whose nested fields should be resolved
        inspector: inspector that may provide ``get_json_fields_and_type``

    Returns:
        Whatever the inspector method returns, or ``None`` when the inspector
        has no such method or it raises ``NotImplementedError``.
    """
    try:
        if hasattr(inspector, "get_json_fields_and_type"):
            return inspector.get_json_fields_and_type(
                table_name, column_name, schema_name
            )
    except NotImplementedError:
        # The message must be an f-string and use the real parameter name
        # (column_name); otherwise the placeholders are logged verbatim.
        logger.debug(
            f"Cannot parse json fields for table column [{schema_name}.{table_name}.{column_name}]: NotImplementedError"
        )
    return None
|
|
||||||
|
|
||||||
@calculate_execution_time()
|
@calculate_execution_time()
|
||||||
def get_columns_and_constraints( # pylint: disable=too-many-locals
|
def get_columns_and_constraints( # pylint: disable=too-many-locals
|
||||||
self, schema_name: str, table_name: str, db_name: str, inspector: Inspector
|
self, schema_name: str, table_name: str, db_name: str, inspector: Inspector
|
||||||
@ -298,11 +278,6 @@ class SqlColumnHandlerMixin:
|
|||||||
)
|
)
|
||||||
col_data_length = 1 if col_data_length is None else col_data_length
|
col_data_length = 1 if col_data_length is None else col_data_length
|
||||||
|
|
||||||
if col_type == "JSON":
|
|
||||||
children = self.process_json_type_column_fields(
|
|
||||||
schema_name, table_name, column.get("name"), inspector
|
|
||||||
)
|
|
||||||
|
|
||||||
om_column = Column(
|
om_column = Column(
|
||||||
name=ColumnName(
|
name=ColumnName(
|
||||||
root=column["name"]
|
root=column["name"]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user