diff --git a/.github/workflows/metadata-ingestion-slow.yml b/.github/workflows/metadata-ingestion-slow.yml
new file mode 100644
index 0000000000..041bb5037d
--- /dev/null
+++ b/.github/workflows/metadata-ingestion-slow.yml
@@ -0,0 +1,49 @@
+name: metadata ingestion slow integration tests
+on:
+  push:
+    branches:
+      - master
+    paths-ignore:
+      - "docs/**"
+      - "**.md"
+  pull_request:
+    branches:
+      - master
+    paths:
+      - "**/nifi/**"
+      - "**/nifi.py"
+  release:
+    types: [published, edited]
+
+jobs:
+  metadata-ingestion-slow-integration:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: ["3.9"]
+    steps:
+      - uses: actions/checkout@v2
+      - uses: actions/setup-python@v2
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Install dependencies
+        run: ./metadata-ingestion/scripts/install_deps.sh
+      - name: Run metadata-ingestion slow integration tests
+        run: ./gradlew :metadata-ingestion:testSlowIntegration
+      - uses: actions/upload-artifact@v2
+        if: always()
+        with:
+          name: Test Results (metadata ingestion slow integration tests)
+          path: |
+            **/build/reports/tests/test/**
+            **/build/test-results/test/**
+            **/junit.*.xml
+
+  event-file:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Upload
+        uses: actions/upload-artifact@v2
+        with:
+          name: Event File
+          path: ${{ github.event_path }}
diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md
index 894ccb87c4..b191abdc6a 100644
--- a/metadata-ingestion/README.md
+++ b/metadata-ingestion/README.md
@@ -66,6 +66,7 @@
 | [superset](./source_docs/superset.md) | `pip install 'acryl-datahub[superset]'` | Superset source |
 | [trino](./source_docs/trino.md) | `pip install 'acryl-datahub[trino]` | Trino source |
 | [starburst-trino-usage](./source_docs/trino.md) | `pip install 'acryl-datahub[starburst-trino-usage]'` | Starburst Trino usage statistics source |
+| [nifi](./source_docs/nifi.md) | `pip install 'acryl-datahub[nifi]'` | Nifi source |
 
 Sinks
diff --git a/metadata-ingestion/build.gradle b/metadata-ingestion/build.gradle
index f809e89b00..576a59e92b 100644
--- a/metadata-ingestion/build.gradle
+++ b/metadata-ingestion/build.gradle
@@ -78,7 +78,7 @@ task testQuick(type: Exec, dependsOn: installDev) {
   inputs.files(project.fileTree(dir: "tests/"))
   outputs.dir("${venv_name}")
   commandLine 'bash', '-x', '-c',
-    "source ${venv_name}/bin/activate && pytest -m 'not integration' -vv --continue-on-collection-errors --junit-xml=junit.quick.xml"
+    "source ${venv_name}/bin/activate && pytest -m 'not integration and not slow_integration' -vv --continue-on-collection-errors --junit-xml=junit.quick.xml"
 }
 
 task installDevTest(type: Exec, dependsOn: [installDev]) {
@@ -105,7 +105,12 @@ task testSingle(dependsOn: [installDevTest]) {
 
 task testFull(type: Exec, dependsOn: [testQuick, installDevTest]) {
   commandLine 'bash', '-x', '-c',
-    "source ${venv_name}/bin/activate && pytest -vv --continue-on-collection-errors --junit-xml=junit.full.xml"
+    "source ${venv_name}/bin/activate && pytest -m 'not slow_integration' -vv --continue-on-collection-errors --junit-xml=junit.full.xml"
+}
+
+task testSlowIntegration(type: Exec, dependsOn: [testQuick, installDevTest]) {
+  commandLine 'bash', '-x', '-c',
+    "source ${venv_name}/bin/activate && pytest -m 'slow_integration' -vv --continue-on-collection-errors --junit-xml=junit.slow.xml"
 }
 
 task cleanPythonCache(type: Exec) {
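The `slow_integration` pytest marker wired into these Gradle tasks is registered under `markers` in setup.cfg below; since `addopts` passes `--strict-markers`, an unregistered marker would fail collection. For reference, a test opts into the slow suite roughly like this (a minimal sketch; the test name and body are hypothetical):

```python
import pytest


@pytest.mark.slow_integration  # selected by testSlowIntegration, deselected by testQuick/testFull
def test_nifi_ingestion_end_to_end():
    ...  # a slow, Docker-based integration test would go here
```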
diff --git a/metadata-ingestion/developing.md b/metadata-ingestion/developing.md
index 58b7e6cd60..c508e75237 100644
--- a/metadata-ingestion/developing.md
+++ b/metadata-ingestion/developing.md
@@ -97,10 +97,13 @@ pip install -e '.[dev]'
 pip install -e '.[integration-tests]'
 
 # Run unit tests.
-pytest -m 'not integration'
+pytest -m 'not integration and not slow_integration'
 
 # Run Docker-based integration tests.
 pytest -m 'integration'
+
+# Run Docker-based slow integration tests.
+pytest -m 'slow_integration'
 ```
 
 ### Sanity check code before committing
diff --git a/metadata-ingestion/examples/recipes/file_to_datahub_rest.yml b/metadata-ingestion/examples/recipes/file_to_datahub_rest.yml
index 453a8421d4..617c54e981 100644
--- a/metadata-ingestion/examples/recipes/file_to_datahub_rest.yml
+++ b/metadata-ingestion/examples/recipes/file_to_datahub_rest.yml
@@ -1,9 +1,9 @@
----
+run_id: test_cluster
 # see https://datahubproject.io/docs/metadata-ingestion/source_docs/file for complete documentation
 source:
   type: "file"
   config:
-    filename: "./examples/mce_files/bootstrap_mce.json"
+    filename: "./tests/integration/nifi/nifi_mces_golden_cluster.json"
 
 # see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
 sink:
diff --git a/metadata-ingestion/setup.cfg b/metadata-ingestion/setup.cfg
index 3f3c45f800..c241fd5de0 100644
--- a/metadata-ingestion/setup.cfg
+++ b/metadata-ingestion/setup.cfg
@@ -54,6 +54,7 @@ disallow_untyped_defs = yes
 addopts = --cov=src --cov-report term-missing --cov-config setup.cfg --strict-markers
 markers =
     integration: marks tests to only run in integration (deselect with '-m "not integration"')
+    slow_integration: marks tests that are too slow to run with the regular integration tests (deselect with '-m "not slow_integration"')
 testpaths =
     tests/unit
     tests/integration
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 00d7de9682..522f424743 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -144,6 +144,8 @@ plugins: Dict[str, Set[str]] = {
         # PR is from same author as that of sqlalchemy-trino library below.
         "sqlalchemy-trino"
     },
+    "nifi": {"requests"},
+
 }
 
 all_exclude_plugins: Set[str] = {
@@ -296,6 +298,8 @@ entry_points = {
         "openapi = datahub.ingestion.source.openapi:OpenApiSource",
         "trino = datahub.ingestion.source.sql.trino:TrinoSource",
         "starburst-trino-usage = datahub.ingestion.source.usage.starburst_trino_usage:TrinoUsageSource",
+        "nifi = datahub.ingestion.source.nifi:NifiSource",
+
     ],
     "datahub.ingestion.sink.plugins": [
         "file = datahub.ingestion.sink.file:FileSink",
diff --git a/metadata-ingestion/source_docs/nifi.md b/metadata-ingestion/source_docs/nifi.md
new file mode 100644
index 0000000000..93bd259566
--- /dev/null
+++ b/metadata-ingestion/source_docs/nifi.md
@@ -0,0 +1,74 @@
+# Nifi
+
+For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
+
+## Setup
+
+To install this plugin, run `pip install 'acryl-datahub[nifi]'`.
+
+## Capabilities
+
+This plugin extracts the following:
+
+- Nifi flow as `DataFlow` entity
+- Ingress and egress processors, remote input and output ports as `DataJob` entity
+- Input and output ports receiving remote connections as `Dataset` entity
+- Lineage information between external datasets and ingress/egress processors by analyzing provenance events
+
+Current limitations:
+
+- Only a limited set of ingress/egress processors is supported
+  - S3: `ListS3`, `FetchS3Object`, `PutS3Object`
+  - SFTP: `ListSFTP`, `FetchSFTP`, `GetSFTP`, `PutSFTP`
+
+## Quickstart recipe
+
+Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
+
+For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes).
+
+```yml
+source:
+  type: "nifi"
+  config:
+    # Coordinates
+    site_url: "https://localhost:8443/nifi/"
+
+    # Credentials
+    auth: SINGLE_USER
+    username: admin
+    password: password
+
+sink:
+  # sink configs
+```
+
+## Config details
+
+Note that a `.` is used to denote nested fields in the YAML recipe.
+
+| Field | Required | Default | Description |
+| -------------------------- | -------- | -------------------------- | ------------------------------------------------------- |
+| `site_url` | ✅ | `"https://localhost:8443/nifi/"` | URI to connect to. |
+| `site_name` | | `"default"` | Site name to identify this site with, useful when using input and output ports receiving remote connections. |
+| `auth` | | `"NO_AUTH"` | Nifi authentication. Must be one of: `NO_AUTH`, `SINGLE_USER`, `CLIENT_CERT`. |
+| `username` | | | Nifi username, must be set for `auth` = `"SINGLE_USER"`. |
+| `password` | | | Nifi password, must be set for `auth` = `"SINGLE_USER"`. |
+| `client_cert_file` | | | Path to PEM file containing the public certificates for the user/client identity, must be set for `auth` = `"CLIENT_CERT"`. |
+| `client_key_file` | | | Path to PEM file containing the client's secret key. |
+| `client_key_password` | | | The password to decrypt the client_key_file. |
+| `ca_file` | | | Path to PEM file containing certs for the root CA(s) for the NiFi server, needed when the server certificate is not signed by a root CA trusted by the client system. |
+| `provenance_days` | | `7` | Time window, in days, to analyze provenance events for external datasets. |
+| `site_url_to_site_name` | | | Lookup map from site_url to site_name, required if remote process groups are used in the nifi flow. |
+| `process_group_pattern.allow` | | | List of regex patterns for process groups to include in ingestion. |
+| `process_group_pattern.deny` | | | List of regex patterns for process groups to exclude from ingestion. |
+| `process_group_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
+| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
+
+## Compatibility
+
+Coming soon!
+
+## Questions
+
+If you've got any questions on configuring this source, feel free to ping us on [our Slack](https://slack.datahubproject.io/)!
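For NiFi deployments that use certificate-based authentication, the same recipe shape applies with the `CLIENT_CERT` options from the config table above. A sketch, assuming self-signed server certificates (all file paths and the site mapping below are illustrative placeholders):

```yml
source:
  type: "nifi"
  config:
    site_url: "https://localhost:8443/nifi/"

    auth: CLIENT_CERT
    client_cert_file: "/path/to/client-cert.pem"  # placeholder path
    client_key_file: "/path/to/client-key.pem"    # placeholder path
    client_key_password: "changeit"               # placeholder password
    # Needed when the server certificate is not signed by a root CA
    # trusted by the client system, e.g. self-signed certificates.
    ca_file: "/path/to/ca.pem"

    # Only needed when remote process groups are used in the flow.
    site_url_to_site_name:
      "https://nifi1:8443/nifi/": "site1"

sink:
  # sink configs
```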
diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py new file mode 100644 index 0000000000..4f652e5a1f --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py @@ -0,0 +1,1024 @@ +import json +import logging +import ssl +import time +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from enum import Enum +from typing import Callable, Dict, Iterable, List, Optional, Tuple +from urllib.parse import urljoin + +import requests +from dateutil import parser +from requests.adapters import HTTPAdapter + +import datahub.emitter.mce_builder as builder +from datahub.configuration.common import AllowDenyPattern, ConfigModel +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.metadata.schema_classes import ( + ChangeTypeClass, + DataFlowInfoClass, + DataJobInfoClass, + DataJobInputOutputClass, + DataPlatformInstanceClass, + DatasetPropertiesClass, +) + +logger = logging.getLogger(__name__) +NIFI = "nifi" + + +# Python requests does not support passing password for key file, +# The same can be achieved by mounting ssl context +# as described here - https://github.com/psf/requests/issues/2519 +# and here - https://github.com/psf/requests/issues/1573 +class SSLAdapter(HTTPAdapter): + def __init__(self, certfile, keyfile, password=None): + self.context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + self.context.load_cert_chain( + certfile=certfile, keyfile=keyfile, password=password + ) + super().__init__() + + def init_poolmanager(self, *args, **kwargs): + kwargs["ssl_context"] = self.context + return super().init_poolmanager(*args, **kwargs) + + +class NifiAuthType(Enum): + NO_AUTH = "NO_AUTH" + SINGLE_USER = "SINGLE_USER" + CLIENT_CERT = "CLIENT_CERT" + + +class NifiSourceConfig(ConfigModel): + site_url: str + + auth: NifiAuthType = NifiAuthType.NO_AUTH + + provenance_days: int = 7 # Fetch provenance events for past 1 week + process_group_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + + # Required for nifi deployments using Remote Process Groups + site_name: str = "default" + site_url_to_site_name: Dict[str, str] = {} + + # Required to be set if auth is of type SINGLE_USER + username: Optional[str] + password: Optional[str] + + # Required to be set if auth is of type CLIENT_CERT + client_cert_file: Optional[str] + client_key_file: Optional[str] + client_key_password: Optional[str] + + # Required to be set if nifi server certificate is not signed by + # root CA trusted by client system, e.g. 
self-signed certificates + ca_file: Optional[str] + + +TOKEN_ENDPOINT = "/nifi-api/access/token" +CLUSTER_ENDPOINT = "/nifi-api/flow/cluster/summary" +PG_ENDPOINT = "/nifi-api/flow/process-groups/" +PROVENANCE_ENDPOINT = "/nifi-api/provenance/" + + +class NifiType(Enum): + PROCESSOR = "PROCESSOR" + FUNNEL = "FUNNEL" + INPUT_PORT = "INPUT_PORT" + OUTPUT_PORT = "OUTPUT_PORT" + REMOTE_INPUT_PORT = "REMOTE_INPUT_PORT" + REMOTE_OUTPUT_PORT = "REMOTE_OUTPUT_PORT" + + +class NifiEventType: + CREATE = "CREATE" + FETCH = "FETCH" + SEND = "SEND" + RECEIVE = "RECEIVE" + + +class NifiProcessorType: + ListS3 = "org.apache.nifi.processors.aws.s3.ListS3" + FetchS3Object = "org.apache.nifi.processors.aws.s3.FetchS3Object" + PutS3Object = "org.apache.nifi.processors.aws.s3.PutS3Object" + ListSFTP = "org.apache.nifi.processors.standard.ListSFTP" + FetchSFTP = "org.apache.nifi.processors.standard.FetchSFTP" + GetSFTP = "org.apache.nifi.processors.standard.GetSFTP" + PutSFTP = "org.apache.nifi.processors.standard.PutSFTP" + + +# To support new processor type, +# 1. add an entry in KNOWN_INGRESS_EGRESS_PROCESORS +# 2. Implement provenance event analyzer to find external dataset and +# map it in provenance_event_to_lineage_map +class NifiProcessorProvenanceEventAnalyzer: + + KNOWN_INGRESS_EGRESS_PROCESORS = { + NifiProcessorType.ListS3: NifiEventType.CREATE, + NifiProcessorType.FetchS3Object: NifiEventType.FETCH, + NifiProcessorType.PutS3Object: NifiEventType.SEND, + NifiProcessorType.ListSFTP: NifiEventType.CREATE, + NifiProcessorType.FetchSFTP: NifiEventType.FETCH, + NifiProcessorType.GetSFTP: NifiEventType.RECEIVE, + NifiProcessorType.PutSFTP: NifiEventType.SEND, + } + + def __init__(self) -> None: + # Map of Nifi processor type to the provenance event analyzer to find lineage + self.provenance_event_to_lineage_map: Dict[ + str, Callable[[Dict], ExternalDataset] + ] = { + NifiProcessorType.ListS3: self.process_s3_provenance_event, + NifiProcessorType.FetchS3Object: self.process_s3_provenance_event, + NifiProcessorType.PutS3Object: self.process_s3_provenance_event, + NifiProcessorType.ListSFTP: self.process_sftp_provenance_event, + NifiProcessorType.FetchSFTP: self.process_sftp_provenance_event, + NifiProcessorType.GetSFTP: self.process_sftp_provenance_event, + NifiProcessorType.PutSFTP: self.process_sftp_provenance_event, + } + + def process_s3_provenance_event(self, event): + attributes = event.get("attributes", []) + s3_bucket = get_attribute_value(attributes, "s3.bucket") + s3_key = get_attribute_value(attributes, "s3.key") + if not s3_key: + s3_key = get_attribute_value(attributes, "filename") + + s3_url = f"s3://{s3_bucket}/{s3_key}" + s3_url = s3_url[: s3_url.rindex("/")] + dataset_name = s3_url.replace("s3://", "").replace("/", ".") + platform = "urn:li:dataPlatform:s3" + dataset_urn = builder.make_dataset_urn(platform, dataset_name) + return ExternalDataset( + platform, + dataset_name, + dict(s3_uri=s3_url), + dataset_urn, + ) + + def process_sftp_provenance_event(self, event): + attributes = event.get("attributes", []) + remote_host = get_attribute_value(attributes, "sftp.remote.host") + path = get_attribute_value(attributes, "path") + filename = get_attribute_value(attributes, "filename") + absolute_path = f"sftp://{remote_host}/{path}/{filename}" + if remote_host is None or path is None or filename is None: + absolute_path = event.get("transitUri") + + absolute_path = absolute_path.replace("/./", "/") + if absolute_path.endswith("/."): + absolute_path = absolute_path[:-2] + absolute_path = 
absolute_path[: absolute_path.rindex("/")] + dataset_name = absolute_path.replace("sftp://", "").replace("/", ".") + platform = "file" + dataset_urn = builder.make_dataset_urn(platform, dataset_name) + return ExternalDataset( + platform, + dataset_name, + dict(uri=absolute_path), + dataset_urn, + ) + + +@dataclass +class ExternalDataset: + platform: str + dataset_name: str + dataset_properties: Dict[str, str] + dataset_urn: str + + +@dataclass +class NifiComponent: + id: str + name: str + type: str + parent_group_id: str + nifi_type: NifiType + comments: Optional[str] = None + status: Optional[str] = None + + # present only for nifi remote ports and processors + inlets: Dict[str, ExternalDataset] = field(default_factory=dict) + outlets: Dict[str, ExternalDataset] = field(default_factory=dict) + + # present only for processors + config: Optional[Dict] = None + + # present only for nifi remote ports + target_uris: Optional[str] = None + parent_rpg_id: Optional[str] = None + + # Last successful event time + last_event_time: Optional[str] = None + + +@dataclass +class NifiProcessGroup: + id: str + name: str + parent_group_id: Optional[str] + + +@dataclass +class NifiRemoteProcessGroup: + id: str + name: str + parent_group_id: str + remote_ports: Dict[str, NifiComponent] + + +@dataclass +class NifiFlow: + clustered: Optional[bool] + root_process_group: NifiProcessGroup + components: Dict[str, NifiComponent] = field(default_factory=dict) + remotely_accessible_ports: Dict[str, NifiComponent] = field(default_factory=dict) + connections: List[Tuple[str, str]] = field(default_factory=list) + processGroups: Dict[str, NifiProcessGroup] = field(default_factory=dict) + remoteProcessGroups: Dict[str, NifiRemoteProcessGroup] = field(default_factory=dict) + remote_ports: Dict[str, NifiComponent] = field(default_factory=dict) + + +def get_attribute_value(attr_lst: List[dict], attr_name: str) -> Optional[str]: + match = [entry for entry in attr_lst if entry["name"] == attr_name] + if len(match) > 0: + return match[0]["value"] + return None + + +@dataclass +class NifiSourceReport(SourceReport): + filtered: List[str] = field(default_factory=list) + + def report_dropped(self, ent_name: str) -> None: + self.filtered.append(ent_name) + + +# allowRemoteAccess +class NifiSource(Source): + config: NifiSourceConfig + report: NifiSourceReport + + def __init__(self, config: NifiSourceConfig, ctx: PipelineContext) -> None: + super().__init__(ctx) + self.config = config + self.report = NifiSourceReport() + self.session = requests.Session() + + if self.config.ca_file is not None: + self.session.verify = self.config.ca_file + + if self.config.site_url_to_site_name is None: + self.config.site_url_to_site_name = {} + if ( + not urljoin(self.config.site_url, "/nifi/") + in self.config.site_url_to_site_name + ): + self.config.site_url_to_site_name[ + urljoin(self.config.site_url, "/nifi/") + ] = self.config.site_name + + if self.config.auth is NifiAuthType.CLIENT_CERT: + logger.debug("Setting client certificates in requests ssl context") + assert ( + self.config.client_cert_file is not None + ), "Config client_cert_file is required for CLIENT_CERT auth" + self.session.mount( + urljoin(self.config.site_url, "/nifi-api/"), + SSLAdapter( + certfile=self.config.client_cert_file, + keyfile=self.config.client_key_file, + password=self.config.client_key_password, + ), + ) + if self.config.auth is NifiAuthType.SINGLE_USER: + assert ( + self.config.username is not None + ), "Config username is required for SINGLE_USER auth" + assert ( 
+                self.config.password is not None
+            ), "Config password is required for SINGLE_USER auth"
+            token_response = self.session.post(
+                url=urljoin(self.config.site_url, TOKEN_ENDPOINT),
+                data={
+                    "username": self.config.username,
+                    "password": self.config.password,
+                },
+            )
+            if not token_response.ok:
+                logger.error("Failed to get token")
+                self.report.report_failure(self.config.site_url, "Failed to get token")
+
+            self.session.headers.update(
+                {
+                    "Authorization": "Bearer " + token_response.text,
+                    # "Accept": "application/json",
+                    "Content-Type": "application/json",
+                }
+            )
+        else:
+            self.session.headers.update(
+                {
+                    # "Accept": "application/json",
+                    "Content-Type": "application/json",
+                }
+            )
+
+    @classmethod
+    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
+        config = NifiSourceConfig.parse_obj(config_dict)
+        return cls(config, ctx)
+
+    def get_report(self) -> SourceReport:
+        return self.report
+
+    def update_flow(self, pg_flow_dto: Dict) -> None:  # noqa: C901
+        breadcrumb_dto = pg_flow_dto.get("breadcrumb", {}).get("breadcrumb", {})
+        nifi_pg = NifiProcessGroup(
+            breadcrumb_dto.get("id"),
+            breadcrumb_dto.get("name"),
+            pg_flow_dto.get("parentGroupId"),
+        )
+        self.nifi_flow.processGroups[nifi_pg.id] = nifi_pg
+        if not self.config.process_group_pattern.allowed(nifi_pg.name):
+            self.report.report_dropped(f"{nifi_pg.name}.*")
+            return
+
+        flow_dto = pg_flow_dto.get("flow", {})
+
+        for processor in flow_dto.get("processors", []):
+            component = processor.get("component")
+            self.nifi_flow.components[component.get("id")] = NifiComponent(
+                component.get("id"),
+                component.get("name"),
+                component.get("type"),
+                component.get("parentGroupId"),
+                NifiType.PROCESSOR,
+                config=component.get("config"),
+                comments=component.get("config", {}).get("comments"),
+                status=component.get("status", {}).get("runStatus"),
+            )
+        for funnel in flow_dto.get("funnels", []):
+            component = funnel.get("component")
+            self.nifi_flow.components[component.get("id")] = NifiComponent(
+                component.get("id"),
+                component.get("name"),
+                component.get("type"),
+                component.get("parentGroupId"),
+                NifiType.FUNNEL,
+                comments=component.get("comments"),
+                status=component.get("status", {}).get("runStatus"),
+            )
+            logger.debug(f"Adding funnel {component.get('id')}")
+
+        for connection in flow_dto.get("connections", []):
+            # Exclude self-recursive relationships
+            if connection.get("sourceId") != connection.get("destinationId"):
+                self.nifi_flow.connections.append(
+                    (connection.get("sourceId"), connection.get("destinationId"))
+                )
+
+        for inputPort in flow_dto.get("inputPorts", []):
+            component = inputPort.get("component")
+            if inputPort.get("allowRemoteAccess"):
+                self.nifi_flow.remotely_accessible_ports[
+                    component.get("id")
+                ] = NifiComponent(
+                    component.get("id"),
+                    component.get("name"),
+                    component.get("type"),
+                    component.get("parentGroupId"),
+                    NifiType.INPUT_PORT,
+                    comments=component.get("comments"),
+                    status=component.get("status", {}).get("runStatus"),
+                )
+                logger.debug(f"Adding remotely accessible port {component.get('id')}")
+            else:
+                self.nifi_flow.components[component.get("id")] = NifiComponent(
+                    component.get("id"),
+                    component.get("name"),
+                    component.get("type"),
+                    component.get("parentGroupId"),
+                    NifiType.INPUT_PORT,
+                    comments=component.get("comments"),
+                    status=component.get("status", {}).get("runStatus"),
+                )
+                logger.debug(f"Adding port {component.get('id')}")
+
+        for outputPort in flow_dto.get("outputPorts", []):
+            component = outputPort.get("component")
+            if outputPort.get("allowRemoteAccess"):
+                self.nifi_flow.remotely_accessible_ports[
+                    component.get("id")
+                ] = NifiComponent(
+                    component.get("id"),
+                    component.get("name"),
+                    component.get("type"),
+                    component.get("parentGroupId"),
+                    NifiType.OUTPUT_PORT,
+                    comments=component.get("comments"),
+                    status=component.get("status", {}).get("runStatus"),
+                )
+                logger.debug(f"Adding remotely accessible port {component.get('id')}")
+            else:
+                self.nifi_flow.components[component.get("id")] = NifiComponent(
+                    component.get("id"),
+                    component.get("name"),
+                    component.get("type"),
+                    component.get("parentGroupId"),
+                    NifiType.OUTPUT_PORT,
+                    comments=component.get("comments"),
+                    status=component.get("status", {}).get("runStatus"),
+                )
+                logger.debug(f"Adding port {component.get('id')}")
+
+        for rpg in flow_dto.get("remoteProcessGroups", []):
+            rpg_component = rpg.get("component", {})
+            remote_ports = {}
+
+            contents = rpg_component.get("contents", {})
+            for component in contents.get("outputPorts", []):
+                if component.get("connected", False):
+                    remote_ports[component.get("id")] = NifiComponent(
+                        component.get("id"),
+                        component.get("name"),
+                        component.get("type"),
+                        rpg_component.get("parentGroupId"),
+                        NifiType.REMOTE_OUTPUT_PORT,
+                        target_uris=rpg_component.get("targetUris"),
+                        parent_rpg_id=rpg_component.get("id"),
+                        comments=component.get("comments"),
+                        status=component.get("status", {}).get("runStatus"),
+                    )
+                    logger.debug(f"Adding remote output port {component.get('id')}")
+
+            for component in contents.get("inputPorts", []):
+                if component.get("connected", False):
+                    remote_ports[component.get("id")] = NifiComponent(
+                        component.get("id"),
+                        component.get("name"),
+                        component.get("type"),
+                        rpg_component.get("parentGroupId"),
+                        NifiType.REMOTE_INPUT_PORT,
+                        target_uris=rpg_component.get("targetUris"),
+                        parent_rpg_id=rpg_component.get("id"),
+                        comments=component.get("comments"),
+                        status=component.get("status", {}).get("runStatus"),
+                    )
+                    logger.debug(f"Adding remote input port {component.get('id')}")
+
+            nifi_rpg = NifiRemoteProcessGroup(
+                rpg_component.get("id"),
+                rpg_component.get("name"),
+                rpg_component.get("parentGroupId"),
+                remote_ports,
+            )
+            logger.debug(f"Adding remote process group {rpg_component.get('id')}")
+            self.nifi_flow.components.update(remote_ports)
+            self.nifi_flow.remoteProcessGroups[nifi_rpg.id] = nifi_rpg
+
+        for pg in flow_dto.get("processGroups", []):
+            pg_response = self.session.get(
+                url=urljoin(self.config.site_url, PG_ENDPOINT) + pg.get("id")
+            )
+
+            if not pg_response.ok:
+                self.report_warning(
+                    self.config.site_url,
+                    "Failed to get process group flow " + pg.get("id"),
+                )
+                continue
+
+            pg_flow_dto = pg_response.json().get("processGroupFlow", {})
+
+            self.update_flow(pg_flow_dto)
+
+    def update_flow_keep_only_ingress_egress(self):
+        components_to_del: List[NifiComponent] = []
+        for component in self.nifi_flow.components.values():
+            if (
+                component.nifi_type is NifiType.PROCESSOR
+                and component.type
+                not in NifiProcessorProvenanceEventAnalyzer.KNOWN_INGRESS_EGRESS_PROCESORS.keys()
+            ) or component.nifi_type not in [
+                NifiType.PROCESSOR,
+                NifiType.REMOTE_INPUT_PORT,
+                NifiType.REMOTE_OUTPUT_PORT,
+            ]:
+                components_to_del.append(component)
+                incoming = list(
+                    filter(lambda x: x[1] == component.id, self.nifi_flow.connections)
+                )
+                outgoing = list(
+                    filter(lambda x: x[0] == component.id, self.nifi_flow.connections)
+                )
+                # Create new connections from incoming to outgoing
+                for i in incoming:
+                    for j in outgoing:
+                        self.nifi_flow.connections.append((i[0], j[1]))
+
+                # Remove older connections, as we already created
+                # new connections bypassing component to be deleted
+
+                for i in incoming:
+                    self.nifi_flow.connections.remove(i)
+                for j in outgoing:
+                    self.nifi_flow.connections.remove(j)
+
+        for c in components_to_del:
+            if c.nifi_type is NifiType.PROCESSOR and (
+                c.name.startswith("Get")
+                or c.name.startswith("List")
+                or c.name.startswith("Fetch")
+                or c.name.startswith("Put")
+            ):
+                self.report_warning(
+                    self.config.site_url,
+                    f"Dropping Nifi Processor of type {c.type}, id {c.id}, name {c.name} from lineage view. \
+                    This is likely an ingress or egress node which may be reading from / writing to external datasets. \
+                    However, this is not currently supported in datahub",
+                )
+            else:
+                logger.debug(
+                    f"Dropping Nifi Component of type {c.type}, id {c.id}, name {c.name} from lineage view."
+                )
+
+            del self.nifi_flow.components[c.id]
+
+    def create_nifi_flow(self):
+        cluster_response = self.session.get(
+            url=urljoin(self.config.site_url, CLUSTER_ENDPOINT)
+        )
+        clustered: Optional[bool] = None
+        if cluster_response.ok:
+            clustered = (
+                cluster_response.json().get("clusterSummary", {}).get("clustered")
+            )
+        else:
+            logger.warning("Failed to fetch cluster summary for flow")
+        pg_response = self.session.get(
+            url=urljoin(self.config.site_url, PG_ENDPOINT) + "root"
+        )
+
+        if not pg_response.ok:
+            logger.error("Failed to get root process group flow")
+            self.report.report_failure(
+                self.config.site_url, "Failed to get root process group flow"
+            )
+
+        pg_flow_dto = pg_response.json().get("processGroupFlow", {})
+        breadcrumb_dto = pg_flow_dto.get("breadcrumb", {}).get("breadcrumb", {})
+        self.nifi_flow = NifiFlow(
+            clustered=clustered,
+            root_process_group=NifiProcessGroup(
+                breadcrumb_dto.get("id"),
+                breadcrumb_dto.get("name"),
+                pg_flow_dto.get("parentGroupId"),
+            ),
+        )
+        self.update_flow(pg_flow_dto)
+        self.update_flow_keep_only_ingress_egress()
+
+    def fetch_provenance_events(
+        self,
+        processor: NifiComponent,
+        eventType: str,
+        startDate: datetime,
+        endDate: Optional[datetime] = None,
+    ) -> Iterable[Dict]:
+
+        logger.debug(
+            f"Fetching {eventType} provenance events for {processor.id}\
+            of processor type {processor.type}, Start date: {startDate}, End date: {endDate}"
+        )
+
+        payload = json.dumps(
+            {
+                "provenance": {
+                    "request": {
+                        "maxResults": 1000,
+                        "summarize": False,
+                        "searchTerms": {
+                            "ProcessorID": {"value": processor.id},
+                            "EventType": {"value": eventType},
+                        },
+                        "startDate": startDate.strftime("%m/%d/%Y %H:%M:%S %Z"),
+                        "endDate": (
+                            endDate.strftime("%m/%d/%Y %H:%M:%S %Z")
+                            if endDate
+                            else None
+                        ),
+                    }
+                }
+            }
+        )
+        logger.debug(payload)
+        provenance_response = self.session.post(
+            url=urljoin(self.config.site_url, PROVENANCE_ENDPOINT), data=payload
+        )
+
+        if provenance_response.ok:
+            provenance = provenance_response.json().get("provenance", {})
+            provenance_uri = provenance.get("uri")
+
+            provenance_response = self.session.get(provenance_uri)
+            if provenance_response.ok:
+                provenance = provenance_response.json().get("provenance", {})
+
+            attempts = 5  # wait for at most 5 attempts 5*1= 5 seconds
+            while (not provenance.get("finished", False)) and attempts > 0:
+                logger.warning(
+                    f"Provenance query not completed, attempts left : {attempts}"
+                )
+                # wait until the uri returns percentcomplete 100
+                time.sleep(1)
+                provenance_response = self.session.get(provenance_uri)
+                attempts -= 1
+                if provenance_response.ok:
+                    provenance = provenance_response.json().get("provenance", {})
+
+            events = provenance.get("results", {}).get("provenanceEvents", [])
+            last_event_time: Optional[datetime] = None
+            oldest_event_time: Optional[datetime] = None
+
+            for event in events:
+                event_time = parser.parse(event.get("eventTime"))
+                # datetime.strptime(
+                #    event.get("eventTime"), "%m/%d/%Y %H:%M:%S.%f %Z"
+                # )
+                if not last_event_time or event_time > last_event_time:
+                    last_event_time = event_time
+
+                if not oldest_event_time or event_time < oldest_event_time:
+                    oldest_event_time = event_time
+
+                yield event
+
+            processor.last_event_time = str(last_event_time)
+            self.delete_provenance(provenance_uri)
+
+            total = provenance.get("results", {}).get("total")
+            totalCount = provenance.get("results", {}).get("totalCount")
+            if total != str(totalCount):
+                yield from self.fetch_provenance_events(
+                    processor, eventType, startDate, oldest_event_time
+                )
+        else:
+            self.report_warning(
+                self.config.site_url,
+                f"provenance events could not be fetched for processor \
+                {processor.id} of type {processor.type}",
+            )
+            logger.warning(provenance_response.text)
+        return
+
+    def report_warning(self, key: str, reason: str) -> None:
+        logger.warning(f"{key}: {reason}")
+        self.report.report_warning(key, reason)
+
+    def delete_provenance(self, provenance_uri):
+        delete_response = self.session.delete(provenance_uri)
+        if not delete_response.ok:
+            logger.error("failed to delete provenance %s", provenance_uri)
+
+    def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
+
+        rootpg = self.nifi_flow.root_process_group
+        flow_name = rootpg.name  # self.config.site_name
+        flow_urn = builder.make_data_flow_urn(orchestrator=NIFI, flow_id=rootpg.id)
+        flow_properties = dict()
+        if self.nifi_flow.clustered is not None:
+            flow_properties["clustered"] = str(self.nifi_flow.clustered)
+        yield from self.construct_flow_workunits(
+            flow_urn, flow_name, self.make_external_url(rootpg.id), flow_properties
+        )
+
+        for component in self.nifi_flow.components.values():
+            job_name = component.name
+            job_urn = builder.make_data_job_urn_with_flow(flow_urn, component.id)
+
+            incoming = list(
+                filter(lambda x: x[1] == component.id, self.nifi_flow.connections)
+            )
+            outgoing = list(
+                filter(lambda x: x[0] == component.id, self.nifi_flow.connections)
+            )
+            inputJobs = []
+            jobProperties = None
+
+            if component.nifi_type is NifiType.PROCESSOR:
+                jobProperties = {
+                    k: str(v)
+                    for k, v in component.config.items()  # type: ignore
+                    if k
+                    in [
+                        "schedulingPeriod",
+                        "schedulingStrategy",
+                        "executionNode",
+                        "concurrentlySchedulableTaskCount",
+                    ]
+                }
+                jobProperties["properties"] = json.dumps(
+                    component.config.get("properties")  # type: ignore
+                )
+                if component.last_event_time is not None:
+                    jobProperties["last_event_time"] = component.last_event_time
+
+                for dataset in component.inlets.values():
+                    yield from self.construct_dataset_workunits(
+                        dataset.platform,
+                        dataset.dataset_name,
+                        dataset.dataset_urn,
+                        datasetProperties=dataset.dataset_properties,
+                    )
+
+                for dataset in component.outlets.values():
+                    yield from self.construct_dataset_workunits(
+                        dataset.platform,
+                        dataset.dataset_name,
+                        dataset.dataset_urn,
+                        datasetProperties=dataset.dataset_properties,
+                    )
+
+            for edge in incoming:
+                incoming_from = edge[0]
+                if incoming_from in self.nifi_flow.remotely_accessible_ports.keys():
+                    dataset_name = f"{self.config.site_name}.{self.nifi_flow.remotely_accessible_ports[incoming_from].name}"
+                    dataset_urn = builder.make_dataset_urn(
+                        NIFI,
+                        dataset_name,
+                    )
+                    component.inlets[dataset_urn] = ExternalDataset(
+                        NIFI,
+                        dataset_name,
dict(nifi_uri=self.config.site_url), + dataset_urn, + ) + else: + inputJobs.append( + builder.make_data_job_urn_with_flow(flow_urn, incoming_from) + ) + + for edge in outgoing: + outgoing_to = edge[1] + if outgoing_to in self.nifi_flow.remotely_accessible_ports.keys(): + dataset_name = f"{self.config.site_name}.{self.nifi_flow.remotely_accessible_ports[outgoing_to].name}" + dataset_urn = builder.make_dataset_urn(NIFI, dataset_name) + component.outlets[dataset_urn] = ExternalDataset( + NIFI, + dataset_name, + dict(nifi_uri=self.config.site_url), + dataset_urn, + ) + + if component.nifi_type is NifiType.REMOTE_INPUT_PORT: + # TODO - if target_uris is not set, but http proxy is used in RPG + site_urls = component.target_uris.split(",") # type: ignore + for site_url in site_urls: + if site_url not in self.config.site_url_to_site_name: + self.report_warning( + site_url, + f"Site with url {site_url} is being used in flow but\ + corresponding site name is not configured via site_url_to_site_name.\ + This may result in broken lineage.", + ) + else: + site_name = self.config.site_url_to_site_name[site_url] + dataset_name = f"{site_name}.{component.name}" + dataset_urn = builder.make_dataset_urn( + NIFI, + dataset_name, + ) + component.outlets[dataset_urn] = ExternalDataset( + NIFI, dataset_name, dict(nifi_uri=site_url), dataset_urn + ) + break + + if component.nifi_type is NifiType.REMOTE_OUTPUT_PORT: + site_urls = component.target_uris.split(",") # type: ignore + for site_url in site_urls: + if site_url not in self.config.site_url_to_site_name: + self.report_warning( + self.config.site_url, + f"Site with url {site_url} is being used in flow but\ + corresponding site name is not configured via site_url_to_site_name.\ + This may result in broken lineage.", + ) + else: + site_name = self.config.site_url_to_site_name[site_url] + + dataset_name = f"{site_name}.{component.name}" + dataset_urn = builder.make_dataset_urn( + NIFI, + dataset_name, + ) + component.inlets[dataset_urn] = ExternalDataset( + NIFI, dataset_name, dict(nifi_uri=site_url), dataset_urn + ) + break + + yield from self.construct_job_workunits( + job_urn, + job_name, + external_url=self.make_external_url( + component.parent_group_id, component.id, component.parent_rpg_id + ), + job_type=NIFI.upper() + "_" + component.nifi_type.value, + description=component.comments, + job_properties=jobProperties, + inlets=list(component.inlets.keys()), + outlets=list(component.outlets.keys()), + inputJobs=inputJobs, + status=component.status, + ) + + for port in self.nifi_flow.remotely_accessible_ports.values(): + dataset_name = f"{self.config.site_name}.{port.name}" + dataset_platform = NIFI + yield from self.construct_dataset_workunits( + dataset_platform, + dataset_name, + external_url=self.make_external_url(port.parent_group_id, port.id), + ) + + def process_provenance_events(self): + + startDate = datetime.now(timezone.utc) - timedelta( + days=self.config.provenance_days + ) + + eventAnalyzer = NifiProcessorProvenanceEventAnalyzer() + + for component in self.nifi_flow.components.values(): + if component.nifi_type is NifiType.PROCESSOR: + eventType = eventAnalyzer.KNOWN_INGRESS_EGRESS_PROCESORS[component.type] + events = self.fetch_provenance_events(component, eventType, startDate) + for event in events: + dataset = eventAnalyzer.provenance_event_to_lineage_map[ + component.type + ](event) + if eventType in [ + NifiEventType.CREATE, + NifiEventType.FETCH, + NifiEventType.RECEIVE, + ]: + component.inlets[dataset.dataset_urn] = dataset + else: + 
component.outlets[dataset.dataset_urn] = dataset + + def get_workunits(self) -> Iterable[MetadataWorkUnit]: + + # Creates nifi_flow by invoking /flow rest api and saves as self.nifi_flow + self.create_nifi_flow() + + # Updates inlets and outlets of nifi_flow.components by invoking /provenance rest api + self.process_provenance_events() + + # Reads and translates entities from self.nifi_flow into mcps + yield from self.construct_workunits() + + def make_external_url( + self, + parent_group_id: str, + component_id: Optional[str] = "", + parent_rpg_id: Optional[str] = None, + ) -> str: + if parent_rpg_id is not None: + component_id = parent_rpg_id + return urljoin( + self.config.site_url, + f"/nifi/?processGroupId={parent_group_id}&componentIds={component_id}", + ) + + def construct_flow_workunits( + self, + flow_urn: str, + flow_name: str, + external_url: str, + flow_properties: Optional[Dict[str, str]] = None, + ) -> Iterable[MetadataWorkUnit]: + mcp = MetadataChangeProposalWrapper( + entityType="dataFlow", + entityUrn=flow_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName="dataFlowInfo", + aspect=DataFlowInfoClass( + name=flow_name, + customProperties=flow_properties, + externalUrl=external_url, + ), + ) + for proposal in [mcp]: + wu = MetadataWorkUnit( + id=f"{NIFI}.{flow_name}.{proposal.aspectName}", mcp=proposal + ) + self.report.report_workunit(wu) + yield wu + + def construct_job_workunits( + self, + job_urn: str, + job_name: str, + external_url: str, + job_type: str, + description: Optional[str], + job_properties: Optional[Dict[str, str]] = None, + inlets: List[str] = [], + outlets: List[str] = [], + inputJobs: List[str] = [], + status: Optional[str] = None, + ) -> Iterable[MetadataWorkUnit]: + if job_properties: + job_properties = {k: v for k, v in job_properties.items() if v is not None} + + mcp = MetadataChangeProposalWrapper( + entityType="dataJob", + entityUrn=job_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName="dataJobInfo", + aspect=DataJobInfoClass( + name=job_name, + type=job_type, + description=description, + customProperties=job_properties, + externalUrl=external_url, + status=status, + ), + ) + + wu = MetadataWorkUnit( + id=f"{NIFI}.{job_name}.{mcp.aspectName}", + mcp=mcp, + ) + self.report.report_workunit(wu) + yield wu + + inlets.sort() + outlets.sort() + inputJobs.sort() + + mcp = MetadataChangeProposalWrapper( + entityType="dataJob", + entityUrn=job_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName="dataJobInputOutput", + aspect=DataJobInputOutputClass( + inputDatasets=inlets, outputDatasets=outlets, inputDatajobs=inputJobs + ), + ) + + wu = MetadataWorkUnit( + id=f"{NIFI}.{job_name}.{mcp.aspectName}", + mcp=mcp, + ) + self.report.report_workunit(wu) + yield wu + + def construct_dataset_workunits( + self, + dataset_platform: str, + dataset_name: str, + dataset_urn: Optional[str] = None, + external_url: Optional[str] = None, + datasetProperties: Optional[Dict[str, str]] = None, + ) -> Iterable[MetadataWorkUnit]: + + if not dataset_urn: + dataset_urn = builder.make_dataset_urn(dataset_platform, dataset_name) + + mcp = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=dataset_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName="dataPlatformInstance", + aspect=DataPlatformInstanceClass( + platform=builder.make_data_platform_urn(dataset_platform) + ), + ) + platform = ( + dataset_platform[dataset_platform.rindex(":") + 1 :] + if dataset_platform.startswith("urn:") + else dataset_platform + ) + wu = 
MetadataWorkUnit(id=f"{platform}.{dataset_name}.{mcp.aspectName}", mcp=mcp) + if wu.id not in self.report.workunit_ids: + self.report.report_workunit(wu) + yield wu + + mcp = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=dataset_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName="datasetProperties", + aspect=DatasetPropertiesClass( + externalUrl=external_url, customProperties=datasetProperties + ), + ) + + wu = MetadataWorkUnit(id=f"{platform}.{dataset_name}.{mcp.aspectName}", mcp=mcp) + if wu.id not in self.report.workunit_ids: + self.report.report_workunit(wu) + yield wu diff --git a/metadata-ingestion/tests/integration/nifi/docker-compose.yml b/metadata-ingestion/tests/integration/nifi/docker-compose.yml new file mode 100644 index 0000000000..02b9f11a9e --- /dev/null +++ b/metadata-ingestion/tests/integration/nifi/docker-compose.yml @@ -0,0 +1,126 @@ +services: + nifi1: + image: apache/nifi:1.15.0 + container_name: nifi1 + hostname: nifi1 + environment: + #AUTH: tls + NIFI_REMOTE_INPUT_HOST: nifi1 + NIFI_SENSITIVE_PROPS_KEY: admin@datahub + #SINGLE_USER_CREDENTIALS_USERNAME: admin + #SINGLE_USER_CREDENTIALS_PASSWORD: admin@datahub + #KEYSTORE_PATH: /opt/certs/server_keystore.jks + #KEYSTORE_TYPE: JKS + #KEYSTORE_PASSWORD: datahub + #TRUSTSTORE_PATH: /opt/certs/server_truststore.jks + #TRUSTSTORE_PASSWORD: datahub + #TRUSTSTORE_TYPE: JKS + #INITIAL_ADMIN_IDENTITY: 'CN=DatahubUser, C=US' + NIFI_WEB_HTTP_PORT: 9443 + volumes: + - ./setup/conf:/opt/nifi/tmp:ro + #- ./setup/ssl_files:/opt/certs + entrypoint: + - bash + - -c + - | + echo "Copying Flow" + # + cp /opt/nifi/tmp/flow.xml.gz /opt/nifi/nifi-current/conf/flow.xml.gz + # + echo "Starting Nifi" + # + /opt/nifi/scripts/start.sh & + # + sleep infinity + ports: + - 9443:9443 + + + nifi_zookeeper: + hostname: nifi_zookeeper + container_name: nifi_zookeeper + image: 'bitnami/zookeeper:latest' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + - ZOO_PORT_NUMBER=52181 + ports: + - 52181:52181 + + nifi01: + image: apache/nifi:1.15.0 + container_name: nifi01 + hostname: nifi01 + depends_on: + - nifi_zookeeper + ports: + - 9080:9080 + volumes: + - ./setup/conf_clustered:/opt/nifi/tmp:ro + entrypoint: + - bash + - -c + - | + echo "Copying Flow" + # + cp /opt/nifi/tmp/flow.xml.gz /opt/nifi/nifi-current/conf/flow.xml.gz + # + echo "Starting Nifi" + # + /opt/nifi/scripts/start.sh & + # + sleep infinity + environment: + - NIFI_WEB_HTTP_PORT=9080 + - NIFI_CLUSTER_IS_NODE=true + - NIFI_CLUSTER_NODE_PROTOCOL_PORT=7080 + - NIFI_ZK_CONNECT_STRING=nifi_zookeeper:52181 + - NIFI_ELECTION_MAX_WAIT=1 min + - NIFI_SENSITIVE_PROPS_KEY=admin@datahub + + + nifi02: + image: apache/nifi:1.15.0 + container_name: nifi02 + hostname: nifi02 + depends_on: + - nifi_zookeeper + ports: + - 9081:9081 + volumes: + - ./setup/conf/awscreds.properties:/opt/nifi/nifi-current/conf/awscreds.properties + environment: + - NIFI_WEB_HTTP_PORT=9081 + - NIFI_CLUSTER_IS_NODE=true + - NIFI_CLUSTER_NODE_PROTOCOL_PORT=7080 + - NIFI_ZK_CONNECT_STRING=nifi_zookeeper:52181 + - NIFI_ELECTION_MAX_WAIT=1 min + - NIFI_SENSITIVE_PROPS_KEY=admin@datahub + + nifi03: + image: apache/nifi:1.15.0 + container_name: nifi03 + hostname: nifi03 + depends_on: + - nifi_zookeeper + ports: + - 9082:9082 + volumes: + - ./setup/conf/awscreds.properties:/opt/nifi/nifi-current/conf/awscreds.properties + environment: + - NIFI_WEB_HTTP_PORT=9082 + - NIFI_CLUSTER_IS_NODE=true + - NIFI_CLUSTER_NODE_PROTOCOL_PORT=7080 + - NIFI_ZK_CONNECT_STRING=nifi_zookeeper:52181 + - NIFI_ELECTION_MAX_WAIT=1 min + 
- NIFI_SENSITIVE_PROPS_KEY=admin@datahub + + sftp_public_host: + image: atmoz/sftp + container_name: sftp_public_host + hostname: sftp_public_host + volumes: + - ./setup/sftp_files:/home/foo + ports: + - "2222:22" + command: "foo:pass:::" \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_cluster.json b/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_cluster.json new file mode 100644 index 0000000000..939ba157a6 --- /dev/null +++ b/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_cluster.json @@ -0,0 +1,353 @@ +[ + { + "auditHeader": null, + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "value": "{\"customProperties\": {\"clustered\": \"True\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=80820b2f-017d-1000-85cf-05f56cde9185&componentIds=\", \"name\": \"Cluster Flow\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),fed5914b-937b-37dd-89c0-b34ffbae9cf4)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"ALL\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Object Key\\\": \\\"tropical_data/${filename}\\\", \\\"Bucket\\\": \\\"${s3.destbucket}\\\", \\\"Content Type\\\": null, \\\"Content Disposition\\\": null, \\\"Cache Control\\\": null, \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": \\\"69fc8d86-2f01-3f07-910f-2b14edc81779\\\", \\\"s3-object-tags-prefix\\\": null, \\\"s3-object-remove-tags-prefix\\\": \\\"false\\\", \\\"Storage Class\\\": \\\"Standard\\\", \\\"Region\\\": \\\"us-east-1\\\", \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"Expiration Time Rule\\\": null, \\\"FullControl User List\\\": \\\"${s3.permissions.full.users}\\\", \\\"Read Permission User List\\\": \\\"${s3.permissions.read.users}\\\", \\\"Write Permission User List\\\": \\\"${s3.permissions.write.users}\\\", \\\"Read ACL User List\\\": \\\"${s3.permissions.readacl.users}\\\", \\\"Write ACL User List\\\": \\\"${s3.permissions.writeacl.users}\\\", \\\"Owner\\\": \\\"${s3.owner}\\\", \\\"canned-acl\\\": \\\"${s3.permissions.cannedacl}\\\", \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"Multipart Threshold\\\": \\\"5 GB\\\", \\\"Multipart Part Size\\\": \\\"5 GB\\\", \\\"Multipart Upload AgeOff Interval\\\": \\\"60 min\\\", \\\"Multipart Upload Max Age Threshold\\\": \\\"7 days\\\", \\\"s3-temporary-directory-multipart\\\": \\\"${java.io.tmpdir}\\\", \\\"server-side-encryption\\\": \\\"None\\\", \\\"encryption-service\\\": null, \\\"use-chunked-encoding\\\": \\\"true\\\", \\\"use-path-style-access\\\": \\\"false\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null}\", \"last_event_time\": \"None\"}, \"externalUrl\": 
\"http://localhost:9080/nifi/?processGroupId=1c61a8d9-3462-387f-8145-09e6e7785e5c&componentIds=fed5914b-937b-37dd-89c0-b34ffbae9cf4\", \"name\": \"PutS3Object\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),fed5914b-937b-37dd-89c0-b34ffbae9cf4)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:nifi,default.s3_data,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": []}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "value": "{\"platform\": \"urn:li:dataPlatform:s3\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "value": "{\"customProperties\": {\"s3_uri\": \"s3://enriched-topical-chat\"}, \"tags\": []}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),c5f6fc66-ffbb-3f60-9564-f2466ae32493)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"ALL\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Bucket\\\": \\\"enriched-topical-chat\\\", \\\"Object Key\\\": \\\"${filename}\\\", \\\"Region\\\": \\\"us-west-2\\\", \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": null, \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"Version\\\": null, \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"encryption-service\\\": null, \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null, \\\"requester-pays\\\": \\\"false\\\", \\\"range-start\\\": null, \\\"range-length\\\": null}\", \"last_event_time\": \"2021-12-08 14:23:21.702000+00:00\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=1c61a8d9-3462-387f-8145-09e6e7785e5c&componentIds=c5f6fc66-ffbb-3f60-9564-f2466ae32493\", \"name\": \"FetchS3Object\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),c5f6fc66-ffbb-3f60-9564-f2466ae32493)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": 
[\"urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": [\"urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8a218b6e-e6a0-36b6-bc4b-79d202a80167)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8a218b6e-e6a0-36b6-bc4b-79d202a80167)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"PRIMARY\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Bucket\\\": \\\"enriched-topical-chat\\\", \\\"Region\\\": \\\"us-west-2\\\", \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"record-writer\\\": null, \\\"min-age\\\": \\\"0 sec\\\", \\\"Listing Batch Size\\\": \\\"100\\\", \\\"write-s3-object-tags\\\": \\\"false\\\", \\\"write-s3-user-metadata\\\": \\\"false\\\", \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": null, \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null, \\\"delimiter\\\": null, \\\"prefix\\\": null, \\\"use-versions\\\": \\\"false\\\", \\\"list-type\\\": \\\"1\\\", \\\"requester-pays\\\": \\\"false\\\"}\", \"last_event_time\": \"2021-12-08 14:23:07.204000+00:00\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=1c61a8d9-3462-387f-8145-09e6e7785e5c&componentIds=8a218b6e-e6a0-36b6-bc4b-79d202a80167\", \"name\": \"ListS3\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8a218b6e-e6a0-36b6-bc4b-79d202a80167)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": []}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),71bc17ed-a3bc-339a-a100-ebad434717d4)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=1c61a8d9-3462-387f-8145-09e6e7785e5c&componentIds=8efa023d-017d-1000-0000-0000479b764f\", \"name\": \"s3_data\", \"description\": \"\", \"type\": {\"string\": \"NIFI_REMOTE_INPUT_PORT\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),71bc17ed-a3bc-339a-a100-ebad434717d4)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + 
"aspect": { + "value": "{\"inputDatasets\": [], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:nifi,default.s3_data,PROD)\"], \"inputDatajobs\": [\"urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),c5f6fc66-ffbb-3f60-9564-f2466ae32493)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "value": "{\"platform\": \"urn:li:dataPlatform:file\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "value": "{\"customProperties\": {\"uri\": \"sftp://sftp_public_host\"}, \"tags\": []}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host.temperature,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "value": "{\"platform\": \"urn:li:dataPlatform:file\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host.temperature,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "value": "{\"customProperties\": {\"uri\": \"sftp://sftp_public_host/temperature\"}, \"tags\": []}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8eb5263d-017d-1000-ffff-ffff911b23aa)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"PRIMARY\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"listing-strategy\\\": \\\"timestamps\\\", \\\"Hostname\\\": \\\"${sftp.host}\\\", \\\"Port\\\": \\\"22\\\", \\\"Username\\\": \\\"${sftp.username}\\\", \\\"Password\\\": \\\"********\\\", \\\"Private Key Path\\\": null, \\\"Private Key Passphrase\\\": null, \\\"Remote Path\\\": \\\".\\\", \\\"record-writer\\\": null, \\\"Distributed Cache Service\\\": null, \\\"Search Recursively\\\": \\\"true\\\", \\\"follow-symlink\\\": \\\"false\\\", \\\"File Filter Regex\\\": null, \\\"Path Filter Regex\\\": null, \\\"Ignore Dotted Files\\\": \\\"true\\\", \\\"Strict Host Key Checking\\\": \\\"false\\\", \\\"Host Key File\\\": null, \\\"Connection Timeout\\\": \\\"30 sec\\\", \\\"Data Timeout\\\": \\\"30 sec\\\", \\\"Send Keep Alive On Timeout\\\": \\\"true\\\", \\\"target-system-timestamp-precision\\\": \\\"auto-detect\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Type\\\": \\\"DIRECT\\\", \\\"Proxy Host\\\": null, \\\"Proxy Port\\\": null, \\\"Http Proxy Username\\\": null, \\\"Http Proxy Password\\\": null, \\\"et-state-cache\\\": null, \\\"et-time-window\\\": \\\"3 hours\\\", 
\\\"et-initial-listing-target\\\": \\\"all\\\", \\\"Minimum File Age\\\": \\\"0 sec\\\", \\\"Maximum File Age\\\": null, \\\"Minimum File Size\\\": \\\"0 B\\\", \\\"Maximum File Size\\\": null, \\\"Ciphers Allowed\\\": null, \\\"Key Algorithms Allowed\\\": null, \\\"Key Exchange Algorithms Allowed\\\": null, \\\"Message Authentication Codes Allowed\\\": null}\", \"last_event_time\": \"2021-12-08 14:23:02.805000+00:00\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=8eb4f034-017d-1000-ffff-ffffccebd06c&componentIds=8eb5263d-017d-1000-ffff-ffff911b23aa\", \"name\": \"ListSFTP\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8eb5263d-017d-1000-ffff-ffff911b23aa)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host.temperature,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": []}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),c8c73d4c-ebdd-1bee-9b46-629672cd11a0)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"ALL\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Object Key\\\": \\\"sftp_data/${filename}\\\", \\\"Bucket\\\": \\\"${s3.destbucket}\\\", \\\"Content Type\\\": null, \\\"Content Disposition\\\": null, \\\"Cache Control\\\": null, \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": \\\"c8c73d64-ebdd-1bee-0000-000020079e12\\\", \\\"s3-object-tags-prefix\\\": null, \\\"s3-object-remove-tags-prefix\\\": \\\"false\\\", \\\"Storage Class\\\": \\\"Standard\\\", \\\"Region\\\": \\\"us-east-1\\\", \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"Expiration Time Rule\\\": null, \\\"FullControl User List\\\": \\\"${s3.permissions.full.users}\\\", \\\"Read Permission User List\\\": \\\"${s3.permissions.read.users}\\\", \\\"Write Permission User List\\\": \\\"${s3.permissions.write.users}\\\", \\\"Read ACL User List\\\": \\\"${s3.permissions.readacl.users}\\\", \\\"Write ACL User List\\\": \\\"${s3.permissions.writeacl.users}\\\", \\\"Owner\\\": \\\"${s3.owner}\\\", \\\"canned-acl\\\": \\\"${s3.permissions.cannedacl}\\\", \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"Multipart Threshold\\\": \\\"5 GB\\\", \\\"Multipart Part Size\\\": \\\"5 GB\\\", \\\"Multipart Upload AgeOff Interval\\\": \\\"60 min\\\", \\\"Multipart Upload Max Age Threshold\\\": \\\"7 days\\\", \\\"s3-temporary-directory-multipart\\\": \\\"${java.io.tmpdir}\\\", \\\"server-side-encryption\\\": \\\"None\\\", \\\"encryption-service\\\": null, \\\"use-chunked-encoding\\\": \\\"true\\\", \\\"use-path-style-access\\\": \\\"false\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": 
null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null}\", \"last_event_time\": \"None\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=8eb4f034-017d-1000-ffff-ffffccebd06c&componentIds=c8c73d4c-ebdd-1bee-9b46-629672cd11a0\", \"name\": \"PutS3Object\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),c8c73d4c-ebdd-1bee-9b46-629672cd11a0)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [], \"outputDatasets\": [], \"inputDatajobs\": [\"urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),3ec2acd6-a0d4-3198-9066-a59fb757bc05)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8eb55aeb-017d-1000-ffff-fffff475768d)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"ALL\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Hostname\\\": \\\"${sftp.host}\\\", \\\"Port\\\": \\\"22\\\", \\\"Username\\\": \\\"${sftp.username}\\\", \\\"Password\\\": \\\"********\\\", \\\"Private Key Path\\\": null, \\\"Private Key Passphrase\\\": null, \\\"Remote File\\\": \\\"${path}/${filename}\\\", \\\"Completion Strategy\\\": \\\"None\\\", \\\"Move Destination Directory\\\": null, \\\"Create Directory\\\": \\\"false\\\", \\\"Disable Directory Listing\\\": \\\"false\\\", \\\"Connection Timeout\\\": \\\"30 sec\\\", \\\"Data Timeout\\\": \\\"30 sec\\\", \\\"Send Keep Alive On Timeout\\\": \\\"true\\\", \\\"Host Key File\\\": null, \\\"Strict Host Key Checking\\\": \\\"false\\\", \\\"Use Compression\\\": \\\"false\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Type\\\": \\\"DIRECT\\\", \\\"Proxy Host\\\": null, \\\"Proxy Port\\\": null, \\\"Http Proxy Username\\\": null, \\\"Http Proxy Password\\\": null, \\\"fetchfiletransfer-notfound-loglevel\\\": \\\"ERROR\\\", \\\"Ciphers Allowed\\\": null, \\\"Key Algorithms Allowed\\\": null, \\\"Key Exchange Algorithms Allowed\\\": null, \\\"Message Authentication Codes Allowed\\\": null}\", \"last_event_time\": \"2021-12-08 14:23:04.318000+00:00\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=8eb4f034-017d-1000-ffff-ffffccebd06c&componentIds=8eb55aeb-017d-1000-ffff-fffff475768d\", \"name\": \"FetchSFTP\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8eb55aeb-017d-1000-ffff-fffff475768d)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host.temperature,PROD)\"], \"outputDatasets\": 
[\"urn:li:dataset:(urn:li:dataPlatform:nifi,default.sftp_files_out,PROD)\"], \"inputDatajobs\": [\"urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8eb5263d-017d-1000-ffff-ffff911b23aa)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),3ec2acd6-a0d4-3198-9066-a59fb757bc05)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=8eb4f034-017d-1000-ffff-ffffccebd06c&componentIds=8eb70d94-017d-1000-ffff-ffffc94c12ce\", \"name\": \"sftp_files_out\", \"description\": \"\", \"type\": {\"string\": \"NIFI_REMOTE_OUTPUT_PORT\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),3ec2acd6-a0d4-3198-9066-a59fb757bc05)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:nifi,default.sftp_files_out,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": []}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:nifi,default.s3_data,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "value": "{\"platform\": \"urn:li:dataPlatform:nifi\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:nifi,default.s3_data,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "value": "{\"customProperties\": {}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=1c61a8d9-3462-387f-8145-09e6e7785e5c&componentIds=8ef96dcf-017d-1000-ffff-ffff8f7528f0\", \"tags\": []}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:nifi,default.sftp_files_out,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "value": "{\"platform\": \"urn:li:dataPlatform:nifi\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:nifi,default.sftp_files_out,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "value": "{\"customProperties\": {}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=8eb4f034-017d-1000-ffff-ffffccebd06c&componentIds=8eb66675-017d-1000-ffff-ffffa56e2758\", \"tags\": []}", + "contentType": "application/json" + }, + "systemMetadata": null + } + ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_standalone.json b/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_standalone.json new file mode 100644 index 0000000000..ac8e9fdc5e --- /dev/null +++ 
b/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_standalone.json @@ -0,0 +1,119 @@ +[ + { + "auditHeader": null, + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "value": "{\"customProperties\": {\"clustered\": \"False\"}, \"externalUrl\": \"http://localhost:9443/nifi/?processGroupId=803ebb92-017d-1000-2961-4bdaa27a3ba0&componentIds=\", \"name\": \"Standalone Flow\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),aed63edf-e660-3f29-b56b-192cf6286889)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"ALL\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Object Key\\\": \\\"tropical_data/${filename}\\\", \\\"Bucket\\\": \\\"${s3.destbucket}\\\", \\\"Content Type\\\": null, \\\"Content Disposition\\\": null, \\\"Cache Control\\\": null, \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": \\\"80436b00-017d-1000-54c8-ff854b5c8990\\\", \\\"s3-object-tags-prefix\\\": null, \\\"s3-object-remove-tags-prefix\\\": \\\"false\\\", \\\"Storage Class\\\": \\\"Standard\\\", \\\"Region\\\": \\\"us-east-1\\\", \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"Expiration Time Rule\\\": null, \\\"FullControl User List\\\": \\\"${s3.permissions.full.users}\\\", \\\"Read Permission User List\\\": \\\"${s3.permissions.read.users}\\\", \\\"Write Permission User List\\\": \\\"${s3.permissions.write.users}\\\", \\\"Read ACL User List\\\": \\\"${s3.permissions.readacl.users}\\\", \\\"Write ACL User List\\\": \\\"${s3.permissions.writeacl.users}\\\", \\\"Owner\\\": \\\"${s3.owner}\\\", \\\"canned-acl\\\": \\\"${s3.permissions.cannedacl}\\\", \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"Multipart Threshold\\\": \\\"5 GB\\\", \\\"Multipart Part Size\\\": \\\"5 GB\\\", \\\"Multipart Upload AgeOff Interval\\\": \\\"60 min\\\", \\\"Multipart Upload Max Age Threshold\\\": \\\"7 days\\\", \\\"s3-temporary-directory-multipart\\\": \\\"${java.io.tmpdir}\\\", \\\"server-side-encryption\\\": \\\"None\\\", \\\"encryption-service\\\": null, \\\"use-chunked-encoding\\\": \\\"true\\\", \\\"use-path-style-access\\\": \\\"false\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null}\", \"last_event_time\": \"None\"}, \"externalUrl\": \"http://localhost:9443/nifi/?processGroupId=80404c81-017d-1000-e8e8-af7420af06c1&componentIds=aed63edf-e660-3f29-b56b-192cf6286889\", \"name\": \"PutS3Object\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),aed63edf-e660-3f29-b56b-192cf6286889)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { 
+ "value": "{\"inputDatasets\": [], \"outputDatasets\": [], \"inputDatajobs\": [\"urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),91d59f03-1c2b-3f3f-48bc-f89296a328bd)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "value": "{\"platform\": \"urn:li:dataPlatform:s3\"}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "value": "{\"customProperties\": {\"s3_uri\": \"s3://enriched-topical-chat\"}, \"tags\": []}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),91d59f03-1c2b-3f3f-48bc-f89296a328bd)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"ALL\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Bucket\\\": \\\"enriched-topical-chat\\\", \\\"Object Key\\\": \\\"${filename}\\\", \\\"Region\\\": \\\"us-west-2\\\", \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": null, \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"Version\\\": null, \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"encryption-service\\\": null, \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null, \\\"requester-pays\\\": \\\"false\\\", \\\"range-start\\\": null, \\\"range-length\\\": null}\", \"last_event_time\": \"2021-12-08 14:01:14.043000+00:00\"}, \"externalUrl\": \"http://localhost:9443/nifi/?processGroupId=80404c81-017d-1000-e8e8-af7420af06c1&componentIds=91d59f03-1c2b-3f3f-48bc-f89296a328bd\", \"name\": \"FetchS3Object\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),91d59f03-1c2b-3f3f-48bc-f89296a328bd)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": [\"urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),cb7693ed-f93b-3340-3776-fe80e6283ddc)\"]}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),cb7693ed-f93b-3340-3776-fe80e6283ddc)", + "entityKeyAspect": null, + 
"changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"PRIMARY\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Bucket\\\": \\\"enriched-topical-chat\\\", \\\"Region\\\": \\\"us-west-2\\\", \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"record-writer\\\": null, \\\"min-age\\\": \\\"0 sec\\\", \\\"Listing Batch Size\\\": \\\"100\\\", \\\"write-s3-object-tags\\\": \\\"false\\\", \\\"write-s3-user-metadata\\\": \\\"false\\\", \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": null, \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null, \\\"delimiter\\\": null, \\\"prefix\\\": null, \\\"use-versions\\\": \\\"false\\\", \\\"list-type\\\": \\\"1\\\", \\\"requester-pays\\\": \\\"false\\\"}\", \"last_event_time\": \"2021-12-08 14:00:58.978000+00:00\"}, \"externalUrl\": \"http://localhost:9443/nifi/?processGroupId=80404c81-017d-1000-e8e8-af7420af06c1&componentIds=cb7693ed-f93b-3340-3776-fe80e6283ddc\", \"name\": \"ListS3\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}", + "contentType": "application/json" + }, + "systemMetadata": null + }, + { + "auditHeader": null, + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),cb7693ed-f93b-3340-3776-fe80e6283ddc)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": []}", + "contentType": "application/json" + }, + "systemMetadata": null + } +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/nifi/setup/conf/flow.xml.gz b/metadata-ingestion/tests/integration/nifi/setup/conf/flow.xml.gz new file mode 100644 index 0000000000..fb098eb99b Binary files /dev/null and b/metadata-ingestion/tests/integration/nifi/setup/conf/flow.xml.gz differ diff --git a/metadata-ingestion/tests/integration/nifi/setup/conf_clustered/flow.xml.gz b/metadata-ingestion/tests/integration/nifi/setup/conf_clustered/flow.xml.gz new file mode 100644 index 0000000000..5b684d598f Binary files /dev/null and b/metadata-ingestion/tests/integration/nifi/setup/conf_clustered/flow.xml.gz differ diff --git a/metadata-ingestion/tests/integration/nifi/setup/sftp_files/baz.csv b/metadata-ingestion/tests/integration/nifi/setup/sftp_files/baz.csv new file mode 100644 index 0000000000..6d872c605f --- /dev/null +++ b/metadata-ingestion/tests/integration/nifi/setup/sftp_files/baz.csv @@ -0,0 +1,2 @@ +col1, col2, col3 +10,20,30 diff --git a/metadata-ingestion/tests/integration/nifi/setup/sftp_files/temperature/2021_11.csv b/metadata-ingestion/tests/integration/nifi/setup/sftp_files/temperature/2021_11.csv new file mode 100644 index 0000000000..6c4a2bc30b --- /dev/null +++ b/metadata-ingestion/tests/integration/nifi/setup/sftp_files/temperature/2021_11.csv @@ -0,0 +1,2 @@ +city, temperature + diff --git a/metadata-ingestion/tests/integration/nifi/setup/sftp_files/temperature/2021_12.csv 
b/metadata-ingestion/tests/integration/nifi/setup/sftp_files/temperature/2021_12.csv
new file mode 100644
index 0000000000..6c4a2bc30b
--- /dev/null
+++ b/metadata-ingestion/tests/integration/nifi/setup/sftp_files/temperature/2021_12.csv
@@ -0,0 +1,2 @@
+city, temperature
+
diff --git a/metadata-ingestion/tests/integration/nifi/test_nifi.py b/metadata-ingestion/tests/integration/nifi/test_nifi.py
new file mode 100644
index 0000000000..c3a6c3fb3e
--- /dev/null
+++ b/metadata-ingestion/tests/integration/nifi/test_nifi.py
@@ -0,0 +1,125 @@
+import time
+
+import pytest
+from freezegun import freeze_time
+
+from datahub.ingestion.run.pipeline import Pipeline
+from tests.test_helpers import fs_helpers, mce_helpers
+from tests.test_helpers.docker_helpers import wait_for_port
+
+FROZEN_TIME = "2021-12-03 12:00:00"
+
+
+@freeze_time(FROZEN_TIME)
+@pytest.mark.slow_integration
+def test_nifi_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
+    test_resources_dir = pytestconfig.rootpath / "tests/integration/nifi"
+    with docker_compose_runner(
+        test_resources_dir / "docker-compose.yml", "nifi"
+    ) as docker_services:
+        wait_for_port(
+            docker_services,
+            container_name="nifi1",
+            container_port=9443,
+            timeout=300,
+        )
+        wait_for_port(
+            docker_services,
+            container_name="nifi01",
+            container_port=9080,
+            timeout=60,
+        )
+        wait_for_port(
+            docker_services,
+            container_name="nifi02",
+            container_port=9081,
+            timeout=60,
+        )
+        wait_for_port(
+            docker_services,
+            container_name="nifi03",
+            container_port=9082,
+            timeout=60,
+        )
+
+        # Wait for nifi to execute all processors
+        time.sleep(120)
+
+        # Run the metadata ingestion pipeline.
+        with fs_helpers.isolated_filesystem(tmp_path):
+
+            # Run nifi ingestion against the standalone instance.
+            pipeline = Pipeline.create(
+                {
+                    "run_id": "nifi-test-standalone",
+                    "source": {
+                        "type": "nifi",
+                        "config": {
+                            "site_url": "http://localhost:9443/nifi/",
+                            # "auth": "CLIENT_CERT",
+                            # "client_cert_file": f"{test_resources_dir}/setup/ssl_files/client-cert.pem",
+                            # "client_key_file": f"{test_resources_dir}/setup/ssl_files/client-private-key.pem",
+                            # "client_key_password": "datahub",
+                            # "ca_file": f"{test_resources_dir}/setup/ssl_files/server_certfile.pem",
+                            "process_group_pattern": {"deny": ["^WIP"]},
+                        },
+                    },
+                    "sink": {
+                        "type": "file",
+                        "config": {"filename": "./nifi_mces.json"},
+                    },
+                }
+            )
+            pipeline.run()
+            pipeline.raise_from_status()
+
+            # Verify the output; ignore values for aspects that carry last_event_time, since those timestamps vary between runs.
+            # TODO: ignore paths with respect to aspect value in case of MCPs
+            mce_helpers.check_golden_file(
+                pytestconfig,
+                output_path="nifi_mces.json",
+                golden_path=test_resources_dir / "nifi_mces_golden_standalone.json",
+                ignore_paths=[
+                    r"root\[1\]\['aspect'\]\['value'\]",
+                    r"root\[5\]\['aspect'\]\['value'\]",
+                    r"root\[7\]\['aspect'\]\['value'\]",
+                ],
+            )
+
+            # Run nifi ingestion against the clustered instance.
+            pipeline = Pipeline.create(
+                {
+                    "run_id": "nifi-test-cluster",
+                    "source": {
+                        "type": "nifi",
+                        "config": {
+                            "site_url": "http://localhost:9080/nifi/",
+                            "auth": "NO_AUTH",
+                            "site_url_to_site_name": {
+                                "http://nifi01:9080/nifi/": "default",
+                                "http://nifi02:9081/nifi/": "default",
+                            },
+                        },
+                    },
+                    "sink": {
+                        "type": "file",
+                        "config": {"filename": "./nifi_mces_cluster.json"},
+                    },
+                }
+            )
+            pipeline.run()
+            pipeline.raise_from_status()
+
+            # Verify the output.
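+            # Note: these root[N] ignore_paths assume a stable ordering of the
+            # emitted records; they select aspects whose serialized values embed
+            # last_event_time timestamps produced by the NiFi server itself,
+            # which freezegun cannot pin and which therefore vary between runs.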
+            # TODO: ignore paths with respect to aspect value in case of MCPs
+            mce_helpers.check_golden_file(
+                pytestconfig,
+                output_path="nifi_mces_cluster.json",
+                golden_path=test_resources_dir / "nifi_mces_golden_cluster.json",
+                ignore_paths=[
+                    r"root\[5\]\['aspect'\]\['value'\]",
+                    r"root\[7\]\['aspect'\]\['value'\]",
+                    r"root\[15\]\['aspect'\]\['value'\]",
+                    r"root\[19\]\['aspect'\]\['value'\]",
+                ],
+            )
diff --git a/metadata-ingestion/tox.ini b/metadata-ingestion/tox.ini
index 23c958d0ea..c306e2650c 100644
--- a/metadata-ingestion/tox.ini
+++ b/metadata-ingestion/tox.ini
@@ -22,7 +22,7 @@ deps =
     .[dev]
 commands =
     pytest --cov={envsitepackagesdir}/datahub --cov={envsitepackagesdir}/datahub_provider \
-      py3-quick,py3-airflow1: -m 'not integration' --junit-xml=junit.quick.xml \
+      py3-quick,py3-airflow1: -m 'not integration and not slow_integration' --junit-xml=junit.quick.xml \
       py3-full: --cov-fail-under 70 --junit-xml=junit.full.xml \
       --continue-on-collection-errors \
       -vv