# Copyright 2021 Acryl Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os

import pytest
from pydantic import ValidationError

from datahub_actions.pipeline.pipeline import Pipeline, PipelineException
from datahub_actions.pipeline.pipeline_config import FailureMode
from datahub_actions.plugin.transform.filter.filter_transformer import FilterTransformer
from tests.unit.test_helpers import TestAction, TestEventSource, TestTransformer


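# NOTE: TestEventSource, TestTransformer, and TestAction (from tests.unit.test_helpers)
# are in-memory test doubles, wired up via the plugin types referenced in the config
# builders at the bottom of this file. They expose simple counters (e.g. ack_count,
# total_event_count) that the assertions in these tests inspect.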
def test_create():
    # Test successful pipeline creation.
    valid_config = _build_valid_pipeline_config()
    valid_pipeline = Pipeline.create(valid_config)

    # Validate that the pipeline is initialized.
    assert valid_pipeline.name is not None
    assert valid_pipeline.source is not None
    assert isinstance(valid_pipeline.source, TestEventSource)
    assert valid_pipeline.transforms is not None
    assert len(valid_pipeline.transforms) == 2  # Filter + Custom
    assert isinstance(valid_pipeline.transforms[0], FilterTransformer)
    assert isinstance(valid_pipeline.transforms[1], TestTransformer)
    assert valid_pipeline.action is not None
    assert isinstance(valid_pipeline.action, TestAction)
    assert valid_pipeline._shutdown is False
    assert valid_pipeline._stats is not None
    assert valid_pipeline._retry_count == 3
    assert valid_pipeline._failure_mode == FailureMode.CONTINUE
    assert valid_pipeline._failed_events_dir == "/tmp/datahub/test"

    # Test invalid pipeline creation.
    invalid_config = _build_invalid_pipeline_config()
    with pytest.raises(ValidationError, match="name"):
        Pipeline.create(invalid_config)


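# NOTE: judging by the counter assertions below, the test_source helper emits three
# events per run: one MetadataChangeLog event, one EntityChangeEvent, and one event
# that the test action skips.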
def test_run():
    valid_config = _build_basic_pipeline_config()
    valid_pipeline = Pipeline.create(valid_config)

    # Run the pipeline
    valid_pipeline.run()

    # Confirm that the pipeline has run as expected.
    assert valid_pipeline._shutdown is False

    # Verify pipeline statistics
    assert valid_pipeline.stats().failed_event_count == 0
    assert valid_pipeline.stats().failed_ack_count == 0
    assert valid_pipeline.stats().success_count == 3
    assert (
        valid_pipeline.stats()
        .get_transformer_stats(valid_pipeline.transforms[0])
        .exception_count
        == 0
    )
    assert (
        valid_pipeline.stats()
        .get_transformer_stats(valid_pipeline.transforms[0])
        .processed_count
        == 3
    )
    assert (
        valid_pipeline.stats()
        .get_transformer_stats(valid_pipeline.transforms[0])
        .filtered_count
        == 0
    )
    assert valid_pipeline.stats().action_stats.exception_count == 0
    assert valid_pipeline.stats().action_stats.success_count == 3

    # Verify that the test action processed the correct events (via counters)
    assert valid_pipeline.action.total_event_count == 3  # type: ignore
    assert valid_pipeline.action.mcl_count == 1  # type: ignore
    assert valid_pipeline.action.ece_count == 1  # type: ignore
    assert valid_pipeline.action.skipped_count == 1  # type: ignore
    assert (
        valid_pipeline.action.smiley_count == 3  # type: ignore
    )  # Confirms that events were transformed.

    # Verify that the event source received ack calls on all events
    assert valid_pipeline.source.ack_count == 3  # type: ignore


def test_stop():
    # Configure a pipeline with a long-running event source
    stoppable_pipeline_config = _build_stoppable_pipeline_config()
    stoppable_pipeline = Pipeline.create(stoppable_pipeline_config)

    # Start in async mode.
    # TODO: This test should be rewritten to use async correctly, needs type ignore
    stoppable_pipeline.start()  # type: ignore

    # Stop the pipeline.
    stoppable_pipeline.stop()

    # Verify that the pipeline has stopped
    assert stoppable_pipeline._shutdown is True


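# Failure-mode behaviour exercised by the next two tests: in CONTINUE mode a throwing
# transformer or action does not halt the pipeline and the offending events are still
# acked (and routed to the failed-events log, see test_failed_events_file below); in
# THROW mode the pipeline raises a PipelineException once retries are exhausted and
# nothing is acked.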
def test_failed_events_continue_mode():
    # First, test a transformer that throws on invocation.
    throwing_transformer_config = _build_throwing_transformer_pipeline_config(
        failure_mode="CONTINUE"
    )
    throwing_transformer_pipeline = Pipeline.create(throwing_transformer_config)
    throwing_transformer_pipeline.run()
    # Ensure that the messages were acked.
    assert throwing_transformer_pipeline.source.ack_count == 3  # type: ignore

    # Next, test an action that throws on invocation.
    throwing_action_config = _build_throwing_action_pipeline_config(
        failure_mode="CONTINUE"
    )
    throwing_action_pipeline = Pipeline.create(throwing_action_config)
    throwing_action_pipeline.run()
    # Ensure that the messages were acked.
    assert throwing_action_pipeline.source.ack_count == 3  # type: ignore


def test_failed_events_throw_mode():
    # First, test a transformer that throws on invocation.
    throwing_transformer_config = _build_throwing_transformer_pipeline_config(
        failure_mode="THROW"
    )
    throwing_transformer_pipeline = Pipeline.create(throwing_transformer_config)
    with pytest.raises(
        PipelineException, match="Failed to process event after maximum retries"
    ):
        throwing_transformer_pipeline.run()
    # Ensure that no messages were acked.
    assert throwing_transformer_pipeline.source.ack_count == 0  # type: ignore

    # Next, test an action that throws on invocation.
    throwing_action_config = _build_throwing_action_pipeline_config(
        failure_mode="THROW"
    )
    throwing_action_pipeline = Pipeline.create(throwing_action_config)
    with pytest.raises(
        PipelineException, match="Failed to process event after maximum retries"
    ):
        throwing_action_pipeline.run()
    # Ensure that no messages were acked.
    assert throwing_action_pipeline.source.ack_count == 0  # type: ignore


# Test the Dead Letter Queue: failed events are appended to a file on disk.
def test_failed_events_file():
    failed_events_file_path = (
        "/tmp/datahub/test/test_failed_events_file/failed_events.log"
    )
    try:
        os.remove(failed_events_file_path)
    except OSError:
        pass

    throwing_action_config = _build_throwing_action_pipeline_config(
        pipeline_name="test_failed_events_file", failure_mode="CONTINUE"
    )
    throwing_action_pipeline = Pipeline.create(throwing_action_config)
    throwing_action_pipeline.run()

    # Ensure that the failed-events file was written and that each line is valid JSON
    # (a serialized enveloped event).
    with open(failed_events_file_path, "r") as failed_events_file:
        for line in failed_events_file.readlines():
            # Simply verify the event can be loaded.
            assert json.loads(line.strip())
    os.remove(failed_events_file_path)


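# Config builders: plain dicts in the shape expected by Pipeline.create, referencing
# the test plugins defined in tests.unit.test_helpers.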
def _build_valid_pipeline_config() -> dict:
    return {
        "name": "sample-pipeline",
        "source": {"type": "test_source", "config": {}},
        "filter": {"event_type": "MetadataChangeLogEvent_v1"},
        "transform": [{"type": "test_transformer", "config": {"config1": "value1"}}],
        "action": {"type": "test_action", "config": {"config1": "value1"}},
        "options": {
            "retry_count": 3,
            "failure_mode": "CONTINUE",
            "failed_events_dir": "/tmp/datahub/test",
        },
    }


def _build_basic_pipeline_config() -> dict:
    return {
        "name": "sample-pipeline",
        "source": {"type": "test_source", "config": {}},
        "transform": [{"type": "test_transformer", "config": {"config1": "value1"}}],
        "action": {"type": "test_action", "config": {"config1": "value1"}},
        "options": {
            "retry_count": 3,
            "failure_mode": "CONTINUE",
            "failed_events_dir": "/tmp/datahub/test",
        },
    }


def _build_stoppable_pipeline_config() -> dict:
    return {
        "name": "stoppable-pipeline",
        "source": {"type": "stoppable_event_source", "config": {}},
        "transform": [{"type": "test_transformer", "config": {"config1": "value1"}}],
        "action": {"type": "test_action", "config": {"config1": "value1"}},
        "options": {
            "retry_count": 3,
            "failure_mode": "CONTINUE",
            "failed_events_dir": "/tmp/datahub/test",
        },
    }


def _build_throwing_transformer_pipeline_config(failure_mode: str = "CONTINUE") -> dict:
    return {
        "name": "sample-pipeline",
        "source": {"type": "test_source", "config": {}},
        "transform": [
            {"type": "throwing_test_transformer", "config": {"config1": "value1"}}
        ],
        "action": {"type": "test_action", "config": {"config1": "value1"}},
        "options": {
            "retry_count": 3,
            "failure_mode": failure_mode,
            "failed_events_dir": "/tmp/datahub/test",
        },
    }


def _build_throwing_action_pipeline_config(
    pipeline_name: str = "throwing-action-pipeline", failure_mode: str = "CONTINUE"
) -> dict:
    return {
        "name": pipeline_name,
        "source": {"type": "test_source", "config": {}},
        "transform": [{"type": "test_transformer", "config": {"config1": "value1"}}],
        "action": {"type": "throwing_test_action", "config": {"config1": "value1"}},
        "options": {
            "retry_count": 3,
            "failure_mode": failure_mode,
            "failed_events_dir": "/tmp/datahub/test",
        },
    }


def _build_invalid_pipeline_config() -> dict:
    # No name field
    return {
        "source": {"type": "test_source", "config": {"to_upper": False}},
        "action": {"type": "test_action", "config": {"config1": "value1"}},
    }