| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  | #  Copyright 2021 Collate | 
					
						
							|  |  |  | #  Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							|  |  |  | #  http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | """
 | 
					
						
							| 
									
										
										
										
											2022-11-14 18:59:56 +01:00
										 |  |  | Data Insights DAG function builder | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  | """
 | 
					
						
							| 
									
										
										
										
											2022-11-30 08:30:45 +01:00
										 |  |  | import json | 
					
						
							| 
									
										
										
										
											2022-11-15 05:44:25 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  | from airflow import DAG | 
					
						
							| 
									
										
										
										
											2023-01-23 16:28:17 +01:00
										 |  |  | from openmetadata_managed_apis.utils.logger import set_operator_logger | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  | from openmetadata_managed_apis.workflows.ingestion.common import ( | 
					
						
							| 
									
										
										
										
											2022-11-15 05:44:25 +01:00
										 |  |  |     ClientInitializationError, | 
					
						
							| 
									
										
										
										
											2023-06-27 12:32:08 +02:00
										 |  |  |     GetServiceException, | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  |     build_dag, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2023-03-20 13:00:51 +01:00
										 |  |  | from openmetadata_managed_apis.workflows.ingestion.elasticsearch_sink import ( | 
					
						
							|  |  |  |     build_elasticsearch_sink, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( | 
					
						
							|  |  |  |     IngestionPipeline, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-11-15 05:44:25 +01:00
										 |  |  | from metadata.generated.schema.entity.services.metadataService import MetadataService | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  | from metadata.generated.schema.metadataIngestion.workflow import ( | 
					
						
							|  |  |  |     LogLevels, | 
					
						
							|  |  |  |     OpenMetadataWorkflowConfig, | 
					
						
							|  |  |  |     Processor, | 
					
						
							| 
									
										
										
										
											2022-11-15 05:44:25 +01:00
										 |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.metadataIngestion.workflow import ( | 
					
						
							|  |  |  |     Source as WorkflowSource, | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  | ) | 
					
						
							| 
									
										
										
										
											2023-03-21 14:12:20 +01:00
										 |  |  | from metadata.generated.schema.metadataIngestion.workflow import ( | 
					
						
							|  |  |  |     SourceConfig, | 
					
						
							|  |  |  |     WorkflowConfig, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-11-30 08:30:45 +01:00
										 |  |  | from metadata.ingestion.models.encoders import show_secrets_encoder | 
					
						
							| 
									
										
										
										
											2022-11-15 05:44:25 +01:00
										 |  |  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | 
					
						
							| 
									
										
										
										
											2023-10-02 12:05:30 +02:00
										 |  |  | from metadata.workflow.data_insight import DataInsightWorkflow | 
					
						
							| 
									
										
										
										
											2023-10-09 18:31:58 +05:30
										 |  |  | from metadata.workflow.workflow_output_handler import print_status | 
					
						
							| 
									
										
										
										
											2022-11-30 08:30:45 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig): | 
					
						
							|  |  |  |     """Task that creates and runs the data insight workflow.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     The workflow_config gets created form the incoming | 
					
						
							|  |  |  |     ingestionPipeline. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This is the callable used to create the PythonOperator | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Args: | 
					
						
							|  |  |  |         workflow_config (OpenMetadataWorkflowConfig): _description_ | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2023-01-23 16:28:17 +01:00
										 |  |  |     set_operator_logger(workflow_config) | 
					
						
							| 
									
										
										
										
											2022-11-30 08:30:45 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) | 
					
						
							|  |  |  |     workflow = DataInsightWorkflow.create(config) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     workflow.execute() | 
					
						
							|  |  |  |     workflow.raise_from_status() | 
					
						
							| 
									
										
										
										
											2023-10-09 18:31:58 +05:30
										 |  |  |     print_status(workflow) | 
					
						
							| 
									
										
										
										
											2022-11-30 08:30:45 +01:00
										 |  |  |     workflow.stop() | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def build_data_insight_workflow_config( | 
					
						
							|  |  |  |     ingestion_pipeline: IngestionPipeline, | 
					
						
							|  |  |  | ) -> OpenMetadataWorkflowConfig: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Given an airflow_pipeline, prepare the workflow config JSON | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-15 05:44:25 +01:00
										 |  |  |     try: | 
					
						
							|  |  |  |         metadata = OpenMetadata(config=ingestion_pipeline.openMetadataServerConnection) | 
					
						
							|  |  |  |     except Exception as exc: | 
					
						
							|  |  |  |         raise ClientInitializationError(f"Failed to initialize the client: {exc}") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     openmetadata_service = metadata.get_by_name( | 
					
						
							|  |  |  |         entity=MetadataService, fqn=ingestion_pipeline.service.fullyQualifiedName | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if not openmetadata_service: | 
					
						
							| 
									
										
										
										
											2023-06-27 12:32:08 +02:00
										 |  |  |         raise GetServiceException(service_type="metadata", service_name="OpenMetadata") | 
					
						
							| 
									
										
										
										
											2022-11-15 05:44:25 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-03-20 13:00:51 +01:00
										 |  |  |     sink = build_elasticsearch_sink( | 
					
						
							|  |  |  |         openmetadata_service.connection.config, ingestion_pipeline | 
					
						
							| 
									
										
										
										
											2022-11-15 05:44:25 +01:00
										 |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  |     workflow_config = OpenMetadataWorkflowConfig( | 
					
						
							| 
									
										
										
										
											2022-11-15 05:44:25 +01:00
										 |  |  |         source=WorkflowSource( | 
					
						
							|  |  |  |             type="dataInsight", | 
					
						
							|  |  |  |             serviceName=ingestion_pipeline.service.name, | 
					
						
							| 
									
										
										
										
											2023-03-21 14:12:20 +01:00
										 |  |  |             sourceConfig=SourceConfig(),  # Source Config not needed here. Configs are passed to ES Sink. | 
					
						
							| 
									
										
										
										
											2022-11-15 05:44:25 +01:00
										 |  |  |         ), | 
					
						
							|  |  |  |         sink=sink, | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  |         processor=Processor( | 
					
						
							|  |  |  |             type="data-insight-processor", | 
					
						
							|  |  |  |             config={}, | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |         workflowConfig=WorkflowConfig( | 
					
						
							|  |  |  |             loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO, | 
					
						
							|  |  |  |             openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection, | 
					
						
							|  |  |  |         ), | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |         ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__, | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return workflow_config | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def build_data_insight_dag(ingestion_pipeline: IngestionPipeline) -> DAG: | 
					
						
							| 
									
										
										
										
											2022-11-14 18:59:56 +01:00
										 |  |  |     """Build a simple Data Insight DAG""" | 
					
						
							| 
									
										
										
										
											2022-10-26 11:18:08 +02:00
										 |  |  |     workflow_config = build_data_insight_workflow_config(ingestion_pipeline) | 
					
						
							|  |  |  |     dag = build_dag( | 
					
						
							|  |  |  |         task_name="data_insight_task", | 
					
						
							|  |  |  |         ingestion_pipeline=ingestion_pipeline, | 
					
						
							|  |  |  |         workflow_config=workflow_config, | 
					
						
							|  |  |  |         workflow_fn=data_insight_workflow, | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return dag |