# datahub/metadata-ingestion/tests/unit/test_nifi_source.py

import typing
from unittest.mock import patch
import pytest
from pydantic import ValidationError
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.nifi import (
BidirectionalComponentGraph,
NifiComponent,
NifiFlow,
NifiProcessGroup,
NifiSource,
NifiSourceConfig,
NifiType,
)
@typing.no_type_check
def test_nifi_s3_provenance_event():
    """End-to-end check that S3 provenance events become the expected workunits.

    Two scenarios are exercised with canned provenance REST responses
    (see ``mocked_functions``):

    * ``PutS3Object`` SEND events -> the processor's dataJob lists the S3
      dataset as an *output*.
    * ``FetchS3Object`` FETCH events -> the processor's dataJob lists the S3
      dataset as an *input*.
    """
    config_dict = {"site_url": "http://localhost:8080", "incremental_lineage": False}
    nifi_config = NifiSourceConfig.parse_obj(config_dict)
    ctx = PipelineContext(run_id="test")

    def _flow_with(component: NifiComponent) -> NifiFlow:
        # Both scenarios share the same two-level process-group layout;
        # only the single processor component differs.
        return NifiFlow(
            version="1.15.0",
            clustered=False,
            root_process_group=NifiProcessGroup(
                id="803ebb92-017d-1000-2961-4bdaa27a3ba0",
                name="Standalone Flow",
                parent_group_id=None,
            ),
            components={component.id: component},
            remotely_accessible_ports={},
            connections=BidirectionalComponentGraph(),
            processGroups={
                "803ebb92-017d-1000-2961-4bdaa27a3ba0": NifiProcessGroup(
                    id="803ebb92-017d-1000-2961-4bdaa27a3ba0",
                    name="Standalone Flow",
                    parent_group_id=None,
                ),
                "80404c81-017d-1000-e8e8-af7420af06c1": NifiProcessGroup(
                    id="80404c81-017d-1000-e8e8-af7420af06c1",
                    name="Single_Site_S3_to_S3",
                    parent_group_id="803ebb92-017d-1000-2961-4bdaa27a3ba0",
                ),
            },
            remoteProcessGroups={},
            remote_ports={},
        )

    def _processor(component_id: str, processor_name: str) -> NifiComponent:
        # Minimal processor component living in the Single_Site_S3_to_S3 group.
        return NifiComponent(
            id=component_id,
            name=processor_name,
            type=f"org.apache.nifi.processors.aws.s3.{processor_name}",
            parent_group_id="80404c81-017d-1000-e8e8-af7420af06c1",
            nifi_type=NifiType.PROCESSOR,
            comments="",
            status=None,
            inlets={},
            outlets={},
            config={},
            target_uris=None,
            last_event_time=None,
        )

    def _check_workunit_shape(workunits) -> None:
        # 1 aspect for the dataflow, 2 for the datasets, 3 for the datajob.
        assert len(workunits) == 6
        assert [wu.metadata.entityType for wu in workunits] == [
            "dataFlow",
            "dataset",
            "dataset",
            "dataJob",
            "dataJob",
            "dataJob",
        ]

    # Scenario 1: PutS3Object SEND events -> S3 dataset is an output.
    with patch(
        "datahub.ingestion.source.nifi.NifiSource.fetch_provenance_events"
    ) as mock_provenance_events, patch(
        "datahub.ingestion.source.nifi.NifiSource.delete_provenance"
    ) as mock_delete_provenance:
        mocked_functions(mock_provenance_events, mock_delete_provenance, "puts3")
        nifi_source = NifiSource(nifi_config, ctx)
        nifi_source.nifi_flow = _flow_with(
            _processor("aed63edf-e660-3f29-b56b-192cf6286889", "PutS3Object")
        )
        NifiSource.process_provenance_events(nifi_source)
        workunits = list(NifiSource.construct_workunits(nifi_source))
        _check_workunit_shape(workunits)
        ioAspect = workunits[5].metadata.aspect
        assert ioAspect.outputDatasets == [
            "urn:li:dataset:(urn:li:dataPlatform:s3,foo-nifi/tropical_data,PROD)"
        ]
        assert ioAspect.inputDatasets == []

    # Scenario 2: FetchS3Object FETCH events -> S3 dataset is an input.
    with patch(
        "datahub.ingestion.source.nifi.NifiSource.fetch_provenance_events"
    ) as mock_provenance_events, patch(
        "datahub.ingestion.source.nifi.NifiSource.delete_provenance"
    ) as mock_delete_provenance:
        mocked_functions(mock_provenance_events, mock_delete_provenance, "fetchs3")
        nifi_source = NifiSource(nifi_config, ctx)
        # NOTE(review): the original component id carried a stray trailing "9"
        # ("...328bd9") that did not match either the components-dict key or
        # the provenance events' componentId; fixed to the consistent id.
        nifi_source.nifi_flow = _flow_with(
            _processor("91d59f03-1c2b-3f3f-48bc-f89296a328bd", "FetchS3Object")
        )
        NifiSource.process_provenance_events(nifi_source)
        workunits = list(NifiSource.construct_workunits(nifi_source))
        _check_workunit_shape(workunits)
        ioAspect = workunits[5].metadata.aspect
        assert ioAspect.outputDatasets == []
        assert ioAspect.inputDatasets == [
            "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)"
        ]
