diff --git a/ingestion/src/metadata/ingestion/api/steps.py b/ingestion/src/metadata/ingestion/api/steps.py
index 49e27307df5..480f2e0afa4 100644
--- a/ingestion/src/metadata/ingestion/api/steps.py
+++ b/ingestion/src/metadata/ingestion/api/steps.py
@@ -12,10 +12,15 @@
 Abstract definition of each step
 """
 from abc import ABC, abstractmethod
-from typing import Any
+from typing import Any, Iterable, Optional
 
+from metadata.ingestion.api.models import Entity
 from metadata.ingestion.api.step import BulkStep, IterStep, ReturnStep, StageStep
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.utils.execution_time_tracker import (
+    calculate_execution_time,
+    calculate_execution_time_generator,
+)
 from metadata.utils.logger import ingestion_logger
 
 logger = ingestion_logger()
@@ -50,6 +55,10 @@ class Source(IterStep, ABC):
     def name(self) -> str:
         return "Source"
 
+    @calculate_execution_time_generator(context="Source")
+    def run(self) -> Iterable[Optional[Entity]]:
+        yield from super().run()
+
 
 class Sink(ReturnStep, ABC):
     """All Sinks must inherit this base class."""
@@ -58,6 +67,10 @@ class Sink(ReturnStep, ABC):
     def name(self) -> str:
         return "Sink"
 
+    @calculate_execution_time(context="Sink")
+    def run(self, record: Entity) -> Optional[Entity]:
+        return super().run(record)
+
 
 class Processor(ReturnStep, ABC):
     """All Processor must inherit this base class"""
diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py
index f287b8b53b0..e52a2f324ef 100644
--- a/ingestion/src/metadata/ingestion/ometa/client.py
+++ b/ingestion/src/metadata/ingestion/ometa/client.py
@@ -21,6 +21,7 @@ from requests.exceptions import HTTPError
 
 from metadata.config.common import ConfigModel
 from metadata.ingestion.ometa.credentials import URL, get_api_version
+from metadata.utils.execution_time_tracker import calculate_execution_time
 from metadata.utils.logger import ometa_logger
 
 logger = ometa_logger()
@@ -251,6 +252,7 @@ class REST:
 
         return None
 
+    @calculate_execution_time(context="GET")
     def get(self, path, data=None):
         """
         GET method
@@ -264,6 +266,7 @@ class REST:
         """
         return self._request("GET", path, data)
 
+    @calculate_execution_time(context="POST")
     def post(self, path, data=None):
         """
         POST method
@@ -277,6 +280,7 @@ class REST:
         """
         return self._request("POST", path, data)
 
+    @calculate_execution_time(context="PUT")
     def put(self, path, data=None):
         """
         PUT method
@@ -290,6 +294,7 @@ class REST:
         """
         return self._request("PUT", path, data)
 
+    @calculate_execution_time(context="PATCH")
     def patch(self, path, data=None):
         """
         PATCH method
@@ -308,6 +313,7 @@ class REST:
             headers={"Content-type": "application/json-patch+json"},
         )
 
+    @calculate_execution_time(context="DELETE")
     def delete(self, path, data=None):
         """
         DELETE method
diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
index aa36095297e..cdff7d632dc 100644
--- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py
+++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
@@ -82,7 +82,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage
 from metadata.ingestion.source.database.database_service import DataModelLink
 from metadata.profiler.api.models import ProfilerResponse
-from metadata.utils.helpers import calculate_execution_time
+from metadata.utils.execution_time_tracker import calculate_execution_time
 from metadata.utils.logger import get_log_name, ingestion_logger
 
 logger = ingestion_logger()
@@ -129,7 +129,7 @@ class MetadataRestSink(Sink):  # pylint: disable=too-many-public-methods
         logger.debug(f"Processing Create request {type(record)}")
         return self.write_create_request(record)
 
-    @calculate_execution_time
+    @calculate_execution_time(store=False)
     def _run(self, record: Entity, *_, **__) -> Either[Any]:
         """
         Default implementation for the single dispatch
diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py
index 239d3221821..1279b92482d 100644
--- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py
+++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py
@@ -62,8 +62,8 @@ from metadata.ingestion.source.database.stored_procedures_mixin import QueryByPr
 from metadata.ingestion.source.models import TableView
 from metadata.utils import fqn
 from metadata.utils.db_utils import get_view_lineage
+from metadata.utils.execution_time_tracker import calculate_execution_time_generator
 from metadata.utils.filters import filter_by_table
-from metadata.utils.helpers import calculate_execution_time_generator
 from metadata.utils.logger import ingestion_logger
 
 logger = ingestion_logger()
@@ -405,7 +405,7 @@ class CommonDbSourceService(
         """Not Implemented"""
         yield from []
 
-    @calculate_execution_time_generator
+    @calculate_execution_time_generator(store=False)
     def yield_table(
         self, table_name_and_type: Tuple[str, str]
     ) -> Iterable[Either[CreateTableRequest]]:
diff --git a/ingestion/src/metadata/profiler/processor/core.py b/ingestion/src/metadata/profiler/processor/core.py
index 7eacab4cdff..ff0fd7cc055 100644
--- a/ingestion/src/metadata/profiler/processor/core.py
+++ b/ingestion/src/metadata/profiler/processor/core.py
@@ -53,7 +53,7 @@ from metadata.profiler.metrics.static.row_count import RowCount
 from metadata.profiler.orm.registry import NOT_COMPUTE
 from metadata.profiler.processor.sample_data_handler import upload_sample_data
 from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
-from metadata.utils.helpers import calculate_execution_time
+from metadata.utils.execution_time_tracker import calculate_execution_time
 from metadata.utils.logger import profiler_logger
 
 logger = profiler_logger()
@@ -550,7 +550,7 @@ class Profiler(Generic[TMetric]):
 
         return table_profile
 
-    @calculate_execution_time
+    @calculate_execution_time(store=False)
     def generate_sample_data(self) -> Optional[TableData]:
         """Fetch and ingest sample data
diff --git a/ingestion/src/metadata/utils/execution_time_tracker.py b/ingestion/src/metadata/utils/execution_time_tracker.py
new file mode 100644
index 00000000000..ba5b8151cf2
--- /dev/null
+++ b/ingestion/src/metadata/utils/execution_time_tracker.py
@@ -0,0 +1,199 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""
+ExecutionTimeTracker implementation to help track the execution time of different parts
+of the code.
+""" +from functools import wraps +from time import perf_counter +from typing import Dict, List, Optional + +from pydantic import BaseModel + +from metadata.utils.helpers import pretty_print_time_duration +from metadata.utils.logger import utils_logger +from metadata.utils.singleton import Singleton + +logger = utils_logger() + + +class ExecutionTimeTrackerContext(BaseModel): + """Small Model to hold the ExecutionTimeTracker context.""" + + name: str + start: float + stored: bool + + +class ExecutionTimeTracker(metaclass=Singleton): + """ExecutionTimeTracker is implemented as a Singleton in order to hold state globally. + + It works as a Context Manager in order to track and log execution times. + + Example: + + def my_function(): + tracker = ExecutionTimeTracker() + + with tracker(context="MyFunction", store=True): + other_opeartion() + + """ + + def __init__(self, enabled: bool = False): + """When instantiated we can pass if we want it enabled or disabled in order to + avoid overhead when not needed. + + Attrs + ------ + enabled: Defines if it will be enabled or not. + context: Keeps track of the context levels and their state. + state: Keeps track of the global state for the Execution Time Tracker. + """ + self.enabled: bool = enabled + self.context: List[ExecutionTimeTrackerContext] = [] + self.state: Dict[str, float] = {} + self.new_context = None + self.store = True + + @property + def last_stored_context_level(self) -> Optional[str]: + """Returns the last stored context level. + + In order to provide better logs and keep track where in the code the time is being + measured we keep track of nested contexts. + + If a given context is not stored it will only log to debug but won't be part of the + global state. + """ + stored_context = [context for context in self.context if context.stored] + + if stored_context: + return stored_context[-1].name + + return None + + def __call__(self, context: str, store: bool = True): + """At every point we open a new Context Manager we can pass the current 'context' and + if we want to 'store' it. + + Sets the temporary attributes used within the context: + + new_context: Full Context name, appending the given context to the last stored context level. + store: If True, it will take part of the global state. Otherwise it will only log to debug. + """ + self.new_context = ".".join( + [part for part in [self.last_stored_context_level, context] if part] + ) + self.store = store + + return self + + def __enter__(self): + """If enabled, when entering the context, we append a new + ExecutionTimeTrackerContext to the list. + """ + if self.enabled: + self.context.append( + ExecutionTimeTrackerContext( + name=self.new_context, start=perf_counter(), stored=self.store + ) + ) + + def __exit__(self, exc_type, exc_val, exc_tb): + """If enabled, when exiting the context, we calculate the elapsed time and log to debug. 
+        If the context.stored is True, we also save it to the global state."""
+        if self.enabled:
+            stop = perf_counter()
+            context = self.context.pop(-1)
+
+            if not context:
+                return
+
+            elapsed = stop - context.start
+
+            logger.debug(
+                "%s executed in %s", context.name, pretty_print_time_duration(elapsed)
+            )
+
+            if context.stored:
+                self._save(context, elapsed)
+
+    def _save(self, context: ExecutionTimeTrackerContext, elapsed: float):
+        """Small utility to save the new measure to the global accumulator."""
+        self.state[context.name] = self.state.get(context.name, 0) + elapsed
+
+
+def calculate_execution_time(context: Optional[str] = None, store: bool = True):
+    """Utility decorator to be able to use the ExecutionTimeTracker on a function.
+
+    It receives the context and if it should store it.
+
+    Example:
+
+        @calculate_execution_time(context="MyContext")
+        def my_function():
+            ...
+    """
+
+    def decorator(func):
+        @wraps(func)
+        def inner(*args, **kwargs):
+            execution_time = ExecutionTimeTracker()
+
+            with execution_time(context or func.__name__, store):
+                result = func(*args, **kwargs)
+
+            return result
+
+        return inner
+
+    return decorator
+
+
+def calculate_execution_time_generator(
+    context: Optional[str] = None, store: bool = True
+):
+    """Utility decorator to be able to use the ExecutionTimeTracker on a generator function.
+
+    It receives the context and if it should store it.
+
+    Example:
+
+        @calculate_execution_time_generator(context="MyContext")
+        def my_generator():
+            ...
+    """
+
+    def decorator(func):
+        @wraps(func)
+        def inner(*args, **kwargs):
+            # NOTE: We are basically implementing by hand a simplified version of 'yield from'
+            # in order to be able to calculate the time difference correctly.
+            # The 'while True' loop allows us to guarantee we are iterating over all the values
+            # from func(*args, **kwargs).
+            execution_time = ExecutionTimeTracker()
+
+            generator = func(*args, **kwargs)
+
+            while True:
+                with execution_time(context or func.__name__, store):
+                    try:
+                        element = next(generator)
+                    except StopIteration:
+                        return
+
+                yield element
+
+        return inner
+
+    return decorator
diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py
index 3cb857e66ed..f218c611244 100644
--- a/ingestion/src/metadata/utils/helpers.py
+++ b/ingestion/src/metadata/utils/helpers.py
@@ -20,10 +20,8 @@ import re
 import shutil
 import sys
 from datetime import datetime, timedelta
-from functools import wraps
 from math import floor, log
 from pathlib import Path
-from time import perf_counter
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
 
 import sqlparse
@@ -101,55 +99,6 @@ om_chart_type_dict = {
 }
 
 
-def calculate_execution_time(func):
-    """
-    Method to calculate workflow execution time
-    """
-
-    @wraps(func)
-    def calculate_debug_time(*args, **kwargs):
-        start = perf_counter()
-        result = func(*args, **kwargs)
-        end = perf_counter()
-        logger.debug(
-            f"{func.__name__} executed in { pretty_print_time_duration(end - start)}"
-        )
-        return result
-
-    return calculate_debug_time
-
-
-def calculate_execution_time_generator(func):
-    """
-    Generator method to calculate workflow execution time
-    """
-
-    def calculate_debug_time(*args, **kwargs):
-        # NOTE: We are basically implementing by hand a simplified version of 'yield from'
-        # in order to be able to calculate the time difference correctly.
-        # The 'while True' loop allows us to guarantee we are iterating over all thje values
-        # from func(*args, **kwargs).
-        generator = func(*args, **kwargs)
-
-        while True:
-            start = perf_counter()
-
-            try:
-                element = next(generator)
-            except StopIteration:
-                return
-
-            end = perf_counter()
-
-            logger.debug(
-                f"{func.__name__} executed in { pretty_print_time_duration(end - start)}"
-            )
-
-            yield element
-
-    return calculate_debug_time
-
-
 def pretty_print_time_duration(duration: Union[int, float]) -> str:
     """
     Method to format and display the time
diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py
index fc51495fe28..5641b4e253a 100644
--- a/ingestion/src/metadata/workflow/base.py
+++ b/ingestion/src/metadata/workflow/base.py
@@ -44,6 +44,7 @@ from metadata.utils.class_helper import (
     get_reference_type_from_service_type,
     get_service_class_from_service_type,
 )
+from metadata.utils.execution_time_tracker import ExecutionTimeTracker
 from metadata.utils.helpers import datetime_to_ts
 from metadata.utils.logger import ingestion_logger, set_loggers_level
 from metadata.workflow.output_handler import report_ingestion_status
@@ -92,6 +93,9 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
         self._timer: Optional[RepeatedTimer] = None
         self._ingestion_pipeline: Optional[IngestionPipeline] = None
         self._start_ts = datetime_to_ts(datetime.now())
+        self._execution_time_tracker = ExecutionTimeTracker(
+            log_level == LogLevels.DEBUG
+        )
 
         set_loggers_level(log_level.value)
 
diff --git a/ingestion/src/metadata/workflow/output_handler.py b/ingestion/src/metadata/workflow/output_handler.py
index 32fbd3b432c..6e46a7d8a3c 100644
--- a/ingestion/src/metadata/workflow/output_handler.py
+++ b/ingestion/src/metadata/workflow/output_handler.py
@@ -26,6 +26,8 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import
 )
 from metadata.generated.schema.metadataIngestion.workflow import LogLevels
 from metadata.ingestion.api.step import Summary
+from metadata.utils.execution_time_tracker import ExecutionTimeTracker
+from metadata.utils.helpers import pretty_print_time_duration
 from metadata.utils.logger import ANSI, log_ansi_encoded_string
 
 WORKFLOW_FAILURE_MESSAGE = "Workflow finished with failures"
@@ -137,6 +139,25 @@ def is_debug_enabled(workflow) -> bool:
     )
 
 
+def print_execution_time_summary():
+    """Log the ExecutionTimeTracker Summary."""
+    tracker = ExecutionTimeTracker()
+
+    summary_table = {
+        "Context": [],
+        "Execution Time Aggregate": [],
+    }
+
+    for key in sorted(tracker.state.keys()):
+        summary_table["Context"].append(key)
+        summary_table["Execution Time Aggregate"].append(
+            pretty_print_time_duration(tracker.state[key])
+        )
+
+    log_ansi_encoded_string(bold=True, message="Execution Time Summary")
+    log_ansi_encoded_string(message=f"\n{tabulate(summary_table, tablefmt='grid')}")
+
+
 def print_workflow_summary(workflow: "BaseWorkflow") -> None:
     """
     Args:
@@ -148,6 +169,7 @@ def print_workflow_summary(workflow: "BaseWorkflow") -> None:
 
     if is_debug_enabled(workflow):
         print_workflow_status_debug(workflow)
+        print_execution_time_summary()
 
     failures = []
     total_records = 0
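
Usage sketch (not part of the patch): BaseWorkflow enables the tracker once, the decorators open contexts on every call, and nested stored contexts accumulate under dotted names. This is a minimal sketch assuming the openmetadata-ingestion package with this change is installed, and that the Singleton metaclass (not shown in this diff) hands back the first-created instance on later no-argument calls:

    from metadata.utils.execution_time_tracker import (
        ExecutionTimeTracker,
        calculate_execution_time,
        calculate_execution_time_generator,
    )

    # First instantiation wins: the bare ExecutionTimeTracker() calls inside the
    # decorators are assumed to return this same enabled instance.
    tracker = ExecutionTimeTracker(enabled=True)


    @calculate_execution_time(context="Inner")
    def inner() -> int:
        return sum(range(1_000_000))


    @calculate_execution_time(context="Outer")
    def outer() -> int:
        # Called inside the "Outer" context, so it is recorded as "Outer.Inner".
        return inner()


    @calculate_execution_time_generator(context="Stream", store=False)
    def stream():
        # store=False: per-element timings are logged at debug level only
        # and never reach tracker.state.
        yield from range(3)


    outer()
    list(stream())

    # Stored contexts accumulate elapsed seconds under dotted names, e.g.
    # {"Outer.Inner": 0.030, "Outer": 0.031}
    print(tracker.state)

Timings only accumulate because the tracker was constructed with enabled=True; in the patch, BaseWorkflow does this exactly when the workflow log level is DEBUG.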
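
Why calculate_execution_time_generator re-implements 'yield from' by hand instead of timing a single call: a generator's body runs lazily inside each next(), so timing only func(*args, **kwargs) would measure generator construction, which is nearly instant. A self-contained sketch of the distinction, using only the standard library:

    from time import perf_counter, sleep
    from typing import Iterator


    def slow_numbers() -> Iterator[int]:
        for i in range(3):
            sleep(0.1)  # the work happens here, inside each next() call
            yield i


    start = perf_counter()
    generator = slow_numbers()
    # No body code has run yet, so this measures roughly 0 seconds.
    print(f"construction: {perf_counter() - start:.4f}s")

    total = 0.0
    while True:
        start = perf_counter()
        try:
            # Each step is timed individually, as in the patch.
            element = next(generator)
        except StopIteration:
            break
        total += perf_counter() - start

    # Roughly 0.3s: the real cost only shows up when elements are pulled.
    print(f"iteration: {total:.2f}s")

Note also that the patch yields each element outside the with block, so time the consumer spends between next() calls is not attributed to the generator's context.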
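
The new module imports Singleton from metadata.utils.singleton, which this diff does not show. For the pattern above to work — one place constructs the tracker with the debug flag, every other call site retrieves it with a bare ExecutionTimeTracker() — the metaclass needs to cache the first instance per class, along these (assumed) lines:

    from typing import Any, Dict


    class Singleton(type):
        """Metaclass sketch: caches the first instance of each class and returns
        it on every subsequent call, ignoring any new constructor arguments."""

        _instances: Dict[type, Any] = {}

        def __call__(cls, *args, **kwargs):
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
            return cls._instances[cls]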