feat(ingest): add nifi source (#3681)

This commit is contained in:
mayurinehate 2021-12-09 04:26:31 +05:30 committed by GitHub
parent 3bac7f7c43
commit 1d7ec8dba8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1896 additions and 6 deletions

View File

@ -0,0 +1,49 @@
# CI workflow that runs the metadata-ingestion tests marked `slow_integration`
# through the Gradle `testSlowIntegration` task and uploads the test reports.
name: metadata ingestion slow integration tests

on:
  # Every push to master, except changes that only touch docs or Markdown.
  push:
    branches:
      - master
    paths-ignore:
      - "docs/**"
      - "**.md"
  # Pull requests against master, but only when NiFi-related paths change.
  pull_request:
    branches:
      - master
    paths:
      - "**/nifi/**"
      - "**/nifi.py"
  release:
    types:
      - published
      - edited

jobs:
  metadata-ingestion-slow-integration:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        # Quoted so the version stays a string rather than a float.
        python-version:
          - "3.9"
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: ./metadata-ingestion/scripts/install_deps.sh
      - name: Run metadata-ingestion slow integration tests
        run: ./gradlew :metadata-ingestion:testSlowIntegration
      # `always()` keeps the upload running even when the test step fails.
      - uses: actions/upload-artifact@v2
        if: always()
        with:
          name: Test Results (metadata ingestion slow integration tests)
          path: |
            **/build/reports/tests/test/**
            **/build/test-results/test/**
            **/junit.*.xml

  # Uploads the triggering event payload file as an artifact named "Event File".
  # NOTE(review): presumably consumed by a separate results-publishing
  # workflow; confirm against the other workflows in the repository.
  event-file:
    runs-on: ubuntu-latest
    steps:
      - name: Upload
        uses: actions/upload-artifact@v2
        with:
          name: Event File
          path: ${{ github.event_path }}

View File

@ -66,6 +66,7 @@ Sources:
| [superset](./source_docs/superset.md) | `pip install 'acryl-datahub[superset]'` | Superset source |
| [trino](./source_docs/trino.md) | `pip install 'acryl-datahub[trino]'` | Trino source |
| [starburst-trino-usage](./source_docs/trino.md) | `pip install 'acryl-datahub[starburst-trino-usage]'` | Starburst Trino usage statistics source |
| [nifi](./source_docs/nifi.md) | `pip install 'acryl-datahub[nifi]'` | Nifi source |
Sinks

View File

@ -78,7 +78,7 @@ task testQuick(type: Exec, dependsOn: installDev) {
inputs.files(project.fileTree(dir: "tests/"))
outputs.dir("${venv_name}")
commandLine 'bash', '-x', '-c',
"source ${venv_name}/bin/activate && pytest -m 'not integration' -vv --continue-on-collection-errors --junit-xml=junit.quick.xml"
"source ${venv_name}/bin/activate && pytest -m 'not integration and not slow_integration' -vv --continue-on-collection-errors --junit-xml=junit.quick.xml"
}
task installDevTest(type: Exec, dependsOn: [installDev]) {
@ -105,7 +105,12 @@ task testSingle(dependsOn: [installDevTest]) {
task testFull(type: Exec, dependsOn: [testQuick, installDevTest]) {
commandLine 'bash', '-x', '-c',
"source ${venv_name}/bin/activate && pytest -vv --continue-on-collection-errors --junit-xml=junit.full.xml"
"source ${venv_name}/bin/activate && pytest -m 'not slow_integration' -vv --continue-on-collection-errors --junit-xml=junit.full.xml"
}
// Runs only the pytest tests marked `slow_integration` inside the project venv.
// Depends on testQuick and installDevTest, mirroring testFull.
task testSlowIntegration(type: Exec, dependsOn: [testQuick, installDevTest]) {
  // Write to a dedicated JUnit report so this task does not overwrite the
  // junit.full.xml produced by testFull; the name still matches `junit.*.xml`.
  commandLine 'bash', '-x', '-c',
    "source ${venv_name}/bin/activate && pytest -m 'slow_integration' -vv --continue-on-collection-errors --junit-xml=junit.slow_integration.xml"
}
task cleanPythonCache(type: Exec) {

View File

@ -97,10 +97,13 @@ pip install -e '.[dev]'
pip install -e '.[integration-tests]'
# Run unit tests.
pytest -m 'not integration'
pytest -m 'not integration and not slow_integration'
# Run Docker-based integration tests.
pytest -m 'integration'
# Run Docker-based slow integration tests.
pytest -m 'slow_integration'
```
### Sanity check code before committing

View File

@ -1,9 +1,9 @@
---
run_id: test_cluster
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/file for complete documentation
source:
type: "file"
config:
filename: "./examples/mce_files/bootstrap_mce.json"
filename: "./tests/integration/nifi/nifi_mces_golden_cluster.json"
# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:

View File

@ -54,6 +54,7 @@ disallow_untyped_defs = yes
addopts = --cov=src --cov-report term-missing --cov-config setup.cfg --strict-markers
markers =
integration: marks tests to only run in integration (deselect with '-m "not integration"')
slow_integration: marks tests that are too slow to even run in integration (deselect with '-m "not slow_integration"')
testpaths =
tests/unit
tests/integration

View File

@ -144,6 +144,8 @@ plugins: Dict[str, Set[str]] = {
# PR is from same author as that of sqlalchemy-trino library below.
"sqlalchemy-trino"
},
"nifi": {"requests"},
}
all_exclude_plugins: Set[str] = {
@ -296,6 +298,8 @@ entry_points = {
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
"trino = datahub.ingestion.source.sql.trino:TrinoSource",
"starburst-trino-usage = datahub.ingestion.source.usage.starburst_trino_usage:TrinoUsageSource",
"nifi = datahub.ingestion.source.nifi:NifiSource",
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",

View File

@ -0,0 +1,74 @@
# Nifi
For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
## Setup
To install this plugin, run `pip install 'acryl-datahub[nifi]'`.
## Capabilities
This plugin extracts the following:
- Nifi flow as `DataFlow` entity
- Ingress, egress processors, remote input and output ports as `DataJob` entity
- Input and output ports receiving remote connections as `Dataset` entity
- Lineage information between external datasets and ingress/egress processors by analyzing provenance events
Current limitations:
- Limited ingress/egress processors are supported
- S3: `ListS3`, `FetchS3Object`, `PutS3Object`
- SFTP: `ListSFTP`, `FetchSFTP`, `GetSFTP`, `PutSFTP`
## Quickstart recipe
Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes).
```yml
source:
type: "nifi"
config:
# Coordinates
site_url: "https://localhost:8443/nifi/"
# Credentials
auth: SINGLE_USER
username: admin
password: password
sink:
# sink configs
```
## Config details
Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
| -------------------------- | -------- | -------------------------- | ------------------------------------------------------- |
| `site_url` | ✅ | `"https://localhost:8443/nifi/"` | URI to connect to. |
| `site_name` | | `"default"` | Site name to identify this site with, useful when using input and output ports receiving remote connections |
| `auth` | | `"NO_AUTH"` | Nifi authentication. Must be one of: NO_AUTH, SINGLE_USER, CLIENT_CERT |
| `username` | | | Nifi username, must be set for `auth` = `"SINGLE_USER"` |
| `password` | | | Nifi password, must be set for `auth` = `"SINGLE_USER"` |
| `client_cert_file` | | | Path to PEM file containing the public certificates for the user/client identity, must be set for `auth` = `"CLIENT_CERT"` |
| `client_key_file` | | | Path to PEM file containing the client's secret key |
| `client_key_password` | | | The password to decrypt the client_key_file |
| `ca_file` | | | Path to PEM file containing certs for the root CA(s) of the NiFi instance |
| `provenance_days` | | | time window to analyze provenance events for external datasets |
| `site_url_to_site_name` | | | Lookup to find site_name for site_url, required if using remote process groups in nifi flow |
|`process_group_pattern.allow`| | | List of regex patterns for process groups to include in ingestion. |
| `process_group_pattern.deny`| | | List of regex patterns for process groups to exclude from ingestion. |
| `process_group_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
## Compatibility
Coming soon!
## Questions
If you've got any questions on configuring this source, feel free to ping us on [our Slack](https://slack.datahubproject.io/)!

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,126 @@
# Docker Compose stack for the NiFi integration tests: one standalone NiFi
# instance, a three-node NiFi cluster coordinated through ZooKeeper, and an
# SFTP server.
services:
  # Standalone NiFi served over plain HTTP on port 9443.
  nifi1:
    image: apache/nifi:1.15.0
    container_name: nifi1
    hostname: nifi1
    ports:
      - "9443:9443"
    volumes:
      - ./setup/conf:/opt/nifi/tmp:ro
      # - ./setup/ssl_files:/opt/certs
    environment:
      # TLS / single-user authentication settings are kept here, disabled.
      # AUTH: tls
      NIFI_REMOTE_INPUT_HOST: nifi1
      NIFI_SENSITIVE_PROPS_KEY: admin@datahub
      # SINGLE_USER_CREDENTIALS_USERNAME: admin
      # SINGLE_USER_CREDENTIALS_PASSWORD: admin@datahub
      # KEYSTORE_PATH: /opt/certs/server_keystore.jks
      # KEYSTORE_TYPE: JKS
      # KEYSTORE_PASSWORD: datahub
      # TRUSTSTORE_PATH: /opt/certs/server_truststore.jks
      # TRUSTSTORE_PASSWORD: datahub
      # TRUSTSTORE_TYPE: JKS
      # INITIAL_ADMIN_IDENTITY: 'CN=DatahubUser, C=US'
      NIFI_WEB_HTTP_PORT: 9443
    # Copy the prepared flow out of the read-only mount into NiFi's conf
    # directory, start NiFi in the background, then keep the container alive.
    entrypoint:
      - bash
      - -c
      - |
        echo "Copying Flow"
        #
        cp /opt/nifi/tmp/flow.xml.gz /opt/nifi/nifi-current/conf/flow.xml.gz
        #
        echo "Starting Nifi"
        #
        /opt/nifi/scripts/start.sh &
        #
        sleep infinity

  # ZooKeeper used by the clustered NiFi nodes (see NIFI_ZK_CONNECT_STRING).
  nifi_zookeeper:
    image: "bitnami/zookeeper:latest"
    container_name: nifi_zookeeper
    hostname: nifi_zookeeper
    ports:
      - "52181:52181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOO_PORT_NUMBER=52181

  # Cluster node 1: the only cluster node that copies in a prepared flow.
  nifi01:
    image: apache/nifi:1.15.0
    container_name: nifi01
    hostname: nifi01
    depends_on:
      - nifi_zookeeper
    ports:
      - "9080:9080"
    volumes:
      - ./setup/conf_clustered:/opt/nifi/tmp:ro
    environment:
      - NIFI_WEB_HTTP_PORT=9080
      - NIFI_CLUSTER_IS_NODE=true
      - NIFI_CLUSTER_NODE_PROTOCOL_PORT=7080
      - NIFI_ZK_CONNECT_STRING=nifi_zookeeper:52181
      - NIFI_ELECTION_MAX_WAIT=1 min
      - NIFI_SENSITIVE_PROPS_KEY=admin@datahub
    # Same startup sequence as nifi1, using the clustered flow definition.
    entrypoint:
      - bash
      - -c
      - |
        echo "Copying Flow"
        #
        cp /opt/nifi/tmp/flow.xml.gz /opt/nifi/nifi-current/conf/flow.xml.gz
        #
        echo "Starting Nifi"
        #
        /opt/nifi/scripts/start.sh &
        #
        sleep infinity

  # Cluster node 2: default image entrypoint, with AWS credentials mounted.
  nifi02:
    image: apache/nifi:1.15.0
    container_name: nifi02
    hostname: nifi02
    depends_on:
      - nifi_zookeeper
    ports:
      - "9081:9081"
    volumes:
      - ./setup/conf/awscreds.properties:/opt/nifi/nifi-current/conf/awscreds.properties
    environment:
      - NIFI_WEB_HTTP_PORT=9081
      - NIFI_CLUSTER_IS_NODE=true
      - NIFI_CLUSTER_NODE_PROTOCOL_PORT=7080
      - NIFI_ZK_CONNECT_STRING=nifi_zookeeper:52181
      - NIFI_ELECTION_MAX_WAIT=1 min
      - NIFI_SENSITIVE_PROPS_KEY=admin@datahub

  # Cluster node 3: same shape as nifi02, on port 9082.
  nifi03:
    image: apache/nifi:1.15.0
    container_name: nifi03
    hostname: nifi03
    depends_on:
      - nifi_zookeeper
    ports:
      - "9082:9082"
    volumes:
      - ./setup/conf/awscreds.properties:/opt/nifi/nifi-current/conf/awscreds.properties
    environment:
      - NIFI_WEB_HTTP_PORT=9082
      - NIFI_CLUSTER_IS_NODE=true
      - NIFI_CLUSTER_NODE_PROTOCOL_PORT=7080
      - NIFI_ZK_CONNECT_STRING=nifi_zookeeper:52181
      - NIFI_ELECTION_MAX_WAIT=1 min
      - NIFI_SENSITIVE_PROPS_KEY=admin@datahub

  # SFTP server exposing ./setup/sftp_files as /home/foo.
  sftp_public_host:
    image: atmoz/sftp
    container_name: sftp_public_host
    hostname: sftp_public_host
    ports:
      - "2222:22"
    volumes:
      - ./setup/sftp_files:/home/foo
    # NOTE(review): presumably the atmoz/sftp "user:password:..." spec creating
    # user `foo` with password `pass`; confirm against the image docs.
    command: "foo:pass:::"

View File

@ -0,0 +1,353 @@
[
{
"auditHeader": null,
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"clustered\": \"True\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=80820b2f-017d-1000-85cf-05f56cde9185&componentIds=\", \"name\": \"Cluster Flow\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),fed5914b-937b-37dd-89c0-b34ffbae9cf4)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"ALL\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Object Key\\\": \\\"tropical_data/${filename}\\\", \\\"Bucket\\\": \\\"${s3.destbucket}\\\", \\\"Content Type\\\": null, \\\"Content Disposition\\\": null, \\\"Cache Control\\\": null, \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": \\\"69fc8d86-2f01-3f07-910f-2b14edc81779\\\", \\\"s3-object-tags-prefix\\\": null, \\\"s3-object-remove-tags-prefix\\\": \\\"false\\\", \\\"Storage Class\\\": \\\"Standard\\\", \\\"Region\\\": \\\"us-east-1\\\", \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"Expiration Time Rule\\\": null, \\\"FullControl User List\\\": \\\"${s3.permissions.full.users}\\\", \\\"Read Permission User List\\\": \\\"${s3.permissions.read.users}\\\", \\\"Write Permission User List\\\": \\\"${s3.permissions.write.users}\\\", \\\"Read ACL User List\\\": \\\"${s3.permissions.readacl.users}\\\", \\\"Write ACL User List\\\": \\\"${s3.permissions.writeacl.users}\\\", \\\"Owner\\\": \\\"${s3.owner}\\\", \\\"canned-acl\\\": \\\"${s3.permissions.cannedacl}\\\", \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"Multipart Threshold\\\": \\\"5 GB\\\", \\\"Multipart Part Size\\\": \\\"5 GB\\\", \\\"Multipart Upload AgeOff Interval\\\": \\\"60 min\\\", \\\"Multipart Upload Max Age Threshold\\\": \\\"7 days\\\", \\\"s3-temporary-directory-multipart\\\": \\\"${java.io.tmpdir}\\\", \\\"server-side-encryption\\\": \\\"None\\\", \\\"encryption-service\\\": null, \\\"use-chunked-encoding\\\": \\\"true\\\", \\\"use-path-style-access\\\": \\\"false\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null}\", 
\"last_event_time\": \"None\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=1c61a8d9-3462-387f-8145-09e6e7785e5c&componentIds=fed5914b-937b-37dd-89c0-b34ffbae9cf4\", \"name\": \"PutS3Object\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),fed5914b-937b-37dd-89c0-b34ffbae9cf4)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:nifi,default.s3_data,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": []}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:s3\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {\"s3_uri\": \"s3://enriched-topical-chat\"}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),c5f6fc66-ffbb-3f60-9564-f2466ae32493)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"ALL\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Bucket\\\": \\\"enriched-topical-chat\\\", \\\"Object Key\\\": \\\"${filename}\\\", \\\"Region\\\": \\\"us-west-2\\\", \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": null, \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"Version\\\": null, \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"encryption-service\\\": null, \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null, \\\"requester-pays\\\": \\\"false\\\", \\\"range-start\\\": null, \\\"range-length\\\": null}\", \"last_event_time\": \"2021-12-08 14:23:21.702000+00:00\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=1c61a8d9-3462-387f-8145-09e6e7785e5c&componentIds=c5f6fc66-ffbb-3f60-9564-f2466ae32493\", \"name\": \"FetchS3Object\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),c5f6fc66-ffbb-3f60-9564-f2466ae32493)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": [\"urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8a218b6e-e6a0-36b6-bc4b-79d202a80167)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8a218b6e-e6a0-36b6-bc4b-79d202a80167)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"PRIMARY\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Bucket\\\": \\\"enriched-topical-chat\\\", \\\"Region\\\": \\\"us-west-2\\\", \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"record-writer\\\": null, \\\"min-age\\\": \\\"0 sec\\\", \\\"Listing Batch Size\\\": \\\"100\\\", \\\"write-s3-object-tags\\\": \\\"false\\\", \\\"write-s3-user-metadata\\\": \\\"false\\\", \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": null, \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null, \\\"delimiter\\\": null, \\\"prefix\\\": null, \\\"use-versions\\\": \\\"false\\\", \\\"list-type\\\": \\\"1\\\", \\\"requester-pays\\\": \\\"false\\\"}\", \"last_event_time\": \"2021-12-08 14:23:07.204000+00:00\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=1c61a8d9-3462-387f-8145-09e6e7785e5c&componentIds=8a218b6e-e6a0-36b6-bc4b-79d202a80167\", \"name\": \"ListS3\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8a218b6e-e6a0-36b6-bc4b-79d202a80167)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": []}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),71bc17ed-a3bc-339a-a100-ebad434717d4)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=1c61a8d9-3462-387f-8145-09e6e7785e5c&componentIds=8efa023d-017d-1000-0000-0000479b764f\", \"name\": \"s3_data\", \"description\": \"\", \"type\": {\"string\": \"NIFI_REMOTE_INPUT_PORT\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),71bc17ed-a3bc-339a-a100-ebad434717d4)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:nifi,default.s3_data,PROD)\"], \"inputDatajobs\": [\"urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),c5f6fc66-ffbb-3f60-9564-f2466ae32493)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:file\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {\"uri\": \"sftp://sftp_public_host\"}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host.temperature,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:file\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host.temperature,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {\"uri\": \"sftp://sftp_public_host/temperature\"}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8eb5263d-017d-1000-ffff-ffff911b23aa)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"PRIMARY\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"listing-strategy\\\": \\\"timestamps\\\", \\\"Hostname\\\": \\\"${sftp.host}\\\", \\\"Port\\\": \\\"22\\\", \\\"Username\\\": \\\"${sftp.username}\\\", \\\"Password\\\": \\\"********\\\", \\\"Private Key Path\\\": null, \\\"Private Key Passphrase\\\": null, \\\"Remote Path\\\": \\\".\\\", \\\"record-writer\\\": null, \\\"Distributed Cache Service\\\": null, \\\"Search Recursively\\\": \\\"true\\\", \\\"follow-symlink\\\": \\\"false\\\", \\\"File Filter Regex\\\": null, \\\"Path Filter Regex\\\": null, \\\"Ignore Dotted Files\\\": \\\"true\\\", \\\"Strict Host Key Checking\\\": \\\"false\\\", \\\"Host Key File\\\": null, \\\"Connection Timeout\\\": \\\"30 sec\\\", \\\"Data Timeout\\\": \\\"30 sec\\\", \\\"Send Keep Alive On Timeout\\\": \\\"true\\\", \\\"target-system-timestamp-precision\\\": \\\"auto-detect\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Type\\\": \\\"DIRECT\\\", \\\"Proxy Host\\\": null, \\\"Proxy Port\\\": null, \\\"Http Proxy Username\\\": null, \\\"Http Proxy Password\\\": null, \\\"et-state-cache\\\": null, \\\"et-time-window\\\": \\\"3 hours\\\", \\\"et-initial-listing-target\\\": \\\"all\\\", \\\"Minimum File Age\\\": \\\"0 sec\\\", \\\"Maximum File Age\\\": null, \\\"Minimum File Size\\\": \\\"0 B\\\", \\\"Maximum File Size\\\": null, \\\"Ciphers Allowed\\\": null, \\\"Key Algorithms Allowed\\\": null, \\\"Key Exchange Algorithms Allowed\\\": null, \\\"Message Authentication Codes Allowed\\\": null}\", \"last_event_time\": \"2021-12-08 14:23:02.805000+00:00\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=8eb4f034-017d-1000-ffff-ffffccebd06c&componentIds=8eb5263d-017d-1000-ffff-ffff911b23aa\", \"name\": \"ListSFTP\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8eb5263d-017d-1000-ffff-ffff911b23aa)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host.temperature,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": []}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),c8c73d4c-ebdd-1bee-9b46-629672cd11a0)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"ALL\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Object Key\\\": \\\"sftp_data/${filename}\\\", \\\"Bucket\\\": \\\"${s3.destbucket}\\\", \\\"Content Type\\\": null, \\\"Content Disposition\\\": null, \\\"Cache Control\\\": null, \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": \\\"c8c73d64-ebdd-1bee-0000-000020079e12\\\", \\\"s3-object-tags-prefix\\\": null, \\\"s3-object-remove-tags-prefix\\\": \\\"false\\\", \\\"Storage Class\\\": \\\"Standard\\\", \\\"Region\\\": \\\"us-east-1\\\", \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"Expiration Time Rule\\\": null, \\\"FullControl User List\\\": \\\"${s3.permissions.full.users}\\\", \\\"Read Permission User List\\\": \\\"${s3.permissions.read.users}\\\", \\\"Write Permission User List\\\": \\\"${s3.permissions.write.users}\\\", \\\"Read ACL User List\\\": \\\"${s3.permissions.readacl.users}\\\", \\\"Write ACL User List\\\": \\\"${s3.permissions.writeacl.users}\\\", \\\"Owner\\\": \\\"${s3.owner}\\\", \\\"canned-acl\\\": \\\"${s3.permissions.cannedacl}\\\", \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"Multipart Threshold\\\": \\\"5 GB\\\", \\\"Multipart Part Size\\\": \\\"5 GB\\\", \\\"Multipart Upload AgeOff Interval\\\": \\\"60 min\\\", \\\"Multipart Upload Max Age Threshold\\\": \\\"7 days\\\", \\\"s3-temporary-directory-multipart\\\": \\\"${java.io.tmpdir}\\\", \\\"server-side-encryption\\\": \\\"None\\\", \\\"encryption-service\\\": null, \\\"use-chunked-encoding\\\": \\\"true\\\", \\\"use-path-style-access\\\": \\\"false\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null}\", 
\"last_event_time\": \"None\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=8eb4f034-017d-1000-ffff-ffffccebd06c&componentIds=c8c73d4c-ebdd-1bee-9b46-629672cd11a0\", \"name\": \"PutS3Object\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),c8c73d4c-ebdd-1bee-9b46-629672cd11a0)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [], \"outputDatasets\": [], \"inputDatajobs\": [\"urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),3ec2acd6-a0d4-3198-9066-a59fb757bc05)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8eb55aeb-017d-1000-ffff-fffff475768d)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"ALL\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Hostname\\\": \\\"${sftp.host}\\\", \\\"Port\\\": \\\"22\\\", \\\"Username\\\": \\\"${sftp.username}\\\", \\\"Password\\\": \\\"********\\\", \\\"Private Key Path\\\": null, \\\"Private Key Passphrase\\\": null, \\\"Remote File\\\": \\\"${path}/${filename}\\\", \\\"Completion Strategy\\\": \\\"None\\\", \\\"Move Destination Directory\\\": null, \\\"Create Directory\\\": \\\"false\\\", \\\"Disable Directory Listing\\\": \\\"false\\\", \\\"Connection Timeout\\\": \\\"30 sec\\\", \\\"Data Timeout\\\": \\\"30 sec\\\", \\\"Send Keep Alive On Timeout\\\": \\\"true\\\", \\\"Host Key File\\\": null, \\\"Strict Host Key Checking\\\": \\\"false\\\", \\\"Use Compression\\\": \\\"false\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Type\\\": \\\"DIRECT\\\", \\\"Proxy Host\\\": null, \\\"Proxy Port\\\": null, \\\"Http Proxy Username\\\": null, \\\"Http Proxy Password\\\": null, \\\"fetchfiletransfer-notfound-loglevel\\\": \\\"ERROR\\\", \\\"Ciphers Allowed\\\": null, \\\"Key Algorithms Allowed\\\": null, \\\"Key Exchange Algorithms Allowed\\\": null, \\\"Message Authentication Codes Allowed\\\": null}\", \"last_event_time\": \"2021-12-08 14:23:04.318000+00:00\"}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=8eb4f034-017d-1000-ffff-ffffccebd06c&componentIds=8eb55aeb-017d-1000-ffff-fffff475768d\", \"name\": \"FetchSFTP\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8eb55aeb-017d-1000-ffff-fffff475768d)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:file,sftp_public_host.temperature,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:nifi,default.sftp_files_out,PROD)\"], \"inputDatajobs\": [\"urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),8eb5263d-017d-1000-ffff-ffff911b23aa)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),3ec2acd6-a0d4-3198-9066-a59fb757bc05)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=8eb4f034-017d-1000-ffff-ffffccebd06c&componentIds=8eb70d94-017d-1000-ffff-ffffc94c12ce\", \"name\": \"sftp_files_out\", \"description\": \"\", \"type\": {\"string\": \"NIFI_REMOTE_OUTPUT_PORT\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,80820b2f-017d-1000-85cf-05f56cde9185,prod),3ec2acd6-a0d4-3198-9066-a59fb757bc05)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:nifi,default.sftp_files_out,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": []}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:nifi,default.s3_data,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:nifi\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:nifi,default.s3_data,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=1c61a8d9-3462-387f-8145-09e6e7785e5c&componentIds=8ef96dcf-017d-1000-ffff-ffff8f7528f0\", \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:nifi,default.sftp_files_out,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:nifi\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:nifi,default.sftp_files_out,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"http://localhost:9080/nifi/?processGroupId=8eb4f034-017d-1000-ffff-ffffccebd06c&componentIds=8eb66675-017d-1000-ffff-ffffa56e2758\", \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": null
}
]

