feat(ingest): MongoDB ingestion source (#2289)

Harshal Sheth 2021-03-23 20:15:44 -07:00 committed by GitHub
parent 259e6af494
commit a921d0deae
13 changed files with 432 additions and 11 deletions


@ -94,6 +94,7 @@ We use a plugin architecture so that you can install only the dependencies you actually need
| mysql | `pip install -e '.[mysql]'` | MySQL source |
| postgres | `pip install -e '.[postgres]'` | Postgres source |
| snowflake | `pip install -e '.[snowflake]'` | Snowflake source |
| mongodb | `pip install -e '.[mongodb]'` | MongoDB source |
| ldap | `pip install -e '.[ldap]'` ([extra requirements]) | LDAP source |
| kafka | `pip install -e '.[kafka]'` | Kafka source |
| druid | `pip install -e '.[druid]'` | Druid source |
@ -372,6 +373,29 @@ source:
    # options is the same as above
```
### MongoDB `mongodb`
Extracts:
- List of databases
- List of collections in each database
```yml
source:
  type: "mongodb"
  config:
    # For advanced configurations, see the MongoDB docs.
    # https://pymongo.readthedocs.io/en/stable/examples/authentication.html
    connect_uri: "mongodb://localhost"
    username: admin
    password: password
    authMechanism: "DEFAULT"
    options: {}
    database_pattern: {}
    collection_pattern: {}
    # database_pattern/collection_pattern are similar to schema_pattern/table_pattern from above
```
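The same recipe can also be run programmatically. The sketch below mirrors the integration test added in this commit; the `database_pattern` value is illustrative and assumes the `AllowDenyPattern` config type accepts `allow`/`deny` lists of regexes.

```python
from datahub.ingestion.run.pipeline import Pipeline

# Minimal sketch: run the MongoDB source against a local instance and write
# the resulting MCEs to a file (mirrors tests/integration/mongodb below).
pipeline = Pipeline.create(
    {
        "run_id": "mongodb-example",
        "source": {
            "type": "mongodb",
            "config": {
                "connect_uri": "mongodb://localhost",
                "username": "admin",
                "password": "password",
                # Illustrative: only ingest databases whose names start with "prod_".
                "database_pattern": {"allow": ["prod_.*"]},
            },
        },
        "sink": {
            "type": "file",
            "config": {"filename": "./mongodb_mces.json"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```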
### LDAP `ldap`
Extracts:
@ -405,11 +429,12 @@ source:
### DBT `dbt`
Pull metadata from DBT output files:
* [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json)
  * This file contains model, source and lineage data.
* [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json)
  * This file contains schema data.
  * DBT does not record schema data for Ephemeral models, as such datahub will show Ephemeral models in the lineage, however there will be no associated schema for Ephemeral models
- [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json)
  - This file contains model, source and lineage data.
- [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json)
  - This file contains schema data.
  - DBT does not record schema data for ephemeral models. As a result, DataHub will show ephemeral models in the lineage, but they will have no associated schema.
```yml
source:
@ -417,7 +442,6 @@ source:
  config:
    manifest_path: "./path/dbt/manifest_file.json"
    catalog_path: "./path/dbt/catalog_file.json"
```
## Sinks


@ -10,6 +10,7 @@ cp tmp/test_serde_large0/output.json tests/unit/serde/test_serde_large.json
cp tmp/test_ldap_ingest0/ldap_mces.json tests/integration/ldap/ldap_mce_golden.json
cp tmp/test_mysql_ingest0/mysql_mces.json tests/integration/mysql/mysql_mce_golden.json
cp tmp/test_mssql_ingest0/mssql_mces.json tests/integration/sql_server/mssql_mce_golden.json
cp tmp/test_mongodb_ingest0/mongodb_mces.json tests/integration/mongodb/mongodb_mce_golden.json
# Print success message.
set +x


@ -46,6 +46,8 @@ ignore_missing_imports = yes
ignore_missing_imports = yes
[mypy-pydruid.*]
ignore_missing_imports = yes
[mypy-pymongo.*]
ignore_missing_imports = yes
[isort]
profile = black


@ -75,6 +75,7 @@ plugins: Dict[str, Set[str]] = {
"snowflake": sql_common | {"snowflake-sqlalchemy"},
"ldap": {"python-ldap>=2.4"},
"druid": sql_common | {"pydruid>=0.6.2"},
"mongodb": {"pymongo>=3.11"},
# Sink plugins.
"datahub-kafka": kafka_common,
"datahub-rest": {"requests>=2.25.1"},
@ -100,6 +101,7 @@ dev_requirements = {
"bigquery",
"mysql",
"mssql",
"mongodb",
"ldap",
"datahub-kafka",
"datahub-rest",


@ -0,0 +1,121 @@
from dataclasses import dataclass, field
from typing import Iterable, List, Optional

import pymongo

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import DatasetPropertiesClass

# These are MongoDB-internal databases, which we want to skip.
# See https://docs.mongodb.com/manual/reference/local-database/ and
# https://docs.mongodb.com/manual/reference/config-database/ and
# https://stackoverflow.com/a/48273736/5004662.
DENY_DATABASE_LIST = set(["admin", "config", "local"])


class MongoDBConfig(ConfigModel):
    # See the MongoDB authentication docs for details and examples.
    # https://pymongo.readthedocs.io/en/stable/examples/authentication.html
    connect_uri: str = "mongodb://localhost"
    username: Optional[str] = None
    password: Optional[str] = None
    authMechanism: Optional[str] = None
    options: dict = {}
    database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
    collection_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()


@dataclass
class MongoDBSourceReport(SourceReport):
    filtered: List[str] = field(default_factory=list)

    def report_dropped(self, name: str) -> None:
        self.filtered.append(name)


@dataclass
class MongoDBSource(Source):
    config: MongoDBConfig
    report: MongoDBSourceReport

    def __init__(self, ctx: PipelineContext, config: MongoDBConfig):
        super().__init__(ctx)
        self.config = config
        self.report = MongoDBSourceReport()

        options = {}
        if self.config.username is not None:
            options["username"] = self.config.username
        if self.config.password is not None:
            options["password"] = self.config.password
        if self.config.authMechanism is not None:
            options["authMechanism"] = self.config.authMechanism
        options = {
            **options,
            **self.config.options,
        }

        self.mongo_client = pymongo.MongoClient(self.config.connect_uri, **options)

        # This cheaply tests the connection. For details, see
        # https://pymongo.readthedocs.io/en/stable/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient
        self.mongo_client.admin.command("ismaster")

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext):
        config = MongoDBConfig.parse_obj(config_dict)
        return cls(ctx, config)

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        env = "PROD"
        platform = "mongodb"

        database_names: List[str] = self.mongo_client.list_database_names()
        for database_name in database_names:
            if database_name in DENY_DATABASE_LIST:
                continue
            if not self.config.database_pattern.allowed(database_name):
                self.report.report_dropped(database_name)
                continue

            database = self.mongo_client[database_name]
            collection_names: List[str] = database.list_collection_names()
            for collection_name in collection_names:
                dataset_name = f"{database_name}.{collection_name}"
                if not self.config.collection_pattern.allowed(dataset_name):
                    self.report.report_dropped(dataset_name)
                    continue

                mce = MetadataChangeEvent()
                dataset_snapshot = DatasetSnapshot()
                dataset_snapshot.urn = f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{env})"

                dataset_properties = DatasetPropertiesClass(
                    tags=[],
                    customProperties={},
                )
                dataset_snapshot.aspects.append(dataset_properties)

                # TODO: Guess the schema via sampling
                # State of the art seems to be https://github.com/variety/variety.

                # TODO: use list_indexes() or index_information() to get index information
                # See https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.list_indexes.

                mce.proposedSnapshot = dataset_snapshot

                wu = MetadataWorkUnit(id=dataset_name, mce=mce)
                self.report.report_workunit(wu)
                yield wu

    def get_report(self) -> MongoDBSourceReport:
        return self.report

    def close(self):
        self.mongo_client.close()
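The schema-guessing TODO above could be prototyped with plain pymongo. Below is a rough sketch, not part of this commit, assuming a variety-style scan over a bounded sample of documents is acceptable:

```python
from collections import defaultdict
from typing import Dict, Set

import pymongo


def guess_collection_schema(
    collection: pymongo.collection.Collection, sample_size: int = 100
) -> Dict[str, Set[str]]:
    # Map each top-level field name to the set of Python type names observed.
    field_types: Dict[str, Set[str]] = defaultdict(set)
    # find() with a limit cheaply scans the first N documents; a production
    # version might prefer the $sample aggregation stage for a random sample.
    for doc in collection.find({}, limit=sample_size):
        for field_name, value in doc.items():
            field_types[field_name].add(type(value).__name__)
    return dict(field_types)
```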


@ -57,6 +57,13 @@ try:
except ImportError as e:
    source_registry.register_disabled("snowflake", e)
try:
    from .druid import DruidSource
    source_registry.register("druid", DruidSource)
except ImportError as e:
    source_registry.register_disabled("druid", e)
try:
    from .kafka import KafkaSource
@ -78,9 +85,10 @@ try:
except ImportError as e:
    source_registry.register_disabled("ldap", e)
try:
    from .druid import DruidSource
    source_registry.register("druid", DruidSource)
try:
    from .mongodb import MongoDBSource
    source_registry.register("mongodb", MongoDBSource)
except ImportError as e:
    source_registry.register_disabled("druid", e)
    source_registry.register_disabled("mongodb", e)


@ -3,7 +3,7 @@ import mce_helpers
from datahub.ingestion.run.pipeline import Pipeline
def test_dbt_ingest(mysql, pytestconfig, tmp_path, mock_time):
def test_dbt_ingest(pytestconfig, tmp_path, mock_time):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"
    pipeline = Pipeline.create(


@ -23,6 +23,19 @@ services:
      - ./mysql/setup:/setup
      - ./mysql/setup/setup.sql:/docker-entrypoint-initdb.d/setup.sql
  testmongodb:
    image: mongo
    container_name: "testmongodb"
    environment:
      MONGO_INITDB_ROOT_USERNAME: mongoadmin
      MONGO_INITDB_ROOT_PASSWORD: examplepass
      MONGO_INITDB_DATABASE: mngdb
    ports:
      - 57017:27017
    volumes:
      - ./mongodb/setup/mongo_init.js:/docker-entrypoint-initdb.d/mongo_init.js:ro
      - ./mongodb/setup:/setup
  openldap:
    image: osixia/openldap:latest
    command: --copy-service --loglevel debug
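For reference, the `testmongodb` service above is reachable from the host on port 57017. A quick connectivity check (a sketch, assuming the container is up; credentials and port come from the compose service above):

```python
import pymongo

# Credentials and host port come from the testmongodb compose service.
client = pymongo.MongoClient("mongodb://mongoadmin:examplepass@localhost:57017")
print(client.list_database_names())  # should include "mngdb" once the init script has run
```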


@ -39,6 +39,11 @@ def mysql(docker_ip, docker_services):
    return wait_for_db(docker_services, "testmysql", 3306)


@pytest.fixture(scope="session")
def mongodb(docker_ip, docker_services):
    return wait_for_db(docker_services, "testmongodb", 27017)


@pytest.fixture(scope="session")
def ldap(docker_ip, docker_services):
    return wait_for_db(docker_services, "openldap", 3306)


@ -0,0 +1,21 @@
[
    {
        "auditHeader": null,
        "proposedSnapshot": {
            "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
                "urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.mycollection,PROD)",
                "aspects": [
                    {
                        "com.linkedin.pegasus2avro.dataset.DatasetProperties": {
                            "description": null,
                            "uri": null,
                            "tags": [],
                            "customProperties": {}
                        }
                    }
                ]
            }
        },
        "proposedDelta": null
    }
]


@ -0,0 +1,7 @@
db.mycollection.createIndex({ myfield: 1 }, { unique: true });
db.mycollection.createIndex({ thatfield: 1 });
db.mycollection.insert({ myfield: 'hello1', thatfield: 'testing', noindex: 8});
db.mycollection.insert({ myfield: 'hello2', thatfield: 'testing', noindex: 2});
db.mycollection.insert({ myfield: 'hello3', thatfield: 'testing', noindex: 5});
db.mycollection.insert({ myfield: 'hello5', thatfield: 'testing', noindex: 2})
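The indexes created here are what the index-information TODO in the source could eventually surface. For reference, a sketch of reading them back with pymongo (assuming the test container above is running):

```python
import pymongo

client = pymongo.MongoClient("mongodb://mongoadmin:examplepass@localhost:57017")
collection = client["mngdb"]["mycollection"]
# index_information() maps index names to metadata; the unique index on
# myfield shows up with "unique": True.
for name, info in collection.index_information().items():
    print(name, info["key"], info.get("unique", False))
```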


@ -0,0 +1,182 @@
#!/usr/bin/env bash
# Use this script to test if a given TCP host/port are available

WAITFORIT_cmdname=${0##*/}

echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi }

usage()
{
    cat << USAGE >&2
Usage:
    $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args]
    -h HOST | --host=HOST       Host or IP under test
    -p PORT | --port=PORT       TCP port under test
                                Alternatively, you specify the host and port as host:port
    -s | --strict               Only execute subcommand if the test succeeds
    -q | --quiet                Don't output any status messages
    -t TIMEOUT | --timeout=TIMEOUT
                                Timeout in seconds, zero for no timeout
    -- COMMAND ARGS             Execute command with args after the test finishes
USAGE
    exit 1
}

wait_for()
{
    if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
        echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
    else
        echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout"
    fi
    WAITFORIT_start_ts=$(date +%s)
    while :
    do
        if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then
            nc -z $WAITFORIT_HOST $WAITFORIT_PORT
            WAITFORIT_result=$?
        else
            (echo -n > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1
            WAITFORIT_result=$?
        fi
        if [[ $WAITFORIT_result -eq 0 ]]; then
            WAITFORIT_end_ts=$(date +%s)
            echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds"
            break
        fi
        sleep 1
    done
    return $WAITFORIT_result
}

wait_for_wrapper()
{
    # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692
    if [[ $WAITFORIT_QUIET -eq 1 ]]; then
        timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
    else
        timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
    fi
    WAITFORIT_PID=$!
    trap "kill -INT -$WAITFORIT_PID" INT
    wait $WAITFORIT_PID
    WAITFORIT_RESULT=$?
    if [[ $WAITFORIT_RESULT -ne 0 ]]; then
        echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
    fi
    return $WAITFORIT_RESULT
}

# process arguments
while [[ $# -gt 0 ]]
do
    case "$1" in
        *:* )
        WAITFORIT_hostport=(${1//:/ })
        WAITFORIT_HOST=${WAITFORIT_hostport[0]}
        WAITFORIT_PORT=${WAITFORIT_hostport[1]}
        shift 1
        ;;
        --child)
        WAITFORIT_CHILD=1
        shift 1
        ;;
        -q | --quiet)
        WAITFORIT_QUIET=1
        shift 1
        ;;
        -s | --strict)
        WAITFORIT_STRICT=1
        shift 1
        ;;
        -h)
        WAITFORIT_HOST="$2"
        if [[ $WAITFORIT_HOST == "" ]]; then break; fi
        shift 2
        ;;
        --host=*)
        WAITFORIT_HOST="${1#*=}"
        shift 1
        ;;
        -p)
        WAITFORIT_PORT="$2"
        if [[ $WAITFORIT_PORT == "" ]]; then break; fi
        shift 2
        ;;
        --port=*)
        WAITFORIT_PORT="${1#*=}"
        shift 1
        ;;
        -t)
        WAITFORIT_TIMEOUT="$2"
        if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi
        shift 2
        ;;
        --timeout=*)
        WAITFORIT_TIMEOUT="${1#*=}"
        shift 1
        ;;
        --)
        shift
        WAITFORIT_CLI=("$@")
        break
        ;;
        --help)
        usage
        ;;
        *)
        echoerr "Unknown argument: $1"
        usage
        ;;
    esac
done

if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then
    echoerr "Error: you need to provide a host and port to test."
    usage
fi

WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15}
WAITFORIT_STRICT=${WAITFORIT_STRICT:-0}
WAITFORIT_CHILD=${WAITFORIT_CHILD:-0}
WAITFORIT_QUIET=${WAITFORIT_QUIET:-0}

# Check to see if timeout is from busybox?
WAITFORIT_TIMEOUT_PATH=$(type -p timeout)
WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH)

WAITFORIT_BUSYTIMEFLAG=""
if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then
    WAITFORIT_ISBUSY=1
    # Check if busybox timeout uses -t flag
    # (recent Alpine versions don't support -t anymore)
    if timeout &>/dev/stdout | grep -q -e '-t '; then
        WAITFORIT_BUSYTIMEFLAG="-t"
    fi
else
    WAITFORIT_ISBUSY=0
fi

if [[ $WAITFORIT_CHILD -gt 0 ]]; then
    wait_for
    WAITFORIT_RESULT=$?
    exit $WAITFORIT_RESULT
else
    if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
        wait_for_wrapper
        WAITFORIT_RESULT=$?
    else
        wait_for
        WAITFORIT_RESULT=$?
    fi
fi

if [[ $WAITFORIT_CLI != "" ]]; then
    if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then
        echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess"
        exit $WAITFORIT_RESULT
    fi
    exec "${WAITFORIT_CLI[@]}"
else
    exit $WAITFORIT_RESULT
fi


@ -0,0 +1,35 @@
import mce_helpers

from datahub.ingestion.run.pipeline import Pipeline


def test_mongodb_ingest(mongodb, pytestconfig, tmp_path, mock_time):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/mongodb"

    pipeline = Pipeline.create(
        {
            "run_id": "mongodb-test",
            "source": {
                "type": "mongodb",
                "config": {
                    "connect_uri": "mongodb://localhost:57017",
                    "username": "mongoadmin",
                    "password": "examplepass",
                },
            },
            "sink": {
                "type": "file",
                "config": {
                    "filename": f"{tmp_path}/mongodb_mces.json",
                },
            },
        }
    )
    pipeline.run()
    pipeline.raise_from_status()

    output = mce_helpers.load_json_file(str(tmp_path / "mongodb_mces.json"))
    golden = mce_helpers.load_json_file(
        str(test_resources_dir / "mongodb_mce_golden.json")
    )
    mce_helpers.assert_mces_equal(output, golden)