diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md
index 3bfb6b3f4c..b9bb561794 100644
--- a/metadata-ingestion/README.md
+++ b/metadata-ingestion/README.md
@@ -75,6 +75,14 @@ datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml --preview
 # Preview with dry-run
 datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml -n --preview
 ```
+
+By default, `--preview` produces 10 workunits. To produce a different number of workunits for the preview, use the `--preview-workunits` option:
+
+```shell
+# Preview 20 workunits without sending anything to the sink
+datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml -n --preview --preview-workunits=20
+```
+
 ## Transformations
 
 If you'd like to modify data before it reaches the ingestion sinks – for instance, adding additional owners or tags – you can use a transformer to write your own module and integrate it with DataHub.
diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py
index 45f89099bd..7091c5fa39 100644
--- a/metadata-ingestion/src/datahub/cli/ingest_cli.py
+++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py
@@ -59,6 +59,12 @@ def ingest() -> None:
     default=False,
     help="Perform limited ingestion from the source to the sink to get a quick preview.",
 )
+@click.option(
+    "--preview-workunits",
+    type=int,
+    default=10,
+    help="The number of workunits to produce for the preview.",
+)
 @click.option(
     "--strict-warnings/--no-strict-warnings",
     default=False,
@@ -68,7 +74,12 @@ def ingest() -> None:
 @telemetry.with_telemetry
 @memory_leak_detector.with_leak_detection
 def run(
-    ctx: click.Context, config: str, dry_run: bool, preview: bool, strict_warnings: bool
+    ctx: click.Context,
+    config: str,
+    dry_run: bool,
+    preview: bool,
+    strict_warnings: bool,
+    preview_workunits: int,
 ) -> None:
     """Ingest metadata into DataHub."""
 
@@ -79,7 +90,7 @@ def run(
 
     try:
         logger.debug(f"Using config: {pipeline_config}")
-        pipeline = Pipeline.create(pipeline_config, dry_run, preview)
+        pipeline = Pipeline.create(pipeline_config, dry_run, preview, preview_workunits)
     except ValidationError as e:
         click.echo(e, err=True)
         sys.exit(1)
diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
index b2ebc3d984..7835c1c3de 100644
--- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
+++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -103,11 +103,16 @@ class Pipeline:
     transformers: List[Transformer]
 
     def __init__(
-        self, config: PipelineConfig, dry_run: bool = False, preview_mode: bool = False
+        self,
+        config: PipelineConfig,
+        dry_run: bool = False,
+        preview_mode: bool = False,
+        preview_workunits: int = 10,
     ):
         self.config = config
         self.dry_run = dry_run
         self.preview_mode = preview_mode
+        self.preview_workunits = preview_workunits
         self.ctx = PipelineContext(
             run_id=self.config.run_id,
             datahub_api=self.config.datahub_api,
@@ -169,17 +174,27 @@ class Pipeline:
 
     @classmethod
     def create(
-        cls, config_dict: dict, dry_run: bool = False, preview_mode: bool = False
+        cls,
+        config_dict: dict,
+        dry_run: bool = False,
+        preview_mode: bool = False,
+        preview_workunits: int = 10,
     ) -> "Pipeline":
         config = PipelineConfig.parse_obj(config_dict)
-        return cls(config, dry_run=dry_run, preview_mode=preview_mode)
+        return cls(
+            config,
+            dry_run=dry_run,
+            preview_mode=preview_mode,
+            preview_workunits=preview_workunits,
+        )
 
     def run(self) -> None:
         callback = LoggingCallback()
         extractor: Extractor = self.extractor_class()
         for wu in itertools.islice(
-            self.source.get_workunits(), 10 if self.preview_mode else None
+            self.source.get_workunits(),
+            self.preview_workunits if self.preview_mode else None,
        ):
             # TODO: change extractor interface
             extractor.configure({}, self.ctx)
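
For context, a minimal sketch of exercising the new `preview_workunits` parameter through the Python API rather than the CLI. The `Pipeline.create(...)` signature comes from the diff above; the file-based recipe (source/sink types, paths, server URL) is a hypothetical example, not part of this change:

```python
# Sketch only: assumes a local DataHub instance and an example MCE file;
# any valid source/sink recipe works the same way.
from datahub.ingestion.run.pipeline import Pipeline

recipe = {
    "source": {
        "type": "file",
        "config": {"filename": "./examples/mce_files/bootstrap_mce.json"},
    },
    "sink": {
        "type": "datahub-rest",
        "config": {"server": "http://localhost:8080"},
    },
}

# preview_mode=True caps the run at preview_workunits workunits (default 10);
# dry_run=True additionally skips writing to the sink.
pipeline = Pipeline.create(
    recipe, dry_run=True, preview_mode=True, preview_workunits=20
)
pipeline.run()
```

Because `preview_workunits` defaults to 10 in both `__init__` and `create`, existing callers that omit the argument keep the previous preview behavior.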