From 3a6217a7e93dd4155909dbcf2875781625d66129 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Thu, 27 Jan 2022 18:59:35 +0530 Subject: [PATCH] Fixed data profiling and raw data type issue (#2474) * Fixed data profiling and raw data type issue * Update metadata_rest.py * Update sql_source.py --- ingestion/src/metadata/cli/docker.py | 4 +-- .../metadata/ingestion/sink/metadata_rest.py | 2 +- .../src/metadata/ingestion/source/glue.py | 8 +++--- .../src/metadata/ingestion/source/metabase.py | 2 +- .../metadata/ingestion/source/sql_source.py | 26 +++++++++---------- .../profiler/common/database_common.py | 2 +- .../src/metadata/profiler/dataprofiler.py | 2 +- .../src/metadata/utils/column_type_parser.py | 4 ++- 8 files changed, 25 insertions(+), 25 deletions(-) diff --git a/ingestion/src/metadata/cli/docker.py b/ingestion/src/metadata/cli/docker.py index 401a319082a..00013e5a747 100644 --- a/ingestion/src/metadata/cli/docker.py +++ b/ingestion/src/metadata/cli/docker.py @@ -149,6 +149,6 @@ def run_docker(start, stop, pause, resume, clean, file_path): fg="red", ) except Exception as err: - logger.error(traceback.format_exc()) - logger.error(traceback.print_exc()) + logger.debug(traceback.format_exc()) + logger.debug(traceback.print_exc()) click.secho(str(err), fg="red") diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 95bb0498077..d9d9f86b3a1 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -349,7 +349,7 @@ class MetadataRestSink(Sink[Entity]): except (APIError, ValidationError) as err: logger.error(f"Failed to ingest Policy {ometa_policy.policy.name}") logger.error(err) - traceback.print_exc() + logger.debug(traceback.print_exc()) self.status.failure(f"Policy: {ometa_policy.policy.name}") def _create_location(self, location: Location) -> Location: diff --git a/ingestion/src/metadata/ingestion/source/glue.py b/ingestion/src/metadata/ingestion/source/glue.py index 739836eaed5..0f4266f966d 100644 --- a/ingestion/src/metadata/ingestion/source/glue.py +++ b/ingestion/src/metadata/ingestion/source/glue.py @@ -186,8 +186,8 @@ class GlueSource(Source[Entity]): if "NextToken" in glue_resp: yield from self.ingest_tables(glue_resp["NextToken"]) except Exception as err: - logger.error(traceback.format_exc()) - logger.error(traceback.print_exc()) + logger.debug(traceback.format_exc()) + logger.debug(traceback.print_exc()) logger.error(err) def get_downstream_tasks(self, task_unique_id, tasks): @@ -243,8 +243,8 @@ class GlueSource(Source[Entity]): ) yield pipeline_ev except Exception as err: - logger.error(traceback.format_exc()) - logger.error(traceback.print_exc()) + logger.debug(traceback.format_exc()) + logger.debug(traceback.print_exc()) logger.error(err) def close(self): diff --git a/ingestion/src/metadata/ingestion/source/metabase.py b/ingestion/src/metadata/ingestion/source/metabase.py index a7fe0c35d9c..36757d9a0b9 100644 --- a/ingestion/src/metadata/ingestion/source/metabase.py +++ b/ingestion/src/metadata/ingestion/source/metabase.py @@ -195,7 +195,7 @@ class MetabaseSource(Source[Entity]): ) yield lineage except Exception as err: - logger.error(traceback.print_exc()) + logger.debug(traceback.print_exc()) def req_get(self, path): return requests.get(self.config.host_port + path, headers=self.metabase_session) diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index 9a4d3411bcd..1d48fd93d3e 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -226,7 +226,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]): ) # Catch any errors during the profile runner and continue except Exception as err: - logger.error(repr(err)) logger.error(err) # check if we have any model to associate with table_entity.dataModel = self._get_data_model(schema, table_name) @@ -239,7 +238,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): "{}.{}".format(self.config.get_service_name(), table_name) ) except Exception as err: - traceback.print_exc() + logger.debug(traceback.print_exc()) logger.error(err) self.status.failures.append( "{}.{}".format(self.config.service_name, table_name) @@ -483,13 +482,15 @@ class SQLSource(Source[OMetaDatabaseAndTable]): "raw_data_type" in column and column["raw_data_type"] is not None ): + column["raw_data_type"] = self.parse_raw_data_type( column["raw_data_type"] ) - parsed_string = ColumnTypeParser._parse_datatype_string( - column["raw_data_type"] - ) - parsed_string["name"] = column["name"] + if not column["raw_data_type"].startswith(schema): + parsed_string = ColumnTypeParser._parse_datatype_string( + column["raw_data_type"] + ) + parsed_string["name"] = column["name"] else: col_type = ColumnTypeParser.get_column_type(column["type"]) if col_type == "ARRAY" and re.match( @@ -511,7 +512,9 @@ class SQLSource(Source[OMetaDatabaseAndTable]): col_type = "VARCHAR" data_type_display = "varchar" logger.warning( - f"Unknown type {column['type']} mapped to VARCHAR: {column['name']} {column['type']}" + "Unknown type {} mapped to VARCHAR: {}".format( + repr(column["type"]), column["name"] + ) ) col_data_length = ( 1 if col_data_length is None else col_data_length @@ -558,7 +561,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): col_dict = Column(**parsed_string) om_column = col_dict except Exception as err: - logger.error(traceback.print_exc()) + logger.debug(traceback.print_exc()) logger.error(f"{err} : {column}") continue table_columns.append(om_column) @@ -568,12 +571,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): return None def _check_col_length(self, datatype, col_raw_type): - if datatype is not None and datatype.upper() in { - "CHAR", - "VARCHAR", - "BINARY", - "VARBINARY", - }: + if datatype and datatype.upper() in {"CHAR", "VARCHAR", "BINARY", "VARBINARY"}: return col_raw_type.length if col_raw_type.length else 1 def run_data_profiler(self, table: str, schema: str) -> TableProfile: diff --git a/ingestion/src/metadata/profiler/common/database_common.py b/ingestion/src/metadata/profiler/common/database_common.py index eb3139ed945..2019bbda9df 100644 --- a/ingestion/src/metadata/profiler/common/database_common.py +++ b/ingestion/src/metadata/profiler/common/database_common.py @@ -285,7 +285,7 @@ class DatabaseCommon(Database): elif self.is_text(data_type): logical_type = SupportedDataType.TEXT else: - logger.info(f" {name} ({data_type}) not supported.") + logger.info(f" {name} ({repr(data_type)}) not supported.") continue self.columns.append( Column( diff --git a/ingestion/src/metadata/profiler/dataprofiler.py b/ingestion/src/metadata/profiler/dataprofiler.py index 8fb4eb2ac80..882f5e68932 100644 --- a/ingestion/src/metadata/profiler/dataprofiler.py +++ b/ingestion/src/metadata/profiler/dataprofiler.py @@ -69,7 +69,7 @@ class DataProfiler: return profile except Exception as err: logger.error(f"Failed to run data profiler on {dataset_name} due to {err}") - traceback.print_exc() + logger.debug(traceback.print_exc()) pass def _parse_test_results_to_table_profile( diff --git a/ingestion/src/metadata/utils/column_type_parser.py b/ingestion/src/metadata/utils/column_type_parser.py index 58377f49106..c471b55b6fc 100644 --- a/ingestion/src/metadata/utils/column_type_parser.py +++ b/ingestion/src/metadata/utils/column_type_parser.py @@ -96,7 +96,7 @@ class ColumnTypeParser: "MONEY": "NUMBER", "NCHAR": "CHAR", "NTEXT": "TEXT", - "NULL": "VARCHAR", + "NULL": "NULL", "NUMBER": "NUMBER", "NUMERIC": "NUMERIC", "NVARCHAR": "VARCHAR", @@ -143,6 +143,8 @@ class ColumnTypeParser: @staticmethod def get_column_type(column_type: Any) -> str: type_class: Optional[str] = None + if isinstance(column_type, types.NullType): + return "NULL" for sql_type in ColumnTypeParser._SOURCE_TYPE_TO_OM_TYPE.keys(): if str(column_type) in sql_type: type_class = ColumnTypeParser._SOURCE_TYPE_TO_OM_TYPE[sql_type]