| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | #  Copyright 2021 Collate | 
					
						
							|  |  |  | #  Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							|  |  |  | #  http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | Metadata DAG common functions | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | import json | 
					
						
							| 
									
										
										
										
											2022-04-20 09:14:27 +02:00
										 |  |  | from datetime import datetime, timedelta | 
					
						
							| 
									
										
										
										
											2022-11-11 09:59:15 +01:00
										 |  |  | from typing import Callable, Optional | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  | import airflow | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | from airflow import DAG | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  | from openmetadata_managed_apis.api.utils import clean_dag_id | 
					
						
							| 
									
										
										
										
											2022-11-09 13:00:22 +05:30
										 |  |  | from pydantic import ValidationError | 
					
						
							|  |  |  | from requests.utils import quote | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  | from metadata.data_insight.api.workflow import DataInsightWorkflow | 
					
						
							| 
									
										
										
										
											2022-06-02 20:40:53 +02:00
										 |  |  | from metadata.generated.schema.entity.services.dashboardService import DashboardService | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.databaseService import DatabaseService | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.messagingService import MessagingService | 
					
						
							| 
									
										
										
										
											2022-11-16 11:11:54 +05:30
										 |  |  | from metadata.generated.schema.entity.services.metadataService import MetadataService | 
					
						
							| 
									
										
										
										
											2022-06-28 14:58:38 +02:00
										 |  |  | from metadata.generated.schema.entity.services.mlmodelService import MlModelService | 
					
						
							| 
									
										
										
										
											2022-06-02 20:40:53 +02:00
										 |  |  | from metadata.generated.schema.entity.services.pipelineService import PipelineService | 
					
						
							| 
									
										
										
										
											2022-09-30 17:57:03 +02:00
										 |  |  | from metadata.generated.schema.tests.testSuite import TestSuite | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | from metadata.generated.schema.type import basic | 
					
						
							| 
									
										
										
										
											2022-04-20 11:59:56 +02:00
										 |  |  | from metadata.ingestion.models.encoders import show_secrets_encoder | 
					
						
							| 
									
										
										
										
											2022-06-02 20:40:53 +02:00
										 |  |  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | from metadata.orm_profiler.api.workflow import ProfilerWorkflow | 
					
						
							| 
									
										
										
										
											2022-08-25 10:01:28 +02:00
										 |  |  | from metadata.test_suite.api.workflow import TestSuiteWorkflow | 
					
						
							| 
									
										
										
										
											2022-04-29 06:54:30 +02:00
										 |  |  | from metadata.utils.logger import set_loggers_level | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | try: | 
					
						
							|  |  |  |     from airflow.operators.python import PythonOperator | 
					
						
							|  |  |  | except ModuleNotFoundError: | 
					
						
							|  |  |  |     from airflow.operators.python_operator import PythonOperator | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-09 13:00:22 +05:30
										 |  |  | from openmetadata_managed_apis.utils.logger import workflow_logger | 
					
						
							|  |  |  | from openmetadata_managed_apis.utils.parser import ( | 
					
						
							|  |  |  |     parse_service_connection, | 
					
						
							|  |  |  |     parse_validation_err, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  | from openmetadata_managed_apis.workflows.ingestion.credentials_builder import ( | 
					
						
							| 
									
										
										
										
											2022-07-19 14:51:44 +02:00
										 |  |  |     build_secrets_manager_credentials, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( | 
					
						
							|  |  |  |     IngestionPipeline, | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |     PipelineState, | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.metadataIngestion.workflow import ( | 
					
						
							| 
									
										
										
										
											2022-04-29 09:40:05 +02:00
										 |  |  |     LogLevels, | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  |     OpenMetadataWorkflowConfig, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-06-02 20:40:53 +02:00
										 |  |  | from metadata.generated.schema.metadataIngestion.workflow import ( | 
					
						
							|  |  |  |     Source as WorkflowSource, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig | 
					
						
							| 
									
										
										
										
											2022-11-09 13:00:22 +05:30
										 |  |  | from metadata.ingestion.api.parser import ( | 
					
						
							|  |  |  |     InvalidWorkflowException, | 
					
						
							|  |  |  |     ParsingConfigurationError, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | from metadata.ingestion.api.workflow import Workflow | 
					
						
							| 
									
										
										
										
											2022-11-09 13:00:22 +05:30
										 |  |  | from metadata.ingestion.ometa.utils import model_str | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | logger = workflow_logger() | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-08-26 07:29:38 +02:00
										 |  |  | class InvalidServiceException(Exception): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Exception to be thrown when couldn't fetch the service from server | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class ClientInitializationError(Exception): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Exception to be thrown when couldn't initialize the Openmetadata Client | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-06-02 20:40:53 +02:00
										 |  |  | def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Use the service EntityReference to build the Source. | 
					
						
							|  |  |  |     Building the source dynamically helps us to not store any | 
					
						
							|  |  |  |     sensitive info. | 
					
						
							|  |  |  |     :param ingestion_pipeline: With the service ref | 
					
						
							|  |  |  |     :return: WorkflowSource | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2022-07-19 14:51:44 +02:00
										 |  |  |     secrets_manager = ( | 
					
						
							|  |  |  |         ingestion_pipeline.openMetadataServerConnection.secretsManagerProvider | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     ingestion_pipeline.openMetadataServerConnection.secretsManagerCredentials = ( | 
					
						
							|  |  |  |         build_secrets_manager_credentials(secrets_manager) | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2022-06-02 20:40:53 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-08-26 07:29:38 +02:00
										 |  |  |     try: | 
					
						
							|  |  |  |         metadata = OpenMetadata(config=ingestion_pipeline.openMetadataServerConnection) | 
					
						
							|  |  |  |     except Exception as exc: | 
					
						
							|  |  |  |         raise ClientInitializationError(f"Failed to initialize the client: {exc}") | 
					
						
							| 
									
										
										
										
											2022-06-02 20:40:53 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     service_type = ingestion_pipeline.service.type | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-08-29 17:26:59 +05:30
										 |  |  |     if service_type == "testSuite": | 
					
						
							| 
									
										
										
										
											2022-09-30 17:57:03 +02:00
										 |  |  |         service = metadata.get_by_name( | 
					
						
							|  |  |  |             entity=TestSuite, fqn=ingestion_pipeline.service.name | 
					
						
							|  |  |  |         )  # check we are able to access OM server | 
					
						
							|  |  |  |         if not service: | 
					
						
							|  |  |  |             raise InvalidServiceException( | 
					
						
							|  |  |  |                 f"Could not get service from type {service_type}" | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2022-08-25 10:01:28 +02:00
										 |  |  |         return WorkflowSource( | 
					
						
							|  |  |  |             type=service_type, | 
					
						
							|  |  |  |             serviceName=ingestion_pipeline.service.name, | 
					
						
							|  |  |  |             sourceConfig=ingestion_pipeline.sourceConfig, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-09 13:00:22 +05:30
										 |  |  |     entity_class = None | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         if service_type == "databaseService": | 
					
						
							|  |  |  |             entity_class = DatabaseService | 
					
						
							|  |  |  |             service: DatabaseService = metadata.get_by_name( | 
					
						
							|  |  |  |                 entity=entity_class, fqn=ingestion_pipeline.service.name | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         elif service_type == "pipelineService": | 
					
						
							|  |  |  |             entity_class = PipelineService | 
					
						
							|  |  |  |             service: PipelineService = metadata.get_by_name( | 
					
						
							|  |  |  |                 entity=entity_class, fqn=ingestion_pipeline.service.name | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         elif service_type == "dashboardService": | 
					
						
							|  |  |  |             entity_class = DashboardService | 
					
						
							|  |  |  |             service: DashboardService = metadata.get_by_name( | 
					
						
							|  |  |  |                 entity=entity_class, fqn=ingestion_pipeline.service.name | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         elif service_type == "messagingService": | 
					
						
							|  |  |  |             entity_class = MessagingService | 
					
						
							|  |  |  |             service: MessagingService = metadata.get_by_name( | 
					
						
							|  |  |  |                 entity=entity_class, fqn=ingestion_pipeline.service.name | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         elif service_type == "mlmodelService": | 
					
						
							|  |  |  |             entity_class = MlModelService | 
					
						
							|  |  |  |             service: MlModelService = metadata.get_by_name( | 
					
						
							|  |  |  |                 entity=entity_class, fqn=ingestion_pipeline.service.name | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2022-11-16 11:11:54 +05:30
										 |  |  |         elif service_type == "metadataService": | 
					
						
							|  |  |  |             entity_class = MetadataService | 
					
						
							|  |  |  |             service: MetadataService = metadata.get_by_name( | 
					
						
							|  |  |  |                 entity=entity_class, fqn=ingestion_pipeline.service.name | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2022-11-09 13:00:22 +05:30
										 |  |  |         else: | 
					
						
							|  |  |  |             raise InvalidServiceException(f"Invalid Service Type: {service_type}") | 
					
						
							|  |  |  |     except ValidationError as original_error: | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             resp = metadata.client.get( | 
					
						
							|  |  |  |                 f"{metadata.get_suffix(entity_class)}/name/{quote(model_str(ingestion_pipeline.service.name), safe='')}" | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |             parse_service_connection(resp) | 
					
						
							|  |  |  |         except (ValidationError, InvalidWorkflowException) as scoped_error: | 
					
						
							|  |  |  |             if isinstance(scoped_error, ValidationError): | 
					
						
							|  |  |  |                 # Let's catch validations of internal Workflow models, not the Workflow itself | 
					
						
							|  |  |  |                 object_error = ( | 
					
						
							|  |  |  |                     scoped_error.model.__name__ | 
					
						
							|  |  |  |                     if scoped_error.model is not None | 
					
						
							|  |  |  |                     else "workflow" | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |                 raise ParsingConfigurationError( | 
					
						
							|  |  |  |                     f"We encountered an error parsing the configuration of your {object_error}.\n" | 
					
						
							|  |  |  |                     f"{parse_validation_err(scoped_error)}" | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |             raise scoped_error | 
					
						
							|  |  |  |         raise ParsingConfigurationError( | 
					
						
							|  |  |  |             f"We encountered an error parsing the configuration of your workflow.\n" | 
					
						
							|  |  |  |             f"{parse_validation_err(original_error)}" | 
					
						
							| 
									
										
										
										
											2022-06-28 14:58:38 +02:00
										 |  |  |         ) | 
					
						
							| 
									
										
										
										
											2022-06-02 20:40:53 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     if not service: | 
					
						
							| 
									
										
										
										
											2022-08-26 07:29:38 +02:00
										 |  |  |         raise InvalidServiceException(f"Could not get service from type {service_type}") | 
					
						
							| 
									
										
										
										
											2022-06-02 20:40:53 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     return WorkflowSource( | 
					
						
							|  |  |  |         type=service.serviceType.value.lower(), | 
					
						
							|  |  |  |         serviceName=service.name.__root__, | 
					
						
							|  |  |  |         serviceConnection=service.connection, | 
					
						
							|  |  |  |         sourceConfig=ingestion_pipeline.sourceConfig, | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Task that creates and runs the ingestion workflow. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     The workflow_config gets cooked form the incoming | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |     ingestionPipeline. | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     This is the callable used to create the PythonOperator | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2022-04-29 06:54:30 +02:00
										 |  |  |     set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) | 
					
						
							| 
									
										
										
										
											2022-04-20 11:59:56 +02:00
										 |  |  |     config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  |     workflow = Workflow.create(config) | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |     try: | 
					
						
							|  |  |  |         workflow.execute() | 
					
						
							|  |  |  |         workflow.raise_from_status() | 
					
						
							|  |  |  |         workflow.print_status() | 
					
						
							|  |  |  |         workflow.stop() | 
					
						
							|  |  |  |     except Exception as err: | 
					
						
							|  |  |  |         workflow.set_ingestion_pipeline_status(PipelineState.failed) | 
					
						
							|  |  |  |         raise err | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Task that creates and runs the profiler workflow. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     The workflow_config gets cooked form the incoming | 
					
						
							|  |  |  |     ingestionPipeline. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This is the callable used to create the PythonOperator | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2022-04-28 12:20:03 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-29 06:54:30 +02:00
										 |  |  |     set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) | 
					
						
							| 
									
										
										
										
											2022-04-28 12:20:03 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |     config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) | 
					
						
							|  |  |  |     workflow = ProfilerWorkflow.create(config) | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |     try: | 
					
						
							|  |  |  |         workflow.execute() | 
					
						
							|  |  |  |         workflow.raise_from_status() | 
					
						
							|  |  |  |         workflow.print_status() | 
					
						
							|  |  |  |         workflow.stop() | 
					
						
							|  |  |  |     except Exception as err: | 
					
						
							|  |  |  |         workflow.set_ingestion_pipeline_status(PipelineState.failed) | 
					
						
							|  |  |  |         raise err | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-08-25 10:01:28 +02:00
										 |  |  | def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Task that creates and runs the test suite workflow. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     The workflow_config gets cooked form the incoming | 
					
						
							|  |  |  |     ingestionPipeline. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This is the callable used to create the PythonOperator | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) | 
					
						
							|  |  |  |     workflow = TestSuiteWorkflow.create(config) | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         workflow.execute() | 
					
						
							|  |  |  |         workflow.raise_from_status() | 
					
						
							|  |  |  |         workflow.print_status() | 
					
						
							|  |  |  |         workflow.stop() | 
					
						
							|  |  |  |     except Exception as err: | 
					
						
							|  |  |  |         workflow.set_ingestion_pipeline_status(PipelineState.failed) | 
					
						
							|  |  |  |         raise err | 
					
						
							| 
									
										
										
										
											2022-08-25 10:01:28 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  | def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig): | 
					
						
							|  |  |  |     """Task that creates and runs the data insight workflow.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     The workflow_config gets created form the incoming | 
					
						
							|  |  |  |     ingestionPipeline. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This is the callable used to create the PythonOperator | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Args: | 
					
						
							|  |  |  |         workflow_config (OpenMetadataWorkflowConfig): _description_ | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) | 
					
						
							|  |  |  |     workflow = DataInsightWorkflow.create(config) | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |     try: | 
					
						
							|  |  |  |         workflow.execute() | 
					
						
							|  |  |  |         workflow.raise_from_status() | 
					
						
							|  |  |  |         workflow.print_status() | 
					
						
							|  |  |  |         workflow.stop() | 
					
						
							|  |  |  |     except Exception as err: | 
					
						
							|  |  |  |         workflow.set_ingestion_pipeline_status(PipelineState.failed) | 
					
						
							|  |  |  |         raise err | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-20 09:14:27 +02:00
										 |  |  | def date_to_datetime( | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |     date: Optional[basic.DateTime], date_format: str = "%Y-%m-%dT%H:%M:%S%z" | 
					
						
							| 
									
										
										
										
											2022-04-20 09:14:27 +02:00
										 |  |  | ) -> Optional[datetime]: | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |     Format a basic.DateTime to datetime. ISO 8601 format by default. | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2022-04-20 09:14:27 +02:00
										 |  |  |     if date is None: | 
					
						
							|  |  |  |         return | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-20 09:14:27 +02:00
										 |  |  |     return datetime.strptime(str(date.__root__), date_format) | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-28 12:20:03 +02:00
										 |  |  | def build_workflow_config_property( | 
					
						
							|  |  |  |     ingestion_pipeline: IngestionPipeline, | 
					
						
							|  |  |  | ) -> WorkflowConfig: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Prepare the workflow config with logLevels and openMetadataServerConfig | 
					
						
							|  |  |  |     :param ingestion_pipeline: Received payload from REST | 
					
						
							|  |  |  |     :return: WorkflowConfig | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     return WorkflowConfig( | 
					
						
							| 
									
										
										
										
											2022-04-29 09:40:05 +02:00
										 |  |  |         loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO, | 
					
						
							| 
									
										
										
										
											2022-04-28 12:20:03 +02:00
										 |  |  |         openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection, | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-20 09:14:27 +02:00
										 |  |  | def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Prepare kwargs to send to DAG | 
					
						
							|  |  |  |     :param ingestion_pipeline: pipeline configs | 
					
						
							|  |  |  |     :return: dict to use as kwargs | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     return { | 
					
						
							| 
									
										
										
										
											2022-07-13 14:43:35 +02:00
										 |  |  |         "dag_id": clean_dag_id(ingestion_pipeline.name.__root__), | 
					
						
							| 
									
										
										
										
											2022-04-20 09:14:27 +02:00
										 |  |  |         "description": ingestion_pipeline.description, | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |         "start_date": ingestion_pipeline.airflowConfig.startDate.__root__ | 
					
						
							|  |  |  |         if ingestion_pipeline.airflowConfig.startDate | 
					
						
							|  |  |  |         else airflow.utils.dates.days_ago(1), | 
					
						
							|  |  |  |         "end_date": ingestion_pipeline.airflowConfig.endDate.__root__ | 
					
						
							|  |  |  |         if ingestion_pipeline.airflowConfig.endDate | 
					
						
							|  |  |  |         else None, | 
					
						
							| 
									
										
										
										
											2022-04-20 09:14:27 +02:00
										 |  |  |         "concurrency": ingestion_pipeline.airflowConfig.concurrency, | 
					
						
							|  |  |  |         "max_active_runs": ingestion_pipeline.airflowConfig.maxActiveRuns, | 
					
						
							|  |  |  |         "default_view": ingestion_pipeline.airflowConfig.workflowDefaultView, | 
					
						
							|  |  |  |         "orientation": ingestion_pipeline.airflowConfig.workflowDefaultViewOrientation, | 
					
						
							|  |  |  |         "dagrun_timeout": timedelta(ingestion_pipeline.airflowConfig.workflowTimeout) | 
					
						
							|  |  |  |         if ingestion_pipeline.airflowConfig.workflowTimeout | 
					
						
							|  |  |  |         else None, | 
					
						
							|  |  |  |         "is_paused_upon_creation": ingestion_pipeline.airflowConfig.pausePipeline | 
					
						
							|  |  |  |         or False, | 
					
						
							|  |  |  |         "catchup": ingestion_pipeline.airflowConfig.pipelineCatchup or False, | 
					
						
							|  |  |  |         "schedule_interval": ingestion_pipeline.airflowConfig.scheduleInterval, | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | def build_dag( | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  |     task_name: str, | 
					
						
							|  |  |  |     ingestion_pipeline: IngestionPipeline, | 
					
						
							| 
									
										
										
										
											2022-04-20 11:59:56 +02:00
										 |  |  |     workflow_config: OpenMetadataWorkflowConfig, | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |     workflow_fn: Callable, | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | ) -> DAG: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Build a simple metadata workflow DAG | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-20 09:14:27 +02:00
										 |  |  |     with DAG(**build_dag_configs(ingestion_pipeline)) as dag: | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |         PythonOperator( | 
					
						
							|  |  |  |             task_id=task_name, | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             python_callable=workflow_fn, | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  |             op_kwargs={"workflow_config": workflow_config}, | 
					
						
							| 
									
										
										
										
											2022-04-20 09:14:27 +02:00
										 |  |  |             retries=ingestion_pipeline.airflowConfig.retries, | 
					
						
							|  |  |  |             retry_delay=ingestion_pipeline.airflowConfig.retryDelay, | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return dag |