def mocked_functions(mock_provenance_events, mock_delete_provenance, provenance_case):
    """Install canned NiFi provenance REST payloads on the two source mocks.

    ``provenance_case`` selects which payload ``fetch_provenance_events``
    returns: ``"fetchs3"`` yields FETCH events from a FetchS3Object processor,
    while any other value yields SEND events from a PutS3Object processor.
    ``delete_provenance`` is stubbed to a no-op in both cases.
    """
    canned_responses = {
        "puts3": [
            {
                "id": "49",
                "eventId": 49,
                "eventTime": "12/08/2021 10:06:39.938 UTC",
                "eventType": "SEND",
                "flowFileUuid": "0b205834-5cac-4979-ad7a-e3db1cb17685",
                "groupId": "80404c81-017d-1000-e8e8-af7420af06c1",
                "componentId": "aed63edf-e660-3f29-b56b-192cf6286889",
                "componentType": "PutS3Object",
                "componentName": "PutS3Object",
                "attributes": [
                    {
                        "name": "filename",
                        "value": "test_rare.json",
                        "previousValue": "test_rare.json",
                    },
                    {"name": "path", "value": "./", "previousValue": "./"},
                    {
                        "name": "s3.bucket",
                        "value": "foo-nifi",
                        "previousValue": "enriched-topical-chat",
                    },
                    {"name": "s3.key", "value": "tropical_data/test_rare.json"},
                ],
                "transitUri": "https://foo-nifi.s3.amazonaws.com/tropical_data/test_rare.json",
            },
            {
                "id": "46",
                "eventId": 46,
                "eventTime": "12/08/2021 10:06:30.560 UTC",
                "eventType": "SEND",
                "flowFileUuid": "e8a6ad9a-1d02-4bf1-b104-dad1ff180ddd",
                "groupId": "80404c81-017d-1000-e8e8-af7420af06c1",
                "componentId": "aed63edf-e660-3f29-b56b-192cf6286889",
                "componentType": "PutS3Object",
                "componentName": "PutS3Object",
                "attributes": [
                    {
                        "name": "filename",
                        "value": "test_freq.json",
                        "previousValue": "test_freq.json",
                    },
                    {"name": "path", "value": "./", "previousValue": "./"},
                    {
                        "name": "s3.bucket",
                        "value": "foo-nifi",
                        "previousValue": "enriched-topical-chat",
                    },
                    {"name": "s3.key", "value": "tropical_data/test_freq.json"},
                ],
                "transitUri": "https://foo-nifi.s3.amazonaws.com/tropical_data/test_freq.json",
            },
        ],
        "fetchs3": [
            {
                "id": "44",
                "eventId": 44,
                "eventTime": "12/08/2021 10:06:19.828 UTC",
                "eventType": "FETCH",
                "groupId": "80404c81-017d-1000-e8e8-af7420af06c1",
                "componentId": "91d59f03-1c2b-3f3f-48bc-f89296a328bd",
                "componentType": "FetchS3Object",
                "componentName": "FetchS3Object",
                "attributes": [
                    {
                        "name": "filename",
                        "value": "test_rare.json",
                        "previousValue": "test_rare.json",
                    },
                    {"name": "path", "value": "./", "previousValue": "./"},
                    {
                        "name": "s3.bucket",
                        "value": "enriched-topical-chat",
                        "previousValue": "enriched-topical-chat",
                    },
                ],
                "transitUri": "http://enriched-topical-chat.amazonaws.com/test_rare.json",
            },
            {
                "id": "42",
                "eventId": 42,
                "eventTime": "12/08/2021 10:06:16.952 UTC",
                "eventType": "FETCH",
                "flowFileUuid": "e8a6ad9a-1d02-4bf1-b104-dad1ff180ddd",
                "groupId": "80404c81-017d-1000-e8e8-af7420af06c1",
                "componentId": "91d59f03-1c2b-3f3f-48bc-f89296a328bd",
                "componentType": "FetchS3Object",
                "componentName": "FetchS3Object",
                "attributes": [
                    {
                        "name": "filename",
                        "value": "test_freq.json",
                        "previousValue": "test_freq.json",
                    },
                    {"name": "path", "value": "./", "previousValue": "./"},
                    {
                        "name": "s3.bucket",
                        "value": "enriched-topical-chat",
                        "previousValue": "enriched-topical-chat",
                    },
                ],
                "transitUri": "http://enriched-topical-chat.amazonaws.com/test_freq.json",
            },
        ],
    }
    mock_delete_provenance.return_value = None
    # Any case other than "fetchs3" falls back to the PutS3Object payload.
    selected = "fetchs3" if provenance_case == "fetchs3" else "puts3"
    mock_provenance_events.return_value = canned_responses[selected]
