mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-06 16:49:03 +00:00
450 lines
16 KiB
Python
450 lines
16 KiB
Python
import typing
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from pydantic import ValidationError
|
|
|
|
from datahub.ingestion.api.common import PipelineContext
|
|
from datahub.ingestion.source.nifi import (
|
|
BidirectionalComponentGraph,
|
|
NifiComponent,
|
|
NifiFlow,
|
|
NifiProcessGroup,
|
|
NifiSource,
|
|
NifiSourceConfig,
|
|
NifiType,
|
|
)
|
|
|
|
|
|
@typing.no_type_check
|
|
def test_nifi_s3_provenance_event():
|
|
config_dict = {"site_url": "http://localhost:8080", "incremental_lineage": False}
|
|
nifi_config = NifiSourceConfig.parse_obj(config_dict)
|
|
ctx = PipelineContext(run_id="test")
|
|
|
|
with patch(
|
|
"datahub.ingestion.source.nifi.NifiSource.fetch_provenance_events"
|
|
) as mock_provenance_events, patch(
|
|
"datahub.ingestion.source.nifi.NifiSource.delete_provenance"
|
|
) as mock_delete_provenance:
|
|
mocked_functions(mock_provenance_events, mock_delete_provenance, "puts3")
|
|
|
|
nifi_source = NifiSource(nifi_config, ctx)
|
|
|
|
nifi_source.nifi_flow = NifiFlow(
|
|
version="1.15.0",
|
|
clustered=False,
|
|
root_process_group=NifiProcessGroup(
|
|
id="803ebb92-017d-1000-2961-4bdaa27a3ba0",
|
|
name="Standalone Flow",
|
|
parent_group_id=None,
|
|
),
|
|
components={
|
|
"aed63edf-e660-3f29-b56b-192cf6286889": NifiComponent(
|
|
id="aed63edf-e660-3f29-b56b-192cf6286889",
|
|
name="PutS3Object",
|
|
type="org.apache.nifi.processors.aws.s3.PutS3Object",
|
|
parent_group_id="80404c81-017d-1000-e8e8-af7420af06c1",
|
|
nifi_type=NifiType.PROCESSOR,
|
|
comments="",
|
|
status=None,
|
|
inlets={},
|
|
outlets={},
|
|
config={},
|
|
target_uris=None,
|
|
last_event_time=None,
|
|
)
|
|
},
|
|
remotely_accessible_ports={},
|
|
connections=BidirectionalComponentGraph(),
|
|
processGroups={
|
|
"803ebb92-017d-1000-2961-4bdaa27a3ba0": NifiProcessGroup(
|
|
id="803ebb92-017d-1000-2961-4bdaa27a3ba0",
|
|
name="Standalone Flow",
|
|
parent_group_id=None,
|
|
),
|
|
"80404c81-017d-1000-e8e8-af7420af06c1": NifiProcessGroup(
|
|
id="80404c81-017d-1000-e8e8-af7420af06c1",
|
|
name="Single_Site_S3_to_S3",
|
|
parent_group_id="803ebb92-017d-1000-2961-4bdaa27a3ba0",
|
|
),
|
|
},
|
|
remoteProcessGroups={},
|
|
remote_ports={},
|
|
)
|
|
|
|
NifiSource.process_provenance_events(nifi_source)
|
|
workunits = list(NifiSource.construct_workunits(nifi_source))
|
|
|
|
# one aspect for dataflow and two aspects for datajob
|
|
# and two aspects for dataset
|
|
assert len(workunits) == 6
|
|
assert workunits[0].metadata.entityType == "dataFlow"
|
|
|
|
assert workunits[1].metadata.entityType == "dataset"
|
|
assert workunits[2].metadata.entityType == "dataset"
|
|
assert workunits[3].metadata.entityType == "dataJob"
|
|
assert workunits[4].metadata.entityType == "dataJob"
|
|
assert workunits[5].metadata.entityType == "dataJob"
|
|
|
|
ioAspect = workunits[5].metadata.aspect
|
|
assert ioAspect.outputDatasets == [
|
|
"urn:li:dataset:(urn:li:dataPlatform:s3,foo-nifi/tropical_data,PROD)"
|
|
]
|
|
assert ioAspect.inputDatasets == []
|
|
|
|
with patch(
|
|
"datahub.ingestion.source.nifi.NifiSource.fetch_provenance_events"
|
|
) as mock_provenance_events, patch(
|
|
"datahub.ingestion.source.nifi.NifiSource.delete_provenance"
|
|
) as mock_delete_provenance:
|
|
mocked_functions(mock_provenance_events, mock_delete_provenance, "fetchs3")
|
|
|
|
nifi_source = NifiSource(nifi_config, ctx)
|
|
|
|
nifi_source.nifi_flow = NifiFlow(
|
|
version="1.15.0",
|
|
clustered=False,
|
|
root_process_group=NifiProcessGroup(
|
|
id="803ebb92-017d-1000-2961-4bdaa27a3ba0",
|
|
name="Standalone Flow",
|
|
parent_group_id=None,
|
|
),
|
|
components={
|
|
"91d59f03-1c2b-3f3f-48bc-f89296a328bd": NifiComponent(
|
|
id="91d59f03-1c2b-3f3f-48bc-f89296a328bd9",
|
|
name="FetchS3Object",
|
|
type="org.apache.nifi.processors.aws.s3.FetchS3Object",
|
|
parent_group_id="80404c81-017d-1000-e8e8-af7420af06c1",
|
|
nifi_type=NifiType.PROCESSOR,
|
|
comments="",
|
|
status=None,
|
|
inlets={},
|
|
outlets={},
|
|
config={},
|
|
target_uris=None,
|
|
last_event_time=None,
|
|
)
|
|
},
|
|
remotely_accessible_ports={},
|
|
connections=BidirectionalComponentGraph(),
|
|
processGroups={
|
|
"803ebb92-017d-1000-2961-4bdaa27a3ba0": NifiProcessGroup(
|
|
id="803ebb92-017d-1000-2961-4bdaa27a3ba0",
|
|
name="Standalone Flow",
|
|
parent_group_id=None,
|
|
),
|
|
"80404c81-017d-1000-e8e8-af7420af06c1": NifiProcessGroup(
|
|
id="80404c81-017d-1000-e8e8-af7420af06c1",
|
|
name="Single_Site_S3_to_S3",
|
|
parent_group_id="803ebb92-017d-1000-2961-4bdaa27a3ba0",
|
|
),
|
|
},
|
|
remoteProcessGroups={},
|
|
remote_ports={},
|
|
)
|
|
|
|
NifiSource.process_provenance_events(nifi_source)
|
|
workunits = list(NifiSource.construct_workunits(nifi_source))
|
|
|
|
# one aspect for dataflow and two aspects for datajob
|
|
# and two aspects for dataset
|
|
assert len(workunits) == 6
|
|
assert workunits[0].metadata.entityType == "dataFlow"
|
|
|
|
assert workunits[1].metadata.entityType == "dataset"
|
|
assert workunits[2].metadata.entityType == "dataset"
|
|
assert workunits[3].metadata.entityType == "dataJob"
|
|
assert workunits[4].metadata.entityType == "dataJob"
|
|
assert workunits[5].metadata.entityType == "dataJob"
|
|
|
|
ioAspect = workunits[5].metadata.aspect
|
|
assert ioAspect.outputDatasets == []
|
|
assert ioAspect.inputDatasets == [
|
|
"urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)"
|
|
]
|
|
|
|
|
|
def mocked_functions(mock_provenance_events, mock_delete_provenance, provenance_case):
|
|
puts3_provenance_response = [
|
|
{
|
|
"id": "49",
|
|
"eventId": 49,
|
|
"eventTime": "12/08/2021 10:06:39.938 UTC",
|
|
"eventType": "SEND",
|
|
"flowFileUuid": "0b205834-5cac-4979-ad7a-e3db1cb17685",
|
|
"groupId": "80404c81-017d-1000-e8e8-af7420af06c1",
|
|
"componentId": "aed63edf-e660-3f29-b56b-192cf6286889",
|
|
"componentType": "PutS3Object",
|
|
"componentName": "PutS3Object",
|
|
"attributes": [
|
|
{
|
|
"name": "filename",
|
|
"value": "test_rare.json",
|
|
"previousValue": "test_rare.json",
|
|
},
|
|
{"name": "path", "value": "./", "previousValue": "./"},
|
|
{
|
|
"name": "s3.bucket",
|
|
"value": "foo-nifi",
|
|
"previousValue": "enriched-topical-chat",
|
|
},
|
|
{"name": "s3.key", "value": "tropical_data/test_rare.json"},
|
|
],
|
|
"transitUri": "https://foo-nifi.s3.amazonaws.com/tropical_data/test_rare.json",
|
|
},
|
|
{
|
|
"id": "46",
|
|
"eventId": 46,
|
|
"eventTime": "12/08/2021 10:06:30.560 UTC",
|
|
"eventType": "SEND",
|
|
"flowFileUuid": "e8a6ad9a-1d02-4bf1-b104-dad1ff180ddd",
|
|
"groupId": "80404c81-017d-1000-e8e8-af7420af06c1",
|
|
"componentId": "aed63edf-e660-3f29-b56b-192cf6286889",
|
|
"componentType": "PutS3Object",
|
|
"componentName": "PutS3Object",
|
|
"attributes": [
|
|
{
|
|
"name": "filename",
|
|
"value": "test_freq.json",
|
|
"previousValue": "test_freq.json",
|
|
},
|
|
{"name": "path", "value": "./", "previousValue": "./"},
|
|
{
|
|
"name": "s3.bucket",
|
|
"value": "foo-nifi",
|
|
"previousValue": "enriched-topical-chat",
|
|
},
|
|
{"name": "s3.key", "value": "tropical_data/test_freq.json"},
|
|
],
|
|
"transitUri": "https://foo-nifi.s3.amazonaws.com/tropical_data/test_freq.json",
|
|
},
|
|
]
|
|
|
|
fetchs3_provenance_response = [
|
|
{
|
|
"id": "44",
|
|
"eventId": 44,
|
|
"eventTime": "12/08/2021 10:06:19.828 UTC",
|
|
"eventType": "FETCH",
|
|
"groupId": "80404c81-017d-1000-e8e8-af7420af06c1",
|
|
"componentId": "91d59f03-1c2b-3f3f-48bc-f89296a328bd",
|
|
"componentType": "FetchS3Object",
|
|
"componentName": "FetchS3Object",
|
|
"attributes": [
|
|
{
|
|
"name": "filename",
|
|
"value": "test_rare.json",
|
|
"previousValue": "test_rare.json",
|
|
},
|
|
{"name": "path", "value": "./", "previousValue": "./"},
|
|
{
|
|
"name": "s3.bucket",
|
|
"value": "enriched-topical-chat",
|
|
"previousValue": "enriched-topical-chat",
|
|
},
|
|
],
|
|
"transitUri": "http://enriched-topical-chat.amazonaws.com/test_rare.json",
|
|
},
|
|
{
|
|
"id": "42",
|
|
"eventId": 42,
|
|
"eventTime": "12/08/2021 10:06:16.952 UTC",
|
|
"eventType": "FETCH",
|
|
"flowFileUuid": "e8a6ad9a-1d02-4bf1-b104-dad1ff180ddd",
|
|
"groupId": "80404c81-017d-1000-e8e8-af7420af06c1",
|
|
"componentId": "91d59f03-1c2b-3f3f-48bc-f89296a328bd",
|
|
"componentType": "FetchS3Object",
|
|
"componentName": "FetchS3Object",
|
|
"attributes": [
|
|
{
|
|
"name": "filename",
|
|
"value": "test_freq.json",
|
|
"previousValue": "test_freq.json",
|
|
},
|
|
{"name": "path", "value": "./", "previousValue": "./"},
|
|
{
|
|
"name": "s3.bucket",
|
|
"value": "enriched-topical-chat",
|
|
"previousValue": "enriched-topical-chat",
|
|
},
|
|
],
|
|
"transitUri": "http://enriched-topical-chat.amazonaws.com/test_freq.json",
|
|
},
|
|
]
|
|
mock_delete_provenance.return_value = None
|
|
if provenance_case == "fetchs3":
|
|
mock_provenance_events.return_value = fetchs3_provenance_response
|
|
else:
|
|
mock_provenance_events.return_value = puts3_provenance_response
|
|
|
|
|
|
@pytest.mark.parametrize("auth", ["SINGLE_USER", "BASIC_AUTH"])
|
|
def test_auth_without_password(auth):
|
|
with pytest.raises(
|
|
ValueError, match=f"`username` and `password` is required for {auth} auth"
|
|
):
|
|
NifiSourceConfig.parse_obj(
|
|
{
|
|
"site_url": "https://localhost:8443",
|
|
"auth": auth,
|
|
"username": "someuser",
|
|
}
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("auth", ["SINGLE_USER", "BASIC_AUTH"])
|
|
def test_auth_without_username_and_password(auth):
|
|
with pytest.raises(
|
|
ValueError, match=f"`username` and `password` is required for {auth} auth"
|
|
):
|
|
NifiSourceConfig.parse_obj(
|
|
{
|
|
"site_url": "https://localhost:8443",
|
|
"auth": auth,
|
|
}
|
|
)
|
|
|
|
|
|
def test_client_cert_auth_without_client_cert_file():
|
|
with pytest.raises(
|
|
ValueError, match="`client_cert_file` is required for CLIENT_CERT auth"
|
|
):
|
|
NifiSourceConfig.parse_obj(
|
|
{
|
|
"site_url": "https://localhost:8443",
|
|
"auth": "CLIENT_CERT",
|
|
}
|
|
)
|
|
|
|
|
|
def test_single_user_auth_failed_to_get_token():
|
|
config = NifiSourceConfig(
|
|
site_url="https://localhost:12345", # will never work
|
|
username="username",
|
|
password="password",
|
|
auth="SINGLE_USER",
|
|
)
|
|
source = NifiSource(
|
|
config=config,
|
|
ctx=PipelineContext("nifi-run"),
|
|
)
|
|
|
|
# No exception
|
|
list(source.get_workunits())
|
|
|
|
assert source.get_report().failures
|
|
|
|
assert "Failed to authenticate" in [
|
|
failure.message for failure in source.get_report().failures
|
|
]
|
|
|
|
|
|
def test_kerberos_auth_failed_to_get_token():
|
|
config = NifiSourceConfig(
|
|
site_url="https://localhost:12345", # will never work
|
|
auth="KERBEROS",
|
|
)
|
|
source = NifiSource(
|
|
config=config,
|
|
ctx=PipelineContext("nifi-run"),
|
|
)
|
|
|
|
# No exception
|
|
list(source.get_workunits())
|
|
|
|
assert source.get_report().failures
|
|
assert "Failed to authenticate" in [
|
|
failure.message for failure in source.get_report().failures
|
|
]
|
|
|
|
|
|
def test_client_cert_auth_failed():
|
|
config = NifiSourceConfig(
|
|
site_url="https://localhost:12345", # will never work
|
|
auth="CLIENT_CERT",
|
|
client_cert_file="nonexisting_file",
|
|
)
|
|
source = NifiSource(
|
|
config=config,
|
|
ctx=PipelineContext("nifi-run"),
|
|
)
|
|
|
|
# No exception
|
|
list(source.get_workunits())
|
|
|
|
assert source.get_report().failures
|
|
assert "Failed to authenticate" in [
|
|
failure.message for failure in source.get_report().failures
|
|
]
|
|
|
|
|
|
def test_failure_to_create_nifi_flow():
|
|
with patch("datahub.ingestion.source.nifi.NifiSource.authenticate"):
|
|
config = NifiSourceConfig(
|
|
site_url="https://localhost:12345", # will never work
|
|
auth="KERBEROS",
|
|
)
|
|
source = NifiSource(
|
|
config=config,
|
|
ctx=PipelineContext("nifi-run"),
|
|
)
|
|
|
|
# No exception
|
|
list(source.get_workunits())
|
|
|
|
assert source.get_report().failures
|
|
assert "Failed to get root process group flow" in [
|
|
failure.message for failure in source.get_report().failures
|
|
]
|
|
|
|
|
|
def test_site_url_no_context():
|
|
supported_urls = [
|
|
"https://localhost:8443",
|
|
"https://localhost:8443/",
|
|
"https://localhost:8443/nifi",
|
|
"https://localhost:8443/nifi/",
|
|
]
|
|
ctx = PipelineContext("run-id")
|
|
|
|
for url in supported_urls:
|
|
config = NifiSourceConfig(
|
|
site_url=url,
|
|
)
|
|
assert config.site_url == "https://localhost:8443/nifi/"
|
|
assert config.site_url_to_site_name["https://localhost:8443/nifi/"] == "default"
|
|
|
|
assert (
|
|
NifiSource(config, ctx).rest_api_base_url
|
|
== "https://localhost:8443/nifi-api/"
|
|
)
|
|
|
|
|
|
def test_site_url_with_context():
|
|
supported_urls = [
|
|
"https://host/context",
|
|
"https://host/context/",
|
|
"https://host/context/nifi",
|
|
"https://host/context/nifi/",
|
|
]
|
|
ctx = PipelineContext("run-id")
|
|
|
|
for url in supported_urls:
|
|
config = NifiSourceConfig(
|
|
site_url=url,
|
|
)
|
|
assert config.site_url == "https://host/context/nifi/"
|
|
assert config.site_url_to_site_name["https://host/context/nifi/"] == "default"
|
|
assert (
|
|
NifiSource(config, ctx).rest_api_base_url
|
|
== "https://host/context/nifi-api/"
|
|
)
|
|
|
|
|
|
def test_incorrect_site_urls():
|
|
unsupported_urls = ["localhost:8443", "localhost:8443/context/"]
|
|
for url in unsupported_urls:
|
|
with pytest.raises(ValidationError, match="site_url must start with http"):
|
|
NifiSourceConfig(site_url=url)
|