diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index e2a2f35a36..1811b33239 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -15,14 +15,14 @@ from tabulate import tabulate from datahub._version import nice_version_name from datahub.cli import cli_utils from datahub.cli.config_utils import CONDENSED_DATAHUB_CONFIG_PATH -from datahub.configuration.common import ConfigModel, GraphError +from datahub.configuration.common import GraphError from datahub.configuration.config_loader import load_config_file -from datahub.emitter.mce_builder import datahub_guid from datahub.ingestion.graph.client import get_default_graph from datahub.ingestion.run.connection import ConnectionManager from datahub.ingestion.run.pipeline import Pipeline from datahub.telemetry import telemetry from datahub.upgrade import upgrade +from datahub.utilities.ingest_utils import deploy_source_vars from datahub.utilities.perf_timer import PerfTimer logger = logging.getLogger(__name__) @@ -191,23 +191,6 @@ def run( # don't raise SystemExit if there's no error -def _make_ingestion_urn(name: str) -> str: - guid = datahub_guid( - { - "name": name, - } - ) - return f"urn:li:dataHubIngestionSource:deploy-{guid}" - - -class DeployOptions(ConfigModel): - name: str - schedule: Optional[str] = None - time_zone: str = "UTC" - cli_version: Optional[str] = None - executor_id: str = "default" - - @ingest.command() @upgrade.check_upgrade @telemetry.with_telemetry() @@ -258,6 +241,16 @@ class DeployOptions(ConfigModel): required=False, default="UTC", ) +@click.option( + "--debug", type=bool, help="Should we debug.", required=False, default=False +) +@click.option( + "--extra-pip", + type=str, + help='Extra pip packages. e.g. ["memray"]', + required=False, + default=None, +) def deploy( name: Optional[str], config: str, @@ -266,6 +259,8 @@ def deploy( cli_version: Optional[str], schedule: Optional[str], time_zone: str, + extra_pip: Optional[str], + debug: bool = False, ) -> None: """ Deploy an ingestion recipe to your DataHub instance. @@ -276,83 +271,23 @@ def deploy( datahub_graph = get_default_graph() - pipeline_config = load_config_file( - config, - allow_stdin=True, - allow_remote=True, - resolve_env_vars=False, + variables = deploy_source_vars( + name=name, + config=config, + urn=urn, + executor_id=executor_id, + cli_version=cli_version, + schedule=schedule, + time_zone=time_zone, + extra_pip=extra_pip, + debug=debug, ) - deploy_options_raw = pipeline_config.pop("deployment", None) - if deploy_options_raw is not None: - deploy_options = DeployOptions.parse_obj(deploy_options_raw) - - if name: - logger.info(f"Overriding deployment name {deploy_options.name} with {name}") - deploy_options.name = name - else: - if not name: - raise click.UsageError( - "Either --name must be set or deployment_name specified in the config" - ) - deploy_options = DeployOptions(name=name) - - # Use remaining CLI args to override deploy_options - if schedule: - deploy_options.schedule = schedule - if time_zone: - deploy_options.time_zone = time_zone - if cli_version: - deploy_options.cli_version = cli_version - if executor_id: - deploy_options.executor_id = executor_id - - logger.info(f"Using {repr(deploy_options)}") - - if not urn: - # When urn/name is not specified, we will generate a unique urn based on the deployment name. - urn = _make_ingestion_urn(deploy_options.name) - logger.info(f"Using recipe urn: {urn}") - - # Invariant - at this point, both urn and deploy_options are set. - - variables: dict = { - "urn": urn, - "name": deploy_options.name, - "type": pipeline_config["source"]["type"], - "recipe": json.dumps(pipeline_config), - "executorId": deploy_options.executor_id, - "version": deploy_options.cli_version, - } - - if deploy_options.schedule is not None: - variables["schedule"] = { - "interval": deploy_options.schedule, - "timezone": deploy_options.time_zone, - } - # The updateIngestionSource endpoint can actually do upserts as well. graphql_query: str = textwrap.dedent( """ - mutation updateIngestionSource( - $urn: String!, - $name: String!, - $type: String!, - $schedule: UpdateIngestionSourceScheduleInput, - $recipe: String!, - $executorId: String! - $version: String) { - - updateIngestionSource(urn: $urn, input: { - name: $name, - type: $type, - schedule: $schedule, - config: { - recipe: $recipe, - executorId: $executorId, - version: $version, - } - }) + mutation updateIngestionSource($urn: String!, $input: UpdateIngestionSourceInput!) { + updateIngestionSource(urn: $urn, input: $input) } """ ) @@ -372,7 +307,7 @@ def deploy( sys.exit(1) click.echo( - f"✅ Successfully wrote data ingestion source metadata for recipe {deploy_options.name}:" + f"✅ Successfully wrote data ingestion source metadata for recipe {variables['name']}:" ) click.echo(response) diff --git a/metadata-ingestion/src/datahub/utilities/ingest_utils.py b/metadata-ingestion/src/datahub/utilities/ingest_utils.py new file mode 100644 index 0000000000..1782fd5a09 --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/ingest_utils.py @@ -0,0 +1,106 @@ +import json +import logging +from typing import Optional + +import click + +from datahub.configuration.common import ConfigModel +from datahub.configuration.config_loader import load_config_file +from datahub.emitter.mce_builder import datahub_guid + +logger = logging.getLogger(__name__) + + +def _make_ingestion_urn(name: str) -> str: + guid = datahub_guid( + { + "name": name, + } + ) + return f"urn:li:dataHubIngestionSource:deploy-{guid}" + + +class DeployOptions(ConfigModel): + name: str + schedule: Optional[str] = None + time_zone: str = "UTC" + cli_version: Optional[str] = None + executor_id: str = "default" + + +def deploy_source_vars( + name: Optional[str], + config: str, + urn: Optional[str], + executor_id: str, + cli_version: Optional[str], + schedule: Optional[str], + time_zone: str, + extra_pip: Optional[str], + debug: bool = False, +) -> dict: + pipeline_config = load_config_file( + config, + allow_stdin=True, + allow_remote=True, + resolve_env_vars=False, + ) + + deploy_options_raw = pipeline_config.pop("deployment", None) + if deploy_options_raw is not None: + deploy_options = DeployOptions.parse_obj(deploy_options_raw) + + if name: + logger.info(f"Overriding deployment name {deploy_options.name} with {name}") + deploy_options.name = name + else: + if not name: + raise click.UsageError( + "Either --name must be set or deployment_name specified in the config" + ) + deploy_options = DeployOptions(name=name) + + # Use remaining CLI args to override deploy_options + if schedule: + deploy_options.schedule = schedule + if time_zone: + deploy_options.time_zone = time_zone + if cli_version: + deploy_options.cli_version = cli_version + if executor_id: + deploy_options.executor_id = executor_id + + logger.info(f"Using {repr(deploy_options)}") + + if not urn: + # When urn/name is not specified, we will generate a unique urn based on the deployment name. + urn = _make_ingestion_urn(deploy_options.name) + logger.info(f"Using recipe urn: {urn}") + + variables: dict = { + "urn": urn, + "input": { + "name": deploy_options.name, + "type": pipeline_config["source"]["type"], + "config": { + "recipe": json.dumps(pipeline_config), + "executorId": deploy_options.executor_id, + "debugMode": debug, + "version": deploy_options.cli_version, + }, + }, + } + + if deploy_options.schedule is not None: + variables["input"]["schedule"] = { + "interval": deploy_options.schedule, + "timezone": deploy_options.time_zone, + } + if extra_pip is not None: + extra_args_list = ( + variables.get("input", {}).get("config", {}).get("extraArgs", []) + ) + extra_args_list.append({"key": "extra_pip_requirements", "value": extra_pip}) + variables["input"]["config"]["extraArgs"] = extra_args_list + + return variables diff --git a/metadata-ingestion/tests/unit/utilities/sample_demo.dhub.yaml b/metadata-ingestion/tests/unit/utilities/sample_demo.dhub.yaml new file mode 100644 index 0000000000..2768a3acff --- /dev/null +++ b/metadata-ingestion/tests/unit/utilities/sample_demo.dhub.yaml @@ -0,0 +1,3 @@ +source: + type: demo-data + config: {} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/utilities/test_ingest_utils.py b/metadata-ingestion/tests/unit/utilities/test_ingest_utils.py new file mode 100644 index 0000000000..0ce0909e70 --- /dev/null +++ b/metadata-ingestion/tests/unit/utilities/test_ingest_utils.py @@ -0,0 +1,56 @@ +import pathlib + +from datahub.utilities.ingest_utils import ( + _make_ingestion_urn, + deploy_source_vars, +) + + +def test_make_ingestion_urn(): + name = "test" + urn = _make_ingestion_urn(name) + assert ( + urn == "urn:li:dataHubIngestionSource:deploy-2b895b6efaa28b818284e5c696a18799" + ) + + +def test_deploy_source_vars(): + name = "test" + config = pathlib.Path(__file__).parent / "sample_demo.dhub.yaml" + urn = None + executor_id = "default" + cli_version = "0.15.0.1" + schedule = "5 4 * * *" + time_zone = "UTC" + extra_pip = '["pandas"]' + debug = False + + deploy_vars = deploy_source_vars( + name, + str(config), + urn, + executor_id, + cli_version, + schedule, + time_zone, + extra_pip, + debug, + ) + assert deploy_vars == { + "urn": "urn:li:dataHubIngestionSource:deploy-2b895b6efaa28b818284e5c696a18799", + "input": { + "name": "test", + "schedule": { + "interval": "5 4 * * *", + "timezone": "UTC", + }, + "type": "demo-data", + "config": { + "recipe": '{"source": {"type": "demo-data", "config": {}}}', + "debugMode": False, + "executorId": "default", + "version": "0.15.0.1", + "extraArgs": [{"key": "extra_pip_requirements", "value": '["pandas"]'}], + }, + }, + }