From 1db18a50cfde00825a04b8a2c63c94fa6d5df615 Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Mon, 18 Oct 2021 15:00:19 -0700
Subject: [PATCH] Issue-823: data profiler should have an option of configuring a profiler date (#831)

* Issue-823: data profiler should have an option of configuring a profiler date

* Fix error logging, remove stack trace

* Issue-823: data profiler should have an option of configuring a profiler date

* Issue-823: Fix formatting
---
 .../ingestion/ometa/openmetadata_rest.py    | 21 +++++++------------
 .../metadata/ingestion/source/sql_source.py |  7 +++++--
 .../src/metadata/profiler/dataprofiler.py   | 17 +++++++++++----
 3 files changed, 25 insertions(+), 20 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py b/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py
index 0ef63f553fa..b1cf5746314 100644
--- a/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py
+++ b/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py
@@ -19,6 +19,7 @@ import logging
 import time
 import uuid
 from typing import List, Optional
+from urllib.error import HTTPError
 
 import google.auth
 import google.auth.transport.requests
@@ -237,9 +238,7 @@ class OpenMetadataAPIClient(object):
             )
             return DatabaseService(**resp)
         except APIError as err:
-            logger.error(
-                f"Error trying to GET the database service {service_name}", err
-            )
+            logger.error(f"Error trying to GET the database service {service_name}")
             return None
 
     def get_database_service_by_id(self, service_id: str) -> DatabaseService:
@@ -402,9 +401,7 @@ class OpenMetadataAPIClient(object):
             )
             return MessagingService(**resp)
         except APIError as err:
-            logger.error(
-                f"Error trying to GET the messaging service {service_name}", err
-            )
+            logger.error(f"Error trying to GET the messaging service {service_name}")
             return None
 
     def get_messaging_service_by_id(self, service_id: str) -> MessagingService:
@@ -458,9 +455,7 @@ class OpenMetadataAPIClient(object):
             )
             return DashboardService(**resp)
         except APIError as err:
-            logger.error(
-                f"Error trying to GET the dashboard service {service_name}", err
-            )
+            logger.error(f"Error trying to GET the dashboard service {service_name}")
             return None
 
     def get_dashboard_service_by_id(self, service_id: str) -> DashboardService:
@@ -479,7 +474,7 @@ class OpenMetadataAPIClient(object):
             return DashboardService(**resp)
         except APIError as err:
             logger.error(
-                f"Error trying to POST the dashboard service {dashboard_service}", err
+                f"Error trying to POST the dashboard service {dashboard_service}"
             )
             return None
 
@@ -548,9 +543,7 @@ class OpenMetadataAPIClient(object):
             )
             return PipelineService(**resp)
         except APIError as err:
-            logger.error(
-                f"Error trying to GET the pipeline service {service_name}", err
-            )
+            logger.error(f"Error trying to GET the pipeline service {service_name}")
             return None
 
     def get_pipeline_service_by_id(self, service_id: str) -> PipelineService:
@@ -569,7 +562,7 @@ class OpenMetadataAPIClient(object):
             return PipelineService(**resp)
         except APIError as err:
             logger.error(
-                f"Error trying to POST the pipeline service {pipeline_service}", err
+                f"Error trying to POST the pipeline service {pipeline_service}"
             )
             return None
 
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 1c61212a4a2..550a760761a 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -18,6 +18,7 @@ import traceback
 import uuid
 from abc import abstractmethod
 from dataclasses import dataclass, field
+from datetime import datetime
 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type
 from urllib.parse import quote_plus
 
@@ -85,6 +86,7 @@ class SQLConnectionConfig(ConfigModel):
     include_tables: Optional[bool] = True
     generate_sample_data: Optional[bool] = True
     data_profiler_enabled: Optional[bool] = False
+    data_profiler_date: Optional[str] = datetime.now().strftime("%Y-%m-%d")
     data_profiler_offset: Optional[int] = 0
     data_profiler_limit: Optional[int] = 50000
     filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
@@ -383,9 +385,9 @@ class SQLSource(Source):
                         self.status, dataset_name, column["type"]
                     )
                     if col_type == "ARRAY":
-                        if re.match(r"(?:\w*)(?:\()(\w*)(?:.*))", str(column["type"])):
+                        if re.match(r"(?:\w*)(?:\()(\w*)(?:.*)", str(column["type"])):
                            arr_data_type = re.match(
-                                r"(?:\w*)(?:[(]*)(\w*)(?:.*))", str(column["type"])
+                                r"(?:\w*)(?:[(]*)(\w*)(?:.*)", str(column["type"])
                            ).groups()
                            data_type_display = column["type"]
                    col_constraint = None
@@ -437,6 +439,7 @@ class SQLSource(Source):
             table = dataset_name
         profile = self.data_profiler.run_profiler(
             dataset_name=dataset_name,
+            profile_date=self.sql_config.data_profiler_date,
             schema=schema,
             table=table,
             limit=self.sql_config.data_profiler_limit,
diff --git a/ingestion/src/metadata/profiler/dataprofiler.py b/ingestion/src/metadata/profiler/dataprofiler.py
index 2346ecd4bd1..5bbb23b49b9 100644
--- a/ingestion/src/metadata/profiler/dataprofiler.py
+++ b/ingestion/src/metadata/profiler/dataprofiler.py
@@ -64,6 +64,7 @@ class DataProfiler:
     def run_profiler(
         self,
         dataset_name: str,
+        profile_date: str,
         schema: str = None,
         table: str = None,
         limit: int = None,
@@ -81,7 +82,9 @@ class DataProfiler:
                 }
             )
             profile = self._parse_test_results_to_table_profile(
-                profile_test_results, dataset_name=dataset_name
+                profile_test_results,
+                dataset_name=dataset_name,
+                profile_date=profile_date,
             )
             return profile
         except Exception as err:
@@ -110,7 +113,10 @@ class DataProfiler:
         return result.expectation_config.kwargs.get("column")
 
     def _parse_test_results_to_table_profile(
-        self, profile_test_results: ExpectationSuiteValidationResult, dataset_name: str
+        self,
+        profile_test_results: ExpectationSuiteValidationResult,
+        dataset_name: str,
+        profile_date: str,
     ) -> TableProfile:
         profile = None
         column_profiles = []
@@ -119,7 +125,9 @@ class DataProfiler:
         ):
             if col is None:
                 profile = self._parse_table_test_results(
-                    col_test_result, dataset_name=dataset_name
+                    col_test_result,
+                    dataset_name=dataset_name,
+                    profile_date=profile_date,
                 )
             else:
                 column_profile = self._parse_column_test_results(
@@ -135,8 +143,9 @@ class DataProfiler:
         self,
         table_test_results: Iterable[ExpectationValidationResult],
         dataset_name: str,
+        profile_date: str,
     ) -> TableProfile:
-        profile = TableProfile(profileDate=datetime.now().strftime("%Y-%m-%d"))
+        profile = TableProfile(profileDate=profile_date)
         for table_result in table_test_results:
             expectation: str = table_result.expectation_config.expectation_type
             result: dict = table_result.result
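Usage note (not part of the patch): with this change, the date recorded in TableProfile.profileDate can be pinned from the source configuration via the new data_profiler_date field on SQLConnectionConfig, instead of always defaulting to the current day. The sketch below is illustrative only; it shows the profiler-related keys as they would appear in a source config dict, and the surrounding workflow wiring and any other keys are assumed, not taken from this patch.

# Minimal sketch, assuming a dict-style source config parsed into SQLConnectionConfig.
source_config = {
    "data_profiler_enabled": True,
    # New in this patch: pin the profile date instead of using today's date.
    "data_profiler_date": "2021-10-18",  # YYYY-MM-DD; defaults to today when omitted
    "data_profiler_offset": 0,
    "data_profiler_limit": 50000,
}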