from typing import Iterable, List, cast
from unittest.mock import patch

import pytest
from freezegun import freeze_time

from datahub.configuration.common import DynamicTypedConfig
from datahub.ingestion.api.committable import CommitPolicy, Committable
from datahub.ingestion.api.common import RecordEnvelope, WorkUnit
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.transform import Transformer
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.run.pipeline import Pipeline, PipelineContext
from datahub.metadata.com.linkedin.pegasus2avro.mxe import SystemMetadata
from datahub.metadata.schema_classes import (
    DatasetPropertiesClass,
    DatasetSnapshotClass,
    MetadataChangeEventClass,
    StatusClass,
)
from tests.test_helpers.sink_helpers import RecordingSinkReport

FROZEN_TIME = "2020-04-14 07:00:00"
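
# Unit tests for the ingestion Pipeline: source/sink configuration, default
# sink selection, transformer handling, and commit policies. To run only this
# module (assuming a standard pytest setup):
#
#   pytest tests/unit/test_pipeline.py
#
# A single parametrized case can be selected by its node id, for example:
#
#   pytest "tests/unit/test_pipeline.py::TestPipeline::test_pipeline_process_commits[ON_NO_ERRORS-with-errors]"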


class TestPipeline(object):
    @patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True)
    @patch("datahub.ingestion.sink.console.ConsoleSink.close", autospec=True)
    @freeze_time(FROZEN_TIME)
    def test_configure(self, mock_sink, mock_source):
        pipeline = Pipeline.create(
            {
                "source": {
                    "type": "kafka",
                    "config": {"connection": {"bootstrap": "localhost:9092"}},
                },
                "sink": {"type": "console"},
            }
        )
        pipeline.run()
        pipeline.raise_from_status()
        pipeline.pretty_print_summary()
        mock_source.assert_called_once()
        mock_sink.assert_called_once()
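
    # A rough sketch of the YAML ingestion recipe that corresponds to the
    # config dict used above (illustrative only, not a complete recipe):
    #
    #   source:
    #     type: kafka
    #     config:
    #       connection:
    #         bootstrap: "localhost:9092"
    #   sink:
    #     type: console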

    @freeze_time(FROZEN_TIME)
    @patch("datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection")
    @patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True)
    def test_configure_without_sink(self, mock_source, mock_test_connection):
        mock_test_connection.return_value = {"noCode": True}
        pipeline = Pipeline.create(
            {
                "source": {
                    "type": "kafka",
                    "config": {"connection": {"bootstrap": "localhost:9092"}},
                },
            }
        )
        # assert that the default sink config is for a DatahubRestSink
        assert isinstance(pipeline.config.sink, DynamicTypedConfig)
        assert pipeline.config.sink.type == "datahub-rest"
        assert pipeline.config.sink.config == {
            "server": "http://localhost:8080",
            "token": "",
        }

    @freeze_time(FROZEN_TIME)
    def test_run_including_fake_transformation(self):
        pipeline = Pipeline.create(
            {
                "source": {"type": "tests.unit.test_pipeline.FakeSource"},
                "transformers": [
                    {"type": "tests.unit.test_pipeline.AddStatusRemovedTransformer"}
                ],
                "sink": {"type": "tests.test_helpers.sink_helpers.RecordingSink"},
                "run_id": "pipeline_test",
            }
        )
        pipeline.run()
        pipeline.raise_from_status()

        expected_mce = get_initial_mce()

        dataset_snapshot = cast(DatasetSnapshotClass, expected_mce.proposedSnapshot)
        dataset_snapshot.aspects.append(get_status_removed_aspect())

        sink_report: RecordingSinkReport = cast(
            RecordingSinkReport, pipeline.sink.get_report()
        )

        assert len(sink_report.received_records) == 1
        assert expected_mce == sink_report.received_records[0].record

    @freeze_time(FROZEN_TIME)
    def test_run_including_registered_transformation(self):
        # This is not testing functionality, but just the transformer registration system.

        pipeline = Pipeline.create(
            {
                "source": {"type": "tests.unit.test_pipeline.FakeSource"},
                "transformers": [
                    {
                        "type": "simple_add_dataset_ownership",
                        "config": {"owner_urns": ["urn:li:corpuser:foo"]},
                    }
                ],
                "sink": {"type": "tests.test_helpers.sink_helpers.RecordingSink"},
            }
        )
        assert pipeline

    @pytest.mark.parametrize(
        "commit_policy,source,should_commit",
        [
            pytest.param(
                CommitPolicy.ALWAYS,
                "FakeSource",
                True,
                id="ALWAYS-no-warnings-no-errors",
            ),
            pytest.param(
                CommitPolicy.ON_NO_ERRORS,
                "FakeSource",
                True,
                id="ON_NO_ERRORS-no-warnings-no-errors",
            ),
            pytest.param(
                CommitPolicy.ON_NO_ERRORS_AND_NO_WARNINGS,
                "FakeSource",
                True,
                id="ON_NO_ERRORS_AND_NO_WARNINGS-no-warnings-no-errors",
            ),
            pytest.param(
                CommitPolicy.ALWAYS,
                "FakeSourceWithWarnings",
                True,
                id="ALWAYS-with-warnings",
            ),
            pytest.param(
                CommitPolicy.ON_NO_ERRORS,
                "FakeSourceWithWarnings",
                True,
                id="ON_NO_ERRORS-with-warnings",
            ),
            pytest.param(
                CommitPolicy.ON_NO_ERRORS_AND_NO_WARNINGS,
                "FakeSourceWithWarnings",
                False,
                id="ON_NO_ERRORS_AND_NO_WARNINGS-with-warnings",
            ),
            pytest.param(
                CommitPolicy.ALWAYS,
                "FakeSourceWithFailures",
                True,
                id="ALWAYS-with-errors",
            ),
            pytest.param(
                CommitPolicy.ON_NO_ERRORS,
                "FakeSourceWithFailures",
                False,
                id="ON_NO_ERRORS-with-errors",
            ),
            pytest.param(
                CommitPolicy.ON_NO_ERRORS_AND_NO_WARNINGS,
                "FakeSourceWithFailures",
                False,
                id="ON_NO_ERRORS_AND_NO_WARNINGS-with-errors",
            ),
        ],
    )
    @freeze_time(FROZEN_TIME)
    def test_pipeline_process_commits(self, commit_policy, source, should_commit):
        pipeline = Pipeline.create(
            {
                "source": {"type": f"tests.unit.test_pipeline.{source}"},
                "sink": {"type": "console"},
                "run_id": "pipeline_test",
            }
        )

        class FakeCommittable(Committable):
            def __init__(self, commit_policy: CommitPolicy):
                self.name = "test_checkpointer"
                self.commit_policy = commit_policy

            def commit(self) -> None:
                pass

        fake_committable: Committable = FakeCommittable(commit_policy)

        with patch.object(
            FakeCommittable, "commit", wraps=fake_committable.commit
        ) as mock_commit:
            pipeline.ctx.register_reporter(fake_committable)

            pipeline.run()
            # commit() should be called exactly once when should_commit is True,
            # and never otherwise.
            if should_commit:
                mock_commit.assert_called_once()
            else:
                mock_commit.assert_not_called()
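
# The helper sources and the transformer below are what the recipes above
# reference by fully qualified path, e.g. "tests.unit.test_pipeline.FakeSource"
# and "tests.unit.test_pipeline.AddStatusRemovedTransformer".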


class AddStatusRemovedTransformer(Transformer):
    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Transformer":
        return cls()

    def transform(
        self, record_envelopes: Iterable[RecordEnvelope]
    ) -> Iterable[RecordEnvelope]:
        for record_envelope in record_envelopes:
            if isinstance(record_envelope.record, MetadataChangeEventClass):
                assert isinstance(
                    record_envelope.record.proposedSnapshot, DatasetSnapshotClass
                )
                record_envelope.record.proposedSnapshot.aspects.append(
                    get_status_removed_aspect()
                )
            yield record_envelope


class FakeSource(Source):
    def __init__(self):
        self.source_report = SourceReport()
        self.work_units: List[MetadataWorkUnit] = [
            MetadataWorkUnit(id="workunit-1", mce=get_initial_mce())
        ]

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
        assert not config_dict
        return cls()

    def get_workunits(self) -> Iterable[WorkUnit]:
        return self.work_units

    def get_report(self) -> SourceReport:
        return self.source_report

    def close(self):
        pass


class FakeSourceWithWarnings(FakeSource):
    def __init__(self):
        super().__init__()
        self.source_report.report_warning("test_warning", "warning_text")

    def get_report(self) -> SourceReport:
        return self.source_report


class FakeSourceWithFailures(FakeSource):
    def __init__(self):
        super().__init__()
        self.source_report.report_failure("test_failure", "failure_text")

    def get_report(self) -> SourceReport:
        return self.source_report


def get_initial_mce() -> MetadataChangeEventClass:
    return MetadataChangeEventClass(
        proposedSnapshot=DatasetSnapshotClass(
            urn="urn:li:dataset:(urn:li:dataPlatform:test_platform,test,PROD)",
            aspects=[
                DatasetPropertiesClass(
                    description="test.description",
                )
            ],
        ),
        # 1586847600000 ms is 2020-04-14 07:00:00 UTC, i.e. FROZEN_TIME.
        systemMetadata=SystemMetadata(
            lastObserved=1586847600000, runId="pipeline_test"
        ),
    )


def get_status_removed_aspect() -> StatusClass:
    return StatusClass(removed=False)