diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py index 7e646e8e03c..3c6e5f85299 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py @@ -59,6 +59,7 @@ from metadata.ingestion.source.database.postgres.utils import ( get_columns, get_etable_owner, get_foreign_keys, + get_json_fields_and_type, get_table_comment, get_table_owner, get_view_definition, @@ -138,6 +139,7 @@ PGDialect.ischema_names = ischema_names Inspector.get_all_table_ddls = get_all_table_ddls Inspector.get_table_ddl = get_table_ddl Inspector.get_table_owner = get_etable_owner +Inspector.get_json_fields_and_type = get_json_fields_and_type PGDialect.get_foreign_keys = get_foreign_keys diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/queries.py b/ingestion/src/metadata/ingestion/source/database/postgres/queries.py index c78cc313fa9..2ac50a92149 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/queries.py @@ -211,3 +211,88 @@ POSTGRES_FETCH_FK = """ n.oid = c.relnamespace ORDER BY 1 """ + +POSTGRES_GET_JSON_FIELDS = """ + WITH RECURSIVE json_hierarchy AS ( + SELECT + key AS path, + json_typeof(value) AS type, + value, + json_build_object() AS properties, + key AS title + FROM + {table_name} tbd, + LATERAL json_each({column_name}::json) + ), + build_hierarchy AS ( + SELECT + path, + type, + title, + CASE + WHEN type = 'object' THEN + json_build_object( + 'title', title, + 'type', 'object', + 'properties', ( + SELECT json_object_agg( + key, + json_build_object( + 'title', key, + 'type', json_typeof(value), + 'properties', ( + CASE + WHEN json_typeof(value) = 'object' THEN + ( + SELECT json_object_agg( + key, + json_build_object( + 'title', key, + 'type', json_typeof(value), + 'properties', + json_build_object() + ) + ) + FROM json_each(value::json) AS sub_key_value + ) + ELSE json_build_object() + END + ) + ) + ) + FROM json_each(value::json) AS key_value + ) + ) + WHEN type = 'array' THEN + json_build_object( + 'title', title, + 'type', 'array', + 'properties', json_build_object() + ) + ELSE + json_build_object( + 'title', title, + 'type', type + ) + END AS hierarchy + FROM + json_hierarchy + ), + aggregate_hierarchy AS ( + select + json_build_object( + 'title','{column_name}', + 'type','object', + 'properties', + json_object_agg( + path, + hierarchy + )) AS result + FROM + build_hierarchy + ) + SELECT + result + FROM + aggregate_hierarchy; +""" diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/utils.py b/ingestion/src/metadata/ingestion/source/database/postgres/utils.py index 13920e9b9ac..246519c00e2 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/utils.py @@ -13,6 +13,7 @@ """ Postgres SQLAlchemy util methods """ +import json import re import traceback from typing import Dict, Optional, Tuple @@ -23,15 +24,18 @@ from sqlalchemy.dialects.postgresql.base import ENUM from sqlalchemy.engine import reflection from sqlalchemy.sql import sqltypes +from metadata.generated.schema.entity.data.table import Column from metadata.ingestion.source.database.postgres.queries import ( POSTGRES_COL_IDENTITY, POSTGRES_FETCH_FK, + POSTGRES_GET_JSON_FIELDS, POSTGRES_GET_SERVER_VERSION, POSTGRES_SQL_COLUMNS, POSTGRES_TABLE_COMMENTS, POSTGRES_TABLE_OWNERS, POSTGRES_VIEW_DEFINITIONS, ) +from metadata.parsers.json_schema_parser import parse_json_schema from metadata.utils.logger import utils_logger from metadata.utils.sqlalchemy_utils import ( get_table_comment_wrapper, @@ -186,6 +190,28 @@ def get_table_comment( ) +@reflection.cache +def get_json_fields_and_type( + self, table_name, column_name, schema=None, **kw +): # pylint: disable=unused-argument + try: + query = POSTGRES_GET_JSON_FIELDS.format( + table_name=table_name, column_name=column_name + ) + cursor = self.engine.execute(query) + result = cursor.fetchone() + if result: + parsed_column = parse_json_schema(json.dumps(result[0]), Column) + if parsed_column: + return parsed_column[0].children + except Exception as err: + logger.warning( + f"Unable to parse the json fields for {table_name}.{column_name} - {err}" + ) + logger.debug(traceback.format_exc()) + return None + + @reflection.cache def get_columns( # pylint: disable=too-many-locals self, connection, table_name, schema=None, **kw diff --git a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py index b5b7e962b78..0f1ef012f9a 100644 --- a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py +++ b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py @@ -196,6 +196,26 @@ class SqlColumnHandlerMixin: ] return Column(**parsed_string) + @calculate_execution_time() + def process_json_type_column_fields( # pylint: disable=too-many-locals + self, schema_name: str, table_name: str, column_name: str, inspector: Inspector + ) -> Optional[List[Column]]: + """ + Parse fields column with json data types + """ + try: + if hasattr(inspector, "get_json_fields_and_type"): + result = inspector.get_json_fields_and_type( + table_name, column_name, schema_name + ) + return result + + except NotImplementedError: + logger.debug( + "Cannot parse json fields for table column [{schema_name}.{table_name}.{col_name}]: NotImplementedError" + ) + return None + @calculate_execution_time() def get_columns_and_constraints( # pylint: disable=too-many-locals self, schema_name: str, table_name: str, db_name: str, inspector: Inspector @@ -271,6 +291,12 @@ class SqlColumnHandlerMixin: precision, ) col_data_length = 1 if col_data_length is None else col_data_length + + if col_type == "JSON": + children = self.process_json_type_column_fields( + schema_name, table_name, column.get("name"), inspector + ) + om_column = Column( name=ColumnName( root=column["name"] diff --git a/ingestion/src/metadata/parsers/json_schema_parser.py b/ingestion/src/metadata/parsers/json_schema_parser.py index de5fd4b7d7b..412d3be013b 100644 --- a/ingestion/src/metadata/parsers/json_schema_parser.py +++ b/ingestion/src/metadata/parsers/json_schema_parser.py @@ -81,7 +81,7 @@ def get_json_schema_fields( displayName=value.get("title"), dataType=JsonSchemaDataTypes(value.get("type", "unknown")).name, description=value.get("description"), - children=get_json_schema_fields(value.get("properties")) + children=get_json_schema_fields(value.get("properties"), cls=cls) if value.get("type") == "object" else None, ) diff --git a/ingestion/tests/unit/test_json_schema_parser.py b/ingestion/tests/unit/test_json_schema_parser.py index bba1d94325e..fd1bb1d9ca8 100644 --- a/ingestion/tests/unit/test_json_schema_parser.py +++ b/ingestion/tests/unit/test_json_schema_parser.py @@ -14,6 +14,7 @@ Jsonschema parser tests """ from unittest import TestCase +from metadata.generated.schema.entity.data.table import Column from metadata.parsers.json_schema_parser import parse_json_schema @@ -47,7 +48,51 @@ class JsonSchemaParserTests(TestCase): } }""" + sample_postgres_json_schema = """{ + "title": "review_details", + "type": "object", + "properties": + { + "staff": { + "title": "staff", + "type": "array", + "properties": {} + }, + "services": { + "title": "services", + "type": "object", + "properties": { + "lunch": { + "title": "lunch", + "type": "string", + "properties": {} + }, + "check_in": { + "title": "check_in", + "type": "string", + "properties": {} + }, + "check_out": { + "title": "check_out", + "type": "string", + "properties": {} + }, + "additional_services": { + "title": "additional_services", + "type": "array", + "properties": {} + } + } + }, + "overall_experience": { + "title": "overall_experience", + "type": "string" + } + } + }""" + parsed_schema = parse_json_schema(sample_json_schema) + parsed_postgres_schema = parse_json_schema(sample_postgres_json_schema, Column) def test_schema_name(self): self.assertEqual(self.parsed_schema[0].name.root, "Person") @@ -83,3 +128,18 @@ class JsonSchemaParserTests(TestCase): "Age in years which must be equal to or greater than zero.", }, ) + + def test_parse_postgres_json_fields(self): + self.assertEqual(self.parsed_postgres_schema[0].name.root, "review_details") + self.assertEqual(self.parsed_postgres_schema[0].children[0].name.root, "staff") + self.assertEqual( + self.parsed_postgres_schema[0].children[1].name.root, "services" + ) + self.assertEqual( + self.parsed_postgres_schema[0].children[1].children[0].name.root, "lunch" + ) + self.assertEqual( + self.parsed_postgres_schema[0].children[1].dataType.name, "RECORD" + ) + self.assertEqual(len(self.parsed_postgres_schema[0].children), 3) + self.assertEqual(len(self.parsed_postgres_schema[0].children[1].children), 4)