From d6f255b61ca04a6b1a76415700db5dbbf36b30d5 Mon Sep 17 00:00:00 2001
From: Pere Miquel Brull
Date: Tue, 22 Nov 2022 13:11:32 +0100
Subject: [PATCH] Add ingestion workflow reporter (#8943)

* Add ingestion workflow reporter
---
 .../metadata/examples/workflows/mlflow.yaml   |  6 +-
 .../src/metadata/ingestion/api/workflow.py    | 58 ++++++++++++++-----
 .../ingestion/bulksink/metadata_usage.py      |  4 +-
 ingestion/src/metadata/timer/__init__.py      |  0
 .../src/metadata/timer/repeated_timer.py      | 44 ++++++++++++++
 .../src/metadata/timer/workflow_reporter.py   | 58 +++++++++++++++++++
 6 files changed, 151 insertions(+), 19 deletions(-)
 create mode 100644 ingestion/src/metadata/timer/__init__.py
 create mode 100644 ingestion/src/metadata/timer/repeated_timer.py
 create mode 100644 ingestion/src/metadata/timer/workflow_reporter.py

diff --git a/ingestion/src/metadata/examples/workflows/mlflow.yaml b/ingestion/src/metadata/examples/workflows/mlflow.yaml
index f8cf6e4ac5f..9448e88959e 100644
--- a/ingestion/src/metadata/examples/workflows/mlflow.yaml
+++ b/ingestion/src/metadata/examples/workflows/mlflow.yaml
@@ -13,7 +13,9 @@ sink:
   type: metadata-rest
   config: {}
 workflowConfig:
-  loggerLevel: "DEBUG"
   openMetadataServerConfig:
+    enableVersionValidation: false
     hostPort: http://localhost:8585/api
-    authProvider: no-auth
+    authProvider: openmetadata
+    securityConfig:
+      jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
diff --git a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py
index e10a5ba28eb..a55b0a75678 100644
--- a/ingestion/src/metadata/ingestion/api/workflow.py
+++ b/ingestion/src/metadata/ingestion/api/workflow.py
@@ -37,6 +37,8 @@ from metadata.ingestion.api.source import Source
 from metadata.ingestion.api.stage import Stage
 from metadata.ingestion.models.custom_types import ServiceWithConnectionType
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.timer.repeated_timer import RepeatedTimer
+from metadata.timer.workflow_reporter import get_ingestion_status_timer
 from metadata.utils.class_helper import (
     get_service_class_from_service_type,
     get_service_type_from_source_type,
@@ -52,6 +54,7 @@ logger = ingestion_logger()
 T = TypeVar("T")
 
 SUCCESS_THRESHOLD_VALUE = 90
+REPORTS_INTERVAL_SECONDS = 30
 
 
 class InvalidWorkflowJSONException(Exception):
@@ -83,6 +86,7 @@ class Workflow:
         Disabling pylint to wait for workflow reimplementation as a topology
         """
         self.config = config
+        self._timer: Optional[RepeatedTimer] = None
 
         set_loggers_level(config.workflowConfig.loggerLevel.value)
 
@@ -177,6 +181,16 @@ class Workflow:
                 f"BulkSink type:{self.config.bulkSink.type},{bulk_sink_class} configured"
             )
 
+    @property
+    def timer(self) -> RepeatedTimer:
+        """Status timer"""
+        if not self._timer:
+            self._timer = get_ingestion_status_timer(
+                interval=REPORTS_INTERVAL_SECONDS, logger=logger, workflow=self
+            )
+
+        return self._timer
+
     def type_class_fetch(self, type_: str, is_file: bool):
         if is_file:
             return type_.replace("-", "_")
@@ -199,22 +213,33 @@ class Workflow:
         return cls(config)
 
     def execute(self):
-        for record in self.source.next_record():
-            self.report["Source"] = self.source.get_status().as_obj()
-            if hasattr(self, "processor"):
-                processed_record = self.processor.process(record)
-            else:
-                processed_record = record
-            if hasattr(self, "stage"):
-                self.stage.stage_record(processed_record)
-                self.report["Stage"] = self.stage.get_status().as_obj()
-            if hasattr(self, "sink"):
-                self.sink.write_record(processed_record)
-                self.report["sink"] = self.sink.get_status().as_obj()
-        if hasattr(self, "bulk_sink"):
-            self.stage.close()
-            self.bulk_sink.write_records()
-            self.report["Bulk_Sink"] = self.bulk_sink.get_status().as_obj()
+        """
+        Pass each record from the source down the pipeline:
+        Source -> (Processor) -> Sink
+        or Source -> (Processor) -> Stage -> BulkSink
+        """
+        self.timer.trigger()
+
+        try:
+            for record in self.source.next_record():
+                self.report["Source"] = self.source.get_status().as_obj()
+                if hasattr(self, "processor"):
+                    processed_record = self.processor.process(record)
+                else:
+                    processed_record = record
+                if hasattr(self, "stage"):
+                    self.stage.stage_record(processed_record)
+                    self.report["Stage"] = self.stage.get_status().as_obj()
+                if hasattr(self, "sink"):
+                    self.sink.write_record(processed_record)
+                    self.report["sink"] = self.sink.get_status().as_obj()
+            if hasattr(self, "bulk_sink"):
+                self.stage.close()
+                self.bulk_sink.write_records()
+                self.report["Bulk_Sink"] = self.bulk_sink.get_status().as_obj()
+        # Force resource closing. Required to stop the reporter's timer thread
+        finally:
+            self.stop()
 
     def stop(self):
         if hasattr(self, "processor"):
@@ -232,6 +257,7 @@ class Workflow:
             pipeline_state = PipelineState.partialSuccess
         self.set_ingestion_pipeline_status(pipeline_state)
         self.source.close()
+        self.timer.stop()
 
     def _get_source_success(self):
         return self.source.get_status().calculate_success()
diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py
index 407917eb337..ecf2c59f2b1 100644
--- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py
+++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py
@@ -22,6 +22,7 @@ import os
 import shutil
 import traceback
 from datetime import datetime
+from pathlib import Path
 from typing import List, Optional
 
 from pydantic import ValidationError
@@ -324,5 +325,6 @@ class MetadataUsageBulkSink(BulkSink):
         return self.status
 
     def close(self):
-        shutil.rmtree(self.config.filename)
+        if Path(self.config.filename).exists():
+            shutil.rmtree(self.config.filename)
         self.metadata.close()
diff --git a/ingestion/src/metadata/timer/__init__.py b/ingestion/src/metadata/timer/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/ingestion/src/metadata/timer/repeated_timer.py b/ingestion/src/metadata/timer/repeated_timer.py
new file mode 100644
index 00000000000..d66e3ef2915
--- /dev/null
+++ b/ingestion/src/metadata/timer/repeated_timer.py
@@ -0,0 +1,44 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Define a repeated timer that runs a function on a background thread
+"""
+
+import time
+from threading import Event, Thread
+
+
+class RepeatedTimer:
+    """Repeat `function` every `interval` seconds."""
+
+    def __init__(self, interval, function, *args, **kwargs):
+        self.interval = interval
+        self.function = function
+        self.args = args
+        self.kwargs = kwargs
+        self.start = time.time()
+        self.event = Event()
+        self.thread = Thread(target=self._target)
+
+    def _target(self):
+        while not self.event.wait(self._time):
+            self.function(*self.args, **self.kwargs)
+
+    @property
+    def _time(self):
+        return self.interval - ((time.time() - self.start) % self.interval)
+
+    def trigger(self) -> None:
+        self.thread.start()
+
+    def stop(self) -> None:
+        self.event.set()
+        self.thread.join()
diff --git a/ingestion/src/metadata/timer/workflow_reporter.py b/ingestion/src/metadata/timer/workflow_reporter.py
new file mode 100644
index 00000000000..cd8b6090383
--- /dev/null
+++ b/ingestion/src/metadata/timer/workflow_reporter.py
@@ -0,0 +1,58 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Prepare a timer to report on the workflow status
+"""
+import traceback
+from logging import Logger
+
+from metadata.ingestion.api.bulk_sink import BulkSinkStatus
+from metadata.ingestion.api.sink import SinkStatus
+from metadata.ingestion.api.source import SourceStatus
+from metadata.timer.repeated_timer import RepeatedTimer
+
+
+def report_ingestion_status(logger: Logger, workflow: "Workflow") -> None:
+    """
+    Given a logger, use it to INFO the workflow status
+    """
+    try:
+        source_status: SourceStatus = workflow.source.get_status()
+        logger.info(
+            f"Source: Processed {len(source_status.success)} records,"
+            f" filtered {len(source_status.filtered)} records,"
+            f" found {len(source_status.failures)} errors"
+        )
+        if hasattr(workflow, "sink"):
+            sink_status: SinkStatus = workflow.sink.get_status()
+            logger.info(
+                f"Sink: Processed {len(sink_status.records)} records,"
+                f" found {len(sink_status.failures)} errors"
+            )
+        if hasattr(workflow, "bulk_sink"):
+            bulk_sink_status: BulkSinkStatus = workflow.bulk_sink.get_status()
+            logger.info(
+                f"Bulk Sink: Processed {len(bulk_sink_status.records)} records,"
+                f" found {len(bulk_sink_status.failures)} errors"
+            )
+
+    except Exception as exc:
+        logger.debug(traceback.format_exc())
+        logger.error(f"Wild exception reporting status - {exc}")
+
+
+def get_ingestion_status_timer(
+    interval: int, logger: Logger, workflow: "Workflow"
+) -> RepeatedTimer:
+    """
+    Prepare the threading Timer to execute report_ingestion_status
+    """
+    return RepeatedTimer(interval, report_ingestion_status, logger, workflow)
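
Usage note: a minimal sketch of how the new RepeatedTimer behaves on its own,
outside of any Workflow. It assumes only the module added in this patch; the
`tick` function, its message argument, and the 0.5-second interval are
illustrative values, not part of the PR.

    import time

    from metadata.timer.repeated_timer import RepeatedTimer

    def tick(message: str) -> None:
        # Runs once per interval, on the timer's background thread
        print(f"tick: {message}")

    timer = RepeatedTimer(0.5, tick, "still ingesting...")
    timer.trigger()  # starts the thread and returns immediately

    try:
        time.sleep(2)  # the main thread keeps working; ~4 ticks fire meanwhile
    finally:
        timer.stop()  # sets the Event and joins the thread, as Workflow does in stop()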
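
Reporting note: report_ingestion_status only reads the success, filtered, and
failures lists off each status object. The standalone sketch below mimics that
logging pattern with hypothetical SimpleNamespace stubs in place of the real
Source and SourceStatus classes, to show the message shape the timer emits
every REPORTS_INTERVAL_SECONDS (30 by default).

    import logging
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("ingestion")

    # Stub status carrying the three lists the reporter inspects
    status = SimpleNamespace(success=["t1", "t2"], filtered=["tmp_table"], failures=[])
    workflow = SimpleNamespace(source=SimpleNamespace(get_status=lambda: status))

    source_status = workflow.source.get_status()
    logger.info(
        f"Source: Processed {len(source_status.success)} records,"
        f" filtered {len(source_status.filtered)} records,"
        f" found {len(source_status.failures)} errors"
    )
    # -> INFO:ingestion:Source: Processed 2 records, filtered 1 records, found 0 errors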