View File

@ -0,0 +1,119 @@
[
{
"auditHeader": null,
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"clustered\": \"False\"}, \"externalUrl\": \"http://localhost:9443/nifi/?processGroupId=803ebb92-017d-1000-2961-4bdaa27a3ba0&componentIds=\", \"name\": \"Standalone Flow\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),aed63edf-e660-3f29-b56b-192cf6286889)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"ALL\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Object Key\\\": \\\"tropical_data/${filename}\\\", \\\"Bucket\\\": \\\"${s3.destbucket}\\\", \\\"Content Type\\\": null, \\\"Content Disposition\\\": null, \\\"Cache Control\\\": null, \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": \\\"80436b00-017d-1000-54c8-ff854b5c8990\\\", \\\"s3-object-tags-prefix\\\": null, \\\"s3-object-remove-tags-prefix\\\": \\\"false\\\", \\\"Storage Class\\\": \\\"Standard\\\", \\\"Region\\\": \\\"us-east-1\\\", \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"Expiration Time Rule\\\": null, \\\"FullControl User List\\\": \\\"${s3.permissions.full.users}\\\", \\\"Read Permission User List\\\": \\\"${s3.permissions.read.users}\\\", \\\"Write Permission User List\\\": \\\"${s3.permissions.write.users}\\\", \\\"Read ACL User List\\\": \\\"${s3.permissions.readacl.users}\\\", \\\"Write ACL User List\\\": \\\"${s3.permissions.writeacl.users}\\\", \\\"Owner\\\": \\\"${s3.owner}\\\", \\\"canned-acl\\\": \\\"${s3.permissions.cannedacl}\\\", \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"Multipart Threshold\\\": \\\"5 GB\\\", \\\"Multipart Part Size\\\": \\\"5 GB\\\", \\\"Multipart Upload AgeOff Interval\\\": \\\"60 min\\\", \\\"Multipart Upload Max Age Threshold\\\": \\\"7 days\\\", \\\"s3-temporary-directory-multipart\\\": \\\"${java.io.tmpdir}\\\", \\\"server-side-encryption\\\": \\\"None\\\", \\\"encryption-service\\\": null, \\\"use-chunked-encoding\\\": \\\"true\\\", \\\"use-path-style-access\\\": \\\"false\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null}\", \"last_event_time\": \"None\"}, \"externalUrl\": \"http://localhost:9443/nifi/?processGroupId=80404c81-017d-1000-e8e8-af7420af06c1&componentIds=aed63edf-e660-3f29-b56b-192cf6286889\", \"name\": \"PutS3Object\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),aed63edf-e660-3f29-b56b-192cf6286889)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [], \"outputDatasets\": [], \"inputDatajobs\": [\"urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),91d59f03-1c2b-3f3f-48bc-f89296a328bd)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:s3\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {\"s3_uri\": \"s3://enriched-topical-chat\"}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),91d59f03-1c2b-3f3f-48bc-f89296a328bd)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"ALL\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Bucket\\\": \\\"enriched-topical-chat\\\", \\\"Object Key\\\": \\\"${filename}\\\", \\\"Region\\\": \\\"us-west-2\\\", \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": null, \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"Version\\\": null, \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"encryption-service\\\": null, \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null, \\\"requester-pays\\\": \\\"false\\\", \\\"range-start\\\": null, \\\"range-length\\\": null}\", \"last_event_time\": \"2021-12-08 14:01:14.043000+00:00\"}, \"externalUrl\": \"http://localhost:9443/nifi/?processGroupId=80404c81-017d-1000-e8e8-af7420af06c1&componentIds=91d59f03-1c2b-3f3f-48bc-f89296a328bd\", \"name\": \"FetchS3Object\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),91d59f03-1c2b-3f3f-48bc-f89296a328bd)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": [\"urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),cb7693ed-f93b-3340-3776-fe80e6283ddc)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),cb7693ed-f93b-3340-3776-fe80e6283ddc)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {\"schedulingPeriod\": \"0 sec\", \"schedulingStrategy\": \"TIMER_DRIVEN\", \"executionNode\": \"PRIMARY\", \"concurrentlySchedulableTaskCount\": \"1\", \"properties\": \"{\\\"Bucket\\\": \\\"enriched-topical-chat\\\", \\\"Region\\\": \\\"us-west-2\\\", \\\"Access Key\\\": null, \\\"Secret Key\\\": null, \\\"record-writer\\\": null, \\\"min-age\\\": \\\"0 sec\\\", \\\"Listing Batch Size\\\": \\\"100\\\", \\\"write-s3-object-tags\\\": \\\"false\\\", \\\"write-s3-user-metadata\\\": \\\"false\\\", \\\"Credentials File\\\": null, \\\"AWS Credentials Provider service\\\": null, \\\"Communications Timeout\\\": \\\"30 secs\\\", \\\"SSL Context Service\\\": null, \\\"Endpoint Override URL\\\": null, \\\"Signer Override\\\": \\\"Default Signature\\\", \\\"proxy-configuration-service\\\": null, \\\"Proxy Host\\\": null, \\\"Proxy Host Port\\\": null, \\\"proxy-user-name\\\": null, \\\"proxy-user-password\\\": null, \\\"delimiter\\\": null, \\\"prefix\\\": null, \\\"use-versions\\\": \\\"false\\\", \\\"list-type\\\": \\\"1\\\", \\\"requester-pays\\\": \\\"false\\\"}\", \"last_event_time\": \"2021-12-08 14:00:58.978000+00:00\"}, \"externalUrl\": \"http://localhost:9443/nifi/?processGroupId=80404c81-017d-1000-e8e8-af7420af06c1&componentIds=cb7693ed-f93b-3340-3776-fe80e6283ddc\", \"name\": \"ListS3\", \"description\": \"\", \"type\": {\"string\": \"NIFI_PROCESSOR\"}}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,prod),cb7693ed-f93b-3340-3776-fe80e6283ddc)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)\"], \"outputDatasets\": [], \"inputDatajobs\": []}",
"contentType": "application/json"
},
"systemMetadata": null
}
]

