feat(cli): suggest upgrades when appropriate (#5091)
parent 72d6c57044 · commit e62c647693
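Across the CLI modules below, the change follows a single pattern: each Click command gains the new @upgrade.check_upgrade decorator, stacked just above @telemetry.with_telemetry, so that after the command finishes the CLI can print an upgrade suggestion when a newer client or quickstart release is available. A minimal sketch of that pattern, assuming an installed datahub package; the `status` command here is hypothetical and only illustrates the decorator stacking:

import click

from datahub.telemetry import telemetry
from datahub.upgrade import upgrade


@click.command()
@upgrade.check_upgrade  # new in this commit: runs the command, then may print an upgrade hint
@telemetry.with_telemetry
def status() -> None:
    """Hypothetical command used only to illustrate the decorator order."""
    click.echo("ok")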
@@ -58,6 +58,7 @@ framework_common = {
    "markupsafe>=1.1.1,<=2.0.1",
    "Deprecated",
    "types-Deprecated",
+   "humanfriendly",
}

kafka_common = {
@@ -19,6 +19,7 @@ from datahub.metadata.schema_classes import (
    SystemMetadataClass,
)
from datahub.telemetry import telemetry
+from datahub.upgrade import upgrade

logger = logging.getLogger(__name__)

@@ -94,6 +95,7 @@ def delete_for_registry(
@click.option("--registry-id", required=False, type=str)
@click.option("-n", "--dry-run", required=False, is_flag=True)
@click.option("--include-removed", required=False, is_flag=True)
+@upgrade.check_upgrade
@telemetry.with_telemetry
def delete(
    urn: str,
@@ -19,6 +19,7 @@ from datahub.cli.docker_check import (
)
from datahub.ingestion.run.pipeline import Pipeline
from datahub.telemetry import telemetry
+from datahub.upgrade import upgrade

logger = logging.getLogger(__name__)

@@ -73,6 +74,7 @@ def docker_check_impl() -> None:


@docker.command()
+@upgrade.check_upgrade
@telemetry.with_telemetry
def check() -> None:
    """Check that the Docker containers are healthy"""
@@ -164,6 +166,7 @@ def should_use_neo4j_for_graph_service(graph_service_override: Optional[str]) ->
    default=None,
    help="If set, forces docker-compose to use that graph service implementation",
)
+@upgrade.check_upgrade
@telemetry.with_telemetry
def quickstart(
    version: str,
@@ -7,6 +7,7 @@ from click.exceptions import UsageError

from datahub.cli.cli_utils import get_aspects_for_entity
from datahub.telemetry import telemetry
+from datahub.upgrade import upgrade

logger = logging.getLogger(__name__)

@@ -21,6 +22,7 @@ logger = logging.getLogger(__name__)
@click.option("--urn", required=False, type=str)
@click.option("-a", "--aspect", required=False, multiple=True, type=str)
@click.pass_context
+@upgrade.check_upgrade
@telemetry.with_telemetry
def get(ctx: Any, urn: Optional[str], aspect: List[str]) -> None:
    """
@@ -23,6 +23,7 @@ from datahub.configuration import SensitiveError
from datahub.configuration.config_loader import load_config_file
from datahub.ingestion.run.pipeline import Pipeline
from datahub.telemetry import telemetry
+from datahub.upgrade import upgrade
from datahub.utilities import memory_leak_detector

logger = logging.getLogger(__name__)
@@ -81,6 +82,7 @@ def ingest() -> None:
    help="Supress display of variable values in logs by supressing elaborae stacktrace (stackprinter) during ingestion failures",
)
@click.pass_context
+@upgrade.check_upgrade
@telemetry.with_telemetry
@memory_leak_detector.with_leak_detection
def run(
@@ -131,6 +133,7 @@ def run(
    logger.info("Finished metadata pipeline")
    pipeline.log_ingestion_stats()
    ret = pipeline.pretty_print_summary(warnings_as_failure=strict_warnings)
+   upgrade.maybe_print_upgrade_message(pipeline.ctx.graph)
    sys.exit(ret)


@@ -166,6 +169,7 @@ def parse_restli_response(response):
    default=False,
    help="If enabled, will list ingestion runs which have been soft deleted",
)
+@upgrade.check_upgrade
@telemetry.with_telemetry
def list_runs(page_offset: int, page_size: int, include_soft_deletes: bool) -> None:
    """List recent ingestion runs to datahub"""
@@ -213,6 +217,7 @@ def list_runs(page_offset: int, page_size: int, include_soft_deletes: bool) -> N
    help="If enabled, will include aspects that have been soft deleted",
)
@click.option("-a", "--show-aspect", required=False, is_flag=True)
+@upgrade.check_upgrade
@telemetry.with_telemetry
def show(
    run_id: str, start: int, count: int, include_soft_deletes: bool, show_aspect: bool
@@ -256,6 +261,7 @@ def show(
    default="./rollback-reports",
    help="Path to directory where rollback reports will be saved to",
)
+@upgrade.check_upgrade
@telemetry.with_telemetry
def rollback(
    run_id: str, force: bool, dry_run: bool, safe: bool, report_dir: str
@@ -6,6 +6,7 @@ import click

from datahub.cli.cli_utils import guess_entity_type, post_entity
from datahub.telemetry import telemetry
+from datahub.upgrade import upgrade

logger = logging.getLogger(__name__)

@@ -21,6 +22,7 @@ logger = logging.getLogger(__name__)
@click.option("-a", "--aspect", required=True, type=str)
@click.option("-d", "--aspect-data", required=True, type=str)
@click.pass_context
+@upgrade.check_upgrade
@telemetry.with_telemetry
def put(ctx: Any, urn: str, aspect: str, aspect_data: str) -> None:
    """Update a single aspect of an entity"""
@@ -12,6 +12,7 @@ from termcolor import colored
import datahub.cli.cli_utils
from datahub.emitter.mce_builder import dataset_urn_to_key, schema_field_urn_to_key
from datahub.telemetry import telemetry
+from datahub.upgrade import upgrade
from datahub.utilities.urns.urn import Urn

logger = logging.getLogger(__name__)
@@ -132,6 +133,7 @@ def get_timeline(
)
@click.option("--raw", type=bool, is_flag=True, help="Show the raw diff")
@click.pass_context
+@upgrade.check_upgrade
@telemetry.with_telemetry
def timeline(
    ctx: Any,
@@ -58,7 +58,7 @@ class DataHubGraph(DatahubRestEmitter):
            self.server_id = client_id.clientId if client_id else "missing"
        except Exception as e:
            self.server_id = "missing"
-           logger.debug("Failed to get server id", e)
+           logger.debug(f"Failed to get server id due to {e}")

    def _get_generic(self, url: str) -> Dict:
        try:
metadata-ingestion/src/datahub/upgrade/__init__.py (new file, 0 lines)
metadata-ingestion/src/datahub/upgrade/upgrade.py (new file, 343 lines)
@@ -0,0 +1,343 @@
import logging
from datetime import datetime, timedelta, timezone
from functools import wraps
from typing import Any, Callable, Optional, TypeVar

import humanfriendly
import requests
from packaging.version import Version
from pydantic import BaseModel
from termcolor import colored

from datahub import __version__
from datahub.cli import cli_utils
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

log = logging.getLogger(__name__)


T = TypeVar("T")


class VersionStats(BaseModel, arbitrary_types_allowed=True):
    version: Version
    release_date: Optional[datetime]


class ServerVersionStats(BaseModel):
    current: VersionStats
    latest: Optional[VersionStats]
    current_server_type: Optional[str]


class ClientVersionStats(BaseModel):
    current: VersionStats
    latest: Optional[VersionStats]


class DataHubVersionStats(BaseModel):
    server: ServerVersionStats
    client: ClientVersionStats


def retrieve_versions(  # noqa: C901
    server: Optional[DataHubGraph] = None,
) -> Optional[DataHubVersionStats]:

    current_version_string = __version__
    current_version = Version(current_version_string)
    client_version_stats: ClientVersionStats = ClientVersionStats(
        current=VersionStats(version=current_version, release_date=None), latest=None
    )
    server_version_stats: Optional[ServerVersionStats] = None

    try:
        response = requests.get("https://pypi.org/pypi/acryl_datahub/json")
        if response.ok:
            response_json = response.json()
            releases = response_json.get("releases", [])
            sorted_releases = sorted(releases.keys(), key=lambda x: Version(x))
            latest_cli_release_string = [x for x in sorted_releases if "rc" not in x][
                -1
            ]
            latest_cli_release = Version(latest_cli_release_string)
            current_version_info = releases.get(current_version_string)
            current_version_date = None
            if current_version_info:
                current_version_date = datetime.strptime(
                    current_version_info[0].get("upload_time"), "%Y-%m-%dT%H:%M:%S"
                )
            latest_release_info = releases.get(latest_cli_release_string)
            latest_version_date = None
            if latest_release_info:
                latest_version_date = datetime.strptime(
                    latest_release_info[0].get("upload_time"), "%Y-%m-%dT%H:%M:%S"
                )
            client_version_stats = ClientVersionStats(
                current=VersionStats(
                    version=current_version, release_date=current_version_date
                ),
                latest=VersionStats(
                    version=latest_cli_release, release_date=latest_version_date
                ),
            )
    except Exception as e:
        log.debug(f"Failed to determine cli releases from pypi due to {e}")
        pass

    latest_server_version: Optional[Version] = None
    latest_server_date: Optional[datetime] = None
    server_version: Optional[Version] = None

    try:
        gh_response = requests.get(
            "https://api.github.com/repos/datahub-project/datahub/releases",
            headers={"Accept": "application/vnd.github.v3+json"},
        )
        if gh_response.ok:
            gh_response_json = gh_response.json()
            latest_server_version = Version(gh_response_json[0].get("tag_name"))
            latest_server_date = gh_response_json[0].get("published_at")
    except Exception as e:
        log.debug("Failed to get release versions from github", e)

    if not server:
        try:
            # let's get the server from the cli config
            host, token = cli_utils.get_host_and_token()
            server = DataHubGraph(DatahubClientConfig(server=host, token=token))
        except Exception as e:
            log.debug("Failed to get a valid server", e)
            pass

    server_type = None
    if server:
        server_version_string = (
            server.server_config.get("versions", {})
            .get("linkedin/datahub", {})
            .get("version")
        )
        commit_hash = (
            server.server_config.get("versions", {})
            .get("linkedin/datahub", {})
            .get("commit")
        )
        server_type = server.server_config.get("datahub", {}).get(
            "serverType", "unknown"
        )
        current_server_release_date = None
        if server_type == "quickstart" and commit_hash:
            try:
                # get the age of the commit from github
                gh_commit_response = requests.get(
                    f"https://api.github.com/repos/datahub-project/datahub/commits/{commit_hash}",
                    headers={"Accept": "application/vnd.github.v3+json"},
                )
                if gh_commit_response.ok:
                    current_server_release_date = datetime.strptime(
                        gh_commit_response.json()["commit"]["author"]["date"],
                        "%Y-%m-%dT%H:%M:%S%z",
                    )
            except Exception as e:
                log.debug(f"Failed to retrieve commit date due to {e}")
                pass

        if server_version_string and server_version_string.startswith("v"):
            server_version = Version(server_version_string[1:])

        server_version_stats = ServerVersionStats(
            current=VersionStats(
                version=server_version, release_date=current_server_release_date
            ),
            latest=VersionStats(
                version=latest_server_version, release_date=latest_server_date
            )
            if latest_server_version
            else None,
            current_server_type=server_type,
        )

    if client_version_stats and server_version_stats:
        return DataHubVersionStats(
            server=server_version_stats, client=client_version_stats
        )
    else:
        return None


def get_days(time_point: Optional[Any]) -> str:
    if time_point:
        return "(released " + (
            humanfriendly.format_timespan(
                datetime.now(timezone.utc) - time_point, max_units=1
            )
            + " ago)"
        )
    else:
        return ""


def valid_client_version(version: Version) -> bool:
    """Only version strings like 0.4.5 and 0.6.7.8 are valid. 0.8.6.7rc1 is not"""
    if version.is_prerelease or version.is_postrelease or version.is_devrelease:
        return False
    if version.major == 0 and version.minor in [8, 9, 10, 11]:
        return True

    return False


def valid_server_version(version: Version) -> bool:
    """Only version strings like 0.8.x or 0.9.x are valid. 0.1.x is not"""
    if version.is_prerelease or version.is_postrelease or version.is_devrelease:
        return False

    if version.major == 0 and version.minor in [8, 9]:
        return True

    return False


def is_client_server_compatible(client: VersionStats, server: VersionStats) -> int:
    """
    -ve implies client is behind server
    0 implies client and server are aligned
    +ve implies server is ahead of client
    """
    if not valid_client_version(client.version) or not valid_server_version(
        server.version
    ):
        # we cannot evaluate compatibility, choose True as default
        return 0
    return server.version.micro - client.version.micro


def maybe_print_upgrade_message(  # noqa: C901
    server: Optional[DataHubGraph] = None,
) -> None:  # noqa: C901
    days_before_cli_stale = 7
    days_before_quickstart_stale = 7

    encourage_cli_upgrade = False
    client_server_compat = 0
    encourage_quickstart_upgrade = False
    try:
        version_stats = retrieve_versions(server)
        if not version_stats:
            return
        current_release_date = version_stats.client.current.release_date
        latest_release_date = (
            version_stats.client.latest.release_date
            if version_stats.client.latest
            else None
        )
        client_server_compat = is_client_server_compatible(
            version_stats.client.current, version_stats.server.current
        )

        if latest_release_date and current_release_date:
            assert version_stats.client.latest
            time_delta = latest_release_date - current_release_date
            if time_delta > timedelta(days=days_before_cli_stale):
                encourage_cli_upgrade = True
                current_version = version_stats.client.current.version
                latest_version = version_stats.client.latest.version

        if (
            version_stats.server.current_server_type
            and version_stats.server.current_server_type == "quickstart"
        ):
            # if we detect a quickstart server, we encourage upgrading the server too!
            if version_stats.server.current.release_date:
                time_delta = (
                    datetime.now(timezone.utc)
                    - version_stats.server.current.release_date
                )
                if time_delta > timedelta(days=days_before_quickstart_stale):
                    encourage_quickstart_upgrade = True
            if version_stats.server.latest and (
                version_stats.server.latest.version
                > version_stats.server.current.version
            ):
                encourage_quickstart_upgrade = True

    except Exception:
        pass

    # Compute recommendations and print one
    if client_server_compat < 0:
        try:
            assert version_stats
            print(
                colored("❗Client-Server Incompatible❗", "yellow"),
                colored(
                    f"Your client version {version_stats.client.current.version} is newer than your server version {version_stats.server.current.version}. Downgrading the cli to {version_stats.server.current.version} is recommended.\n",
                    "cyan",
                ),
                colored(
                    f"➡️ Downgrade via `\"pip install 'acryl-datahub=={version_stats.server.current.version}'\"",
                    "cyan",
                ),
            )
        except Exception:
            pass
    elif client_server_compat > 0:
        try:
            assert version_stats
            print(
                colored("❗Client-Server Incompatible❗", "red"),
                colored(
                    f"Your client version {version_stats.client.current.version} is older than your server version {version_stats.server.current.version}. Upgrading the cli to {version_stats.server.current.version} is recommended.\n",
                    "cyan",
                ),
                colored(
                    f"➡️ Upgrade via \"pip install 'acryl-datahub=={version_stats.server.current.version}'\"",
                    "cyan",
                ),
            )
        except Exception:
            pass

    # we only encourage upgrades if we think client_server is currently compatible
    elif client_server_compat == 0 and encourage_cli_upgrade:
        try:
            print(
                colored("💡 Upgrade cli!", "yellow"),
                colored(
                    f"You seem to be running an old version of datahub cli: {current_version} {get_days(current_release_date)}. Latest version is {latest_version} {get_days(latest_release_date)}.\nUpgrade via \"pip install -U 'acryl-datahub'\"",
                    "cyan",
                ),
            )
        except Exception:
            pass

    elif encourage_quickstart_upgrade:
        try:
            assert version_stats
            print(
                colored("💡 Upgrade available!", "yellow"),
                colored(
                    f'You seem to be running a slightly old quickstart image {get_days(version_stats.server.current.release_date)}. Run "datahub docker quickstart" to get the latest updates without losing any data!',
                    "cyan",
                ),
            )
        except Exception as e:
            log.debug(f"Failed to suggest quickstart upgrade due to {e}")
            pass


def check_upgrade(func: Callable[..., T]) -> Callable[..., T]:
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:

        res = func(*args, **kwargs)
        try:
            # ensure this cannot fail
            maybe_print_upgrade_message()
        except Exception as e:
            log.debug(f"Failed to check for upgrade due to {e}")
            pass

        return res

    return wrapper
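A small usage sketch (not part of the commit) of the version-compatibility helpers defined in upgrade.py above; the version numbers are made up for illustration:

from packaging.version import Version

from datahub.upgrade.upgrade import (
    VersionStats,
    is_client_server_compatible,
    valid_client_version,
)

# Hypothetical versions, chosen only to exercise the helpers above.
client = VersionStats(version=Version("0.8.34"), release_date=None)
server = VersionStats(version=Version("0.8.36"), release_date=None)

# Positive result: the server's micro version is ahead of the client's,
# which is the case where maybe_print_upgrade_message() recommends a CLI upgrade.
print(is_client_server_compatible(client, server))  # 2

# Pre-release strings are never considered valid, so compatibility defaults to 0.
print(valid_client_version(Version("0.8.36.1rc1")))  # False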