fix(ingest): add missing optional defaults for pydantic (#14342)

Harshal Sheth 2025-08-06 11:15:33 -07:00, committed by GitHub
parent ed304f03cd
commit dd89fc0ad1
17 changed files with 38 additions and 48 deletions
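
The underlying issue, as a minimal sketch (assumes pydantic v2; the model and field names are illustrative): pydantic v1 treated a bare Optional[...] annotation as defaulting to None, while v2 makes any field without an explicit default required. That is why nearly every hunk below adds "= None".

    from typing import Optional

    from pydantic import BaseModel, ValidationError

    class Example(BaseModel):  # hypothetical model mirroring the fields fixed below
        required_despite_optional: Optional[str]   # v2: no default, so required
        truly_optional: Optional[str] = None       # v2: genuinely optional

    try:
        Example()
    except ValidationError as e:
        print(e)  # reports "Field required" for required_despite_optional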

View File

@@ -136,7 +136,7 @@ class StructuredPropertiesHelper:
 class SchemaFieldSpecification(StrictModel):
     id: Optional[str] = None
-    urn: Optional[str] = None
+    urn: Optional[str] = Field(None, validate_default=True)
     structured_properties: Optional[StructuredProperties] = None
     type: Optional[str] = None
     nativeDataType: Optional[str] = None
@@ -382,9 +382,9 @@ class Dataset(StrictModel):
     id: Optional[str] = None
     platform: Optional[str] = None
     env: str = "PROD"
-    urn: Optional[str] = None
+    urn: Optional[str] = Field(None, validate_default=True)
     description: Optional[str] = None
-    name: Optional[str] = None
+    name: Optional[str] = Field(None, validate_default=True)
     schema_metadata: Optional[SchemaSpecification] = Field(default=None, alias="schema")
     downstreams: Optional[List[str]] = None
     properties: Optional[Dict[str, str]] = None
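
Why validate_default=True accompanies some of these defaults, sketched in pydantic v2 native style (the derivation body is hypothetical; the real code builds proper DataHub urns): v2 skips field validators when a field takes its default, so a urn derived inside a validator would otherwise stay None whenever it is omitted from the yaml.

    from typing import Optional

    from pydantic import BaseModel, Field, field_validator

    class Dataset(BaseModel):
        id: Optional[str] = None
        urn: Optional[str] = Field(None, validate_default=True)

        @field_validator("urn", mode="before")
        @classmethod
        def _derive_urn(cls, v, info):
            # hypothetical derivation; stands in for the real urn-building logic
            return v or f"urn:li:dataset:{info.data.get('id')}"

    print(Dataset(id="foo").urn)  # validator fires even though urn used its default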

View File

@@ -5,7 +5,7 @@ from pathlib import Path
 from typing import List, Optional, Union

 import yaml
-from pydantic import validator
+from pydantic import Field, validator
 from ruamel.yaml import YAML
 from typing_extensions import Literal
@@ -67,7 +67,7 @@ class Prompt(ConfigModel):
     description: Optional[str] = None
     type: str
     structured_property_id: Optional[str] = None
-    structured_property_urn: Optional[str] = None
+    structured_property_urn: Optional[str] = Field(default=None, validate_default=True)
     required: Optional[bool] = None

     @validator("structured_property_urn", pre=True, always=True)
@@ -111,7 +111,7 @@ class Actors(ConfigModel):

 class Forms(ConfigModel):
     id: Optional[str] = None
-    urn: Optional[str] = None
+    urn: Optional[str] = Field(default=None, validate_default=True)
     name: str
     description: Optional[str] = None
     prompts: List[Prompt] = []
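
An adjacent detail worth noting in passing (a sketch, with Prompt simplified to str; this behavior holds in both pydantic v1 and v2): pydantic copies mutable defaults per instance, so prompts: List[Prompt] = [] is safe and does not share one list across models.

    from typing import List

    from pydantic import BaseModel

    class Forms(BaseModel):
        prompts: List[str] = []  # pydantic deep-copies this default per instance

    a, b = Forms(), Forms()
    a.prompts.append("x")
    print(b.prompts)  # [] -- no shared-mutable-default bug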

View File

@@ -4,7 +4,7 @@ from pathlib import Path
 from typing import Iterable, List, Optional, Union

 import yaml
-from pydantic import StrictStr, validator
+from pydantic import Field, StrictStr, validator
 from ruamel.yaml import YAML

 from datahub.configuration.common import ConfigModel
@@ -68,7 +68,7 @@ class TypeQualifierAllowedTypes(ConfigModel):

 class StructuredProperties(ConfigModel):
     id: Optional[str] = None
-    urn: Optional[str] = None
+    urn: Optional[str] = Field(None, validate_default=True)
     qualified_name: Optional[str] = None
     type: str
     value_entity_types: Optional[List[str]] = None

View File

@@ -44,7 +44,7 @@ def get_minimum_supported_version_message(version: str) -> str:
 class QuickstartExecutionPlan(BaseModel):
     composefile_git_ref: str
     docker_tag: str
-    mysql_tag: Optional[str]
+    mysql_tag: Optional[str] = None


 def _is_it_a_version(version: str) -> bool:

View File

@@ -90,7 +90,7 @@ class EnsureAspectSizeProcessor:
         on GMS side and failure of the entire ingestion. This processor will attempt to trim suspected aspects.
         """
         for wu in stream:
-            logger.debug(f"Ensuring size of workunit: {wu.id}")
+            # logger.debug(f"Ensuring size of workunit: {wu.id}")

             if schema := wu.get_aspect_of_type(SchemaMetadataClass):
                 self.ensure_schema_metadata_size(wu.get_urn(), schema)
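
The rationale for silencing this line, sketched with the loop reduced to the log call: an f-string handed to logger.debug is formatted eagerly for every workunit even when DEBUG is disabled, which is pure overhead on a hot ingestion path. Lazy %s formatting defers that work instead.

    import logging

    logger = logging.getLogger(__name__)

    for wu_id in ("workunit-1", "workunit-2"):
        # the f-string variant builds the message string even when DEBUG is off:
        # logger.debug(f"Ensuring size of workunit: {wu_id}")
        logger.debug("Ensuring size of workunit: %s", wu_id)  # formatted only if emitted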

View File

@@ -151,7 +151,7 @@ class DataLakeSourceConfig(
             raise ValueError("platform must not be empty")
         return platform

-    @pydantic.root_validator()
+    @pydantic.root_validator(skip_on_failure=True)
     def ensure_profiling_pattern_is_passed_to_profiling(
         cls, values: Dict[str, Any]
     ) -> Dict[str, Any]:
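
Why skip_on_failure=True appears on these v1-style validators, as a sketch (assumes pydantic v2, where @pydantic.root_validator survives as a deprecated shim; the config body is reduced to one field): v2 refuses to define a post-validation root validator unless it opts into being skipped when some field already failed, because values would otherwise contain partially validated data.

    from typing import Any, Dict

    import pydantic

    class DataLakeSourceConfig(pydantic.BaseModel):
        profiling_enabled: bool = False

        # Without skip_on_failure=True, pydantic v2 raises PydanticUserError
        # at class-definition time for a pre=False root validator.
        @pydantic.root_validator(skip_on_failure=True)
        def ensure_profiling_pattern_is_passed_to_profiling(
            cls, values: Dict[str, Any]
        ) -> Dict[str, Any]:
            return values

    print(DataLakeSourceConfig())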

View File

@@ -72,7 +72,7 @@ class DataLakeProfilerConfig(ConfigModel):
         description="Whether to profile for the sample values for all columns.",
     )

-    @pydantic.root_validator()
+    @pydantic.root_validator(skip_on_failure=True)
     def ensure_field_level_settings_are_normalized(
         cls: "DataLakeProfilerConfig", values: Dict[str, Any]
     ) -> Dict[str, Any]:

View File

@@ -37,7 +37,7 @@ class LakeFormationTagPlatformResourceId(BaseModel, ExternalEntityId):
     tag_key: str
     tag_value: Optional[str] = None
-    platform_instance: Optional[str]
+    platform_instance: Optional[str] = None
     catalog: Optional[str] = None
     exists_in_lake_formation: bool = False
     persisted: bool = False
@@ -227,7 +227,7 @@ class LakeFormationTagPlatformResource(BaseModel, ExternalEntity):
     datahub_urns: LinkedResourceSet
     managed_by_datahub: bool
     id: LakeFormationTagPlatformResourceId
-    allowed_values: Optional[List[str]]
+    allowed_values: Optional[List[str]] = None

     def get_id(self) -> ExternalEntityId:
         return self.id

View File

@@ -62,7 +62,6 @@ class SortKey(ConfigModel):
     date_format: Optional[str] = Field(
         default=None,
-        type=str,
         description="The date format to use when sorting. This is used to parse the date from the key. The format should follow the java [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html) format.",
     )
@@ -260,7 +260,7 @@ class PathSpec(ConfigModel):
     ) -> Union[None, parse.Result, parse.Match]:
         return self.compiled_folder_include.parse(path)

-    @pydantic.root_validator()
+    @pydantic.root_validator(skip_on_failure=True)
     def validate_no_double_stars(cls, values: Dict) -> Dict:
         if "include" not in values:
             return values
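
On the dropped type=str (a sketch, with SortKey reduced to the one field): type has never been a pydantic.Field parameter. v1 silently stashed unknown kwargs as extra field metadata, and v2 deprecates them, so the annotation alone should carry the type.

    from typing import Optional

    from pydantic import BaseModel, Field

    class SortKey(BaseModel):
        date_format: Optional[str] = Field(
            default=None,
            description="Date format used to parse the date from the key.",
        )

    print(SortKey.model_fields["date_format"].description)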

View File

@@ -69,7 +69,7 @@ class HexSourceConfig(
     )
     include_components: bool = Field(
         default=True,
-        desciption="Include Hex Components in the ingestion",
+        description="Include Hex Components in the ingestion",
     )
     page_size: int = Field(
         default=HEX_API_PAGE_SIZE_DEFAULT,
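
The desciption fix is the same failure mode in miniature (a sketch; inspection uses the pydantic v2 model_fields API): a misspelled kwarg is not an error, it simply never becomes the field's description, so generated docs silently lose the help text.

    from pydantic import BaseModel, Field

    class HexSourceConfig(BaseModel):
        include_components: bool = Field(
            default=True,
            description="Include Hex Components in the ingestion",
        )

    # With the desciption= typo, this printed None instead of the help text.
    print(HexSourceConfig.model_fields["include_components"].description)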

View File

@@ -524,11 +524,11 @@ class IcebergSource(StatefulIngestionSourceBase):
         custom_properties["format-version"] = str(table.metadata.format_version)
         custom_properties["partition-spec"] = str(self._get_partition_aspect(table))
         last_modified: Optional[int] = table.metadata.last_updated_ms
-        if table.current_snapshot():
-            custom_properties["snapshot-id"] = str(table.current_snapshot().snapshot_id)
-            custom_properties["manifest-list"] = table.current_snapshot().manifest_list
+        if current_snapshot := table.current_snapshot():
+            custom_properties["snapshot-id"] = str(current_snapshot.snapshot_id)
+            custom_properties["manifest-list"] = current_snapshot.manifest_list
             if not last_modified:
-                last_modified = int(table.current_snapshot().timestamp_ms)
+                last_modified = int(current_snapshot.timestamp_ms)
         if "created-at" in custom_properties:
             try:
                 dt = dateutil_parser.isoparse(custom_properties["created-at"])
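
The iceberg refactor, sketched with stand-in classes (Snapshot and Table below are illustrative, not the pyiceberg API): binding the snapshot once with the walrus operator means the truthiness check genuinely guards the later attribute accesses, type checkers can narrow the Optional, and the method is not re-evaluated three times.

    from typing import Dict, Optional

    class Snapshot:  # stand-in for an Iceberg snapshot
        snapshot_id = 42
        manifest_list = "s3://bucket/metadata/manifest-list.avro"
        timestamp_ms = 1_700_000_000_000

    class Table:  # stand-in for an Iceberg table
        def current_snapshot(self) -> Optional[Snapshot]:
            return Snapshot()

    table = Table()
    custom_properties: Dict[str, str] = {}
    last_modified: Optional[int] = None
    if current_snapshot := table.current_snapshot():  # evaluated exactly once
        custom_properties["snapshot-id"] = str(current_snapshot.snapshot_id)
        custom_properties["manifest-list"] = current_snapshot.manifest_list
        if not last_modified:
            last_modified = int(current_snapshot.timestamp_ms)
    print(custom_properties, last_modified)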

View File

@@ -27,10 +27,8 @@ class CatalogItem(BaseModel):
     is_favorite: bool = Field(alias="IsFavorite")
     user_info: Any = Field(None, alias="UserInfo")
     display_name: Optional[str] = Field(None, alias="DisplayName")
-    has_data_sources: bool = Field(default=False, alias="HasDataSources")
-    data_sources: Optional[List["DataSource"]] = Field(
-        default_factory=list, alias="DataSources"
-    )
+    has_data_sources: bool = Field(False, alias="HasDataSources")
+    data_sources: Optional[List["DataSource"]] = Field(None, alias="DataSources")

     @validator("display_name", always=True)
     def validate_diplay_name(cls, value, values):
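
The data_sources change swaps an empty-list default for None, sketched here with DataSource simplified to str: for an Optional field, None distinguishes "the API omitted the key" from "the API returned an empty list".

    from typing import List, Optional

    from pydantic import BaseModel, Field

    class CatalogItem(BaseModel):
        data_sources: Optional[List[str]] = Field(None, alias="DataSources")

    print(CatalogItem().data_sources)                # None: key absent in payload
    print(CatalogItem(DataSources=[]).data_sources)  # []:   key present but empty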

View File

@@ -26,7 +26,7 @@ from datahub.utilities.search_utils import LogicalOperator

 class OutboundSharePlatformResource(BaseModel):
     namespace: str
-    platform_instance: Optional[str]
+    platform_instance: Optional[str] = None
     env: str
     source_database: str
     share_name: str

View File

@@ -203,38 +203,31 @@ class SlackSourceConfig(
         description="Bot token for the Slack workspace. Needs `users:read`, `users:read.email`, `users.profile:read`, and `team:read` scopes.",
     )
     enrich_user_metadata: bool = Field(
-        type=bool,
-        default=True,
+        True,
         description="When enabled, will enrich provisioned DataHub users' metadata with information from Slack.",
     )
     ingest_users: bool = Field(
-        type=bool,
-        default=True,
+        True,
         description="Whether to ingest users. When set to true, will ingest all users in the Slack workspace (as platform resources) to simplify user enrichment after they are provisioned on DataHub.",
     )
     api_requests_per_min: int = Field(
-        type=int,
-        default=10,
+        10,
         description="Number of API requests per minute. Low-level config. Do not tweak unless you are facing any issues.",
     )
     ingest_public_channels: bool = Field(
-        type=bool,
-        default=False,
+        False,
         description="Whether to ingest public channels. If set to true needs `channels:read` scope.",
     )
     channels_iteration_limit: int = Field(
-        type=int,
-        default=200,
+        200,
         description="Limit the number of channels to be ingested in a iteration. Low-level config. Do not tweak unless you are facing any issues.",
     )
     channel_min_members: int = Field(
-        type=int,
-        default=2,
+        2,
         description="Ingest channels with at least this many members.",
     )
     should_ingest_archived_channels: bool = Field(
-        type=bool,
-        default=False,
+        False,
         description="Whether to ingest archived channels.",
     )
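
The Slack rewrite leans on Field's first positional parameter being the default, sketched with two of the fields: Field(True, description=...) is equivalent to Field(default=True, ...), and the deleted type=bool / type=int kwargs were never real Field parameters.

    from pydantic import BaseModel, Field

    class SlackSourceConfig(BaseModel):
        enrich_user_metadata: bool = Field(
            True,  # positional default, same as default=True
            description="Enrich provisioned DataHub users with Slack metadata.",
        )
        api_requests_per_min: int = Field(
            10,
            description="Number of API requests per minute.",
        )

    print(SlackSourceConfig())  # enrich_user_metadata=True api_requests_per_min=10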

View File

@@ -72,7 +72,7 @@ class ColumnUpstreamJob(BaseModel):

 class ColumnUpstreamLineage(BaseModel):
-    column_name: Optional[str]
+    column_name: Optional[str] = None
     upstreams: List[ColumnUpstreamJob] = Field(default_factory=list)
@@ -91,9 +91,9 @@ class Query(BaseModel):

 class UpstreamLineageEdge(BaseModel):
     DOWNSTREAM_TABLE_NAME: str
     DOWNSTREAM_TABLE_DOMAIN: str
-    UPSTREAM_TABLES: Optional[List[UpstreamTableNode]]
-    UPSTREAM_COLUMNS: Optional[List[ColumnUpstreamLineage]]
-    QUERIES: Optional[List[Query]]
+    UPSTREAM_TABLES: Optional[List[UpstreamTableNode]] = None
+    UPSTREAM_COLUMNS: Optional[List[ColumnUpstreamLineage]] = None
+    QUERIES: Optional[List[Query]] = None
     _json_upstream_tables = pydantic_parse_json("UPSTREAM_TABLES")
     _json_upstream_columns = pydantic_parse_json("UPSTREAM_COLUMNS")

View File

@@ -36,7 +36,7 @@ class UnityCatalogTagPlatformResourceId(BaseModel, ExternalEntityId):
     tag_key: str
     tag_value: Optional[str] = None
-    platform_instance: Optional[str]
+    platform_instance: Optional[str] = None
     exists_in_unity_catalog: bool = False
     persisted: bool = False
@@ -218,7 +218,7 @@ class UnityCatalogTagPlatformResource(BaseModel, ExternalEntity):
     datahub_urns: LinkedResourceSet
     managed_by_datahub: bool
     id: UnityCatalogTagPlatformResourceId
-    allowed_values: Optional[List[str]]
+    allowed_values: Optional[List[str]] = None

     def get_id(self) -> ExternalEntityId:
         return self.id

View File

@@ -60,7 +60,7 @@ AggregatedDataset = GenericAggregatedDataset[TrinoTableRef]

 class TrinoConnectorInfo(BaseModel):
     partitionIds: List[str]
-    truncated: Optional[bool]
+    truncated: Optional[bool] = None


 class TrinoAccessedMetadata(BaseModel):
@@ -80,7 +80,7 @@ class TrinoJoinedAccessEvent(BaseModel):
     table: Optional[str] = None
     accessed_metadata: List[TrinoAccessedMetadata]
     starttime: datetime = Field(alias="create_time")
-    endtime: Optional[datetime] = Field(alias="end_time")
+    endtime: Optional[datetime] = Field(None, alias="end_time")


 class EnvBasedSourceBaseConfig:
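
The final hunk shows the same required-field rule interacting with aliases, as a sketch (assumes pydantic v2): Field(alias="end_time") with no default leaves endtime required despite the Optional annotation, so None is passed positionally to make it genuinely optional.

    from datetime import datetime
    from typing import Optional

    from pydantic import BaseModel, Field

    class TrinoJoinedAccessEvent(BaseModel):
        starttime: datetime = Field(alias="create_time")
        endtime: Optional[datetime] = Field(None, alias="end_time")

    evt = TrinoJoinedAccessEvent(create_time=datetime(2025, 8, 6))
    print(evt.endtime)  # None: no longer required just because it has an alias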