feat(ingest): fix validators (#10115)
Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
commit 25d9d6656c (parent 07ef677ad3)
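This commit is a mechanical sweep over the ingestion config models: validators that previously raised DataHub's ConfigurationError now raise ValueError. Pydantic (v1) only recognizes ValueError, TypeError, and AssertionError raised inside a @validator as validation failures and folds them into a ValidationError at parse time; any other exception type escapes model construction unwrapped, bypassing pydantic's per-field error reporting. A minimal sketch of the pattern, using an illustrative model that is not part of the DataHub codebase:

    import pydantic
    from pydantic import validator


    class ExampleConfig(pydantic.BaseModel):
        env: str = "PROD"

        @validator("env")
        def env_must_be_known(cls, v: str) -> str:
            # ValueError is folded into a ValidationError by parse_obj;
            # a custom exception type would propagate out unwrapped.
            if v.upper() not in {"PROD", "DEV"}:
                raise ValueError(f"env must be one of PROD/DEV, found {v}")
            return v.upper()


    ExampleConfig.parse_obj({"env": "dev"})  # ok, normalized to "DEV"
    try:
        ExampleConfig.parse_obj({"env": "bogus"})
    except pydantic.ValidationError as e:
        print(e)  # reports the failing field: env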
@@ -3,7 +3,7 @@ from typing import Dict, Optional, Set
 from pydantic import validator
 from pydantic.fields import Field
 
-from datahub.configuration.common import ConfigModel, ConfigurationError
+from datahub.configuration.common import ConfigModel
 from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
 from datahub.metadata.schema_classes import FabricTypeClass
 
@@ -45,7 +45,7 @@ class EnvConfigMixin(ConfigModel):
 
     @validator("env")
     def env_must_be_one_of(cls, v: str) -> str:
         if v.upper() not in ALL_ENV_TYPES:
-            raise ConfigurationError(f"env must be one of {ALL_ENV_TYPES}, found {v}")
+            raise ValueError(f"env must be one of {ALL_ENV_TYPES}, found {v}")
         return v.upper()
 
@@ -122,7 +122,7 @@ class BigQueryAuditLogApi:
         )
 
         for i, entry in enumerate(list_entries):
-            if i % 1000 == 0:
+            if i > 0 and i % 1000 == 0:
                 logger.info(
                     f"Loaded {i} log entries from GCP Log for {client.project}"
                 )
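One behavioral fix rides along with the rename in the hunk above: the added i > 0 guard stops the loop from logging a useless "Loaded 0 log entries..." line on its first iteration, since 0 % 1000 == 0 is true. A standalone sketch of the guard (iterate_entries is a made-up stand-in for the GCP log iteration):

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)


    def iterate_entries(entries):
        for i, entry in enumerate(entries):
            # 0 % 1000 == 0, so without the i > 0 check the very first
            # entry would trigger a misleading "Loaded 0 ..." message.
            if i > 0 and i % 1000 == 0:
                logger.info(f"Loaded {i} log entries")
            yield entry


    list(iterate_entries(range(2500)))  # logs at i=1000 and i=2000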
@@ -14,7 +14,7 @@ from okta.models import Group, GroupProfile, User, UserProfile, UserStatus
 from pydantic import validator
 from pydantic.fields import Field
 
-from datahub.configuration.common import ConfigModel, ConfigurationError
+from datahub.configuration.common import ConfigModel
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
@@ -157,7 +157,7 @@ class OktaConfig(StatefulIngestionConfigBase, ConfigModel):
     @validator("okta_users_search")
     def okta_users_one_of_filter_or_search(cls, v, values):
         if v and values["okta_users_filter"]:
-            raise ConfigurationError(
+            raise ValueError(
                 "Only one of okta_users_filter or okta_users_search can be set"
             )
         return v
@@ -165,7 +165,7 @@ class OktaConfig(StatefulIngestionConfigBase, ConfigModel):
     @validator("okta_groups_search")
     def okta_groups_one_of_filter_or_search(cls, v, values):
         if v and values["okta_groups_filter"]:
-            raise ConfigurationError(
+            raise ValueError(
                 "Only one of okta_groups_filter or okta_groups_search can be set"
             )
         return v
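Both Okta validators above rely on pydantic v1's values argument: a dict of the fields that were already validated, i.e. those declared earlier in the model. That ordering dependency is what lets a validator on okta_users_search inspect okta_users_filter. A hedged sketch of the mutual-exclusion pattern, with made-up field names:

    from typing import Optional

    import pydantic


    class SearchOrFilter(pydantic.BaseModel):
        users_filter: Optional[str] = None
        users_search: Optional[str] = None  # validated after users_filter

        @pydantic.validator("users_search")
        def one_of_filter_or_search(cls, v, values):
            # values only holds fields declared *before* users_search,
            # so the declaration order in the model matters here.
            if v and values.get("users_filter"):
                raise ValueError("Only one of users_filter or users_search can be set")
            return v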
@@ -8,7 +8,7 @@ from pydantic import Field, validator
 from typing_extensions import ClassVar
 
 from datahub.configuration import ConfigModel
-from datahub.configuration.common import AllowDenyPattern, ConfigurationError
+from datahub.configuration.common import AllowDenyPattern
 from datahub.configuration.source_common import (
     EnvConfigMixin,
     PlatformInstanceConfigMixin,
@@ -59,11 +59,11 @@ class NamingPattern(ConfigModel):
 
         for v in variables:
             if v not in self.ALLOWED_VARS:
-                raise ConfigurationError(
+                raise ValueError(
                     f"Failed to find {v} in allowed_variables {self.ALLOWED_VARS}"
                 )
         if at_least_one and len(variables) == 0:
-            raise ConfigurationError(
+            raise ValueError(
                 f"Failed to find any variable assigned to pattern {self.pattern}. Must have at least one. {self.allowed_docstring()}"
             )
         return True
@@ -8,11 +8,7 @@ from pydantic.fields import Field
 
 import datahub.metadata.schema_classes as models
 from datahub.cli.cli_utils import get_aspects_for_entity
-from datahub.configuration.common import (
-    ConfigModel,
-    ConfigurationError,
-    VersionedConfig,
-)
+from datahub.configuration.common import ConfigModel, VersionedConfig
 from datahub.configuration.config_loader import load_config_file
 from datahub.configuration.source_common import EnvConfigMixin
 from datahub.emitter.mce_builder import (
@@ -57,11 +53,19 @@ class EntityConfig(EnvConfigMixin):
     def type_must_be_supported(cls, v: str) -> str:
         allowed_types = ["dataset"]
         if v not in allowed_types:
-            raise ConfigurationError(
+            raise ValueError(
                 f"Type must be one of {allowed_types}, {v} is not yet supported."
             )
         return v
 
+    @validator("name")
+    def validate_name(cls, v: str) -> str:
+        if v.startswith("urn:li:"):
+            raise ValueError(
+                "Name should not start with urn:li: - use a plain name, not an urn"
+            )
+        return v
+
 
 class FineGrainedLineageConfig(ConfigModel):
     upstreamType: str = "FIELD_SET"
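Besides the exception swap, this hunk adds a brand-new validator: EntityConfig.validate_name now rejects names that are already URNs, presumably catching a common recipe mistake early. A hedged usage sketch (the EntityConfig here stands in for the real class, reduced to the one field that matters):

    import pydantic
    from pydantic import validator


    class EntityConfig(pydantic.BaseModel):
        name: str

        @validator("name")
        def validate_name(cls, v: str) -> str:
            if v.startswith("urn:li:"):
                raise ValueError(
                    "Name should not start with urn:li: - use a plain name, not an urn"
                )
            return v


    EntityConfig.parse_obj({"name": "my_dataset"})  # ok
    try:
        EntityConfig.parse_obj({"name": "urn:li:dataset:(x,my_dataset,PROD)"})
    except pydantic.ValidationError as e:
        print(e)  # name: Name should not start with urn:li: ...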
@@ -79,7 +83,7 @@ class FineGrainedLineageConfig(ConfigModel):
             FineGrainedLineageUpstreamType.NONE,
         ]
         if v not in allowed_types:
-            raise ConfigurationError(
+            raise ValueError(
                 f"Upstream Type must be one of {allowed_types}, {v} is not yet supported."
             )
         return v
@@ -7,7 +7,7 @@ from typing import Dict, Iterable, Optional, Tuple
 from pydantic import validator
 from pydantic.fields import Field
 
-from datahub.configuration.common import ConfigModel, ConfigurationError
+from datahub.configuration.common import ConfigModel
 from datahub.emitter.mce_builder import make_tag_urn
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
@@ -87,9 +87,7 @@ class OpenApiConfig(ConfigModel):
         cls, bearer_token: Optional[str], values: Dict
     ) -> Optional[str]:
         if bearer_token is not None and values.get("token") is not None:
-            raise ConfigurationError(
-                "Unable to use 'token' and 'bearer_token' together."
-            )
+            raise ValueError("Unable to use 'token' and 'bearer_token' together.")
         return bearer_token
 
     def get_swagger(self) -> Dict:
@@ -2,7 +2,7 @@ import re
 
 import pydantic
 
-from datahub.configuration.common import ConfigModel, ConfigurationError
+from datahub.configuration.common import ConfigModel
 
 # Regexp for sharded tables.
 # A sharded table is a table that has a suffix of the form _yyyymmdd or yyyymmdd, where yyyymmdd is a date.
@@ -38,7 +38,7 @@ class BigQueryBaseConfig(ConfigModel):
         try:
             re.compile(v)
         except Exception as e:
-            raise ConfigurationError(
+            raise ValueError(
                 f"sharded_table_pattern configuration pattern is invalid. The exception was: {e}"
             )
         return v
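The validator above proves the user-supplied pattern is a valid regex simply by compiling it. A standalone version of the same idea (class and field names are illustrative; narrowing the except clause to re.error, which is what re.compile raises for a malformed pattern, is a small tightening over the original's bare Exception):

    import re

    import pydantic


    class ShardPattern(pydantic.BaseModel):
        sharded_table_pattern: str = ""

        @pydantic.validator("sharded_table_pattern")
        def pattern_must_compile(cls, v: str) -> str:
            try:
                # Compiling is the cheapest way to validate a regex string.
                re.compile(v)
            except re.error as e:
                raise ValueError(
                    f"sharded_table_pattern configuration pattern is invalid. The exception was: {e}"
                )
            return v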
@@ -2,7 +2,7 @@ from typing import Any, Dict
 
 import pydantic
 
-from datahub.configuration.common import ConfigModel, ConfigurationError
+from datahub.configuration.common import ConfigModel
 
 
 class CSVEnricherConfig(ConfigModel):
@@ -24,7 +24,7 @@ class CSVEnricherConfig(ConfigModel):
     @pydantic.validator("write_semantics")
     def validate_write_semantics(cls, write_semantics: str) -> str:
         if write_semantics.lower() not in {"patch", "override"}:
-            raise ConfigurationError(
+            raise ValueError(
                 "write_semantics cannot be any other value than PATCH or OVERRIDE. Default value is PATCH. "
                 "For PATCH semantics consider using the datahub-rest sink or "
                 "provide a datahub_api: configuration on your ingestion recipe"
@@ -34,7 +34,7 @@ class CSVEnricherConfig(ConfigModel):
     @pydantic.validator("array_delimiter")
     def validator_diff(cls, array_delimiter: str, values: Dict[str, Any]) -> str:
         if array_delimiter == values["delimiter"]:
-            raise ConfigurationError(
+            raise ValueError(
                 "array_delimiter and delimiter are the same. Please choose different delimiters."
             )
         return array_delimiter
@@ -5,7 +5,7 @@ from typing import Any, Dict, Optional
 import pydantic
 from pydantic.fields import Field
 
-from datahub.configuration.common import ConfigModel, ConfigurationError
+from datahub.configuration.common import ConfigModel
 
 logger = logging.getLogger(__name__)
 
@@ -34,7 +34,7 @@ class OperationConfig(ConfigModel):
             and profile_day_of_week is None
             and profile_date_of_month is None
         ):
-            raise ConfigurationError(
+            raise ValueError(
                 "Lower freq profiling setting is enabled but no day of week or date of month is specified. Profiling will be done.",
             )
         return values
@@ -45,7 +45,7 @@ class OperationConfig(ConfigModel):
         if profile_day_of_week is None:
             return None
         if profile_day_of_week < 0 or profile_day_of_week > 6:
-            raise ConfigurationError(
+            raise ValueError(
                 f"Invalid value {profile_day_of_week} for profile_day_of_week. Must be between 0 to 6 (both inclusive)."
             )
         return profile_day_of_week
@@ -56,7 +56,7 @@ class OperationConfig(ConfigModel):
         if profile_date_of_month is None:
             return None
         if profile_date_of_month < 1 or profile_date_of_month > 31:
-            raise ConfigurationError(
+            raise ValueError(
                 f"Invalid value {profile_date_of_month} for profile_date_of_month. Must be between 1 to 31 (both inclusive)."
             )
         return profile_date_of_month
@@ -4,7 +4,7 @@ from urllib.parse import urlparse
 
 from pydantic import Field, validator
 
-from datahub.configuration.common import AllowDenyPattern, ConfigurationError
+from datahub.configuration.common import AllowDenyPattern
 from datahub.configuration.source_common import (
     EnvConfigMixin,
     PlatformInstanceConfigMixin,
@@ -102,7 +102,7 @@ class PulsarSourceConfig(
         cls, token: Optional[str], values: Dict[str, Optional[str]]
     ) -> Optional[str]:
         if token is not None and values.get("issuer_url") is not None:
-            raise ConfigurationError(
+            raise ValueError(
                 "Expected only one authentication method, either issuer_url or token."
             )
         return token
@@ -114,7 +114,7 @@ class PulsarSourceConfig(
         if values.get("issuer_url") is not None and (
             client_secret is None or values.get("client_id") is None
         ):
-            raise ConfigurationError(
+            raise ValueError(
                 "Missing configuration: client_id and client_secret are mandatory when issuer_url is set."
             )
         return client_secret
@@ -125,12 +125,10 @@ class PulsarSourceConfig(
         url = urlparse(val)
 
         if url.scheme not in ["http", "https"]:
-            raise ConfigurationError(
-                f"Scheme should be http or https, found {url.scheme}"
-            )
+            raise ValueError(f"Scheme should be http or https, found {url.scheme}")
 
         if not _is_valid_hostname(url.hostname.__str__()):
-            raise ConfigurationError(
+            raise ValueError(
                 f"Not a valid hostname, hostname contains invalid characters, found {url.hostname}"
             )
 
@@ -8,7 +8,7 @@ from typing import Any, Dict, List, Optional
 import pydantic
 
 from datahub.configuration import ConfigModel
-from datahub.configuration.common import AllowDenyPattern, ConfigurationError
+from datahub.configuration.common import AllowDenyPattern
 from datahub.configuration.source_common import EnvConfigMixin
 from datahub.configuration.validate_field_removal import pydantic_removed_field
 from datahub.configuration.validate_multiline_string import pydantic_multiline_string
@@ -151,7 +151,7 @@ class BigQueryUsageConfig(BigQueryBaseConfig, EnvConfigMixin, BaseUsageConfig):
     @pydantic.validator("use_exported_bigquery_audit_metadata")
     def use_exported_bigquery_audit_metadata_uses_v2(cls, v, values):
         if v is True and not values["use_v2_audit_metadata"]:
-            raise ConfigurationError(
+            raise ValueError(
                 "To use exported BigQuery audit metadata, you must also use v2 audit metadata"
             )
         return v
@@ -6,7 +6,6 @@ from typing import Any, Dict, List, Tuple
 import pydantic
 import pytest
 
-from datahub.configuration.common import ConfigurationError
 from datahub.ingestion.source.elastic_search import (
     CollapseUrns,
     ElasticsearchSourceConfig,
@@ -19,7 +18,7 @@ logger = logging.getLogger(__name__)
 
 
 def test_elasticsearch_throws_error_wrong_operation_config():
-    with pytest.raises(ConfigurationError):
+    with pytest.raises(pydantic.ValidationError):
         ElasticsearchSourceConfig.parse_obj(
             {
                 "profiling": {
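This test change is the flip side of the validator sweep: because the config validators now raise ValueError, failures surface from parse_obj as pydantic.ValidationError, which is what the assertion must target. The general shape, with a throwaway model rather than the real ElasticsearchSourceConfig:

    import pydantic
    import pytest


    class ThrowawayConfig(pydantic.BaseModel):
        profiling_enabled: bool = False

        @pydantic.validator("profiling_enabled")
        def profiling_not_supported(cls, v: bool) -> bool:
            if v:
                raise ValueError("profiling is not supported here")
            return v


    def test_throws_error_wrong_operation_config():
        # parse_obj runs the validators; the ValueError raised inside
        # is reported as a pydantic.ValidationError with field context.
        with pytest.raises(pydantic.ValidationError):
            ThrowawayConfig.parse_obj({"profiling_enabled": True})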
@@ -3,8 +3,8 @@ from typing import List
 
 import pytest
 import yaml
+from pydantic import ValidationError
 
-from datahub.configuration.common import ConfigurationError
 from datahub.ingestion.source.metadata.lineage import LineageConfig, _get_lineage_mcp
 from datahub.metadata.schema_classes import FineGrainedLineageClass, UpstreamClass
 
@@ -196,25 +196,25 @@ def test_basic_lineage_finegrained_upstream_urns(basic_mcp):
 def test_unsupported_entity_type():
     """
     Checks to see how we handle the case of unsupported entity types.
-    If validation is working correctly, it should raise a ConfigurationError
+    If validation is working correctly, it should raise a ValidationError
     """
-    with pytest.raises(ConfigurationError):
+    with pytest.raises(ValidationError):
         unsupported_entity_type_mcp()
 
 
 def test_unsupported_upstream_entity_type():
     """
     Checks to see how invalid types work in the upstream node.
-    If validation is working correctly, it should raise a ConfigurationError
+    If validation is working correctly, it should raise a ValidationError
     """
-    with pytest.raises(ConfigurationError):
+    with pytest.raises(ValidationError):
         unsupported_upstream_entity_type_mcp()
 
 
 def test_unsupported_entity_env():
     """
     Checks to see how invalid envs work.
-    If validation is working correctly, it should raise a ConfigurationError
+    If validation is working correctly, it should raise a ValidationError
     """
-    with pytest.raises(ConfigurationError):
+    with pytest.raises(ValidationError):
         unsupported_entity_env_mcp()
@@ -4,7 +4,6 @@ from unittest.mock import patch
 
 import pytest
 
-from datahub.configuration.common import ConfigurationError
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.source.pulsar import (
@@ -24,6 +23,9 @@ mock_schema_response: Dict[str, Any] = {
 
 
 class TestPulsarSourceConfig:
+    # TODO: While these tests work, we really shouldn't be calling pydantic
+    # validator methods directly.
+
     def test_pulsar_source_config_valid_web_service_url(self):
         assert (
             PulsarSourceConfig().web_service_url_scheme_host_port(
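The new TODO explains why the Pulsar tests below still expect a bare ValueError rather than ValidationError: they invoke the validator method directly, so pydantic never runs and never wraps the exception. Wrapping only happens when validation goes through model construction. A sketch of the difference, with an illustrative model (pydantic v1 leaves @validator methods callable as classmethods):

    import pydantic
    import pytest


    class UrlConfig(pydantic.BaseModel):
        web_service_url: str = "http://localhost:8080"

        @pydantic.validator("web_service_url")
        def scheme_must_be_http(cls, v: str) -> str:
            if not v.startswith(("http://", "https://")):
                raise ValueError(f"Scheme should be http or https, found {v}")
            return v


    # Called directly (as these tests do), the raw ValueError escapes:
    with pytest.raises(ValueError):
        UrlConfig.scheme_must_be_http("ftp://localhost:8080/")

    # Run through pydantic, the same failure arrives wrapped:
    with pytest.raises(pydantic.ValidationError):
        UrlConfig.parse_obj({"web_service_url": "ftp://localhost:8080/"})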
@@ -34,7 +36,7 @@ class TestPulsarSourceConfig:
 
     def test_pulsar_source_config_invalid_web_service_url_scheme(self):
         with pytest.raises(
-            ConfigurationError, match=r"Scheme should be http or https, found ftp"
+            ValueError, match=r"Scheme should be http or https, found ftp"
         ):
             PulsarSourceConfig().web_service_url_scheme_host_port(
                 "ftp://localhost:8080/"
@@ -42,7 +44,7 @@ class TestPulsarSourceConfig:
 
     def test_pulsar_source_config_invalid_web_service_url_host(self):
         with pytest.raises(
-            ConfigurationError,
+            ValueError,
             match=r"Not a valid hostname, hostname contains invalid characters, found localhost&",
         ):
             PulsarSourceConfig().web_service_url_scheme_host_port(