feat(cli): improve error reporting, make sink config optional (#4718)

Shirshanka Das 2022-04-24 17:12:21 -07:00 committed by GitHub
parent b1b1898752
commit a518e3d13e
10 changed files with 166 additions and 66 deletions

View File

@ -77,14 +77,6 @@ module.exports = {
{
Sinks: list_ids_in_directory("metadata-ingestion/sink_docs"),
},
{
"Custom Integrations": [
"metadata-ingestion/as-a-library",
"metadata-integration/java/as-a-library",
"metadata-ingestion/integration_docs/great-expectations",
"metadata-integration/java/datahub-protobuf/README",
],
},
{
Scheduling: [
"metadata-ingestion/schedule_docs/intro",
@ -94,14 +86,20 @@ module.exports = {
],
},
{
Lineage: [
"docs/lineage/intro",
"docs/lineage/airflow",
"docker/airflow/local_airflow",
"docs/lineage/sample_code",
"Push-Based Integrations": [
{
Airflow: ["docs/lineage/airflow", "docker/airflow/local_airflow"],
},
"metadata-integration/java/spark-lineage/README",
"metadata-ingestion/integration_docs/great-expectations",
"metadata-integration/java/datahub-protobuf/README",
"metadata-ingestion/as-a-library",
"metadata-integration/java/as-a-library",
],
},
{
Lineage: ["docs/lineage/intro", "docs/lineage/sample_code"],
},
{
Guides: [
"metadata-ingestion/adding-source",

View File

@ -1,6 +1,11 @@
# Lineage with Airflow
# Airflow Integration
There's a couple ways to get lineage information from Airflow into DataHub.
DataHub supports the integration of:
- Airflow Pipeline (DAG) metadata
- DAG and Task run information
- Lineage information, when present

There are a few ways to enable these integrations from Airflow into DataHub.
## Using Datahub's Airflow lineage backend (recommended)
@ -11,9 +16,10 @@ The Airflow lineage backend is only supported in Airflow 1.10.15+ and 2.0.2+.
:::
## Running on Docker locally
:::note
If you are looking to run Airflow and DataHub using Docker locally, follow the guide [here](../../docker/airflow/local_airflow.md). Otherwise, proceed with the instructions below.
:::
## Setting up Airflow to use DataHub as Lineage Backend
@ -49,7 +55,7 @@ If you are looking to run Airflow and DataHub using docker locally, follow the g
- `cluster` (defaults to "prod"): The "cluster" to associate Airflow DAGs and tasks with.
- `capture_ownership_info` (defaults to true): If true, the owners field of the DAG will be captured as a DataHub corpuser.
- `capture_tags_info` (defaults to true): If true, the tags field of the DAG will be captured as DataHub tags.
- `capture_executions` (defaults to false): If true, it captures task runs as DataHub DatprocessInstances. **This feature only works with Datahub GMS version v0.8.33 or greater.**
- `capture_executions` (defaults to false): If true, it captures task runs as DataHub DataProcessInstances. **This feature only works with Datahub GMS version v0.8.33 or greater.**
- `graceful_exceptions` (defaults to true): If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.
4. Configure `inlets` and `outlets` for your Airflow operators. For reference, look at the sample DAG in [`lineage_backend_demo.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_demo.py), or reference [`lineage_backend_taskflow_demo.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_taskflow_demo.py) if you're using the [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html).
5. [optional] Learn more about [Airflow lineage](https://airflow.apache.org/docs/apache-airflow/stable/lineage.html), including shorthand notation and some automation.
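To make step 4 above concrete, here is a minimal sketch of a DAG whose task declares `inlets` and `outlets`. The platform and table names (e.g. `mydb.schema.tableA`) are placeholder assumptions; the linked sample DAGs remain the authoritative examples.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Dataset entity helper shipped with the DataHub Airflow provider
from datahub_provider.entities import Dataset

with DAG(
    dag_id="datahub_lineage_sketch",  # hypothetical DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # The lineage backend reads inlets/outlets off each task and emits
    # dataset-level lineage to DataHub along with the DAG/task metadata.
    transform = BashOperator(
        task_id="transform",
        bash_command="echo 'transforming data'",
        inlets=[Dataset("mysql", "mydb.schema.tableA")],
        outlets=[Dataset("mysql", "mydb.schema.tableB")],
    )
```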
@ -60,4 +66,4 @@ Take a look at this sample DAG:
- [`lineage_emission_dag.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_emission_dag.py) - emits lineage using the DatahubEmitterOperator.
In order to use this example, you must first configure the Datahub hook. Like in ingestion, we support a Datahub REST hook and a Kafka-based hook. See step 1 above for details.

View File

@ -1,8 +1,28 @@
# Introduction to Metadata Ingestion
![Python version 3.6+](https://img.shields.io/badge/python-3.6%2B-blue)
## Integration Options
### Metadata Ingestion Source Status
DataHub supports both **push-based** and **pull-based** metadata integration.
Push-based integrations allow you to emit metadata directly from your data systems when metadata changes, while pull-based integrations allow you to "crawl" or "ingest" metadata from the data systems by connecting to them and extracting metadata in a batch or incremental-batch manner. Supporting both mechanisms means that you can integrate with all your systems in the most flexible way possible.
Examples of push-based integrations include [Airflow](../docs/lineage/airflow.md), [Spark](../metadata-integration/java/spark-lineage/README.md), [Great Expectations](./integration_docs/great-expectations.md) and [Protobuf Schemas](../metadata-integration/java/datahub-protobuf/README.md). This allows you to get low-latency metadata integration from the "active" agents in your data ecosystem. Examples of pull-based integrations include BigQuery, Snowflake, Looker, Tableau and many others.
This document describes the pull-based metadata ingestion system that is built into DataHub for easy integration with a wide variety of sources in your data stack.
## Getting Started
### Prerequisites
Before running any metadata ingestion job, you should make sure that DataHub backend services are all running. You can run ingestion via either the [UI](../docs/ui-ingestion.md) or the [CLI](../docs/cli.md); the CLI usage guide linked there is a useful reference as you go through this page.
## Core Concepts
### Sources
Data systems that we are extracting metadata from are referred to as **Sources**. The `Sources` tab on the left in the sidebar shows you all the sources that are available for you to ingest metadata from. For example, we have sources for [BigQuery](./source_docs/bigquery.md), [Looker](./source_docs/looker.md), [Tableau](./source_docs/tableau.md) and many others.
#### Metadata Ingestion Source Status
We apply a Support Status to each Metadata Source to help you understand the integration reliability at a glance.
@ -12,23 +32,24 @@ We apply a Support Status to each Metadata Source to help you understand the int
![Testing](https://img.shields.io/badge/support%20status-testing-lightgrey): Testing Sources are available for experimentation by DataHub Community members, but may change without notice.
## Getting Started
### Sinks
### Prerequisites
Sinks are destinations for metadata. When configuring ingestion for DataHub, you're likely to be sending the metadata to DataHub over either the [REST (datahub-rest)](./sink_docs/datahub.md#datahub-rest) or the [Kafka (datahub-kafka)](./sink_docs/datahub.md#datahub-kafka) sink. In some cases, the [File](./sink_docs/file.md) sink is also helpful to store a persistent offline copy of the metadata during debugging.
Before running any metadata ingestion job, you should make sure that DataHub backend services are all running. If you are trying this out locally check out the [CLI](../docs/cli.md) to install the CLI and understand the options available in the CLI. You can reference the CLI usage guide given there as you go through this page.
The default sink that most of the ingestion systems and guides assume is the `datahub-rest` sink, but you should be able to adapt all of them for the other sinks as well!
### Core Concepts
### Recipes
## Recipes
A recipe is the main configuration file that puts it all together. It tells our ingestion scripts where to pull data from (source) and where to put it (sink).
A recipe is a configuration file that tells our ingestion scripts where to pull data from (source) and where to put it (sink).
Here's a simple example that pulls metadata from MSSQL (source) and puts it into datahub rest (sink).
Since `acryl-datahub` version `>=0.8.33.2`, the default sink is assumed to be a DataHub REST endpoint:
- Hosted at "http://localhost:8080", or at the value of the environment variable `${DATAHUB_SERVER}` if present
- With an empty auth token, or the value of the environment variable `${DATAHUB_TOKEN}` if present.
> Note that one recipe file can only have 1 source and 1 sink. If you want multiple sources then you will need multiple recipe files.
Here's a simple recipe that pulls metadata from MSSQL (source) and puts it into the default sink (datahub rest).
```yaml
# A sample recipe that pulls metadata from MSSQL and puts it into DataHub
# The simplest recipe that pulls metadata from MSSQL and puts it into DataHub
# using the Rest API.
source:
  type: mssql
@ -37,20 +58,23 @@ source:
    password: ${MSSQL_PASSWORD}
    database: DemoData

transformers:
  - type: "fully-qualified-class-name-of-transformer"
    config:
      some_property: "some.value"

# sink section omitted as we want to use the default datahub-rest sink
```
Running this recipe is as simple as:
```shell
datahub ingest -c recipe.yaml
```
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Or, if you want to override the default endpoints, you can provide the environment variables as part of the command, as shown below:
```shell
DATAHUB_SERVER="https://my-datahub-server:8080" DATAHUB_TOKEN="my-datahub-token" datahub ingest -c recipe.yaml
```
A number of recipes are included in the [examples/recipes](./examples/recipes) directory. For full info and context on each source and sink, see the pages described in the [table of plugins](../docs/cli.md#installing-plugins).
> Note that one recipe file can only have 1 source and 1 sink. If you want multiple sources then you will need multiple recipe files.
### Handling sensitive information in recipes
We automatically expand environment variables in the config (e.g. `${MSSQL_PASSWORD}`),
@ -64,8 +88,8 @@ pip install 'acryl-datahub[datahub-rest]' # install the required plugin
datahub ingest -c ./examples/recipes/mssql_to_datahub.yml
```
The `--dry-run` option of the `ingest` command performs all of the ingestion steps, except writing to the sink. This is useful to ensure that the
ingestion recipe is producing the desired workunits before ingesting them into datahub.
The `--dry-run` option of the `ingest` command performs all of the ingestion steps, except writing to the sink. This is useful to validate that the
ingestion recipe is producing the desired metadata events before ingesting them into datahub.
```shell
# Dry run
@ -100,19 +124,37 @@ datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml --suppress-erro
## Transformations
If you'd like to modify data before it reaches the ingestion sinks for instance, adding additional owners or tags you can use a transformer to write your own module and integrate it with DataHub.
If you'd like to modify data before it reaches the ingestion sinks (for instance, adding additional owners or tags), you can use a transformer to write your own module and integrate it with DataHub. Transformers require extending the recipe with a new section to describe the transformers that you want to run.
Check out the [transformers guide](./transformers.md) for more info!
For example, a pipeline that ingests metadata from MSSQL and applies a default "important" tag to all datasets is described below:
```yaml
# A recipe to ingest metadata from MSSQL and apply default tags to all tables
source:
  type: mssql
  config:
    username: sa
    password: ${MSSQL_PASSWORD}
    database: DemoData
## Using as a library
transformers: # an array of transformers applied sequentially
  - type: simple_add_dataset_tags
    config:
      tag_urns:
        - "urn:li:tag:Important"
# default sink, no config needed
```
Check out the [transformers guide](./transformers.md) to learn more about how you can create really flexible pipelines for processing metadata using Transformers!
## Using as a library (SDK)
In some cases, you might want to construct metadata events directly and use programmatic ways to emit that metadata to DataHub. For that, take a look at the [Python emitter](./as-a-library.md) and the [Java emitter](../metadata-integration/java/as-a-library.md) libraries, which can be called from your own code.
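As a rough sketch of the library route using the Python REST emitter, the snippet below emits a single `datasetProperties` aspect. The server URL, token, dataset name, and description are placeholder assumptions; the emitter guides linked above remain the reference.

```python
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass

# Point the emitter at your DataHub GMS endpoint (placeholder URL and token).
emitter = DatahubRestEmitter(gms_server="http://localhost:8080", token="")

# Construct a metadata event: a datasetProperties aspect for a hypothetical table.
dataset_urn = make_dataset_urn(platform="mysql", name="mydb.schema.tableA", env="PROD")
mcp = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=dataset_urn,
    aspectName="datasetProperties",
    aspect=DatasetPropertiesClass(description="A table described via the SDK"),
)

# Emit the event over REST.
emitter.emit_mcp(mcp)
```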
### Programmatic Pipeline
In some cases, you might want to configure and run a pipeline entirely from within your custom python script. Here is an example of how to do it.
In some cases, you might want to configure and run a pipeline entirely from within your custom Python script. Here is an example of how to do it.
- [programmatic_pipeline.py](./examples/library/programatic_pipeline.py) - a basic mysql to REST programmatic pipeline.
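For orientation, here is a minimal sketch of such a script; the MySQL connection values are placeholder assumptions, and it leans on the default `datahub-rest` sink introduced by this change, so no sink section is supplied.

```python
from datahub.ingestion.run.pipeline import Pipeline

# Build a pipeline from a recipe-shaped dictionary (placeholder MySQL settings).
# With this change, omitting "sink" falls back to the default datahub-rest sink.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "mysql",
            "config": {
                "username": "user",
                "password": "pass",
                "database": "db_name",
                "host_port": "localhost:3306",
            },
        },
    }
)

pipeline.run()
pipeline.raise_from_status()
```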
## Developing
See the guides on [developing](./developing.md), [adding a source](./adding-source.md) and [using transformers](./transformers.md).

View File

@ -1,7 +1,6 @@
# Data Quality with Great Expectations
This guide helps to setup and configure `DataHubValidationAction` in Great Expectations to send assertions(expectations) and their results to Datahub using Datahub's Python Rest emitter.
# Great Expectations
This guide helps you set up and configure `DataHubValidationAction` in Great Expectations to send assertions (expectations) and their results to DataHub using DataHub's Python REST emitter.
## Capabilities
@ -51,4 +50,4 @@ This integration does not support
## Learn more
To see Great Expectations in action, check out [this demo](https://www.loom.com/share/d781c9f0b270477fb5d6b0c26ef7f22d) from the Feb 2022 townhall.

View File

@ -5,6 +5,7 @@ import sys
import click
import stackprinter
from pydantic import ValidationError
import datahub as datahub_package
from datahub.cli.check_cli import check
@ -18,6 +19,7 @@ from datahub.cli.put_cli import put
from datahub.cli.telemetry import telemetry as telemetry_cli
from datahub.cli.timeline_cli import timeline
from datahub.configuration import SensitiveError
from datahub.configuration.common import ConfigurationError
from datahub.telemetry import telemetry
from datahub.utilities.server_config_util import get_gms_config
@ -149,15 +151,19 @@ def main(**kwargs):
kwargs = {"show_vals": None}
exc = sensitive_cause
logger.error(
stackprinter.format(
exc,
line_wrap=MAX_CONTENT_WIDTH,
truncate_vals=10 * MAX_CONTENT_WIDTH,
suppressed_paths=[r"lib/python.*/site-packages/click/"],
**kwargs,
# suppress stack printing for common configuration errors
if isinstance(exc, (ConfigurationError, ValidationError)):
logger.error(exc)
else:
logger.error(
stackprinter.format(
exc,
line_wrap=MAX_CONTENT_WIDTH,
truncate_vals=10 * MAX_CONTENT_WIDTH,
suppressed_paths=[r"lib/python.*/site-packages/click/"],
**kwargs,
)
)
)
logger.info(
f"DataHub CLI version: {datahub_package.__version__} at {datahub_package.__file__}"
)

View File

@ -6,8 +6,9 @@ from math import log10
from typing import Any, Dict, Iterable, List, Optional
import click
from pydantic import validator
from pydantic import root_validator, validator
from datahub.configuration import config_loader
from datahub.configuration.common import (
ConfigModel,
DynamicTypedConfig,
@ -53,8 +54,8 @@ class PipelineConfig(ConfigModel):
cls, v: Optional[str], values: Dict[str, Any], **kwargs: Any
) -> str:
if v == "__DEFAULT_RUN_ID":
if values["source"] is not None:
if values["source"].type is not None:
if "source" in values:
if hasattr(values["source"], "type"):
source_type = values["source"].type
current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
return f"{source_type}-{current_time}"
@ -64,12 +65,30 @@ class PipelineConfig(ConfigModel):
assert v is not None
return v
@root_validator(pre=True)
def default_sink_is_datahub_rest(cls, values: Dict[str, Any]) -> Any:
    if "sink" not in values:
        default_sink_config = {
            "type": "datahub-rest",
            "config": {
                "server": "${DATAHUB_SERVER:-http://localhost:8080}",
                "token": "${DATAHUB_TOKEN:-}",
            },
        }
        # resolve env variables if present
        default_sink_config = config_loader.resolve_env_variables(
            default_sink_config
        )
        values["sink"] = default_sink_config
    return values
@validator("datahub_api", always=True)
def datahub_api_should_use_rest_sink_as_default(
cls, v: Optional[DatahubClientConfig], values: Dict[str, Any], **kwargs: Any
) -> Optional[DatahubClientConfig]:
if v is None:
if values["sink"].type is not None:
if "sink" in values and "type" in values["sink"]:
sink_type = values["sink"].type
if sink_type == "datahub-rest":
sink_config = values["sink"].config
@ -123,7 +142,7 @@ class Pipeline:
sink_type = self.config.sink.type
sink_class = sink_registry.get(sink_type)
sink_config = self.config.sink.dict().get("config", {})
sink_config = self.config.sink.dict().get("config") or {}
self.sink: Sink = sink_class.create(sink_config, self.ctx)
logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")

View File

@ -5,7 +5,7 @@ from dataclasses import dataclass
from typing import Union, cast
from datahub.cli.cli_utils import set_env_variables_override_config
from datahub.configuration.common import OperationalError
from datahub.configuration.common import ConfigurationError, OperationalError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
@ -52,7 +52,14 @@ class DatahubRestSink(Sink):
extra_headers=self.config.extra_headers,
ca_certificate_path=self.config.ca_certificate_path,
)
gms_config = self.emitter.test_connection()
try:
    gms_config = self.emitter.test_connection()
except Exception as exc:
    raise ConfigurationError(
        f"💥 Failed to connect to DataHub@{self.config.server} (token:{'XXX-redacted' if self.config.token else 'empty'}) over REST",
        exc,
    )
self.report.gms_version = (
gms_config.get("versions", {})
.get("linkedin/datahub", {})

View File

@ -4,6 +4,7 @@ from unittest.mock import patch
import pytest
from freezegun import freeze_time
from datahub.configuration.common import DynamicTypedConfig
from datahub.ingestion.api.committable import CommitPolicy, Committable
from datahub.ingestion.api.common import RecordEnvelope, WorkUnit
from datahub.ingestion.api.source import Source, SourceReport
@ -42,6 +43,28 @@ class TestPipeline(object):
mock_source.assert_called_once()
mock_sink.assert_called_once()
@freeze_time(FROZEN_TIME)
@patch("datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection")
@patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True)
def test_configure_without_sink(self, mock_source, mock_test_connection):
    mock_test_connection.return_value = {"noCode": True}
    pipeline = Pipeline.create(
        {
            "source": {
                "type": "kafka",
                "config": {"connection": {"bootstrap": "localhost:9092"}},
            },
        }
    )
    # assert that the default sink config is for a DatahubRestSink
    assert isinstance(pipeline.config.sink, DynamicTypedConfig)
    assert pipeline.config.sink.type == "datahub-rest"
    assert pipeline.config.sink.config == {
        "server": "http://localhost:8080",
        "token": "",
    }
@freeze_time(FROZEN_TIME)
def test_run_including_fake_transformation(self):
pipeline = Pipeline.create(

View File

@ -1,6 +1,6 @@
# Protobuf Integration
# Protobuf Schemas
This module is designed to be used with the Java Emitter, the input is a compiled protobuf binary `*.protoc` files and optionally the corresponding `*.proto` source code. In addition, you can supply the root message in cases where a single protobuf source file includes multiple non-nested messages.
The `datahub-protobuf` module is designed to be used with the Java Emitter. The input is a compiled protobuf binary (`*.protoc` file) and, optionally, the corresponding `*.proto` source code. In addition, you can supply the root message in cases where a single protobuf source file includes multiple non-nested messages.
## Supported Features

View File

@ -1,4 +1,4 @@
# Spark Integration
# Spark
To integrate Spark with DataHub, we provide a lightweight Java agent that listens for Spark application and job events and pushes metadata out to DataHub in real-time. The agent listens to events such as application start/end and SQLExecution start/end to create pipelines (i.e. DataFlow) and tasks (i.e. DataJob) in DataHub, along with lineage to the datasets that are being read from and written to. Read on to learn how to configure this for different Spark scenarios.
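As a rough sketch of the kind of configuration covered in the next section, the agent can be attached from PySpark via Spark properties. The artifact version and server URL below are placeholders to replace with your own values.

```python
from pyspark.sql import SparkSession

# Attach the DataHub Spark agent via Spark configuration.
# Replace <version> and the server URL with your own values.
spark = (
    SparkSession.builder
    .appName("sample-pipeline")
    .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:<version>")
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    .config("spark.datahub.rest.server", "http://localhost:8080")
    .getOrCreate()
)
```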
## Configuring Spark agent