Fixed data profiling and raw data type issue (#2474)

* Fixed data profiling and raw data type issue
* Update metadata_rest.py
* Update sql_source.py

parent a8bdf844dd · commit 3a6217a7e9

@@ -149,6 +149,6 @@ def run_docker(start, stop, pause, resume, clean, file_path):
             fg="red",
         )
     except Exception as err:
-        logger.error(traceback.format_exc())
-        logger.error(traceback.print_exc())
+        logger.debug(traceback.format_exc())
+        logger.debug(traceback.print_exc())
         click.secho(str(err), fg="red")

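A note on the recurring traceback change above and in the hunks below: `traceback.print_exc()` writes the traceback straight to stderr and returns `None`, so `logger.debug(traceback.print_exc())` still prints unconditionally and logs only the string "None"; `traceback.format_exc()` is the variant that returns a string for the logger to handle. A minimal sketch of the difference (the failing `do_work` helper is hypothetical):

    import logging
    import traceback

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    def do_work():
        # Hypothetical helper that always fails, to produce a traceback.
        raise ValueError("boom")

    try:
        do_work()
    except Exception as err:
        # format_exc() returns the traceback as a string, so the logging
        # level decides whether it shows up (suppressed here at INFO).
        logger.debug(traceback.format_exc())
        # print_exc() prints to stderr itself and returns None, so a
        # logger wrapped around it would only ever record "None".
        logger.error(str(err))
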
@@ -349,7 +349,7 @@ class MetadataRestSink(Sink[Entity]):
         except (APIError, ValidationError) as err:
             logger.error(f"Failed to ingest Policy {ometa_policy.policy.name}")
             logger.error(err)
-            traceback.print_exc()
+            logger.debug(traceback.print_exc())
             self.status.failure(f"Policy: {ometa_policy.policy.name}")

     def _create_location(self, location: Location) -> Location:

@@ -186,8 +186,8 @@ class GlueSource(Source[Entity]):
             if "NextToken" in glue_resp:
                 yield from self.ingest_tables(glue_resp["NextToken"])
         except Exception as err:
-            logger.error(traceback.format_exc())
-            logger.error(traceback.print_exc())
+            logger.debug(traceback.format_exc())
+            logger.debug(traceback.print_exc())
             logger.error(err)

     def get_downstream_tasks(self, task_unique_id, tasks):

@@ -243,8 +243,8 @@ class GlueSource(Source[Entity]):
                 )
                 yield pipeline_ev
         except Exception as err:
-            logger.error(traceback.format_exc())
-            logger.error(traceback.print_exc())
+            logger.debug(traceback.format_exc())
+            logger.debug(traceback.print_exc())
             logger.error(err)

     def close(self):

@@ -195,7 +195,7 @@ class MetabaseSource(Source[Entity]):
                 )
                 yield lineage
             except Exception as err:
-                logger.error(traceback.print_exc())
+                logger.debug(traceback.print_exc())

     def req_get(self, path):
         return requests.get(self.config.host_port + path, headers=self.metabase_session)

@@ -226,7 +226,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     )
                 # Catch any errors during the profile runner and continue
                 except Exception as err:
-                    logger.error(repr(err))
                     logger.error(err)
                 # check if we have any model to associate with
                 table_entity.dataModel = self._get_data_model(schema, table_name)

@@ -239,7 +238,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     "{}.{}".format(self.config.get_service_name(), table_name)
                 )
             except Exception as err:
-                traceback.print_exc()
+                logger.debug(traceback.print_exc())
                 logger.error(err)
                 self.status.failures.append(
                     "{}.{}".format(self.config.service_name, table_name)

@@ -483,13 +482,15 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     "raw_data_type" in column
                     and column["raw_data_type"] is not None
                 ):

                     column["raw_data_type"] = self.parse_raw_data_type(
                         column["raw_data_type"]
                     )
-                    parsed_string = ColumnTypeParser._parse_datatype_string(
-                        column["raw_data_type"]
-                    )
-                    parsed_string["name"] = column["name"]
+                    if not column["raw_data_type"].startswith(schema):
+                        parsed_string = ColumnTypeParser._parse_datatype_string(
+                            column["raw_data_type"]
+                        )
+                        parsed_string["name"] = column["name"]
                 else:
                     col_type = ColumnTypeParser.get_column_type(column["type"])
                     if col_type == "ARRAY" and re.match(

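The substantive fix in this hunk is the new `startswith(schema)` guard: when a reflected column's `raw_data_type` comes back qualified with the schema name (as composite or user-defined types can), `_parse_datatype_string` has nothing sensible to parse, so those values are now skipped. That reading of the intent is an assumption; the commit message only says "raw data type issue". A self-contained sketch of the guard, with a hypothetical stand-in parser (`parse_datatype_string` is not the project's function):

    def parse_datatype_string(raw: str) -> dict:
        # Hypothetical stand-in for ColumnTypeParser._parse_datatype_string.
        return {"dataType": raw.split("(")[0].upper()}

    def parse_column(column: dict, schema: str) -> dict:
        parsed = {"name": column["name"]}
        raw = column.get("raw_data_type")
        if raw is not None and not raw.startswith(schema):
            # Plain type strings like "varchar(256)" are parseable;
            # schema-qualified names like "public.address" are not.
            parsed.update(parse_datatype_string(raw))
        return parsed

    print(parse_column({"name": "id", "raw_data_type": "bigint"}, "public"))
    print(parse_column({"name": "addr", "raw_data_type": "public.address"}, "public"))
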
@@ -511,7 +512,9 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                         col_type = "VARCHAR"
                         data_type_display = "varchar"
                         logger.warning(
-                            f"Unknown type {column['type']} mapped to VARCHAR: {column['name']} {column['type']}"
+                            "Unknown type {} mapped to VARCHAR: {}".format(
+                                repr(column["type"]), column["name"]
+                            )
                         )
                     col_data_length = (
                         1 if col_data_length is None else col_data_length

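Two things change in this warning: the duplicated `{column['type']}` at the end is dropped, and the type is rendered with `repr()`, which shows the SQLAlchemy type object's constructor form rather than its compiled SQL. A quick illustration (the output comments reflect typical SQLAlchemy behavior and may vary by version):

    from sqlalchemy import types

    t = types.VARCHAR(30)
    print(str(t))   # "VARCHAR(30)" - compiled SQL form
    print(repr(t))  # "VARCHAR(length=30)" - unambiguous constructor form

    print("Unknown type {} mapped to VARCHAR: {}".format(repr(t), "my_col"))
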
@@ -558,7 +561,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     col_dict = Column(**parsed_string)
                     om_column = col_dict
             except Exception as err:
-                logger.error(traceback.print_exc())
+                logger.debug(traceback.print_exc())
                 logger.error(f"{err} : {column}")
                 continue
             table_columns.append(om_column)

@@ -568,12 +571,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         return None

     def _check_col_length(self, datatype, col_raw_type):
-        if datatype is not None and datatype.upper() in {
-            "CHAR",
-            "VARCHAR",
-            "BINARY",
-            "VARBINARY",
-        }:
+        if datatype and datatype.upper() in {"CHAR", "VARCHAR", "BINARY", "VARBINARY"}:
             return col_raw_type.length if col_raw_type.length else 1

     def run_data_profiler(self, table: str, schema: str) -> TableProfile:

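Besides collapsing the set literal onto one line, `_check_col_length` tightens its guard: `if datatype` now rejects the empty string as well as `None` before calling `.upper()`. The behavior is easy to pin down in isolation; this sketch uses a hypothetical `RawType` holder in place of the reflected SQLAlchemy type:

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class RawType:
        # Hypothetical stand-in for a reflected type carrying a length.
        length: Optional[int] = None

    def check_col_length(datatype: Optional[str], col_raw_type: RawType) -> Optional[int]:
        # Truthiness check skips both None and "" before .upper().
        if datatype and datatype.upper() in {"CHAR", "VARCHAR", "BINARY", "VARBINARY"}:
            return col_raw_type.length if col_raw_type.length else 1
        return None

    print(check_col_length("varchar", RawType(length=255)))  # 255
    print(check_col_length("varchar", RawType()))             # 1 (default)
    print(check_col_length("int", RawType(length=4)))         # None
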
@@ -285,7 +285,7 @@ class DatabaseCommon(Database):
             elif self.is_text(data_type):
                 logical_type = SupportedDataType.TEXT
             else:
-                logger.info(f" {name} ({data_type}) not supported.")
+                logger.info(f" {name} ({repr(data_type)}) not supported.")
                 continue
             self.columns.append(
                 Column(

@@ -69,7 +69,7 @@ class DataProfiler:
             return profile
         except Exception as err:
             logger.error(f"Failed to run data profiler on {dataset_name} due to {err}")
-            traceback.print_exc()
+            logger.debug(traceback.print_exc())
             pass

     def _parse_test_results_to_table_profile(

@@ -96,7 +96,7 @@ class ColumnTypeParser:
         "MONEY": "NUMBER",
         "NCHAR": "CHAR",
         "NTEXT": "TEXT",
-        "NULL": "VARCHAR",
+        "NULL": "NULL",
         "NUMBER": "NUMBER",
         "NUMERIC": "NUMERIC",
         "NVARCHAR": "VARCHAR",

@@ -143,6 +143,8 @@ class ColumnTypeParser:
     @staticmethod
     def get_column_type(column_type: Any) -> str:
         type_class: Optional[str] = None
+        if isinstance(column_type, types.NullType):
+            return "NULL"
         for sql_type in ColumnTypeParser._SOURCE_TYPE_TO_OM_TYPE.keys():
             if str(column_type) in sql_type:
                 type_class = ColumnTypeParser._SOURCE_TYPE_TO_OM_TYPE[sql_type]

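These last two hunks work together: columns whose type SQLAlchemy cannot resolve are reflected as `types.NullType`, and the new `isinstance` check short-circuits them to `"NULL"` before the string-matching loop, while the mapping now preserves `"NULL"` instead of coercing it to `"VARCHAR"`. A minimal sketch of the dispatch, assuming SQLAlchemy is installed (the trimmed mapping and the `"VARCHAR"` fallback are simplifications, not the project's full table):

    from typing import Any
    from sqlalchemy import types

    # Trimmed-down stand-in for ColumnTypeParser._SOURCE_TYPE_TO_OM_TYPE.
    SOURCE_TYPE_TO_OM_TYPE = {"NULL": "NULL", "NVARCHAR": "VARCHAR"}

    def get_column_type(column_type: Any) -> str:
        # Unresolvable reflected columns come back as NullType; handle
        # them before any str()-based matching.
        if isinstance(column_type, types.NullType):
            return "NULL"
        return SOURCE_TYPE_TO_OM_TYPE.get(str(column_type).upper(), "VARCHAR")

    print(get_column_type(types.NullType()))  # NULL
    print(get_column_type(types.NVARCHAR()))  # VARCHAR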