2023-07-07 12:43:01 -04:00
|
|
|
import logging
|
2021-12-09 04:26:31 +05:30
|
|
|
import time
|
|
|
|
|
|
|
|
import pytest
|
2023-07-07 12:43:01 -04:00
|
|
|
import requests
|
2021-12-09 04:26:31 +05:30
|
|
|
from freezegun import freeze_time
|
|
|
|
|
|
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
|
|
|
from tests.test_helpers import fs_helpers, mce_helpers
|
2023-10-03 23:17:49 -04:00
|
|
|
from tests.test_helpers.docker_helpers import cleanup_image, wait_for_port
|
|
|
|
|
|
|
|
# Module-level pytest marker: applies the `integration_batch_2` mark to every
# test defined in this file.
pytestmark = pytest.mark.integration_batch_2
|
2021-12-09 04:26:31 +05:30
|
|
|
|
|
|
|
# Timestamp handed to `freeze_time` by both ingestion tests in this module.
FROZEN_TIME = "2021-12-03 12:00:00"
|
|
|
|
|
|
|
|
|
2023-07-07 12:43:01 -04:00
|
|
|
@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
    """Return the NiFi integration-test resource directory.

    The path is built from the pytest root path; the tests in this module
    read the docker-compose file and the golden files from it.
    """
    nifi_resources = pytestconfig.rootpath / "tests/integration/nifi"
    return nifi_resources
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
def loaded_nifi(docker_compose_runner, test_resources_dir):
    """Bring up the NiFi docker-compose stack and yield its docker services.

    The fixture waits for the port of each NiFi container before yielding:
    `nifi1` on 9443 (the port the standalone test queries) and
    `nifi01`/`nifi02`/`nifi03` on 9080/9081/9082 (the cluster test queries
    9080). After the tests have run, the `apache/nifi` image is removed.
    """
    # (container name, container port, timeout) in the order they are awaited.
    awaited_containers = (
        ("nifi1", 9443, 300),
        ("nifi01", 9080, 60),
        ("nifi02", 9081, 60),
        ("nifi03", 9082, 60),
    )

    compose_file = test_resources_dir / "docker-compose.yml"
    with docker_compose_runner(compose_file, "nifi") as docker_services:
        for name, port, timeout in awaited_containers:
            wait_for_port(
                docker_services,
                container_name=name,
                container_port=port,
                timeout=timeout,
            )

        yield docker_services

    # The nifi image is pretty large, so we remove it after the test.
    # NOTE(review): placed after the compose context has exited, on the
    # assumption that the containers must be gone before the image is removed.
    cleanup_image("apache/nifi")
|
|
|
|
|
2021-12-09 04:26:31 +05:30
|
|
|
|
2023-07-07 12:43:01 -04:00
|
|
|
@freeze_time(FROZEN_TIME)
def test_nifi_ingest_standalone(
    loaded_nifi, pytestconfig, tmp_path, test_resources_dir
):
    """Ingest from the standalone NiFi instance and compare with the golden file.

    The test first polls the NiFi REST API until the `FetchS3Object`
    processor reports at least one outgoing flow file (or the polling budget
    is used up), then runs a `nifi` -> `file` ingestion pipeline inside an
    isolated filesystem rooted at `tmp_path` and checks the emitted file
    against `nifi_mces_golden_standalone.json`.

    If the polling budget runs out the test still proceeds; a flow that has
    not produced lineage then surfaces as a golden-file mismatch.
    """
    # Wait for nifi standalone to execute all lineage processors: poll every
    # 5 seconds, at most 23 times (about 115 seconds in total).
    url = "http://localhost:9443/nifi-api/flow/process-groups/80404c81-017d-1000-e8e8-af7420af06c1"
    for i in range(23):
        logging.info("waiting...")
        time.sleep(5)
        resp = requests.get(url)
        if resp.status_code != 200:
            continue

        processors = resp.json()["processGroupFlow"]["flow"]["processors"]
        statuses = [processor["status"] for processor in processors]
        # Use a default so that a processor which is not listed yet leads to
        # another poll instead of a StopIteration escaping from next().
        status = next((s for s in statuses if s["name"] == "FetchS3Object"), None)
        if status is None:
            continue

        if status["aggregateSnapshot"]["flowFilesOut"] >= 1:
            # The sleep runs before every request, so i + 1 intervals have
            # elapsed by the time this poll succeeds.
            logging.info(f"Waited for time {(i + 1) * 5} seconds")
            break

    # Run the metadata ingestion pipeline.
    with fs_helpers.isolated_filesystem(tmp_path):
        # Run nifi ingestion run.
        pipeline = Pipeline.create(
            {
                "run_id": "nifi-test-standalone",
                "source": {
                    "type": "nifi",
                    "config": {
                        "site_url": "http://localhost:9443/nifi/",
                        "process_group_pattern": {"deny": ["^WIP"]},
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {"filename": "./nifi_mces.json"},
                },
            }
        )
        pipeline.run()
        pipeline.raise_from_status()

        # Verify the output. ignore values for aspects having last_event_time values
        # (the output path is relative, so this must run inside the isolated
        # filesystem where the sink wrote the file).
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path="nifi_mces.json",
            golden_path=test_resources_dir / "nifi_mces_golden_standalone.json",
            ignore_paths=[
                *mce_helpers.IGNORE_PATH_TIMESTAMPS,
                r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['last_event_time'\]",
            ],
        )
