Add json zip support for datalake (#10100)

Mayur Singal, 2023-02-03 18:33:55 +05:30, committed by GitHub
parent 4d181ba8dc
commit 01cad0f811
3 changed files with 16 additions and 4 deletions


@@ -68,7 +68,9 @@ DATALAKE_DATA_TYPES = {
     ),
 }

-DATALAKE_SUPPORTED_FILE_TYPES = (".csv", ".tsv", ".json", ".parquet", ".json.gz")
+JSON_SUPPORTED_TYPES = (".json", ".json.gz", ".json.zip")
+
+DATALAKE_SUPPORTED_FILE_TYPES = (".csv", ".tsv", ".parquet") + JSON_SUPPORTED_TYPES


 def ometa_to_dataframe(config_source, client, table):
@@ -456,7 +458,7 @@ class DatalakeSource(DatabaseServiceSource):  # pylint: disable=too-many-public-methods
         if key.endswith(".tsv"):
             return read_tsv_from_gcs(key, bucket_name)

-        if key.endswith((".json", ".json.gz")):
+        if key.endswith(JSON_SUPPORTED_TYPES):
             return read_json_from_gcs(client, key, bucket_name)

         if key.endswith(".parquet"):
@@ -484,7 +486,7 @@ class DatalakeSource(DatabaseServiceSource):  # pylint: disable=too-many-public-methods
         if key.endswith(".csv"):
             return read_csv_from_azure(client, key, container_name, storage_options)

-        if key.endswith((".json", ".json.gz")):
+        if key.endswith(JSON_SUPPORTED_TYPES):
             return read_json_from_azure(
                 client, key, container_name, storage_options
             )
@@ -525,7 +527,7 @@ class DatalakeSource(DatabaseServiceSource):  # pylint: disable=too-many-public-methods
         if key.endswith(".tsv"):
             return read_tsv_from_s3(client, key, bucket_name)

-        if key.endswith((".json", ".json.gz")):
+        if key.endswith(JSON_SUPPORTED_TYPES):
             return read_json_from_s3(client, key, bucket_name)

         if key.endswith(".parquet"):


@@ -14,8 +14,10 @@ Utils module to convert different file types from gcs buckets into a dataframe
 """
 import gzip
+import io
 import json
 import traceback
+import zipfile
 from typing import Any

 import gcsfs
@@ -32,6 +34,9 @@ logger = utils_logger()
 def _get_json_text(key: str, text: str) -> str:
     if key.endswith(".gz"):
         return gzip.decompress(text)
+    if key.endswith(".zip"):
+        with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
+            return zip_file.read(zip_file.infolist()[0]).decode("utf-8")
     return text
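
To see what the new `.zip` branch does end to end, here is a standalone sketch (it re-creates the pattern rather than importing the module, whose path isn't shown in this view): build a one-member zip archive in memory, then read the first member back exactly as `_get_json_text` does.

    import io
    import json
    import zipfile

    # Build a one-member zip in memory, the shape the new branch expects:
    # a single JSON file inside a .json.zip object.
    payload = json.dumps({"id": 1, "name": "alice"}).encode("utf-8")
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("users.json", payload)

    # Same pattern as the diff: wrap the raw bytes in BytesIO, open them
    # as a zip, and read the first member listed by infolist().
    with zipfile.ZipFile(io.BytesIO(buf.getvalue())) as zip_file:
        text = zip_file.read(zip_file.infolist()[0]).decode("utf-8")

    print(json.loads(text))  # {'id': 1, 'name': 'alice'}

Note that the branch only reads `infolist()[0]`, so an archive holding several files would silently expose just its first member.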


@@ -14,8 +14,10 @@ Utils module to convert different file types from s3 buckets into a dataframe
 """
 import gzip
+import io
 import json
 import traceback
+import zipfile
 from typing import Any

 import pandas as pd
@@ -31,6 +33,9 @@ logger = utils_logger()
 def _get_json_text(key: str, text: bytes) -> str:
     if key.endswith(".gz"):
         return gzip.decompress(text)
+    if key.endswith(".zip"):
+        with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
+            return zip_file.read(zip_file.infolist()[0]).decode("utf-8")
     return text.decode("utf-8")
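
One difference between the two copies is the signature: this s3 variant is annotated `text: bytes` and decodes in the plain-text fallback, while the gcs variant above is annotated `text: str` even though `gzip.decompress` and `zipfile.ZipFile(io.BytesIO(...))` both require bytes, so the gcs annotation looks stale rather than behaviourally different. For completeness, a round trip through the existing `.gz` branch (standalone sketch, same caveat as above):

    import gzip
    import json

    payload = json.dumps([{"id": 2}]).encode("utf-8")
    compressed = gzip.compress(payload)

    # The .gz branch returns bytes from gzip.decompress; json.loads
    # accepts bytes directly, so downstream parsing still works.
    text = gzip.decompress(compressed)
    print(json.loads(text))  # [{'id': 2}]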