Add ingestion workflow reporter (#8943)

* Add ingestion workflow reporter
Pere Miquel Brull, 2022-11-22 13:11:32 +01:00 (committed by GitHub)
parent f3ac6f53e6
commit d6f255b61c
6 changed files with 151 additions and 19 deletions

@@ -13,7 +13,9 @@ sink:
   type: metadata-rest
   config: {}
 workflowConfig:
+  loggerLevel: "DEBUG"
   openMetadataServerConfig:
+    enableVersionValidation: false
     hostPort: http://localhost:8585/api
-    authProvider: no-auth
+    authProvider: openmetadata
+    securityConfig:
+      jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
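A workflow YAML like the one above (its exact file path is not visible in this extract) is what the ingestion entrypoint consumes. As a rough sketch, assuming the `Workflow.create` classmethod whose tail (`return cls(config)`) appears in the workflow.py hunks below, and a hypothetical `workflow_config.yaml` holding the contents above:

import yaml

from metadata.ingestion.api.workflow import Workflow

# Parse the ingestion YAML and hand the resulting dict to the workflow factory.
with open("workflow_config.yaml", "r", encoding="utf-8") as config_file:
    workflow_config = yaml.safe_load(config_file)

workflow = Workflow.create(workflow_config)
workflow.execute()  # with this PR, execute() also starts the periodic status reports

With the 30-second reporter added below, such a run now logs periodic Source/Sink progress lines instead of staying silent until it finishes.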

ingestion/src/metadata/ingestion/api/workflow.py

@@ -37,6 +37,8 @@ from metadata.ingestion.api.source import Source
 from metadata.ingestion.api.stage import Stage
 from metadata.ingestion.models.custom_types import ServiceWithConnectionType
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.timer.repeated_timer import RepeatedTimer
+from metadata.timer.workflow_reporter import get_ingestion_status_timer
 from metadata.utils.class_helper import (
     get_service_class_from_service_type,
     get_service_type_from_source_type,
@@ -52,6 +54,7 @@ logger = ingestion_logger()
 T = TypeVar("T")
 SUCCESS_THRESHOLD_VALUE = 90
+REPORTS_INTERVAL_SECONDS = 30


 class InvalidWorkflowJSONException(Exception):
@@ -83,6 +86,7 @@ class Workflow:
         Disabling pylint to wait for workflow reimplementation as a topology
         """
         self.config = config
+        self._timer: Optional[RepeatedTimer] = None

         set_loggers_level(config.workflowConfig.loggerLevel.value)
@@ -177,6 +181,16 @@ class Workflow:
             f"BulkSink type:{self.config.bulkSink.type},{bulk_sink_class} configured"
         )

+    @property
+    def timer(self) -> RepeatedTimer:
+        """Status timer"""
+        if not self._timer:
+            self._timer = get_ingestion_status_timer(
+                interval=REPORTS_INTERVAL_SECONDS, logger=logger, workflow=self
+            )
+
+        return self._timer
+
     def type_class_fetch(self, type_: str, is_file: bool):
         if is_file:
             return type_.replace("-", "_")
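Caching the timer behind a property matters because `threading.Thread.start()` may only be called once per `Thread` object: `trigger()` inside `execute()` and `timer.stop()` inside `stop()` must both act on the same `RepeatedTimer`. A stripped-down sketch of the same pattern (the owner class and its arguments are hypothetical):

from typing import Optional

from metadata.timer.repeated_timer import RepeatedTimer


class TimerOwner:
    """Hypothetical owner that builds its timer once, on first access."""

    def __init__(self) -> None:
        self._timer: Optional[RepeatedTimer] = None

    @property
    def timer(self) -> RepeatedTimer:
        if not self._timer:  # created on first access, reused afterwards
            self._timer = RepeatedTimer(30, print, "status tick")
        return self._timer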
@@ -199,22 +213,33 @@ class Workflow:
         return cls(config)

     def execute(self):
-        for record in self.source.next_record():
-            self.report["Source"] = self.source.get_status().as_obj()
-            if hasattr(self, "processor"):
-                processed_record = self.processor.process(record)
-            else:
-                processed_record = record
-            if hasattr(self, "stage"):
-                self.stage.stage_record(processed_record)
-                self.report["Stage"] = self.stage.get_status().as_obj()
-            if hasattr(self, "sink"):
-                self.sink.write_record(processed_record)
-                self.report["sink"] = self.sink.get_status().as_obj()
-        if hasattr(self, "bulk_sink"):
-            self.stage.close()
-            self.bulk_sink.write_records()
-            self.report["Bulk_Sink"] = self.bulk_sink.get_status().as_obj()
+        """
+        Pass each record from the source down the pipeline:
+        Source -> (Processor) -> Sink
+        or Source -> (Processor) -> Stage -> BulkSink
+        """
+        self.timer.trigger()
+
+        try:
+            for record in self.source.next_record():
+                self.report["Source"] = self.source.get_status().as_obj()
+                if hasattr(self, "processor"):
+                    processed_record = self.processor.process(record)
+                else:
+                    processed_record = record
+                if hasattr(self, "stage"):
+                    self.stage.stage_record(processed_record)
+                    self.report["Stage"] = self.stage.get_status().as_obj()
+                if hasattr(self, "sink"):
+                    self.sink.write_record(processed_record)
+                    self.report["sink"] = self.sink.get_status().as_obj()
+            if hasattr(self, "bulk_sink"):
+                self.stage.close()
+                self.bulk_sink.write_records()
+                self.report["Bulk_Sink"] = self.bulk_sink.get_status().as_obj()
+        # Force resource closing. Required for killing the threading
+        finally:
+            self.stop()

     def stop(self):
         if hasattr(self, "processor"):
@@ -232,6 +257,7 @@ class Workflow:
             pipeline_state = PipelineState.partialSuccess
         self.set_ingestion_pipeline_status(pipeline_state)
         self.source.close()
+        self.timer.stop()

     def _get_source_success(self):
         return self.source.get_status().calculate_success()
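The `try`/`finally` in `execute()` plus the new `timer.stop()` in `stop()` guarantee the reporting thread is joined even when the pipeline raises. That matters because `RepeatedTimer` wraps a non-daemon thread: left running, it keeps the interpreter alive at shutdown. A standalone reproduction of the hazard (not OpenMetadata code):

from threading import Event, Thread

event = Event()
worker = Thread(target=event.wait)  # blocks until the event is set
worker.start()

# Without these two lines, a non-daemon thread keeps the process alive at
# exit. Setting the event and joining is exactly what RepeatedTimer.stop()
# does, and the finally block makes sure it always runs.
event.set()
worker.join()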

ingestion/src/metadata/ingestion/bulksink/metadata_usage.py

@@ -22,6 +22,7 @@ import os
 import shutil
 import traceback
 from datetime import datetime
+from pathlib import Path
 from typing import List, Optional

 from pydantic import ValidationError
@@ -324,5 +325,6 @@ class MetadataUsageBulkSink(BulkSink):
         return self.status

     def close(self):
-        shutil.rmtree(self.config.filename)
+        if Path(self.config.filename).exists():
+            shutil.rmtree(self.config.filename)
         self.metadata.close()
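The new existence check makes `close()` idempotent and failure-tolerant: `execute()` now reaches `stop()` through its `finally` block even when a run fails before the usage files were ever staged, and an unguarded `shutil.rmtree` would raise `FileNotFoundError` on the missing directory. The pattern in isolation (the path is hypothetical):

import shutil
from pathlib import Path

staging_dir = Path("/tmp/usage-staging")  # hypothetical staging location

# Unguarded, shutil.rmtree(staging_dir) raises if the directory is missing;
# with the check, cleanup is safe to call any number of times.
if staging_dir.exists():
    shutil.rmtree(staging_dir)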

ingestion/src/metadata/timer/__init__.py (new, empty file)

ingestion/src/metadata/timer/repeated_timer.py

@@ -0,0 +1,44 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Define a threading repeated timer
+"""
+import time
+from threading import Event, Thread
+
+
+class RepeatedTimer:
+    """Repeat `function` every `interval` seconds."""
+
+    def __init__(self, interval, function, *args, **kwargs):
+        self.interval = interval
+        self.function = function
+        self.args = args
+        self.kwargs = kwargs
+        self.start = time.time()
+        self.event = Event()
+        self.thread = Thread(target=self._target)
+
+    def _target(self):
+        while not self.event.wait(self._time):
+            self.function(*self.args, **self.kwargs)
+
+    @property
+    def _time(self):
+        return self.interval - ((time.time() - self.start) % self.interval)
+
+    def trigger(self) -> None:
+        self.thread.start()
+
+    def stop(self) -> None:
+        self.event.set()
+        self.thread.join()
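A standalone usage sketch of the class above (the interval and callback are arbitrary examples):

import time

from metadata.timer.repeated_timer import RepeatedTimer


def heartbeat(name: str) -> None:
    print(f"{name} is still running")


timer = RepeatedTimer(5, heartbeat, "my-workflow")  # fire roughly every 5 seconds
timer.trigger()  # start the background thread
time.sleep(12)   # heartbeat fires about twice in this window
timer.stop()     # set the Event and join the thread

Note that `_time` returns the interval minus the elapsed time modulo the interval, so ticks stay aligned to the construction time instead of drifting by however long each callback takes.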

ingestion/src/metadata/timer/workflow_reporter.py

@@ -0,0 +1,58 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Prepare a timer to report on the workflow status
+"""
+import traceback
+from logging import Logger
+
+from metadata.ingestion.api.bulk_sink import BulkSinkStatus
+from metadata.ingestion.api.sink import SinkStatus
+from metadata.ingestion.api.source import SourceStatus
+from metadata.timer.repeated_timer import RepeatedTimer
+
+
+def report_ingestion_status(logger: Logger, workflow: "Workflow") -> None:
+    """
+    Given a logger, use it to INFO the workflow status
+    """
+    try:
+        source_status: SourceStatus = workflow.source.get_status()
+        logger.info(
+            f"Source: Processed {len(source_status.success)} records,"
+            f" filtered {len(source_status.filtered)} records,"
+            f" found {len(source_status.failures)} errors"
+        )
+        if hasattr(workflow, "sink"):
+            sink_status: SinkStatus = workflow.sink.get_status()
+            logger.info(
+                f"Sink: Processed {len(sink_status.records)} records,"
+                f" found {len(sink_status.failures)} errors"
+            )
+        if hasattr(workflow, "bulk_sink"):
+            bulk_sink_status: BulkSinkStatus = workflow.bulk_sink.get_status()
+            logger.info(
+                f"Bulk Sink: Processed {len(bulk_sink_status.records)} records,"
+                f" found {len(bulk_sink_status.failures)} errors"
+            )
+    except Exception as exc:
+        logger.debug(traceback.format_exc())
+        logger.error(f"Wild exception reporting status - {exc}")
+
+
+def get_ingestion_status_timer(
+    interval: int, logger: Logger, workflow: "Workflow"
+) -> RepeatedTimer:
+    """
+    Prepare the threading Timer to execute the report_ingestion_status
+    """
+    return RepeatedTimer(interval, report_ingestion_status, logger, workflow)
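Since the sink and bulk-sink branches are `hasattr`-guarded, anything that exposes a `source` with a `get_status()` can be reported on. A self-contained sketch with a stand-in object (the `SimpleNamespace` stub is purely illustrative; in the workflow itself the `Workflow` instance is passed in):

import logging
import time
from types import SimpleNamespace

from metadata.timer.workflow_reporter import get_ingestion_status_timer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ingestion")

# Stand-in for a Workflow: a source whose status carries the success,
# filtered, and failures lists the reporter reads.
status = SimpleNamespace(success=[1, 2, 3], filtered=[], failures=[])
fake_workflow = SimpleNamespace(source=SimpleNamespace(get_status=lambda: status))

timer = get_ingestion_status_timer(interval=5, logger=logger, workflow=fake_workflow)
timer.trigger()  # logs "Source: Processed 3 records, ..." every ~5 seconds
time.sleep(11)
timer.stop()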