View File

@ -0,0 +1,2 @@
col1, col2, col3
10,20,30
1 col1 col2 col3
2 10 20 30

View File

@ -0,0 +1,2 @@
city, temperature
1 city temperature

View File

@ -0,0 +1,2 @@
city, temperature
1 city temperature

View File

@ -0,0 +1,125 @@
import time
import pytest
from freezegun import freeze_time
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import fs_helpers, mce_helpers
from tests.test_helpers.docker_helpers import wait_for_port
FROZEN_TIME = "2021-12-03 12:00:00"
@freeze_time(FROZEN_TIME)
@pytest.mark.slow_integration
def test_nifi_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
    """Ingest from dockerised NiFi instances and diff the output against golden files.

    The compose project under ``tests/integration/nifi`` is started, the test
    waits for the ports of four containers, sleeps for a fixed period, and then
    runs two ingestion pipelines (one against ``localhost:9443``, one against
    ``localhost:9080``). Each pipeline writes to a file sink whose output is
    compared with the matching golden file, skipping the listed paths.
    """
    test_resources_dir = pytestconfig.rootpath / "tests/integration/nifi"
    compose_file = test_resources_dir / "docker-compose.yml"

    # (container name, container port, timeout in seconds), checked in this order.
    port_checks = (
        ("nifi1", 9443, 300),
        ("nifi01", 9080, 60),
        ("nifi02", 9081, 60),
        ("nifi03", 9082, 60),
    )

    standalone_recipe = {
        "run_id": "nifi-test-standalone",
        "source": {
            "type": "nifi",
            "config": {
                "site_url": "http://localhost:9443/nifi/",
                # Client-certificate settings, currently disabled:
                # "auth": "CLIENT_CERT",
                # "client_cert_file": f"{test_resources_dir}/setup/ssl_files/client-cert.pem",
                # "client_key_file": f"{test_resources_dir}/setup/ssl_files/client-private-key.pem",
                # "client_key_password": "datahub",
                # "ca_file": f"{test_resources_dir}/setup/ssl_files/server_certfile.pem",
                "process_group_pattern": {"deny": ["^WIP"]},
            },
        },
        "sink": {
            "type": "file",
            "config": {"filename": "./nifi_mces.json"},
        },
    }

    cluster_recipe = {
        "run_id": "nifi-test-cluster",
        "source": {
            "type": "nifi",
            "config": {
                "site_url": "http://localhost:9080/nifi/",
                "auth": "NO_AUTH",
                "site_url_to_site_name": {
                    "http://nifi01:9080/nifi/": "default",
                    "http://nifi02:9081/nifi/": "default",
                },
            },
        },
        "sink": {
            "type": "file",
            "config": {"filename": "./nifi_mces_cluster.json"},
        },
    }

    def ingest_and_verify(recipe, output_name, golden_name, ignored):
        # Run one ingestion pipeline, fail on reported errors, then diff its
        # file-sink output against the golden file.
        pipeline = Pipeline.create(recipe)
        pipeline.run()
        pipeline.raise_from_status()
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=output_name,
            golden_path=test_resources_dir / golden_name,
            ignore_paths=ignored,
        )

    with docker_compose_runner(compose_file, "nifi") as docker_services:
        for name, port, limit in port_checks:
            wait_for_port(
                docker_services,
                container_name=name,
                container_port=port,
                timeout=limit,
            )

        # Give NiFi time to execute all processors before ingesting.
        time.sleep(120)

        with fs_helpers.isolated_filesystem(tmp_path):
            # Standalone run. Aspect values carrying last_event_time are ignored.
            # TODO: ignore paths with respect to aspect value in case of MCPs
            ingest_and_verify(
                standalone_recipe,
                "nifi_mces.json",
                "nifi_mces_golden_standalone.json",
                [
                    r"root\[1\]\['aspect'\]\['value'\]",
                    r"root\[5\]\['aspect'\]\['value'\]",
                    r"root\[7\]\['aspect'\]\['value'\]",
                ],
            )

            # Cluster run.
            # TODO: ignore paths with respect to aspect value in case of MCPs
            ingest_and_verify(
                cluster_recipe,
                "nifi_mces_cluster.json",
                "nifi_mces_golden_cluster.json",
                [
                    r"root\[5\]\['aspect'\]\['value'\]",
                    r"root\[7\]\['aspect'\]\['value'\]",
                    r"root\[15\]\['aspect'\]\['value'\]",
                    r"root\[19\]\['aspect'\]\['value'\]",
                ],
            )

View File

@ -22,7 +22,7 @@ deps =
.[dev]
commands =
pytest --cov={envsitepackagesdir}/datahub --cov={envsitepackagesdir}/datahub_provider \
py3-quick,py3-airflow1: -m 'not integration' --junit-xml=junit.quick.xml \
py3-quick,py3-airflow1: -m 'not integration and not slow_integration' --junit-xml=junit.quick.xml \
py3-full: --cov-fail-under 70 --junit-xml=junit.full.xml \
--continue-on-collection-errors \
-vv