Fix usage source type (#4471)

This commit is contained in:
Pere Miquel Brull 2022-04-25 11:55:12 +02:00 committed by GitHub
parent bd150d8124
commit 87a85b085e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 4 deletions

View File

@ -46,10 +46,13 @@ def build_usage_workflow_config(
Given an airflow_pipeline, prepare the workflow config JSON Given an airflow_pipeline, prepare the workflow config JSON
""" """
usage_source = ingestion_pipeline.source
usage_source.type = f"{usage_source.type}-usage"
with tempfile.NamedTemporaryFile() as tmp_file: with tempfile.NamedTemporaryFile() as tmp_file:
workflow_config = OpenMetadataWorkflowConfig( workflow_config = OpenMetadataWorkflowConfig(
source=ingestion_pipeline.source, source=usage_source,
processor=Processor(type="query-parser", config={"filter": ""}), processor=Processor(type="query-parser", config={"filter": ""}),
stage=Stage(type="table-usage", config={"filename": tmp_file.name}), stage=Stage(type="table-usage", config={"filename": tmp_file.name}),
bulkSink=BulkSink( bulkSink=BulkSink(

View File

@ -66,7 +66,24 @@ class OMetaServiceTest(TestCase):
"sourceConfig": {"config": {"enableDataProfiler": False}}, "sourceConfig": {"config": {"enableDataProfiler": False}},
} }
# TODO update to "snowflake-usage" after https://github.com/open-metadata/OpenMetadata/issues/4469
usage_data = {
"type": "snowflake",
"serviceName": "local_snowflake",
"serviceConnection": {
"config": {
"type": "Snowflake",
"username": "openmetadata_user",
"password": "random",
"warehouse": "warehouse",
"account": "account",
}
},
"sourceConfig": {"config": {"queryLogDuration": 10}},
}
workflow_source = WorkflowSource(**data) workflow_source = WorkflowSource(**data)
usage_workflow_source = WorkflowSource(**usage_data)
@classmethod @classmethod
def setUpClass(cls) -> None: def setUpClass(cls) -> None:
@ -127,10 +144,10 @@ class OMetaServiceTest(TestCase):
ingestion_pipeline = IngestionPipeline( ingestion_pipeline = IngestionPipeline(
id=uuid.uuid4(), id=uuid.uuid4(),
name="test_ingestion_workflow", name="test_usage_workflow",
pipelineType=PipelineType.usage, pipelineType=PipelineType.usage,
fullyQualifiedName="local_mysql.test_usage_workflow", fullyQualifiedName="local_snowflake.test_usage_workflow",
source=self.workflow_source, source=self.usage_workflow_source,
openMetadataServerConnection=self.server_config, openMetadataServerConnection=self.server_config,
airflowConfig=AirflowConfig( airflowConfig=AirflowConfig(
startDate="2022-04-10", startDate="2022-04-10",
@ -142,6 +159,8 @@ class OMetaServiceTest(TestCase):
) )
workflow_config = build_usage_workflow_config(ingestion_pipeline) workflow_config = build_usage_workflow_config(ingestion_pipeline)
self.assertIn("usage", workflow_config.source.type)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
Workflow.create(config) Workflow.create(config)