mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-27 16:55:06 +00:00
Fixes: Parse postgres json column fields (#17645)
This commit is contained in:
parent
46e98e13c4
commit
e93cf23d6c
@ -59,6 +59,7 @@ from metadata.ingestion.source.database.postgres.utils import (
|
||||
get_columns,
|
||||
get_etable_owner,
|
||||
get_foreign_keys,
|
||||
get_json_fields_and_type,
|
||||
get_table_comment,
|
||||
get_table_owner,
|
||||
get_view_definition,
|
||||
@ -138,6 +139,7 @@ PGDialect.ischema_names = ischema_names
|
||||
Inspector.get_all_table_ddls = get_all_table_ddls
|
||||
Inspector.get_table_ddl = get_table_ddl
|
||||
Inspector.get_table_owner = get_etable_owner
|
||||
Inspector.get_json_fields_and_type = get_json_fields_and_type
|
||||
|
||||
PGDialect.get_foreign_keys = get_foreign_keys
|
||||
|
||||
|
||||
@ -211,3 +211,88 @@ POSTGRES_FETCH_FK = """
|
||||
n.oid = c.relnamespace
|
||||
ORDER BY 1
|
||||
"""
|
||||
|
||||
POSTGRES_GET_JSON_FIELDS = """
|
||||
WITH RECURSIVE json_hierarchy AS (
|
||||
SELECT
|
||||
key AS path,
|
||||
json_typeof(value) AS type,
|
||||
value,
|
||||
json_build_object() AS properties,
|
||||
key AS title
|
||||
FROM
|
||||
{table_name} tbd,
|
||||
LATERAL json_each({column_name}::json)
|
||||
),
|
||||
build_hierarchy AS (
|
||||
SELECT
|
||||
path,
|
||||
type,
|
||||
title,
|
||||
CASE
|
||||
WHEN type = 'object' THEN
|
||||
json_build_object(
|
||||
'title', title,
|
||||
'type', 'object',
|
||||
'properties', (
|
||||
SELECT json_object_agg(
|
||||
key,
|
||||
json_build_object(
|
||||
'title', key,
|
||||
'type', json_typeof(value),
|
||||
'properties', (
|
||||
CASE
|
||||
WHEN json_typeof(value) = 'object' THEN
|
||||
(
|
||||
SELECT json_object_agg(
|
||||
key,
|
||||
json_build_object(
|
||||
'title', key,
|
||||
'type', json_typeof(value),
|
||||
'properties',
|
||||
json_build_object()
|
||||
)
|
||||
)
|
||||
FROM json_each(value::json) AS sub_key_value
|
||||
)
|
||||
ELSE json_build_object()
|
||||
END
|
||||
)
|
||||
)
|
||||
)
|
||||
FROM json_each(value::json) AS key_value
|
||||
)
|
||||
)
|
||||
WHEN type = 'array' THEN
|
||||
json_build_object(
|
||||
'title', title,
|
||||
'type', 'array',
|
||||
'properties', json_build_object()
|
||||
)
|
||||
ELSE
|
||||
json_build_object(
|
||||
'title', title,
|
||||
'type', type
|
||||
)
|
||||
END AS hierarchy
|
||||
FROM
|
||||
json_hierarchy
|
||||
),
|
||||
aggregate_hierarchy AS (
|
||||
select
|
||||
json_build_object(
|
||||
'title','{column_name}',
|
||||
'type','object',
|
||||
'properties',
|
||||
json_object_agg(
|
||||
path,
|
||||
hierarchy
|
||||
)) AS result
|
||||
FROM
|
||||
build_hierarchy
|
||||
)
|
||||
SELECT
|
||||
result
|
||||
FROM
|
||||
aggregate_hierarchy;
|
||||
"""
|
||||
|
||||
@ -13,6 +13,7 @@
|
||||
"""
|
||||
Postgres SQLAlchemy util methods
|
||||
"""
|
||||
import json
|
||||
import re
|
||||
import traceback
|
||||
from typing import Dict, Optional, Tuple
|
||||
@ -23,15 +24,18 @@ from sqlalchemy.dialects.postgresql.base import ENUM
|
||||
from sqlalchemy.engine import reflection
|
||||
from sqlalchemy.sql import sqltypes
|
||||
|
||||
from metadata.generated.schema.entity.data.table import Column
|
||||
from metadata.ingestion.source.database.postgres.queries import (
|
||||
POSTGRES_COL_IDENTITY,
|
||||
POSTGRES_FETCH_FK,
|
||||
POSTGRES_GET_JSON_FIELDS,
|
||||
POSTGRES_GET_SERVER_VERSION,
|
||||
POSTGRES_SQL_COLUMNS,
|
||||
POSTGRES_TABLE_COMMENTS,
|
||||
POSTGRES_TABLE_OWNERS,
|
||||
POSTGRES_VIEW_DEFINITIONS,
|
||||
)
|
||||
from metadata.parsers.json_schema_parser import parse_json_schema
|
||||
from metadata.utils.logger import utils_logger
|
||||
from metadata.utils.sqlalchemy_utils import (
|
||||
get_table_comment_wrapper,
|
||||
@ -186,6 +190,28 @@ def get_table_comment(
|
||||
)
|
||||
|
||||
|
||||
@reflection.cache
|
||||
def get_json_fields_and_type(
|
||||
self, table_name, column_name, schema=None, **kw
|
||||
): # pylint: disable=unused-argument
|
||||
try:
|
||||
query = POSTGRES_GET_JSON_FIELDS.format(
|
||||
table_name=table_name, column_name=column_name
|
||||
)
|
||||
cursor = self.engine.execute(query)
|
||||
result = cursor.fetchone()
|
||||
if result:
|
||||
parsed_column = parse_json_schema(json.dumps(result[0]), Column)
|
||||
if parsed_column:
|
||||
return parsed_column[0].children
|
||||
except Exception as err:
|
||||
logger.warning(
|
||||
f"Unable to parse the json fields for {table_name}.{column_name} - {err}"
|
||||
)
|
||||
logger.debug(traceback.format_exc())
|
||||
return None
|
||||
|
||||
|
||||
@reflection.cache
|
||||
def get_columns( # pylint: disable=too-many-locals
|
||||
self, connection, table_name, schema=None, **kw
|
||||
|
||||
@ -196,6 +196,26 @@ class SqlColumnHandlerMixin:
|
||||
]
|
||||
return Column(**parsed_string)
|
||||
|
||||
@calculate_execution_time()
|
||||
def process_json_type_column_fields( # pylint: disable=too-many-locals
|
||||
self, schema_name: str, table_name: str, column_name: str, inspector: Inspector
|
||||
) -> Optional[List[Column]]:
|
||||
"""
|
||||
Parse fields column with json data types
|
||||
"""
|
||||
try:
|
||||
if hasattr(inspector, "get_json_fields_and_type"):
|
||||
result = inspector.get_json_fields_and_type(
|
||||
table_name, column_name, schema_name
|
||||
)
|
||||
return result
|
||||
|
||||
except NotImplementedError:
|
||||
logger.debug(
|
||||
"Cannot parse json fields for table column [{schema_name}.{table_name}.{col_name}]: NotImplementedError"
|
||||
)
|
||||
return None
|
||||
|
||||
@calculate_execution_time()
|
||||
def get_columns_and_constraints( # pylint: disable=too-many-locals
|
||||
self, schema_name: str, table_name: str, db_name: str, inspector: Inspector
|
||||
@ -271,6 +291,12 @@ class SqlColumnHandlerMixin:
|
||||
precision,
|
||||
)
|
||||
col_data_length = 1 if col_data_length is None else col_data_length
|
||||
|
||||
if col_type == "JSON":
|
||||
children = self.process_json_type_column_fields(
|
||||
schema_name, table_name, column.get("name"), inspector
|
||||
)
|
||||
|
||||
om_column = Column(
|
||||
name=ColumnName(
|
||||
root=column["name"]
|
||||
|
||||
@ -81,7 +81,7 @@ def get_json_schema_fields(
|
||||
displayName=value.get("title"),
|
||||
dataType=JsonSchemaDataTypes(value.get("type", "unknown")).name,
|
||||
description=value.get("description"),
|
||||
children=get_json_schema_fields(value.get("properties"))
|
||||
children=get_json_schema_fields(value.get("properties"), cls=cls)
|
||||
if value.get("type") == "object"
|
||||
else None,
|
||||
)
|
||||
|
||||
@ -14,6 +14,7 @@ Jsonschema parser tests
|
||||
"""
|
||||
from unittest import TestCase
|
||||
|
||||
from metadata.generated.schema.entity.data.table import Column
|
||||
from metadata.parsers.json_schema_parser import parse_json_schema
|
||||
|
||||
|
||||
@ -47,7 +48,51 @@ class JsonSchemaParserTests(TestCase):
|
||||
}
|
||||
}"""
|
||||
|
||||
sample_postgres_json_schema = """{
|
||||
"title": "review_details",
|
||||
"type": "object",
|
||||
"properties":
|
||||
{
|
||||
"staff": {
|
||||
"title": "staff",
|
||||
"type": "array",
|
||||
"properties": {}
|
||||
},
|
||||
"services": {
|
||||
"title": "services",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"lunch": {
|
||||
"title": "lunch",
|
||||
"type": "string",
|
||||
"properties": {}
|
||||
},
|
||||
"check_in": {
|
||||
"title": "check_in",
|
||||
"type": "string",
|
||||
"properties": {}
|
||||
},
|
||||
"check_out": {
|
||||
"title": "check_out",
|
||||
"type": "string",
|
||||
"properties": {}
|
||||
},
|
||||
"additional_services": {
|
||||
"title": "additional_services",
|
||||
"type": "array",
|
||||
"properties": {}
|
||||
}
|
||||
}
|
||||
},
|
||||
"overall_experience": {
|
||||
"title": "overall_experience",
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}"""
|
||||
|
||||
parsed_schema = parse_json_schema(sample_json_schema)
|
||||
parsed_postgres_schema = parse_json_schema(sample_postgres_json_schema, Column)
|
||||
|
||||
def test_schema_name(self):
|
||||
self.assertEqual(self.parsed_schema[0].name.root, "Person")
|
||||
@ -83,3 +128,18 @@ class JsonSchemaParserTests(TestCase):
|
||||
"Age in years which must be equal to or greater than zero.",
|
||||
},
|
||||
)
|
||||
|
||||
def test_parse_postgres_json_fields(self):
|
||||
self.assertEqual(self.parsed_postgres_schema[0].name.root, "review_details")
|
||||
self.assertEqual(self.parsed_postgres_schema[0].children[0].name.root, "staff")
|
||||
self.assertEqual(
|
||||
self.parsed_postgres_schema[0].children[1].name.root, "services"
|
||||
)
|
||||
self.assertEqual(
|
||||
self.parsed_postgres_schema[0].children[1].children[0].name.root, "lunch"
|
||||
)
|
||||
self.assertEqual(
|
||||
self.parsed_postgres_schema[0].children[1].dataType.name, "RECORD"
|
||||
)
|
||||
self.assertEqual(len(self.parsed_postgres_schema[0].children), 3)
|
||||
self.assertEqual(len(self.parsed_postgres_schema[0].children[1].children), 4)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user