#  Copyright 2021 Collate
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  http://www.apache.org/licenses/LICENSE-2.0
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""
JSON DataFrame reader
"""
import gzip
import io
import json
import zipfile
from typing import Any, Dict, List, Optional, Tuple, Union

from metadata.readers.dataframe.base import DataFrameReader
from metadata.readers.dataframe.common import dataframe_to_chunks
from metadata.readers.dataframe.models import DatalakeColumnWrapper
from metadata.utils.constants import UTF_8
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


def _get_json_text(key: str, text: bytes, decode: bool) -> Union[str, bytes]:
    """
    Decompress ``.gz`` or ``.zip`` payloads and optionally decode bytes to a UTF-8 string.
    """
    processed_text: Union[str, bytes] = text
    if key.endswith(".gz"):
        processed_text = gzip.decompress(text)
    if key.endswith(".zip"):
        with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
            # read the first file in the archive; this branch already yields str
            processed_text = zip_file.read(zip_file.infolist()[0]).decode(UTF_8)
    if decode:
        # decode only if we still have bytes, and return the processed payload
        # rather than the raw input
        return (
            processed_text.decode(UTF_8)
            if isinstance(processed_text, bytes)
            else processed_text
        )
    return processed_text
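
# A minimal usage sketch, not part of the original module: it illustrates the
# expected behaviour of ``_get_json_text`` for a gzipped key. The key name and
# payload below are hypothetical.
#
#   >>> payload = gzip.compress(b'{"name": "alice"}')
#   >>> _get_json_text(key="users.json.gz", text=payload, decode=True)
#   '{"name": "alice"}'
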


class JSONDataFrameReader(DataFrameReader):
    """
    Read JSON DFs
    """

    @staticmethod
    def read_from_json(
        key: str, json_text: bytes, decode: bool = False, **__
    ) -> Tuple[List["DataFrame"], Optional[Dict[str, Any]]]:
"""
|
|
|
|
|
Decompress a JSON file (if needed) and read its contents
|
|
|
|
|
as a dataframe.
|
|
|
|
|
|
|
|
|
|
Note that for the metadata we need to flag nested columns with a
|
|
|
|
|
custom separator. For the profiler this is not needed. We require the
|
|
|
|
|
correct column name to match with the metadata description.
|
|
|
|
|
"""
|
|
|
|
|
# pylint: disable=import-outside-toplevel
|
2023-10-12 14:51:38 +02:00
|
|
|
import pandas as pd
|
2023-08-09 12:37:16 +02:00
|
|
|
|
|
|
|
|
json_text = _get_json_text(key=key, text=json_text, decode=decode)
|
2024-03-26 10:03:21 +05:30
|
|
|
raw_data = None
|
2023-08-09 12:37:16 +02:00
|
|
|
try:
|
|
|
|
|
data = json.loads(json_text)
|
2024-03-26 10:03:21 +05:30
|
|
|
if isinstance(data, dict) and data.get("$schema"):
|
|
|
|
|
raw_data = json_text
|
2023-08-09 12:37:16 +02:00
|
|
|
except json.decoder.JSONDecodeError:
|
|
|
|
|
logger.debug("Failed to read as JSON object. Trying to read as JSON Lines")
|
|
|
|
|
data = [json.loads(json_obj) for json_obj in json_text.strip().split("\n")]
|
|
|
|
|
|
2023-10-12 14:51:38 +02:00
|
|
|
# if we get a scalar value (e.g. {"a":"b"}) then we need to specify the index
|
|
|
|
|
data = data if not isinstance(data, dict) else [data]
|
2024-03-26 10:03:21 +05:30
|
|
|
return dataframe_to_chunks(pd.DataFrame.from_records(data)), raw_data

    def _read(self, *, key: str, bucket_name: str, **kwargs) -> DatalakeColumnWrapper:
        text = self.reader.read(key, bucket_name=bucket_name)
        dataframes, raw_data = self.read_from_json(
            key=key, json_text=text, decode=True, **kwargs
        )
        return DatalakeColumnWrapper(
            dataframes=dataframes,
            raw_data=raw_data,
        )
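
# Illustrative usage sketch, not part of the original module: it shows how
# ``read_from_json`` behaves for a JSON array versus a JSON Lines payload.
# The keys and payloads are made up; ``chunks`` is a list of pandas DataFrames
# and ``raw`` is only populated when the document carries a "$schema" key.
#
#   >>> chunks, raw = JSONDataFrameReader.read_from_json(
#   ...     key="users.json",
#   ...     json_text=b'[{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]',
#   ...     decode=True,
#   ... )
#
#   >>> chunks, raw = JSONDataFrameReader.read_from_json(
#   ...     key="users.jsonl",
#   ...     json_text=b'{"id": 1}\n{"id": 2}',
#   ...     decode=True,
#   ... )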