diff --git a/ingestion/src/metadata/interfaces/profiler_protocol.py b/ingestion/src/metadata/interfaces/profiler_protocol.py index ec9ea69770b..a6a0d36191e 100644 --- a/ingestion/src/metadata/interfaces/profiler_protocol.py +++ b/ingestion/src/metadata/interfaces/profiler_protocol.py @@ -40,6 +40,7 @@ class ProfilerInterfaceArgs(BaseModel): table_sample_precentage: Optional[Union[float, int]] table_sample_query: Optional[Union[int, str]] table_partition_config: Optional[PartitionProfilerConfig] + timeout_seconds: Optional[int] class Config: arbitrary_types_allowed = True diff --git a/ingestion/src/metadata/interfaces/sqalchemy/sqa_profiler_interface.py b/ingestion/src/metadata/interfaces/sqalchemy/sqa_profiler_interface.py index fcaf85362bf..925af61f26d 100644 --- a/ingestion/src/metadata/interfaces/sqalchemy/sqa_profiler_interface.py +++ b/ingestion/src/metadata/interfaces/sqalchemy/sqa_profiler_interface.py @@ -39,6 +39,7 @@ from metadata.utils.connections import ( create_and_bind_thread_safe_session, get_connection, ) +from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor from metadata.utils.dispatch import valuedispatch from metadata.utils.logger import profiler_interface_registry_logger @@ -86,6 +87,8 @@ class SQAProfilerInterface(SQAInterfaceMixin, ProfilerProtocol): else None ) + self.timeout_seconds = profiler_interface_args.timeout_seconds + @staticmethod def _session_factory(service_connection_config): """Create thread safe session that will be automatically @@ -369,35 +372,45 @@ class SQAProfilerInterface(SQAInterfaceMixin, ProfilerProtocol): """get all profiler metrics""" logger.info(f"Computing metrics with {self._thread_count} threads.") profile_results = {"table": dict(), "columns": defaultdict(dict)} - with concurrent.futures.ThreadPoolExecutor( - max_workers=self._thread_count - ) as executor: + with CustomThreadPoolExecutor(max_workers=self._thread_count) as pool: futures = [ - executor.submit( + pool.submit( 
self.compute_metrics_in_thread, *metric_func, ) for metric_func in metric_funcs ] - for future in concurrent.futures.as_completed(futures): - profile, column, metric_type = future.result() - if metric_type != MetricTypes.System.value and not isinstance( - profile, dict - ): - profile = dict() - if metric_type == MetricTypes.Table.value: - profile_results["table"].update(profile) - elif metric_type == MetricTypes.System.value: - profile_results["system"] = profile - else: - profile_results["columns"][column].update( - { - "name": column, - "timestamp": datetime.now(tz=timezone.utc).timestamp(), - **profile, - } - ) + for future in futures: + if future.cancelled(): + continue + + try: + profile, column, metric_type = future.result( + timeout=self.timeout_seconds + ) + if metric_type != MetricTypes.System.value and not isinstance( + profile, dict + ): + profile = dict() + if metric_type == MetricTypes.Table.value: + profile_results["table"].update(profile) + elif metric_type == MetricTypes.System.value: + profile_results["system"] = profile + else: + profile_results["columns"][column].update( + { + "name": column, + "timestamp": datetime.now(tz=timezone.utc).timestamp(), + **profile, + } + ) + except concurrent.futures.TimeoutError as exc: + pool.shutdown39(wait=True, cancel_futures=True) + logger.debug(traceback.format_exc()) + logger.error(f"Operation was cancelled due to TimeoutError - {exc}") + raise concurrent.futures.TimeoutError + return profile_results def fetch_sample_data(self, table) -> TableData: diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index a8ed0bef89e..e3a8c424157 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -266,6 +266,7 @@ class ProfilerWorkflow(WorkflowStatusMixin): table_partition_config=self.get_partition_details(self._table_entity) if not self.get_profile_query(self._table_entity) else None, 
+ timeout_seconds=self.source_config.timeoutSeconds, ) if isinstance(service_connection_config, DatalakeConnection): return DataLakeProfilerInterface(self._profiler_interface_args) diff --git a/ingestion/src/metadata/utils/custom_thread_pool.py b/ingestion/src/metadata/utils/custom_thread_pool.py new file mode 100644 index 00000000000..f2c92df62b6 --- /dev/null +++ b/ingestion/src/metadata/utils/custom_thread_pool.py @@ -0,0 +1,49 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Custom thread pool Executor +""" + +import queue +from concurrent.futures import ThreadPoolExecutor + + +class CustomThreadPoolExecutor(ThreadPoolExecutor): + """In python 3.9 shutdown will stop the pool Executor. + We replicate it here to add support in 3.7 and 3.8 + """ + + def shutdown39(self, wait=True, *, cancel_futures=False): + """replicate shutdown from 3.9 + + Args: + wait (bool, optional): Defaults to True. + cancel_futures (bool, optional): Defaults to False. + """ + with self._shutdown_lock: + self._shutdown = True + if cancel_futures: + # Drain all work items from the queue, and then cancel their + # associated futures. + while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.cancel() + + # Send a wake-up to prevent threads calling + # _work_queue.get(block=True) from permanently blocking. 
+ self._work_queue.put(None) + if wait: + for thread_ in self._threads: + thread_.join() diff --git a/ingestion/tests/unit/profiler/test_profiler.py b/ingestion/tests/unit/profiler/test_profiler.py index 1fb594dafdc..a3a76940347 100644 --- a/ingestion/tests/unit/profiler/test_profiler.py +++ b/ingestion/tests/unit/profiler/test_profiler.py @@ -13,6 +13,7 @@ Test Profiler behavior """ import os +from concurrent.futures import TimeoutError from datetime import datetime, timezone from unittest import TestCase from unittest.mock import patch @@ -230,6 +231,28 @@ class ProfilerTest(TestCase): ) ) + def test_profiler_with_timeout(self): + """check timeout is properly used""" + + with patch.object( + SQAProfilerInterface, "_convert_table_to_orm_object", return_value=User + ): + sqa_profiler_interface = SQAProfilerInterface( + profiler_interface_args=ProfilerInterfaceArgs( + service_connection_config=self.sqlite_conn, + table_entity=self.table_entity, + ometa_client=None, + timeout_seconds=0, + ) + ) + + simple = DefaultProfiler( + profiler_interface=sqa_profiler_interface, + ) + + with pytest.raises(TimeoutError): + simple.compute_metrics() + @classmethod def tearDownClass(cls) -> None: os.remove(cls.db_path) diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json index 9e34800bb01..46bfb2110b6 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json @@ -45,6 +45,11 @@ "description": "Number of threads to use during metric computations", "type": "number", "default": 5 + }, + "timeoutSeconds": { + "description": "Profiler Timeout in Seconds", + "type": "integer", + "default": 43200 } }, "additionalProperties": false diff --git 
a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/AddIngestion.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/AddIngestion.component.tsx index 6f833fe4cdb..8976a8fc931 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/AddIngestion.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/AddIngestion.component.tsx @@ -197,6 +197,9 @@ const AddIngestion = ({ const [threadCount, setThreadCount] = useState( (data?.sourceConfig.config as ConfigClass)?.threadCount ?? 5 ); + const [timeoutSeconds, setTimeoutSeconds] = useState( + (data?.sourceConfig.config as ConfigClass)?.timeoutSeconds ?? 43200 + ); const [dashboardFilterPattern, setDashboardFilterPattern] = useState( (data?.sourceConfig.config as ConfigClass)?.dashboardFilterPattern ?? @@ -549,6 +552,7 @@ const AddIngestion = ({ generateSampleData: ingestSampleData, profileSample: profileSample, threadCount: threadCount, + timeoutSeconds: timeoutSeconds, }; } @@ -749,6 +753,7 @@ const AddIngestion = ({ handleShowFilter={handleShowFilter} handleStageFileLocation={(val) => setStageFileLocation(val)} handleThreadCount={setThreadCount} + handleTimeoutSeconds={setTimeoutSeconds} includeLineage={includeLineage} includeTags={includeTag} includeView={includeView} @@ -775,6 +780,7 @@ const AddIngestion = ({ stageFileLocation={stageFileLocation} tableFilterPattern={tableFilterPattern} threadCount={threadCount} + timeoutSeconds={timeoutSeconds} topicFilterPattern={topicFilterPattern} useFqnFilter={useFqnFilter} onCancel={handleCancelClick} diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.test.tsx b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.test.tsx index d49be769d72..107c69867e0 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.test.tsx +++ 
b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.test.tsx @@ -103,6 +103,8 @@ const mockConfigureIngestion: ConfigureIngestionProps = { handleDatasetServiceName: jest.fn(), threadCount: 5, handleThreadCount: jest.fn(), + timeoutSeconds: 43200, + handleTimeoutSeconds: jest.fn(), useFqnFilter: false, onUseFqnFilterClick: jest.fn(), }; diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.tsx b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.tsx index 2f11daa69d5..b6bacb78554 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.tsx @@ -14,6 +14,7 @@ import { isNil } from 'lodash'; import { EditorContentRef } from 'Models'; import React, { Fragment, useRef, useState } from 'react'; +import { useTranslation } from 'react-i18next'; import { FilterPatternEnum } from '../../../enums/filterPattern.enum'; import { FormSubmitType } from '../../../enums/form.enum'; import { ServiceCategory } from '../../../enums/service.enum'; @@ -57,6 +58,7 @@ const ConfigureIngestion = ({ queryLogDuration, stageFileLocation, threadCount, + timeoutSeconds, resultLimit, enableDebugLog, profileSample, @@ -78,6 +80,7 @@ const ConfigureIngestion = ({ handleStageFileLocation, handleResultLimit, handleThreadCount, + handleTimeoutSeconds, useFqnFilter, onUseFqnFilterClick, onCancel, @@ -85,6 +88,7 @@ const ConfigureIngestion = ({ formType, }: ConfigureIngestionProps) => { const markdownRef = useRef(); + const { t } = useTranslation(); const getIngestSampleToggle = (label: string, desc: string) => { return ( @@ -180,6 +184,27 @@ const ConfigureIngestion = ({ ); }; + const getTimeoutSeconds = () => { + return ( +
+ +

+ {t('message.profiler-timeout-seconds-message')} +

+ handleTimeoutSeconds(parseInt(e.target.value))} + /> +
+ ); + }; + const getDatabaseFieldToggles = () => { return ( <> @@ -660,6 +685,8 @@ const ConfigureIngestion = ({ {getSeparator('')} {getThreadCount()} {getSeparator('')} + {getTimeoutSeconds()} + {getSeparator('')} {getIngestSampleToggle( 'Ingest Sample Data', 'Extract sample data from each profile' diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/addIngestion.interface.ts b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/addIngestion.interface.ts index 497f2288d08..80d676e0d11 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/addIngestion.interface.ts +++ b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/addIngestion.interface.ts @@ -90,6 +90,7 @@ export interface ConfigureIngestionProps { queryLogDuration: number; stageFileLocation: string; resultLimit: number; + timeoutSeconds: number; handleIngestionName: (value: string) => void; handleDatasetServiceName: (value: string[]) => void; handleDescription?: (value: string) => void; @@ -109,6 +110,7 @@ export interface ConfigureIngestionProps { handleStageFileLocation: (value: string) => void; handleResultLimit: (value: number) => void; handleThreadCount: (value: number) => void; + handleTimeoutSeconds: (value: number) => void; onCancel: () => void; onNext: () => void; } diff --git a/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json b/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json index 800ba38c4b1..cf7b0bd0df6 100644 --- a/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json +++ b/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json @@ -546,6 +546,7 @@ "table-profile": "Table Profile", "column-profile": "Column Profile", "applied-advanced-search": "Applied advanced search", + "profiler-timeout-seconds-label": "Timeout in Seconds", "add-deploy": "Add & Deploy", "dbt-Configuration-source": "DBT Configuration Source", "select-dbt-source": "Select DBT 
Source", @@ -669,6 +670,7 @@ "announcement-created-successfully": "Announcement created successfully!", "no-profiler-message": "Data Profiler is an optional configuration in Ingestion. Please enable the data profiler by following the documentation", "advanced-search-message": "Discover the right data assets using the syntax editor with and/or conditions.", + "profiler-timeout-seconds-message": "Optional number setting the timeout in seconds for the profiler. If the timeout is reached the profiler will wait for any pending queries to finish before terminating its execution.", "instance-identifier": "Name that identifies this configuration instance uniquely.", "fetch-dbt-files": "Available sources to fetch DBT catalog and manifest files." }, diff --git a/openmetadata-ui/src/main/resources/ui/src/locale/languages/fr-fr.json b/openmetadata-ui/src/main/resources/ui/src/locale/languages/fr-fr.json index 33080d371a4..db84a363c08 100644 --- a/openmetadata-ui/src/main/resources/ui/src/locale/languages/fr-fr.json +++ b/openmetadata-ui/src/main/resources/ui/src/locale/languages/fr-fr.json @@ -426,7 +426,8 @@ "updated-by": "Mis à jour par", "on-lowercase": "sur", "webhook-display-text": "Webhook {{displayText}}", - "endpoint": "Point de Terminaison" + "endpoint": "Point de Terminaison", + "profiler-timeout-seconds-label": "Délai d'Attente en Secondes" }, "message": { "service-email-required": "Un Email pour le Service account est requis", @@ -519,7 +520,8 @@ "field-text-is-required": "{{fieldText}} est requis.", "entity-owned-by-name": "Cette Resource appartient à {{entityOwner}}", "and-followed-owned-by-name": "et l'équipe suivante appartient à {{userName}}", - "field-text-is-invalid": "{{fieldText}} est invalide." + "field-text-is-invalid": "{{fieldText}} est invalide.", + "profiler-timeout-seconds-message": "Le délai d'attente en secondes est optionnel pour le profilage. 
Si le délai d'attente est atteint, le profilage attendra la fin d'exécution des requêtes qui ont débuté pour terminer son exécution." }, "server": { "no-followed-entities": "Vous ne suivez aucunne resources pour le moment.",