Feat/s3 ingestion enhancement to update schema from latest partition (#7410)

Co-authored-by: Prashant Singh Thakur <prashant.thakur@nucleusteq.com>
This commit is contained in:
nachiket-juneja 2023-02-28 13:28:28 +05:30 committed by GitHub
parent 3b8b5e8aa4
commit e07cd2090b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 2247 additions and 4 deletions

View File

@ -44,6 +44,12 @@ class DataLakeSourceConfig(PlatformSourceConfigBase, EnvBasedSourceConfigBase):
description="# Whether or not to create tags in datahub from the s3 object",
)
# Whether to update the table schema when the schema of files within the partitions is updated
update_schema_on_partition_file_updates: Optional[bool] = Field(
default=False,
description="Whether to update the table schema when schema in files within the partitions are updated.",
)
profile_patterns: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="regex patterns for tables to profile ",

View File

@ -741,6 +741,17 @@ class S3Source(Source):
if table_data.table_path not in table_dict:
table_dict[table_data.table_path] = table_data
else:
logger.debug(
f"Update schema on partition file updates is set to: {self.source_config.update_schema_on_partition_file_updates!s}"
)
if (
self.source_config.update_schema_on_partition_file_updates
and not path_spec.sample_files
):
logger.info(
"Will update table schema as file within the partitions has an updated schema."
)
table_dict[table_data.table_path] = table_data
table_dict[table_data.table_path].number_of_files = (
table_dict[table_data.table_path].number_of_files + 1
)

View File

@ -12,8 +12,8 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"number_of_files": "12",
"size_in_bytes": "174060"
"number_of_files": "13",
"size_in_bytes": "188600"
},
"name": "folder_aaa.pokemon_abilities_json",
"description": "",

View File

@ -12,8 +12,8 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"number_of_files": "12",
"size_in_bytes": "174060"
"number_of_files": "13",
"size_in_bytes": "188600"
},
"name": "folder_aaa.pokemon_abilities_json",
"description": "",

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,875 @@
[
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json,UAT)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"number_of_files": "2",
"size_in_bytes": "29080",
"table_path": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json"
},
"name": "folder_aaa.pokemon_abilities_json",
"description": "",
"tags": []
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "folder_aaa.pokemon_abilities_json",
"platform": "urn:li:dataPlatform:s3",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": ""
}
},
"fields": [
{
"fieldPath": "effect_changes",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.ArrayType": {}
}
},
"nativeDataType": "list",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_changes.effect_entries",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.ArrayType": {}
}
},
"nativeDataType": "list",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_changes.effect_entries.effect",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_changes.effect_entries.language",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.RecordType": {}
}
},
"nativeDataType": "dict",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_changes.effect_entries.language.is_native",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_changes.effect_entries.language.name",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_changes.effect_entries.language.url",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_changes.version_group",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.RecordType": {}
}
},
"nativeDataType": "dict",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_changes.version_group.name",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_changes.version_group.url",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_entries",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.ArrayType": {}
}
},
"nativeDataType": "list",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_entries.effect",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_entries.language",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.RecordType": {}
}
},
"nativeDataType": "dict",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_entries.language.name",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_entries.language.url",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "effect_entries.short_effect",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "flavor_text_entries",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.ArrayType": {}
}
},
"nativeDataType": "list",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "flavor_text_entries.flavor_text",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "flavor_text_entries.language",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.RecordType": {}
}
},
"nativeDataType": "dict",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "flavor_text_entries.language.name",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "flavor_text_entries.language.url",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "flavor_text_entries.version_group",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.RecordType": {}
}
},
"nativeDataType": "dict",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "flavor_text_entries.version_group.name",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "flavor_text_entries.version_group.url",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "generation",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.RecordType": {}
}
},
"nativeDataType": "dict",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "generation.name",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "generation.url",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "id",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "int",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "is_main_series",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.BooleanType": {}
}
},
"nativeDataType": "bool",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "name",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "names",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.ArrayType": {}
}
},
"nativeDataType": "list",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "names.language",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.RecordType": {}
}
},
"nativeDataType": "dict",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "names.language.name",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "names.language.url",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "names.name",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "pokemon",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.ArrayType": {}
}
},
"nativeDataType": "list",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "pokemon.is_hidden",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.BooleanType": {}
}
},
"nativeDataType": "bool",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "pokemon.pokemon",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.RecordType": {}
}
},
"nativeDataType": "dict",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "pokemon.pokemon.name",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "pokemon.pokemon.url",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "pokemon.slot",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "int",
"recursive": false,
"isPartOfKey": false
}
]
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:050fedde7a12cb8c8447db8d298f5577",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {
"platform": "s3",
"instance": "UAT",
"bucket_name": "my-test-bucket"
},
"name": "my-test-bucket"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:050fedde7a12cb8c8447db8d298f5577",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:050fedde7a12cb8c8447db8d298f5577",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:s3"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:050fedde7a12cb8c8447db8d298f5577",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"S3 bucket"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:86297df39321e4948dbe8b8e941de98b",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {
"platform": "s3",
"instance": "UAT",
"folder_abs_path": "my-test-bucket/folder_a"
},
"name": "folder_a"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:86297df39321e4948dbe8b8e941de98b",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:86297df39321e4948dbe8b8e941de98b",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:s3"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:86297df39321e4948dbe8b8e941de98b",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Folder"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:86297df39321e4948dbe8b8e941de98b",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:050fedde7a12cb8c8447db8d298f5577"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:273fbeff7bd9ecb74982205aadd77994",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {
"platform": "s3",
"instance": "UAT",
"folder_abs_path": "my-test-bucket/folder_a/folder_aa"
},
"name": "folder_aa"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:273fbeff7bd9ecb74982205aadd77994",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:273fbeff7bd9ecb74982205aadd77994",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:s3"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:273fbeff7bd9ecb74982205aadd77994",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Folder"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:273fbeff7bd9ecb74982205aadd77994",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:86297df39321e4948dbe8b8e941de98b"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:ec362903c4c7de60197fcc7b7a79e4c2",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {
"platform": "s3",
"instance": "UAT",
"folder_abs_path": "my-test-bucket/folder_a/folder_aa/folder_aaa"
},
"name": "folder_aaa"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:ec362903c4c7de60197fcc7b7a79e4c2",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:ec362903c4c7de60197fcc7b7a79e4c2",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:s3"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:ec362903c4c7de60197fcc7b7a79e4c2",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Folder"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:ec362903c4c7de60197fcc7b7a79e4c2",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:273fbeff7bd9ecb74982205aadd77994"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/pokemon_abilities_json,UAT)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:ec362903c4c7de60197fcc7b7a79e4c2"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "folder_partition_update_schema.json"
}
}
]

View File

@ -0,0 +1,23 @@
{
"type": "s3",
"config": {
"update_schema_on_partition_file_updates":true,
"env": "UAT",
"path_specs": [{
"include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/{partition[0]}/{partition[1]}/*.*",
"sample_files": false,
"table_name": "{dept}.{table}",
"exclude":[
"**/folder_aaaa/**"
]
}],
"aws_config": {
"aws_region": "us-east-1",
"aws_access_key_id": "testing",
"aws_secret_access_key": "testing"
},
"profiling": {
"enabled": false
}
}
}