From 9a4a9df8364da1a96a6ed1f96adb723b94044a29 Mon Sep 17 00:00:00 2001 From: Teddy Date: Thu, 1 Feb 2024 09:02:52 +0100 Subject: [PATCH] Fix #14895 - Get Metadata from Parquet Schema (#14956) * linting: fix python linting * fix: get column types from parquet schema for parquet files * style: python linting * fix: remove displayType check in test as variation depending on OS --- .../processor/test_case_runner.py | 1 - .../data_quality/source/test_suite.py | 1 - .../mixins/pandas_validator_mixin.py | 6 +- .../src/metadata/ingestion/lineage/parser.py | 1 - .../ingestion/models/patch_request.py | 4 +- .../ingestion/ometa/mixins/patch_mixin.py | 1 - .../source/dashboard/dashboard_service.py | 1 - .../source/dashboard/looker/metadata.py | 2 - .../source/dashboard/superset/api_source.py | 1 - .../source/dashboard/superset/db_source.py | 1 - .../source/database/azuresql/metadata.py | 1 - .../source/database/common_nosql_source.py | 5 +- .../source/database/databricks/client.py | 1 - .../source/database/datalake/metadata.py | 13 +- .../ingestion/source/database/dbt/metadata.py | 1 - .../source/database/hive/connection.py | 2 - .../source/database/mssql/metadata.py | 1 - .../ingestion/source/database/oracle/utils.py | 1 - .../database/postgres/pgspider/lineage.py | 1 - .../ingestion/source/database/sample_data.py | 1 - .../ingestion/source/database/sas/metadata.py | 5 +- .../source/database/snowflake/utils.py | 1 - .../database/stored_procedures_mixin.py | 1 - .../source/messaging/common_broker_source.py | 1 - .../source/pipeline/spline/metadata.py | 1 - .../source/search/elasticsearch/metadata.py | 1 - .../source/storage/storage_service.py | 16 +- .../interface/pandas/profiler_interface.py | 9 +- .../profiler/metrics/hybrid/histogram.py | 1 + .../orm/functions/table_metric_computer.py | 1 - .../metadata/profiler/processor/processor.py | 2 - .../profiler/source/base/profiler_source.py | 1 - .../profiler/source/bigquery/type_mapper.py | 1 - .../src/metadata/readers/file/api_reader.py | 1 - .../metadata/utils/datalake/datalake_utils.py | 481 +++++++++++++----- .../metadata/utils/secrets/client/loader.py | 1 - .../utils/secrets/secrets_manager_factory.py | 1 - .../src/metadata/workflow/application.py | 1 - ingestion/src/metadata/workflow/base.py | 1 - ingestion/src/metadata/workflow/metadata.py | 1 - ingestion/src/metadata/workflow/usage.py | 1 - ingestion/tests/cli_e2e/test_cli_hive.py | 1 - ingestion/tests/cli_e2e/test_cli_metabase.py | 1 - ingestion/tests/cli_e2e/test_cli_oracle.py | 1 - ingestion/tests/cli_e2e/test_cli_powerbi.py | 1 - ingestion/tests/cli_e2e/test_cli_tableau.py | 1 - .../airflow/test_lineage_runner.py | 1 - .../tests/integration/integration_base.py | 1 - .../secrets/test_secrets_manager_factory.py | 1 - .../unit/pii/test_column_name_scanner.py | 1 - .../pandas/test_profiler_interface.py | 1 - .../unit/profiler/test_profiler_interface.py | 1 - .../unit/resources/datalake/example.parquet | Bin 0 -> 16364 bytes .../tests/unit/test_column_type_parser.py | 6 +- .../tests/unit/test_pgspider_lineage_unit.py | 6 - .../unit/topology/database/test_datalake.py | 10 +- .../unit/topology/database/test_iceberg.py | 4 - .../unit/topology/pipeline/test_airbyte.py | 1 - .../unit/topology/storage/test_storage.py | 1 - ingestion/tests/unit/utils/test_datalake.py | 286 ++++++++++- .../openmetadata_managed_apis/api/utils.py | 1 - .../json/schema/entity/data/table.json | 3 +- 62 files changed, 686 insertions(+), 218 deletions(-) create mode 100644 ingestion/tests/unit/resources/datalake/example.parquet 
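
Usage sketch (illustrative only, not part of the committed diff): the core of this patch is the new `DataFrameColumnParser` factory in `datalake_utils.py`. For parquet files it converts the dataframe back into a `pyarrow.Table` and derives column types from the Arrow schema, instead of inferring them from pandas dtypes; for every other format it falls back to the previous dtype-based logic, now wrapped in `GenericDataFrameColumnParser`. A minimal example of the new entry point, assuming an environment with pandas and pyarrow installed and using the test resource added by this patch:

    import pandas as pd

    from metadata.readers.dataframe.reader_factory import SupportedTypes
    from metadata.utils.datalake.datalake_utils import DataFrameColumnParser

    # Read a parquet file into a dataframe; the path points at the test
    # resource introduced in this patch.
    df = pd.read_parquet("ingestion/tests/unit/resources/datalake/example.parquet")

    # The factory returns a ParquetDataFrameColumnParser when the file type is
    # PARQUET, and a GenericDataFrameColumnParser otherwise.
    parser = DataFrameColumnParser.create(df, SupportedTypes.PARQUET)

    # Column types (STRUCT, DECIMAL, BINARY, ...) now come from the Arrow
    # schema built via pyarrow.Table.from_pandas rather than from pandas
    # dtype heuristics.
    columns = parser.get_columns()
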
diff --git a/ingestion/src/metadata/data_quality/processor/test_case_runner.py b/ingestion/src/metadata/data_quality/processor/test_case_runner.py index 12205827472..aac36cfa112 100644 --- a/ingestion/src/metadata/data_quality/processor/test_case_runner.py +++ b/ingestion/src/metadata/data_quality/processor/test_case_runner.py @@ -54,7 +54,6 @@ class TestCaseRunner(Processor): """Execute the test suite tests and create test cases from the YAML config""" def __init__(self, config: OpenMetadataWorkflowConfig, metadata: OpenMetadata): - super().__init__() self.config = config diff --git a/ingestion/src/metadata/data_quality/source/test_suite.py b/ingestion/src/metadata/data_quality/source/test_suite.py index 396a0775222..cdb0bdd37a8 100644 --- a/ingestion/src/metadata/data_quality/source/test_suite.py +++ b/ingestion/src/metadata/data_quality/source/test_suite.py @@ -149,7 +149,6 @@ class TestSuiteSource(Source): ) else: - test_suite_cases = self._get_test_cases_from_test_suite(table.testSuite) yield Either( diff --git a/ingestion/src/metadata/data_quality/validations/mixins/pandas_validator_mixin.py b/ingestion/src/metadata/data_quality/validations/mixins/pandas_validator_mixin.py index 227d490349f..25641c36f14 100644 --- a/ingestion/src/metadata/data_quality/validations/mixins/pandas_validator_mixin.py +++ b/ingestion/src/metadata/data_quality/validations/mixins/pandas_validator_mixin.py @@ -17,7 +17,7 @@ from typing import Optional from metadata.profiler.metrics.core import add_props from metadata.profiler.metrics.registry import Metrics -from metadata.utils.datalake.datalake_utils import fetch_col_types +from metadata.utils.datalake.datalake_utils import GenericDataFrameColumnParser from metadata.utils.entity_link import get_decoded_column from metadata.utils.sqa_like_column import SQALikeColumn @@ -28,7 +28,9 @@ class PandasValidatorMixin: def get_column_name(self, entity_link: str, dfs) -> SQALikeColumn: # we'll use the first dataframe chunk to get the column name. 
column = dfs[0][get_decoded_column(entity_link)] - _type = fetch_col_types(dfs[0], get_decoded_column(entity_link)) + _type = GenericDataFrameColumnParser.fetch_col_types( + dfs[0], get_decoded_column(entity_link) + ) sqa_like_column = SQALikeColumn( name=column.name, type=_type, diff --git a/ingestion/src/metadata/ingestion/lineage/parser.py b/ingestion/src/metadata/ingestion/lineage/parser.py index 5b74bfba2d7..732afa85d6a 100644 --- a/ingestion/src/metadata/ingestion/lineage/parser.py +++ b/ingestion/src/metadata/ingestion/lineage/parser.py @@ -389,7 +389,6 @@ class LineageParser: def _evaluate_best_parser( query: str, dialect: Dialect, timeout_seconds: int ) -> Optional[LineageRunner]: - if query is None: return None diff --git a/ingestion/src/metadata/ingestion/models/patch_request.py b/ingestion/src/metadata/ingestion/models/patch_request.py index 92fa516fce6..da16d1a8332 100644 --- a/ingestion/src/metadata/ingestion/models/patch_request.py +++ b/ingestion/src/metadata/ingestion/models/patch_request.py @@ -210,9 +210,9 @@ def _determine_restricted_operation( Only retain add operation for restrict_update_fields fields """ path = patch_ops.get("path") - op = patch_ops.get("op") + ops = patch_ops.get("op") for field in restrict_update_fields or []: - if field in path and op != PatchOperation.ADD.value: + if field in path and ops != PatchOperation.ADD.value: return False return True diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py index 459747c5701..3e627597d8f 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py @@ -135,7 +135,6 @@ class OMetaPatchMixin(OMetaPatchMixinBase): Updated Entity """ try: - patch = build_patch( source=source, destination=destination, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index cc77b887eeb..f97dd32fca6 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -531,7 +531,6 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): ) def check_database_schema_name(self, database_schema_name: str): - """ Check if the input database schema name is equal to "" and return the input name if it is not. 
diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py index e40222762a1..404473b7208 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py @@ -833,7 +833,6 @@ class LookerSource(DashboardServiceSource): to_entity.id.__root__ not in self._added_lineage[from_entity.id.__root__] ): - self._added_lineage[from_entity.id.__root__].append( to_entity.id.__root__ ) @@ -943,7 +942,6 @@ class LookerSource(DashboardServiceSource): dashboard_name = self.context.dashboard try: - dashboard_fqn = fqn.build( metadata=self.metadata, entity_type=Dashboard, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py index e7d2f79e6a4..62123790c30 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py @@ -192,7 +192,6 @@ class SupersetAPISource(SupersetSourceMixin): def yield_datamodel( self, dashboard_details: DashboardResult ) -> Iterable[Either[CreateDashboardDataModelRequest]]: - if self.source_config.includeDataModels: for chart_id in self._get_charts_of_dashboard(dashboard_details): try: diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py index 99977dbd24d..83833689fbd 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py @@ -216,7 +216,6 @@ class SupersetDBSource(SupersetSourceMixin): def yield_datamodel( self, dashboard_details: FetchDashboard ) -> Iterable[Either[CreateDashboardDataModelRequest]]: - if self.source_config.includeDataModels: for chart_id in self._get_charts_of_dashboard(dashboard_details): chart_json = self.all_charts.get(chart_id) diff --git a/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py b/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py index 5bfa6d59eb2..af2ba5f2cd5 100644 --- a/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py @@ -77,7 +77,6 @@ class AzuresqlSource(CommonDbSourceService, MultiDBSource): yield from self._execute_database_query(AZURE_SQL_GET_DATABASES) def get_database_names(self) -> Iterable[str]: - if not self.config.serviceConnection.__root__.config.ingestAllDatabases: configured_db = self.config.serviceConnection.__root__.config.database self.set_inspector(database_name=configured_db) diff --git a/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py b/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py index 78a3d2eaaff..e9f394243ff 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py @@ -46,7 +46,7 @@ from metadata.ingestion.source.database.database_service import DatabaseServiceS from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure from metadata.utils import fqn from metadata.utils.constants import DEFAULT_DATABASE -from metadata.utils.datalake.datalake_utils import get_columns +from metadata.utils.datalake.datalake_utils 
import DataFrameColumnParser from metadata.utils.filters import filter_by_schema, filter_by_table from metadata.utils.logger import ingestion_logger @@ -217,7 +217,8 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC): try: data = self.get_table_columns_dict(schema_name, table_name) df = pd.DataFrame.from_records(list(data)) - columns = get_columns(df) + column_parser = DataFrameColumnParser.create(df) + columns = column_parser.get_columns() table_request = CreateTableRequest( name=table_name, tableType=table_type, diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/client.py b/ingestion/src/metadata/ingestion/source/database/databricks/client.py index ac45705c252..5ffec2ad9a2 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/client.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/client.py @@ -94,7 +94,6 @@ class DatabricksClient: Method returns List the history of queries through SQL warehouses """ try: - data = {} daydiff = end_date - start_date diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index 50316f8539a..a32a2a9aa7e 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -77,8 +77,8 @@ from metadata.utils import fqn from metadata.utils.constants import DEFAULT_DATABASE from metadata.utils.credentials import GOOGLE_CREDENTIALS from metadata.utils.datalake.datalake_utils import ( + DataFrameColumnParser, fetch_dataframe, - get_columns, get_file_format_type, ) from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table @@ -416,9 +416,14 @@ class DatalakeSource(DatabaseServiceSource): file_extension=table_extension, ), ) - - # If no data_frame (due to unsupported type), ignore - columns = get_columns(data_frame[0]) if data_frame else None + if data_frame: + column_parser = DataFrameColumnParser.create( + data_frame[0], table_extension + ) + columns = column_parser.get_columns() + else: + # If no data_frame (due to unsupported type), ignore + columns = None if columns: table_request = CreateTableRequest( name=table_name, diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index b1904d58796..4ac7a0deafd 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -692,7 +692,6 @@ class DbtSource(DbtServiceSource): ) if table_entity: try: - service_name, database_name, schema_name, table_name = fqn.split( table_entity.fullyQualifiedName.__root__ ) diff --git a/ingestion/src/metadata/ingestion/source/database/hive/connection.py b/ingestion/src/metadata/ingestion/source/database/hive/connection.py index 6580be73c3f..16e0c31952f 100644 --- a/ingestion/src/metadata/ingestion/source/database/hive/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/hive/connection.py @@ -126,7 +126,6 @@ def get_metastore_connection(connection: Any) -> Engine: @get_metastore_connection.register def _(connection: PostgresConnection): - # import required to load sqlalchemy plugin # pylint: disable=import-outside-toplevel,unused-import from metadata.ingestion.source.database.hive.metastore_dialects.postgres import ( # nopycln: import @@ -153,7 +152,6 @@ def _(connection: PostgresConnection): 
@get_metastore_connection.register def _(connection: MysqlConnection): - # import required to load sqlalchemy plugin # pylint: disable=import-outside-toplevel,unused-import from metadata.ingestion.source.database.hive.metastore_dialects.mysql import ( # nopycln: import diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py index 46fc1b5eda5..b800ee5f8d0 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py @@ -114,7 +114,6 @@ class MssqlSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource): yield from self._execute_database_query(MSSQL_GET_DATABASE) def get_database_names(self) -> Iterable[str]: - if not self.config.serviceConnection.__root__.config.ingestAllDatabases: configured_db = self.config.serviceConnection.__root__.config.database self.set_inspector(database_name=configured_db) diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/utils.py b/ingestion/src/metadata/ingestion/source/database/oracle/utils.py index 105cdfd958c..a44db57f834 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle/utils.py @@ -62,7 +62,6 @@ def get_view_definition( dblink="", **kw, ): - return get_view_definition_wrapper( self, connection, diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/pgspider/lineage.py b/ingestion/src/metadata/ingestion/source/database/postgres/pgspider/lineage.py index 03d17bb4ccd..9f304534be9 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/pgspider/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/pgspider/lineage.py @@ -75,7 +75,6 @@ def get_lineage_from_multi_tenant_table( connection: any, service_name: str, ) -> Iterator[Either[AddLineageRequest]]: - """ For PGSpider, firstly, get list of multi-tenant tables. Next, get child foreign tables of each multi-tenant tables. 
diff --git a/ingestion/src/metadata/ingestion/source/database/sample_data.py b/ingestion/src/metadata/ingestion/source/database/sample_data.py index 582080fe7b9..98b0b58a346 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_data.py @@ -800,7 +800,6 @@ class SampleDataSource( # Create table and stored procedure lineage for lineage_entities in self.stored_procedures["lineage"]: - from_table = self.metadata.get_by_name( entity=Table, fqn=lineage_entities["from_table_fqn"] ) diff --git a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py index 7440ecdcc41..235c78ef389 100644 --- a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py @@ -465,7 +465,6 @@ class SasSource( or table_entity.extension.__root__.get("analysisTimeStamp") != table_extension.get("analysisTimeStamp") ): - # create the columns of the table columns, col_profile_list = self.create_columns_and_profiles( col_entity_instances, table_entity_instance @@ -711,10 +710,10 @@ class SasSource( if "state" in table_resource and table_resource["state"] == "unloaded": self.sas_client.load_table(table_uri + "/state?value=loaded") - except HTTPError as e: + except HTTPError as exc: # append http error to table description if it can't be found logger.error(f"table_uri: {table_uri}") - self.report_description.append(str(e)) + self.report_description.append(str(exc)) name_index = table_uri.rindex("/") table_name = table_uri[name_index + 1 :] param = f"filter=eq(name,'{table_name}')" diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py index 1e90a46a33b..0876d44c269 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py @@ -39,7 +39,6 @@ from metadata.utils.sqlalchemy_utils import ( def _quoted_name(entity_name: Optional[str]) -> Optional[str]: - if entity_name: return fqn.quote_name(entity_name) diff --git a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py index fb7b2885cbc..199f01d3efb 100644 --- a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py +++ b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py @@ -153,7 +153,6 @@ class StoredProcedureMixin(ABC): query_type=query_by_procedure.query_type, query_text=query_by_procedure.query_text, ): - self.context.stored_procedure_query_lineage = True for either_lineage in get_lineage_by_query( self.metadata, diff --git a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py index 2f75c3697c0..8d43b55a0ed 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py +++ b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py @@ -246,7 +246,6 @@ class CommonBrokerSource(MessagingServiceSource, ABC): if messages: for message in messages: try: - value = message.value() sample_data.append( self.decode_message( diff --git a/ingestion/src/metadata/ingestion/source/pipeline/spline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/spline/metadata.py index 
d171b40c7cf..4d42f3c0349 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/spline/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/spline/metadata.py @@ -131,7 +131,6 @@ class SplineSource(PipelineServiceSource): return None def _get_table_from_datasource_name(self, datasource: str) -> Optional[Table]: - if ( not datasource and not datasource.startswith("dbfs") diff --git a/ingestion/src/metadata/ingestion/source/search/elasticsearch/metadata.py b/ingestion/src/metadata/ingestion/source/search/elasticsearch/metadata.py index c0ce1cf8681..5b8ba8f5837 100644 --- a/ingestion/src/metadata/ingestion/source/search/elasticsearch/metadata.py +++ b/ingestion/src/metadata/ingestion/source/search/elasticsearch/metadata.py @@ -111,7 +111,6 @@ class ElasticsearchSource(SearchServiceSource): Method to Get Sample Data of Search Index Entity """ if self.source_config.includeSampleData and self.context.search_index: - sample_data = self.client.search( index=self.context.search_index, q=WILDCARD_SEARCH, diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index 1499c2de1da..2c3f9a4dad1 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -14,8 +14,6 @@ Base class for ingesting Object Storage services from abc import ABC, abstractmethod from typing import Any, Iterable, List, Optional, Set -from pandas import DataFrame - from metadata.generated.schema.api.data.createContainer import CreateContainerRequest from metadata.generated.schema.entity.data.container import Container from metadata.generated.schema.entity.services.storageService import ( @@ -53,7 +51,10 @@ from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper from metadata.readers.dataframe.reader_factory import SupportedTypes from metadata.readers.models import ConfigSource from metadata.utils import fqn -from metadata.utils.datalake.datalake_utils import fetch_dataframe, get_columns +from metadata.utils.datalake.datalake_utils import ( + DataFrameColumnParser, + fetch_dataframe, +) from metadata.utils.logger import ingestion_logger from metadata.utils.storage_metadata_config import ( StorageMetadataConfigException, @@ -67,7 +68,6 @@ OPENMETADATA_TEMPLATE_FILE_NAME = "openmetadata.json" class StorageServiceTopology(ServiceTopology): - root = TopologyNode( producer="get_services", stages=[ @@ -271,10 +271,10 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC): ), ) columns = [] - if isinstance(data_structure_details, DataFrame): - columns = get_columns(data_structure_details) - if isinstance(data_structure_details, list) and data_structure_details: - columns = get_columns(data_structure_details[0]) + column_parser = DataFrameColumnParser.create( + data_structure_details, SupportedTypes(metadata_entry.structureFormat) + ) + columns = column_parser.get_columns() return columns def _get_columns( diff --git a/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py b/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py index 394e7744350..daad0a1ccc7 100644 --- a/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py @@ -34,7 +34,10 @@ from metadata.profiler.metrics.core import MetricTypes from metadata.profiler.metrics.registry import Metrics from 
metadata.readers.dataframe.models import DatalakeTableSchemaWrapper from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR, SAMPLE_DATA_DEFAULT_COUNT -from metadata.utils.datalake.datalake_utils import fetch_col_types, fetch_dataframe +from metadata.utils.datalake.datalake_utils import ( + GenericDataFrameColumnParser, + fetch_dataframe, +) from metadata.utils.logger import profiler_interface_registry_logger from metadata.utils.sqa_like_column import SQALikeColumn @@ -411,7 +414,9 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin): sqalike_columns.append( SQALikeColumn( column_name, - fetch_col_types(self.complex_dataframe_sample[0], column_name), + GenericDataFrameColumnParser.fetch_col_types( + self.complex_dataframe_sample[0], column_name + ), ) ) return sqalike_columns diff --git a/ingestion/src/metadata/profiler/metrics/hybrid/histogram.py b/ingestion/src/metadata/profiler/metrics/hybrid/histogram.py index 60ede6fde0c..6aaf09acbde 100644 --- a/ingestion/src/metadata/profiler/metrics/hybrid/histogram.py +++ b/ingestion/src/metadata/profiler/metrics/hybrid/histogram.py @@ -30,6 +30,7 @@ from metadata.utils.logger import profiler_logger logger = profiler_logger() + # pylint: disable=too-many-locals class Histogram(HybridMetric): """ diff --git a/ingestion/src/metadata/profiler/orm/functions/table_metric_computer.py b/ingestion/src/metadata/profiler/orm/functions/table_metric_computer.py index 75fb14c647f..8d88b0d85d4 100644 --- a/ingestion/src/metadata/profiler/orm/functions/table_metric_computer.py +++ b/ingestion/src/metadata/profiler/orm/functions/table_metric_computer.py @@ -126,7 +126,6 @@ class AbstractTableMetricComputer(ABC): table: Table, where_clause: Optional[List[ColumnOperators]] = None, ): - query = select(*columns).select_from(table) if where_clause: query = query.where(*where_clause) diff --git a/ingestion/src/metadata/profiler/processor/processor.py b/ingestion/src/metadata/profiler/processor/processor.py index 0e414f11a80..af867b9dc15 100644 --- a/ingestion/src/metadata/profiler/processor/processor.py +++ b/ingestion/src/metadata/profiler/processor/processor.py @@ -40,7 +40,6 @@ class ProfilerProcessor(Processor): """ def __init__(self, config: OpenMetadataWorkflowConfig): - super().__init__() self.config = config @@ -56,7 +55,6 @@ class ProfilerProcessor(Processor): return "Profiler" def _run(self, record: ProfilerSourceAndEntity) -> Either[ProfilerResponse]: - profiler_runner: Profiler = record.profiler_source.get_profiler_runner( record.entity, self.profiler_config ) diff --git a/ingestion/src/metadata/profiler/source/base/profiler_source.py b/ingestion/src/metadata/profiler/source/base/profiler_source.py index 0ef3994a29a..8a3a1f0b8d7 100644 --- a/ingestion/src/metadata/profiler/source/base/profiler_source.py +++ b/ingestion/src/metadata/profiler/source/base/profiler_source.py @@ -217,7 +217,6 @@ class ProfilerSource(ProfilerSourceInterface): def _get_context_entities( self, entity: Table ) -> Tuple[DatabaseSchema, Database, DatabaseService]: - schema_entity = None database_entity = None db_service = None diff --git a/ingestion/src/metadata/profiler/source/bigquery/type_mapper.py b/ingestion/src/metadata/profiler/source/bigquery/type_mapper.py index 0489a534f3c..2a6ba011577 100644 --- a/ingestion/src/metadata/profiler/source/bigquery/type_mapper.py +++ b/ingestion/src/metadata/profiler/source/bigquery/type_mapper.py @@ -29,7 +29,6 @@ def bigquery_type_mapper(_type_map: dict, col: Column): from sqlalchemy_bigquery import STRUCT def 
build_struct(_type_map: dict, col: Column): - structs = [] for child in col.children: if child.dataType != DataType.STRUCT: diff --git a/ingestion/src/metadata/readers/file/api_reader.py b/ingestion/src/metadata/readers/file/api_reader.py index 864651ab4ee..bf2dd0a645b 100644 --- a/ingestion/src/metadata/readers/file/api_reader.py +++ b/ingestion/src/metadata/readers/file/api_reader.py @@ -35,7 +35,6 @@ class ApiReader(Reader, ABC): """ def __init__(self, credentials: ReadersCredentials): - self._auth_headers = None self.credentials = credentials diff --git a/ingestion/src/metadata/utils/datalake/datalake_utils.py b/ingestion/src/metadata/utils/datalake/datalake_utils.py index a482f8bc0b9..724ffe1232c 100644 --- a/ingestion/src/metadata/utils/datalake/datalake_utils.py +++ b/ingestion/src/metadata/utils/datalake/datalake_utils.py @@ -15,8 +15,9 @@ from different auths and different file systems. """ import ast import json +import random import traceback -from typing import Dict, List, Optional, cast +from typing import Dict, List, Optional, Union, cast from metadata.generated.schema.entity.data.table import Column, DataType from metadata.ingestion.source.database.column_helpers import truncate_column_name @@ -29,18 +30,6 @@ from metadata.utils.logger import utils_logger logger = utils_logger() -DATALAKE_DATA_TYPES = { - **dict.fromkeys(["int64", "int", "int32"], DataType.INT), - "dict": DataType.JSON, - "list": DataType.ARRAY, - **dict.fromkeys(["float64", "float32", "float"], DataType.FLOAT), - "bool": DataType.BOOLEAN, - **dict.fromkeys( - ["datetime64", "timedelta[ns]", "datetime64[ns]"], DataType.DATETIME - ), - "str": DataType.STRING, -} - def fetch_dataframe( config_source, @@ -100,135 +89,379 @@ def get_file_format_type(key_name, metadata_entry=None): return False -def unique_json_structure(dicts: List[Dict]) -> Dict: - """Given a sample of `n` json objects, return a json object that represents the unique structure of all `n` objects. - Note that the type of the key will be that of the last object seen in the sample. +# pylint: disable=import-outside-toplevel +class DataFrameColumnParser: + """A column parser object. This serves as a Creator class for the appropriate column parser object parser + for datalake types. It allows us to implement different schema parsers for different datalake types without + implementing many conditionals statements. - Args: - dicts: list of json objects + e.g. if we want to implement a column parser for parquet files, we can simply implement a + ParquetDataFrameColumnParser class and add it as part of the `create` method. The `create` method will then return + the appropriate parser based on the file type. The `ColumnParser` class has a single entry point `get_columns` which + will call the `get_columns` method of the appropriate parser. """ - result = {} - for dict_ in dicts: - for key, value in dict_.items(): - if isinstance(value, dict): - nested_json = result.get(key, {}) - # `isinstance(nested_json, dict)` if for a key we first see a non dict value - # but then see a dict value later, we will consider the key to be a dict. 
- result[key] = unique_json_structure( - [nested_json if isinstance(nested_json, dict) else {}, value] - ) - else: - result[key] = value - return result + + def __init__(self, parser): + """Initialize the column parser object""" + self.parser = parser + + @classmethod + def create( + cls, + data_frame: "DataFrame", + file_type: Optional[SupportedTypes] = None, + sample: bool = True, + shuffle: bool = False, + ): + """Instantiate a column parser object with the appropriate parser + + Args: + data_frame: the dataframe object + file_type: the file type of the dataframe. Will be used to determine the appropriate parser. + sample: whether to sample the dataframe or not if we have a list of dataframes. + If sample is False, we will concatenate the dataframes, which can be cause OOM error for large dataset. + (default: True) + shuffle: whether to shuffle the dataframe list or not if sample is True. (default: False) + """ + data_frame = cls._get_data_frame(data_frame, sample, shuffle) + if file_type == SupportedTypes.PARQUET: + parser = ParquetDataFrameColumnParser(data_frame) + return cls(parser) + parser = GenericDataFrameColumnParser(data_frame) + return cls(parser) + + @staticmethod + def _get_data_frame( + data_frame: Union[List["DataFrame"], "DataFrame"], sample: bool, shuffle: bool + ): + """Return the dataframe to use for parsing""" + import pandas as pd + + if not isinstance(data_frame, list): + return data_frame + + if sample: + if shuffle: + random.shuffle(data_frame) + return data_frame[0] + + return pd.concat(data_frame) + + def get_columns(self): + """Get the columns from the parser""" + return self.parser.get_columns() -def construct_json_column_children(json_column: Dict) -> List[Dict]: - """Construt a dict representation of a Column object +class GenericDataFrameColumnParser: + """Given a dataframe object, parse the columns and return a list of Column objects. - Args: - json_column: unique json structure of a column + # TODO: We should consider making the function above part of the `GenericDataFrameColumnParser` class + # though we need to do a thorough overview of where they are used to ensure unnecessary coupling. """ - children = [] - for key, value in json_column.items(): - column = {} - type_ = type(value).__name__.lower() - column["dataTypeDisplay"] = DATALAKE_DATA_TYPES.get( - type_, DataType.UNKNOWN - ).value - column["dataType"] = DATALAKE_DATA_TYPES.get(type_, DataType.UNKNOWN).value - column["name"] = truncate_column_name(key) - column["displayName"] = key - if isinstance(value, dict): - column["children"] = construct_json_column_children(value) - children.append(column) - return children + _data_formats = { + **dict.fromkeys(["int64", "int", "int32"], DataType.INT), + "dict": DataType.JSON, + "list": DataType.ARRAY, + **dict.fromkeys(["float64", "float32", "float"], DataType.FLOAT), + "bool": DataType.BOOLEAN, + **dict.fromkeys( + ["datetime64", "timedelta[ns]", "datetime64[ns]"], DataType.DATETIME + ), + "str": DataType.STRING, + } + def __init__(self, data_frame: "DataFrame"): + self.data_frame = data_frame -def get_children(json_column) -> List[Dict]: - """Get children of json column. + def get_columns(self): + """ + method to process column details + """ + return self._get_columns(self.data_frame) - Args: - json_column (pandas.Series): column with 100 sample rows. - Sample rows will be used to infer children. 
- """ - from pandas import Series # pylint: disable=import-outside-toplevel + @classmethod + def _get_columns(cls, data_frame: "DataFrame"): + """ + method to process column details. - json_column = cast(Series, json_column) - try: - json_column = json_column.apply(json.loads) - except TypeError: - # if values are not strings, we will assume they are already json objects - # based on the read class logic - pass - json_structure = unique_json_structure(json_column.values.tolist()) + Note this was move from a function to a class method to bring it closer to the + `GenericDataFrameColumnParser` class. Should be rethought as part of the TODO. + """ + cols = [] + if hasattr(data_frame, "columns"): + df_columns = list(data_frame.columns) + for column in df_columns: + # use String by default + data_type = DataType.STRING + try: + if hasattr(data_frame[column], "dtypes"): + data_type = cls.fetch_col_types(data_frame, column_name=column) - return construct_json_column_children(json_structure) + parsed_string = { + "dataTypeDisplay": data_type.value, + "dataType": data_type, + "name": truncate_column_name(column), + "displayName": column, + } + if data_type == DataType.ARRAY: + parsed_string["arrayDataType"] = DataType.UNKNOWN + if data_type == DataType.JSON: + parsed_string["children"] = cls.get_children( + data_frame[column].dropna()[:100] + ) -def get_columns(data_frame: "DataFrame"): - """ - method to process column details - """ - cols = [] - if hasattr(data_frame, "columns"): - df_columns = list(data_frame.columns) - for column in df_columns: - # use String by default - data_type = DataType.STRING - try: - if hasattr(data_frame[column], "dtypes"): - data_type = fetch_col_types(data_frame, column_name=column) - - parsed_string = { - "dataTypeDisplay": data_type.value, - "dataType": data_type, - "name": truncate_column_name(column), - "displayName": column, - } - if data_type == DataType.ARRAY: - parsed_string["arrayDataType"] = DataType.UNKNOWN - - if data_type == DataType.JSON: - parsed_string["children"] = get_children( - data_frame[column].dropna()[:100] + cols.append(Column(**parsed_string)) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Unexpected exception parsing column [{column}]: {exc}" ) + return cols - cols.append(Column(**parsed_string)) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Unexpected exception parsing column [{column}]: {exc}") - return cols + @classmethod + def fetch_col_types(cls, data_frame, column_name): + """fetch_col_types: Fetch Column Type for the c + + Note this was move from a function to a class method to bring it closer to the + `GenericDataFrameColumnParser` class. Should be rethought as part of the TODO. 
+ + Args: + data_frame (DataFrame) + column_name (string) + """ + data_type = None + try: + if data_frame[column_name].dtypes.name == "object" and any( + data_frame[column_name].dropna().values + ): + try: + # Safely evaluate the input string + df_row_val = data_frame[column_name].dropna().values[0] + parsed_object = ast.literal_eval(str(df_row_val)) + # Determine the data type of the parsed object + data_type = type(parsed_object).__name__.lower() + except (ValueError, SyntaxError): + # Handle any exceptions that may occur + data_type = "string" + + data_type = cls._data_formats.get( + data_type or data_frame[column_name].dtypes.name, DataType.STRING + ) + except Exception as err: + logger.warning( + f"Failed to distinguish data type for column {column_name}, Falling back to {data_type}, exc: {err}" + ) + logger.debug(traceback.format_exc()) + return data_type + + @classmethod + def unique_json_structure(cls, dicts: List[Dict]) -> Dict: + """Given a sample of `n` json objects, return a json object that represents the unique + structure of all `n` objects. Note that the type of the key will be that of + the last object seen in the sample. + + Args: + dicts: list of json objects + """ + result = {} + for dict_ in dicts: + for key, value in dict_.items(): + if isinstance(value, dict): + nested_json = result.get(key, {}) + # `isinstance(nested_json, dict)` if for a key we first see a non dict value + # but then see a dict value later, we will consider the key to be a dict. + result[key] = cls.unique_json_structure( + [nested_json if isinstance(nested_json, dict) else {}, value] + ) + else: + result[key] = value + return result + + @classmethod + def construct_json_column_children(cls, json_column: Dict) -> List[Dict]: + """Construt a dict representation of a Column object + + Args: + json_column: unique json structure of a column + """ + children = [] + for key, value in json_column.items(): + column = {} + type_ = type(value).__name__.lower() + column["dataTypeDisplay"] = cls._data_formats.get( + type_, DataType.UNKNOWN + ).value + column["dataType"] = cls._data_formats.get(type_, DataType.UNKNOWN).value + column["name"] = truncate_column_name(key) + column["displayName"] = key + if isinstance(value, dict): + column["children"] = cls.construct_json_column_children(value) + children.append(column) + + return children + + @classmethod + def get_children(cls, json_column) -> List[Dict]: + """Get children of json column. + + Args: + json_column (pandas.Series): column with 100 sample rows. + Sample rows will be used to infer children. 
+ """ + from pandas import Series # pylint: disable=import-outside-toplevel + + json_column = cast(Series, json_column) + try: + json_column = json_column.apply(json.loads) + except TypeError: + # if values are not strings, we will assume they are already json objects + # based on the read class logic + pass + json_structure = cls.unique_json_structure(json_column.values.tolist()) + + return cls.construct_json_column_children(json_structure) -def fetch_col_types(data_frame, column_name): - """fetch_col_types: Fetch Column Type for the c +# pylint: disable=import-outside-toplevel +class ParquetDataFrameColumnParser: + """Given a dataframe object generated from a parquet file, parse the columns and return a list of Column objects.""" - Args: - data_frame (DataFrame) - column_name (string) - """ - data_type = None - try: - if data_frame[column_name].dtypes.name == "object" and any( - data_frame[column_name].dropna().values + import pyarrow as pa + + _data_formats = { + **dict.fromkeys( + ["int8", "int16", "int32", "int64", "int", pa.DurationType], DataType.INT + ), + **dict.fromkeys(["uint8", "uint16", "uint32", "uint64", "uint"], DataType.UINT), + pa.StructType: DataType.STRUCT, + **dict.fromkeys([pa.ListType, pa.LargeListType], DataType.ARRAY), + **dict.fromkeys( + ["halffloat", "float32", "float64", "double", "float"], DataType.FLOAT + ), + "bool": DataType.BOOLEAN, + **dict.fromkeys( + [ + "datetime64", + "timedelta[ns]", + "datetime64[ns]", + "time32[s]", + "time32[ms]", + "time64[ns]", + "time64[us]", + pa.TimestampType, + "date64", + ], + DataType.DATETIME, + ), + "date32[day]": DataType.DATE, + "string": DataType.STRING, + **dict.fromkeys( + ["binary", "large_binary", pa.FixedSizeBinaryType], DataType.BINARY + ), + **dict.fromkeys([pa.Decimal128Type, pa.Decimal256Type], DataType.DECIMAL), + } + + def __init__(self, data_frame: "DataFrame"): + import pyarrow as pa + + self.data_frame = data_frame + self._arrow_table = pa.Table.from_pandas(self.data_frame) + + def get_columns(self): + """ + method to process column details for parquet files + """ + import pyarrow as pa + + schema: List[pa.Field] = self._arrow_table.schema + columns = [] + for column in schema: + parsed_column = { + "dataTypeDisplay": str(column.type), + "dataType": self._get_pq_data_type(column), + "name": truncate_column_name(column.name), + "displayName": column.name, + } + + if parsed_column["dataType"] == DataType.ARRAY: + try: + item_field = column.type.value_field + parsed_column["arrayDataType"] = self._get_pq_data_type(item_field) + except AttributeError: + # if the value field is not specified, we will set it to UNKNOWN + parsed_column["arrayDataType"] = DataType.UNKNOWN + + if parsed_column["dataType"] == DataType.BINARY: + try: + data_length = type(column.type).byte_width + except AttributeError: + # if the byte width is not specified, we will set it to -1 + # following pyarrow convention + data_length = -1 + parsed_column["dataLength"] = data_length + + if parsed_column["dataType"] == DataType.STRUCT: + parsed_column["children"] = self._get_children(column) + columns.append(Column(**parsed_column)) + + return columns + + def _get_children(self, column): + """For struct types, get the children of the column + + Args: + column (pa.Field): pa column + """ + field_idx = column.type.num_fields + + children = [] + for idx in range(field_idx): + child = column.type.field(idx) + data_type = self._get_pq_data_type(child) + + child_column = { + "dataTypeDisplay": str(child.type), + "dataType": data_type, + "name": 
truncate_column_name(child.name), + "displayName": child.name, + } + if data_type == DataType.STRUCT: + child_column["children"] = self._get_children(child) + children.append(child_column) + + return children + + def _get_pq_data_type(self, column): + """Given a column return the type of the column + + Args: + column (pa.Field): pa column + """ + import pyarrow as pa + + if isinstance( + column.type, + ( + pa.DurationType, + pa.StructType, + pa.ListType, + pa.LargeListType, + pa.TimestampType, + pa.Decimal128Type, + pa.Decimal256Type, + pa.FixedSizeBinaryType, + ), ): - try: - # Safely evaluate the input string - df_row_val = data_frame[column_name].dropna().values[0] - parsed_object = ast.literal_eval(str(df_row_val)) - # Determine the data type of the parsed object - data_type = type(parsed_object).__name__.lower() - except (ValueError, SyntaxError): - # Handle any exceptions that may occur - data_type = "string" + # the above type can take many shape + # (i.e. pa.ListType(pa.StructType([pa.column("a", pa.int64())])), etc,) + # so we'll use their type to determine the data type + data_type = self._data_formats.get(type(column.type), DataType.UNKNOWN) + else: + # for the other types we need to use their string representation + # to determine the data type as `type(column.type)` will return + # a generic `pyarrow.lib.DataType` + data_type = self._data_formats.get(str(column.type), DataType.UNKNOWN) - data_type = DATALAKE_DATA_TYPES.get( - data_type or data_frame[column_name].dtypes.name, DataType.STRING - ) - except Exception as err: - logger.warning( - f"Failed to distinguish data type for column {column_name}, Falling back to {data_type}, exc: {err}" - ) - logger.debug(traceback.format_exc()) - return data_type + return data_type diff --git a/ingestion/src/metadata/utils/secrets/client/loader.py b/ingestion/src/metadata/utils/secrets/client/loader.py index 688a2c2adcd..01871388439 100644 --- a/ingestion/src/metadata/utils/secrets/client/loader.py +++ b/ingestion/src/metadata/utils/secrets/client/loader.py @@ -63,7 +63,6 @@ def _(provider: SecretsManagerProvider) -> Optional[AWSCredentials]: @secrets_manager_client_loader.add(SecretsManagerClientLoader.env.value) def _(provider: SecretsManagerProvider) -> Optional[AWSCredentials]: - if provider in { SecretsManagerProvider.aws, SecretsManagerProvider.managed_aws, diff --git a/ingestion/src/metadata/utils/secrets/secrets_manager_factory.py b/ingestion/src/metadata/utils/secrets/secrets_manager_factory.py index 643e99594bc..0445b95d8ef 100644 --- a/ingestion/src/metadata/utils/secrets/secrets_manager_factory.py +++ b/ingestion/src/metadata/utils/secrets/secrets_manager_factory.py @@ -104,7 +104,6 @@ class SecretsManagerFactory(metaclass=Singleton): return self.secrets_manager def _load_secrets_manager_credentials(self) -> Optional["AWSCredentials"]: - if not self.secrets_manager_loader: return None diff --git a/ingestion/src/metadata/workflow/application.py b/ingestion/src/metadata/workflow/application.py index 07ce0688bb5..452e73abf5a 100644 --- a/ingestion/src/metadata/workflow/application.py +++ b/ingestion/src/metadata/workflow/application.py @@ -80,7 +80,6 @@ class ApplicationWorkflow(BaseWorkflow, ABC): runner: Optional[AppRunner] def __init__(self, config_dict: dict): - self.runner = None # Will be passed in post-init # TODO: Create a parse_gracefully method self.config = OpenMetadataApplicationConfig.parse_obj(config_dict) diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py index 
5304d2a0ebe..fc51495fe28 100644 --- a/ingestion/src/metadata/workflow/base.py +++ b/ingestion/src/metadata/workflow/base.py @@ -235,7 +235,6 @@ class BaseWorkflow(ABC, WorkflowStatusMixin): service = self._get_ingestion_pipeline_service() if service is not None: - return self.metadata.create_or_update( CreateIngestionPipelineRequest( name=pipeline_name, diff --git a/ingestion/src/metadata/workflow/metadata.py b/ingestion/src/metadata/workflow/metadata.py index 7626beb97db..97135819af6 100644 --- a/ingestion/src/metadata/workflow/metadata.py +++ b/ingestion/src/metadata/workflow/metadata.py @@ -31,7 +31,6 @@ class MetadataWorkflow(IngestionWorkflow): """ def set_steps(self): - # We keep the source registered in the workflow self.source = self._get_source() sink = self._get_sink() diff --git a/ingestion/src/metadata/workflow/usage.py b/ingestion/src/metadata/workflow/usage.py index 0e0fb56f8af..b6a7ac341b0 100644 --- a/ingestion/src/metadata/workflow/usage.py +++ b/ingestion/src/metadata/workflow/usage.py @@ -33,7 +33,6 @@ class UsageWorkflow(IngestionWorkflow): """ def set_steps(self): - # We keep the source registered in the workflow self.source = self._get_source() processor = self._get_processor() diff --git a/ingestion/tests/cli_e2e/test_cli_hive.py b/ingestion/tests/cli_e2e/test_cli_hive.py index 642695e4ecf..6c28e10229b 100644 --- a/ingestion/tests/cli_e2e/test_cli_hive.py +++ b/ingestion/tests/cli_e2e/test_cli_hive.py @@ -20,7 +20,6 @@ from .common_e2e_sqa_mixins import SQACommonMethods class HiveCliTest(CliCommonDB.TestSuite, SQACommonMethods): - prepare_e2e: List[str] = [ "DROP DATABASE IF EXISTS e2e_cli_tests CASCADE", "CREATE DATABASE e2e_cli_tests", diff --git a/ingestion/tests/cli_e2e/test_cli_metabase.py b/ingestion/tests/cli_e2e/test_cli_metabase.py index 977334ee3a7..71a581b6c32 100644 --- a/ingestion/tests/cli_e2e/test_cli_metabase.py +++ b/ingestion/tests/cli_e2e/test_cli_metabase.py @@ -20,7 +20,6 @@ from .common.test_cli_dashboard import CliCommonDashboard class MetabaseCliTest(CliCommonDashboard.TestSuite): - # in case we want to do something before running the tests def prepare(self) -> None: redshift_file_path = str( diff --git a/ingestion/tests/cli_e2e/test_cli_oracle.py b/ingestion/tests/cli_e2e/test_cli_oracle.py index c2a038694fb..c1dc65a7b74 100644 --- a/ingestion/tests/cli_e2e/test_cli_oracle.py +++ b/ingestion/tests/cli_e2e/test_cli_oracle.py @@ -23,7 +23,6 @@ from .common_e2e_sqa_mixins import SQACommonMethods class OracleCliTest(CliCommonDB.TestSuite, SQACommonMethods): - create_table_query: str = """ CREATE TABLE admin.admin_emp ( empno NUMBER(5) PRIMARY KEY, diff --git a/ingestion/tests/cli_e2e/test_cli_powerbi.py b/ingestion/tests/cli_e2e/test_cli_powerbi.py index 90a4d6271d0..dfcb80e9ff5 100644 --- a/ingestion/tests/cli_e2e/test_cli_powerbi.py +++ b/ingestion/tests/cli_e2e/test_cli_powerbi.py @@ -20,7 +20,6 @@ from .common.test_cli_dashboard import CliCommonDashboard class PowerBICliTest(CliCommonDashboard.TestSuite): - # in case we want to do something before running the tests def prepare(self) -> None: redshift_file_path = str( diff --git a/ingestion/tests/cli_e2e/test_cli_tableau.py b/ingestion/tests/cli_e2e/test_cli_tableau.py index 1127b4c21a8..4f305d16e26 100644 --- a/ingestion/tests/cli_e2e/test_cli_tableau.py +++ b/ingestion/tests/cli_e2e/test_cli_tableau.py @@ -20,7 +20,6 @@ from .common.test_cli_dashboard import CliCommonDashboard class TableauCliTest(CliCommonDashboard.TestSuite): - # in case we want to do something before running the tests def 
prepare(self) -> None: redshift_file_path = str( diff --git a/ingestion/tests/integration/airflow/test_lineage_runner.py b/ingestion/tests/integration/airflow/test_lineage_runner.py index d5853d303ad..d7d5dec16fe 100644 --- a/ingestion/tests/integration/airflow/test_lineage_runner.py +++ b/ingestion/tests/integration/airflow/test_lineage_runner.py @@ -157,7 +157,6 @@ class TestAirflowLineageRuner(TestCase): ) def test_lineage_runner(self): - with DAG("test_runner", start_date=datetime(2021, 1, 1)) as dag: BashOperator( task_id="print_date", diff --git a/ingestion/tests/integration/integration_base.py b/ingestion/tests/integration/integration_base.py index 1404a530edd..54f58191e1e 100644 --- a/ingestion/tests/integration/integration_base.py +++ b/ingestion/tests/integration/integration_base.py @@ -300,7 +300,6 @@ def get_create_test_case( def get_test_dag(name: str) -> DAG: """Get a DAG with the tasks created in the CreatePipelineRequest""" with DAG(name, start_date=datetime(2021, 1, 1)) as dag: - tasks = [ BashOperator( task_id=task_id, diff --git a/ingestion/tests/unit/metadata/utils/secrets/test_secrets_manager_factory.py b/ingestion/tests/unit/metadata/utils/secrets/test_secrets_manager_factory.py index 4d44fa294dc..4b105b8f41a 100644 --- a/ingestion/tests/unit/metadata/utils/secrets/test_secrets_manager_factory.py +++ b/ingestion/tests/unit/metadata/utils/secrets/test_secrets_manager_factory.py @@ -52,7 +52,6 @@ class TestSecretsManagerFactory(TestCase): ) def test_invalid_config_secret_manager(self): - om_connection: OpenMetadataConnection = self.build_open_metadata_connection( SecretsManagerProvider.db, SecretsManagerClientLoader.noop, diff --git a/ingestion/tests/unit/pii/test_column_name_scanner.py b/ingestion/tests/unit/pii/test_column_name_scanner.py index 19925759d3b..fe9dcfe9d7b 100644 --- a/ingestion/tests/unit/pii/test_column_name_scanner.py +++ b/ingestion/tests/unit/pii/test_column_name_scanner.py @@ -40,7 +40,6 @@ class ColumnNameScannerTest(TestCase): self.assertIsNone(ColumnNameScanner.scan("user_id")) def test_column_names_sensitive(self): - # Bank self.assertEqual(ColumnNameScanner.scan("bank_account"), EXPECTED_SENSITIVE) diff --git a/ingestion/tests/unit/profiler/pandas/test_profiler_interface.py b/ingestion/tests/unit/profiler/pandas/test_profiler_interface.py index f644dd8d4ac..f2bac2d75ce 100644 --- a/ingestion/tests/unit/profiler/pandas/test_profiler_interface.py +++ b/ingestion/tests/unit/profiler/pandas/test_profiler_interface.py @@ -62,7 +62,6 @@ class FakeConnection: class PandasInterfaceTest(TestCase): - import pandas as pd col_names = [ diff --git a/ingestion/tests/unit/profiler/test_profiler_interface.py b/ingestion/tests/unit/profiler/test_profiler_interface.py index bff960b031c..2c3026da0b8 100644 --- a/ingestion/tests/unit/profiler/test_profiler_interface.py +++ b/ingestion/tests/unit/profiler/test_profiler_interface.py @@ -328,7 +328,6 @@ class ProfilerInterfaceTest(TestCase): self.assertEqual(50, actual) def test_table_config_casting(self): - expected = TableConfig( profileSample=200, profileSampleType=ProfileSampleType.PERCENTAGE, diff --git a/ingestion/tests/unit/resources/datalake/example.parquet b/ingestion/tests/unit/resources/datalake/example.parquet new file mode 100644 index 0000000000000000000000000000000000000000..1c9d2a505685b4127da5e0fe3c59d3f495ea781b GIT binary patch literal 16364 zcmd5@UuawB6_*spae~(+Zlo!l+IepLEo+?Uk1Z$a+9g+iEk|~3$CfO0CFn}B{#gHF zNtWdduVo2G5xFaihUBXj(8Wv!Q1?Mw|gI_=AIt14gsa5<=M{I^FPu9%x8E 
zf_*yUX`>nSp5%H5kZK-49?&jp0pY}1j=^x`td4zYuow+zoF@s0NI>4vn|`Bb|IxFL z^%k>iXosLP6MzVz!Q*FZhA=5Z_|h@52)0snzO4_a~FZIMfXJbwbB0-aRAxwGpzlHMB#JDngra zVkcX5x}*Am`&W)0I|njAFMNiSNQas|X6fxdB37XSUt6+%0;|D^Ch|NXhh+U+&)zq) zFTMxKx@*CzGdXEI9Wokdbs4glAAIx8zo3?Yokxr&*~&81>@n}WA$lbhz#JdtD~*0o z$3C*if}%*^f2TKntY`0ipFMaCz$+za$o&?;-}!+Id`SlW_7m*ikM|6I$T-}z{J`+J zITfo9Ak|Dy49Zrk!^*!LlU0Q>0sWQU^oE|j@k4fh6oR`ovO|ulRr?3*_h)6mZpwh& zeUiQN#tiDeFkP_kSdF{suwkYvfyueKy|n8u?Ufxi>E0I=)sa;g=ZS;9uLF zwQgE>?g%iMj{plWw$+GNp0XH+n>C_JVBgT2-qy1Zf6Tr-hqbpgRMqAEh4b!&3{Fi3 z=g;TaFV7!3oIhTa!P%6-xqpG(zHsPp{%(`OsmtKJW@CS_9y*+tCuMN9WN_Ydu)o<4 z9nRZM8Jva;&fO{Y($p^C?5apG<5Lr`#7XBSm5mA1Nfj#O-J>&@pGWWLagiH2VmNv> zXwb1&FYi=~%Z7M-AfK&N2eQ>v0VXhF`buy5TF<_iW^YaRTsXd!>D~Q_YyV2)yag}P*292Xj}Lyjv-X@bB$I{|Kj`p<@snrSo6o}f z(;AHnW2>c8t&(lv(EmzsNHq`O69u+f9pJvY+751wpKmz02Upl2Y7lcdsuz+Kz3ek@Z&t)iSGpLf z{B)0QX|-A|CWs%*s6kxkXAda~1MJNJKy1nAkf`S9GY`g{I`d#9kx3QeMh5^KH^6NY z+{xg#IQ~TFy+?;mA#x%l#z%p0vPDS3of2}UB!;zd6OMq}C5ez8H{*yFAv69M5JrhW zL$l)+AY3KF5XxHD8BgGMUXoqR|NNBY8ITAvX+&v!B=qWt#llGhtUNv($|D1{ZZ=!Q zzM(S?qr9s0T^rYha4e^8wzcXKb*qiqkXlZI8R{_{a0Ww&$2nl8;)NGjsEKN{4w*!_ z274$|GHD#-fT5gpSlY0*)}o-qG~5)B=uBp3R5sh>%$1SU=D4 zte*51y`2tOZ>Krd+v$t-c3L{DV+%W^^F!bF{H)*m^aI*kvxC_W zvxC{gn*Mv%G{}ZbAFHj$$m1!`nb`QU?aHRLUy>G^sN6 zIIUX~fI^zmn5a}hqz(nyV)e380Z^x>klal}LaBjBN)7hueX2+*RS-$3V$!is73AM` zM-WM=f?K3L6n15(HRUXb)S;r*gD=ub6+}`|v2SN&lsbr{qGR9Q$SQRZNkzxL-LbCJ zL8J~HYxz>Vsx}eoczAkH-nIu8k<`?v&1IKriptp#sY6XQ3)N3i$#xaKC8Y);b!dP( znCd}GS*e9cDq58DXzHGxdcIhB0sFfZuA53#L^=Rf8%k9~>QL2MQT9<@XeDY7G$M8A z5F1rk3Kb#HQr*pDHRU{r)S+W{?ON2fEiEVPM@>>~U!k_K*t;6q@NJ~x z)zSuS3iDZUBfinVJz=>K-`FTosBNY;D%nyIsdn2%+eP~r?A^vBd&)a6PF{45-O`zd zg{7tEB2V)>T=Ux(t@yDEOM)OQ!Vi9~3Br`%P78iPNDH1c(cJE~{yGwUJN@o@h_~TM zQm8*D2z3F*!8Z!=+E5m>1>w0U=x|>O7V@=NAUw6|nBNphHLT2{AMh$k_~x;EbvwnGx@dTOukNXJ+qt9LOCCuT;%-;!kQHwt_qDU*z?JNe?7Xol-aStjI9!!k=e{Mo8`Q*axAj7 zzUpvh#K;QFOUOpEL9}H(8gYV*dAU8xVsUYbj{}>px}B?&OU?w?566KWZ1Xu)Th)@0 zt;!nsv)u-BS~kdO+aO;{dVP-mVc>XG{IK2~!(5|(7Ki0jvpq#ZlM7kD^8N!o@)CHyI9EQM8^_e|N4VN+A}C?NK$*ha}YQt zHSM9DzwP_bz3o#q@+GZR+xh-ZH8xz?cp;oi^88k*Wo`Yt%9E7VI#<+Q=h|biFEN^d zwOIB15G?q+?8mptiOD6A?|YY$eaxBW9$aOek@oF}>e~#Gd>~=ijs*#MGuQu11vImPcIFLr+$1T;PIe1-%wm0F1vus{i5rn`ZoP`A& zw%ypCUxc%-r-WzOH9YY8@hmK)vF#FIe5me){uMa>3UO@v1ZcxRXkWtP@Z6#Y`V)98 zPGZ|vhqkbe?FNbp)KL`56bcI)Rx2K-7i|e_12=r=kc~2-4QvbeCY|5$s2x~>33yh{ zZ(6O5)_4F8UHIUvbQC8gB-LGvfu2p|w|b|o*2}=>o1Z~;L?_|FIS53g;T$9%Uy4K5 zB^=-IUXsRpNIn{_IUe6Q|3(~Sg%&5exeDPGQYZOmz4-fLrcqeKCqi7cKTNii5k4pSl12F#u z;XLBQ{9G(iJSy@LsR88mmQeo80;Cf0uMS?1<@3iFCwoZvNECIUc%F*x z0!|s~8PKVeP(8g4HHZ77n?4L3H{_gDpLitMq_$Q)rTN8B{k0shB0j-QO0RjPwy&E# zb2z}>#3%SDUbdTUsh+CG4{3o%9p?)yWDn9$^&F5voI-yTR&`u}$tWZrUfpi)FIl}s z4jw94^Dsz31ld<-9xcZ=Zq`!O3yE_10$se1N*C*+{K_4EOBQ}Z_#po3suy&+1^9pd F{{z(h8PEU# literal 0 HcmV?d00001 diff --git a/ingestion/tests/unit/test_column_type_parser.py b/ingestion/tests/unit/test_column_type_parser.py index 3b55822c8f0..a58eb98623b 100644 --- a/ingestion/tests/unit/test_column_type_parser.py +++ b/ingestion/tests/unit/test_column_type_parser.py @@ -18,7 +18,7 @@ from unittest import TestCase from metadata.generated.schema.entity.data.table import DataType from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser -from metadata.utils.datalake.datalake_utils import fetch_col_types +from metadata.utils.datalake.datalake_utils import GenericDataFrameColumnParser COLUMN_TYPE_PARSE = [ "array", @@ -129,4 +129,6 @@ def test_check_datalake_type(): } df = pd.read_csv("ingestion/tests/unit/test_column_type_parser.csv") for column_name in 
-        assert assert_col_type_dict.get(column_name) == fetch_col_types(df, column_name)
+        assert assert_col_type_dict.get(
+            column_name
+        ) == GenericDataFrameColumnParser.fetch_col_types(df, column_name)
diff --git a/ingestion/tests/unit/test_pgspider_lineage_unit.py b/ingestion/tests/unit/test_pgspider_lineage_unit.py
index ad7ce076ff7..52a27510305 100644
--- a/ingestion/tests/unit/test_pgspider_lineage_unit.py
+++ b/ingestion/tests/unit/test_pgspider_lineage_unit.py
@@ -622,7 +622,6 @@ class PGSpiderLineageUnitTests(TestCase):
             connection=self.postgres.service_connection,
             service_name=self.postgres.config.serviceName,
         ):
-
             if isinstance(record, AddLineageRequest):
                 requests.append(record)

@@ -661,7 +660,6 @@ class PGSpiderLineageUnitTests(TestCase):
             connection=self.postgres.service_connection,
             service_name=self.postgres.config.serviceName,
         ):
-
             if isinstance(record, AddLineageRequest):
                 requests.append(record)

@@ -700,7 +698,6 @@ class PGSpiderLineageUnitTests(TestCase):
             connection=self.postgres.service_connection,
             service_name=self.postgres.config.serviceName,
         ):
-
             if isinstance(record, AddLineageRequest):
                 requests.append(record)

@@ -738,7 +735,6 @@ class PGSpiderLineageUnitTests(TestCase):
             connection=self.postgres.service_connection,
             service_name=self.postgres.config.serviceName,
         ):
-
             if isinstance(record, AddLineageRequest):
                 requests.append(record)

@@ -773,7 +769,6 @@ class PGSpiderLineageUnitTests(TestCase):
             connection=self.postgres.service_connection,
             service_name=self.postgres.config.serviceName,
         ):
-
             if isinstance(record, AddLineageRequest):
                 requests.append(record)

@@ -809,7 +804,6 @@ class PGSpiderLineageUnitTests(TestCase):
             connection=self.postgres.service_connection,
             service_name=self.postgres.config.serviceName,
         ):
-
             if isinstance(record, AddLineageRequest):
                 requests.append(record)

diff --git a/ingestion/tests/unit/topology/database/test_datalake.py b/ingestion/tests/unit/topology/database/test_datalake.py
index 8c45c7fcfe6..3819f886455 100644
--- a/ingestion/tests/unit/topology/database/test_datalake.py
+++ b/ingestion/tests/unit/topology/database/test_datalake.py
@@ -33,7 +33,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
 from metadata.readers.dataframe.avro import AvroDataFrameReader
 from metadata.readers.dataframe.json import JSONDataFrameReader
-from metadata.utils.datalake.datalake_utils import get_columns
+from metadata.utils.datalake.datalake_utils import GenericDataFrameColumnParser

 mock_datalake_config = {
     "source": {
@@ -459,13 +459,17 @@ class DatalakeUnitTest(TestCase):
         actual_df_3 = JSONDataFrameReader.read_from_json(
             key="file.json", json_text=EXAMPLE_JSON_TEST_3, decode=True
         )[0]
-        actual_cols_3 = get_columns(actual_df_3)
+        actual_cols_3 = GenericDataFrameColumnParser._get_columns(
+            actual_df_3
+        )  # pylint: disable=protected-access
         assert actual_cols_3 == EXAMPLE_JSON_COL_3

         actual_df_4 = JSONDataFrameReader.read_from_json(
             key="file.json", json_text=EXAMPLE_JSON_TEST_4, decode=True
         )[0]
-        actual_cols_4 = get_columns(actual_df_4)
+        actual_cols_4 = GenericDataFrameColumnParser._get_columns(
+            actual_df_4
+        )  # pylint: disable=protected-access
         assert actual_cols_4 == EXAMPLE_JSON_COL_4

     def test_avro_file_parse(self):
diff --git a/ingestion/tests/unit/topology/database/test_iceberg.py b/ingestion/tests/unit/topology/database/test_iceberg.py
index 57a293dfa9f..168d09009e5 100644
--- a/ingestion/tests/unit/topology/database/test_iceberg.py
+++ b/ingestion/tests/unit/topology/database/test_iceberg.py
@@ -641,7 +641,6 @@ class IcebergUnitTest(TestCase):
         with patch.object(
             HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST
         ), patch.object(HiveCatalog, "load_table", return_value=LoadTableMock()):
-
             for i, table in enumerate(self.iceberg.get_tables_name_and_type()):
                 self.assertEqual(table, EXPECTED_TABLE_LIST[i])

@@ -655,7 +654,6 @@ class IcebergUnitTest(TestCase):
         ), patch.object(
             HiveCatalog, "load_table", side_effect=raise_no_such_iceberg_table
         ):
-
             self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0)

         # When pyiceberg.exceptions.NoSuchTableError is raised
@@ -666,7 +664,6 @@ class IcebergUnitTest(TestCase):
         with patch.object(
             HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST
         ), patch.object(HiveCatalog, "load_table", side_effect=raise_no_such_table):
-
             self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0)

     def test_get_owner_ref(self):
@@ -802,7 +799,6 @@ class IcebergUnitTest(TestCase):
         with patch.object(
             OpenMetadata, "get_reference_by_email", return_value=ref
         ), patch.object(fqn, "build", return_value=fq_database_schema):
-
             result = next(self.iceberg.yield_table((table_name, table_type))).right
             self.assertEqual(result, expected)

diff --git a/ingestion/tests/unit/topology/pipeline/test_airbyte.py b/ingestion/tests/unit/topology/pipeline/test_airbyte.py
index 334e5e42a65..1f4b32ff5b5 100644
--- a/ingestion/tests/unit/topology/pipeline/test_airbyte.py
+++ b/ingestion/tests/unit/topology/pipeline/test_airbyte.py
@@ -192,7 +192,6 @@ class AirbyteUnitTest(TestCase):
         assert pipline == EXPECTED_CREATED_PIPELINES

     def test_pipeline_status(self):
-
         status = [
             either.right
             for either in self.airbyte.yield_pipeline_status(EXPECTED_ARIBYTE_DETAILS)
diff --git a/ingestion/tests/unit/topology/storage/test_storage.py b/ingestion/tests/unit/topology/storage/test_storage.py
index 11239bce2dd..3920821dd32 100644
--- a/ingestion/tests/unit/topology/storage/test_storage.py
+++ b/ingestion/tests/unit/topology/storage/test_storage.py
@@ -308,7 +308,6 @@ class StorageUnitTest(TestCase):
                 )
             ],
         ):
-
             Column.__eq__ = custom_column_compare
             self.assertListEqual(
                 [
diff --git a/ingestion/tests/unit/utils/test_datalake.py b/ingestion/tests/unit/utils/test_datalake.py
index 428857ec3d9..6d1b5398dfd 100644
--- a/ingestion/tests/unit/utils/test_datalake.py
+++ b/ingestion/tests/unit/utils/test_datalake.py
@@ -12,12 +12,17 @@
 Test datalake utils
 """

+import os
 from unittest import TestCase

-from metadata.generated.schema.entity.data.table import Column
+import pandas as pd
+
+from metadata.generated.schema.entity.data.table import Column, DataType
+from metadata.readers.dataframe.reader_factory import SupportedTypes
 from metadata.utils.datalake.datalake_utils import (
-    construct_json_column_children,
-    unique_json_structure,
+    DataFrameColumnParser,
+    GenericDataFrameColumnParser,
+    ParquetDataFrameColumnParser,
 )

 STRUCTURE = {
@@ -53,7 +58,7 @@ class TestDatalakeUtils(TestCase):
         ]

         expected = STRUCTURE
-        actual = unique_json_structure(sample_data)
+        actual = GenericDataFrameColumnParser.unique_json_structure(sample_data)

         self.assertDictEqual(expected, actual)

@@ -153,14 +158,16 @@ class TestDatalakeUtils(TestCase):
                 ],
             },
         ]
-        actual = construct_json_column_children(STRUCTURE)
+        actual = GenericDataFrameColumnParser.construct_json_column_children(STRUCTURE)

         for el in zip(expected, actual):
             self.assertDictEqual(el[0], el[1])

     def test_create_column_object(self):
         """test create column object fn"""
-        formatted_column = construct_json_column_children(STRUCTURE)
+        formatted_column = GenericDataFrameColumnParser.construct_json_column_children(
+            STRUCTURE
+        )
         column = {
             "dataTypeDisplay": "STRING",
             "dataType": "STRING",
@@ -170,3 +177,270 @@ class TestDatalakeUtils(TestCase):
         }
         column_obj = Column(**column)
         assert len(column_obj.children) == 3
+
+
+class TestParquetDataFrameColumnParser(TestCase):
+    """Test parquet dataframe column parser"""
+
+    @classmethod
+    def setUpClass(cls) -> None:
+        resources_path = os.path.join(
+            os.path.dirname(os.path.dirname(__file__)), "resources"
+        )
+        cls.parquet_path = os.path.join(resources_path, "datalake", "example.parquet")
+
+        cls.df = pd.read_parquet(cls.parquet_path)
+
+        cls.parquet_parser = ParquetDataFrameColumnParser(cls.df)
+
+    def test_parser_instantiation(self):
+        """Test the right parser is instantiated from the creator method"""
+        parquet_parser = DataFrameColumnParser.create(self.df, SupportedTypes.PARQUET)
+        self.assertIsInstance(parquet_parser.parser, ParquetDataFrameColumnParser)
+
+        other_types = [typ for typ in SupportedTypes if typ != SupportedTypes.PARQUET]
+        for other_type in other_types:
+            with self.subTest(other_type=other_type):
+                generic_parser = DataFrameColumnParser.create(self.df, other_type)
+                self.assertIsInstance(
+                    generic_parser.parser, GenericDataFrameColumnParser
+                )
+
+    def test_shuffle_and_sample_from_parser(self):
+        """test the shuffle and sampling logic from the parser creator method"""
+        parquet_parser = DataFrameColumnParser.create(self.df, SupportedTypes.PARQUET)
+        self.assertEqual(parquet_parser.parser.data_frame.shape, self.df.shape)
+
+        parquet_parser = DataFrameColumnParser.create(
+            [self.df, self.df], SupportedTypes.PARQUET
+        )
+        self.assertEqual(parquet_parser.parser.data_frame.shape, self.df.shape)
+
+        parquet_parser = DataFrameColumnParser.create(
+            [self.df, self.df], SupportedTypes.PARQUET, sample=False
+        )
+        self.assertEqual(
+            parquet_parser.parser.data_frame.shape, pd.concat([self.df, self.df]).shape
+        )
+
+    def test_get_columns(self):
+        """test `get_columns` method of the parquet column parser"""
+        expected = [
+            Column(
+                dataTypeDisplay="bool",
+                dataType=DataType.BOOLEAN,
+                name="a",
+                displayName="a",
+            ),  # type: ignore
+            Column(
+                dataTypeDisplay="int8",
+                dataType=DataType.INT,
+                name="b",
+                displayName="b",
+            ),  # type: ignore
+            Column(
+                dataTypeDisplay="int16",
+                dataType=DataType.INT,
+                name="c",
+                displayName="c",
+            ),  # type: ignore
+            Column(
+                dataTypeDisplay="int32",
+                dataType=DataType.INT,
+                name="d",
+                displayName="d",
+            ),  # type: ignore
+            Column(
+                dataTypeDisplay="int64",
+                dataType=DataType.INT,
+                name="e",
+                displayName="e",
+            ),  # type: ignore
+            Column(
+                dataTypeDisplay="uint8",
+                dataType=DataType.UINT,
+                name="f",
+                displayName="f",
+            ),  # type: ignore
+            Column(
+                dataTypeDisplay="uint16",
+                dataType=DataType.UINT,
+                name="g",
+                displayName="g",
+            ),  # type: ignore
+            Column(
+                dataTypeDisplay="uint32",
+                dataType=DataType.UINT,
+                name="h",
+                displayName="h",
+            ),  # type: ignore
+            Column(
+                dataTypeDisplay="uint64",
+                dataType=DataType.UINT,
+                name="i",
+                displayName="i",
+            ),  # type: ignore
+            Column(
+                dataTypeDisplay="float",
+                dataType=DataType.FLOAT,
+                name="k",
+                displayName="k",
+            ),  # type: ignore
+            Column(
+                dataTypeDisplay="double",
+                dataType=DataType.FLOAT,
+                name="l",
+                displayName="l",
+            ),  # type: ignore
+            Column(
+                dataTypeDisplay="time64[us]",
+                dataType=DataType.DATETIME,
+                name="n",
displayName="n", + ), # type: ignore + Column( + dataTypeDisplay="timestamp[ns]", + dataType=DataType.DATETIME, + name="o", + displayName="o", + ), # type: ignore + Column( + dataTypeDisplay="date32[day]", + dataType=DataType.DATE, + name="p", + displayName="p", + ), # type: ignore + Column( + dataTypeDisplay="date32[day]", + dataType=DataType.DATE, + name="q", + displayName="q", + ), # type: ignore + Column( + dataTypeDisplay="duration[ns]", + dataType=DataType.INT, + name="r", + displayName="r", + ), # type: ignore + Column( + dataTypeDisplay="binary", + dataType=DataType.BINARY, + name="t", + displayName="t", + ), # type: ignore + Column( + dataTypeDisplay="string", + dataType=DataType.STRING, + name="u", + displayName="u", + ), # type: ignore + Column( + dataTypeDisplay="string", + dataType=DataType.STRING, + name="v", + displayName="v", + ), # type: ignore + Column( + dataTypeDisplay="binary", + dataType=DataType.BINARY, + name="w", + displayName="w", + ), # type: ignore + Column( + dataTypeDisplay="string", + dataType=DataType.STRING, + name="x", + displayName="x", + ), # type: ignore + Column( + dataTypeDisplay="string", + dataType=DataType.STRING, + name="y", + displayName="y", + ), # type: ignore + Column( + dataTypeDisplay="list", + dataType=DataType.ARRAY, + name="aa", + displayName="aa", + ), # type: ignore + Column( + dataTypeDisplay="list", + dataType=DataType.ARRAY, + name="bb", + displayName="bb", + ), # type: ignore + Column( + dataTypeDisplay="struct>>", + dataType=DataType.STRUCT, + name="dd", + displayName="dd", + children=[ + Column( + dataTypeDisplay="int64", + dataType=DataType.INT, + name="ee", + displayName="ee", + ), # type: ignore + Column( + dataTypeDisplay="int64", + dataType=DataType.INT, + name="ff", + displayName="ff", + ), # type: ignore + Column( + dataTypeDisplay="struct>", + dataType=DataType.STRUCT, + name="gg", + displayName="gg", + children=[ + Column( + dataTypeDisplay="struct", + dataType=DataType.STRUCT, + name="hh", + displayName="hh", + children=[ + Column( + dataTypeDisplay="int64", + dataType=DataType.INT, + name="ii", + displayName="ii", + ), # type: ignore + Column( + dataTypeDisplay="int64", + dataType=DataType.INT, + name="jj", + displayName="jj", + ), # type: ignore + Column( + dataTypeDisplay="int64", + dataType=DataType.INT, + name="kk", + displayName="kk", + ), # type: ignore + ], + ), + ], + ), + ], + ), # type: ignore + ] + actual = self.parquet_parser.get_columns() + for validation in zip(expected, actual): + with self.subTest(validation=validation): + expected_col, actual_col = validation + self.assertEqual(expected_col.name, actual_col.name) + self.assertEqual(expected_col.displayName, actual_col.displayName) + self.assertEqual(expected_col.dataType, actual_col.dataType) + + def _validate_parsed_column(self, expected, actual): + """validate parsed column""" + self.assertEqual(expected.name, actual.name) + self.assertEqual(expected.dataType, actual.dataType) + self.assertEqual(expected.displayName, actual.displayName) + if expected.children: + self.assertEqual(len(expected.children), len(actual.children)) + for validation in zip(expected.children, actual.children): + with self.subTest(validation=validation): + expected_col, actual_col = validation + self._validate_parsed_column(expected_col, actual_col) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py index 75b61a1bdf5..8265269d5a0 100644 --- 
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py
@@ -110,7 +110,6 @@ def get_dagbag():

 class ScanDagsTask(Process):
     def run(self):
-
         if airflow_version >= "2.6":
             scheduler_job = self._run_new_scheduler_job()
         else:
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json
index 6f3c33086ec..15f8a6a671f 100644
--- a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json
+++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json
@@ -149,7 +149,8 @@
         "LARGEINT",
         "QUANTILE_STATE",
         "AGG_STATE",
-        "BITMAP"
+        "BITMAP",
+        "UINT"
       ]
     },
     "constraint": {
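
Reviewer note: a minimal usage sketch of the parser API this patch introduces, mirroring only what ingestion/tests/unit/utils/test_datalake.py exercises. The local parquet path and the access through the facade's `.parser` attribute are assumptions taken from the tests, not a definitive description of the public surface.

# Illustrative sketch, not part of the patch.
import pandas as pd

from metadata.readers.dataframe.reader_factory import SupportedTypes
from metadata.utils.datalake.datalake_utils import DataFrameColumnParser

# Local test fixture standing in for a file read from object storage.
df = pd.read_parquet("ingestion/tests/unit/resources/datalake/example.parquet")

# The creator returns a facade whose .parser is a ParquetDataFrameColumnParser
# for parquet inputs and a GenericDataFrameColumnParser for every other format.
column_parser = DataFrameColumnParser.create(df, SupportedTypes.PARQUET)

# Column metadata (name, dataType, nested children for structs) now comes from
# the parquet/Arrow schema rather than from pandas dtype inference.
for column in column_parser.parser.get_columns():
    print(column.name, column.dataType)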