feat(ingest): option for number of workunits in preview (#4517)

* feat(ingest): option for number of workunits in preview + documentation update
This commit is contained in:
Aseem Bansal 2022-03-29 19:40:22 +05:30 committed by GitHub
parent 4358d8fb01
commit f36cf69360
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 6 deletions

View File

@ -75,6 +75,14 @@ datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml --preview
# Preview with dry-run # Preview with dry-run
datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml -n --preview datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml -n --preview
``` ```
By default `--preview` creates 10 workunits. But if you wish to try producing more workunits you can use another option `--preview-workunits`
```shell
# Preview 20 workunits without sending anything to sink
datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml -n --preview --preview-workunits=20
```
## Transformations ## Transformations
If you'd like to modify data before it reaches the ingestion sinks for instance, adding additional owners or tags you can use a transformer to write your own module and integrate it with DataHub. If you'd like to modify data before it reaches the ingestion sinks for instance, adding additional owners or tags you can use a transformer to write your own module and integrate it with DataHub.

View File

@ -59,6 +59,12 @@ def ingest() -> None:
default=False, default=False,
help="Perform limited ingestion from the source to the sink to get a quick preview.", help="Perform limited ingestion from the source to the sink to get a quick preview.",
) )
@click.option(
"--preview-workunits",
type=int,
default=10,
help="The number of workunits to produce for preview.",
)
@click.option( @click.option(
"--strict-warnings/--no-strict-warnings", "--strict-warnings/--no-strict-warnings",
default=False, default=False,
@ -68,7 +74,12 @@ def ingest() -> None:
@telemetry.with_telemetry @telemetry.with_telemetry
@memory_leak_detector.with_leak_detection @memory_leak_detector.with_leak_detection
def run( def run(
ctx: click.Context, config: str, dry_run: bool, preview: bool, strict_warnings: bool ctx: click.Context,
config: str,
dry_run: bool,
preview: bool,
strict_warnings: bool,
preview_workunits: int,
) -> None: ) -> None:
"""Ingest metadata into DataHub.""" """Ingest metadata into DataHub."""
@ -79,7 +90,7 @@ def run(
try: try:
logger.debug(f"Using config: {pipeline_config}") logger.debug(f"Using config: {pipeline_config}")
pipeline = Pipeline.create(pipeline_config, dry_run, preview) pipeline = Pipeline.create(pipeline_config, dry_run, preview, preview_workunits)
except ValidationError as e: except ValidationError as e:
click.echo(e, err=True) click.echo(e, err=True)
sys.exit(1) sys.exit(1)

View File

@ -103,11 +103,16 @@ class Pipeline:
transformers: List[Transformer] transformers: List[Transformer]
def __init__( def __init__(
self, config: PipelineConfig, dry_run: bool = False, preview_mode: bool = False self,
config: PipelineConfig,
dry_run: bool = False,
preview_mode: bool = False,
preview_workunits: int = 10,
): ):
self.config = config self.config = config
self.dry_run = dry_run self.dry_run = dry_run
self.preview_mode = preview_mode self.preview_mode = preview_mode
self.preview_workunits = preview_workunits
self.ctx = PipelineContext( self.ctx = PipelineContext(
run_id=self.config.run_id, run_id=self.config.run_id,
datahub_api=self.config.datahub_api, datahub_api=self.config.datahub_api,
@ -169,17 +174,27 @@ class Pipeline:
@classmethod @classmethod
def create( def create(
cls, config_dict: dict, dry_run: bool = False, preview_mode: bool = False cls,
config_dict: dict,
dry_run: bool = False,
preview_mode: bool = False,
preview_workunits: int = 10,
) -> "Pipeline": ) -> "Pipeline":
config = PipelineConfig.parse_obj(config_dict) config = PipelineConfig.parse_obj(config_dict)
return cls(config, dry_run=dry_run, preview_mode=preview_mode) return cls(
config,
dry_run=dry_run,
preview_mode=preview_mode,
preview_workunits=preview_workunits,
)
def run(self) -> None: def run(self) -> None:
callback = LoggingCallback() callback = LoggingCallback()
extractor: Extractor = self.extractor_class() extractor: Extractor = self.extractor_class()
for wu in itertools.islice( for wu in itertools.islice(
self.source.get_workunits(), 10 if self.preview_mode else None self.source.get_workunits(),
self.preview_workunits if self.preview_mode else None,
): ):
# TODO: change extractor interface # TODO: change extractor interface
extractor.configure({}, self.ctx) extractor.configure({}, self.ctx)