Fixes #11327 - Improve Profiler Logging (#11341)

* feat: improved profiler logging

* feat: ran python linting
Teddy 2023-04-27 18:18:33 +02:00 committed by GitHub
parent a4af11fba4
commit b715208d28
9 changed files with 97 additions and 38 deletions

View File

@@ -41,6 +41,9 @@ class ProcessorStatus(Status):
class ProfilerProcessorStatus(Status):
entity: Optional[str] = None
def scanned(self, record: Any) -> None:
self.records.append(record)
def failed_profiler(self, error: str, stack_trace: Optional[str] = None) -> None:
self.failed(self.entity if self.entity else "", error, stack_trace)
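The two helpers added here centralize bookkeeping: `scanned` appends a processed record, and `failed_profiler` files a failure under the interface's current entity. A minimal, hedged sketch of the pattern, with a hand-rolled stand-in for the `Status` base class (its real shape is not shown in this diff):

from typing import Any, List, Optional, Tuple

class Status:
    # Assumed stand-in for the real Status base class (not shown in this diff)
    def __init__(self) -> None:
        self.records: List[Any] = []
        self.failures: List[Tuple[str, str, Optional[str]]] = []

    def failed(self, name: str, error: str, stack_trace: Optional[str] = None) -> None:
        self.failures.append((name, error, stack_trace))

class ProfilerProcessorStatus(Status):
    entity: Optional[str] = None

    def scanned(self, record: Any) -> None:
        self.records.append(record)

    def failed_profiler(self, error: str, stack_trace: Optional[str] = None) -> None:
        self.failed(self.entity if self.entity else "", error, stack_trace)

status = ProfilerProcessorStatus()
status.entity = "service.db.schema.orders"         # hypothetical table FQN
status.scanned("service.db.schema.orders.amount")
status.failed_profiler("unsupported column type")
print(len(status.records), len(status.failures))   # 1 1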

View File

@@ -18,7 +18,7 @@ Workflow definition for the ORM Profiler.
"""
import traceback
from copy import deepcopy
from typing import Iterable, List, Optional, Tuple, Union, cast
from typing import Iterable, List, Optional, Union, cast
from pydantic import ValidationError
from sqlalchemy import MetaData
@@ -47,7 +47,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.processor import ProcessorStatus
from metadata.ingestion.api.sink import Sink
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.models.custom_types import ServiceWithConnectionType
@@ -69,6 +68,8 @@ from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import (
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.core import Profiler
from metadata.profiler.processor.default import DefaultProfiler, get_default_metrics
from metadata.timer.repeated_timer import RepeatedTimer
from metadata.timer.workflow_reporter import get_ingestion_status_timer
from metadata.utils import fqn
from metadata.utils.class_helper import (
get_service_class_from_service_type,
@@ -83,6 +84,8 @@ from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
logger = profiler_logger()
NON_SQA_DATABASE_CONNECTIONS = (DatalakeConnection,)
SUCCESS_THRESHOLD_VALUE = 90
REPORTS_INTERVAL_SECONDS = 60
class ProfilerInterfaceInstantiationError(Exception):
@@ -101,6 +104,8 @@ class ProfilerWorkflow(WorkflowStatusMixin):
def __init__(self, config: OpenMetadataWorkflowConfig):
self.profiler = None # defined in `create_profiler()`
self.config = config
self._timer: Optional[RepeatedTimer] = None
self.metadata_config: OpenMetadataConnection = (
self.config.workflowConfig.openMetadataServerConfig
)
@@ -116,7 +121,6 @@ class ProfilerWorkflow(WorkflowStatusMixin):
DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
) # Used to satisfy the type checker
self.source_status = SourceStatus()
self.status = ProcessorStatus()
self._profiler_interface_args = None
if self.config.sink:
self.sink = get_sink(
@@ -134,6 +138,11 @@
"and that your ingestion token (settings > bots) is still valid."
)
logger.info(
f"Starting profiler for service {self.config.source.serviceName}"
f":{self.config.source.type.lower()}"
)
@classmethod
def create(cls, config_dict: dict) -> "ProfilerWorkflow":
"""
@@ -149,6 +158,16 @@
)
raise err
@property
def timer(self) -> RepeatedTimer:
"""Status timer"""
if not self._timer:
self._timer = get_ingestion_status_timer(
interval=REPORTS_INTERVAL_SECONDS, logger=logger, workflow=self
)
return self._timer
def get_config_for_entity(self, entity: Table) -> Optional[TableConfig]:
"""Get config for a specific entity
@@ -341,7 +360,7 @@ class ProfilerWorkflow(WorkflowStatusMixin):
def run_profiler(
self, entity: Table, copied_service_config, sqa_metadata=None
) -> Tuple[Optional[ProfilerResponse], Optional[List]]:
) -> Optional[ProfilerResponse]:
"""
Main logic for the profiler workflow
"""
@@ -372,21 +391,31 @@
error = f"Unexpected exception processing entity [{name}]: {exc}"
logger.debug(traceback.format_exc())
logger.error(error)
self.status.failed(name, error, traceback.format_exc())
self.source_status.failed(name, error, traceback.format_exc())
try:
# if we fail to instantiate a profiler_interface, we won't have a profiler_interface variable
self.status.fail_all(profiler_interface.processor_status.failures)
self.source_status.fail_all(
profiler_interface.processor_status.failures
)
self.source_status.records.extend(
profiler_interface.processor_status.records
)
except UnboundLocalError:
pass
else:
return profile, profiler_interface.processor_status.failures
self.source_status.fail_all(profiler_interface.processor_status.failures)
self.source_status.records.extend(
profiler_interface.processor_status.records
)
return profile
return None, None
return None
def execute(self):
"""
Run the profiling and tests
"""
self.timer.trigger()
try:
for database in self.get_database_entities():
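`run_profiler` now returns only the `ProfilerResponse`: interface failures and scanned records are folded directly into `source_status`, and the `try`/`except UnboundLocalError` covers exceptions raised before `profiler_interface` was ever bound. A hedged, self-contained illustration of that guard (toy code, not the real classes):

def run_step(fail_early: bool) -> None:
    failures: list = []
    try:
        if fail_early:
            raise RuntimeError("boom before the interface exists")
        profiler_interface = object()  # only bound if we got this far
        raise RuntimeError("boom after the interface exists")
    except RuntimeError:
        try:
            failures.append(profiler_interface)  # may never have been bound
        except UnboundLocalError:
            pass  # nothing to collect: the interface was never created
    print(len(failures))

run_step(fail_early=True)   # prints 0
run_step(fail_early=False)  # prints 1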
@@ -399,19 +428,13 @@
else None
) # we only need this for sqlalchemy based services
for entity in self.get_table_entities(database=database):
profile, failures = self.run_profiler(
profile = self.run_profiler(
entity, copied_service_config, sqa_metadata
)
if hasattr(self, "sink") and profile:
self.sink.write_record(profile)
if failures:
self.status.fail_all(
failures
) # we can have column level failures we need to report on
self.status.processed(entity.fullyQualifiedName.__root__) # type: ignore
self.source_status.scanned(entity.fullyQualifiedName.__root__) # type: ignore
# At the end of the `execute`, update the associated Ingestion Pipeline status as success
self.set_ingestion_pipeline_status(PipelineState.success)
self.update_ingestion_status_at_end()
# Any unhandled exception breaking the workflow should update the status
except Exception as err:
@@ -428,14 +451,26 @@
"""
Returns 1 if status is failed, 0 otherwise.
"""
if (
self.source_status.failures
or self.status.failures
or (hasattr(self, "sink") and self.sink.get_status().failures)
if self.source_status.failures or (
hasattr(self, "sink") and self.sink.get_status().failures
):
return 1
return 0
def _get_source_success(self):
"""Compue the success rate of the source"""
return self.source_status.calculate_success()
def update_ingestion_status_at_end(self):
"""
Once the execute method is done, update the status
as OK or KO depending on the success rate.
"""
pipeline_state = PipelineState.success
if SUCCESS_THRESHOLD_VALUE <= self._get_source_success() < 100:
pipeline_state = PipelineState.partialSuccess
self.set_ingestion_pipeline_status(pipeline_state)
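In other words: a run ending at 100% success stays `success`, anything in [SUCCESS_THRESHOLD_VALUE, 100) becomes `partialSuccess`, and rates below the threshold are handled by `_raise_from_status_internal` further down. A small sketch of the decision; the percentage formula inside `calculate_success` is an assumption, since it is not shown in this diff:

SUCCESS_THRESHOLD_VALUE = 90

def calculate_success(records: int, failures: int) -> float:
    # Assumed formula: share of successful records as a 0-100 percentage
    total = records + failures
    return 100.0 if total == 0 else round(records / total * 100, 2)

def pick_state(success_pct: float) -> str:
    if success_pct < SUCCESS_THRESHOLD_VALUE:
        return "failed"  # _raise_from_status_internal raises before this state is set
    if success_pct < 100:
        return "partialSuccess"
    return "success"

print(pick_state(calculate_success(records=95, failures=5)))  # partialSuccess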
def _raise_from_status_internal(self, raise_warnings=False):
"""
Check source, processor and sink status and raise if needed
@@ -444,8 +479,10 @@
as we are just picking up data from OM.
"""
if self.status.failures:
raise WorkflowExecutionError("Processor reported errors", self.status)
if self._get_source_success() < SUCCESS_THRESHOLD_VALUE:
raise WorkflowExecutionError(
"Processor reported errors", self.source_status
)
if hasattr(self, "sink") and self.sink.get_status().failures:
raise WorkflowExecutionError("Sink reported errors", self.sink.get_status())
@@ -454,8 +491,10 @@ class ProfilerWorkflow(WorkflowStatusMixin):
raise WorkflowExecutionError(
"Source reported warnings", self.source_status
)
if self.status.warnings:
raise WorkflowExecutionError("Processor reported warnings", self.status)
if self.source_status.warnings:
raise WorkflowExecutionError(
"Processor reported warnings", self.source_status
)
if hasattr(self, "sink") and self.sink.get_status().warnings:
raise WorkflowExecutionError(
"Sink reported warnings", self.sink.get_status()
@@ -472,6 +511,7 @@ class ProfilerWorkflow(WorkflowStatusMixin):
Close all connections
"""
self.metadata.close()
self.timer.stop()
def _retrieve_service_connection_if_needed(self) -> None:
"""

View File

@@ -250,6 +250,9 @@ class PandasProfilerInterface(ProfilerProtocol, PandasInterfaceMixin):
row = None
if column:
column = column.name
self.processor_status.scanned(f"{table.name.__root__}.{column}")
else:
self.processor_status.scanned(table.name.__root__)
return row, column, metric_type.value
def fetch_sample_data(self, table) -> TableData:

View File

@@ -447,6 +447,9 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
if column is not None:
column = column.name
self.processor_status.scanned(f"{table.__tablename__}.{column}")
else:
self.processor_status.scanned(table.__tablename__)
return row, column, metric_type.value
@@ -456,7 +459,7 @@
metric_funcs: list,
):
"""get all profiler metrics"""
logger.info(f"Computing metrics with {self._thread_count} threads.")
logger.debug(f"Computing metrics with {self._thread_count} threads.")
profile_results = {"table": dict(), "columns": defaultdict(dict)}
with CustomThreadPoolExecutor(max_workers=self._thread_count) as pool:
futures = [
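For context, `get_all_metrics` fans one future per metric function out over a thread pool and folds the results into a shared dict. The same pattern, sketched with the standard library's ThreadPoolExecutor standing in for CustomThreadPoolExecutor:

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

def metric_func(name: str) -> int:
    return len(name)  # placeholder for a real metric computation

profile_results = {"table": {}, "columns": defaultdict(dict)}
with ThreadPoolExecutor(max_workers=5) as pool:
    # one future per metric; results folded in as they complete
    futures = {pool.submit(metric_func, name): name for name in ("min", "max", "mean")}
    for future in as_completed(futures):
        profile_results["table"][futures[future]] = future.result()

print(profile_results["table"])  # e.g. {'min': 3, 'max': 3, 'mean': 4}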

View File

@@ -99,7 +99,7 @@ def _(
Returns:
List[Dict]:
"""
logger.info(f"Fetching system metrics for {dialect}")
logger.debug(f"Fetching system metrics for {dialect}")
dml_stat_to_dml_statement_mapping = {
"inserted_row_count": DatabaseDMLOperations.INSERT.value,
"deleted_row_count": DatabaseDMLOperations.DELETE.value,
@@ -213,7 +213,7 @@ def _(
Returns:
List[Dict]:
"""
logger.info(f"Fetching system metrics for {dialect}")
logger.debug(f"Fetching system metrics for {dialect}")
stl_deleted = dedent(
f"""
@@ -360,7 +360,7 @@ def _(
Returns:
Dict: system metric
"""
logger.info(f"Fetching system metrics for {dialect}")
logger.debug(f"Fetching system metrics for {dialect}")
metric_results: List[Dict] = []

View File

@@ -453,7 +453,7 @@ class Profiler(Generic[TMetric]):
all its columns and return all the run profilers
in a Dict in the shape {col_name: Profiler}
"""
logger.info(
logger.debug(
f"Computing profile metrics for {self.profiler_interface.table_entity.fullyQualifiedName.__root__}..."
)
@@ -483,7 +483,7 @@ class Profiler(Generic[TMetric]):
TableData: sample data
"""
try:
logger.info(
logger.debug(
"Fetching sample data for "
f"{self.profiler_interface.table_entity.fullyQualifiedName.__root__}..." # type: ignore
)
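These `logger.info` → `logger.debug` swaps (here and in the system-metrics and sink files) are the core of the change: per-table chatter drops to DEBUG, leaving the periodic timer report as the main INFO output. The verbose detail stays one knob away:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("Profiler")

logger.debug("Computing profile metrics for db.schema.table...")  # hidden at INFO
logger.info("Source: Processed 10 records, filtered 2 records")   # still printed

logger.setLevel(logging.DEBUG)  # opt back in to per-table detail
logger.debug("Fetching sample data for db.schema.table...")       # now printed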

View File

@@ -88,7 +88,7 @@ class MetadataRestSink(Sink[Entity]):
table=record.table,
profile_request=self.clean_up_profile_columns(record.profile),
)
logger.info(
logger.debug(
f"Successfully ingested profile metrics for {record.table.fullyQualifiedName.__root__}"
)
@@ -96,7 +96,7 @@ class MetadataRestSink(Sink[Entity]):
self.metadata.ingest_table_sample_data(
table=record.table, sample_data=record.sample_data
)
logger.info(
logger.debug(
f"Successfully ingested sample data for {record.table.fullyQualifiedName.__root__}"
)
self.status.records_written(

View File

@@ -25,7 +25,12 @@ def report_ingestion_status(logger: Logger, workflow: "Workflow") -> None:
Given a logger, use it to INFO the workflow status
"""
try:
source_status: SourceStatus = workflow.source.get_status()
if hasattr(
workflow, "source_status"
): # the profiler workflow needs to report from source_status
source_status: SourceStatus = workflow.source_status
else:
source_status: SourceStatus = workflow.source.get_status()
logger.info(
f"Source: Processed {len(source_status.records)} records,"
f" filtered {len(source_status.filtered)} records,"

View File

@@ -233,17 +233,22 @@ def print_profiler_status(workflow) -> None:
source=True,
processor=True,
source_status=workflow.source_status,
processor_status=workflow.status,
)
if workflow.source_status.source_start_time:
log_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message="Workflow finished in time: "
f"{pretty_print_time_duration(time.time()-workflow.source_status.source_start_time)}",
)
if workflow.result_status() == 1:
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED, bold=True, message=WORKFLOW_FAILURE_MESSAGE
)
elif (
workflow.source_status.warnings
or workflow.status.failures
or (hasattr(workflow, "sink") and workflow.sink.get_status().warnings)
elif workflow.source_status.warnings or (
hasattr(workflow, "sink") and workflow.sink.get_status().warnings
):
log_ansi_encoded_string(
color=ANSI.YELLOW, bold=True, message=WORKFLOW_WARNING_MESSAGE