feat(ingest): start airflow integration + metadata builders (#2331)

Harshal Sheth 2021-04-05 19:11:28 -07:00 committed by GitHub
parent c1f3eaed35
commit bd78b84bd3
31 changed files with 729 additions and 214 deletions

View File

@ -39,6 +39,7 @@ module.exports = {
// Serves as user guides.
"docs/quickstart",
"docs/debugging",
"metadata-ingestion/README",
// TODO "docs/how/data-source-onboarding",
],
Architecture: [
@ -90,7 +91,7 @@ module.exports = {
// "metadata-jobs/README",
"metadata-jobs/mae-consumer-job/README",
"metadata-jobs/mce-consumer-job/README",
"metadata-ingestion/README",
"metadata-ingestion/developing",
],
"Advanced Guides": [
"docs/advanced/aspect-versioning",

View File

@ -1,3 +0,0 @@
0.0.1
-----
* Modernizing python scripts and creating first package

View File

@ -1,16 +1,10 @@
# Metadata Ingestion
# DataHub Metadata Ingestion
![Python version 3.6+](https://img.shields.io/badge/python-3.6%2B-blue)
This module hosts an extensible Python-based metadata ingestion system for DataHub.
This supports sending data to DataHub using Kafka or through the REST api.
It can be used through our CLI tool or as a library e.g. with an orchestrator like Airflow.
### Architecture
![metadata ingestion framework layout](../docs/imgs/datahub-metadata-ingestion-framework.png)
The architecture of this metadata ingestion framework is heavily inspired by [Apache Gobblin](https://gobblin.apache.org/) (also originally a LinkedIn project!). We have a standardized format - the MetadataChangeEvent - and sources and sinks which respectively produce and consume these objects. The sources pull metadata from a variety of data systems, while the sinks are primarily for moving this metadata into DataHub.
This supports sending data to DataHub using Kafka or through the REST API.
It can be used through our CLI tool, with an orchestrator like Airflow, or as a library.
## Getting Started
@ -18,95 +12,48 @@ The architecture of this metadata ingestion framework is heavily inspired by [Ap
Before running any metadata ingestion job, you should make sure that DataHub backend services are all running. If you are trying this out locally, the easiest way to do that is through [quickstart Docker images](../docker).
<!-- You can run this ingestion framework by building from source or by running docker images. -->
### Install from PyPI
### Install from Source
#### Requirements
1. Python 3.6+ must be installed in your host environment.
2. You also need to build the `mxe-schemas` module as below.
```
(cd .. && ./gradlew :metadata-events:mxe-schemas:build)
```
This is needed to generate `MetadataChangeEvent.avsc`, which is the schema for the `MetadataChangeEvent_v4` Kafka topic.
3. On MacOS: `brew install librdkafka`
4. On Debian/Ubuntu: `sudo apt install librdkafka-dev python3-dev python3-venv`
5. On Fedora (if using LDAP source integration): `sudo yum install openldap-devel`
#### Set up your Python environment
The folks over at [Acryl](https://www.acryl.io/) maintain a PyPI package for DataHub metadata ingestion.
```sh
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip wheel setuptools
pip install -e .
./scripts/codegen.sh
# Requires Python 3.6+
pip install --upgrade pip==20.2.4 wheel setuptools
pip uninstall datahub acryl-datahub || true # sanity check - ok if it fails
pip install acryl-datahub
datahub version
```
Common issues (click to expand):
<details>
<summary>Wheel issues e.g. "Failed building wheel for avro-python3" or "error: invalid command 'bdist_wheel'"</summary>
This means Python's `wheel` is not installed. Try running the following commands and then retry.
```sh
pip install --upgrade pip wheel setuptools
pip cache purge
```
</details>
<details>
<summary>Failure to install confluent_kafka: "error: command 'x86_64-linux-gnu-gcc' failed with exit status 1"</summary>
This sometimes happens if there's a version mismatch between Kafka's C library and the Python wrapper library. Try running `pip install confluent_kafka==1.5.0` and then retry.
</details>
<details>
<summary>Failure to install avro-python3: "distutils.errors.DistutilsOptionError: Version loaded from file: avro/VERSION.txt does not comply with PEP 440"</summary>
The underlying `avro-python3` package is buggy. In particular, it often installs correctly only from a pre-built "wheel" and not when built from source. Try running the following commands and then retry.
```sh
pip uninstall avro-python3 # sanity check, ok if this fails
pip install --upgrade pip wheel setuptools
pip cache purge
pip install avro-python3
```
</details>
If you run into an error, try checking the [_common setup issues_](./developing.md#Common-setup-issues).
#### Installing Plugins
We use a plugin architecture so that you can install only the dependencies you actually need.
| Plugin Name | Install Command | Provides |
| ------------- | ------------------------------------------------- | -------------------------- |
| file | _included by default_ | File source and sink |
| console | _included by default_ | Console sink |
| athena | `pip install -e '.[athena]'` | AWS Athena source |
| bigquery | `pip install -e '.[bigquery]'` | BigQuery source |
| glue | `pip install -e '.[glue]'` | AWS Glue source |
| hive | `pip install -e '.[hive]'` | Hive source |
| mssql | `pip install -e '.[mssql]'` | SQL Server source |
| mysql | `pip install -e '.[mysql]'` | MySQL source |
| postgres | `pip install -e '.[postgres]'` | Postgres source |
| snowflake | `pip install -e '.[snowflake]'` | Snowflake source |
| mongodb | `pip install -e '.[mongodb]'` | MongoDB source |
| ldap | `pip install -e '.[ldap]'` ([extra requirements]) | LDAP source |
| kafka         | `pip install -e '.[kafka]'`                        | Kafka source               |
| druid         | `pip install -e '.[druid]'`                        | Druid source               |
| dbt | no additional dependencies | DBT source |
| datahub-rest | `pip install -e '.[datahub-rest]'` | DataHub sink over REST API |
| datahub-kafka | `pip install -e '.[datahub-kafka]'` | DataHub sink over Kafka |
| Plugin Name | Install Command | Provides |
| ------------- | ---------------------------------------------------------- | -------------------------- |
| file | _included by default_ | File source and sink |
| console | _included by default_ | Console sink |
| athena | `pip install 'acryl-datahub[athena]'` | AWS Athena source |
| bigquery | `pip install 'acryl-datahub[bigquery]'` | BigQuery source |
| glue | `pip install 'acryl-datahub[glue]'` | AWS Glue source |
| hive | `pip install 'acryl-datahub[hive]'` | Hive source |
| mssql | `pip install 'acryl-datahub[mssql]'` | SQL Server source |
| mysql | `pip install 'acryl-datahub[mysql]'` | MySQL source |
| postgres | `pip install 'acryl-datahub[postgres]'` | Postgres source |
| snowflake | `pip install 'acryl-datahub[snowflake]'` | Snowflake source |
| mongodb | `pip install 'acryl-datahub[mongodb]'` | MongoDB source |
| ldap | `pip install 'acryl-datahub[ldap]'` ([extra requirements]) | LDAP source |
| kafka         | `pip install 'acryl-datahub[kafka]'`                        | Kafka source               |
| druid         | `pip install 'acryl-datahub[druid]'`                        | Druid source               |
| dbt | _no additional dependencies_ | DBT source |
| datahub-rest | `pip install 'acryl-datahub[datahub-rest]'` | DataHub sink over REST API |
| datahub-kafka | `pip install 'acryl-datahub[datahub-kafka]'` | DataHub sink over Kafka |
These plugins can be mixed and matched as desired. For example:
```sh
pip install -e '.[bigquery,datahub-rest]'
pip install 'acryl-datahub[bigquery,datahub-rest]'
```
You can check the active plugins:
@ -120,7 +67,7 @@ datahub ingest-list-plugins
#### Basic Usage
```sh
pip install -e '.[datahub-rest]' # install the required plugin
pip install 'acryl-datahub[datahub-rest]' # install the required plugin
datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml
```
@ -138,6 +85,10 @@ _Limitation: the datahub_docker.sh convenience script assumes that the recipe an
./scripts/datahub_docker.sh ingest -c ./examples/recipes/example_to_datahub_rest.yml
```
### Install from source
If you'd like to install from source, see the [developer guide](./developing.md).
### Usage within Airflow
We have also included a couple of [sample DAGs](./examples/airflow) that can be used with [Airflow](https://airflow.apache.org/).
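For orientation, the pattern those sample DAGs follow - running a DataHub ingestion pipeline from a `PythonOperator` - looks roughly like the sketch below. The DAG id, schedule, and source configuration here are illustrative placeholders, not part of the shipped examples.
```python
"""Sketch: run a DataHub ingestion pipeline from an Airflow DAG.

Mirrors the imports used by the sample DAGs; the DAG id, schedule, and
source configuration are placeholders.
"""
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from datahub.ingestion.run.pipeline import Pipeline


def run_datahub_ingestion():
    # The dict mirrors the recipe YAML format; swap in your own source config.
    pipeline = Pipeline.create(
        {
            "source": {
                "type": "mysql",
                "config": {
                    "username": "user",
                    "password": "pass",
                    "database": "db_name",
                    "host_port": "localhost:3306",
                },
            },
            "sink": {
                "type": "datahub-rest",
                "config": {"server": "http://localhost:8080"},
            },
        }
    )
    pipeline.run()
    pipeline.pretty_print_summary()


with DAG(
    "datahub_ingestion_sketch",  # placeholder DAG id
    default_args={"owner": "airflow", "depends_on_past": False},
    description="Sketch of a DataHub ingestion pipeline run from Airflow.",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="run_datahub_ingestion",
        python_callable=run_datahub_ingestion,
    )
```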
@ -357,10 +308,13 @@ source:
```
### AWS Glue `glue`
Extracts:
- List of tables
- Column types associated with each table
- Table metadata, such as owner, description and parameters
```yml
source:
type: glue
@ -526,47 +480,8 @@ In some cases, you might want to construct the MetadataChangeEvents yourself but
- [DataHub emitter via REST](./src/datahub/emitter/rest_emitter.py) (same requirements as `datahub-rest`)
- [DataHub emitter via Kafka](./src/datahub/emitter/kafka_emitter.py) (same requirements as `datahub-kafka`)
## Migrating from the old scripts
For a basic usage example, see the [lineage_emitter.py](./examples/library/lineage_emitter.py) example.
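In outline, that example builds a lineage MCE with the builder helpers and pushes it through the REST emitter (the server URL is a placeholder for your GMS endpoint):
```python
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Build a lineage MCE linking two upstream datasets to one downstream dataset.
lineage_mce = builder.make_lineage_mce(
    [
        builder.make_dataset_urn("bigquery", "upstream1"),
        builder.make_dataset_urn("bigquery", "upstream2"),
    ],
    builder.make_dataset_urn("bigquery", "downstream"),
)

# Point the emitter at your GMS REST endpoint (placeholder URL) and emit.
emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mce(lineage_mce)
```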
If you were previously using the `mce_cli.py` tool to push metadata into DataHub, the new approach is to create a recipe with a file source pointing at your JSON file and a DataHub sink that pushes the metadata into DataHub.
This [example recipe](./examples/recipes/example_to_datahub_rest.yml) demonstrates how to ingest the [sample data](./examples/mce_files/bootstrap_mce.json) (previously called `bootstrap_mce.dat`) into DataHub over the REST API.
Note that we no longer use the `.dat` format, but instead use JSON. The main differences are that the JSON uses `null` instead of `None` and uses objects/dictionaries instead of tuples when representing unions.
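If you prefer to do this programmatically rather than via a recipe file, the equivalent configuration can be handed to `Pipeline.create` as a dict. This is only a sketch: the file source's config key is shown as `filename`, which is an assumption - check the linked example recipe for the authoritative field name.
```python
from datahub.ingestion.run.pipeline import Pipeline

# Sketch: a file source feeding the DataHub REST sink. The "filename" config
# key is an assumption - see the example recipe for the exact field name.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "file",
            "config": {"filename": "./examples/mce_files/bootstrap_mce.json"},
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.pretty_print_summary()
```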
## Developing
If you were previously using one of the `sql-etl` scripts, the new approach is to use the corresponding source. See [above](#Sources) for configuration details. Note that the source needs to be paired with a sink - likely `datahub-kafka` or `datahub-rest`, depending on your needs.
## Contributing
Contributions welcome!
### Code layout
- The CLI interface is defined in [entrypoints.py](./src/datahub/entrypoints.py).
- The high level interfaces are defined in the [API directory](./src/datahub/ingestion/api).
- The actual [sources](./src/datahub/ingestion/source) and [sinks](./src/datahub/ingestion/sink) have their own directories. The registry files in those directories import the implementations.
- The metadata models are created using code generation, and eventually live in the `./src/datahub/metadata` directory. However, these files are not checked in and instead are generated at build time. See the [codegen](./scripts/codegen.sh) script for details.
### Testing
```sh
# Follow standard install procedure - see above.
# Install, including all dev requirements.
pip install -e '.[dev]'
# Run unit tests.
pytest tests/unit
# Run integration tests. Note that the integration tests require docker.
pytest tests/integration
```
### Sanity check code before committing
```sh
# Assumes: pip install -e '.[dev]'
black src tests
isort src tests
flake8 src tests
mypy -p datahub
pytest
```
See the [developing guide](./developing.md).

View File

@ -0,0 +1,104 @@
# Developing on Metadata Ingestion
If you just want to use metadata ingestion, check the [user-centric](./README.md) guide.
## Architecture
![metadata ingestion framework layout](../docs/imgs/datahub-metadata-ingestion-framework.png)
The architecture of this metadata ingestion framework is heavily inspired by [Apache Gobblin](https://gobblin.apache.org/) (also originally a LinkedIn project!). We have a standardized format - the MetadataChangeEvent - and sources and sinks which respectively produce and consume these objects. The sources pull metadata from a variety of data systems, while the sinks are primarily for moving this metadata into DataHub.
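As a concrete illustration of that standardized format, an MCE is just a proposed snapshot of an entity plus a list of aspects. The sketch below builds one by hand with the generated schema classes, attaching an ownership aspect to a dataset; the class names follow the scripts in this repo, but treat the exact field shapes as a sketch rather than a reference.
```python
import time

from datahub.metadata.schema_classes import (
    AuditStampClass,
    DatasetSnapshotClass,
    MetadataChangeEventClass,
    OwnerClass,
    OwnershipClass,
    OwnershipTypeClass,
)

# Sketch: a MetadataChangeEvent proposing an ownership aspect for a dataset.
# The URN format matches make_dataset_urn(); the dataset and owner are made up.
mce = MetadataChangeEventClass(
    proposedSnapshot=DatasetSnapshotClass(
        urn="urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)",
        aspects=[
            OwnershipClass(
                owners=[
                    OwnerClass(
                        owner="urn:li:corpuser:jdoe",
                        type=OwnershipTypeClass.DATAOWNER,
                    )
                ],
                lastModified=AuditStampClass(
                    time=int(time.time() * 1000),
                    actor="urn:li:corpuser:datahub",
                ),
            )
        ],
    )
)
```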
## Getting Started
### Requirements
1. Python 3.6+ must be installed in your host environment.
2. You also need to build the `mxe-schemas` module as below.
```
(cd .. && ./gradlew :metadata-events:mxe-schemas:build)
```
This is needed to generate `MetadataChangeEvent.avsc`, which is the schema for the `MetadataChangeEvent_v4` Kafka topic.
3. On MacOS: `brew install librdkafka`
4. On Debian/Ubuntu: `sudo apt install librdkafka-dev python3-dev python3-venv`
5. On Fedora (if using LDAP source integration): `sudo yum install openldap-devel`
### Set up your Python environment
```sh
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip==20.2.4 wheel setuptools
pip uninstall datahub || true ; rm -r src/*.egg-info || true
pip install -e .
./scripts/codegen.sh
```
### Common setup issues
Common issues (click to expand):
<details>
<summary>Wheel issues e.g. "Failed building wheel for avro-python3" or "error: invalid command 'bdist_wheel'"</summary>
This means Python's `wheel` is not installed. Try running the following commands and then retry.
```sh
pip install --upgrade pip==20.2.4 wheel setuptools
pip cache purge
```
</details>
<details>
<summary>Failure to install confluent_kafka: "error: command 'x86_64-linux-gnu-gcc' failed with exit status 1"</summary>
This sometimes happens if there's a version mismatch between Kafka's C library and the Python wrapper library. Try running `pip install confluent_kafka==1.5.0` and then retry.
</details>
### Using Plugins in Development
The syntax for installing plugins is slightly different in development. For example:
```diff
- pip install 'acryl-datahub[bigquery,datahub-rest]'
+ pip install -e '.[bigquery,datahub-rest]'
```
## Code layout
- The CLI interface is defined in [entrypoints.py](./src/datahub/entrypoints.py).
- The high level interfaces are defined in the [API directory](./src/datahub/ingestion/api).
- The actual [sources](./src/datahub/ingestion/source) and [sinks](./src/datahub/ingestion/sink) have their own directories. The registry files in those directories import the implementations.
- The metadata models are created using code generation, and eventually live in the `./src/datahub/metadata` directory. However, these files are not checked in and instead are generated at build time. See the [codegen](./scripts/codegen.sh) script for details.
- Tests live in the [`tests`](./tests) directory. They're split between smaller unit tests and larger integration tests.
## Contributing
Contributions welcome!
### Testing
```sh
# Follow standard install from source procedure - see above.
# Install, including all dev requirements.
pip install -e '.[dev]'
# Run unit tests.
pytest tests/unit
# Run integration tests. Note that the integration tests require docker.
pytest tests/integration
```
### Sanity check code before committing
```sh
# Assumes: pip install -e '.[dev]'
black .
isort .
flake8 .
mypy .
pytest
```

View File

@ -7,14 +7,12 @@ DataHub ingestion pipeline within an Airflow DAG.
from datetime import timedelta
import yaml
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datahub.ingestion.run.pipeline import Pipeline
default_args = {
"owner": "airflow",
"depends_on_past": False,

View File

@ -0,0 +1,69 @@
"""Lineage Emission
This example demonstrates how to emit lineage to DataHub within an Airflow DAG.
"""
from datetime import timedelta
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.dates import days_ago
import datahub.emitter.mce_builder as builder
from datahub.integrations.airflow.operators import DatahubEmitterOperator
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email": ["jdoe@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(minutes=120),
}
with DAG(
"datahub_lineage_emission_example",
default_args=default_args,
description="An example DAG demonstrating lineage emission within an Airflow DAG.",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["datahub-ingest"],
catchup=False,
) as dag:
# This example shows a SnowflakeOperator followed by a lineage emission. However, the
# same DatahubEmitterOperator can be used to emit lineage in any context.
sql = """CREATE OR REPLACE TABLE `mydb.schema.tableC` AS
WITH some_table AS (
SELECT * FROM `mydb.schema.tableA`
),
some_other_table AS (
SELECT id, some_column FROM `mydb.schema.tableB`
)
SELECT * FROM some_table
LEFT JOIN some_other_table ON some_table.unique_id=some_other_table.id"""
transformation_task = SnowflakeOperator(
task_id="snowflake_transformation",
dag=dag,
snowflake_conn_id="snowflake_default",
sql=sql,
)
emit_lineage_task = DatahubEmitterOperator(
task_id="emit_lineage",
datahub_rest_conn_id="datahub_rest_default",
mces=[
builder.make_lineage_mce(
[
builder.make_dataset_urn("snowflake", "mydb.schema.tableA"),
builder.make_dataset_urn("snowflake", "mydb.schema.tableB"),
],
builder.make_dataset_urn("snowflake", "mydb.schema.tableC"),
)
],
)
transformation_task >> emit_lineage_task

View File

@ -8,12 +8,11 @@ embedded within the code.
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datahub.ingestion.run.pipeline import Pipeline
default_args = {
"owner": "airflow",
"depends_on_past": False,

View File

@ -4,27 +4,27 @@ This script reads that JSON file, adds to it using the directives pull from a Go
produces a new JSON file called demo_data.json.
"""
from typing import List
import pathlib
import json
import csv
import dataclasses
import json
import pathlib
import time
from typing import List
from datahub.metadata.schema_classes import (
MetadataChangeEventClass,
DatasetSnapshotClass,
OwnershipClass,
OwnerClass,
OwnershipTypeClass,
CorpUserSnapshotClass,
CorpUserInfoClass,
AuditStampClass,
UpstreamLineageClass,
UpstreamClass,
CorpUserInfoClass,
CorpUserSnapshotClass,
DatasetLineageTypeClass,
GlobalTagsClass,
DatasetSnapshotClass,
EditableSchemaMetadataClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
UpstreamClass,
UpstreamLineageClass,
)
DEMO_DATA_DIR = pathlib.Path("./examples/demo_data")
@ -145,25 +145,33 @@ def create_lineage_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
)
)
def create_global_tags_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
return MetadataChangeEventClass(
proposedSnapshot=DatasetSnapshotClass(
urn=dataset_name_to_urn(directive.table),
aspects=[
GlobalTagsClass(
tags=[]
)
],
aspects=[GlobalTagsClass(tags=[])],
)
)
def create_editable_schema_info_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
def create_editable_schema_info_aspect_mce(
directive: Directive,
) -> MetadataChangeEventClass:
return MetadataChangeEventClass(
proposedSnapshot=DatasetSnapshotClass(
urn=dataset_name_to_urn(directive.table),
aspects=[
EditableSchemaMetadataClass(
editableSchemaFieldInfo=[]
created=AuditStampClass(
time=int(time.time() * 1000),
actor="urn:li:corpuser:datahub",
),
lastModified=AuditStampClass(
time=int(time.time() * 1000),
actor="urn:li:corpuser:datahub",
),
editableSchemaFieldInfo=[],
)
],
)

View File

@ -0,0 +1,17 @@
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter
# Construct a lineage object.
lineage_mce = builder.make_lineage_mce(
[
builder.make_dataset_urn("bigquery", "upstream1"),
builder.make_dataset_urn("bigquery", "upstream2"),
],
builder.make_dataset_urn("bigquery", "downstream"),
)
# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")
# Emit metadata!
emitter.emit_mce(lineage_mce)

View File

@ -0,0 +1,24 @@
from datahub.ingestion.run.pipeline import Pipeline
# The pipeline configuration is similar to the recipe YAML files provided to the CLI tool.
pipeline = Pipeline.create(
{
"source": {
"type": "mysql",
"config": {
"username": "user",
"password": "pass",
"database": "db_name",
"host_port": "localhost:3306",
},
},
"sink": {
"type": "datahub-rest",
"config": {"server": "http://localhost:8080"},
},
}
)
# Run the pipeline and report the results.
pipeline.run()
pipeline.pretty_print_summary()

View File

@ -1,21 +1,24 @@
#!/bin/bash
set -euxo pipefail
sudo apt-get update && sudo apt-get install -y \
librdkafka-dev \
python3-ldap \
libldap2-dev \
libsasl2-dev \
ldap-utils
if [ "$(uname)" == "Darwin" ]; then
brew install librdkafka
else
sudo apt-get update && sudo apt-get install -y \
librdkafka-dev \
python3-ldap \
libldap2-dev \
libsasl2-dev \
ldap-utils
fi
python -m pip install --upgrade pip wheel setuptools
python -m pip install --upgrade pip==20.2.4 wheel setuptools
pip install -e ".[dev]"
./scripts/codegen.sh
black --check src tests
isort --check-only src tests
flake8 --count --statistics src tests
mypy -p datahub
pytest
black --check .
isort --check-only .
flake8 --count --statistics .
mypy .
pytest -vv

View File

@ -5,7 +5,7 @@
set -euo pipefail
DOCKER_IMAGE=linkedin/datahub-ingestion:latest
DOCKER_IMAGE=linkedin/datahub-ingestion:${DATAHUB_VERSION:-latest}
docker pull --quiet $DOCKER_IMAGE

View File

@ -0,0 +1,9 @@
#!/bin/bash
set -euxo pipefail
(cd .. && ./gradlew :metadata-events:mxe-schemas:build)
./scripts/ci.sh
rm -rf build dist || true
python -m build
python -m twine upload 'dist/*'

View File

@ -10,18 +10,18 @@ ignore =
exclude =
.git,
src/datahub/metadata,
venv,
__pycache__
per-file-ignores =
# imported but unused
__init__.py: F401
[mypy]
mypy_path = src
plugins =
sqlmypy,
pydantic.mypy
exclude = \.git|venv|build|dist
ignore_missing_imports = yes
namespace_packages = true
strict_optional = yes
check_untyped_defs = yes
# eventually we'd like to enable these

View File

@ -1,14 +1,11 @@
from typing import Dict, Set
import os
from typing import Dict, Set
import setuptools
def get_version():
root = os.path.dirname(__file__)
changelog = os.path.join(root, "CHANGELOG")
with open(changelog) as f:
return f.readline().strip()
package_metadata: dict = {}
with open("./src/datahub/__init__.py") as fp:
exec(fp.read(), package_metadata)
def get_long_description():
@ -16,11 +13,6 @@ def get_long_description():
with open(os.path.join(root, "README.md")) as f:
description = f.read()
description += "\n\nChangelog\n=========\n\n"
with open(os.path.join(root, "CHANGELOG")) as f:
description += f.read()
return description
@ -61,6 +53,11 @@ sql_common = {
# Note: for all of these, framework_common will be added.
plugins: Dict[str, Set[str]] = {
# Sink plugins.
"datahub-kafka": kafka_common,
"datahub-rest": {"requests>=2.25.1"},
# Integrations.
"airflow": {"apache-airflow >= 1.10.3"},
# Source plugins
"kafka": kafka_common,
"athena": sql_common | {"PyAthena[SQLAlchemy]"},
@ -69,7 +66,8 @@ plugins: Dict[str, Set[str]] = {
# This will change to a normal reference to pybigquery once a new version is released to PyPI.
# We need to use this custom version in order to correctly get table descriptions.
# See this PR by hsheth2 for details: https://github.com/tswast/pybigquery/pull/82.
"pybigquery @ git+https://github.com/tswast/pybigquery@3250fa796b28225cb1c89d7afea3c2e2a2bf2305#egg=pybigquery"
# "pybigquery @ git+https://github.com/tswast/pybigquery@3250fa796b28225cb1c89d7afea3c2e2a2bf2305#egg=pybigquery"
"pybigquery"
},
"hive": sql_common | {"pyhive[hive]"},
"mssql": sql_common | {"sqlalchemy-pytds>=0.3"},
@ -80,9 +78,6 @@ plugins: Dict[str, Set[str]] = {
"druid": sql_common | {"pydruid>=0.6.2"},
"mongodb": {"pymongo>=3.11"},
"glue": {"boto3"},
# Sink plugins.
"datahub-kafka": kafka_common,
"datahub-rest": {"requests>=2.25.1"},
}
dev_requirements = {
@ -99,7 +94,8 @@ dev_requirements = {
"sqlalchemy-stubs",
"deepdiff",
"freezegun",
"botocore",
"build",
"twine",
# Also add the plugins which are used for tests.
*list(
dependency
@ -109,19 +105,27 @@ dev_requirements = {
"mssql",
"mongodb",
"ldap",
"glue",
"datahub-kafka",
"datahub-rest",
"glue",
"airflow",
]
for dependency in plugins[plugin]
),
"apache-airflow-providers-snowflake", # Used in the example DAGs.
}
setuptools.setup(
name="datahub",
version=get_version(),
url="https://github.com/linkedin/datahub",
# Package metadata.
name=package_metadata["__package_name__"],
version=package_metadata["__version__"],
url="https://datahubproject.io/",
project_urls={
"Documentation": "https://datahubproject.io/docs/",
"Source": "https://github.com/linkedin/datahub",
"Changelog": "https://github.com/linkedin/datahub/releases",
},
author="DataHub Committers",
license="Apache License 2.0",
description="A CLI to work with DataHub metadata",
@ -147,6 +151,8 @@ setuptools.setup(
"Environment :: MacOS X",
"Topic :: Software Development",
],
# Package info.
zip_safe=False,
python_requires=">=3.6",
package_dir={"": "src"},
packages=setuptools.find_namespace_packages(where="./src"),
@ -179,7 +185,11 @@ setuptools.setup(
"datahub-kafka = datahub.ingestion.sink.datahub_kafka:DatahubKafkaSink",
"datahub-rest = datahub.ingestion.sink.datahub_rest:DatahubRestSink",
],
"apache_airflow_provider": [
"provider_info=datahub.integrations.airflow.get_provider_info:get_provider_info"
],
},
# Dependencies.
install_requires=list(base_requirements | framework_common),
extras_require={
"base": list(framework_common),

View File

@ -0,0 +1,2 @@
__package_name__ = "acryl-datahub"
__version__ = "0.1.0"

View File

@ -0,0 +1,54 @@
"""Convenience functions for creating MCEs"""
import time
from typing import List, Union
from datahub.metadata import (
AuditStampClass,
DatasetLineageTypeClass,
DatasetSnapshotClass,
MetadataChangeEventClass,
UpstreamClass,
UpstreamLineageClass,
)
def make_dataset_urn(platform: str, name: str, env: str = "PROD"):
return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"
def make_user_urn(username: str):
return f"urn:li:corpuser:{username}"
def make_lineage_mce(
upstream_urns: Union[str, List[str]],
downstream_urn: str,
actor: str = make_user_urn("datahub"),
lineage_type: str = DatasetLineageTypeClass.TRANSFORMED,
) -> MetadataChangeEventClass:
sys_time = int(time.time() * 1000)
if not isinstance(upstream_urns, list):
upstream_urns = [upstream_urns]
mce = MetadataChangeEventClass(
proposedSnapshot=DatasetSnapshotClass(
urn=downstream_urn,
aspects=[
UpstreamLineageClass(
upstreams=[
UpstreamClass(
auditStamp=AuditStampClass(
time=sys_time,
actor=actor,
),
dataset=upstream_urn,
type=lineage_type,
)
for upstream_urn in upstream_urns
]
)
],
)
)
return mce

View File

@ -6,6 +6,7 @@ import sys
import click
from pydantic import ValidationError
import datahub as datahub_package
from datahub.check.check_cli import check
from datahub.configuration.config_loader import load_config_file
from datahub.ingestion.run.pipeline import Pipeline
@ -40,6 +41,13 @@ def datahub(debug: bool) -> None:
# breakpoint()
@datahub.command()
def version() -> None:
"""Print version number and exit"""
click.echo(f"DataHub CLI version: {datahub_package.__version__}")
click.echo(f"Python version: {sys.version}")
@datahub.command()
@click.option(
"-c",

View File

@ -232,6 +232,9 @@ class GlueSource(Source):
def get_report(self):
return self.report
def close(self):
pass
def get_column_type(
glue_source: GlueSource, field_type: str, table_name: str, field_name: str

View File

@ -0,0 +1,14 @@
import datahub
def get_provider_info():
return {
"name": "DataHub",
"description": "`DataHub <https://datahubproject.io/>`__\n",
"hook-class-names": [
"datahub.integrations.airflow.hooks.DatahubRestHook",
"datahub.integrations.airflow.hooks.DatahubKafkaHook",
],
"package-name": datahub.__package_name__,
"versions": [datahub.__version__],
}

View File

@ -0,0 +1,108 @@
from typing import Any, Dict, List
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.sink.datahub_kafka import KafkaSinkConfig
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
class DatahubRestHook(BaseHook):
conn_name_attr = "datahub_rest_conn_id"
default_conn_name = "datahub_rest_default"
conn_type = "datahub_rest"
hook_name = "DataHub REST Server"
def __init__(self, datahub_rest_conn_id: str = default_conn_name) -> None:
super().__init__()
self.datahub_rest_conn_id = datahub_rest_conn_id
@staticmethod
def get_connection_form_widgets() -> Dict[str, Any]:
return {}
@staticmethod
def get_ui_field_behaviour() -> Dict:
"""Returns custom field behavior"""
return {
"hidden_fields": ["port", "schema", "login"],
"relabeling": {
"host": "Server Endpoint",
},
}
def _gms_endpoint(self) -> str:
conn = self.get_connection(self.datahub_rest_conn_id)
host = conn.host
if host is None:
raise AirflowException("host parameter is required")
return host
def make_emitter(self) -> DatahubRestEmitter:
return DatahubRestEmitter(self._gms_endpoint())
def emit_mces(self, mces: List[MetadataChangeEvent]) -> None:
emitter = self.make_emitter()
for mce in mces:
emitter.emit_mce(mce)
class DatahubKafkaHook(BaseHook):
conn_name_attr = "datahub_kafka_conn_id"
default_conn_name = "datahub_kafka_default"
conn_type = "datahub_kafka"
hook_name = "DataHub Kafka Sink"
def __init__(self, datahub_kafka_conn_id: str = default_conn_name) -> None:
super().__init__()
self.datahub_kafka_conn_id = datahub_kafka_conn_id
@staticmethod
def get_connection_form_widgets() -> Dict[str, Any]:
return {}
@staticmethod
def get_ui_field_behaviour() -> Dict:
"""Returns custom field behavior"""
return {
"hidden_fields": ["port", "schema", "login", "password"],
"relabeling": {
"host": "Kafka Broker",
},
}
def _get_config(self) -> KafkaSinkConfig:
conn = self.get_connection(self.datahub_kafka_conn_id)
obj = conn.extra_dejson
obj.setdefault("connection", {})
if conn.host is not None:
if "bootstrap" in obj["connection"]:
raise AirflowException(
"Kafka broker specified twice (present in host and extra)"
)
obj["connection"]["bootstrap"] = conn.host
config = KafkaSinkConfig.parse_obj(obj)
return config
def make_emitter(self) -> DatahubKafkaEmitter:
sink_config = self._get_config()
return DatahubKafkaEmitter(sink_config)
def emit_mces(self, mces: List[MetadataChangeEvent]) -> None:
emitter = self.make_emitter()
errors = []
def callback(exc, msg):
if exc:
errors.append(exc)
for mce in mces:
emitter.emit_mce_async(mce, callback)
emitter.flush()
if errors:
raise AirflowException(f"failed to push some MCEs: {errors}")

View File

@ -0,0 +1,55 @@
from typing import List, Optional, Union
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from datahub.integrations.airflow.hooks import DatahubKafkaHook, DatahubRestHook
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
class DatahubBaseOperator(BaseOperator):
ui_color = "#4398c8"
hook: Union[DatahubRestHook, DatahubKafkaHook]
@apply_defaults
def __init__(
self,
*,
datahub_rest_conn_id: Optional[str] = None,
datahub_kafka_conn_id: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
if datahub_rest_conn_id and datahub_kafka_conn_id:
raise AirflowException(
"two hook conn id given when only exactly one required"
)
elif datahub_rest_conn_id:
self.hook = DatahubRestHook(datahub_rest_conn_id)
elif datahub_kafka_conn_id:
self.hook = DatahubKafkaHook(datahub_kafka_conn_id)
else:
raise AirflowException("no hook conn id provided")
class DatahubEmitterOperator(DatahubBaseOperator):
@apply_defaults
def __init__(
self,
mces: List[MetadataChangeEvent],
datahub_rest_conn_id: Optional[str] = None,
datahub_kafka_conn_id: Optional[str] = None,
**kwargs,
):
super().__init__(
datahub_rest_conn_id=datahub_rest_conn_id,
datahub_kafka_conn_id=datahub_kafka_conn_id,
**kwargs,
)
self.mces = mces
def execute(self, context):
self.hook.emit_mces(self.mces)

View File

@ -0,0 +1,103 @@
import json
from contextlib import contextmanager
from typing import Iterator
from unittest import mock
from airflow.models import Connection, DagBag
import datahub.emitter.mce_builder as builder
from datahub.integrations.airflow.get_provider_info import get_provider_info
from datahub.integrations.airflow.hooks import DatahubKafkaHook, DatahubRestHook
from datahub.integrations.airflow.operators import DatahubEmitterOperator
lineage_mce = builder.make_lineage_mce(
[
builder.make_dataset_urn("bigquery", "upstream1"),
builder.make_dataset_urn("bigquery", "upstream2"),
],
builder.make_dataset_urn("bigquery", "downstream1"),
)
datahub_rest_connection_config = Connection(
conn_id="datahub_rest_test",
conn_type="datahub_rest",
host="http://test_host:8080/",
extra=None,
)
datahub_kafka_connection_config = Connection(
conn_id="datahub_kafka_test",
conn_type="datahub_kafka",
host="test_broker:9092",
extra=json.dumps(
{
"connection": {
"producer_config": {},
"schema_registry_url": "http://localhost:8081",
}
}
),
)
def test_airflow_provider_info():
assert get_provider_info()
def test_dags_load_with_no_errors(pytestconfig):
airflow_examples_folder = pytestconfig.rootpath / "examples/airflow"
dag_bag = DagBag(dag_folder=str(airflow_examples_folder), include_examples=False)
assert dag_bag.import_errors == {}
assert len(dag_bag.dag_ids) > 0
@contextmanager
def patch_airflow_connection(conn: Connection) -> Iterator[Connection]:
# The return type should really be ContextManager, but mypy doesn't like that.
# See https://stackoverflow.com/questions/49733699/python-type-hints-and-context-managers#comment106444758_58349659.
with mock.patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn):
yield conn
@mock.patch("datahub.integrations.airflow.hooks.DatahubRestEmitter", autospec=True)
def test_datahub_rest_hook(mock_emitter):
with patch_airflow_connection(datahub_rest_connection_config) as config:
hook = DatahubRestHook(config.conn_id)
hook.emit_mces([lineage_mce])
mock_emitter.assert_called_once_with(config.host)
instance = mock_emitter.return_value
instance.emit_mce.assert_called_with(lineage_mce)
@mock.patch("datahub.integrations.airflow.hooks.DatahubKafkaEmitter", autospec=True)
def test_datahub_kafka_hook(mock_emitter):
with patch_airflow_connection(datahub_kafka_connection_config) as config:
hook = DatahubKafkaHook(config.conn_id)
hook.emit_mces([lineage_mce])
mock_emitter.assert_called_once()
instance = mock_emitter.return_value
instance.emit_mce_async.assert_called()
instance.flush.assert_called_once()
@mock.patch("datahub.integrations.airflow.operators.DatahubRestHook", autospec=True)
def test_datahub_lineage_operator(mock_hook):
task = DatahubEmitterOperator(
task_id="emit_lineage",
datahub_rest_conn_id=datahub_rest_connection_config.conn_id,
mces=[
builder.make_lineage_mce(
[
builder.make_dataset_urn("snowflake", "mydb.schema.tableA"),
builder.make_dataset_urn("snowflake", "mydb.schema.tableB"),
],
builder.make_dataset_urn("snowflake", "mydb.schema.tableC"),
)
],
)
task.execute(None)
mock_hook.assert_called()
mock_hook.return_value.emit_mces.assert_called_once()

View File

@ -9,6 +9,12 @@ def test_cli_help():
assert result.output
def test_cli_version():
runner = CliRunner()
result = runner.invoke(datahub, ["--debug", "version"])
assert result.output
def test_check_local_docker():
# This just verifies that it runs without error.
# We don't actually know what environment this will be run in, so

View File

@ -4,6 +4,7 @@ from datetime import datetime
from botocore.stub import Stubber
from freezegun import freeze_time
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.glue import GlueSource, GlueSourceConfig, get_column_type
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp, Status
@ -31,7 +32,10 @@ FROZEN_TIME = "2020-04-14 07:00:00"
class GlueSourceTest(unittest.TestCase):
glue_source = GlueSource(ctx=None, config=GlueSourceConfig(aws_region="us-east-1"))
glue_source = GlueSource(
ctx=PipelineContext(run_id="glue-source-test"),
config=GlueSourceConfig(aws_region="us-east-1"),
)
def test_get_column_type_contains_key(self):
@ -120,7 +124,7 @@ class GlueSourceTest(unittest.TestCase):
with Stubber(self.glue_source.glue_client) as stubber:
stubber.add_response("search_tables", response, {})
actual_work_unit = next(self.glue_source.get_workunits())
actual_work_unit = list(self.glue_source.get_workunits())[0]
expected_metadata_work_unit = create_metadata_work_unit(timestamp)
@ -168,6 +172,7 @@ def create_metadata_work_unit(timestamp):
type=SchemaFieldDataType(type=NumberTypeClass()),
description="Maximum attendees permitted",
nullable=True,
recursive=False,
)
]

View File

@ -7,8 +7,8 @@ from datahub.ingestion.sink.datahub_kafka import DatahubKafkaSink, _KafkaCallbac
class KafkaSinkTest(unittest.TestCase):
@patch("datahub.ingestion.sink.datahub_kafka.PipelineContext")
@patch("datahub.emitter.kafka_emitter.SerializingProducer")
@patch("datahub.ingestion.sink.datahub_kafka.PipelineContext", autospec=True)
@patch("datahub.emitter.kafka_emitter.SerializingProducer", autospec=True)
def test_kafka_sink_config(self, mock_producer, mock_context):
kafka_sink = DatahubKafkaSink.create(
{"connection": {"bootstrap": "foobar:9092"}}, mock_context
@ -22,9 +22,9 @@ class KafkaSinkTest(unittest.TestCase):
assert constructor_args[1] == record_envelope
assert constructor_args[2] == write_callback
@patch("datahub.ingestion.sink.datahub_kafka.PipelineContext")
@patch("datahub.emitter.kafka_emitter.SerializingProducer")
@patch("datahub.ingestion.sink.datahub_kafka._KafkaCallback")
@patch("datahub.ingestion.sink.datahub_kafka.PipelineContext", autospec=True)
@patch("datahub.emitter.kafka_emitter.SerializingProducer", autospec=True)
@patch("datahub.ingestion.sink.datahub_kafka._KafkaCallback", autospec=True)
def test_kafka_sink_write(self, mock_k_callback, mock_producer, mock_context):
mock_producer_instance = mock_producer.return_value
mock_k_callback_instance = mock_k_callback.return_value
@ -46,16 +46,16 @@ class KafkaSinkTest(unittest.TestCase):
# TODO: Test that kafka producer is configured correctly
@patch("datahub.ingestion.sink.datahub_kafka.PipelineContext")
@patch("datahub.emitter.kafka_emitter.SerializingProducer")
@patch("datahub.ingestion.sink.datahub_kafka.PipelineContext", autospec=True)
@patch("datahub.emitter.kafka_emitter.SerializingProducer", autospec=True)
def test_kafka_sink_close(self, mock_producer, mock_context):
mock_producer_instance = mock_producer.return_value
kafka_sink = DatahubKafkaSink.create({}, mock_context)
kafka_sink.close()
mock_producer_instance.flush.assert_called_once()
@patch("datahub.ingestion.sink.datahub_kafka.RecordEnvelope")
@patch("datahub.ingestion.sink.datahub_kafka.WriteCallback")
@patch("datahub.ingestion.sink.datahub_kafka.RecordEnvelope", autospec=True)
@patch("datahub.ingestion.sink.datahub_kafka.WriteCallback", autospec=True)
def test_kafka_callback_class(self, mock_w_callback, mock_re):
callback = _KafkaCallback(
SinkReport(), record_envelope=mock_re, write_callback=mock_w_callback
@ -63,7 +63,9 @@ class KafkaSinkTest(unittest.TestCase):
mock_error = MagicMock()
mock_message = MagicMock()
callback.kafka_callback(mock_error, mock_message)
assert mock_w_callback.on_failure.call_count == 1
mock_w_callback.on_failure.called_with(mock_re, None, {"error", mock_error})
mock_w_callback.on_failure.assert_called_once()
assert mock_w_callback.on_failure.call_args[0][0] == mock_re
assert mock_w_callback.on_failure.call_args[0][1] == mock_error
callback.kafka_callback(None, mock_message)
mock_w_callback.on_success.called_once_with(mock_re, {"msg", mock_message})
mock_w_callback.on_success.assert_called_once()
assert mock_w_callback.on_success.call_args[0][0] == mock_re

View File

@ -7,7 +7,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
class KafkaSourceTest(unittest.TestCase):
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer")
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_configuration(self, mock_kafka):
ctx = PipelineContext(run_id="test")
kafka_source = KafkaSource.create(
@ -16,7 +16,7 @@ class KafkaSourceTest(unittest.TestCase):
kafka_source.close()
assert mock_kafka.call_count == 1
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer")
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_wildcard_topic(self, mock_kafka):
mock_kafka_instance = mock_kafka.return_value
mock_cluster_metadata = MagicMock()
@ -37,7 +37,7 @@ class KafkaSourceTest(unittest.TestCase):
mock_kafka_instance.list_topics.assert_called_once()
assert len(workunits) == 2
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer")
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_topic_pattern(self, mock_kafka):
mock_kafka_instance = mock_kafka.return_value
mock_cluster_metadata = MagicMock()
@ -70,7 +70,7 @@ class KafkaSourceTest(unittest.TestCase):
workunits = [w for w in kafka_source.get_workunits()]
assert len(workunits) == 2
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer")
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_close(self, mock_kafka):
mock_kafka_instance = mock_kafka.return_value
ctx = PipelineContext(run_id="test")

View File

@ -5,8 +5,8 @@ from datahub.ingestion.run.pipeline import Pipeline
class PipelineTest(unittest.TestCase):
@patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits")
@patch("datahub.ingestion.sink.console.ConsoleSink.close")
@patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True)
@patch("datahub.ingestion.sink.console.ConsoleSink.close", autospec=True)
def test_configure(self, mock_sink, mock_source):
pipeline = Pipeline.create(
{

View File

@ -28,6 +28,7 @@ def test_list_all():
runner = CliRunner()
result = runner.invoke(datahub, ["ingest-list-plugins"])
assert result.exit_code == 0
assert len(result.output.splitlines()) > 3
def test_registry():