From ae491f747f63f82018b83f2eff96bcd06a7139e0 Mon Sep 17 00:00:00 2001
From: "Francisco J. Jurado Moreno" <9376816+Beetelbrox@users.noreply.github.com>
Date: Mon, 25 Jul 2022 07:24:57 +0200
Subject: [PATCH] [TASK-6295] Reduce memory footprint for S3 ingestion (#6308)

* Reduce memory footprint for parquet & CSV formats

* Add pagination, remove local var

* Add jsonl parser
---
 .../ingestion/source/database/datalake.py |  7 ++-
 ingestion/src/metadata/utils/s3_utils.py  | 50 +++++++++----------
 2 files changed, 31 insertions(+), 26 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/source/database/datalake.py b/ingestion/src/metadata/ingestion/source/database/datalake.py
index 88629c6a4d8..1599400ee57 100644
--- a/ingestion/src/metadata/ingestion/source/database/datalake.py
+++ b/ingestion/src/metadata/ingestion/source/database/datalake.py
@@ -161,6 +161,11 @@ class DatalakeSource(DatabaseServiceSource):
             database=EntityReference(id=self.context.database.id, type="database"),
         )
 
+    def _list_s3_objects(self, **kwargs) -> Iterable:
+        paginator = self.client.get_paginator("list_objects_v2")
+        for page in paginator.paginate(**kwargs):
+            yield from page.get("Contents", [])
+
     def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
         """
         Handle table and views.
@@ -190,7 +195,7 @@ class DatalakeSource(DatabaseServiceSource):
                 kwargs = {"Bucket": bucket_name}
                 if prefix:
                     kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/"
-                for key in self.client.list_objects(**kwargs)["Contents"]:
+                for key in self._list_s3_objects(**kwargs):
                     if filter_by_table(
                         self.config.sourceConfig.config.tableFilterPattern, key["Key"]
                     ) or not self.check_valid_file_type(key["Key"]):
diff --git a/ingestion/src/metadata/utils/s3_utils.py b/ingestion/src/metadata/utils/s3_utils.py
index ffc11f5bcdd..30e85274693 100644
--- a/ingestion/src/metadata/utils/s3_utils.py
+++ b/ingestion/src/metadata/utils/s3_utils.py
@@ -9,41 +9,41 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import json
-from io import BytesIO, StringIO
+import os
+from itertools import islice
 from typing import Any
 
 import pandas as pd
 from pandas import DataFrame
+from pyarrow import fs
+from pyarrow.parquet import ParquetFile
 
 
-def read_csv_from_s3(client: Any, key: str, bucket_name: str) -> DataFrame:
-    csv_obj = client.get_object(Bucket=bucket_name, Key=key)
-    body = csv_obj["Body"]
-    csv_string = body.read().decode("utf-8")
-    df = pd.read_csv(StringIO(csv_string))
-    return df
+def read_csv_from_s3(
+    client: Any, key: str, bucket_name: str, sep: str = ",", sample_size: int = 100
+) -> DataFrame:
+    stream = client.get_object(Bucket=bucket_name, Key=key)["Body"]
+    return pd.read_csv(stream, sep=sep, nrows=sample_size + 1)
 
 
-def read_tsv_from_s3(client: Any, key: str, bucket_name: str) -> DataFrame:
-    tsv_obj = client.get_object(Bucket=bucket_name, Key=key)
-    body = tsv_obj["Body"]
-    tsv_string = body.read().decode("utf-8")
-    df = pd.read_csv(StringIO(tsv_string), sep="\t")
-    return df
+def read_tsv_from_s3(
+    client: Any, key: str, bucket_name: str, sample_size: int = 100
+) -> DataFrame:
+    return read_csv_from_s3(client, key, bucket_name, sep="\t", sample_size=sample_size)
 
 
-def read_json_from_s3(client: Any, key: str, bucket_name: str) -> DataFrame:
-    obj = client.get_object(Bucket=bucket_name, Key=key)
-    json_text = obj["Body"].read().decode("utf-8")
-    data = json.loads(json_text)
-    if isinstance(data, list):
-        df = pd.DataFrame.from_dict(data)
-    else:
-        df = pd.DataFrame.from_dict(dict([(k, pd.Series(v)) for k, v in data.items()]))
-    return df
+def read_json_from_s3(
+    client: Any, key: str, bucket_name: str, sample_size: int = 100
+) -> DataFrame:
+    line_stream = client.get_object(Bucket=bucket_name, Key=key)["Body"].iter_lines()
+    return pd.DataFrame.from_records(map(json.loads, islice(line_stream, sample_size)))
 
 
 def read_parquet_from_s3(client: Any, key: str, bucket_name: str) -> DataFrame:
-    obj = client.get_object(Bucket=bucket_name, Key=key)
-    df = pd.read_parquet(BytesIO(obj["Body"].read()))
-    return df
+    s3 = fs.S3FileSystem(region=client.meta.region_name)
+    return (
+        ParquetFile(s3.open_input_file(os.path.join(bucket_name, key)))
+        .schema.to_arrow_schema()
+        .empty_table()
+        .to_pandas()
+    )
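
--
Standalone sketches of the techniques used in this patch follow. Bucket
names, keys, and the region are illustrative placeholders, not part of
the change.

The paginated listing in isolation, assuming a configured boto3 client.
Each list_objects_v2 page carries at most 1,000 keys, so yielding page by
page keeps memory flat and, unlike a single list_objects call, does not
truncate the listing at 1,000 keys:

    import boto3

    client = boto3.client("s3")
    paginator = client.get_paginator("list_objects_v2")

    for page in paginator.paginate(Bucket="my-bucket", Prefix="raw/"):
        # "Contents" is absent from pages with no matching keys
        for obj in page.get("Contents", []):
            print(obj["Key"], obj["Size"])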
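
Why streaming helps the CSV and JSONL readers, under the same assumptions:
botocore's StreamingBody is file-like, so pandas pulls bytes from the
socket as it parses and nrows bounds how much is ever decoded, while
iter_lines plus islice gives the JSONL path the same bounded,
record-at-a-time behavior:

    import json
    from itertools import islice

    import boto3
    import pandas as pd

    client = boto3.client("s3")

    # CSV: parse straight off the stream, stopping after the sampled rows.
    body = client.get_object(Bucket="my-bucket", Key="data/users.csv")["Body"]
    csv_sample = pd.read_csv(body, nrows=101)

    # JSONL: decode one line per record; islice caps the sample size.
    lines = client.get_object(
        Bucket="my-bucket", Key="data/events.jsonl"
    )["Body"].iter_lines()
    jsonl_sample = pd.DataFrame.from_records(map(json.loads, islice(lines, 100)))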
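
The Parquet reader never downloads row data: pyarrow's ParquetFile fetches
only the file footer, and empty_table() materializes a zero-row table with
the full column schema, which is all the metadata extraction needs. A
minimal sketch, with region and path as placeholders:

    from pyarrow import fs
    from pyarrow.parquet import ParquetFile

    s3 = fs.S3FileSystem(region="us-east-1")
    with s3.open_input_file("my-bucket/data/users.parquet") as handle:
        # Reading .schema touches only the footer, not the row groups.
        schema = ParquetFile(handle).schema.to_arrow_schema()

    df = schema.empty_table().to_pandas()
    print(df.dtypes)  # full column types, zero rows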