2022-06-13 12:17:54 +02:00
|
|
|
# Copyright 2021 Collate
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
# You may obtain a copy of the License at
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
|
|
|
|
"""
|
|
|
|
Check that we are properly running nodes and stages
|
|
|
|
"""
|
|
|
|
from unittest import TestCase
|
|
|
|
|
2023-08-30 15:49:42 +02:00
|
|
|
from metadata.ingestion.api.models import Either
|
2022-06-13 12:17:54 +02:00
|
|
|
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
|
|
|
|
from metadata.ingestion.models.topology import (
|
|
|
|
NodeStage,
|
|
|
|
ServiceTopology,
|
|
|
|
TopologyNode,
|
|
|
|
create_source_context,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
class MockTopology(ServiceTopology):
|
|
|
|
root = TopologyNode(
|
|
|
|
producer="get_numbers",
|
|
|
|
stages=[
|
|
|
|
NodeStage(
|
|
|
|
type_=int,
|
|
|
|
processor="yield_numbers",
|
|
|
|
)
|
|
|
|
],
|
|
|
|
children=["strings"],
|
|
|
|
)
|
|
|
|
strings = TopologyNode(
|
|
|
|
producer="get_strings",
|
|
|
|
stages=[
|
|
|
|
NodeStage(
|
|
|
|
type_=str,
|
|
|
|
processor="yield_strings",
|
|
|
|
consumer=["numbers"],
|
|
|
|
)
|
|
|
|
],
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
class MockSource(TopologyRunnerMixin):
|
|
|
|
topology = MockTopology()
|
|
|
|
context = create_source_context(topology)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def get_numbers():
|
|
|
|
yield 1
|
|
|
|
yield 2
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def get_strings():
|
|
|
|
yield "abc"
|
|
|
|
yield "def"
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def yield_numbers(number: int):
|
2023-08-30 15:49:42 +02:00
|
|
|
yield Either(right=number + 1)
|
2022-06-13 12:17:54 +02:00
|
|
|
|
2023-09-15 09:44:42 +02:00
|
|
|
@staticmethod
|
|
|
|
def yield_strings(my_str: str):
|
|
|
|
yield Either(right=my_str)
|
2022-06-13 12:17:54 +02:00
|
|
|
|
|
|
|
|
|
|
|
class TopologyRunnerTest(TestCase):
|
2023-08-30 15:49:42 +02:00
|
|
|
"""Validate filter patterns"""
|
2022-06-13 12:17:54 +02:00
|
|
|
|
2023-09-15 09:44:42 +02:00
|
|
|
@staticmethod
|
|
|
|
def test_node_and_stage():
|
2022-06-13 12:17:54 +02:00
|
|
|
source = MockSource()
|
2023-08-30 15:49:42 +02:00
|
|
|
processed = list(source._iter())
|
|
|
|
assert [either.right for either in processed] == [
|
|
|
|
2,
|
2023-09-15 09:44:42 +02:00
|
|
|
"abc",
|
|
|
|
"def",
|
2023-08-30 15:49:42 +02:00
|
|
|
3,
|
2023-09-15 09:44:42 +02:00
|
|
|
"abc",
|
|
|
|
"def",
|
2023-08-30 15:49:42 +02:00
|
|
|
]
|