From 0b6e3741b3edc740b7ca76bc40a1132a29ec6378 Mon Sep 17 00:00:00 2001
From: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
Date: Wed, 19 Oct 2022 14:12:23 +0530
Subject: [PATCH] Fix Datalake Json Error (#8246)

---
 ingestion/src/metadata/utils/s3_utils.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/ingestion/src/metadata/utils/s3_utils.py b/ingestion/src/metadata/utils/s3_utils.py
index c4470aa81c6..50dc53bd44c 100644
--- a/ingestion/src/metadata/utils/s3_utils.py
+++ b/ingestion/src/metadata/utils/s3_utils.py
@@ -41,7 +41,7 @@ def read_tsv_from_s3(
     Read the tsv file from the s3 bucket and return a dataframe
     """
 
-    read_csv_from_s3(client, key, bucket_name, sep="\t", sample_size=sample_size)
+    return read_csv_from_s3(client, key, bucket_name, sep="\t", sample_size=sample_size)
 
 
 def read_json_from_s3(
@@ -50,9 +50,14 @@ def read_json_from_s3(
     """
     Read the json file from the s3 bucket and return a dataframe
     """
-
-    line_stream = client.get_object(Bucket=bucket_name, Key=key)["Body"].iter_lines()
-    return pd.DataFrame.from_records(map(json.loads, line_stream), nrows=sample_size)
+    obj = client.get_object(Bucket=bucket_name, Key=key)
+    json_text = obj["Body"].read().decode("utf-8")
+    data = json.loads(json_text)
+    if isinstance(data, list):
+        return pd.DataFrame.from_dict(data[:sample_size])
+    return pd.DataFrame.from_dict(
+        {key: pd.Series(value) for key, value in data.items()}
+    )
 
 
 def read_parquet_from_s3(client: Any, key: str, bucket_name: str) -> DataFrame: