diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index e8f1c04353..a0310f8cad 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -58,6 +58,7 @@ framework_common = { "markupsafe>=1.1.1,<=2.0.1", "Deprecated", "types-Deprecated", + "humanfriendly", } kafka_common = { diff --git a/metadata-ingestion/src/datahub/cli/delete_cli.py b/metadata-ingestion/src/datahub/cli/delete_cli.py index f1dd465ebb..1ca0ac8646 100644 --- a/metadata-ingestion/src/datahub/cli/delete_cli.py +++ b/metadata-ingestion/src/datahub/cli/delete_cli.py @@ -19,6 +19,7 @@ from datahub.metadata.schema_classes import ( SystemMetadataClass, ) from datahub.telemetry import telemetry +from datahub.upgrade import upgrade logger = logging.getLogger(__name__) @@ -94,6 +95,7 @@ def delete_for_registry( @click.option("--registry-id", required=False, type=str) @click.option("-n", "--dry-run", required=False, is_flag=True) @click.option("--include-removed", required=False, is_flag=True) +@upgrade.check_upgrade @telemetry.with_telemetry def delete( urn: str, diff --git a/metadata-ingestion/src/datahub/cli/docker.py b/metadata-ingestion/src/datahub/cli/docker.py index 08be8e4fdf..3076e8bf90 100644 --- a/metadata-ingestion/src/datahub/cli/docker.py +++ b/metadata-ingestion/src/datahub/cli/docker.py @@ -19,6 +19,7 @@ from datahub.cli.docker_check import ( ) from datahub.ingestion.run.pipeline import Pipeline from datahub.telemetry import telemetry +from datahub.upgrade import upgrade logger = logging.getLogger(__name__) @@ -73,6 +74,7 @@ def docker_check_impl() -> None: @docker.command() +@upgrade.check_upgrade @telemetry.with_telemetry def check() -> None: """Check that the Docker containers are healthy""" @@ -164,6 +166,7 @@ def should_use_neo4j_for_graph_service(graph_service_override: Optional[str]) -> default=None, help="If set, forces docker-compose to use that graph service implementation", ) +@upgrade.check_upgrade @telemetry.with_telemetry def quickstart( 
version: str, diff --git a/metadata-ingestion/src/datahub/cli/get_cli.py b/metadata-ingestion/src/datahub/cli/get_cli.py index f20c451d7a..ee5b9d6462 100644 --- a/metadata-ingestion/src/datahub/cli/get_cli.py +++ b/metadata-ingestion/src/datahub/cli/get_cli.py @@ -7,6 +7,7 @@ from click.exceptions import UsageError from datahub.cli.cli_utils import get_aspects_for_entity from datahub.telemetry import telemetry +from datahub.upgrade import upgrade logger = logging.getLogger(__name__) @@ -21,6 +22,7 @@ logger = logging.getLogger(__name__) @click.option("--urn", required=False, type=str) @click.option("-a", "--aspect", required=False, multiple=True, type=str) @click.pass_context +@upgrade.check_upgrade @telemetry.with_telemetry def get(ctx: Any, urn: Optional[str], aspect: List[str]) -> None: """ diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index cd60e55e67..55d3a0cdc0 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -23,6 +23,7 @@ from datahub.configuration import SensitiveError from datahub.configuration.config_loader import load_config_file from datahub.ingestion.run.pipeline import Pipeline from datahub.telemetry import telemetry +from datahub.upgrade import upgrade from datahub.utilities import memory_leak_detector logger = logging.getLogger(__name__) @@ -81,6 +82,7 @@ def ingest() -> None: help="Supress display of variable values in logs by supressing elaborae stacktrace (stackprinter) during ingestion failures", ) @click.pass_context +@upgrade.check_upgrade @telemetry.with_telemetry @memory_leak_detector.with_leak_detection def run( @@ -131,6 +133,7 @@ def run( logger.info("Finished metadata pipeline") pipeline.log_ingestion_stats() ret = pipeline.pretty_print_summary(warnings_as_failure=strict_warnings) + upgrade.maybe_print_upgrade_message(pipeline.ctx.graph) sys.exit(ret) @@ -166,6 +169,7 @@ def 
parse_restli_response(response): default=False, help="If enabled, will list ingestion runs which have been soft deleted", ) +@upgrade.check_upgrade @telemetry.with_telemetry def list_runs(page_offset: int, page_size: int, include_soft_deletes: bool) -> None: """List recent ingestion runs to datahub""" @@ -213,6 +217,7 @@ def list_runs(page_offset: int, page_size: int, include_soft_deletes: bool) -> N help="If enabled, will include aspects that have been soft deleted", ) @click.option("-a", "--show-aspect", required=False, is_flag=True) +@upgrade.check_upgrade @telemetry.with_telemetry def show( run_id: str, start: int, count: int, include_soft_deletes: bool, show_aspect: bool @@ -256,6 +261,7 @@ def show( default="./rollback-reports", help="Path to directory where rollback reports will be saved to", ) +@upgrade.check_upgrade @telemetry.with_telemetry def rollback( run_id: str, force: bool, dry_run: bool, safe: bool, report_dir: str diff --git a/metadata-ingestion/src/datahub/cli/put_cli.py b/metadata-ingestion/src/datahub/cli/put_cli.py index 69da72aab1..2f326804bf 100644 --- a/metadata-ingestion/src/datahub/cli/put_cli.py +++ b/metadata-ingestion/src/datahub/cli/put_cli.py @@ -6,6 +6,7 @@ import click from datahub.cli.cli_utils import guess_entity_type, post_entity from datahub.telemetry import telemetry +from datahub.upgrade import upgrade logger = logging.getLogger(__name__) @@ -21,6 +22,7 @@ logger = logging.getLogger(__name__) @click.option("-a", "--aspect", required=True, type=str) @click.option("-d", "--aspect-data", required=True, type=str) @click.pass_context +@upgrade.check_upgrade @telemetry.with_telemetry def put(ctx: Any, urn: str, aspect: str, aspect_data: str) -> None: """Update a single aspect of an entity""" diff --git a/metadata-ingestion/src/datahub/cli/timeline_cli.py b/metadata-ingestion/src/datahub/cli/timeline_cli.py index dc123a389d..40c5af4e1e 100644 --- a/metadata-ingestion/src/datahub/cli/timeline_cli.py +++ 
b/metadata-ingestion/src/datahub/cli/timeline_cli.py @@ -12,6 +12,7 @@ from termcolor import colored import datahub.cli.cli_utils from datahub.emitter.mce_builder import dataset_urn_to_key, schema_field_urn_to_key from datahub.telemetry import telemetry +from datahub.upgrade import upgrade from datahub.utilities.urns.urn import Urn logger = logging.getLogger(__name__) @@ -132,6 +133,7 @@ def get_timeline( ) @click.option("--raw", type=bool, is_flag=True, help="Show the raw diff") @click.pass_context +@upgrade.check_upgrade @telemetry.with_telemetry def timeline( ctx: Any, diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index aa3af43c74..e75ced1240 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -58,7 +58,7 @@ class DataHubGraph(DatahubRestEmitter): self.server_id = client_id.clientId if client_id else "missing" except Exception as e: self.server_id = "missing" - logger.debug("Failed to get server id", e) + logger.debug(f"Failed to get server id due to {e}") def _get_generic(self, url: str) -> Dict: try: diff --git a/metadata-ingestion/src/datahub/upgrade/__init__.py b/metadata-ingestion/src/datahub/upgrade/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/metadata-ingestion/src/datahub/upgrade/upgrade.py b/metadata-ingestion/src/datahub/upgrade/upgrade.py new file mode 100644 index 0000000000..046f32202d --- /dev/null +++ b/metadata-ingestion/src/datahub/upgrade/upgrade.py @@ -0,0 +1,343 @@ +import logging +from datetime import datetime, timedelta, timezone +from functools import wraps +from typing import Any, Callable, Optional, TypeVar + +import humanfriendly +import requests +from packaging.version import Version +from pydantic import BaseModel +from termcolor import colored + +from datahub import __version__ +from datahub.cli import cli_utils +from 
datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph + +log = logging.getLogger(__name__) + + +T = TypeVar("T") + + +class VersionStats(BaseModel, arbitrary_types_allowed=True): + version: Version + release_date: Optional[datetime] + + +class ServerVersionStats(BaseModel): + current: VersionStats + latest: Optional[VersionStats] + current_server_type: Optional[str] + + +class ClientVersionStats(BaseModel): + current: VersionStats + latest: Optional[VersionStats] + + +class DataHubVersionStats(BaseModel): + server: ServerVersionStats + client: ClientVersionStats + + +def retrieve_versions( # noqa: C901 + server: Optional[DataHubGraph] = None, +) -> Optional[DataHubVersionStats]: + + current_version_string = __version__ + current_version = Version(current_version_string) + client_version_stats: ClientVersionStats = ClientVersionStats( + current=VersionStats(version=current_version, release_date=None), latest=None + ) + server_version_stats: Optional[ServerVersionStats] = None + + try: + response = requests.get("https://pypi.org/pypi/acryl_datahub/json") + if response.ok: + response_json = response.json() + releases = response_json.get("releases", []) + sorted_releases = sorted(releases.keys(), key=lambda x: Version(x)) + latest_cli_release_string = [x for x in sorted_releases if "rc" not in x][ + -1 + ] + latest_cli_release = Version(latest_cli_release_string) + current_version_info = releases.get(current_version_string) + current_version_date = None + if current_version_info: + current_version_date = datetime.strptime( + current_version_info[0].get("upload_time"), "%Y-%m-%dT%H:%M:%S" + ) + latest_release_info = releases.get(latest_cli_release_string) + latest_version_date = None + if latest_release_info: + latest_version_date = datetime.strptime( + latest_release_info[0].get("upload_time"), "%Y-%m-%dT%H:%M:%S" + ) + client_version_stats = ClientVersionStats( + current=VersionStats( + version=current_version, release_date=current_version_date + ), 
+ latest=VersionStats( + version=latest_cli_release, release_date=latest_version_date + ), + ) + except Exception as e: + log.debug(f"Failed to determine cli releases from pypi due to {e}") + pass + + latest_server_version: Optional[Version] = None + latest_server_date: Optional[datetime] = None + server_version: Optional[Version] = None + + try: + gh_response = requests.get( + "https://api.github.com/repos/datahub-project/datahub/releases", + headers={"Accept": "application/vnd.github.v3+json"}, + ) + if gh_response.ok: + gh_response_json = gh_response.json() + latest_server_version = Version(gh_response_json[0].get("tag_name")) + latest_server_date = gh_response_json[0].get("published_at") + except Exception as e: + log.debug(f"Failed to get release versions from github due to {e}") + + if not server: + try: + # let's get the server from the cli config + host, token = cli_utils.get_host_and_token() + server = DataHubGraph(DatahubClientConfig(server=host, token=token)) + except Exception as e: + log.debug(f"Failed to get a valid server due to {e}") + pass + + server_type = None + if server: + server_version_string = ( + server.server_config.get("versions", {}) + .get("linkedin/datahub", {}) + .get("version") + ) + commit_hash = ( + server.server_config.get("versions", {}) + .get("linkedin/datahub", {}) + .get("commit") + ) + server_type = server.server_config.get("datahub", {}).get( + "serverType", "unknown" + ) + current_server_release_date = None + if server_type == "quickstart" and commit_hash: + try: + # get the age of the commit from github + gh_commit_response = requests.get( + f"https://api.github.com/repos/datahub-project/datahub/commits/{commit_hash}", + headers={"Accept": "application/vnd.github.v3+json"}, + ) + if gh_commit_response.ok: + current_server_release_date = datetime.strptime( + gh_commit_response.json()["commit"]["author"]["date"], + "%Y-%m-%dT%H:%M:%S%z", + ) + except Exception as e: + log.debug(f"Failed to retrieve commit date due to {e}") + pass + + if
server_version_string and server_version_string.startswith("v"): + server_version = Version(server_version_string[1:]) + + server_version_stats = ServerVersionStats( + current=VersionStats( + version=server_version, release_date=current_server_release_date + ), + latest=VersionStats( + version=latest_server_version, release_date=latest_server_date + ) + if latest_server_version + else None, + current_server_type=server_type, + ) + + if client_version_stats and server_version_stats: + return DataHubVersionStats( + server=server_version_stats, client=client_version_stats + ) + else: + return None + + +def get_days(time_point: Optional[Any]) -> str: + if time_point: + return "(released " + ( + humanfriendly.format_timespan( + datetime.now(timezone.utc) - time_point, max_units=1 + ) + + " ago)" + ) + else: + return "" + + +def valid_client_version(version: Version) -> bool: + """Only version strings like 0.8.x through 0.11.x are valid. 0.8.6.7rc1 is not""" + if version.is_prerelease or version.is_postrelease or version.is_devrelease: + return False + if version.major == 0 and version.minor in [8, 9, 10, 11]: + return True + + return False + + +def valid_server_version(version: Version) -> bool: + """Only version strings like 0.8.x or 0.9.x are valid. 
0.1.x is not""" + if version.is_prerelease or version.is_postrelease or version.is_devrelease: + return False + + if version.major == 0 and version.minor in [8, 9]: + return True + + return False + + +def is_client_server_compatible(client: VersionStats, server: VersionStats) -> int: + """ + -ve implies client is ahead of server + 0 implies client and server are aligned + +ve implies server is ahead of client + """ + if not valid_client_version(client.version) or not valid_server_version( + server.version + ): + # we cannot evaluate compatibility, choose 0 (compatible) as default + return 0 + return server.version.micro - client.version.micro + + +def maybe_print_upgrade_message( # noqa: C901 + server: Optional[DataHubGraph] = None, +) -> None: # noqa: C901 + days_before_cli_stale = 7 + days_before_quickstart_stale = 7 + + encourage_cli_upgrade = False + client_server_compat = 0 + encourage_quickstart_upgrade = False + try: + version_stats = retrieve_versions(server) + if not version_stats: + return + current_release_date = version_stats.client.current.release_date + latest_release_date = ( + version_stats.client.latest.release_date + if version_stats.client.latest + else None + ) + client_server_compat = is_client_server_compatible( + version_stats.client.current, version_stats.server.current + ) + + if latest_release_date and current_release_date: + assert version_stats.client.latest + time_delta = latest_release_date - current_release_date + if time_delta > timedelta(days=days_before_cli_stale): + encourage_cli_upgrade = True + current_version = version_stats.client.current.version + latest_version = version_stats.client.latest.version + + if ( + version_stats.server.current_server_type + and version_stats.server.current_server_type == "quickstart" + ): + # if we detect a quickstart server, we encourage upgrading the server too! 
+ if version_stats.server.current.release_date: + time_delta = ( + datetime.now(timezone.utc) + - version_stats.server.current.release_date + ) + if time_delta > timedelta(days=days_before_quickstart_stale): + encourage_quickstart_upgrade = True + if version_stats.server.latest and ( + version_stats.server.latest.version + > version_stats.server.current.version + ): + encourage_quickstart_upgrade = True + + except Exception: + pass + + # Compute recommendations and print one + if client_server_compat < 0: + try: + assert version_stats + print( + colored("❗Client-Server Incompatible❗", "yellow"), + colored( + f"Your client version {version_stats.client.current.version} is newer than your server version {version_stats.server.current.version}. Downgrading the cli to {version_stats.server.current.version} is recommended.\n", + "cyan", + ), + colored( + f"➡️ Downgrade via `\"pip install 'acryl-datahub=={version_stats.server.current.version}'\"", + "cyan", + ), + ) + except Exception: + pass + elif client_server_compat > 0: + try: + assert version_stats + print( + colored("❗Client-Server Incompatible❗", "red"), + colored( + f"Your client version {version_stats.client.current.version} is older than your server version {version_stats.server.current.version}. Upgrading the cli to {version_stats.server.current.version} is recommended.\n", + "cyan", + ), + colored( + f"➡️ Upgrade via \"pip install 'acryl-datahub=={version_stats.server.current.version}'\"", + "cyan", + ), + ) + except Exception: + pass + + # we only encourage upgrades if we think client_server is currently compatible + elif client_server_compat == 0 and encourage_cli_upgrade: + try: + print( + colored("💡 Upgrade cli!", "yellow"), + colored( + f"You seem to be running an old version of datahub cli: {current_version} {get_days(current_release_date)}. 
Latest version is {latest_version} {get_days(latest_release_date)}.\nUpgrade via \"pip install -U 'acryl-datahub'\"", + "cyan", + ), + ) + except Exception: + pass + + elif encourage_quickstart_upgrade: + try: + assert version_stats + print( + colored("💡 Upgrade available!", "yellow"), + colored( + f'You seem to be running a slightly old quickstart image {get_days(version_stats.server.current.release_date)}. Run "datahub docker quickstart" to get the latest updates without losing any data!', + "cyan", + ), + ) + except Exception as e: + log.debug(f"Failed to suggest quickstart upgrade due to {e}") + pass + + +def check_upgrade(func: Callable[..., T]) -> Callable[..., T]: + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + + res = func(*args, **kwargs) + try: + # ensure this cannot fail + maybe_print_upgrade_message() + except Exception as e: + log.debug(f"Failed to check for upgrade due to {e}") + pass + + return res + + return wrapper