fix(ingest/postgres): fix profiling errors, skip json type column (#8291)

This commit is contained in:
Mayuri Nehate 2023-06-28 20:29:31 +05:30 committed by GitHub
parent 711efde2c0
commit 75d67b97bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 157 additions and 33 deletions

View File

@ -10,6 +10,7 @@ import threading
import traceback
import unittest.mock
import uuid
from functools import lru_cache
from typing import (
Any,
Callable,
@ -273,19 +274,30 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
# Compute columns to profile
columns_to_profile: List[str] = []
# Compute ignored columns
ignored_columns: List[str] = []
for col in self.dataset.get_table_columns():
ignored_columns_by_pattern: List[str] = []
ignored_columns_by_type: List[str] = []
for col_dict in self.dataset.columns:
col = col_dict["name"]
# We expect the allow/deny patterns to specify '<table_pattern>.<column_pattern>'
if not self.config._allow_deny_patterns.allowed(
f"{self.dataset_name}.{col}"
):
ignored_columns.append(col)
ignored_columns_by_pattern.append(col)
elif col_dict.get("type") and self._should_ignore_column(col_dict["type"]):
ignored_columns_by_type.append(col)
else:
columns_to_profile.append(col)
if ignored_columns:
if ignored_columns_by_pattern:
self.report.report_dropped(
f"The profile of columns by pattern {self.dataset_name}({', '.join(sorted(ignored_columns))})"
f"The profile of columns by pattern {self.dataset_name}({', '.join(sorted(ignored_columns_by_pattern))})"
)
if ignored_columns_by_type:
self.report.report_dropped(
f"The profile of columns by type {self.dataset_name}({', '.join(sorted(ignored_columns_by_type))})"
)
if self.config.max_number_of_fields_to_profile is not None:
@ -302,6 +314,11 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
)
return columns_to_profile
def _should_ignore_column(self, sqlalchemy_type: sa.types.TypeEngine) -> bool:
    """Tell whether a column of the given SQLAlchemy type is skipped by type.

    The type is compared by its string form against the type names that
    `_get_column_types_to_ignore` returns for the dialect of this dataset's
    engine.

    Args:
        sqlalchemy_type: The SQLAlchemy type object of the column.

    Returns:
        True when the stringified type is listed for the dialect.
    """
    # Stringify first: the ignore-list holds type names, not type objects.
    type_name = str(sqlalchemy_type)
    dialect_name = self.dataset.engine.dialect.name
    ignored_type_names = _get_column_types_to_ignore(dialect_name)
    return type_name in ignored_type_names
@_run_with_query_combiner
def _get_column_type(self, column_spec: _SingleColumnSpec, column: str) -> None:
column_spec.type_ = BasicDatasetProfilerBase._get_column_type(
@ -1085,3 +1102,13 @@ class DatahubGEProfiler:
logger.debug(f"Setting table name to be {batch._table}")
return batch
# More dialect specific types to ignore can be added here
# Stringified types are used to avoid dialect specific import errors
@lru_cache(maxsize=1)
def _get_column_types_to_ignore(dialect_name: str) -> List[str]:
if dialect_name.lower() == "postgresql":
return ["JSON"]
return []

View File

@ -254,9 +254,10 @@ class SQLAlchemyQueryCombiner:
except Exception as e:
if not self.catch_exceptions:
raise e
logger.exception(
logger.warning(
f"Failed to execute query normally, using fallback: {str(query)}"
)
logger.debug("Failed to execute query normally", exc_info=e)
self.report.query_exceptions += 1
return _sa_execute_underlying_method(conn, query, *args, **kwargs)
else:
@ -400,7 +401,10 @@ class SQLAlchemyQueryCombiner:
except Exception as e:
if not self.serial_execution_fallback_enabled:
raise e
logger.exception(f"Failed to execute queue using combiner: {str(e)}")
logger.warning(
"Failed to execute queue using combiner, will fallback to execute one by one."
)
logger.debug("Failed to execute queue using combiner", exc_info=e)
self.report.query_exceptions += 1
self._execute_queue_fallback(main_greenlet)

View File

@ -446,6 +446,18 @@
"nativeDataType": "VARCHAR(length=255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "metadata_json",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.RecordType": {}
}
},
"nativeDataType": "JSON(astext_type=Text())",
"recursive": false,
"isPartOfKey": false
}
]
}
@ -488,31 +500,64 @@
"partition": "FULL_TABLE_SNAPSHOT"
},
"rowCount": 2,
"columnCount": 8,
"columnCount": 9,
"fieldProfiles": [
{
"fieldPath": "urn"
"fieldPath": "urn",
"uniqueCount": 1,
"uniqueProportion": 0.5,
"nullCount": 0,
"nullProportion": 0.0
},
{
"fieldPath": "aspect"
"fieldPath": "aspect",
"uniqueCount": 2,
"uniqueProportion": 1,
"nullCount": 0,
"nullProportion": 0.0
},
{
"fieldPath": "version"
"fieldPath": "version",
"uniqueCount": 1,
"uniqueProportion": 0.5,
"nullCount": 0,
"nullProportion": 0.0
},
{
"fieldPath": "metadata"
"fieldPath": "metadata",
"uniqueCount": 2,
"uniqueProportion": 1,
"nullCount": 0,
"nullProportion": 0.0
},
{
"fieldPath": "systemmetadata"
"fieldPath": "systemmetadata",
"uniqueCount": 0,
"nullCount": 2,
"nullProportion": 1
},
{
"fieldPath": "createdon"
"fieldPath": "createdon",
"uniqueCount": 1,
"uniqueProportion": 0.5,
"nullCount": 0,
"nullProportion": 0.0
},
{
"fieldPath": "createdby"
"fieldPath": "createdby",
"uniqueCount": 1,
"uniqueProportion": 0.5,
"nullCount": 0,
"nullProportion": 0.0
},
{
"fieldPath": "createdfor"
"fieldPath": "createdfor",
"uniqueCount": 0,
"nullCount": 2,
"nullProportion": 1
},
{
"fieldPath": "metadata_json"
}
]
}