@pytest.mark.parametrize("auth", ["SINGLE_USER", "BASIC_AUTH"])
def test_auth_without_password(auth):
    """Password-based auth schemes must reject a config that omits `password`."""
    config_dict = {
        "site_url": "https://localhost:8443",
        "auth": auth,
        "username": "someuser",
    }
    expected_message = f"`username` and `password` is required for {auth} auth"
    with pytest.raises(ValueError, match=expected_message):
        NifiSourceConfig.parse_obj(config_dict)
@pytest.mark.parametrize("auth", ["SINGLE_USER", "BASIC_AUTH"])
def test_auth_without_username_and_password(auth):
    """Password-based auth schemes must reject a config with no credentials."""
    config_dict = {
        "site_url": "https://localhost:8443",
        "auth": auth,
    }
    expected_message = f"`username` and `password` is required for {auth} auth"
    with pytest.raises(ValueError, match=expected_message):
        NifiSourceConfig.parse_obj(config_dict)
def test_client_cert_auth_without_client_cert_file():
    """CLIENT_CERT auth must reject a config that omits `client_cert_file`."""
    config_dict = {
        "site_url": "https://localhost:8443",
        "auth": "CLIENT_CERT",
    }
    with pytest.raises(
        ValueError, match="`client_cert_file` is required for CLIENT_CERT auth"
    ):
        NifiSourceConfig.parse_obj(config_dict)
def test_single_user_auth_failed_to_get_token():
    """SINGLE_USER auth against an unreachable NiFi must report, not raise."""
    source = NifiSource(
        config=NifiSourceConfig(
            site_url="https://localhost:12345",  # will never work
            username="username",
            password="password",
            auth="SINGLE_USER",
        ),
        ctx=PipelineContext("nifi-run"),
    )
    # Ingestion must complete without raising.
    list(source.get_workunits())
    report = source.get_report()
    assert report.failures
    assert any(f.message == "Failed to authenticate" for f in report.failures)
def test_kerberos_auth_failed_to_get_token():
    """KERBEROS auth against an unreachable NiFi must report, not raise."""
    source = NifiSource(
        config=NifiSourceConfig(
            site_url="https://localhost:12345",  # will never work
            auth="KERBEROS",
        ),
        ctx=PipelineContext("nifi-run"),
    )
    # Ingestion must complete without raising.
    list(source.get_workunits())
    report = source.get_report()
    assert report.failures
    assert any(f.message == "Failed to authenticate" for f in report.failures)
def test_client_cert_auth_failed():
    """CLIENT_CERT auth with a missing cert file must report, not raise."""
    source = NifiSource(
        config=NifiSourceConfig(
            site_url="https://localhost:12345",  # will never work
            auth="CLIENT_CERT",
            client_cert_file="nonexisting_file",
        ),
        ctx=PipelineContext("nifi-run"),
    )
    # Ingestion must complete without raising.
    list(source.get_workunits())
    report = source.get_report()
    assert report.failures
    assert any(f.message == "Failed to authenticate" for f in report.failures)
def test_failure_to_create_nifi_flow():
    """With auth stubbed out, an unreachable REST API must report a flow failure."""
    with patch("datahub.ingestion.source.nifi.NifiSource.authenticate"):
        source = NifiSource(
            config=NifiSourceConfig(
                site_url="https://localhost:12345",  # will never work
                auth="KERBEROS",
            ),
            ctx=PipelineContext("nifi-run"),
        )
        # Ingestion must complete without raising.
        list(source.get_workunits())
        report = source.get_report()
        assert report.failures
        assert any(
            f.message == "Failed to get root process group flow"
            for f in report.failures
        )
def test_site_url_no_context():
    """Every context-free URL spelling normalizes to the canonical /nifi/ form."""
    ctx = PipelineContext("run-id")
    canonical = "https://localhost:8443/nifi/"
    for site_url in (
        "https://localhost:8443",
        "https://localhost:8443/",
        "https://localhost:8443/nifi",
        "https://localhost:8443/nifi/",
    ):
        config = NifiSourceConfig(site_url=site_url)
        assert config.site_url == canonical
        assert config.site_url_to_site_name[canonical] == "default"
        source = NifiSource(config, ctx)
        assert source.rest_api_base_url == "https://localhost:8443/nifi-api/"
def test_site_url_with_context():
    """URLs with a context path normalize to the canonical .../context/nifi/ form."""
    ctx = PipelineContext("run-id")
    canonical = "https://host/context/nifi/"
    for site_url in (
        "https://host/context",
        "https://host/context/",
        "https://host/context/nifi",
        "https://host/context/nifi/",
    ):
        config = NifiSourceConfig(site_url=site_url)
        assert config.site_url == canonical
        assert config.site_url_to_site_name[canonical] == "default"
        source = NifiSource(config, ctx)
        assert source.rest_api_base_url == "https://host/context/nifi-api/"
def test_incorrect_site_urls():
    """Scheme-less URLs must fail config validation with a clear message."""
    for bad_url in ("localhost:8443", "localhost:8443/context/"):
        with pytest.raises(ValidationError, match="site_url must start with http"):
            NifiSourceConfig(site_url=bad_url)