|
|
|
|
|
|
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
def test_nifi_ingest_cluster(loaded_nifi, pytestconfig, tmp_path, test_resources_dir):
    """Ingest from the NiFi cluster and compare the result with the golden file.

    The test first polls the root process group through the REST API on port
    9080 until the `Cluster_Site_S3_to_S3` process group reports at least one
    sent flow file (or the polling budget is used up), then runs a
    `nifi` -> `file` ingestion pipeline inside an isolated filesystem rooted
    at `tmp_path` and checks the emitted file against
    `nifi_mces_golden_cluster.json`.

    If the polling budget runs out the test still proceeds; a flow that has
    not produced lineage then surfaces as a golden-file mismatch.
    """
    # Wait for nifi cluster to execute all lineage processors: poll every
    # 5 seconds, at most 23 times (about 115 seconds in total).
    url = "http://localhost:9080/nifi-api/flow/process-groups/root"
    for i in range(23):
        logging.info("waiting...")
        time.sleep(5)
        resp = requests.get(url)
        if resp.status_code != 200:
            continue

        process_groups = resp.json()["processGroupFlow"]["flow"]["processGroups"]
        statuses = [group["status"] for group in process_groups]
        # Use a default so that a process group which is not listed yet leads
        # to another poll instead of a StopIteration escaping from next().
        status = next(
            (s for s in statuses if s["name"] == "Cluster_Site_S3_to_S3"), None
        )
        if status is None:
            continue

        if status["aggregateSnapshot"]["flowFilesSent"] >= 1:
            # The sleep runs before every request, so i + 1 intervals have
            # elapsed by the time this poll succeeds.
            logging.info(f"Waited for time {(i + 1) * 5} seconds")
            break

    # Run the metadata ingestion pipeline.
    with fs_helpers.isolated_filesystem(tmp_path):
        # Run nifi ingestion run.
        pipeline = Pipeline.create(
            {
                "run_id": "nifi-test-cluster",
                "source": {
                    "type": "nifi",
                    "config": {
                        "site_url": "http://localhost:9080/nifi/",
                        "auth": "NO_AUTH",
                        "site_url_to_site_name": {
                            "http://nifi01:9080/nifi/": "default",
                            "http://nifi02:9081/nifi/": "default",
                            "http://nifi03:9082/nifi/": "default",
                        },
                        "incremental_lineage": False,
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {"filename": "./nifi_mces_cluster.json"},
                },
            }
        )
        pipeline.run()
        pipeline.raise_from_status()

        # Verify the output.
        # (the output path is relative, so this must run inside the isolated
        # filesystem where the sink wrote the file).
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path="nifi_mces_cluster.json",
            golden_path=test_resources_dir / "nifi_mces_golden_cluster.json",
            ignore_paths=[
                r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['last_event_time'\]",
            ],
        )
|