feat(cli): add extra pip and debug flag (#12621)

Aseem Bansal 2025-02-17 16:03:07 +05:30 committed by GitHub
parent 71d1092b2e
commit 2ecd3bbab8
4 changed files with 192 additions and 92 deletions
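
The diff below adds two options to `datahub ingest deploy`: `--extra-pip`, a JSON-encoded list of extra pip packages that ends up as an `extra_pip_requirements` entry in the ingestion source's `extraArgs`, and `--debug`, which sets `debugMode` on the ingestion source config. For orientation, a hedged sketch of driving the updated command through click's test runner; the module path `datahub.cli.ingest_cli` and the `-c` flag spelling are assumptions not confirmed by this commit, and a reachable DataHub instance plus a real recipe file are needed for the invocation to actually succeed.

from click.testing import CliRunner

from datahub.cli.ingest_cli import ingest  # assumed location of the `ingest` command group

runner = CliRunner()
result = runner.invoke(
    ingest,
    [
        "deploy",
        "--name", "demo-ingestion",     # deployment name (illustrative)
        "-c", "recipe.dhub.yaml",       # recipe path; flag spelling assumed
        "--extra-pip", '["memray"]',    # new: extra pip packages as a JSON list string
        "--debug", "true",              # new: sets debugMode on the ingestion source config
    ],
)
print(result.output)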


@@ -15,14 +15,14 @@ from tabulate import tabulate
 from datahub._version import nice_version_name
 from datahub.cli import cli_utils
 from datahub.cli.config_utils import CONDENSED_DATAHUB_CONFIG_PATH
-from datahub.configuration.common import ConfigModel, GraphError
+from datahub.configuration.common import GraphError
 from datahub.configuration.config_loader import load_config_file
-from datahub.emitter.mce_builder import datahub_guid
 from datahub.ingestion.graph.client import get_default_graph
 from datahub.ingestion.run.connection import ConnectionManager
 from datahub.ingestion.run.pipeline import Pipeline
 from datahub.telemetry import telemetry
 from datahub.upgrade import upgrade
+from datahub.utilities.ingest_utils import deploy_source_vars
 from datahub.utilities.perf_timer import PerfTimer

 logger = logging.getLogger(__name__)
@@ -191,23 +191,6 @@ def run(
        # don't raise SystemExit if there's no error


-def _make_ingestion_urn(name: str) -> str:
-    guid = datahub_guid(
-        {
-            "name": name,
-        }
-    )
-    return f"urn:li:dataHubIngestionSource:deploy-{guid}"
-
-
-class DeployOptions(ConfigModel):
-    name: str
-    schedule: Optional[str] = None
-    time_zone: str = "UTC"
-    cli_version: Optional[str] = None
-    executor_id: str = "default"
-
-
 @ingest.command()
 @upgrade.check_upgrade
 @telemetry.with_telemetry()
@@ -258,6 +241,16 @@ class DeployOptions(ConfigModel):
     required=False,
     default="UTC",
 )
+@click.option(
+    "--debug", type=bool, help="Should we debug.", required=False, default=False
+)
+@click.option(
+    "--extra-pip",
+    type=str,
+    help='Extra pip packages. e.g. ["memray"]',
+    required=False,
+    default=None,
+)
 def deploy(
     name: Optional[str],
     config: str,
@@ -266,6 +259,8 @@ def deploy(
     cli_version: Optional[str],
     schedule: Optional[str],
     time_zone: str,
+    extra_pip: Optional[str],
+    debug: bool = False,
 ) -> None:
     """
     Deploy an ingestion recipe to your DataHub instance.
@@ -276,83 +271,23 @@ def deploy(

     datahub_graph = get_default_graph()

-    pipeline_config = load_config_file(
-        config,
-        allow_stdin=True,
-        allow_remote=True,
-        resolve_env_vars=False,
-    )
-
-    deploy_options_raw = pipeline_config.pop("deployment", None)
-    if deploy_options_raw is not None:
-        deploy_options = DeployOptions.parse_obj(deploy_options_raw)
-
-        if name:
-            logger.info(f"Overriding deployment name {deploy_options.name} with {name}")
-            deploy_options.name = name
-    else:
-        if not name:
-            raise click.UsageError(
-                "Either --name must be set or deployment_name specified in the config"
-            )
-        deploy_options = DeployOptions(name=name)
-
-    # Use remaining CLI args to override deploy_options
-    if schedule:
-        deploy_options.schedule = schedule
-    if time_zone:
-        deploy_options.time_zone = time_zone
-    if cli_version:
-        deploy_options.cli_version = cli_version
-    if executor_id:
-        deploy_options.executor_id = executor_id
-
-    logger.info(f"Using {repr(deploy_options)}")
-
-    if not urn:
-        # When urn/name is not specified, we will generate a unique urn based on the deployment name.
-        urn = _make_ingestion_urn(deploy_options.name)
-        logger.info(f"Using recipe urn: {urn}")
-
-    # Invariant - at this point, both urn and deploy_options are set.
-
-    variables: dict = {
-        "urn": urn,
-        "name": deploy_options.name,
-        "type": pipeline_config["source"]["type"],
-        "recipe": json.dumps(pipeline_config),
-        "executorId": deploy_options.executor_id,
-        "version": deploy_options.cli_version,
-    }
-
-    if deploy_options.schedule is not None:
-        variables["schedule"] = {
-            "interval": deploy_options.schedule,
-            "timezone": deploy_options.time_zone,
-        }
+    variables = deploy_source_vars(
+        name=name,
+        config=config,
+        urn=urn,
+        executor_id=executor_id,
+        cli_version=cli_version,
+        schedule=schedule,
+        time_zone=time_zone,
+        extra_pip=extra_pip,
+        debug=debug,
+    )

     # The updateIngestionSource endpoint can actually do upserts as well.
     graphql_query: str = textwrap.dedent(
         """
-        mutation updateIngestionSource(
-            $urn: String!,
-            $name: String!,
-            $type: String!,
-            $schedule: UpdateIngestionSourceScheduleInput,
-            $recipe: String!,
-            $executorId: String!
-            $version: String) {
-
-            updateIngestionSource(urn: $urn, input: {
-                name: $name,
-                type: $type,
-                schedule: $schedule,
-                config: {
-                    recipe: $recipe,
-                    executorId: $executorId,
-                    version: $version,
-                }
-            })
+        mutation updateIngestionSource($urn: String!, $input: UpdateIngestionSourceInput!) {
+            updateIngestionSource(urn: $urn, input: $input)
         }
         """
     )
@@ -372,7 +307,7 @@ def deploy(
        sys.exit(1)

    click.echo(
-        f"✅ Successfully wrote data ingestion source metadata for recipe {deploy_options.name}:"
+        f"✅ Successfully wrote data ingestion source metadata for recipe {variables['name']}:"
    )
    click.echo(response)


@@ -0,0 +1,106 @@ new file: datahub/utilities/ingest_utils.py
+import json
+import logging
+from typing import Optional
+
+import click
+
+from datahub.configuration.common import ConfigModel
+from datahub.configuration.config_loader import load_config_file
+from datahub.emitter.mce_builder import datahub_guid
+
+logger = logging.getLogger(__name__)
+
+
+def _make_ingestion_urn(name: str) -> str:
+    guid = datahub_guid(
+        {
+            "name": name,
+        }
+    )
+    return f"urn:li:dataHubIngestionSource:deploy-{guid}"
+
+
+class DeployOptions(ConfigModel):
+    name: str
+    schedule: Optional[str] = None
+    time_zone: str = "UTC"
+    cli_version: Optional[str] = None
+    executor_id: str = "default"
+
+
+def deploy_source_vars(
+    name: Optional[str],
+    config: str,
+    urn: Optional[str],
+    executor_id: str,
+    cli_version: Optional[str],
+    schedule: Optional[str],
+    time_zone: str,
+    extra_pip: Optional[str],
+    debug: bool = False,
+) -> dict:
+    pipeline_config = load_config_file(
+        config,
+        allow_stdin=True,
+        allow_remote=True,
+        resolve_env_vars=False,
+    )
+
+    deploy_options_raw = pipeline_config.pop("deployment", None)
+    if deploy_options_raw is not None:
+        deploy_options = DeployOptions.parse_obj(deploy_options_raw)
+
+        if name:
+            logger.info(f"Overriding deployment name {deploy_options.name} with {name}")
+            deploy_options.name = name
+    else:
+        if not name:
+            raise click.UsageError(
+                "Either --name must be set or deployment_name specified in the config"
+            )
+        deploy_options = DeployOptions(name=name)
+
+    # Use remaining CLI args to override deploy_options
+    if schedule:
+        deploy_options.schedule = schedule
+    if time_zone:
+        deploy_options.time_zone = time_zone
+    if cli_version:
+        deploy_options.cli_version = cli_version
+    if executor_id:
+        deploy_options.executor_id = executor_id
+
+    logger.info(f"Using {repr(deploy_options)}")
+
+    if not urn:
+        # When urn/name is not specified, we will generate a unique urn based on the deployment name.
+        urn = _make_ingestion_urn(deploy_options.name)
+        logger.info(f"Using recipe urn: {urn}")
+
+    variables: dict = {
+        "urn": urn,
+        "input": {
+            "name": deploy_options.name,
+            "type": pipeline_config["source"]["type"],
+            "config": {
+                "recipe": json.dumps(pipeline_config),
+                "executorId": deploy_options.executor_id,
+                "debugMode": debug,
+                "version": deploy_options.cli_version,
+            },
+        },
+    }
+
+    if deploy_options.schedule is not None:
+        variables["input"]["schedule"] = {
+            "interval": deploy_options.schedule,
+            "timezone": deploy_options.time_zone,
+        }
+
+    if extra_pip is not None:
+        extra_args_list = (
+            variables.get("input", {}).get("config", {}).get("extraArgs", [])
+        )
+        extra_args_list.append({"key": "extra_pip_requirements", "value": extra_pip})
+        variables["input"]["config"]["extraArgs"] = extra_args_list
+
+    return variables
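
To make the behaviour of the helper above concrete, here is a minimal, self-contained sketch (the recipe body and names are illustrative, not part of this commit) showing how the new `extra_pip` and `debug` arguments surface in the GraphQL variables that `deploy_source_vars` returns.

import pathlib
import tempfile

from datahub.utilities.ingest_utils import deploy_source_vars

with tempfile.TemporaryDirectory() as tmp:
    # Write a throwaway recipe so the call is runnable end to end.
    recipe_path = pathlib.Path(tmp) / "recipe.dhub.yaml"
    recipe_path.write_text("source:\n  type: demo-data\n  config: {}\n")

    variables = deploy_source_vars(
        name="demo",               # illustrative deployment name
        config=str(recipe_path),
        urn=None,                  # let the helper derive a deterministic urn from the name
        executor_id="default",
        cli_version=None,
        schedule=None,
        time_zone="UTC",
        extra_pip='["memray"]',
        debug=True,
    )

# --debug maps to config.debugMode; --extra-pip becomes an extra_pip_requirements entry in extraArgs.
assert variables["input"]["config"]["debugMode"] is True
assert variables["input"]["config"]["extraArgs"] == [
    {"key": "extra_pip_requirements", "value": '["memray"]'}
]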


@@ -0,0 +1,3 @@ new file: sample_demo.dhub.yaml (test fixture)
+source:
+  type: demo-data
+  config: {}


@@ -0,0 +1,56 @@ new file: tests for datahub.utilities.ingest_utils
+import pathlib
+
+from datahub.utilities.ingest_utils import (
+    _make_ingestion_urn,
+    deploy_source_vars,
+)
+
+
+def test_make_ingestion_urn():
+    name = "test"
+    urn = _make_ingestion_urn(name)
+    assert (
+        urn == "urn:li:dataHubIngestionSource:deploy-2b895b6efaa28b818284e5c696a18799"
+    )
+
+
+def test_deploy_source_vars():
+    name = "test"
+    config = pathlib.Path(__file__).parent / "sample_demo.dhub.yaml"
+    urn = None
+    executor_id = "default"
+    cli_version = "0.15.0.1"
+    schedule = "5 4 * * *"
+    time_zone = "UTC"
+    extra_pip = '["pandas"]'
+    debug = False
+
+    deploy_vars = deploy_source_vars(
+        name,
+        str(config),
+        urn,
+        executor_id,
+        cli_version,
+        schedule,
+        time_zone,
+        extra_pip,
+        debug,
+    )
+
+    assert deploy_vars == {
+        "urn": "urn:li:dataHubIngestionSource:deploy-2b895b6efaa28b818284e5c696a18799",
+        "input": {
+            "name": "test",
+            "schedule": {
+                "interval": "5 4 * * *",
+                "timezone": "UTC",
+            },
+            "type": "demo-data",
+            "config": {
+                "recipe": '{"source": {"type": "demo-data", "config": {}}}',
+                "debugMode": False,
+                "executorId": "default",
+                "version": "0.15.0.1",
+                "extraArgs": [{"key": "extra_pip_requirements", "value": '["pandas"]'}],
+            },
+        },
+    }