feat(ingestion): Adds --dry-run and --preview options to datahub ingest command. (#3584)

Ravindra Lanka 2021-11-17 09:24:58 -08:00 committed by GitHub
parent 806b28e697
commit 3a9ef61147
3 changed files with 56 additions and 9 deletions


@@ -96,6 +96,26 @@
```shell
pip install 'acryl-datahub[datahub-rest]' # install the required plugin
datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml
```
The `--dry-run` option of the `ingest` command performs all of the ingestion steps except writing to the sink. This is useful for verifying that the
ingestion recipe produces the desired workunits before ingesting them into DataHub.
```shell
# Dry run
datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml --dry-run
# Short-form
datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml -n
```
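
The same dry run can also be triggered from Python via the `Pipeline.create` API that this commit extends. A minimal sketch, assuming the standard recipe dict with `source` and `sink` sections (the file path and server URL here are illustrative):

```python
from datahub.ingestion.run.pipeline import Pipeline

# Illustrative recipe: a file source feeding a DataHub REST sink.
recipe = {
    "source": {
        "type": "file",
        "config": {"filename": "./examples/mce_files/bootstrap_mce.json"},
    },
    "sink": {
        "type": "datahub-rest",
        "config": {"server": "http://localhost:8080"},
    },
}

# dry_run=True exercises every ingestion step but skips all writes to the sink.
pipeline = Pipeline.create(recipe, dry_run=True)
pipeline.run()
```
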
The `--preview` option of the `ingest` command performs all of the ingestion steps, but limits processing to the first 10 workunits produced by the source.
This option is handy for a quick end-to-end smoke test of an ingestion recipe.
```shell
# Preview
datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml --preview
# Preview with dry-run
datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml -n --preview
```
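
Under the hood (see the `Pipeline.run` changes further down), preview mode simply caps the workunit stream with `itertools.islice`, where a stop value of `None` means no limit; a quick self-contained illustration:

```python
import itertools

def capped(workunits, preview_mode: bool):
    # islice(it, None) yields everything; islice(it, 10) stops after 10 items.
    return itertools.islice(workunits, 10 if preview_mode else None)

assert list(capped(range(100), preview_mode=True)) == list(range(10))
assert len(list(capped(range(100), preview_mode=False))) == 100
```
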
### Install using Docker
[![Docker Hub](https://img.shields.io/docker/pulls/linkedin/datahub-ingestion?style=plastic)](https://hub.docker.com/r/linkedin/datahub-ingestion)


```diff
@@ -40,7 +40,22 @@ def ingest() -> None:
     help="Config file in .toml or .yaml format.",
     required=True,
 )
-def run(config: str) -> None:
+@click.option(
+    "-n",
+    "--dry-run",
+    type=bool,
+    is_flag=True,
+    default=False,
+    help="Perform a dry run of the ingestion, essentially skipping writing to sink.",
+)
+@click.option(
+    "--preview",
+    type=bool,
+    is_flag=True,
+    default=False,
+    help="Perform limited ingestion from the source to the sink to get a quick preview.",
+)
+def run(config: str, dry_run: bool, preview: bool) -> None:
     """Ingest metadata into DataHub."""
     logger.debug("DataHub CLI version: %s", datahub_package.nice_version_name())
```
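
As a standalone illustration (a toy command, not the DataHub CLI itself), this sketch shows how click delivers `is_flag` options to the decorated function as plain booleans; with `is_flag=True`, the boolean type is implied:

```python
import click

# Hypothetical demo command, not part of DataHub.
@click.command()
@click.option("-n", "--dry-run", is_flag=True, default=False, help="Skip writes.")
@click.option("--preview", is_flag=True, default=False, help="Process a small sample.")
def demo(dry_run: bool, preview: bool) -> None:
    # click maps --dry-run/-n and --preview onto the dry_run and preview parameters.
    click.echo(f"dry_run={dry_run} preview={preview}")

if __name__ == "__main__":
    demo()  # e.g. `python demo.py -n --preview` prints dry_run=True preview=True
```
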
```diff
@@ -49,7 +64,7 @@ def run(config: str) -> None:
     try:
         logger.debug(f"Using config: {pipeline_config}")
-        pipeline = Pipeline.create(pipeline_config)
+        pipeline = Pipeline.create(pipeline_config, dry_run, preview)
     except ValidationError as e:
         click.echo(e, err=True)
         sys.exit(1)
```


```diff
@@ -1,4 +1,5 @@
 import datetime
+import itertools
 import logging
 import uuid
 from typing import Any, Dict, Iterable, List, Optional
```
```diff
@@ -93,8 +94,12 @@ class Pipeline:
     sink: Sink
     transformers: List[Transformer]
 
-    def __init__(self, config: PipelineConfig):
+    def __init__(
+        self, config: PipelineConfig, dry_run: bool = False, preview_mode: bool = False
+    ):
         self.config = config
+        self.dry_run = dry_run
+        self.preview_mode = preview_mode
         self.ctx = PipelineContext(
             run_id=self.config.run_id, datahub_api=self.config.datahub_api
         )
```
```diff
@@ -131,24 +136,31 @@ class Pipeline:
         )
 
     @classmethod
-    def create(cls, config_dict: dict) -> "Pipeline":
+    def create(
+        cls, config_dict: dict, dry_run: bool = False, preview_mode: bool = False
+    ) -> "Pipeline":
         config = PipelineConfig.parse_obj(config_dict)
-        return cls(config)
+        return cls(config, dry_run=dry_run, preview_mode=preview_mode)
 
     def run(self) -> None:
         callback = LoggingCallback()
         extractor: Extractor = self.extractor_class()
-        for wu in self.source.get_workunits():
+        for wu in itertools.islice(
+            self.source.get_workunits(), 10 if self.preview_mode else None
+        ):
             # TODO: change extractor interface
             extractor.configure({}, self.ctx)
-            self.sink.handle_work_unit_start(wu)
+            if not self.dry_run:
+                self.sink.handle_work_unit_start(wu)
             record_envelopes = extractor.get_records(wu)
             for record_envelope in self.transform(record_envelopes):
-                self.sink.write_record_async(record_envelope, callback)
+                if not self.dry_run:
+                    self.sink.write_record_async(record_envelope, callback)
             extractor.close()
-            self.sink.handle_work_unit_end(wu)
+            if not self.dry_run:
+                self.sink.handle_work_unit_end(wu)
         self.source.close()
         self.sink.close()
```
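
Putting the two flags together, here is a self-contained sketch (toy stand-ins, not the real Source/Sink/Extractor classes) of the gating that `run()` now applies: preview trims the workunit stream, and dry-run suppresses every sink call:

```python
import itertools
from typing import Iterable, List

def run_pipeline(workunits: Iterable[str], dry_run: bool, preview_mode: bool) -> List[str]:
    written: List[str] = []
    # Preview caps the stream at 10 workunits, mirroring Pipeline.run above.
    for wu in itertools.islice(workunits, 10 if preview_mode else None):
        record = f"record({wu})"  # stand-in for extractor.get_records(wu)
        if not dry_run:
            written.append(record)  # stand-in for sink.write_record_async(...)
    return written

assert run_pipeline((f"wu{i}" for i in range(50)), dry_run=False, preview_mode=True) == [
    f"record(wu{i})" for i in range(10)
]
assert run_pipeline(["a", "b"], dry_run=True, preview_mode=False) == []
```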