From 87a85b085ed4481cdeee4cae5d2d4c92b4025478 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Mon, 25 Apr 2022 11:55:12 +0200 Subject: [PATCH] Fix usage source type (#4471) --- .../openmetadata/workflows/ingestion/usage.py | 5 +++- .../test_workflow_creation.py | 25 ++++++++++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py index 76883ac550e..09830e41735 100644 --- a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py +++ b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py @@ -46,10 +46,13 @@ def build_usage_workflow_config( Given an airflow_pipeline, prepare the workflow config JSON """ + usage_source = ingestion_pipeline.source + usage_source.type = f"{usage_source.type}-usage" + with tempfile.NamedTemporaryFile() as tmp_file: workflow_config = OpenMetadataWorkflowConfig( - source=ingestion_pipeline.source, + source=usage_source, processor=Processor(type="query-parser", config={"filter": ""}), stage=Stage(type="table-usage", config={"filename": tmp_file.name}), bulkSink=BulkSink( diff --git a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py index e0743b08694..c45c8c374d7 100644 --- a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py +++ b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py @@ -66,7 +66,24 @@ class OMetaServiceTest(TestCase): "sourceConfig": {"config": {"enableDataProfiler": False}}, } + # TODO update to "snowflake-usage" after https://github.com/open-metadata/OpenMetadata/issues/4469 + usage_data = { + "type": "snowflake", + "serviceName": "local_snowflake", + "serviceConnection": { + "config": { + "type": "Snowflake", + "username": "openmetadata_user", + "password": "random", + "warehouse": "warehouse", + "account": "account", + } + }, + "sourceConfig": {"config": {"queryLogDuration": 10}}, + } + workflow_source = WorkflowSource(**data) + usage_workflow_source = WorkflowSource(**usage_data) @classmethod def setUpClass(cls) -> None: @@ -127,10 +144,10 @@ class OMetaServiceTest(TestCase): ingestion_pipeline = IngestionPipeline( id=uuid.uuid4(), - name="test_ingestion_workflow", + name="test_usage_workflow", pipelineType=PipelineType.usage, - fullyQualifiedName="local_mysql.test_usage_workflow", - source=self.workflow_source, + fullyQualifiedName="local_snowflake.test_usage_workflow", + source=self.usage_workflow_source, openMetadataServerConnection=self.server_config, airflowConfig=AirflowConfig( startDate="2022-04-10", @@ -142,6 +159,8 @@ class OMetaServiceTest(TestCase): ) workflow_config = build_usage_workflow_config(ingestion_pipeline) + self.assertIn("usage", workflow_config.source.type) + config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) Workflow.create(config)