diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py
index 291b1ed03a6..e67aa0fb3d7 100644
--- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py
@@ -68,7 +68,9 @@ DATALAKE_DATA_TYPES = {
     ),
 }
 
-DATALAKE_SUPPORTED_FILE_TYPES = (".csv", ".tsv", ".json", ".parquet", ".json.gz")
+JSON_SUPPORTED_TYPES = (".json", ".json.gz", ".json.zip")
+
+DATALAKE_SUPPORTED_FILE_TYPES = (".csv", ".tsv", ".parquet") + JSON_SUPPORTED_TYPES
 
 
 def ometa_to_dataframe(config_source, client, table):
@@ -456,7 +458,7 @@ class DatalakeSource(DatabaseServiceSource):  # pylint: disable=too-many-public-
             if key.endswith(".tsv"):
                 return read_tsv_from_gcs(key, bucket_name)
 
-            if key.endswith((".json", ".json.gz")):
+            if key.endswith(JSON_SUPPORTED_TYPES):
                 return read_json_from_gcs(client, key, bucket_name)
 
             if key.endswith(".parquet"):
@@ -484,7 +486,7 @@ class DatalakeSource(DatabaseServiceSource):  # pylint: disable=too-many-public-
             if key.endswith(".csv"):
                 return read_csv_from_azure(client, key, container_name, storage_options)
 
-            if key.endswith((".json", ".json.gz")):
+            if key.endswith(JSON_SUPPORTED_TYPES):
                 return read_json_from_azure(
                     client, key, container_name, storage_options
                 )
@@ -525,7 +527,7 @@ class DatalakeSource(DatabaseServiceSource):  # pylint: disable=too-many-public-
             if key.endswith(".tsv"):
                 return read_tsv_from_s3(client, key, bucket_name)
 
-            if key.endswith((".json", ".json.gz")):
+            if key.endswith(JSON_SUPPORTED_TYPES):
                 return read_json_from_s3(client, key, bucket_name)
 
             if key.endswith(".parquet"):
diff --git a/ingestion/src/metadata/utils/gcs_utils.py b/ingestion/src/metadata/utils/gcs_utils.py
index 518db7949c6..6eb0a6a47ce 100644
--- a/ingestion/src/metadata/utils/gcs_utils.py
+++ b/ingestion/src/metadata/utils/gcs_utils.py
@@ -14,8 +14,10 @@ Utils module to convert different file types from gcs buckets into a dataframe
 """
 
 import gzip
+import io
 import json
 import traceback
+import zipfile
 from typing import Any
 
 import gcsfs
@@ -32,6 +34,9 @@ logger = utils_logger()
 def _get_json_text(key: str, text: str) -> str:
     if key.endswith(".gz"):
         return gzip.decompress(text)
+    if key.endswith(".zip"):
+        with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
+            return zip_file.read(zip_file.infolist()[0]).decode("utf-8")
     return text
 
 
diff --git a/ingestion/src/metadata/utils/s3_utils.py b/ingestion/src/metadata/utils/s3_utils.py
index 7a9f61058aa..12536eef213 100644
--- a/ingestion/src/metadata/utils/s3_utils.py
+++ b/ingestion/src/metadata/utils/s3_utils.py
@@ -14,8 +14,10 @@ Utils module to convert different file types from s3 buckets into a dataframe
 """
 
 import gzip
+import io
 import json
 import traceback
+import zipfile
 from typing import Any
 
 import pandas as pd
@@ -31,6 +33,9 @@ logger = utils_logger()
 def _get_json_text(key: str, text: bytes) -> str:
     if key.endswith(".gz"):
         return gzip.decompress(text)
+    if key.endswith(".zip"):
+        with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
+            return zip_file.read(zip_file.infolist()[0]).decode("utf-8")
     return text.decode("utf-8")
 
 
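
Not part of the patch: a minimal standalone sketch of the behaviour the new ".zip" branch adds. The _get_json_text helper below mirrors the s3_utils.py version after this change; the "users.json.zip" key, the archive member name, and the sample payload are hypothetical, built in memory only to round-trip the logic.

# sketch.py -- illustrates the zip-handling path under the assumptions above
import gzip
import io
import json
import zipfile


def _get_json_text(key: str, text: bytes) -> str:
    # Same shape as the patched helper: unpack gzip or zip archives before
    # handing the JSON payload back as text.
    if key.endswith(".gz"):
        return gzip.decompress(text)
    if key.endswith(".zip"):
        with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
            # Read the first (and expected only) member of the archive.
            return zip_file.read(zip_file.infolist()[0]).decode("utf-8")
    return text.decode("utf-8")


if __name__ == "__main__":
    # Build an in-memory users.json.zip object and run it through the helper.
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, mode="w") as archive:
        archive.writestr("users.json", json.dumps([{"id": 1, "name": "alice"}]))
    print(json.loads(_get_json_text("users.json.zip", buffer.getvalue())))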