Fixes #5522 by implementing timeout for the profiler (#9340)

* feat(Profiler): Added timeout logic in the profiler workflow

* feat(Profiler): Added additional input in workflow UI

* fix(Profiler): timeout text blob in UI

* fix(Profiler): Return profile_results outside the context manager

* Fix(Profiler): Added localizer for timeout label and message

* Update ingestion/src/metadata/interfaces/sqalchemy/sqa_profiler_interface.py

Co-authored-by: Nahuel <nahuel@getcollate.io>

Co-authored-by: Nahuel <nahuel@getcollate.io>
This commit is contained in:
Teddy 2022-12-16 17:01:12 +01:00 committed by GitHub
parent 7aed6d340b
commit 3856d63f3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 157 additions and 24 deletions

View File

@ -40,6 +40,7 @@ class ProfilerInterfaceArgs(BaseModel):
table_sample_precentage: Optional[Union[float, int]]
table_sample_query: Optional[Union[int, str]]
table_partition_config: Optional[PartitionProfilerConfig]
timeout_seconds: Optional[int]
class Config:
arbitrary_types_allowed = True

View File

@ -39,6 +39,7 @@ from metadata.utils.connections import (
create_and_bind_thread_safe_session,
get_connection,
)
from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor
from metadata.utils.dispatch import valuedispatch
from metadata.utils.logger import profiler_interface_registry_logger
@ -86,6 +87,8 @@ class SQAProfilerInterface(SQAInterfaceMixin, ProfilerProtocol):
else None
)
self.timeout_seconds = profiler_interface_args.timeout_seconds
@staticmethod
def _session_factory(service_connection_config):
"""Create thread safe session that will be automatically
@ -369,35 +372,45 @@ class SQAProfilerInterface(SQAInterfaceMixin, ProfilerProtocol):
"""get all profiler metrics"""
logger.info(f"Computing metrics with {self._thread_count} threads.")
profile_results = {"table": dict(), "columns": defaultdict(dict)}
with concurrent.futures.ThreadPoolExecutor(
max_workers=self._thread_count
) as executor:
with CustomThreadPoolExecutor(max_workers=self._thread_count) as pool:
futures = [
executor.submit(
pool.submit(
self.compute_metrics_in_thread,
*metric_func,
)
for metric_func in metric_funcs
]
for future in concurrent.futures.as_completed(futures):
profile, column, metric_type = future.result()
if metric_type != MetricTypes.System.value and not isinstance(
profile, dict
):
profile = dict()
if metric_type == MetricTypes.Table.value:
profile_results["table"].update(profile)
elif metric_type == MetricTypes.System.value:
profile_results["system"] = profile
else:
profile_results["columns"][column].update(
{
"name": column,
"timestamp": datetime.now(tz=timezone.utc).timestamp(),
**profile,
}
)
for future in futures:
if future.cancelled():
continue
try:
profile, column, metric_type = future.result(
timeout=self.timeout_seconds
)
if metric_type != MetricTypes.System.value and not isinstance(
profile, dict
):
profile = dict()
if metric_type == MetricTypes.Table.value:
profile_results["table"].update(profile)
elif metric_type == MetricTypes.System.value:
profile_results["system"] = profile
else:
profile_results["columns"][column].update(
{
"name": column,
"timestamp": datetime.now(tz=timezone.utc).timestamp(),
**profile,
}
)
except concurrent.futures.TimeoutError as exc:
pool.shutdown39(wait=True, cancel_futures=True)
logger.debug(traceback.format_exc())
logger.error(f"Operation was cancelled due to TimeoutError - {exc}")
raise concurrent.futures.TimeoutError
return profile_results
def fetch_sample_data(self, table) -> TableData:

View File

@ -266,6 +266,7 @@ class ProfilerWorkflow(WorkflowStatusMixin):
table_partition_config=self.get_partition_details(self._table_entity)
if not self.get_profile_query(self._table_entity)
else None,
timeout_seconds=self.source_config.timeoutSeconds,
)
if isinstance(service_connection_config, DatalakeConnection):
return DataLakeProfilerInterface(self._profiler_interface_args)

View File

@ -0,0 +1,49 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Custom thread pool Executor
"""
import queue
from concurrent.futures import ThreadPoolExecutor
class CustomThreadPoolExecutor(ThreadPoolExecutor):
"""In python 3.9 shutdown will stop the pool Executor.
We replicate it here to add support in 3.7 and 3.8
"""
def shutdown39(self, wait=True, *, cancel_futures=False):
"""replicate shutdown from 3.9
Args:
wait (bool, optional): Defaults to True.
cancel_futures (bool, optional): Defaults to False.
"""
with self._shutdown_lock:
self._shutdown = True
if cancel_futures:
# Drain all work items from the queue, and then cancel their
# associated futures.
while True:
try:
work_item = self._work_queue.get_nowait()
except queue.Empty:
break
if work_item is not None:
work_item.future.cancel()
# Send a wake-up to prevent threads calling
# _work_queue.get(block=True) from permanently blocking.
self._work_queue.put(None)
if wait:
for thread_ in self._threads:
thread_.join()

View File

@ -13,6 +13,7 @@
Test Profiler behavior
"""
import os
from concurrent.futures import TimeoutError
from datetime import datetime, timezone
from unittest import TestCase
from unittest.mock import patch
@ -230,6 +231,28 @@ class ProfilerTest(TestCase):
)
)
def test_profiler_with_timeout(self):
"""check timeout is properly used"""
with patch.object(
SQAProfilerInterface, "_convert_table_to_orm_object", return_value=User
):
sqa_profiler_interface = SQAProfilerInterface(
profiler_interface_args=ProfilerInterfaceArgs(
service_connection_config=self.sqlite_conn,
table_entity=self.table_entity,
ometa_client=None,
timeout_seconds=0,
)
)
simple = DefaultProfiler(
profiler_interface=sqa_profiler_interface,
)
with pytest.raises(TimeoutError):
simple.compute_metrics()
@classmethod
def tearDownClass(cls) -> None:
os.remove(cls.db_path)

View File

@ -45,6 +45,11 @@
"description": "Number of threads to use during metric computations",
"type": "number",
"default": 5
},
"timeoutSeconds": {
"description": "Profiler Timeout in Seconds",
"type": "integer",
"default": 43200
}
},
"additionalProperties": false

View File

@ -197,6 +197,9 @@ const AddIngestion = ({
const [threadCount, setThreadCount] = useState(
(data?.sourceConfig.config as ConfigClass)?.threadCount ?? 5
);
const [timeoutSeconds, setTimeoutSeconds] = useState(
(data?.sourceConfig.config as ConfigClass)?.timeoutSeconds ?? 43200
);
const [dashboardFilterPattern, setDashboardFilterPattern] =
useState<FilterPattern>(
(data?.sourceConfig.config as ConfigClass)?.dashboardFilterPattern ??
@ -549,6 +552,7 @@ const AddIngestion = ({
generateSampleData: ingestSampleData,
profileSample: profileSample,
threadCount: threadCount,
timeoutSeconds: timeoutSeconds,
};
}
@ -749,6 +753,7 @@ const AddIngestion = ({
handleShowFilter={handleShowFilter}
handleStageFileLocation={(val) => setStageFileLocation(val)}
handleThreadCount={setThreadCount}
handleTimeoutSeconds={setTimeoutSeconds}
includeLineage={includeLineage}
includeTags={includeTag}
includeView={includeView}
@ -775,6 +780,7 @@ const AddIngestion = ({
stageFileLocation={stageFileLocation}
tableFilterPattern={tableFilterPattern}
threadCount={threadCount}
timeoutSeconds={timeoutSeconds}
topicFilterPattern={topicFilterPattern}
useFqnFilter={useFqnFilter}
onCancel={handleCancelClick}

View File

@ -103,6 +103,8 @@ const mockConfigureIngestion: ConfigureIngestionProps = {
handleDatasetServiceName: jest.fn(),
threadCount: 5,
handleThreadCount: jest.fn(),
timeoutSeconds: 43200,
handleTimeoutSeconds: jest.fn(),
useFqnFilter: false,
onUseFqnFilterClick: jest.fn(),
};

View File

@ -14,6 +14,7 @@
import { isNil } from 'lodash';
import { EditorContentRef } from 'Models';
import React, { Fragment, useRef, useState } from 'react';
import { useTranslation } from 'react-i18next';
import { FilterPatternEnum } from '../../../enums/filterPattern.enum';
import { FormSubmitType } from '../../../enums/form.enum';
import { ServiceCategory } from '../../../enums/service.enum';
@ -57,6 +58,7 @@ const ConfigureIngestion = ({
queryLogDuration,
stageFileLocation,
threadCount,
timeoutSeconds,
resultLimit,
enableDebugLog,
profileSample,
@ -78,6 +80,7 @@ const ConfigureIngestion = ({
handleStageFileLocation,
handleResultLimit,
handleThreadCount,
handleTimeoutSeconds,
useFqnFilter,
onUseFqnFilterClick,
onCancel,
@ -85,6 +88,7 @@ const ConfigureIngestion = ({
formType,
}: ConfigureIngestionProps) => {
const markdownRef = useRef<EditorContentRef>();
const { t } = useTranslation();
const getIngestSampleToggle = (label: string, desc: string) => {
return (
@ -180,6 +184,27 @@ const ConfigureIngestion = ({
);
};
const getTimeoutSeconds = () => {
return (
<div>
<label>{t('label.profiler-timeout-seconds-label')}</label>
<p className="tw-text-grey-muted tw-mt-1 tw-mb-2 tw-text-sm">
{t('message.profiler-timeout-seconds-message')}
</p>
<input
className="tw-form-inputs tw-form-inputs-padding tw-w-24"
data-testid="timeoutSeconds"
id="timeoutSeconds"
name="timeoutSeconds"
placeholder="43200"
type="number"
value={timeoutSeconds}
onChange={(e) => handleTimeoutSeconds(parseInt(e.target.value))}
/>
</div>
);
};
const getDatabaseFieldToggles = () => {
return (
<>
@ -660,6 +685,8 @@ const ConfigureIngestion = ({
{getSeparator('')}
{getThreadCount()}
{getSeparator('')}
{getTimeoutSeconds()}
{getSeparator('')}
{getIngestSampleToggle(
'Ingest Sample Data',
'Extract sample data from each profile'

View File

@ -90,6 +90,7 @@ export interface ConfigureIngestionProps {
queryLogDuration: number;
stageFileLocation: string;
resultLimit: number;
timeoutSeconds: number;
handleIngestionName: (value: string) => void;
handleDatasetServiceName: (value: string[]) => void;
handleDescription?: (value: string) => void;
@ -109,6 +110,7 @@ export interface ConfigureIngestionProps {
handleStageFileLocation: (value: string) => void;
handleResultLimit: (value: number) => void;
handleThreadCount: (value: number) => void;
handleTimeoutSeconds: (value: number) => void;
onCancel: () => void;
onNext: () => void;
}

View File

@ -546,6 +546,7 @@
"table-profile": "Table Profile",
"column-profile": "Column Profile",
"applied-advanced-search": "Applied advanced search",
"profiler-timeout-seconds-label": "Timeout in Seconds",
"add-deploy": "Add & Deploy",
"dbt-Configuration-source": "DBT Configuration Source",
"select-dbt-source": "Select DBT Source",
@ -669,6 +670,7 @@
"announcement-created-successfully": "Announcement created successfully!",
"no-profiler-message": "Data Profiler is an optional configuration in Ingestion. Please enable the data profiler by following the documentation",
"advanced-search-message": "Discover the right data assets using the syntax editor with and/or conditions.",
"profiler-timeout-seconds-message": "Optional number setting the timeout in seconds for the profiler. If the timeout is reached the profiler will wait for any pending queries to terminated its execution.",
"instance-identifier": "Name that identifies this configuration instance uniquely.",
"fetch-dbt-files": "Available sources to fetch DBT catalog and manifest files."
},

View File

@ -426,7 +426,8 @@
"updated-by": "Mis à jour par",
"on-lowercase": "sur",
"webhook-display-text": "Webhook {{displayText}}",
"endpoint": "Point de Terminaison"
"endpoint": "Point de Terminaison",
"profiler-timeout-seconds-label": "Délai d'Attente en Secondes"
},
"message": {
"service-email-required": "Un Email pour le Service account est requis",
@ -519,7 +520,8 @@
"field-text-is-required": "{{fieldText}} est requis.",
"entity-owned-by-name": "Cette Resource appartient à {{entityOwner}}",
"and-followed-owned-by-name": "et l'équipe suivante appartient à {{userName}}",
"field-text-is-invalid": "{{fieldText}} est invalide."
"field-text-is-invalid": "{{fieldText}} est invalide.",
"profiler-timeout-seconds-message": "Le délai d'attente en seconde est optionel pour le profilage. Si le délai d'attente est atteint le profilage attendra la fin d'éxécution des requêtes qui on débutées pour terminer son éxécution."
},
"server": {
"no-followed-entities": "Vous ne suivez aucunne resources pour le moment.",