Mirror of https://github.com/open-metadata/OpenMetadata.git (synced 2025-10-30 10:05:41 +00:00)
parent 8ac6ce9f7a
commit 4683bee91a
@@ -52,10 +52,13 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 )
 from metadata.ingestion.api.parser import parse_workflow_config_gracefully
 from metadata.ingestion.api.processor import ProcessorStatus
+from metadata.ingestion.api.workflow import REPORTS_INTERVAL_SECONDS
 from metadata.ingestion.ometa.ometa_api import EntityList, OpenMetadata
 from metadata.ingestion.sink.elasticsearch import ElasticsearchSink
+from metadata.timer.repeated_timer import RepeatedTimer
+from metadata.timer.workflow_reporter import get_ingestion_status_timer
 from metadata.utils.importer import get_sink
-from metadata.utils.logger import data_insight_logger
+from metadata.utils.logger import data_insight_logger, set_loggers_level
 from metadata.utils.time_utils import (
     get_beginning_of_day_timestamp_mill,
     get_end_of_day_timestamp_mill,
@@ -78,6 +81,10 @@ class DataInsightWorkflow(WorkflowStatusMixin):
 
     def __init__(self, config: OpenMetadataWorkflowConfig) -> None:
         self.config = config
+        self._timer: Optional[RepeatedTimer] = None
+
+        set_loggers_level(config.workflowConfig.loggerLevel.value)
+
         self.metadata_config: OpenMetadataConnection = (
             self.config.workflowConfig.openMetadataServerConfig
         )
@@ -85,7 +92,7 @@ class DataInsightWorkflow(WorkflowStatusMixin):
         self.set_ingestion_pipeline_status(state=PipelineState.running)
 
         self.status = ProcessorStatus()
-        self.data_processor: Optional[
+        self.source: Optional[
             Union[
                 DataProcessor,
                 EntityReportDataProcessor,
@@ -113,6 +120,16 @@ class DataInsightWorkflow(WorkflowStatusMixin):
 
         self.es_sink = cast(ElasticsearchSink, self.es_sink)
 
+    @property
+    def timer(self) -> RepeatedTimer:
+        """Status timer"""
+        if not self._timer:
+            self._timer = get_ingestion_status_timer(
+                interval=REPORTS_INTERVAL_SECONDS, logger=logger, workflow=self
+            )
+
+        return self._timer
+
     @staticmethod
     def _is_kpi_active(entity: Kpi) -> bool:
         """Check if a KPI is active
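For context on the property added above: get_ingestion_status_timer returns a RepeatedTimer that periodically logs workflow status until stop() is called. Below is a minimal sketch of that kind of self re-arming timer, assuming a plain threading.Timer loop; it is illustrative only, not OpenMetadata's actual RepeatedTimer implementation.

import threading
from typing import Optional

# Illustrative stand-in for a RepeatedTimer: re-arms itself every `interval`
# seconds and invokes a callback (e.g. a function that logs workflow status).
class SimpleRepeatedTimer:
    def __init__(self, interval: float, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self._timer: Optional[threading.Timer] = None

    def _run(self):
        # Re-arm first so a slow callback does not delay the next tick.
        self.trigger()
        self.function(*self.args, **self.kwargs)

    def trigger(self):
        self._timer = threading.Timer(self.interval, self._run)
        self._timer.daemon = True
        self._timer.start()

    def stop(self):
        if self._timer:
            self._timer.cancel()
            self._timer = None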
@@ -197,10 +214,10 @@ class DataInsightWorkflow(WorkflowStatusMixin):
             has_checked_and_handled_existing_es_data = False
             logger.info(f"Processing data for report type {report_data_type}")
             try:
-                self.data_processor = DataProcessor.create(
+                self.source = DataProcessor.create(
                     _data_processor_type=report_data_type.value, metadata=self.metadata
                 )
-                for record in self.data_processor.process():
+                for record in self.source.process():
                     if hasattr(self, "sink"):
                         self.sink.write_record(record)
                     if hasattr(self, "es_sink"):
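DataProcessor.create in the hunk above is a factory dispatch keyed on the report type value. A minimal sketch of that registration pattern follows; the class names and type key are hypothetical, and the real registry lives in the data insight processor module and may differ in detail.

from typing import Dict, Iterable, Type

class BaseProcessor:
    # Illustrative base class: subclasses self-register under a type key.
    subclasses: Dict[str, Type["BaseProcessor"]] = {}
    _data_processor_type = "base"

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        BaseProcessor.subclasses[cls._data_processor_type] = cls

    @classmethod
    def create(cls, _data_processor_type: str, **kwargs) -> "BaseProcessor":
        if _data_processor_type not in cls.subclasses:
            raise ValueError(f"Unknown processor type {_data_processor_type}")
        return cls.subclasses[_data_processor_type](**kwargs)

    def process(self) -> Iterable[dict]:
        raise NotImplementedError

class EntityReportProcessor(BaseProcessor):
    _data_processor_type = "entityReportData"

    def process(self) -> Iterable[dict]:
        yield {"entityCount": 1}  # stand-in record

# Usage mirrors the hunk above:
# processor = BaseProcessor.create(_data_processor_type="entityReportData")
# for record in processor.process():
#     ...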
@@ -214,6 +231,9 @@ class DataInsightWorkflow(WorkflowStatusMixin):
                         logger.warning(
                             "No sink attribute found, skipping ingestion of KPI result"
                         )
+                self.status.records.extend(self.source.processor_status.records)
+                self.status.failures.extend(self.source.processor_status.failures)
+                self.status.warnings.extend(self.source.processor_status.warnings)
 
         except Exception as exc:
             error = f"Error while executing data insight workflow for report type {report_data_type}: {exc}"
@@ -271,6 +291,8 @@ class DataInsightWorkflow(WorkflowStatusMixin):
 
     def execute(self):
         """Execute workflow"""
+        self.timer.trigger()
+
         try:
             logger.info("Starting data processor execution")
             self._execute_data_processor()
@@ -291,21 +313,23 @@ class DataInsightWorkflow(WorkflowStatusMixin):
         except Exception as err:
             self.set_ingestion_pipeline_status(PipelineState.failed)
             raise err
+        finally:
+            self.stop()
 
     def _raise_from_status_internal(self, raise_warnings=False):
-        if self.data_processor and self.data_processor.get_status().failures:
+        if self.source and self.source.get_status().failures:
             raise WorkflowExecutionError(
-                "Source reported errors", self.data_processor.get_status()
+                "Source reported errors", self.source.get_status()
             )
         if hasattr(self, "sink") and self.sink.get_status().failures:
             raise WorkflowExecutionError("Sink reported errors", self.sink.get_status())
         if raise_warnings and (
-            (self.data_processor and self.data_processor.get_status().warnings)
+            (self.source and self.source.get_status().warnings)
             or self.sink.get_status().warnings
         ):
             raise WorkflowExecutionError(
                 "Source reported warnings",
-                self.data_processor.get_status() if self.data_processor else None,
+                self.source.get_status() if self.source else None,
             )
 
     def print_status(self) -> None:
@@ -316,7 +340,7 @@ class DataInsightWorkflow(WorkflowStatusMixin):
         Returns 1 if status is failed, 0 otherwise.
         """
         if (
-            (self.data_processor and self.data_processor.get_status().failures)
+            (self.source and self.source.get_status().failures)
             or self.status.failures
             or (hasattr(self, "sink") and self.sink.get_status().failures)
         ):
@@ -328,3 +352,4 @@ class DataInsightWorkflow(WorkflowStatusMixin):
         Close all connections
         """
         self.metadata.close()
+        self.timer.stop()
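With the workflow changes above in place, the class is typically driven roughly as follows. parse_workflow_config_gracefully comes from the import block in the first hunk; raise_from_status is assumed to be the WorkflowStatusMixin wrapper around _raise_from_status_internal, and workflow_config_dict is a placeholder, so treat this as a hedged sketch rather than the project's documented entry point.

# Hedged usage sketch, not part of the patch; assumes it runs alongside the
# DataInsightWorkflow definition shown above.
from metadata.ingestion.api.parser import parse_workflow_config_gracefully

config = parse_workflow_config_gracefully(workflow_config_dict)  # workflow_config_dict: YAML-derived dict (placeholder)
workflow = DataInsightWorkflow(config)
workflow.execute()            # triggers the status timer; stop() now runs in execute()'s finally block
workflow.raise_from_status()  # assumed mixin wrapper; surfaces source/sink failures
workflow.print_status()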
@@ -171,11 +171,19 @@ class EntityReportDataProcessor(DataProcessor):
         refined_data = defaultdict(lambda: defaultdict(dict))
         for entity in self.fetch_data():
             data_blob_for_entity = {}
-            team = (
-                self._get_team(entity.owner)
-                if not isinstance(entity, User)
-                else self._get_team(entity.teams)
-            )
+            try:
+                team = (
+                    self._get_team(entity.owner)
+                    if not isinstance(entity, User)
+                    else self._get_team(entity.teams)
+                )
+            except Exception:
+                logger.debug(traceback.format_exc())
+                self.processor_status.failed(
+                    entity.name.__root__, "Error retrieving team"
+                )
+                continue
+
             try:
                 entity_tier = get_entity_tier_from_tags(entity.tags)
             except AttributeError:
@@ -230,6 +238,8 @@ class EntityReportDataProcessor(DataProcessor):
                 str(entity_tier)
             ].update(data_blob_for_entity_counter)
 
+            self.processor_status.scanned(entity.name.__root__)
+
         return refined_data
 
     def process(self) -> Iterable[ReportData]:
@@ -195,6 +195,8 @@ class WebAnalyticEntityViewReportDataProcessor(DataProcessor):
             else:
                 refined_data[split_url[1]]["views"] += 1
 
+            self.processor_status.scanned(ENTITIES[entity_type].__name__)
+
     def refine(self):
         """Aggregates data. It will return a dictionary of the following shape
 
@@ -325,6 +327,8 @@ class WebAnalyticUserActivityReportDataProcessor(DataProcessor):
             if timestamp > user_data["lastSession"]:
                 user_data["lastSession"] = timestamp
 
+            self.processor_status.scanned(user_id)
+
     def fetch_data(self) -> Iterable[WebAnalyticEventData]:
         if CACHED_EVENTS:
             for event in CACHED_EVENTS:
@@ -66,7 +66,7 @@ class MetadataRestSink(Sink[Entity]):
         try:
             if isinstance(record, ReportData):
                 self.metadata.add_data_insight_report_data(record)
-                logger.info(
+                logger.debug(
                     "Successfully ingested data insight for "
                     f"{record.data.__class__.__name__ if record.data else 'Unknown'}"
                 )
@@ -75,7 +75,7 @@ class MetadataRestSink(Sink[Entity]):
                 )
             if isinstance(record, KpiResult):
                 self.metadata.add_kpi_result(fqn=record.kpiFqn.__root__, record=record)
-                logger.info(f"Successfully ingested KPI for {record.kpiFqn}")
+                logger.debug(f"Successfully ingested KPI for {record.kpiFqn}")
                 self.status.records_written(f"Data Insight: {record.kpiFqn}")
 
         except APIError as err:
@@ -342,7 +342,11 @@ class ElasticsearchSink(Sink[Entity]):
 
         try:
             self._write_record(record)
-            self.status.records_written(record.name.__root__)
+            self.status.records_written(
+                record.name.__root__
+                if hasattr(record, "name")
+                else type(record).__name__
+            )
 
         except Exception as exc:
             logger.debug(traceback.format_exc())
@@ -278,18 +278,18 @@ def print_data_insight_status(workflow) -> None:
     print_workflow_summary(
         workflow,
         processor=True,
-        processor_status=workflow.data_processor.get_status(),
+        processor_status=workflow.status,
     )
 
-    if workflow.data_processor.get_status().source_start_time:
+    if workflow.source.get_status().source_start_time:
         log_ansi_encoded_string(
-            message=f"Workflow finished in time {pretty_print_time_duration(time.time()-workflow.data_processor.get_status().source_start_time)} ",  # pylint: disable=line-too-long
+            message=f"Workflow finished in time {pretty_print_time_duration(time.time()-workflow.source.get_status().source_start_time)} ",  # pylint: disable=line-too-long
         )
 
     if workflow.result_status() == 1:
         log_ansi_encoded_string(message=WORKFLOW_FAILURE_MESSAGE)
     elif (
-        workflow.data_processor.get_status().warnings
+        workflow.source.get_status().warnings
         or workflow.status.warnings
         or (hasattr(workflow, "sink") and workflow.sink.get_status().warnings)
     ):