diff --git a/ingestion/src/metadata/data_insight/api/workflow.py b/ingestion/src/metadata/data_insight/api/workflow.py
index 45c1df0ab64..7f8bd16d937 100644
--- a/ingestion/src/metadata/data_insight/api/workflow.py
+++ b/ingestion/src/metadata/data_insight/api/workflow.py
@@ -59,17 +59,15 @@ from metadata.utils.time_utils import (
     get_beginning_of_day_timestamp_mill,
     get_end_of_day_timestamp_mill,
 )
-from metadata.utils.workflow_helper import (
-    set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
-)
 from metadata.utils.workflow_output_handler import print_data_insight_status
+from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin

 logger = data_insight_logger()

 NOW = datetime.utcnow().timestamp() * 1000


-class DataInsightWorkflow:
+class DataInsightWorkflow(WorkflowStatusMixin):
     """
     Configure and run the Data Insight workflow

@@ -263,17 +261,25 @@ class DataInsightWorkflow:
     def execute(self):
         """Execute workflow"""
-        logger.info("Starting data processor execution")
-        self._execute_data_processor()
-        logger.info("Data processor finished running")
+        try:
+            logger.info("Starting data processor execution")
+            self._execute_data_processor()
+            logger.info("Data processor finished running")

-        logger.info("Sleeping for 1 second. Waiting for ES data to be indexed.")
-        time.sleep(1)
-        logger.info("Starting KPI runner")
-        self._execute_kpi_runner()
-        logger.info("KPI runner finished running")
+            logger.info("Sleeping for 1 second. Waiting for ES data to be indexed.")
+            time.sleep(1)
+            logger.info("Starting KPI runner")
+            self._execute_kpi_runner()
+            logger.info("KPI runner finished running")

-    def raise_from_status(self, raise_warnings=False):
+            # At the end of the `execute`, update the associated Ingestion Pipeline status as success
+            self.set_ingestion_pipeline_status(PipelineState.success)
+
+        # Any unhandled exception breaking the workflow should update the status
+        except Exception as err:
+            self.set_ingestion_pipeline_status(PipelineState.failed)
+            raise err
+
+    def _raise_from_status_internal(self, raise_warnings=False):
         if self.data_processor and self.data_processor.get_status().failures:
             raise WorkflowExecutionError(
                 "Source reported errors", self.data_processor.get_status()
@@ -308,17 +314,4 @@ class DataInsightWorkflow:
         """
         Close all connections
         """
-        self.set_ingestion_pipeline_status(PipelineState.success)
         self.metadata.close()
-
-    def set_ingestion_pipeline_status(self, state: PipelineState):
-        """
-        Method to set the pipeline status of current ingestion pipeline
-        """
-        pipeline_run_id = set_ingestion_pipeline_status_helper(
-            state=state,
-            ingestion_pipeline_fqn=self.config.ingestionPipelineFQN,
-            pipeline_run_id=self.config.pipelineRunId,
-            metadata=self.metadata,
-        )
-        self.config.pipelineRunId = pipeline_run_id
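The try/except wrapper added to `execute` above is the same shape the profiler and test suite workflows adopt later in this diff. A minimal sketch of the shared template, assuming the `WorkflowStatusMixin` introduced further down; `_run_main_logic` is a made-up placeholder for each workflow's real entry point (the data processor plus KPI runner here, `run_profiler_workflow` and `run_test_suite` below):

```python
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineState,
)
from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin


class ExampleWorkflow(WorkflowStatusMixin):
    def execute(self):
        try:
            self._run_main_logic()  # placeholder for the workflow's own logic
            # A clean run marks the linked Ingestion Pipeline as success
            self.set_ingestion_pipeline_status(PipelineState.success)
        except Exception as err:
            # Any unhandled error marks the run as failed before re-raising
            self.set_ingestion_pipeline_status(PipelineState.failed)
            raise err
```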
diff --git a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py
index a55b0a75678..92dff907025 100644
--- a/ingestion/src/metadata/ingestion/api/workflow.py
+++ b/ingestion/src/metadata/ingestion/api/workflow.py
@@ -44,10 +44,8 @@ from metadata.utils.class_helper import (
     get_service_type_from_source_type,
 )
 from metadata.utils.logger import ingestion_logger, set_loggers_level
-from metadata.utils.workflow_helper import (
-    set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
-)
 from metadata.utils.workflow_output_handler import print_status
+from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin

 logger = ingestion_logger()

@@ -63,7 +61,7 @@ class InvalidWorkflowJSONException(Exception):
     """


-class Workflow:
+class Workflow(WorkflowStatusMixin):
     """
     Ingestion workflow implementation.

@@ -237,6 +235,15 @@ class Workflow:
                 self.stage.close()
                 self.bulk_sink.write_records()
                 self.report["Bulk_Sink"] = self.bulk_sink.get_status().as_obj()
+
+            # If we reach this point, compute the success % and update the associated Ingestion Pipeline status
+            self.update_ingestion_status_at_end()
+
+        # Any unhandled exception breaking the workflow should update the status
+        except Exception as err:
+            self.set_ingestion_pipeline_status(PipelineState.failed)
+            raise err
+
         # Force resource closing. Required for killing the threading
         finally:
             self.stop()
@@ -249,6 +256,17 @@ class Workflow:
         if hasattr(self, "sink"):
             self.sink.close()

+        self.source.close()
+        self.timer.stop()
+
+    def _get_source_success(self):
+        return self.source.get_status().calculate_success()
+
+    def update_ingestion_status_at_end(self):
+        """
+        Once the execute method is done, update the status
+        as OK or KO depending on the success rate.
+        """
         pipeline_state = PipelineState.success
         if (
             self._get_source_success() >= SUCCESS_THRESHOLD_VALUE
             and self._get_source_success() < 100
         ):
             pipeline_state = PipelineState.partialSuccess
         self.set_ingestion_pipeline_status(pipeline_state)
-        self.source.close()
-        self.timer.stop()

-    def _get_source_success(self):
-        return self.source.get_status().calculate_success()
-
-    def raise_from_status(self, raise_warnings=False):
+    def _raise_from_status_internal(self, raise_warnings=False):
         """
         Method to raise error if failed execution
         """
@@ -307,18 +320,6 @@ class Workflow:

         return 0

-    def set_ingestion_pipeline_status(self, state: PipelineState):
-        """
-        Method to set the pipeline status of current ingestion pipeline
-        """
-        pipeline_run_id = set_ingestion_pipeline_status_helper(
-            state=state,
-            ingestion_pipeline_fqn=self.config.ingestionPipelineFQN,
-            pipeline_run_id=self.config.pipelineRunId,
-            metadata=self.metadata,
-        )
-        self.config.pipelineRunId = pipeline_run_id
-
     def _retrieve_service_connection_if_needed(self, service_type: ServiceType) -> None:
         """
         We override the current `serviceConnection` source config object if source workflow service already exists
diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py
index f11a6ed616e..2803e510866 100644
--- a/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py
+++ b/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py
@@ -55,7 +55,7 @@ class OMetaIngestionPipelineMixin:

     def get_pipeline_status(
         self, ingestion_pipeline_fqn: str, pipeline_status_run_id: str
-    ) -> None:
+    ) -> Optional[PipelineStatus]:
         """
         GET pipeline status
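Unlike the all-or-nothing workflows, the metadata `Workflow` grades its outcome: `update_ingestion_status_at_end` compares the source's success percentage against `SUCCESS_THRESHOLD_VALUE`. A standalone sketch of that decision; the threshold value of 90 is an assumption for illustration, not taken from this diff:

```python
SUCCESS_THRESHOLD_VALUE = 90  # assumed value, for illustration only


def pick_pipeline_state(source_success: float) -> str:
    """Map the source success percentage to an Ingestion Pipeline state."""
    if SUCCESS_THRESHOLD_VALUE <= source_success < 100:
        return "partialSuccess"
    return "success"


assert pick_pipeline_state(100.0) == "success"
assert pick_pipeline_state(92.5) == "partialSuccess"
# Below the threshold, raise_from_status() raises a WorkflowExecutionError,
# which in turn flips the pipeline status to failed.
```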
diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py
index 3b3bea5a143..8d21a91bc34 100644
--- a/ingestion/src/metadata/orm_profiler/api/workflow.py
+++ b/ingestion/src/metadata/orm_profiler/api/workflow.py
@@ -82,10 +82,8 @@ from metadata.utils.class_helper import (
 from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
 from metadata.utils.logger import profiler_logger
 from metadata.utils.partition import get_partition_details
-from metadata.utils.workflow_helper import (
-    set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
-)
 from metadata.utils.workflow_output_handler import print_profiler_status
+from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin

 logger = profiler_logger()

@@ -94,7 +92,7 @@ class ProfilerInterfaceInstantiationError(Exception):
     """Raise when interface cannot be instantiated"""


-class ProfilerWorkflow:
+class ProfilerWorkflow(WorkflowStatusMixin):
     """
     Configure and run the ORM profiler
     """
@@ -413,18 +411,18 @@ class ProfilerWorkflow:

         return copy_service_connection_config

-    def execute(self):
+    def run_profiler_workflow(self):
         """
-        Run the profiling and tests
+        Main logic for the profiler workflow
         """
-
         databases = self.get_database_entities()

         if not databases:
             raise ValueError(
                 "databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
                 f"\n\t- includes: {self.source_config.databaseFilterPattern.includes if self.source_config.databaseFilterPattern else None}"  # pylint: disable=line-too-long
-                f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}"  # pylint: disable=line-too-long
+                f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}"
+                # pylint: disable=line-too-long
             )

         for database in databases:
@@ -468,6 +466,21 @@ class ProfilerWorkflow:
                 f"Unexpected exception executing in database [{database}]: {exc}"
             )

+    def execute(self):
+        """
+        Run the profiling and tests
+        """
+
+        try:
+            self.run_profiler_workflow()
+            # At the end of the `execute`, update the associated Ingestion Pipeline status as success
+            self.set_ingestion_pipeline_status(PipelineState.success)
+
+        # Any unhandled exception breaking the workflow should update the status
+        except Exception as err:
+            self.set_ingestion_pipeline_status(PipelineState.failed)
+            raise err
+
     def print_status(self) -> None:
         """
         Print the workflow results with click
@@ -486,7 +499,7 @@ class ProfilerWorkflow:
             return 1
         return 0

-    def raise_from_status(self, raise_warnings=False):
+    def _raise_from_status_internal(self, raise_warnings=False):
         """
         Check source, processor and sink status and raise if needed

@@ -521,7 +534,6 @@ class ProfilerWorkflow:
         """
         Close all connections
         """
-        self.set_ingestion_pipeline_status(PipelineState.success)
         self.metadata.close()

     def _retrieve_service_connection_if_needed(self) -> None:
@@ -555,15 +567,3 @@ class ProfilerWorkflow:
                 f"Error getting service connection for service name [{service_name}]"
                 f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
             )
-
-    def set_ingestion_pipeline_status(self, state: PipelineState):
-        """
-        Method to set the pipeline status of current ingestion pipeline
-        """
-        pipeline_run_id = set_ingestion_pipeline_status_helper(
-            state=state,
-            ingestion_pipeline_fqn=self.config.ingestionPipelineFQN,
-            pipeline_run_id=self.config.pipelineRunId,
-            metadata=self.metadata,
-        )
-        self.config.pipelineRunId = pipeline_run_id
diff --git a/ingestion/src/metadata/test_suite/api/workflow.py b/ingestion/src/metadata/test_suite/api/workflow.py
index 20125344ca5..cd1be734b36 100644
--- a/ingestion/src/metadata/test_suite/api/workflow.py
+++ b/ingestion/src/metadata/test_suite/api/workflow.py
@@ -55,15 +55,13 @@ from metadata.test_suite.runner.core import DataTestsRunner
 from metadata.utils import entity_link
 from metadata.utils.logger import test_suite_logger
 from metadata.utils.partition import get_partition_details
-from metadata.utils.workflow_helper import (
-    set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
-)
 from metadata.utils.workflow_output_handler import print_test_suite_status
+from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin

 logger: Logger = test_suite_logger()


-class TestSuiteWorkflow:
+class TestSuiteWorkflow(WorkflowStatusMixin):
     """workflow to run the test suite"""

     def __init__(self, config: OpenMetadataWorkflowConfig):
@@ -383,8 +381,10 @@ class TestSuiteWorkflow:
             return runtime_created_test_cases
         return []

-    def execute(self):
-        """Execute test suite workflow"""
+    def run_test_suite(self):
+        """
+        Main running logic
+        """
         test_suites = (
             self.get_test_suite_entity_for_ui_workflow()
             or self.get_or_create_test_suite_entity_for_cli_workflow()
@@ -417,7 +417,7 @@ class TestSuiteWorkflow:
                         if hasattr(self, "sink"):
                             self.sink.write_record(test_result)
                         logger.info(
-                            f"Successfuly ran test case {test_case.name.__root__}"
+                            f"Successfully ran test case {test_case.name.__root__}"
                         )
                         self.status.processed(test_case.fullyQualifiedName.__root__)
                     except Exception as exc:
@@ -431,6 +431,18 @@ class TestSuiteWorkflow:
                 logger.warning(f"Could not run test case for table {entity_fqn}: {exc}")
                 self.status.failure(entity_fqn)

+    def execute(self):
+        """Execute test suite workflow"""
+        try:
+            self.run_test_suite()
+            # At the end of the `execute`, update the associated Ingestion Pipeline status as success
+            self.set_ingestion_pipeline_status(PipelineState.success)
+
+        # Any unhandled exception breaking the workflow should update the status
+        except Exception as err:
+            self.set_ingestion_pipeline_status(PipelineState.failed)
+            raise err
+
     def print_status(self) -> None:
         """
         Print the workflow results with click
@@ -447,7 +459,7 @@ class TestSuiteWorkflow:
             return 1
         return 0

-    def raise_from_status(self, raise_warnings=False):
+    def _raise_from_status_internal(self, raise_warnings=False):
         """
         Check source, processor and sink status and raise if needed

@@ -472,17 +484,4 @@ class TestSuiteWorkflow:
         """
         Close all connections
         """
-        self.set_ingestion_pipeline_status(PipelineState.success)
         self.metadata.close()
-
-    def set_ingestion_pipeline_status(self, state: PipelineState):
-        """
-        Method to set the pipeline status of current ingestion pipeline
-        """
-        pipeline_run_id = set_ingestion_pipeline_status_helper(
-            state=state,
-            ingestion_pipeline_fqn=self.config.ingestionPipelineFQN,
-            pipeline_run_id=self.config.pipelineRunId,
-            metadata=self.metadata,
-        )
-        self.config.pipelineRunId = pipeline_run_id
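The recurring `raise_from_status` → `_raise_from_status_internal` rename in these workflow classes is a template-method split: each workflow keeps only its own source/processor/sink checks, while the public entry point (and the failed-status bookkeeping) moves into the `WorkflowStatusMixin` introduced below. A sketch of what a subclass is left with; the failing check is invented for the example:

```python
from metadata.config.common import WorkflowExecutionError
from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin


class SketchWorkflow(WorkflowStatusMixin):
    def _raise_from_status_internal(self, raise_warnings=False):
        # Stand-in for the real per-workflow status checks
        raise WorkflowExecutionError("Processor reported errors")


# Callers keep using the public raise_from_status(): the mixin wraps the
# internal check and marks the Ingestion Pipeline as failed before re-raising.
```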
- -""" -Modules contains helper methods for different workflows -""" -import uuid -from datetime import datetime - -from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( - PipelineState, - PipelineStatus, -) -from metadata.ingestion.ometa.ometa_api import OpenMetadata - - -def set_ingestion_pipeline_status( - state: PipelineState, - ingestion_pipeline_fqn: str, - pipeline_run_id: str, - metadata: OpenMetadata, -) -> str: - """ - Method to set the pipeline status of current ingestion pipeline - """ - - if ingestion_pipeline_fqn: - # if ingestion pipeline fqn is not set then setting pipeline status is avoided - pipeline_status: PipelineStatus = None - - if state in (PipelineState.queued, PipelineState.running): - pipeline_run_id = pipeline_run_id or str(uuid.uuid4()) - pipeline_status = PipelineStatus( - runId=pipeline_run_id, - pipelineState=state, - startDate=datetime.now().timestamp() * 1000, - timestamp=datetime.now().timestamp() * 1000, - ) - elif pipeline_run_id: - pipeline_status = metadata.get_pipeline_status( - ingestion_pipeline_fqn, pipeline_run_id - ) - # if workflow is ended then update the end date in status - pipeline_status.endDate = datetime.now().timestamp() * 1000 - pipeline_status.pipelineState = state - metadata.create_or_update_pipeline_status( - ingestion_pipeline_fqn, pipeline_status - ) - return pipeline_run_id - return None diff --git a/ingestion/src/metadata/workflow/__init__.py b/ingestion/src/metadata/workflow/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/workflow/workflow_status_mixin.py b/ingestion/src/metadata/workflow/workflow_status_mixin.py new file mode 100644 index 00000000000..8e1d7a843bf --- /dev/null +++ b/ingestion/src/metadata/workflow/workflow_status_mixin.py @@ -0,0 +1,92 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Add methods to the workflows for updating the IngestionPipeline status +""" +import uuid +from datetime import datetime +from typing import Optional + +from metadata.config.common import WorkflowExecutionError +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineState, + PipelineStatus, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata + + +class WorkflowStatusMixin: + """ + Helper methods to manage IngestionPipeline status + and workflow run ID + """ + + config: OpenMetadataWorkflowConfig + _run_id: Optional[str] = None + metadata: OpenMetadata + + @property + def run_id(self) -> str: + """ + If the config does not have an informed run id, we'll + generate and assign one here. 
+ """ + if not self._run_id: + if self.config.pipelineRunId: + self._run_id = str(self.config.pipelineRunId.__root__) + else: + self._run_id = str(uuid.uuid4()) + + return self._run_id + + def set_ingestion_pipeline_status( + self, + state: PipelineState, + ) -> None: + """ + Method to set the pipeline status of current ingestion pipeline + """ + + # if we don't have a related Ingestion Pipeline FQN, no status is set. + if self.config.ingestionPipelineFQN: + + if state in (PipelineState.queued, PipelineState.running): + pipeline_status = PipelineStatus( + runId=self.run_id, + pipelineState=state, + startDate=datetime.now().timestamp() * 1000, + timestamp=datetime.now().timestamp() * 1000, + ) + else: + pipeline_status = self.metadata.get_pipeline_status( + self.config.ingestionPipelineFQN, self.run_id + ) + # if workflow is ended then update the end date in status + pipeline_status.endDate = datetime.now().timestamp() * 1000 + pipeline_status.pipelineState = state + + self.metadata.create_or_update_pipeline_status( + self.config.ingestionPipelineFQN, pipeline_status + ) + + def raise_from_status(self, raise_warnings=False): + """ + Method to raise error if failed execution + and updating Ingestion Pipeline Status + """ + try: + self._raise_from_status_internal(raise_warnings) + except WorkflowExecutionError as err: + self.set_ingestion_pipeline_status(PipelineState.failed) + raise err diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 8d1227a3faa..ab6f1aa55c7 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -12,7 +12,9 @@ Metadata DAG common functions """ import json +import uuid from datetime import datetime, timedelta +from functools import partial from typing import Callable, Optional import airflow @@ -21,7 +23,6 @@ from openmetadata_managed_apis.api.utils import clean_dag_id from pydantic import ValidationError from requests.utils import quote -from metadata.data_insight.api.workflow import DataInsightWorkflow from metadata.generated.schema.entity.services.dashboardService import DashboardService from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.messagingService import MessagingService @@ -32,8 +33,6 @@ from metadata.generated.schema.tests.testSuite import TestSuite from metadata.generated.schema.type import basic from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.orm_profiler.api.workflow import ProfilerWorkflow -from metadata.test_suite.api.workflow import TestSuiteWorkflow from metadata.utils.logger import set_loggers_level try: @@ -201,88 +200,11 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig): set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) workflow = Workflow.create(config) - try: - workflow.execute() - workflow.raise_from_status() - workflow.print_status() - workflow.stop() - except Exception as err: - workflow.set_ingestion_pipeline_status(PipelineState.failed) - raise err - -def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig): - """ - Task that creates and runs the profiler workflow. 
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
index 8d1227a3faa..ab6f1aa55c7 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
@@ -12,7 +12,9 @@
 Metadata DAG common functions
 """
 import json
+import uuid
 from datetime import datetime, timedelta
+from functools import partial
 from typing import Callable, Optional

 import airflow
@@ -21,7 +23,6 @@ from openmetadata_managed_apis.api.utils import clean_dag_id
 from pydantic import ValidationError
 from requests.utils import quote

-from metadata.data_insight.api.workflow import DataInsightWorkflow
 from metadata.generated.schema.entity.services.dashboardService import DashboardService
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.entity.services.messagingService import MessagingService
@@ -32,8 +33,6 @@ from metadata.generated.schema.tests.testSuite import TestSuite
 from metadata.generated.schema.type import basic
 from metadata.ingestion.models.encoders import show_secrets_encoder
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.orm_profiler.api.workflow import ProfilerWorkflow
-from metadata.test_suite.api.workflow import TestSuiteWorkflow
 from metadata.utils.logger import set_loggers_level

 try:
@@ -201,88 +200,11 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
     set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
     config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
     workflow = Workflow.create(config)
-    try:
-        workflow.execute()
-        workflow.raise_from_status()
-        workflow.print_status()
-        workflow.stop()
-    except Exception as err:
-        workflow.set_ingestion_pipeline_status(PipelineState.failed)
-        raise err
-
-
-def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
-    """
-    Task that creates and runs the profiler workflow.
-
-    The workflow_config gets cooked form the incoming
-    ingestionPipeline.
-
-    This is the callable used to create the PythonOperator
-    """
-
-    set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
-
-    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
-    workflow = ProfilerWorkflow.create(config)
-    try:
-        workflow.execute()
-        workflow.raise_from_status()
-        workflow.print_status()
-        workflow.stop()
-    except Exception as err:
-        workflow.set_ingestion_pipeline_status(PipelineState.failed)
-        raise err
-
-
-def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
-    """
-    Task that creates and runs the test suite workflow.
-
-    The workflow_config gets cooked form the incoming
-    ingestionPipeline.
-
-    This is the callable used to create the PythonOperator
-    """
-
-    set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
-
-    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
-    workflow = TestSuiteWorkflow.create(config)
-
-    try:
-        workflow.execute()
-        workflow.raise_from_status()
-        workflow.print_status()
-        workflow.stop()
-    except Exception as err:
-        workflow.set_ingestion_pipeline_status(PipelineState.failed)
-        raise err
-
-
-def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):
-    """Task that creates and runs the data insight workflow.
-
-    The workflow_config gets created form the incoming
-    ingestionPipeline.
-
-    This is the callable used to create the PythonOperator
-
-    Args:
-        workflow_config (OpenMetadataWorkflowConfig): _description_
-    """
-    set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
-
-    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
-    workflow = DataInsightWorkflow.create(config)
-    try:
-        workflow.execute()
-        workflow.raise_from_status()
-        workflow.print_status()
-        workflow.stop()
-    except Exception as err:
-        workflow.set_ingestion_pipeline_status(PipelineState.failed)
-        raise err
+    workflow.execute()
+    workflow.raise_from_status()
+    workflow.print_status()
+    workflow.stop()


 def date_to_datetime(
@@ -340,6 +262,17 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
     }


+def send_failed_status_callback(workflow_config: OpenMetadataWorkflowConfig, context):
+    """
+    Airflow on_failure_callback to update workflow status if something unexpected
+    happens or if the DAG is externally killed.
+    """
+    logger.info("Sending failed status from callback...")
+    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
+    workflow = Workflow.create(config)
+    workflow.set_ingestion_pipeline_status(PipelineState.failed)
+
+
 def build_dag(
     task_name: str,
     ingestion_pipeline: IngestionPipeline,
@@ -352,12 +285,18 @@

     with DAG(**build_dag_configs(ingestion_pipeline)) as dag:

+        # Initialize with random UUID4. Will be used by the callback instead of
+        # generating it inside the Workflow itself.
+        workflow_config.pipelineRunId = str(uuid.uuid4())
+
         PythonOperator(
             task_id=task_name,
             python_callable=workflow_fn,
             op_kwargs={"workflow_config": workflow_config},
-            retries=ingestion_pipeline.airflowConfig.retries,
-            retry_delay=ingestion_pipeline.airflowConfig.retryDelay,
+            # There's no need to retry if we have had an error. Wait until the next schedule or manual rerun.
+            retries=0,
+            # each DAG will call its own OpenMetadataWorkflowConfig
+            on_failure_callback=partial(send_failed_status_callback, workflow_config),
         )

         return dag
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py
index b31145c104b..9c6b0e5ba46 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py
@@ -11,16 +11,16 @@
 """
 Data Insights DAG function builder
 """
-
+import json
 from typing import cast

 from airflow import DAG
 from openmetadata_managed_apis.workflows.ingestion.common import (
     ClientInitializationError,
     build_dag,
-    data_insight_workflow,
 )

+from metadata.data_insight.api.workflow import DataInsightWorkflow
 from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
     IngestionPipeline,
 )
@@ -36,8 +36,32 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 )
 from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
 from metadata.generated.schema.type.basic import ComponentConfig
+from metadata.ingestion.models.encoders import show_secrets_encoder
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.utils.constants import ES_SOURCE_TO_ES_OBJ_ARGS
+from metadata.utils.logger import set_loggers_level
+
+
+def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):
+    """Task that creates and runs the data insight workflow.
+
+    The workflow_config gets created from the incoming
+    ingestionPipeline.
+
+    This is the callable used to create the PythonOperator
+
+    Args:
+        workflow_config (OpenMetadataWorkflowConfig): _description_
+    """
+    set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
+
+    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
+    workflow = DataInsightWorkflow.create(config)
+
+    workflow.execute()
+    workflow.raise_from_status()
+    workflow.print_status()
+    workflow.stop()


 def build_data_insight_workflow_config(
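With task retries set to 0, the `on_failure_callback` becomes the only reporter for unexpected DAG deaths. Airflow calls the callback with a single `context` argument, so `functools.partial` pre-binds the workflow config. A minimal runnable sketch of that binding, with invented dict payloads standing in for the real config and Airflow context:

```python
from functools import partial


def send_failed_status_callback(workflow_config: dict, context: dict) -> None:
    # In the real callback this builds a Workflow and flips the
    # Ingestion Pipeline state to failed.
    print(f"Marking {workflow_config['pipeline']} as failed, run={context['run_id']}")


# build_dag() pre-binds the config; Airflow later supplies only `context`.
callback = partial(send_failed_status_callback, {"pipeline": "demo_pipeline"})
callback({"run_id": "manual__2022-01-01"})
```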
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py
index 090d70cdef1..ef1d22e3c58 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py
@@ -11,13 +11,14 @@
 """
 Profiler DAG function builder
 """
+import json

 from airflow import DAG
-from openmetadata_managed_apis.workflows.ingestion.common import (
-    build_dag,
-    build_source,
-    profiler_workflow,
-)
+from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source
+
+from metadata.ingestion.models.encoders import show_secrets_encoder
+from metadata.orm_profiler.api.workflow import ProfilerWorkflow
+from metadata.utils.logger import set_loggers_level

 try:
     from airflow.operators.python import PythonOperator
@@ -36,6 +37,27 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 )


+def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
+    """
+    Task that creates and runs the profiler workflow.
+
+    The workflow_config gets cooked from the incoming
+    ingestionPipeline.
+
+    This is the callable used to create the PythonOperator
+    """
+
+    set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
+
+    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
+    workflow = ProfilerWorkflow.create(config)
+
+    workflow.execute()
+    workflow.raise_from_status()
+    workflow.print_status()
+    workflow.stop()
+
+
 def build_profiler_workflow_config(
     ingestion_pipeline: IngestionPipeline,
 ) -> OpenMetadataWorkflowConfig:
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py
index 025858d1bf2..f47d97a208e 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py
@@ -11,13 +11,10 @@
 """
 testSuite DAG function builder
 """
+import json

 from airflow import DAG
-from openmetadata_managed_apis.workflows.ingestion.common import (
-    build_dag,
-    build_source,
-    test_suite_workflow,
-)
+from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source

 from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
     IngestionPipeline,
 )
@@ -29,6 +26,30 @@ from metadata.generated.schema.metadataIngestion.workflow import (
     Sink,
     WorkflowConfig,
 )
+from metadata.ingestion.models.encoders import show_secrets_encoder
+from metadata.test_suite.api.workflow import TestSuiteWorkflow
+from metadata.utils.logger import set_loggers_level
+
+
+def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
+    """
+    Task that creates and runs the test suite workflow.
+
+    The workflow_config gets cooked from the incoming
+    ingestionPipeline.
+
+    This is the callable used to create the PythonOperator
+    """
+
+    set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
+
+    config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
+    workflow = TestSuiteWorkflow.create(config)
+
+    workflow.execute()
+    workflow.raise_from_status()
+    workflow.print_status()
+    workflow.stop()


 def build_test_suite_workflow_config(