diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 76c676c6da..5404b31763 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -10,6 +10,7 @@ import threading import traceback import unittest.mock import uuid +from functools import lru_cache from typing import ( Any, Callable, @@ -273,19 +274,30 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase): # Compute columns to profile columns_to_profile: List[str] = [] + # Compute ignored columns - ignored_columns: List[str] = [] - for col in self.dataset.get_table_columns(): + ignored_columns_by_pattern: List[str] = [] + ignored_columns_by_type: List[str] = [] + + for col_dict in self.dataset.columns: + col = col_dict["name"] # We expect the allow/deny patterns to specify '.' if not self.config._allow_deny_patterns.allowed( f"{self.dataset_name}.{col}" ): - ignored_columns.append(col) + ignored_columns_by_pattern.append(col) + elif col_dict.get("type") and self._should_ignore_column(col_dict["type"]): + ignored_columns_by_type.append(col) else: columns_to_profile.append(col) - if ignored_columns: + + if ignored_columns_by_pattern: self.report.report_dropped( - f"The profile of columns by pattern {self.dataset_name}({', '.join(sorted(ignored_columns))})" + f"The profile of columns by pattern {self.dataset_name}({', '.join(sorted(ignored_columns_by_pattern))})" + ) + if ignored_columns_by_type: + self.report.report_dropped( + f"The profile of columns by type {self.dataset_name}({', '.join(sorted(ignored_columns_by_type))})" ) if self.config.max_number_of_fields_to_profile is not None: @@ -302,6 +314,11 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase): ) return columns_to_profile + def _should_ignore_column(self, sqlalchemy_type: sa.types.TypeEngine) -> bool: + return str(sqlalchemy_type) in _get_column_types_to_ignore( + self.dataset.engine.dialect.name + ) + @_run_with_query_combiner def _get_column_type(self, column_spec: _SingleColumnSpec, column: str) -> None: column_spec.type_ = BasicDatasetProfilerBase._get_column_type( @@ -1085,3 +1102,13 @@ class DatahubGEProfiler: logger.debug(f"Setting table name to be {batch._table}") return batch + + +# More dialect specific types to ignore can be added here +# Stringified types are used to avoid dialect specific import errors +@lru_cache(maxsize=1) +def _get_column_types_to_ignore(dialect_name: str) -> List[str]: + if dialect_name.lower() == "postgresql": + return ["JSON"] + + return [] diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py index 31d10a09c2..11c04082ee 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py +++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py @@ -254,9 +254,10 @@ class SQLAlchemyQueryCombiner: except Exception as e: if not self.catch_exceptions: raise e - logger.exception( + logger.warning( f"Failed to execute query normally, using fallback: {str(query)}" ) + logger.debug("Failed to execute query normally", exc_info=e) self.report.query_exceptions += 1 return _sa_execute_underlying_method(conn, query, *args, **kwargs) else: @@ -400,7 +401,10 @@ class SQLAlchemyQueryCombiner: except Exception as e: if not self.serial_execution_fallback_enabled: raise e - logger.exception(f"Failed to execute queue using combiner: {str(e)}") + logger.warning( + "Failed to execute queue using combiner, will fallback to execute one by one." + ) + logger.debug("Failed to execute queue using combiner", exc_info=e) self.report.query_exceptions += 1 self._execute_queue_fallback(main_greenlet) diff --git a/metadata-ingestion/tests/integration/postgres/postgres_all_db_mces_with_db_golden.json b/metadata-ingestion/tests/integration/postgres/postgres_all_db_mces_with_db_golden.json index 3d55509ce0..5d22ead40e 100644 --- a/metadata-ingestion/tests/integration/postgres/postgres_all_db_mces_with_db_golden.json +++ b/metadata-ingestion/tests/integration/postgres/postgres_all_db_mces_with_db_golden.json @@ -446,6 +446,18 @@ "nativeDataType": "VARCHAR(length=255)", "recursive": false, "isPartOfKey": false + }, + { + "fieldPath": "metadata_json", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.RecordType": {} + } + }, + "nativeDataType": "JSON(astext_type=Text())", + "recursive": false, + "isPartOfKey": false } ] } @@ -488,31 +500,64 @@ "partition": "FULL_TABLE_SNAPSHOT" }, "rowCount": 2, - "columnCount": 8, + "columnCount": 9, "fieldProfiles": [ { - "fieldPath": "urn" + "fieldPath": "urn", + "uniqueCount": 1, + "uniqueProportion": 0.5, + "nullCount": 0, + "nullProportion": 0.0 }, { - "fieldPath": "aspect" + "fieldPath": "aspect", + "uniqueCount": 2, + "uniqueProportion": 1, + "nullCount": 0, + "nullProportion": 0.0 }, { - "fieldPath": "version" + "fieldPath": "version", + "uniqueCount": 1, + "uniqueProportion": 0.5, + "nullCount": 0, + "nullProportion": 0.0 }, { - "fieldPath": "metadata" + "fieldPath": "metadata", + "uniqueCount": 2, + "uniqueProportion": 1, + "nullCount": 0, + "nullProportion": 0.0 }, { - "fieldPath": "systemmetadata" + "fieldPath": "systemmetadata", + "uniqueCount": 0, + "nullCount": 2, + "nullProportion": 1 }, { - "fieldPath": "createdon" + "fieldPath": "createdon", + "uniqueCount": 1, + "uniqueProportion": 0.5, + "nullCount": 0, + "nullProportion": 0.0 }, { - "fieldPath": "createdby" + "fieldPath": "createdby", + "uniqueCount": 1, + "uniqueProportion": 0.5, + "nullCount": 0, + "nullProportion": 0.0 }, { - "fieldPath": "createdfor" + "fieldPath": "createdfor", + "uniqueCount": 0, + "nullCount": 2, + "nullProportion": 1 + }, + { + "fieldPath": "metadata_json" } ] } diff --git a/metadata-ingestion/tests/integration/postgres/postgres_all_db_to_file_with_db_estimate_row_count.yml b/metadata-ingestion/tests/integration/postgres/postgres_all_db_to_file_with_db_estimate_row_count.yml index 69c20352f6..bfbad5fe83 100644 --- a/metadata-ingestion/tests/integration/postgres/postgres_all_db_to_file_with_db_estimate_row_count.yml +++ b/metadata-ingestion/tests/integration/postgres/postgres_all_db_to_file_with_db_estimate_row_count.yml @@ -13,8 +13,8 @@ source: profile_table_level_only: true profile_table_row_count_estimate_only: true turn_off_expensive_profiling_metrics: true - include_field_null_count: false - include_field_distinct_count: false + include_field_null_count: true + include_field_distinct_count: true include_field_min_value: false include_field_max_value: false include_field_mean_value: false diff --git a/metadata-ingestion/tests/integration/postgres/postgres_mces_with_db_golden.json b/metadata-ingestion/tests/integration/postgres/postgres_mces_with_db_golden.json index 07e9e00633..19052a53aa 100644 --- a/metadata-ingestion/tests/integration/postgres/postgres_mces_with_db_golden.json +++ b/metadata-ingestion/tests/integration/postgres/postgres_mces_with_db_golden.json @@ -296,6 +296,18 @@ "nativeDataType": "VARCHAR(length=255)", "recursive": false, "isPartOfKey": false + }, + { + "fieldPath": "metadata_json", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.RecordType": {} + } + }, + "nativeDataType": "JSON(astext_type=Text())", + "recursive": false, + "isPartOfKey": false } ] } @@ -338,31 +350,64 @@ "partition": "FULL_TABLE_SNAPSHOT" }, "rowCount": 2, - "columnCount": 8, + "columnCount": 9, "fieldProfiles": [ { - "fieldPath": "urn" + "fieldPath": "urn", + "uniqueCount": 1, + "uniqueProportion": 0.5, + "nullCount": 0, + "nullProportion": 0.0 }, { - "fieldPath": "aspect" + "fieldPath": "aspect", + "uniqueCount": 2, + "uniqueProportion": 1, + "nullCount": 0, + "nullProportion": 0.0 }, { - "fieldPath": "version" + "fieldPath": "version", + "uniqueCount": 1, + "uniqueProportion": 0.5, + "nullCount": 0, + "nullProportion": 0.0 }, { - "fieldPath": "metadata" + "fieldPath": "metadata", + "uniqueCount": 2, + "uniqueProportion": 1, + "nullCount": 0, + "nullProportion": 0.0 }, { - "fieldPath": "systemmetadata" + "fieldPath": "systemmetadata", + "uniqueCount": 0, + "nullCount": 2, + "nullProportion": 1 }, { - "fieldPath": "createdon" + "fieldPath": "createdon", + "uniqueCount": 1, + "uniqueProportion": 0.5, + "nullCount": 0, + "nullProportion": 0.0 }, { - "fieldPath": "createdby" + "fieldPath": "createdby", + "uniqueCount": 1, + "uniqueProportion": 0.5, + "nullCount": 0, + "nullProportion": 0.0 }, { - "fieldPath": "createdfor" + "fieldPath": "createdfor", + "uniqueCount": 0, + "nullCount": 2, + "nullProportion": 1 + }, + { + "fieldPath": "metadata_json" } ] } diff --git a/metadata-ingestion/tests/integration/postgres/postgres_to_file_with_db_estimate_row_count.yml b/metadata-ingestion/tests/integration/postgres/postgres_to_file_with_db_estimate_row_count.yml index 16f829af0c..712ce4a2b6 100644 --- a/metadata-ingestion/tests/integration/postgres/postgres_to_file_with_db_estimate_row_count.yml +++ b/metadata-ingestion/tests/integration/postgres/postgres_to_file_with_db_estimate_row_count.yml @@ -12,8 +12,8 @@ source: profile_table_level_only: true profile_table_row_count_estimate_only: true turn_off_expensive_profiling_metrics: true - include_field_null_count: false - include_field_distinct_count: false + include_field_null_count: true + include_field_distinct_count: true include_field_min_value: false include_field_max_value: false include_field_mean_value: false diff --git a/metadata-ingestion/tests/integration/postgres/setup/setup.sql b/metadata-ingestion/tests/integration/postgres/setup/setup.sql index cbd690c7b4..66ef7942ad 100644 --- a/metadata-ingestion/tests/integration/postgres/setup/setup.sql +++ b/metadata-ingestion/tests/integration/postgres/setup/setup.sql @@ -12,25 +12,28 @@ create table metadata_aspect_v2 ( createdon timestamp not null, createdby varchar(255) not null, createdfor varchar(255), - constraint pk_metadata_aspect_v2 primary key (urn,aspect,version) + constraint pk_metadata_aspect_v2 primary key (urn,aspect,version), + metadata_json json not null ); create index timeIndex ON metadata_aspect_v2 (createdon); -insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby) values( +insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby, metadata_json) values( 'urn:li:corpuser:datahub', 'corpUserInfo', 0, '{"displayName":"Data Hub","active":true,"fullName":"Data Hub","email":"datahub@linkedin.com"}', now(), - 'urn:li:corpuser:__datahub_system' + 'urn:li:corpuser:__datahub_system', + '{"displayName":"Data Hub","active":true,"fullName":"Data Hub","email":"datahub@linkedin.com"}' ), ( 'urn:li:corpuser:datahub', 'corpUserEditableInfo', 0, '{"skills":[],"teams":[],"pictureLink":"https://raw.githubusercontent.com/datahub-project/datahub/master/datahub-web-react/src/images/default_avatar.png"}', now(), - 'urn:li:corpuser:__datahub_system' + 'urn:li:corpuser:__datahub_system', + '{"skills":[],"teams":[],"pictureLink":"https://raw.githubusercontent.com/datahub-project/datahub/master/datahub-web-react/src/images/default_avatar.png"}' ); create view metadata_aspect_view as select urn, aspect from metadata_aspect_v2 where version=0;