mirror of
https://github.com/datahub-project/datahub.git
synced 2025-09-26 01:23:16 +00:00
fix(cli): rest emitter should override config and env variables (#4622)
* fix(cli): rest emitter should override env variables * fix(cli): change to not update env variables, small refactor * fix bug
This commit is contained in:
parent
15474cdad5
commit
98d4fd4ea9
@ -1,5 +1,6 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import os.path
|
import os.path
|
||||||
import sys
|
import sys
|
||||||
import typing
|
import typing
|
||||||
@ -66,6 +67,8 @@ ENV_SKIP_CONFIG = "DATAHUB_SKIP_CONFIG"
|
|||||||
ENV_METADATA_HOST = "DATAHUB_GMS_HOST"
|
ENV_METADATA_HOST = "DATAHUB_GMS_HOST"
|
||||||
ENV_METADATA_TOKEN = "DATAHUB_GMS_TOKEN"
|
ENV_METADATA_TOKEN = "DATAHUB_GMS_TOKEN"
|
||||||
|
|
||||||
|
config_override: Dict = {}
|
||||||
|
|
||||||
|
|
||||||
class GmsConfig(BaseModel):
|
class GmsConfig(BaseModel):
|
||||||
server: str
|
server: str
|
||||||
@ -76,6 +79,13 @@ class DatahubConfig(BaseModel):
|
|||||||
gms: GmsConfig
|
gms: GmsConfig
|
||||||
|
|
||||||
|
|
||||||
|
def set_env_variables_override_config(host: str, token: Optional[str]) -> None:
|
||||||
|
"""Should be used to override the config when using rest emitter"""
|
||||||
|
config_override[ENV_METADATA_HOST] = host
|
||||||
|
if token is not None:
|
||||||
|
config_override[ENV_METADATA_TOKEN] = token
|
||||||
|
|
||||||
|
|
||||||
def write_datahub_config(host: str, token: Optional[str]) -> None:
|
def write_datahub_config(host: str, token: Optional[str]) -> None:
|
||||||
config = {
|
config = {
|
||||||
"gms": {
|
"gms": {
|
||||||
@ -137,22 +147,12 @@ def guess_entity_type(urn: str) -> str:
|
|||||||
return urn.split(":")[2]
|
return urn.split(":")[2]
|
||||||
|
|
||||||
|
|
||||||
def get_token():
|
def get_host_and_token():
|
||||||
_, gms_token_env = get_details_from_env()
|
|
||||||
if should_skip_config():
|
|
||||||
gms_token = gms_token_env
|
|
||||||
else:
|
|
||||||
ensure_datahub_config()
|
|
||||||
_, gms_token_conf = get_details_from_config()
|
|
||||||
gms_token = first_non_null([gms_token_env, gms_token_conf])
|
|
||||||
return gms_token
|
|
||||||
|
|
||||||
|
|
||||||
def get_session_and_host():
|
|
||||||
session = requests.Session()
|
|
||||||
|
|
||||||
gms_host_env, gms_token_env = get_details_from_env()
|
gms_host_env, gms_token_env = get_details_from_env()
|
||||||
if should_skip_config():
|
if len(config_override.keys()) > 0:
|
||||||
|
gms_host = config_override.get(ENV_METADATA_HOST)
|
||||||
|
gms_token = config_override.get(ENV_METADATA_TOKEN)
|
||||||
|
elif should_skip_config():
|
||||||
gms_host = gms_host_env
|
gms_host = gms_host_env
|
||||||
gms_token = gms_token_env
|
gms_token = gms_token_env
|
||||||
else:
|
else:
|
||||||
@ -160,6 +160,17 @@ def get_session_and_host():
|
|||||||
gms_host_conf, gms_token_conf = get_details_from_config()
|
gms_host_conf, gms_token_conf = get_details_from_config()
|
||||||
gms_host = first_non_null([gms_host_env, gms_host_conf])
|
gms_host = first_non_null([gms_host_env, gms_host_conf])
|
||||||
gms_token = first_non_null([gms_token_env, gms_token_conf])
|
gms_token = first_non_null([gms_token_env, gms_token_conf])
|
||||||
|
return gms_host, gms_token
|
||||||
|
|
||||||
|
|
||||||
|
def get_token():
|
||||||
|
return get_host_and_token()[1]
|
||||||
|
|
||||||
|
|
||||||
|
def get_session_and_host():
|
||||||
|
session = requests.Session()
|
||||||
|
|
||||||
|
gms_host, gms_token = get_host_and_token()
|
||||||
|
|
||||||
if gms_host is None or gms_host.strip() == "":
|
if gms_host is None or gms_host.strip() == "":
|
||||||
log.error(
|
log.error(
|
||||||
|
@ -4,6 +4,7 @@ import logging
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Union, cast
|
from typing import Union, cast
|
||||||
|
|
||||||
|
from datahub.cli.cli_utils import set_env_variables_override_config
|
||||||
from datahub.configuration.common import OperationalError
|
from datahub.configuration.common import OperationalError
|
||||||
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
||||||
from datahub.emitter.rest_emitter import DatahubRestEmitter
|
from datahub.emitter.rest_emitter import DatahubRestEmitter
|
||||||
@ -57,7 +58,9 @@ class DatahubRestSink(Sink):
|
|||||||
.get("linkedin/datahub", {})
|
.get("linkedin/datahub", {})
|
||||||
.get("version", "")
|
.get("version", "")
|
||||||
)
|
)
|
||||||
logger.info("Setting gms config")
|
logger.debug("Setting env variables to override config")
|
||||||
|
set_env_variables_override_config(self.config.server, self.config.token)
|
||||||
|
logger.debug("Setting gms config")
|
||||||
set_gms_config(gms_config)
|
set_gms_config(gms_config)
|
||||||
self.executor = concurrent.futures.ThreadPoolExecutor(
|
self.executor = concurrent.futures.ThreadPoolExecutor(
|
||||||
max_workers=self.config.max_threads
|
max_workers=self.config.max_threads
|
||||||
|
Loading…
x
Reference in New Issue
Block a user