FIXES 15215: Add execution time tracker for ingestion (#15013)

* Initial idea on how to track execution times

* Fix linters

* Add missing decorators on the API Client

* Improve where the output is being handled
This commit is contained in:
IceS2 2024-02-16 09:58:20 +01:00 committed by GitHub
parent 7009fb7c77
commit 7b20ed2f34
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 251 additions and 58 deletions

View File

@ -12,10 +12,15 @@
Abstract definition of each step
"""
from abc import ABC, abstractmethod
from typing import Any
from typing import Any, Iterable, Optional
from metadata.ingestion.api.models import Entity
from metadata.ingestion.api.step import BulkStep, IterStep, ReturnStep, StageStep
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.execution_time_tracker import (
calculate_execution_time,
calculate_execution_time_generator,
)
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -50,6 +55,10 @@ class Source(IterStep, ABC):
def name(self) -> str:
return "Source"
@calculate_execution_time_generator(context="Source")
def run(self) -> Iterable[Optional[Entity]]:
yield from super().run()
class Sink(ReturnStep, ABC):
"""All Sinks must inherit this base class."""
@ -58,6 +67,10 @@ class Sink(ReturnStep, ABC):
def name(self) -> str:
return "Sink"
@calculate_execution_time(context="Sink")
def run(self, record: Entity) -> Optional[Entity]:
return super().run(record)
class Processor(ReturnStep, ABC):
"""All Processor must inherit this base class"""

View File

@ -21,6 +21,7 @@ from requests.exceptions import HTTPError
from metadata.config.common import ConfigModel
from metadata.ingestion.ometa.credentials import URL, get_api_version
from metadata.utils.execution_time_tracker import calculate_execution_time
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
@ -251,6 +252,7 @@ class REST:
return None
@calculate_execution_time(context="GET")
def get(self, path, data=None):
"""
GET method
@ -264,6 +266,7 @@ class REST:
"""
return self._request("GET", path, data)
@calculate_execution_time(context="POST")
def post(self, path, data=None):
"""
POST method
@ -277,6 +280,7 @@ class REST:
"""
return self._request("POST", path, data)
@calculate_execution_time(context="PUT")
def put(self, path, data=None):
"""
PUT method
@ -290,6 +294,7 @@ class REST:
"""
return self._request("PUT", path, data)
@calculate_execution_time(context="PATCH")
def patch(self, path, data=None):
"""
PATCH method
@ -308,6 +313,7 @@ class REST:
headers={"Content-type": "application/json-patch+json"},
)
@calculate_execution_time(context="DELETE")
def delete(self, path, data=None):
"""
DELETE method

View File

@ -82,7 +82,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.profiler.api.models import ProfilerResponse
from metadata.utils.helpers import calculate_execution_time
from metadata.utils.execution_time_tracker import calculate_execution_time
from metadata.utils.logger import get_log_name, ingestion_logger
logger = ingestion_logger()
@ -129,7 +129,7 @@ class MetadataRestSink(Sink): # pylint: disable=too-many-public-methods
logger.debug(f"Processing Create request {type(record)}")
return self.write_create_request(record)
@calculate_execution_time
@calculate_execution_time(store=False)
def _run(self, record: Entity, *_, **__) -> Either[Any]:
"""
Default implementation for the single dispatch

View File

@ -62,8 +62,8 @@ from metadata.ingestion.source.database.stored_procedures_mixin import QueryByPr
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.db_utils import get_view_lineage
from metadata.utils.execution_time_tracker import calculate_execution_time_generator
from metadata.utils.filters import filter_by_table
from metadata.utils.helpers import calculate_execution_time_generator
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -405,7 +405,7 @@ class CommonDbSourceService(
"""Not Implemented"""
yield from []
@calculate_execution_time_generator
@calculate_execution_time_generator(store=False)
def yield_table(
self, table_name_and_type: Tuple[str, str]
) -> Iterable[Either[CreateTableRequest]]:

View File

@ -53,7 +53,7 @@ from metadata.profiler.metrics.static.row_count import RowCount
from metadata.profiler.orm.registry import NOT_COMPUTE
from metadata.profiler.processor.sample_data_handler import upload_sample_data
from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
from metadata.utils.helpers import calculate_execution_time
from metadata.utils.execution_time_tracker import calculate_execution_time
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@ -550,7 +550,7 @@ class Profiler(Generic[TMetric]):
return table_profile
@calculate_execution_time
@calculate_execution_time(store=False)
def generate_sample_data(self) -> Optional[TableData]:
"""Fetch and ingest sample data

View File

@ -0,0 +1,199 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
ExecutionTimeTracker implementation to help track the execution time of different parts
of the code.
"""
from functools import wraps
from time import perf_counter
from typing import Dict, List, Optional
from pydantic import BaseModel
from metadata.utils.helpers import pretty_print_time_duration
from metadata.utils.logger import utils_logger
from metadata.utils.singleton import Singleton
logger = utils_logger()
class ExecutionTimeTrackerContext(BaseModel):
"""Small Model to hold the ExecutionTimeTracker context."""
name: str
start: float
stored: bool
class ExecutionTimeTracker(metaclass=Singleton):
"""ExecutionTimeTracker is implemented as a Singleton in order to hold state globally.
It works as a Context Manager in order to track and log execution times.
Example:
def my_function():
tracker = ExecutionTimeTracker()
with tracker(context="MyFunction", store=True):
other_opeartion()
"""
def __init__(self, enabled: bool = False):
"""When instantiated we can pass if we want it enabled or disabled in order to
avoid overhead when not needed.
Attrs
------
enabled: Defines if it will be enabled or not.
context: Keeps track of the context levels and their state.
state: Keeps track of the global state for the Execution Time Tracker.
"""
self.enabled: bool = enabled
self.context: List[ExecutionTimeTrackerContext] = []
self.state: Dict[str, float] = {}
self.new_context = None
self.store = True
@property
def last_stored_context_level(self) -> Optional[str]:
"""Returns the last stored context level.
In order to provide better logs and keep track where in the code the time is being
measured we keep track of nested contexts.
If a given context is not stored it will only log to debug but won't be part of the
global state.
"""
stored_context = [context for context in self.context if context.stored]
if stored_context:
return stored_context[-1].name
return None
def __call__(self, context: str, store: bool = True):
"""At every point we open a new Context Manager we can pass the current 'context' and
if we want to 'store' it.
Sets the temporary attributes used within the context:
new_context: Full Context name, appending the given context to the last stored context level.
store: If True, it will take part of the global state. Otherwise it will only log to debug.
"""
self.new_context = ".".join(
[part for part in [self.last_stored_context_level, context] if part]
)
self.store = store
return self
def __enter__(self):
"""If enabled, when entering the context, we append a new
ExecutionTimeTrackerContext to the list.
"""
if self.enabled:
self.context.append(
ExecutionTimeTrackerContext(
name=self.new_context, start=perf_counter(), stored=self.store
)
)
def __exit__(self, exc_type, exc_val, exc_tb):
"""If enabled, when exiting the context, we calculate the elapsed time and log to debug.
If the context.stored is True, we also save it to the global state."""
if self.enabled:
stop = perf_counter()
context = self.context.pop(-1)
if not context:
return
elapsed = stop - context.start
logger.debug(
"%s executed in %s", context.name, pretty_print_time_duration(elapsed)
)
if context.stored:
self._save(context, elapsed)
def _save(self, context: ExecutionTimeTrackerContext, elapsed: float):
"""Small utility to save the new measure to the global accumulator."""
self.state[context.name] = self.state.get(context.name, 0) + elapsed
def calculate_execution_time(context: Optional[str] = None, store: bool = True):
"""Utility decorator to be able to use the ExecutionTimeTracker on a function.
It receives the context and if it should store it.
Example:
@calculate_execution_time(context="MyContext")
def my_function():
...
"""
def decorator(func):
@wraps(func)
def inner(*args, **kwargs):
execution_time = ExecutionTimeTracker()
with execution_time(context or func.__name__, store):
result = func(*args, **kwargs)
return result
return inner
return decorator
def calculate_execution_time_generator(
context: Optional[str] = None, store: bool = True
):
"""Utility decorator to be able to use the ExecutionTimeTracker on a generator function.
It receives the context and if it should store it.
Example:
@calculate_execution_time_generator(context="MyContext")
def my_generator():
...
"""
def decorator(func):
@wraps(func)
def inner(*args, **kwargs):
# NOTE: We are basically implementing by hand a simplified version of 'yield from'
# in order to be able to calculate the time difference correctly.
# The 'while True' loop allows us to guarantee we are iterating over all thje values
# from func(*args, **kwargs).
execution_time = ExecutionTimeTracker()
generator = func(*args, **kwargs)
while True:
with execution_time(context or func.__name__, store):
try:
element = next(generator)
except StopIteration:
return
yield element
return inner
return decorator

View File

@ -20,10 +20,8 @@ import re
import shutil
import sys
from datetime import datetime, timedelta
from functools import wraps
from math import floor, log
from pathlib import Path
from time import perf_counter
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
import sqlparse
@ -101,55 +99,6 @@ om_chart_type_dict = {
}
def calculate_execution_time(func):
"""
Method to calculate workflow execution time
"""
@wraps(func)
def calculate_debug_time(*args, **kwargs):
start = perf_counter()
result = func(*args, **kwargs)
end = perf_counter()
logger.debug(
f"{func.__name__} executed in { pretty_print_time_duration(end - start)}"
)
return result
return calculate_debug_time
def calculate_execution_time_generator(func):
"""
Generator method to calculate workflow execution time
"""
def calculate_debug_time(*args, **kwargs):
# NOTE: We are basically implementing by hand a simplified version of 'yield from'
# in order to be able to calculate the time difference correctly.
# The 'while True' loop allows us to guarantee we are iterating over all thje values
# from func(*args, **kwargs).
generator = func(*args, **kwargs)
while True:
start = perf_counter()
try:
element = next(generator)
except StopIteration:
return
end = perf_counter()
logger.debug(
f"{func.__name__} executed in { pretty_print_time_duration(end - start)}"
)
yield element
return calculate_debug_time
def pretty_print_time_duration(duration: Union[int, float]) -> str:
"""
Method to format and display the time

View File

@ -44,6 +44,7 @@ from metadata.utils.class_helper import (
get_reference_type_from_service_type,
get_service_class_from_service_type,
)
from metadata.utils.execution_time_tracker import ExecutionTimeTracker
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger, set_loggers_level
from metadata.workflow.output_handler import report_ingestion_status
@ -92,6 +93,9 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
self._timer: Optional[RepeatedTimer] = None
self._ingestion_pipeline: Optional[IngestionPipeline] = None
self._start_ts = datetime_to_ts(datetime.now())
self._execution_time_tracker = ExecutionTimeTracker(
log_level == LogLevels.DEBUG
)
set_loggers_level(log_level.value)

View File

@ -26,6 +26,8 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import
)
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.ingestion.api.step import Summary
from metadata.utils.execution_time_tracker import ExecutionTimeTracker
from metadata.utils.helpers import pretty_print_time_duration
from metadata.utils.logger import ANSI, log_ansi_encoded_string
WORKFLOW_FAILURE_MESSAGE = "Workflow finished with failures"
@ -137,6 +139,25 @@ def is_debug_enabled(workflow) -> bool:
)
def print_execution_time_summary():
"""Log the ExecutionTimeTracker Summary."""
tracker = ExecutionTimeTracker()
summary_table = {
"Context": [],
"Execution Time Aggregate": [],
}
for key in sorted(tracker.state.keys()):
summary_table["Context"].append(key)
summary_table["Execution Time Aggregate"].append(
pretty_print_time_duration(tracker.state[key])
)
log_ansi_encoded_string(bold=True, message="Execution Time Summary")
log_ansi_encoded_string(message=f"\n{tabulate(summary_table, tablefmt='grid')}")
def print_workflow_summary(workflow: "BaseWorkflow") -> None:
"""
Args:
@ -148,6 +169,7 @@ def print_workflow_summary(workflow: "BaseWorkflow") -> None:
if is_debug_enabled(workflow):
print_workflow_status_debug(workflow)
print_execution_time_summary()
failures = []
total_records = 0