Mirror of https://github.com/open-metadata/OpenMetadata.git, synced 2025-09-10 01:21:53 +00:00
Issue-823: data profiler should have an option of configuring a profiler date (#831)
* Issue-823: data profiler should have an option of configuring a profiler date
* Fix error logging, remove stack trace
* Issue-823: data profiler should have an option of configuring a profiler date
* Issue-823: Fix formatting
parent 95181c1313
commit 1db18a50cf

The change spans three files: the OpenMetadata API client (OpenMetadataAPIClient), the SQL source connector (SQLConnectionConfig / SQLSource), and the data profiler (DataProfiler).
@@ -19,6 +19,7 @@ import logging
 import time
 import uuid
 from typing import List, Optional
+from urllib.error import HTTPError
 
 import google.auth
 import google.auth.transport.requests
@@ -237,9 +238,7 @@ class OpenMetadataAPIClient(object):
             )
             return DatabaseService(**resp)
         except APIError as err:
-            logger.error(
-                f"Error trying to GET the database service {service_name}", err
-            )
+            logger.error(f"Error trying to GET the database service {service_name}")
             return None
 
     def get_database_service_by_id(self, service_id: str) -> DatabaseService:
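Why this hunk (and the identical ones below) is a real fix and not a cosmetic one: logging treats extra positional arguments as %-format parameters and interpolates them lazily. With an f-string message there is no %s placeholder for err, so each of these handlers triggered an internal "--- Logging error --- ... not all arguments converted during string formatting" instead of a clean message. A minimal sketch of the failure mode and the fixed pattern; service_name and the ValueError stand-in for the client's APIError are hypothetical, for illustration only:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

service_name = "local_mysql"  # hypothetical service name

try:
    raise ValueError("404")  # stand-in for the client's APIError
except ValueError as err:
    # Pre-fix pattern: err becomes a %-format argument with no placeholder,
    # so the logging machinery itself raises a formatting error.
    # logger.error(f"Error trying to GET the database service {service_name}", err)

    # Post-fix pattern, as in this commit (message only, no stack trace):
    logger.error(f"Error trying to GET the database service {service_name}")

    # If the traceback were still wanted, logger.exception attaches it
    # automatically inside an except block:
    logger.exception("Error trying to GET the database service %s", service_name)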
@@ -402,9 +401,7 @@ class OpenMetadataAPIClient(object):
             )
             return MessagingService(**resp)
         except APIError as err:
-            logger.error(
-                f"Error trying to GET the messaging service {service_name}", err
-            )
+            logger.error(f"Error trying to GET the messaging service {service_name}")
             return None
 
     def get_messaging_service_by_id(self, service_id: str) -> MessagingService:
@@ -458,9 +455,7 @@ class OpenMetadataAPIClient(object):
             )
             return DashboardService(**resp)
         except APIError as err:
-            logger.error(
-                f"Error trying to GET the dashboard service {service_name}", err
-            )
+            logger.error(f"Error trying to GET the dashboard service {service_name}")
             return None
 
     def get_dashboard_service_by_id(self, service_id: str) -> DashboardService:
@@ -479,7 +474,7 @@ class OpenMetadataAPIClient(object):
             return DashboardService(**resp)
         except APIError as err:
             logger.error(
-                f"Error trying to POST the dashboard service {dashboard_service}", err
+                f"Error trying to POST the dashboard service {dashboard_service}"
             )
             return None
 
@@ -548,9 +543,7 @@ class OpenMetadataAPIClient(object):
             )
             return PipelineService(**resp)
         except APIError as err:
-            logger.error(
-                f"Error trying to GET the pipeline service {service_name}", err
-            )
+            logger.error(f"Error trying to GET the pipeline service {service_name}")
             return None
 
     def get_pipeline_service_by_id(self, service_id: str) -> PipelineService:
@@ -569,7 +562,7 @@ class OpenMetadataAPIClient(object):
             return PipelineService(**resp)
         except APIError as err:
             logger.error(
-                f"Error trying to POST the pipeline service {pipeline_service}", err
+                f"Error trying to POST the pipeline service {pipeline_service}"
             )
             return None
 
@@ -18,6 +18,7 @@ import traceback
 import uuid
 from abc import abstractmethod
 from dataclasses import dataclass, field
+from datetime import datetime
 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type
 from urllib.parse import quote_plus
 
@@ -85,6 +86,7 @@ class SQLConnectionConfig(ConfigModel):
     include_tables: Optional[bool] = True
     generate_sample_data: Optional[bool] = True
     data_profiler_enabled: Optional[bool] = False
+    data_profiler_date: Optional[str] = datetime.now().strftime("%Y-%m-%d")
     data_profiler_offset: Optional[int] = 0
     data_profiler_limit: Optional[int] = 50000
     filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
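One subtlety worth noting about the new field: a class-level default like datetime.now().strftime(...) is evaluated once, when the class body executes at import time, not once per ingestion run, so a long-lived process keeps the date it started with. A minimal sketch under the assumption that ConfigModel is pydantic-based; ProfilerConfig is a stand-in name for illustration:

from datetime import datetime
from typing import Optional

from pydantic import BaseModel


class ProfilerConfig(BaseModel):  # stand-in for SQLConnectionConfig(ConfigModel)
    data_profiler_enabled: Optional[bool] = False
    # Computed once at class-definition time; a process started yesterday
    # still defaults to yesterday's date.
    data_profiler_date: Optional[str] = datetime.now().strftime("%Y-%m-%d")
    data_profiler_offset: Optional[int] = 0
    data_profiler_limit: Optional[int] = 50000


# Passing the date explicitly per run side-steps the stale-default issue:
config = ProfilerConfig(data_profiler_date="2021-10-01")
print(config.data_profiler_date)  # 2021-10-01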
@@ -383,9 +385,9 @@ class SQLSource(Source):
                     self.status, dataset_name, column["type"]
                 )
                 if col_type == "ARRAY":
-                    if re.match(r"(?:\w*)(?:\()(\w*)(?:.*))", str(column["type"])):
+                    if re.match(r"(?:\w*)(?:\()(\w*)(?:.*)", str(column["type"])):
                         arr_data_type = re.match(
-                            r"(?:\w*)(?:[(]*)(\w*)(?:.*))", str(column["type"])
+                            r"(?:\w*)(?:[(]*)(\w*)(?:.*)", str(column["type"])
                         ).groups()
                 data_type_display = column["type"]
                 col_constraint = None
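The regex change is a genuine bugfix, not formatting: both old patterns end in an unescaped ")" with no matching "(", so re.compile raises re.error("unbalanced parenthesis") the first time an ARRAY column is encountered. A quick check of the corrected pattern; the sample type string "ARRAY(VARCHAR)" is illustrative of what str(column["type"]) typically renders to:

import re

# Old pattern r"(?:\w*)(?:\()(\w*)(?:.*))" fails to compile with
# re.error("unbalanced parenthesis"); the fixed pattern below compiles.
pattern = r"(?:\w*)(?:\()(\w*)(?:.*)"

match = re.match(pattern, "ARRAY(VARCHAR)")
if match:
    # The single capture group holds the array's element type.
    print(match.groups())  # ('VARCHAR',)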
@@ -437,6 +439,7 @@ class SQLSource(Source):
             table = dataset_name
         profile = self.data_profiler.run_profiler(
             dataset_name=dataset_name,
+            profile_date=self.sql_config.data_profiler_date,
             schema=schema,
             table=table,
             limit=self.sql_config.data_profiler_limit,
@@ -64,6 +64,7 @@ class DataProfiler:
     def run_profiler(
         self,
         dataset_name: str,
+        profile_date: str,
         schema: str = None,
         table: str = None,
         limit: int = None,
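Taken together with the SQLSource hunk above, the new parameter threads the configured date from the source config straight through to the profiler. A hedged sketch of the resulting call, assuming data_profiler is an already-constructed DataProfiler instance; the dataset, schema, and table values are made up, and the keyword names follow the diff:

profile_date = "2021-10-01"  # would come from sql_config.data_profiler_date

profile = data_profiler.run_profiler(
    dataset_name="shop.orders",
    profile_date=profile_date,
    schema="shop",
    table="orders",
    limit=50000,  # matches the data_profiler_limit default above
)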
@@ -81,7 +82,9 @@ class DataProfiler:
                 }
             )
             profile = self._parse_test_results_to_table_profile(
-                profile_test_results, dataset_name=dataset_name
+                profile_test_results,
+                dataset_name=dataset_name,
+                profile_date=profile_date,
             )
             return profile
         except Exception as err:
@@ -110,7 +113,10 @@ class DataProfiler:
         return result.expectation_config.kwargs.get("column")
 
     def _parse_test_results_to_table_profile(
-        self, profile_test_results: ExpectationSuiteValidationResult, dataset_name: str
+        self,
+        profile_test_results: ExpectationSuiteValidationResult,
+        dataset_name: str,
+        profile_date: str,
     ) -> TableProfile:
         profile = None
         column_profiles = []
@@ -119,7 +125,9 @@ class DataProfiler:
         ):
             if col is None:
                 profile = self._parse_table_test_results(
-                    col_test_result, dataset_name=dataset_name
+                    col_test_result,
+                    dataset_name=dataset_name,
+                    profile_date=profile_date,
                 )
             else:
                 column_profile = self._parse_column_test_results(
@@ -135,8 +143,9 @@ class DataProfiler:
         self,
         table_test_results: Iterable[ExpectationValidationResult],
         dataset_name: str,
+        profile_date: str,
     ) -> TableProfile:
-        profile = TableProfile(profileDate=datetime.now().strftime("%Y-%m-%d"))
+        profile = TableProfile(profileDate=profile_date)
         for table_result in table_test_results:
             expectation: str = table_result.expectation_config.expectation_type
             result: dict = table_result.result
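This final hunk is the payoff of the whole change: the profile is stamped with the configured date rather than whenever the run happened, which makes it possible to backfill profiles for past dates. A minimal sketch, assuming TableProfile comes from the ingestion framework's generated entity module and that profileDate accepts a YYYY-MM-DD string, as the diff implies:

from metadata.generated.schema.entity.data.table import TableProfile

# Before this commit every run stamped "today":
#   TableProfile(profileDate=datetime.now().strftime("%Y-%m-%d"))
# After it, a re-run over last week's data can carry last week's date:
profile = TableProfile(profileDate="2021-10-01")
print(profile.profileDate)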