diff --git a/ingestion/src/metadata/data_insight/api/workflow.py b/ingestion/src/metadata/data_insight/api/workflow.py
index e9acd88ed5b..2c99ed59055 100644
--- a/ingestion/src/metadata/data_insight/api/workflow.py
+++ b/ingestion/src/metadata/data_insight/api/workflow.py
@@ -52,10 +52,13 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 )
 from metadata.ingestion.api.parser import parse_workflow_config_gracefully
 from metadata.ingestion.api.processor import ProcessorStatus
+from metadata.ingestion.api.workflow import REPORTS_INTERVAL_SECONDS
 from metadata.ingestion.ometa.ometa_api import EntityList, OpenMetadata
 from metadata.ingestion.sink.elasticsearch import ElasticsearchSink
+from metadata.timer.repeated_timer import RepeatedTimer
+from metadata.timer.workflow_reporter import get_ingestion_status_timer
 from metadata.utils.importer import get_sink
-from metadata.utils.logger import data_insight_logger
+from metadata.utils.logger import data_insight_logger, set_loggers_level
 from metadata.utils.time_utils import (
     get_beginning_of_day_timestamp_mill,
     get_end_of_day_timestamp_mill,
@@ -78,6 +81,10 @@ class DataInsightWorkflow(WorkflowStatusMixin):
 
     def __init__(self, config: OpenMetadataWorkflowConfig) -> None:
         self.config = config
+        self._timer: Optional[RepeatedTimer] = None
+
+        set_loggers_level(config.workflowConfig.loggerLevel.value)
+
         self.metadata_config: OpenMetadataConnection = (
             self.config.workflowConfig.openMetadataServerConfig
         )
@@ -85,7 +92,7 @@ class DataInsightWorkflow(WorkflowStatusMixin):
         self.set_ingestion_pipeline_status(state=PipelineState.running)
 
         self.status = ProcessorStatus()
-        self.data_processor: Optional[
+        self.source: Optional[
             Union[
                 DataProcessor,
                 EntityReportDataProcessor,
@@ -113,6 +120,16 @@ class DataInsightWorkflow(WorkflowStatusMixin):
 
             self.es_sink = cast(ElasticsearchSink, self.es_sink)
 
+    @property
+    def timer(self) -> RepeatedTimer:
+        """Status timer"""
+        if not self._timer:
+            self._timer = get_ingestion_status_timer(
+                interval=REPORTS_INTERVAL_SECONDS, logger=logger, workflow=self
+            )
+
+        return self._timer
+
     @staticmethod
     def _is_kpi_active(entity: Kpi) -> bool:
         """Check if a KPI is active
@@ -197,10 +214,10 @@ class DataInsightWorkflow(WorkflowStatusMixin):
             has_checked_and_handled_existing_es_data = False
             logger.info(f"Processing data for report type {report_data_type}")
             try:
-                self.data_processor = DataProcessor.create(
+                self.source = DataProcessor.create(
                     _data_processor_type=report_data_type.value, metadata=self.metadata
                 )
-                for record in self.data_processor.process():
+                for record in self.source.process():
                     if hasattr(self, "sink"):
                         self.sink.write_record(record)
                     if hasattr(self, "es_sink"):
@@ -214,6 +231,9 @@ class DataInsightWorkflow(WorkflowStatusMixin):
                         logger.warning(
                             "No sink attribute found, skipping ingestion of KPI result"
                         )
+                self.status.records.extend(self.source.processor_status.records)
+                self.status.failures.extend(self.source.processor_status.failures)
+                self.status.warnings.extend(self.source.processor_status.warnings)
 
             except Exception as exc:
                 error = f"Error while executing data insight workflow for report type {report_data_type}: {exc}"
@@ -271,6 +291,8 @@ class DataInsightWorkflow(WorkflowStatusMixin):
 
     def execute(self):
         """Execute workflow"""
+        self.timer.trigger()
+
         try:
             logger.info("Starting data processor execution")
             self._execute_data_processor()
@@ -291,21 +313,23 @@ class DataInsightWorkflow(WorkflowStatusMixin):
         except Exception as err:
             self.set_ingestion_pipeline_status(PipelineState.failed)
             raise err
+        finally:
+            self.stop()
 
     def _raise_from_status_internal(self, raise_warnings=False):
-        if self.data_processor and self.data_processor.get_status().failures:
+        if self.source and self.source.get_status().failures:
             raise WorkflowExecutionError(
-                "Source reported errors", self.data_processor.get_status()
+                "Source reported errors", self.source.get_status()
             )
         if hasattr(self, "sink") and self.sink.get_status().failures:
             raise WorkflowExecutionError("Sink reported errors", self.sink.get_status())
         if raise_warnings and (
-            (self.data_processor and self.data_processor.get_status().warnings)
+            (self.source and self.source.get_status().warnings)
             or self.sink.get_status().warnings
         ):
             raise WorkflowExecutionError(
                 "Source reported warnings",
-                self.data_processor.get_status() if self.data_processor else None,
+                self.source.get_status() if self.source else None,
             )
 
     def print_status(self) -> None:
@@ -316,7 +340,7 @@ class DataInsightWorkflow(WorkflowStatusMixin):
         Returns 1 if status is failed, 0 otherwise.
         """
         if (
-            (self.data_processor and self.data_processor.get_status().failures)
+            (self.source and self.source.get_status().failures)
             or self.status.failures
             or (hasattr(self, "sink") and self.sink.get_status().failures)
         ):
@@ -328,3 +352,4 @@ class DataInsightWorkflow(WorkflowStatusMixin):
         Close all connections
         """
         self.metadata.close()
+        self.timer.stop()
diff --git a/ingestion/src/metadata/data_insight/processor/entity_report_data_processor.py b/ingestion/src/metadata/data_insight/processor/entity_report_data_processor.py
index ed8a58863cf..1cd88aa98fd 100644
--- a/ingestion/src/metadata/data_insight/processor/entity_report_data_processor.py
+++ b/ingestion/src/metadata/data_insight/processor/entity_report_data_processor.py
@@ -171,11 +171,19 @@ class EntityReportDataProcessor(DataProcessor):
         refined_data = defaultdict(lambda: defaultdict(dict))
         for entity in self.fetch_data():
             data_blob_for_entity = {}
-            team = (
-                self._get_team(entity.owner)
-                if not isinstance(entity, User)
-                else self._get_team(entity.teams)
-            )
+            try:
+                team = (
+                    self._get_team(entity.owner)
+                    if not isinstance(entity, User)
+                    else self._get_team(entity.teams)
+                )
+            except Exception:
+                logger.debug(traceback.format_exc())
+                self.processor_status.failed(
+                    entity.name.__root__, "Error retrieving team"
+                )
+                continue
+
             try:
                 entity_tier = get_entity_tier_from_tags(entity.tags)
             except AttributeError:
@@ -230,6 +238,8 @@ class EntityReportDataProcessor(DataProcessor):
                     str(entity_tier)
                 ].update(data_blob_for_entity_counter)
 
+            self.processor_status.scanned(entity.name.__root__)
+
         return refined_data
 
     def process(self) -> Iterable[ReportData]:
diff --git a/ingestion/src/metadata/data_insight/processor/web_analytic_report_data_processor.py b/ingestion/src/metadata/data_insight/processor/web_analytic_report_data_processor.py
index af486ff9fb7..0c0ab4d8a7b 100644
--- a/ingestion/src/metadata/data_insight/processor/web_analytic_report_data_processor.py
+++ b/ingestion/src/metadata/data_insight/processor/web_analytic_report_data_processor.py
@@ -195,6 +195,8 @@ class WebAnalyticEntityViewReportDataProcessor(DataProcessor):
             else:
                 refined_data[split_url[1]]["views"] += 1
 
+            self.processor_status.scanned(ENTITIES[entity_type].__name__)
+
     def refine(self):
         """Aggregates data.
         It will return a dictionary of the following shape
@@ -325,6 +327,8 @@ class WebAnalyticUserActivityReportDataProcessor(DataProcessor):
             if timestamp > user_data["lastSession"]:
                 user_data["lastSession"] = timestamp
 
+        self.processor_status.scanned(user_id)
+
     def fetch_data(self) -> Iterable[WebAnalyticEventData]:
         if CACHED_EVENTS:
             for event in CACHED_EVENTS:
diff --git a/ingestion/src/metadata/data_insight/sink/metadata_rest.py b/ingestion/src/metadata/data_insight/sink/metadata_rest.py
index 483c186e46c..2d7d2185ee9 100644
--- a/ingestion/src/metadata/data_insight/sink/metadata_rest.py
+++ b/ingestion/src/metadata/data_insight/sink/metadata_rest.py
@@ -66,8 +66,8 @@ class MetadataRestSink(Sink[Entity]):
         try:
             if isinstance(record, ReportData):
                 self.metadata.add_data_insight_report_data(record)
-                logger.info(
-                    "Successfully ingested data insight for"
+                logger.debug(
+                    "Successfully ingested data insight for "
                     f"{record.data.__class__.__name__ if record.data else 'Unknown'}"
                 )
                 self.status.records_written(
@@ -75,7 +75,7 @@ class MetadataRestSink(Sink[Entity]):
                 )
             if isinstance(record, KpiResult):
                 self.metadata.add_kpi_result(fqn=record.kpiFqn.__root__, record=record)
-                logger.info(f"Successfully ingested KPI for {record.kpiFqn}")
+                logger.debug(f"Successfully ingested KPI for {record.kpiFqn}")
                 self.status.records_written(f"Data Insight: {record.kpiFqn}")
 
         except APIError as err:
diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py
index c75e08fe5ff..510e54e8a27 100644
--- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py
+++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py
@@ -342,7 +342,11 @@ class ElasticsearchSink(Sink[Entity]):
 
         try:
             self._write_record(record)
-            self.status.records_written(record.name.__root__)
+            self.status.records_written(
+                record.name.__root__
+                if hasattr(record, "name")
+                else type(record).__name__
+            )
         except Exception as exc:
             logger.debug(traceback.format_exc())
 
diff --git a/ingestion/src/metadata/utils/workflow_output_handler.py b/ingestion/src/metadata/utils/workflow_output_handler.py
index ac5ce50df3a..efa67da9fc8 100644
--- a/ingestion/src/metadata/utils/workflow_output_handler.py
+++ b/ingestion/src/metadata/utils/workflow_output_handler.py
@@ -278,18 +278,18 @@ def print_data_insight_status(workflow) -> None:
     print_workflow_summary(
         workflow,
         processor=True,
-        processor_status=workflow.data_processor.get_status(),
+        processor_status=workflow.status,
     )
 
-    if workflow.data_processor.get_status().source_start_time:
+    if workflow.source.get_status().source_start_time:
         log_ansi_encoded_string(
-            message=f"Workflow finished in time {pretty_print_time_duration(time.time()-workflow.data_processor.get_status().source_start_time)} ",  # pylint: disable=line-too-long
+            message=f"Workflow finished in time {pretty_print_time_duration(time.time()-workflow.source.get_status().source_start_time)} ",  # pylint: disable=line-too-long
         )
 
     if workflow.result_status() == 1:
         log_ansi_encoded_string(message=WORKFLOW_FAILURE_MESSAGE)
     elif (
-        workflow.data_processor.get_status().warnings
+        workflow.source.get_status().warnings
         or workflow.status.warnings
         or (hasattr(workflow, "sink") and workflow.sink.get_status().warnings)
     ):
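Note on the timer wiring in the `workflow.py` hunks: the diff only shows that `get_ingestion_status_timer` (from `metadata.timer.workflow_reporter`) builds a `RepeatedTimer` lazily, that `execute()` calls `trigger()` up front, and that `stop()` cancels it in the `finally` path. A minimal sketch of what such a repeated timer typically looks like follows; the class body below is an assumption for illustration, not the actual `metadata.timer.repeated_timer` implementation:

```python
import threading


class RepeatedTimerSketch:
    """Sketch: invoke `function` every `interval` seconds until stopped.

    Hypothetical stand-in for metadata.timer.repeated_timer.RepeatedTimer.
    """

    def __init__(self, interval: float, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self._timer = None
        self._running = False

    def _run(self):
        # Re-arm before running the callback so reporting keeps its cadence
        # even when the callback itself is slow.
        self._running = False
        self.trigger()
        self.function(*self.args, **self.kwargs)

    def trigger(self):
        # Idempotent start: schedule a tick only if none is pending, which is
        # why the workflow can call trigger() unconditionally.
        if not self._running:
            self._timer = threading.Timer(self.interval, self._run)
            self._timer.daemon = True  # never keep the process alive
            self._timer.start()
            self._running = True

    def stop(self):
        # Safe to call repeatedly; mirrors self.timer.stop() in workflow.stop().
        if self._timer is not None:
            self._timer.cancel()
        self._running = False
```

Under this shape, pairing `self.timer.trigger()` with the new `finally: self.stop()` in `execute()` guarantees the background reporting thread is cancelled on success, failure, or interruption alike.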
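The status-propagation hunks imply a small contract between processor and workflow: processors call `processor_status.scanned(...)` once per refined entity and `processor_status.failed(...)` when skipping one, and the workflow merges the resulting `records`, `failures`, and `warnings` lists into its own `ProcessorStatus` so `print_data_insight_status` can read everything from `workflow.status`. A rough sketch of that contract; field and method names are inferred from the calls in the diff, not copied from the real `ProcessorStatus`:

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class StatusSketch:
    """Hypothetical stand-in for metadata.ingestion.api.processor.ProcessorStatus."""

    records: List[str] = field(default_factory=list)
    failures: List[Tuple[str, str]] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

    def scanned(self, name: str) -> None:
        # One call per successfully refined entity, as in the refine() hunks.
        self.records.append(name)

    def failed(self, name: str, reason: str) -> None:
        # One call per skipped entity, as in the team-lookup except branch.
        self.failures.append((name, reason))


# Workflow-side merge, mirroring the three extend() calls in workflow.py:
workflow_status, source_status = StatusSketch(), StatusSketch()
source_status.scanned("sample.table")                       # sample names only
source_status.failed("sample.orphan", "Error retrieving team")
workflow_status.records.extend(source_status.records)
workflow_status.failures.extend(source_status.failures)
workflow_status.warnings.extend(source_status.warnings)
assert workflow_status.records and workflow_status.failures
```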