fix(ingest/s3): Not sorting schema fields to keep original order (#9349)

This commit is contained in:
Tamas Nemeth 2024-01-29 14:14:34 +01:00 committed by GitHub
parent dc16c73937
commit 90c88082b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 29 additions and 17 deletions

View File

@@ -63,6 +63,11 @@ class PathSpec(ConfigModel):
description="Not listing all the files but only taking a handful amount of sample file to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled",
)
allow_double_stars: bool = Field(
default=False,
description="Allow double stars in the include path. This can affect performance significantly if enabled",
)
def allowed(self, path: str) -> bool:
logger.debug(f"Checking file to inclusion: {path}")
if not pathlib.PurePath(path).globmatch(
@@ -126,11 +131,18 @@ class PathSpec(ConfigModel):
def get_named_vars(self, path: str) -> Union[None, parse.Result, parse.Match]:
return self.compiled_include.parse(path)
@pydantic.validator("include")
def validate_no_double_stars(cls, v: str) -> str:
if "**" in v:
@pydantic.root_validator()
def validate_no_double_stars(cls, values: Dict) -> Dict:
if "include" not in values:
return values
if (
values.get("include")
and "**" in values["include"]
and not values.get("allow_double_stars")
):
raise ValueError("path_spec.include cannot contain '**'")
return v
return values
@pydantic.validator("file_types", always=True)
def validate_file_types(cls, v: Optional[List[str]]) -> List[str]:

View File

@@ -93,6 +93,11 @@ class DataLakeSourceConfig(
"path_spec", "path_specs", lambda path_spec: [path_spec]
)
sort_schema_fields: bool = Field(
default=False,
description="Whether to sort schema fields by fieldPath when inferring schemas.",
)
def is_profiling_enabled(self) -> bool:
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config

View File

@@ -458,7 +458,8 @@ class S3Source(StatefulIngestionSourceBase):
)
file.close()
logger.debug(f"Extracted fields in schema: {fields}")
fields = sorted(fields, key=lambda f: f.fieldPath)
if self.source_config.sort_schema_fields:
fields = sorted(fields, key=lambda f: f.fieldPath)
if self.source_config.add_partition_columns_to_schema:
self.add_partition_columns_to_schema(

View File

@@ -48,7 +48,7 @@ class JsonInferrer(SchemaInferenceBase):
schema = construct_schema(datastore, delimiter=".")
fields: List[SchemaField] = []
for schema_field in sorted(schema.values(), key=lambda x: x["delimited_name"]):
for schema_field in schema.values():
mapped_type = _field_type_mapping.get(schema_field["type"], NullTypeClass)
native_type = schema_field["type"]

View File

@@ -18,23 +18,23 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import (
from tests.unit.test_schema_util import assert_field_paths_match
expected_field_paths = [
"boolean_field",
"integer_field",
"boolean_field",
"string_field",
]
expected_field_paths_avro = [
"[version=2.0].[type=test].[type=boolean].boolean_field",
"[version=2.0].[type=test].[type=int].integer_field",
"[version=2.0].[type=test].[type=boolean].boolean_field",
"[version=2.0].[type=test].[type=string].string_field",
]
expected_field_types = [BooleanTypeClass, NumberTypeClass, StringTypeClass]
expected_field_types = [NumberTypeClass, BooleanTypeClass, StringTypeClass]
test_table = pd.DataFrame(
{
"boolean_field": [True, False, True],
"integer_field": [1, 2, 3],
"boolean_field": [True, False, True],
"string_field": ["a", "b", "c"],
}
)
@@ -54,7 +54,6 @@ def test_infer_schema_csv():
file.seek(0)
fields = csv_tsv.CsvInferrer(max_rows=100).infer_schema(file)
fields.sort(key=lambda x: x.fieldPath)
assert_field_paths_match(fields, expected_field_paths)
assert_field_types_match(fields, expected_field_types)
@@ -70,7 +69,6 @@ def test_infer_schema_tsv():
file.seek(0)
fields = csv_tsv.TsvInferrer(max_rows=100).infer_schema(file)
fields.sort(key=lambda x: x.fieldPath)
assert_field_paths_match(fields, expected_field_paths)
assert_field_types_match(fields, expected_field_types)
@@ -82,7 +80,6 @@ def test_infer_schema_json():
file.seek(0)
fields = json.JsonInferrer().infer_schema(file)
fields.sort(key=lambda x: x.fieldPath)
assert_field_paths_match(fields, expected_field_paths)
assert_field_types_match(fields, expected_field_types)
@@ -92,9 +89,7 @@ def test_infer_schema_parquet():
with tempfile.TemporaryFile(mode="w+b") as file:
test_table.to_parquet(file)
file.seek(0)
fields = parquet.ParquetInferrer().infer_schema(file)
fields.sort(key=lambda x: x.fieldPath)
assert_field_paths_match(fields, expected_field_paths)
assert_field_types_match(fields, expected_field_types)
@@ -108,8 +103,8 @@ def test_infer_schema_avro():
"type": "record",
"name": "test",
"fields": [
{"name": "boolean_field", "type": "boolean"},
{"name": "integer_field", "type": "int"},
{"name": "boolean_field", "type": "boolean"},
{"name": "string_field", "type": "string"},
],
}
@@ -124,7 +119,6 @@ def test_infer_schema_avro():
file.seek(0)
fields = AvroInferrer().infer_schema(file)
fields.sort(key=lambda x: x.fieldPath)
assert_field_paths_match(fields, expected_field_paths_avro)
assert_field_types_match(fields, expected_field_types)