fix(ingest): ensure upgrade checks run async (#5383)

Shirshanka Das 2022-07-16 17:38:33 -07:00 committed by GitHub
parent 36f3ab3fa3
commit f387fa6149
7 changed files with 222 additions and 133 deletions
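
At a glance, the change replaces the post-run upgrade check with one that runs concurrently with ingestion: the blocking pipeline executes in a thread-pool executor while the version lookups proceed on the event loop, and once the pipeline finishes the CLI waits at most half a second for the stats before moving on. A minimal, self-contained sketch of that pattern (stand-in functions, not the DataHub code itself):

import asyncio
import functools
import time


def run_pipeline() -> int:
    time.sleep(1)  # stand-in for the blocking pipeline.run()
    return 0


async def fetch_version_stats() -> str:
    await asyncio.sleep(0.2)  # stand-in for the PyPI/GitHub/server lookups
    return "v0.8.41"


async def main() -> int:
    loop = asyncio.get_running_loop()
    # Start the slow network check concurrently with the pipeline.
    stats_task = asyncio.ensure_future(fetch_version_stats())
    # Run the blocking pipeline in the default thread-pool executor so it
    # does not stall the event loop.
    ret = await loop.run_in_executor(None, functools.partial(run_pipeline))
    if ret == 0:
        try:
            # Once the pipeline is done, wait at most 0.5s for the stats.
            stats = await asyncio.wait_for(stats_task, 0.5)
            print(f"latest release: {stats}")
        except asyncio.TimeoutError:
            pass  # never block the CLI on the upgrade check
    return ret


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))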


@@ -60,6 +60,7 @@ framework_common = {
"types-Deprecated",
"humanfriendly",
"packaging",
"aiohttp<4",
}
kafka_common = {
@@ -266,7 +267,7 @@ plugins: Dict[str, Set[str]] = {
"redshift": sql_common | redshift_common,
"redshift-usage": sql_common | usage_common | redshift_common,
"sagemaker": aws_common,
"salesforce":{"simple-salesforce"},
"salesforce": {"simple-salesforce"},
"snowflake": snowflake_common,
"snowflake-usage": snowflake_common
| usage_common
@@ -347,6 +348,7 @@ base_dev_requirements = {
"bigquery-usage",
"clickhouse",
"clickhouse-usage",
"delta-lake",
"druid",
"elasticsearch",
"ldap",
@@ -363,7 +365,6 @@ base_dev_requirements = {
"redshift",
"redshift-usage",
"data-lake",
"delta-lake",
"s3",
"tableau",
"trino",
@@ -432,6 +433,7 @@ full_test_dev_requirements = {
"feast-legacy",
"hana",
"hive",
"kafka-connect",
"ldap",
"mongodb",
"mssql",
@@ -439,7 +441,6 @@ full_test_dev_requirements = {
"mariadb",
"snowflake",
"redash",
"kafka-connect",
"vertica",
]
for dependency in plugins[plugin]
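
For context, these plugin sets are plain Python sets that setup.py exposes as pip extras; an extra resolves to the union of its component sets. An illustrative sketch (hypothetical pins, not the real dependency lists):

sql_common = {"sqlalchemy>=1.3.24"}
usage_common = {"sqlparse"}
redshift_common = {"sqlalchemy-redshift", "psycopg2-binary"}

plugins = {
    "redshift": sql_common | redshift_common,
    "redshift-usage": sql_common | usage_common | redshift_common,
}

# `pip install 'acryl-datahub[redshift-usage]'` would pull in the union:
assert plugins["redshift-usage"] == {
    "sqlalchemy>=1.3.24",
    "sqlparse",
    "sqlalchemy-redshift",
    "psycopg2-binary",
}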


@@ -1,4 +1,6 @@
import asyncio
import csv
import functools
import json
import logging
import os
@@ -82,7 +84,6 @@ def ingest() -> None:
help="Suppress display of variable values in logs by suppressing elaborate stacktrace (stackprinter) during ingestion failures",
)
@click.pass_context
@upgrade.check_upgrade
@telemetry.with_telemetry
@memory_leak_detector.with_leak_detection
def run(
@@ -96,6 +97,56 @@ def run(
) -> None:
"""Ingest metadata into DataHub."""
def run_pipeline_to_completion(pipeline: Pipeline) -> int:
logger.info("Starting metadata ingestion")
try:
pipeline.run()
except Exception as e:
logger.info(
f"Source ({pipeline.config.source.type}) report:\n{pipeline.source.get_report().as_string()}"
)
logger.info(
f"Sink ({pipeline.config.sink.type}) report:\n{pipeline.sink.get_report().as_string()}"
)
# We don't want to log sensitive information in variables if the pipeline fails due to
# an unexpected error. Disable printing sensitive info to logs if ingestion is running
# with `--suppress-error-logs` flag.
if suppress_error_logs:
raise SensitiveError() from e
else:
raise e
else:
logger.info("Finished metadata ingestion")
pipeline.log_ingestion_stats()
ret = pipeline.pretty_print_summary(warnings_as_failure=strict_warnings)
return ret
async def run_pipeline_async(pipeline: Pipeline) -> int:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None, functools.partial(run_pipeline_to_completion, pipeline)
)
async def run_func_check_upgrade(pipeline: Pipeline) -> None:
version_stats_future = asyncio.ensure_future(
upgrade.retrieve_version_stats(pipeline.ctx.graph)
)
the_one_future = asyncio.ensure_future(run_pipeline_async(pipeline))
ret = await the_one_future
# the one future has returned
if ret == 0:
try:
# we check the other futures quickly on success
version_stats = await asyncio.wait_for(version_stats_future, 0.5)
upgrade.maybe_print_upgrade_message(version_stats=version_stats)
except Exception as e:
logger.debug(
f"timed out with {e} waiting for version stats to be computed... skipping ahead."
)
sys.exit(ret)
logger.info("DataHub CLI version: %s", datahub_package.nice_version_name())
config_file = pathlib.Path(config)
@@ -112,29 +163,8 @@ def run(
# in a SensitiveError to prevent detailed variable-level information from being logged.
raise SensitiveError() from e
logger.info("Starting metadata ingestion")
try:
pipeline.run()
except Exception as e:
logger.info(
f"Source ({pipeline.config.source.type}) report:\n{pipeline.source.get_report().as_string()}"
)
logger.info(
f"Sink ({pipeline.config.sink.type}) report:\n{pipeline.sink.get_report().as_string()}"
)
# We don't want to log sensitive information in variables if the pipeline fails due to
# an unexpected error. Disable printing sensitive info to logs if ingestion is running
# with `--suppress-error-logs` flag.
if suppress_error_logs:
raise SensitiveError() from e
else:
raise e
else:
logger.info("Finished metadata pipeline")
pipeline.log_ingestion_stats()
ret = pipeline.pretty_print_summary(warnings_as_failure=strict_warnings)
upgrade.maybe_print_upgrade_message(pipeline.ctx.graph)
sys.exit(ret)
loop = asyncio.get_event_loop()
loop.run_until_complete(run_func_check_upgrade(pipeline))
def get_runs_url(gms_host: str) -> str:
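
A note on the SensitiveError wrapping seen above: `raise SensitiveError() from e` preserves the original exception as `__cause__` while giving the top-level handler a marker type that says "print a terse message, no local variables". A runnable sketch of the mechanism (stand-in failure; SensitiveError is the real marker class used by the CLI):

class SensitiveError(Exception):
    """Marker: the handler must not print local variables for this failure."""


def run(suppress_error_logs: bool) -> None:
    password = "hunter2"  # a local we must not leak into logs
    try:
        raise ValueError("pipeline blew up")  # stand-in for pipeline.run()
    except Exception as e:
        if suppress_error_logs:
            # Chain the cause, but signal handlers to stay terse.
            raise SensitiveError() from e
        raise


try:
    run(suppress_error_logs=True)
except SensitiveError as e:
    print(f"ingestion failed: {e.__cause__!r}")  # terse message only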


@@ -164,7 +164,8 @@ def main(**kwargs):
if isinstance(exc, (ConfigurationError, ValidationError)):
logger.error(exc)
else:
logger.error(
# only print stacktraces during debug
logger.debug(
stackprinter.format(
exc,
line_wrap=MAX_CONTENT_WIDTH,
@@ -184,11 +185,14 @@ def main(**kwargs):
**kwargs,
)
)
logger.error(
f"Command failed with {exc}. Run with --debug to get full trace"
)
logger.info(
f"DataHub CLI version: {datahub_package.__version__} at {datahub_package.__file__}"
)
logger.info(
logger.debug(
f"Python version: {sys.version} at {sys.executable} on {platform.platform()}"
)
logger.info(f"GMS config {get_gms_config()}")
logger.debug(f"GMS config {get_gms_config()}")
sys.exit(1)
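
The effect of this hunk is a logging split: full stack traces (formatted with stackprinter in the real code) move to DEBUG, and users see a one-line summary at ERROR unless they rerun with --debug. A sketch of the same gating using only the stdlib (stand-in setup, not the entrypoint itself):

import logging
import sys

logging.basicConfig(level=logging.INFO)  # DEBUG when --debug is passed
logger = logging.getLogger("datahub")

try:
    raise ValueError("bad config")  # stand-in for a failing command
except Exception as exc:
    logger.debug("full trace follows", exc_info=True)  # hidden unless --debug
    logger.error(f"Command failed with {exc}. Run with --debug to get full trace")
    sys.exit(1)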


@@ -269,8 +269,10 @@ class Pipeline:
for record_envelope in self.transform(record_envelopes):
if not self.dry_run:
self.sink.write_record_async(record_envelope, callback)
except Exception as e:
logger.error(f"Failed to extract some records due to: {e}")
extractor.close()
if not self.dry_run:
self.sink.handle_work_unit_end(wu)
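
The hunk above narrows an extraction failure to a logged error instead of an aborted run, and the extractor is closed on the success and handled-failure paths alike. The shape of the control flow, as a stand-alone sketch (stand-in extractor/sink objects, simplified from the transform loop):

import logging

logger = logging.getLogger(__name__)


def process_work_unit(extractor, sink, wu, dry_run: bool) -> None:
    try:
        for record_envelope in extractor.get_records(wu):
            if not dry_run:
                sink.write_record_async(record_envelope, callback=None)
    except Exception as e:
        # A bad record no longer kills the whole pipeline run.
        logger.error(f"Failed to extract some records due to: {e}")
    # Reached on success and on the handled failure alike.
    extractor.close()
    if not dry_run:
        sink.handle_work_unit_end(wu)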


@@ -1,18 +1,20 @@
import asyncio
import contextlib
import functools
import logging
from datetime import datetime, timedelta, timezone
from functools import wraps
from typing import Any, Callable, Optional, TypeVar
from typing import Any, Callable, Optional, Tuple, TypeVar
import aiohttp
import humanfriendly
import requests
from packaging.version import Version
from pydantic import BaseModel
from termcolor import colored
from datahub import __version__
from datahub.cli import cli_utils
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.ingestion.graph.client import DataHubGraph
log = logging.getLogger(__name__)
@@ -41,123 +43,152 @@ class DataHubVersionStats(BaseModel):
client: ClientVersionStats
def retrieve_versions( # noqa: C901
server: Optional[DataHubGraph] = None,
) -> Optional[DataHubVersionStats]:
async def get_client_version_stats():
current_version_string = __version__
current_version = Version(current_version_string)
client_version_stats: ClientVersionStats = ClientVersionStats(
current=VersionStats(version=current_version, release_date=None), latest=None
)
server_version_stats: Optional[ServerVersionStats] = None
try:
response = requests.get("https://pypi.org/pypi/acryl_datahub/json")
if response.ok:
response_json = response.json()
releases = response_json.get("releases", [])
sorted_releases = sorted(releases.keys(), key=lambda x: Version(x))
latest_cli_release_string = [x for x in sorted_releases if "rc" not in x][
-1
]
latest_cli_release = Version(latest_cli_release_string)
current_version_info = releases.get(current_version_string)
current_version_date = None
if current_version_info:
current_version_date = datetime.strptime(
current_version_info[0].get("upload_time"), "%Y-%m-%dT%H:%M:%S"
async with aiohttp.ClientSession() as session:
pypi_url = "https://pypi.org/pypi/acryl_datahub/json"
async with session.get(pypi_url) as resp:
response_json = await resp.json()
try:
releases = response_json.get("releases", [])
sorted_releases = sorted(releases.keys(), key=lambda x: Version(x))
latest_cli_release_string = [
x for x in sorted_releases if "rc" not in x
][-1]
latest_cli_release = Version(latest_cli_release_string)
current_version_info = releases.get(current_version_string)
current_version_date = None
if current_version_info:
current_version_date = datetime.strptime(
current_version_info[0].get("upload_time"), "%Y-%m-%dT%H:%M:%S"
)
latest_release_info = releases.get(latest_cli_release_string)
latest_version_date = None
if latest_release_info:
latest_version_date = datetime.strptime(
latest_release_info[0].get("upload_time"), "%Y-%m-%dT%H:%M:%S"
)
client_version_stats = ClientVersionStats(
current=VersionStats(
version=current_version, release_date=current_version_date
),
latest=VersionStats(
version=latest_cli_release, release_date=latest_version_date
),
)
latest_release_info = releases.get(latest_cli_release_string)
latest_version_date = None
if latest_release_info:
latest_version_date = datetime.strptime(
latest_release_info[0].get("upload_time"), "%Y-%m-%dT%H:%M:%S"
)
client_version_stats = ClientVersionStats(
current=VersionStats(
version=current_version, release_date=current_version_date
),
latest=VersionStats(
version=latest_cli_release, release_date=latest_version_date
),
)
except Exception as e:
log.debug(f"Failed to determine cli releases from pypi due to {e}")
pass
except Exception as e:
log.debug(f"Failed to determine cli releases from pypi due to {e}")
pass
return client_version_stats
latest_server_version: Optional[Version] = None
latest_server_date: Optional[datetime] = None
server_version: Optional[Version] = None
try:
gh_response = requests.get(
"https://api.github.com/repos/datahub-project/datahub/releases",
headers={"Accept": "application/vnd.github.v3+json"},
)
if gh_response.ok:
gh_response_json = gh_response.json()
async def get_github_stats():
async with aiohttp.ClientSession(
headers={"Accept": "application/vnd.github.v3+json"}
) as session:
gh_url = "https://api.github.com/repos/datahub-project/datahub/releases"
async with session.get(gh_url) as gh_response:
gh_response_json = await gh_response.json()
latest_server_version = Version(gh_response_json[0].get("tag_name"))
latest_server_date = gh_response_json[0].get("published_at")
except Exception as e:
log.debug("Failed to get release versions from github", e)
return (latest_server_version, latest_server_date)
async def get_server_config(gms_url: str, token: str) -> dict:
async with aiohttp.ClientSession(
headers={
"X-RestLi-Protocol-Version": "2.0.0",
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
}
) as session:
config_endpoint = f"{gms_url}/config"
async with session.get(config_endpoint) as dh_response:
dh_response_json = await dh_response.json()
return dh_response_json
async def get_server_version_stats(
server: Optional[DataHubGraph] = None,
) -> Tuple[Any, Any, Any]:
server_version: Optional[Version] = None
server_config = None
if not server:
try:
# let's get the server from the cli config
host, token = cli_utils.get_url_and_token()
server = DataHubGraph(DatahubClientConfig(server=host, token=token))
server_config = await get_server_config(host, token)
log.debug(f"server_config:{server_config}")
except Exception as e:
log.debug("Failed to get a valid server", e)
pass
else:
server_config = server.server_config
server_type = None
if server:
if server_config:
server_version_string = (
server.server_config.get("versions", {})
.get("linkedin/datahub", {})
.get("version")
)
server_version_string = (
server_config.get("versions", {}).get("linkedin/datahub", {}).get("version")
)
commit_hash = (
server.server_config.get("versions", {})
.get("linkedin/datahub", {})
.get("commit")
)
commit_hash = (
server_config.get("versions", {}).get("linkedin/datahub", {}).get("commit")
)
server_type = server.server_config.get("datahub", {}).get(
"serverType", "unknown"
)
server_type = server_config.get("datahub", {}).get("serverType", "unknown")
current_server_release_date = None
if server_type == "quickstart" and commit_hash:
try:
# get the age of the commit from github
gh_commit_response = requests.get(
f"https://api.github.com/repos/datahub-project/datahub/commits/{commit_hash}",
headers={"Accept": "application/vnd.github.v3+json"},
)
if gh_commit_response.ok:
async with aiohttp.ClientSession(
headers={"Accept": "application/vnd.github.v3+json"}
) as session:
gh_url = f"https://api.github.com/repos/datahub-project/datahub/commits/{commit_hash}"
async with session.get(gh_url) as gh_response:
gh_commit_response = await gh_response.json()
current_server_release_date = datetime.strptime(
gh_commit_response.json()["commit"]["author"]["date"],
gh_commit_response["commit"]["author"]["date"],
"%Y-%m-%dT%H:%M:%S%z",
)
except Exception as e:
log.debug(f"Failed to retrieve commit date due to {e}")
pass
if server_version_string and server_version_string.startswith("v"):
server_version = Version(server_version_string[1:])
server_version_stats = ServerVersionStats(
current=VersionStats(
version=server_version, release_date=current_server_release_date
),
latest=VersionStats(
version=latest_server_version, release_date=latest_server_date
)
if latest_server_version
else None,
current_server_type=server_type,
)
return (server_type, server_version, current_server_release_date)
async def retrieve_version_stats(
server: Optional[DataHubGraph] = None,
) -> Optional[DataHubVersionStats]:
client_version_stats_future = asyncio.ensure_future(get_client_version_stats())
github_stats_future = asyncio.ensure_future(get_github_stats())
server_config_future = asyncio.ensure_future(get_server_version_stats(server))
tasks = [client_version_stats_future, github_stats_future, server_config_future]
while len(tasks):
done, pending = await asyncio.wait(tasks, timeout=0.1)
for t in done:
if t == client_version_stats_future:
client_version_stats = t.result()
elif t == github_stats_future:
(last_server_version, last_server_date) = t.result()
elif t == server_config_future:
(
current_server_type,
current_server_version,
current_server_release_date,
) = server_config_future.result()
tasks.remove(t)
server_version_stats = ServerVersionStats(
current=VersionStats(
version=current_server_version, release_date=current_server_release_date
),
latest=VersionStats(version=last_server_version, release_date=last_server_date)
if last_server_version
else None,
current_server_type=current_server_type,
)
if client_version_stats and server_version_stats:
return DataHubVersionStats(
server=server_version_stats, client=client_version_stats
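
retrieve_version_stats above drains its three futures with a polling while/asyncio.wait loop (0.1s ticks). A more compact equivalent, sketched here with stand-in coroutines rather than this module's, is asyncio.gather with return_exceptions=True, which turns a failed lookup into a returned value instead of cancelling its siblings:

import asyncio


async def lookup_ok() -> str:
    return "client-stats"


async def lookup_failing() -> str:
    raise RuntimeError("github unreachable")


async def main() -> None:
    results = await asyncio.gather(
        lookup_ok(), lookup_failing(), return_exceptions=True
    )
    for r in results:
        if isinstance(r, BaseException):
            print(f"lookup failed: {r}")
        else:
            print(f"lookup ok: {r}")


asyncio.run(main())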
@@ -214,7 +245,7 @@ def is_client_server_compatible(client: VersionStats, server: VersionStats) -> int
def maybe_print_upgrade_message( # noqa: C901
server: Optional[DataHubGraph] = None,
version_stats: Optional[DataHubVersionStats],
) -> None: # noqa: C901
days_before_cli_stale = 7
days_before_quickstart_stale = 7
@@ -223,9 +254,11 @@ def maybe_print_upgrade_message(  # noqa: C901
client_server_compat = 0
encourage_quickstart_upgrade = False
with contextlib.suppress(Exception):
version_stats = retrieve_versions(server)
if not version_stats:
log.debug("No version stats found")
return
else:
log.debug(f"Version stats found: {version_stats}")
current_release_date = version_stats.client.current.release_date
latest_release_date = (
version_stats.client.latest.release_date
@@ -255,11 +288,17 @@ def maybe_print_upgrade_message(  # noqa: C901
- version_stats.server.current.release_date
)
if time_delta > timedelta(days=days_before_quickstart_stale):
log.debug(
f"will encourage upgrade due to server being old {version_stats.server.current.release_date},{time_delta}"
)
encourage_quickstart_upgrade = True
if version_stats.server.latest and (
version_stats.server.latest.version
> version_stats.server.current.version
):
log.debug(
f"Will encourage upgrade due to newer version of server {version_stats.server.latest.version} being available compared to {version_stats.server.current.version}"
)
encourage_quickstart_upgrade = True
# Compute recommendations and print one
@@ -317,16 +356,29 @@ def maybe_print_upgrade_message(  # noqa: C901
def check_upgrade(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
res = func(*args, **kwargs)
try:
# ensure this cannot fail
maybe_print_upgrade_message()
except Exception as e:
log.debug(f"Failed to check for upgrade due to {e}")
pass
return res
return wrapper
def async_wrapper(*args: Any, **kwargs: Any) -> Any:
async def run_inner_func():
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, functools.partial(func, *args, **kwargs)
)
async def run_func_check_upgrade():
version_stats_future = asyncio.ensure_future(retrieve_version_stats())
the_one_future = asyncio.ensure_future(run_inner_func())
ret = await the_one_future
# the one future has returned
# we check the other futures quickly
try:
version_stats = await asyncio.wait_for(version_stats_future, 0.5)
maybe_print_upgrade_message(version_stats=version_stats)
except Exception:
log.debug("timed out waiting for version stats to be computed")
return ret
loop = asyncio.get_event_loop()
return loop.run_until_complete(run_func_check_upgrade())
return async_wrapper
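
Putting the decorator together: any blocking command it wraps now runs in the executor while the version check races alongside it, and the caller waits at most 0.5s for the check after the command returns. A condensed, runnable sketch of the same shape (stand-in version check, and asyncio.run in place of the get_event_loop()/run_until_complete pair; not the DataHub implementation itself):

import asyncio
import functools
from functools import wraps
from typing import Any, Callable


async def stand_in_version_check() -> str:
    await asyncio.sleep(0.1)  # pretend to hit PyPI/GitHub
    return "0.8.41"


def check_upgrade_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    @wraps(func)
    def async_wrapper(*args: Any, **kwargs: Any) -> Any:
        async def driver() -> Any:
            loop = asyncio.get_running_loop()
            check = asyncio.ensure_future(stand_in_version_check())
            # The wrapped (blocking) command runs in the thread pool.
            ret = await loop.run_in_executor(
                None, functools.partial(func, *args, **kwargs)
            )
            try:
                print("latest:", await asyncio.wait_for(check, 0.5))
            except Exception:
                pass  # the upgrade hint is best-effort
            return ret

        return asyncio.run(driver())

    return async_wrapper


@check_upgrade_sketch
def hello(name: str) -> str:
    return f"hello {name}"


print(hello("datahub"))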


@@ -1,7 +1,7 @@
version: "2"
services:
clickhouse:
image: yandex/clickhouse-server:22.1
image: clickhouse/clickhouse-server:22.5.2
container_name: "testclickhouse"
environment:
CLICKHOUSE_USER: clickhouseuser


@@ -2,7 +2,7 @@
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.4.0
image: confluentinc/cp-zookeeper:7.2.0
env_file: zookeeper.env
hostname: test_zookeeper
ports:
@@ -11,7 +11,7 @@ services:
- test_zkdata:/var/opt/zookeeper
broker:
image: confluentinc/cp-kafka:5.4.0
image: confluentinc/cp-kafka:7.2.0
env_file: broker.env
hostname: test_broker
container_name: test_broker
@@ -21,7 +21,7 @@ services:
- "59092:59092"
schema-registry:
image: confluentinc/cp-schema-registry:5.4.0
image: confluentinc/cp-schema-registry:7.2.0
env_file: schema-registry.env
container_name: test_schema_registry
depends_on: