From f13eb4e273a216fbce7e1e5d15de2c5cfec6966a Mon Sep 17 00:00:00 2001 From: Suman Maharana Date: Thu, 23 Jan 2025 22:18:25 +0530 Subject: [PATCH] Fix :postgres parse json schemas (#19487) --- .../source/database/postgres/metadata.py | 2 - .../source/database/postgres/queries.py | 85 ------------------- .../source/database/postgres/utils.py | 26 ------ .../source/database/sql_column_handler.py | 25 ------ 4 files changed, 138 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py index abe7695181d..8104fa2e768 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py @@ -70,7 +70,6 @@ from metadata.ingestion.source.database.postgres.utils import ( get_columns, get_etable_owner, get_foreign_keys, - get_json_fields_and_type, get_table_comment, get_table_owner, get_view_definition, @@ -113,7 +112,6 @@ PGDialect.ischema_names = ischema_names Inspector.get_all_table_ddls = get_all_table_ddls Inspector.get_table_ddl = get_table_ddl Inspector.get_table_owner = get_etable_owner -Inspector.get_json_fields_and_type = get_json_fields_and_type PGDialect.get_foreign_keys = get_foreign_keys diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/queries.py b/ingestion/src/metadata/ingestion/source/database/postgres/queries.py index ec50272846b..03a66ede0a3 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/queries.py @@ -212,91 +212,6 @@ POSTGRES_FETCH_FK = """ ORDER BY 1 """ -POSTGRES_GET_JSON_FIELDS = """ - WITH RECURSIVE json_hierarchy AS ( - SELECT - key AS path, - json_typeof(value) AS type, - value, - json_build_object() AS properties, - key AS title - FROM - {table_name} tbd, - LATERAL json_each({column_name}::json) - ), - build_hierarchy AS ( - SELECT - path, - type, - title, - CASE - WHEN type = 'object' THEN - json_build_object( - 'title', title, - 'type', 'object', - 'properties', ( - SELECT json_object_agg( - key, - json_build_object( - 'title', key, - 'type', json_typeof(value), - 'properties', ( - CASE - WHEN json_typeof(value) = 'object' THEN - ( - SELECT json_object_agg( - key, - json_build_object( - 'title', key, - 'type', json_typeof(value), - 'properties', - json_build_object() - ) - ) - FROM json_each(value::json) AS sub_key_value - ) - ELSE json_build_object() - END - ) - ) - ) - FROM json_each(value::json) AS key_value - ) - ) - WHEN type = 'array' THEN - json_build_object( - 'title', title, - 'type', 'array', - 'properties', json_build_object() - ) - ELSE - json_build_object( - 'title', title, - 'type', type - ) - END AS hierarchy - FROM - json_hierarchy - ), - aggregate_hierarchy AS ( - select - json_build_object( - 'title','{column_name}', - 'type','object', - 'properties', - json_object_agg( - path, - hierarchy - )) AS result - FROM - build_hierarchy - ) - SELECT - result - FROM - aggregate_hierarchy; -""" - POSTGRES_GET_STORED_PROCEDURES = """ SELECT proname AS procedure_name, nspname AS schema_name, diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/utils.py b/ingestion/src/metadata/ingestion/source/database/postgres/utils.py index aed0322d315..6222dfa493a 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/utils.py @@ -13,7 +13,6 @@ """ Postgres SQLAlchemy util methods """ -import json import re import traceback from typing import Dict, Optional, Tuple @@ -24,18 +23,15 @@ from sqlalchemy.dialects.postgresql.base import ENUM from sqlalchemy.engine import reflection from sqlalchemy.sql import sqltypes -from metadata.generated.schema.entity.data.table import Column from metadata.ingestion.source.database.postgres.queries import ( POSTGRES_COL_IDENTITY, POSTGRES_FETCH_FK, - POSTGRES_GET_JSON_FIELDS, POSTGRES_GET_SERVER_VERSION, POSTGRES_SQL_COLUMNS, POSTGRES_TABLE_COMMENTS, POSTGRES_TABLE_OWNERS, POSTGRES_VIEW_DEFINITIONS, ) -from metadata.parsers.json_schema_parser import parse_json_schema from metadata.utils.logger import utils_logger from metadata.utils.sqlalchemy_utils import ( get_table_comment_wrapper, @@ -190,28 +186,6 @@ def get_table_comment( ) -@reflection.cache -def get_json_fields_and_type( - self, table_name, column_name, schema=None, **kw -): # pylint: disable=unused-argument - try: - query = POSTGRES_GET_JSON_FIELDS.format( - table_name=table_name, column_name=column_name - ) - cursor = self.engine.execute(query) - result = cursor.fetchone() - if result: - parsed_column = parse_json_schema(json.dumps(result[0]), Column) - if parsed_column: - return parsed_column[0].children - except Exception as err: - logger.warning( - f"Unable to parse the json fields for {table_name}.{column_name} - {err}" - ) - logger.debug(traceback.format_exc()) - return None - - @reflection.cache def get_columns( # pylint: disable=too-many-locals self, connection, table_name, schema=None, **kw diff --git a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py index 6fc98ff1859..ee81183fd71 100644 --- a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py +++ b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py @@ -202,26 +202,6 @@ class SqlColumnHandlerMixin: ] return Column(**parsed_string) - @calculate_execution_time() - def process_json_type_column_fields( # pylint: disable=too-many-locals - self, schema_name: str, table_name: str, column_name: str, inspector: Inspector - ) -> Optional[List[Column]]: - """ - Parse fields column with json data types - """ - try: - if hasattr(inspector, "get_json_fields_and_type"): - result = inspector.get_json_fields_and_type( - table_name, column_name, schema_name - ) - return result - - except NotImplementedError: - logger.debug( - "Cannot parse json fields for table column [{schema_name}.{table_name}.{col_name}]: NotImplementedError" - ) - return None - @calculate_execution_time() def get_columns_and_constraints( # pylint: disable=too-many-locals self, schema_name: str, table_name: str, db_name: str, inspector: Inspector @@ -298,11 +278,6 @@ class SqlColumnHandlerMixin: ) col_data_length = 1 if col_data_length is None else col_data_length - if col_type == "JSON": - children = self.process_json_type_column_fields( - schema_name, table_name, column.get("name"), inspector - ) - om_column = Column( name=ColumnName( root=column["name"]