View File

@ -13,8 +13,8 @@ source:
profile_table_level_only: true
profile_table_row_count_estimate_only: true
turn_off_expensive_profiling_metrics: true
include_field_null_count: false
include_field_distinct_count: false
include_field_null_count: true
include_field_distinct_count: true
include_field_min_value: false
include_field_max_value: false
include_field_mean_value: false

View File

@ -296,6 +296,18 @@
"nativeDataType": "VARCHAR(length=255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "metadata_json",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.RecordType": {}
}
},
"nativeDataType": "JSON(astext_type=Text())",
"recursive": false,
"isPartOfKey": false
}
]
}
@ -338,31 +350,64 @@
"partition": "FULL_TABLE_SNAPSHOT"
},
"rowCount": 2,
"columnCount": 8,
"columnCount": 9,
"fieldProfiles": [
{
"fieldPath": "urn"
"fieldPath": "urn",
"uniqueCount": 1,
"uniqueProportion": 0.5,
"nullCount": 0,
"nullProportion": 0.0
},
{
"fieldPath": "aspect"
"fieldPath": "aspect",
"uniqueCount": 2,
"uniqueProportion": 1,
"nullCount": 0,
"nullProportion": 0.0
},
{
"fieldPath": "version"
"fieldPath": "version",
"uniqueCount": 1,
"uniqueProportion": 0.5,
"nullCount": 0,
"nullProportion": 0.0
},
{
"fieldPath": "metadata"
"fieldPath": "metadata",
"uniqueCount": 2,
"uniqueProportion": 1,
"nullCount": 0,
"nullProportion": 0.0
},
{
"fieldPath": "systemmetadata"
"fieldPath": "systemmetadata",
"uniqueCount": 0,
"nullCount": 2,
"nullProportion": 1
},
{
"fieldPath": "createdon"
"fieldPath": "createdon",
"uniqueCount": 1,
"uniqueProportion": 0.5,
"nullCount": 0,
"nullProportion": 0.0
},
{
"fieldPath": "createdby"
"fieldPath": "createdby",
"uniqueCount": 1,
"uniqueProportion": 0.5,
"nullCount": 0,
"nullProportion": 0.0
},
{
"fieldPath": "createdfor"
"fieldPath": "createdfor",
"uniqueCount": 0,
"nullCount": 2,
"nullProportion": 1
},
{
"fieldPath": "metadata_json"
}
]
}

View File

@ -12,8 +12,8 @@ source:
profile_table_level_only: true
profile_table_row_count_estimate_only: true
turn_off_expensive_profiling_metrics: true
include_field_null_count: false
include_field_distinct_count: false
include_field_null_count: true
include_field_distinct_count: true
include_field_min_value: false
include_field_max_value: false
include_field_mean_value: false

View File

@ -12,25 +12,28 @@ create table metadata_aspect_v2 (
createdon timestamp not null,
createdby varchar(255) not null,
createdfor varchar(255),
constraint pk_metadata_aspect_v2 primary key (urn,aspect,version)
constraint pk_metadata_aspect_v2 primary key (urn,aspect,version),
metadata_json json not null
);
create index timeIndex ON metadata_aspect_v2 (createdon);
insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby) values(
insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby, metadata_json) values(
'urn:li:corpuser:datahub',
'corpUserInfo',
0,
'{"displayName":"Data Hub","active":true,"fullName":"Data Hub","email":"datahub@linkedin.com"}',
now(),
'urn:li:corpuser:__datahub_system'
'urn:li:corpuser:__datahub_system',
'{"displayName":"Data Hub","active":true,"fullName":"Data Hub","email":"datahub@linkedin.com"}'
), (
'urn:li:corpuser:datahub',
'corpUserEditableInfo',
0,
'{"skills":[],"teams":[],"pictureLink":"https://raw.githubusercontent.com/datahub-project/datahub/master/datahub-web-react/src/images/default_avatar.png"}',
now(),
'urn:li:corpuser:__datahub_system'
'urn:li:corpuser:__datahub_system',
'{"skills":[],"teams":[],"pictureLink":"https://raw.githubusercontent.com/datahub-project/datahub/master/datahub-web-react/src/images/default_avatar.png"}'
);
create view metadata_aspect_view as select urn, aspect from metadata_aspect_v2 where version=0;