Fix #9008 - Add Airflow callback, introduce Workflow Status Mixin and remove retries (#9049)

Pere Miquel Brull 2022-11-30 08:30:45 +01:00 committed by GitHub
parent e06935d3fa
commit 6a3ce624b7
12 changed files with 281 additions and 248 deletions

View File

@@ -59,17 +59,15 @@ from metadata.utils.time_utils import (
     get_beginning_of_day_timestamp_mill,
     get_end_of_day_timestamp_mill,
 )
-from metadata.utils.workflow_helper import (
-    set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
-)
 from metadata.utils.workflow_output_handler import print_data_insight_status
+from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
 
 logger = data_insight_logger()
 
 NOW = datetime.utcnow().timestamp() * 1000
 
-class DataInsightWorkflow:
+class DataInsightWorkflow(WorkflowStatusMixin):
     """
     Configure and run the Data Insigt workflow
@@ -263,17 +261,25 @@ class DataInsightWorkflow:
     def execute(self):
         """Execute workflow"""
-        logger.info("Starting data processor execution")
-        self._execute_data_processor()
-        logger.info("Data processor finished running")
-
-        logger.info("Sleeping for 1 second. Waiting for ES data to be indexed.")
-        time.sleep(1)
-
-        logger.info("Starting KPI runner")
-        self._execute_kpi_runner()
-        logger.info("KPI runner finished running")
-
-    def raise_from_status(self, raise_warnings=False):
+        try:
+            logger.info("Starting data processor execution")
+            self._execute_data_processor()
+            logger.info("Data processor finished running")
+
+            logger.info("Sleeping for 1 second. Waiting for ES data to be indexed.")
+            time.sleep(1)
+
+            logger.info("Starting KPI runner")
+            self._execute_kpi_runner()
+            logger.info("KPI runner finished running")
+
+            # At the end of the `execute`, update the associated Ingestion Pipeline status as success
+            self.set_ingestion_pipeline_status(PipelineState.success)
+
+        # Any unhandled exception breaking the workflow should update the status
+        except Exception as err:
+            self.set_ingestion_pipeline_status(PipelineState.failed)
+            raise err
+
+    def _raise_from_status_internal(self, raise_warnings=False):
         if self.data_processor and self.data_processor.get_status().failures:
             raise WorkflowExecutionError(
                 "Source reported errors", self.data_processor.get_status()
@@ -308,17 +314,4 @@ class DataInsightWorkflow:
         """
         Close all connections
         """
-        self.set_ingestion_pipeline_status(PipelineState.success)
         self.metadata.close()
-
-    def set_ingestion_pipeline_status(self, state: PipelineState):
-        """
-        Method to set the pipeline status of current ingestion pipeline
-        """
-        pipeline_run_id = set_ingestion_pipeline_status_helper(
-            state=state,
-            ingestion_pipeline_fqn=self.config.ingestionPipelineFQN,
-            pipeline_run_id=self.config.pipelineRunId,
-            metadata=self.metadata,
-        )
-        self.config.pipelineRunId = pipeline_run_id

View File

@@ -44,10 +44,8 @@ from metadata.utils.class_helper import (
     get_service_type_from_source_type,
 )
 from metadata.utils.logger import ingestion_logger, set_loggers_level
-from metadata.utils.workflow_helper import (
-    set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
-)
 from metadata.utils.workflow_output_handler import print_status
+from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
 
 logger = ingestion_logger()
@@ -63,7 +61,7 @@ class InvalidWorkflowJSONException(Exception):
     """
 
-class Workflow:
+class Workflow(WorkflowStatusMixin):
     """
     Ingestion workflow implementation.
@@ -237,6 +235,15 @@ class Workflow:
                 self.stage.close()
                 self.bulk_sink.write_records()
                 self.report["Bulk_Sink"] = self.bulk_sink.get_status().as_obj()
+
+            # If we reach this point, compute the success % and update the associated Ingestion Pipeline status
+            self.update_ingestion_status_at_end()
+
+        # Any unhandled exception breaking the workflow should update the status
+        except Exception as err:
+            self.set_ingestion_pipeline_status(PipelineState.failed)
+            raise err
+
         # Force resource closing. Required for killing the threading
         finally:
             self.stop()
@@ -249,6 +256,17 @@ class Workflow:
         if hasattr(self, "sink"):
             self.sink.close()
 
+        self.source.close()
+        self.timer.stop()
+
+    def _get_source_success(self):
+        return self.source.get_status().calculate_success()
+
+    def update_ingestion_status_at_end(self):
+        """
+        Once the execute method is done, update the status
+        as OK or KO depending on the success rate.
+        """
         pipeline_state = PipelineState.success
         if (
             self._get_source_success() >= SUCCESS_THRESHOLD_VALUE
@@ -256,13 +274,8 @@ class Workflow:
         ):
             pipeline_state = PipelineState.partialSuccess
         self.set_ingestion_pipeline_status(pipeline_state)
-        self.source.close()
-        self.timer.stop()
-
-    def _get_source_success(self):
-        return self.source.get_status().calculate_success()
-
-    def raise_from_status(self, raise_warnings=False):
+
+    def _raise_from_status_internal(self, raise_warnings=False):
         """
         Method to raise error if failed execution
         """
@@ -307,18 +320,6 @@ class Workflow:
         return 0
 
-    def set_ingestion_pipeline_status(self, state: PipelineState):
-        """
-        Method to set the pipeline status of current ingestion pipeline
-        """
-        pipeline_run_id = set_ingestion_pipeline_status_helper(
-            state=state,
-            ingestion_pipeline_fqn=self.config.ingestionPipelineFQN,
-            pipeline_run_id=self.config.pipelineRunId,
-            metadata=self.metadata,
-        )
-        self.config.pipelineRunId = pipeline_run_id
-
     def _retrieve_service_connection_if_needed(self, service_type: ServiceType) -> None:
         """
         We override the current `serviceConnection` source config object if source workflow service already exists

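For context, the partial-success decision that `update_ingestion_status_at_end` makes above can be reduced to a few lines. The sketch below is illustrative only: the second half of the `if` condition is elided between the hunks (assumed here to check that the rate is below 100%), and `SUCCESS_THRESHOLD_VALUE = 90` is an assumed figure, not taken from this diff.

# Illustrative sketch of the threshold logic in update_ingestion_status_at_end.
# SUCCESS_THRESHOLD_VALUE = 90 is an assumed value; the "< 100" upper bound is also
# an assumption, since that context line is hidden between the hunks above.
from enum import Enum

class PipelineState(str, Enum):  # simplified stand-in for the generated schema enum
    success = "success"
    partialSuccess = "partialSuccess"
    failed = "failed"

SUCCESS_THRESHOLD_VALUE = 90  # assumed percentage threshold

def pick_pipeline_state(source_success_pct: float) -> PipelineState:
    """Default to success; downgrade to partialSuccess when the source
    success rate sits between the threshold and 100%."""
    if SUCCESS_THRESHOLD_VALUE <= source_success_pct < 100:
        return PipelineState.partialSuccess
    return PipelineState.success

assert pick_pipeline_state(100) is PipelineState.success
assert pick_pipeline_state(95) is PipelineState.partialSuccess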
View File

@@ -55,7 +55,7 @@ class OMetaIngestionPipelineMixin:
     def get_pipeline_status(
         self, ingestion_pipeline_fqn: str, pipeline_status_run_id: str
-    ) -> None:
+    ) -> Optional[PipelineStatus]:
         """
         GET pipeline status

View File

@@ -82,10 +82,8 @@ from metadata.utils.class_helper import (
 from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
 from metadata.utils.logger import profiler_logger
 from metadata.utils.partition import get_partition_details
-from metadata.utils.workflow_helper import (
-    set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
-)
 from metadata.utils.workflow_output_handler import print_profiler_status
+from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
 
 logger = profiler_logger()
@@ -94,7 +92,7 @@ class ProfilerInterfaceInstantiationError(Exception):
     """Raise when interface cannot be instantiated"""
 
-class ProfilerWorkflow:
+class ProfilerWorkflow(WorkflowStatusMixin):
     """
     Configure and run the ORM profiler
     """
@@ -413,18 +411,18 @@ ProfilerWorkflow:
         return copy_service_connection_config
 
-    def execute(self):
+    def run_profiler_workflow(self):
         """
-        Run the profiling and tests
+        Main logic for the profiler workflow
         """
         databases = self.get_database_entities()
 
         if not databases:
             raise ValueError(
                 "databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
                 f"\n\t- includes: {self.source_config.databaseFilterPattern.includes if self.source_config.databaseFilterPattern else None}"  # pylint: disable=line-too-long
-                f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}"  # pylint: disable=line-too-long
+                f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}"
+                # pylint: disable=line-too-long
             )
 
         for database in databases:
@@ -468,6 +466,21 @@ ProfilerWorkflow:
                     f"Unexpected exception executing in database [{database}]: {exc}"
                 )
 
+    def execute(self):
+        """
+        Run the profiling and tests
+        """
+        try:
+            self.run_profiler_workflow()
+
+            # At the end of the `execute`, update the associated Ingestion Pipeline status as success
+            self.set_ingestion_pipeline_status(PipelineState.success)
+
+        # Any unhandled exception breaking the workflow should update the status
+        except Exception as err:
+            self.set_ingestion_pipeline_status(PipelineState.failed)
+            raise err
+
     def print_status(self) -> None:
         """
         Print the workflow results with click
@@ -486,7 +499,7 @@ ProfilerWorkflow:
             return 1
         return 0
 
-    def raise_from_status(self, raise_warnings=False):
+    def _raise_from_status_internal(self, raise_warnings=False):
         """
         Check source, processor and sink status and raise if needed
@@ -521,7 +534,6 @@ ProfilerWorkflow:
         """
         Close all connections
         """
-        self.set_ingestion_pipeline_status(PipelineState.success)
         self.metadata.close()
 
     def _retrieve_service_connection_if_needed(self) -> None:
@@ -555,15 +567,3 @@ ProfilerWorkflow:
                 f"Error getting service connection for service name [{service_name}]"
                 f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
             )
-
-    def set_ingestion_pipeline_status(self, state: PipelineState):
-        """
-        Method to set the pipeline status of current ingestion pipeline
-        """
-        pipeline_run_id = set_ingestion_pipeline_status_helper(
-            state=state,
-            ingestion_pipeline_fqn=self.config.ingestionPipelineFQN,
-            pipeline_run_id=self.config.pipelineRunId,
-            metadata=self.metadata,
-        )
-        self.config.pipelineRunId = pipeline_run_id

View File

@@ -55,15 +55,13 @@ from metadata.test_suite.runner.core import DataTestsRunner
 from metadata.utils import entity_link
 from metadata.utils.logger import test_suite_logger
 from metadata.utils.partition import get_partition_details
-from metadata.utils.workflow_helper import (
-    set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
-)
 from metadata.utils.workflow_output_handler import print_test_suite_status
+from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
 
 logger: Logger = test_suite_logger()
 
-class TestSuiteWorkflow:
+class TestSuiteWorkflow(WorkflowStatusMixin):
     """workflow to run the test suite"""
 
     def __init__(self, config: OpenMetadataWorkflowConfig):
@@ -383,8 +381,10 @@ class TestSuiteWorkflow:
             return runtime_created_test_cases
         return []
 
-    def execute(self):
-        """Execute test suite workflow"""
+    def run_test_suite(self):
+        """
+        Main running logic
+        """
         test_suites = (
             self.get_test_suite_entity_for_ui_workflow()
             or self.get_or_create_test_suite_entity_for_cli_workflow()
@@ -417,7 +417,7 @@ class TestSuiteWorkflow:
                     if hasattr(self, "sink"):
                         self.sink.write_record(test_result)
                     logger.info(
-                        f"Successfuly ran test case {test_case.name.__root__}"
+                        f"Successfully ran test case {test_case.name.__root__}"
                     )
                     self.status.processed(test_case.fullyQualifiedName.__root__)
                 except Exception as exc:
@@ -431,6 +431,18 @@ class TestSuiteWorkflow:
             logger.warning(f"Could not run test case for table {entity_fqn}: {exc}")
             self.status.failure(entity_fqn)
 
+    def execute(self):
+        """Execute test suite workflow"""
+        try:
+            self.run_test_suite()
+
+            # At the end of the `execute`, update the associated Ingestion Pipeline status as success
+            self.set_ingestion_pipeline_status(PipelineState.success)
+
+        # Any unhandled exception breaking the workflow should update the status
+        except Exception as err:
+            self.set_ingestion_pipeline_status(PipelineState.failed)
+            raise err
+
     def print_status(self) -> None:
         """
         Print the workflow results with click
@@ -447,7 +459,7 @@ class TestSuiteWorkflow:
             return 1
         return 0
 
-    def raise_from_status(self, raise_warnings=False):
+    def _raise_from_status_internal(self, raise_warnings=False):
         """
         Check source, processor and sink status and raise if needed
@@ -472,17 +484,4 @@ class TestSuiteWorkflow:
         """
         Close all connections
         """
-        self.set_ingestion_pipeline_status(PipelineState.success)
         self.metadata.close()
-
-    def set_ingestion_pipeline_status(self, state: PipelineState):
-        """
-        Method to set the pipeline status of current ingestion pipeline
-        """
-        pipeline_run_id = set_ingestion_pipeline_status_helper(
-            state=state,
-            ingestion_pipeline_fqn=self.config.ingestionPipelineFQN,
-            pipeline_run_id=self.config.pipelineRunId,
-            metadata=self.metadata,
-        )
-        self.config.pipelineRunId = pipeline_run_id

View File

@@ -1,58 +0,0 @@
-#  Copyright 2021 Collate
-#  Licensed under the Apache License, Version 2.0 (the "License");
-#  you may not use this file except in compliance with the License.
-#  You may obtain a copy of the License at
-#  http://www.apache.org/licenses/LICENSE-2.0
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-"""
-Modules contains helper methods for different workflows
-"""
-
-import uuid
-from datetime import datetime
-
-from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
-    PipelineState,
-    PipelineStatus,
-)
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
-
-
-def set_ingestion_pipeline_status(
-    state: PipelineState,
-    ingestion_pipeline_fqn: str,
-    pipeline_run_id: str,
-    metadata: OpenMetadata,
-) -> str:
-    """
-    Method to set the pipeline status of current ingestion pipeline
-    """
-    if ingestion_pipeline_fqn:
-        # if ingestion pipeline fqn is not set then setting pipeline status is avoided
-        pipeline_status: PipelineStatus = None
-
-        if state in (PipelineState.queued, PipelineState.running):
-            pipeline_run_id = pipeline_run_id or str(uuid.uuid4())
-            pipeline_status = PipelineStatus(
-                runId=pipeline_run_id,
-                pipelineState=state,
-                startDate=datetime.now().timestamp() * 1000,
-                timestamp=datetime.now().timestamp() * 1000,
-            )
-        elif pipeline_run_id:
-            pipeline_status = metadata.get_pipeline_status(
-                ingestion_pipeline_fqn, pipeline_run_id
-            )
-            # if workflow is ended then update the end date in status
-            pipeline_status.endDate = datetime.now().timestamp() * 1000
-            pipeline_status.pipelineState = state
-        metadata.create_or_update_pipeline_status(
-            ingestion_pipeline_fqn, pipeline_status
-        )
-        return pipeline_run_id
-    return None

View File

@@ -0,0 +1,92 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+"""
+Add methods to the workflows for updating the IngestionPipeline status
+"""
+import uuid
+from datetime import datetime
+from typing import Optional
+
+from metadata.config.common import WorkflowExecutionError
+from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
+    PipelineState,
+    PipelineStatus,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+
+
+class WorkflowStatusMixin:
+    """
+    Helper methods to manage IngestionPipeline status
+    and workflow run ID
+    """
+
+    config: OpenMetadataWorkflowConfig
+    _run_id: Optional[str] = None
+    metadata: OpenMetadata
+
+    @property
+    def run_id(self) -> str:
+        """
+        If the config does not have an informed run id, we'll
+        generate and assign one here.
+        """
+        if not self._run_id:
+            if self.config.pipelineRunId:
+                self._run_id = str(self.config.pipelineRunId.__root__)
+            else:
+                self._run_id = str(uuid.uuid4())
+
+        return self._run_id
+
+    def set_ingestion_pipeline_status(
+        self,
+        state: PipelineState,
+    ) -> None:
+        """
+        Method to set the pipeline status of current ingestion pipeline
+        """
+        # if we don't have a related Ingestion Pipeline FQN, no status is set.
+        if self.config.ingestionPipelineFQN:
+            if state in (PipelineState.queued, PipelineState.running):
+                pipeline_status = PipelineStatus(
+                    runId=self.run_id,
+                    pipelineState=state,
+                    startDate=datetime.now().timestamp() * 1000,
+                    timestamp=datetime.now().timestamp() * 1000,
+                )
+            else:
+                pipeline_status = self.metadata.get_pipeline_status(
+                    self.config.ingestionPipelineFQN, self.run_id
+                )
+                # if workflow is ended then update the end date in status
+                pipeline_status.endDate = datetime.now().timestamp() * 1000
+                pipeline_status.pipelineState = state
+
+            self.metadata.create_or_update_pipeline_status(
+                self.config.ingestionPipelineFQN, pipeline_status
+            )
+
+    def raise_from_status(self, raise_warnings=False):
+        """
+        Method to raise error if failed execution
+        and updating Ingestion Pipeline Status
+        """
+        try:
+            self._raise_from_status_internal(raise_warnings)
+        except WorkflowExecutionError as err:
+            self.set_ingestion_pipeline_status(PipelineState.failed)
+            raise err

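The mixin only assumes that the concrete workflow exposes `config` and `metadata` attributes and implements `_raise_from_status_internal`. Below is a minimal sketch of how a workflow class plugs into it; `DemoWorkflow` and its wiring are hypothetical and not part of this change.

# Hypothetical workflow wired to WorkflowStatusMixin (sketch, not from the diff).
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineState,
)
from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin


class DemoWorkflow(WorkflowStatusMixin):
    def __init__(self, config, metadata):
        self.config = config      # OpenMetadataWorkflowConfig carrying ingestionPipelineFQN / pipelineRunId
        self.metadata = metadata  # OpenMetadata client used by the mixin to push PipelineStatus

    def execute(self):
        try:
            ...  # actual workflow logic would go here
            # mark the Ingestion Pipeline as success once the run completes
            self.set_ingestion_pipeline_status(PipelineState.success)
        except Exception as err:
            # any unhandled error flips the Ingestion Pipeline to failed
            self.set_ingestion_pipeline_status(PipelineState.failed)
            raise err

    def _raise_from_status_internal(self, raise_warnings=False):
        # subclasses raise WorkflowExecutionError here; the mixin's
        # raise_from_status() wraps it and sets the failed state
        pass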
View File

@@ -12,7 +12,9 @@
 Metadata DAG common functions
 """
 import json
+import uuid
 from datetime import datetime, timedelta
+from functools import partial
 from typing import Callable, Optional
 
 import airflow
@@ -21,7 +23,6 @@ from openmetadata_managed_apis.api.utils import clean_dag_id
 from pydantic import ValidationError
 from requests.utils import quote
 
-from metadata.data_insight.api.workflow import DataInsightWorkflow
 from metadata.generated.schema.entity.services.dashboardService import DashboardService
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.entity.services.messagingService import MessagingService
@@ -32,8 +33,6 @@ from metadata.generated.schema.tests.testSuite import TestSuite
 from metadata.generated.schema.type import basic
 from metadata.ingestion.models.encoders import show_secrets_encoder
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.orm_profiler.api.workflow import ProfilerWorkflow
-from metadata.test_suite.api.workflow import TestSuiteWorkflow
 from metadata.utils.logger import set_loggers_level
 
 try:
@@ -201,88 +200,11 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
     set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
 
     config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
     workflow = Workflow.create(config)
-    try:
-        workflow.execute()
-        workflow.raise_from_status()
-        workflow.print_status()
-        workflow.stop()
-    except Exception as err:
-        workflow.set_ingestion_pipeline_status(PipelineState.failed)
-        raise err
-
-
-def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
-    """
-    Task that creates and runs the profiler workflow.
-
-    The workflow_config gets cooked form the incoming
-    ingestionPipeline.
-
-    This is the callable used to create the PythonOperator
-    """
-    set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
-
-    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
-    workflow = ProfilerWorkflow.create(config)
-
-    try:
-        workflow.execute()
-        workflow.raise_from_status()
-        workflow.print_status()
-        workflow.stop()
-    except Exception as err:
-        workflow.set_ingestion_pipeline_status(PipelineState.failed)
-        raise err
-
-
-def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
-    """
-    Task that creates and runs the test suite workflow.
-
-    The workflow_config gets cooked form the incoming
-    ingestionPipeline.
-
-    This is the callable used to create the PythonOperator
-    """
-    set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
-
-    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
-    workflow = TestSuiteWorkflow.create(config)
-
-    try:
-        workflow.execute()
-        workflow.raise_from_status()
-        workflow.print_status()
-        workflow.stop()
-    except Exception as err:
-        workflow.set_ingestion_pipeline_status(PipelineState.failed)
-        raise err
-
-
-def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):
-    """Task that creates and runs the data insight workflow.
-
-    The workflow_config gets created form the incoming
-    ingestionPipeline.
-
-    This is the callable used to create the PythonOperator
-
-    Args:
-        workflow_config (OpenMetadataWorkflowConfig): _description_
-    """
-    set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
-
-    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
-    workflow = DataInsightWorkflow.create(config)
-
-    try:
-        workflow.execute()
-        workflow.raise_from_status()
-        workflow.print_status()
-        workflow.stop()
-    except Exception as err:
-        workflow.set_ingestion_pipeline_status(PipelineState.failed)
-        raise err
+    workflow.execute()
+    workflow.raise_from_status()
+    workflow.print_status()
+    workflow.stop()
 
 
 def date_to_datetime(
@@ -340,6 +262,17 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
     }
 
 
+def send_failed_status_callback(workflow_config: OpenMetadataWorkflowConfig, context):
+    """
+    Airflow on_failure_callback to update workflow status if something unexpected
+    happens or if the DAG is externally killed.
+    """
+    logger.info("Sending failed status from callback...")
+
+    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
+    workflow = Workflow.create(config)
+    workflow.set_ingestion_pipeline_status(PipelineState.failed)
+
+
 def build_dag(
     task_name: str,
     ingestion_pipeline: IngestionPipeline,
@@ -352,12 +285,18 @@ def build_dag(
     with DAG(**build_dag_configs(ingestion_pipeline)) as dag:
+
+        # Initialize with random UUID4. Will be used by the callback instead of
+        # generating it inside the Workflow itself.
+        workflow_config.pipelineRunId = str(uuid.uuid4())
+
         PythonOperator(
             task_id=task_name,
             python_callable=workflow_fn,
             op_kwargs={"workflow_config": workflow_config},
-            retries=ingestion_pipeline.airflowConfig.retries,
-            retry_delay=ingestion_pipeline.airflowConfig.retryDelay,
+            # There's no need to retry if we have had an error. Wait until the next schedule or manual rerun.
+            retries=0,
+            # each DAG will call its own OpenMetadataWorkflowConfig
+            on_failure_callback=partial(send_failed_status_callback, workflow_config),
         )
 
         return dag

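The callback wiring above relies on Airflow invoking `on_failure_callback` with a single `context` dict, so `functools.partial` is used to pre-bind the `workflow_config` argument. A toy, self-contained illustration of that binding (the names and config dict below are made up for the example):

# Toy illustration of pre-binding an argument for an Airflow-style failure callback.
from functools import partial


def send_failed_status_callback(workflow_config: dict, context: dict) -> None:
    # in the real callback this would rebuild the Workflow and set PipelineState.failed
    print(f"Marking {workflow_config['name']} as failed for run {context.get('run_id')}")


workflow_config = {"name": "my_metadata_ingestion"}  # hypothetical config
callback = partial(send_failed_status_callback, workflow_config)

# Airflow calls the callback with the task context only; the config is already bound:
callback({"run_id": "manual__2022-11-30"})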
View File

@@ -11,16 +11,16 @@
 """
 Data Insights DAG function builder
 """
+import json
 from typing import cast
 
 from airflow import DAG
 from openmetadata_managed_apis.workflows.ingestion.common import (
     ClientInitializationError,
     build_dag,
-    data_insight_workflow,
 )
 
+from metadata.data_insight.api.workflow import DataInsightWorkflow
 from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
     IngestionPipeline,
 )
@@ -36,8 +36,32 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 )
 from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
 from metadata.generated.schema.type.basic import ComponentConfig
+from metadata.ingestion.models.encoders import show_secrets_encoder
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.utils.constants import ES_SOURCE_TO_ES_OBJ_ARGS
+from metadata.utils.logger import set_loggers_level
+
+
+def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):
+    """Task that creates and runs the data insight workflow.
+
+    The workflow_config gets created form the incoming
+    ingestionPipeline.
+
+    This is the callable used to create the PythonOperator
+
+    Args:
+        workflow_config (OpenMetadataWorkflowConfig): _description_
+    """
+    set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
+
+    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
+    workflow = DataInsightWorkflow.create(config)
+
+    workflow.execute()
+    workflow.raise_from_status()
+    workflow.print_status()
+    workflow.stop()
 
 
 def build_data_insight_workflow_config(

View File

@@ -11,13 +11,14 @@
 """
 Profiler DAG function builder
 """
+import json
 
 from airflow import DAG
-from openmetadata_managed_apis.workflows.ingestion.common import (
-    build_dag,
-    build_source,
-    profiler_workflow,
-)
+from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source
+
+from metadata.ingestion.models.encoders import show_secrets_encoder
+from metadata.orm_profiler.api.workflow import ProfilerWorkflow
+from metadata.utils.logger import set_loggers_level
 
 try:
     from airflow.operators.python import PythonOperator
@@ -36,6 +37,27 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 )
 
 
+def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
+    """
+    Task that creates and runs the profiler workflow.
+
+    The workflow_config gets cooked form the incoming
+    ingestionPipeline.
+
+    This is the callable used to create the PythonOperator
+    """
+    set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
+
+    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
+    workflow = ProfilerWorkflow.create(config)
+
+    workflow.execute()
+    workflow.raise_from_status()
+    workflow.print_status()
+    workflow.stop()
+
+
 def build_profiler_workflow_config(
     ingestion_pipeline: IngestionPipeline,
 ) -> OpenMetadataWorkflowConfig:

View File

@@ -11,13 +11,10 @@
 """
 testSuite DAG function builder
 """
+import json
 
 from airflow import DAG
-from openmetadata_managed_apis.workflows.ingestion.common import (
-    build_dag,
-    build_source,
-    test_suite_workflow,
-)
+from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source
 
 from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
     IngestionPipeline,
@@ -29,6 +26,30 @@ from metadata.generated.schema.metadataIngestion.workflow import (
     Sink,
     WorkflowConfig,
 )
+from metadata.ingestion.models.encoders import show_secrets_encoder
+from metadata.test_suite.api.workflow import TestSuiteWorkflow
+from metadata.utils.logger import set_loggers_level
+
+
+def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
+    """
+    Task that creates and runs the test suite workflow.
+
+    The workflow_config gets cooked form the incoming
+    ingestionPipeline.
+
+    This is the callable used to create the PythonOperator
+    """
+    set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
+
+    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
+    workflow = TestSuiteWorkflow.create(config)
+
+    workflow.execute()
+    workflow.raise_from_status()
+    workflow.print_status()
+    workflow.stop()
+
+
 def build_test_suite_workflow_config(