Metadata Ingestion
This module hosts an extensible Python-based metadata ingestion system for DataHub. It supports sending metadata to DataHub over Kafka or through the REST API, and can be used through our CLI tool or as a library, e.g. with an orchestrator like Airflow.
Architecture
The architecture of this metadata ingestion framework is heavily inspired by Apache Gobblin (also originally a LinkedIn project!). We have a standardized format - the MetadataChangeEvent - and sources and sinks which respectively produce and consume these objects. The sources pull metadata from a variety of data systems, while the sinks are primarily for moving this metadata into DataHub.
Pre-Requisites
Before running any metadata ingestion job, you should make sure that DataHub backend services are all running. If you are trying this out locally, the easiest way to do that is through quickstart Docker images.
Migrating from the old scripts
If you were previously using the mce_cli.py tool to push metadata into DataHub: the new way to do this is to create a recipe with a file source pointing at your JSON file and a DataHub sink to push that metadata into DataHub.
This example recipe demonstrates how to ingest the sample data (previously called bootstrap_mce.dat) into DataHub over the REST API.
Note that we no longer use the .dat format, but instead use JSON. The main differences are that the JSON uses null instead of None and uses objects/dictionaries instead of tuples when representing unions.
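In outline, such a recipe pairs the file source with the datahub-rest sink (both described later in this document). A minimal sketch, with a placeholder file path and server address, looks like this:
# Placeholder recipe: point the file source at your JSON file of MCEs
# and push them to DataHub over the REST API.
source:
  type: file
  filename: ./path/to/mce/file.json
sink:
  type: "datahub-rest"
  config:
    server: 'http://localhost:8080'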
If you were previously using one of the sql-etl scripts: the new way to do this is to use the corresponding source. See below for configuration details. Note that the source needs to be paired with a sink - likely datahub-kafka or datahub-rest, depending on your needs.
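For example, a recipe replacing the old MySQL ETL script might look roughly like the following (the credentials, database name, and broker address are placeholders):
# Placeholder recipe: pull metadata from MySQL and publish it to Kafka.
source:
  type: mysql
  config:
    username: user
    password: pass
    database: dbname
    host_port: localhost:3306
sink:
  type: "datahub-kafka"
  config:
    connection.bootstrap: "localhost:9092"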
Building from source
Pre-Requisites
- Python 3.6+ must be installed in your host environment.
- You also need to build the mxe-schemas module as below:
  ./gradlew :metadata-events:mxe-schemas:build
  This is needed to generate MetadataChangeEvent.avsc, which is the schema for the MetadataChangeEvent_v4 Kafka topic.
- On MacOS: brew install librdkafka
- On Debian/Ubuntu: sudo apt install librdkafka-dev python3-dev python3-venv
Set up your Python environment
python3 -m venv venv
source venv/bin/activate
pip install -e .
./scripts/codegen.sh
Usage
datahub ingest -c examples/recipes/file_to_file.yml
We have also included a couple of sample DAGs that can be used with Airflow.
- generic_recipe_sample_dag.py - a simple Airflow DAG that picks up a DataHub ingestion recipe configuration and runs it.
- mysql_sample_dag.py - an Airflow DAG that runs a MySQL metadata ingestion pipeline using an inlined configuration.
Recipes
A recipe is a configuration file that tells our ingestion scripts where to pull data from (source) and where to put it (sink). Here's a simple example that pulls metadata from MSSQL and puts it into DataHub.
# A sample recipe that pulls metadata from MSSQL and puts it into DataHub
# using the REST API.
source:
  type: mssql
  config:
    username: sa
    password: test!Password
    database: DemoData
sink:
  type: "datahub-rest"
  config:
    server: 'http://localhost:8080'
Running a recipe is quite easy.
datahub ingest -c ./examples/recipes/mssql_to_datahub.yml
A number of recipes are included in the examples/recipes directory.
Sources
Kafka Metadata kafka
Extracts:
- List of topics - from the Kafka broker
- Schemas associated with each topic - from the schema registry
source:
  type: "kafka"
  config:
    connection.bootstrap: "broker:9092"
    connection.schema_registry_url: http://localhost:8081
MySQL Metadata mysql
Extracts:
- List of databases and tables
- Column types and schema associated with each table
Extra requirements: pip install pymysql
source:
  type: mysql
  config:
    username: root
    password: example
    database: dbname
    host_port: localhost:3306
    table_pattern:
      allow:
      - "schema1.table2"
      deny:
      - "performance_schema"
Microsoft SQL Server Metadata mssql
Extracts:
- List of databases, schema, and tables
- Column types associated with each table
Extra requirements: pip install sqlalchemy-pytds
source:
  type: mssql
  config:
    username: user
    password: pass
    database: DemoDatabase
    table_pattern:
      allow:
      - "schema1.table1"
      - "schema1.table2"
      deny:
      - "^.*\\.sys_.*" # deny all tables that start with sys_
Hive hive
Extracts:
- List of databases, schema, and tables
- Column types associated with each table
Extra requirements: pip install pyhive[hive]
source:
  type: hive
  config:
    username: user
    password: pass
    host_port: localhost:10000
    database: DemoDatabase
    # table_pattern is same as above
PostgreSQL postgres
Extracts:
- List of databases, schema, and tables
- Column types associated with each table
Extra requirements: pip install psycopg2-binary or pip install psycopg2
source:
  type: postgres
  config:
    username: user
    password: pass
    host_port: localhost:5432
    database: DemoDatabase
    # table_pattern is same as above
Snowflake snowflake
Extracts:
- List of databases, schema, and tables
- Column types associated with each table
Extra requirements: pip install snowflake-sqlalchemy
source:
  type: snowflake
  config:
    username: user
    password: pass
    host_port: account_name
    # table_pattern is same as above
Google BigQuery bigquery
Extracts:
- List of databases, schema, and tables
- Column types associated with each table
Extra requirements: pip install pybigquery
source:
  type: bigquery
  config:
    project_id: project
    options:
      credential_path: "/path/to/keyfile.json"
    # table_pattern is same as above
File file
Pulls metadata from a previously generated file. Note that the file sink can produce such files, and a number of samples are included in the examples/mce_files directory.
source:
  type: file
  filename: ./path/to/mce/file.json
Sinks
DataHub Rest datahub-rest
Pushes metadata to DataHub using the GMA REST API. The advantage of the REST-based interface is that any errors can be reported immediately.
sink:
  type: "datahub-rest"
  config:
    server: 'http://localhost:8080'
DataHub Kafka datahub-kafka
Pushes metadata to DataHub by publishing messages to Kafka. The advantage of the Kafka-based interface is that it's asynchronous and can handle higher throughput. This requires the DataHub mce-consumer container to be running.
sink:
  type: "datahub-kafka"
  config:
    connection.bootstrap: "localhost:9092"
Console console
Simply prints each metadata event to stdout. Useful for experimentation and debugging purposes.
sink:
  type: "console"
File file
Outputs metadata to a file. This can be used to decouple metadata sourcing from the process of pushing it into DataHub, and is particularly useful for debugging purposes. Note that the file source can read files generated by this sink.
sink:
  type: file
  filename: ./path/to/mce/file.json
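For example, you could first dump metadata from a source such as MySQL into a local file with a recipe like the sketch below (credentials and the output path are placeholders), and then push that file to DataHub later using a file source recipe like the one sketched in the migration section above.
# Placeholder recipe: extract metadata from MySQL and write it to a local file.
source:
  type: mysql
  config:
    username: user
    password: pass
    database: dbname
    host_port: localhost:3306
sink:
  type: file
  filename: ./path/to/mce/file.json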
Contributing
Contributions welcome!
Code layout
- The CLI interface is defined in entrypoints.py.
- The high-level interfaces are defined in the API directory.
- The actual source and sink implementations have their own directories - the __init__.py files in those directories are used to register the short codes for use in recipes.
- The metadata models are created using code generation, and eventually live in the ./src/datahub/metadata directory. However, these files are not checked in and instead are generated at build time. See the codegen script for details.
Testing
# Follow standard install procedure - see above.
# Install requirements.
pip install -r test_requirements.txt
# Run unit tests.
pytest tests/unit
# Run integration tests.
# Note: the integration tests require docker.
pytest tests/integration
Sanity check code before committing
# Requires test_requirements.txt to have been installed.
black --exclude 'datahub/metadata' -S -t py36 src tests
isort src tests
flake8 src tests
mypy -p datahub
pytest