refactor(ingest): avoid allowing extras for all DataHubGraphConfig (#7448)

This commit is contained in:
Harshal Sheth 2023-02-28 10:42:31 -08:00 committed by GitHub
parent 639bbcfa86
commit 73493c577b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 14 deletions

View File

@@ -4,6 +4,7 @@ output
pvenv36/
/venv*/
bq_credentials.json
junit.*.xml
/tmp
# Byte-compiled / optimized / DLL files

View File

@@ -1,9 +1,8 @@
import json
import logging
from json.decoder import JSONDecodeError
from typing import Any, Dict, Iterable, List, Optional, Type, Union
from typing import Any, Dict, Iterable, List, Optional, Type
import pydantic
from avro.schema import RecordSchema
from deprecated import deprecated
from requests.adapters import Response
@@ -48,15 +47,13 @@ class DatahubClientConfig(ConfigModel):
disable_ssl_verification: bool = False
class DataHubGraphConfig(DatahubClientConfig):
class Config:
extra = (
pydantic.Extra.allow
) # lossy to allow interop with DataHubRestSinkConfig
# Alias for backwards compatibility.
# DEPRECATION: Remove in v0.10.2.
DataHubGraphConfig = DatahubClientConfig
class DataHubGraph(DatahubRestEmitter):
def __init__(self, config: Union[DatahubClientConfig, DataHubGraphConfig]) -> None:
def __init__(self, config: DatahubClientConfig) -> None:
self.config = config
super().__init__(
gms_server=self.config.server,
@@ -456,4 +453,4 @@ class DataHubGraph(DatahubRestEmitter):
def get_default_graph() -> DataHubGraph:
(url, token) = get_url_and_token()
return DataHubGraph(DataHubGraphConfig(server=url, token=token))
return DataHubGraph(DatahubClientConfig(server=url, token=token))

View File

@@ -8,7 +8,7 @@ from pydantic import Field, root_validator, validator
from datahub.cli.cli_utils import get_url_and_token
from datahub.configuration import config_loader
from datahub.configuration.common import ConfigModel, DynamicTypedConfig
from datahub.ingestion.graph.client import DataHubGraphConfig
from datahub.ingestion.graph.client import DatahubClientConfig
from datahub.ingestion.sink.file import FileSinkConfig
logger = logging.getLogger(__name__)
@@ -47,7 +47,7 @@ class PipelineConfig(ConfigModel):
transformers: Optional[List[DynamicTypedConfig]]
reporting: List[ReporterConfig] = []
run_id: str = DEFAULT_RUN_ID
datahub_api: Optional[DataHubGraphConfig] = None
datahub_api: Optional[DatahubClientConfig] = None
pipeline_name: Optional[str] = None
failure_log: FailureLoggingConfig = FailureLoggingConfig()
@@ -91,13 +91,13 @@ class PipelineConfig(ConfigModel):
@validator("datahub_api", always=True)
def datahub_api_should_use_rest_sink_as_default(
cls, v: Optional[DataHubGraphConfig], values: Dict[str, Any], **kwargs: Any
) -> Optional[DataHubGraphConfig]:
cls, v: Optional[DatahubClientConfig], values: Dict[str, Any], **kwargs: Any
) -> Optional[DatahubClientConfig]:
if v is None and "sink" in values and hasattr(values["sink"], "type"):
sink_type = values["sink"].type
if sink_type == "datahub-rest":
sink_config = values["sink"].config
v = DataHubGraphConfig.parse_obj(sink_config)
v = DatahubClientConfig.parse_obj_allow_extras(sink_config)
return v
@classmethod