Mirror of https://github.com/open-metadata/OpenMetadata.git
[TASK-6295] Reduce memory footprint for S3 ingestion (#6308)
* Reduce memory footprint for parquet & CSV formats
* Add pagination, remove local var
* Add jsonl parser
commit ae491f747f
parent 3d2f3cfd15
@@ -161,6 +161,11 @@ class DatalakeSource(DatabaseServiceSource):
             database=EntityReference(id=self.context.database.id, type="database"),
         )
 
+    def _list_s3_objects(self, **kwargs) -> Iterable:
+        paginator = self.client.get_paginator("list_objects_v2")
+        for page in paginator.paginate(**kwargs):
+            yield from page["Contents"]
+
     def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
         """
         Handle table and views.
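The new `_list_s3_objects` helper turns object listing into a generator: a boto3 paginator fetches one page of keys at a time, and each page's `Contents` entries are yielded as they arrive instead of being collected into a single list. A minimal standalone sketch of the same pattern, assuming AWS credentials are configured and using a hypothetical bucket and prefix:

```python
import boto3

s3_client = boto3.client("s3")  # assumes credentials/region come from the environment

def iter_s3_objects(bucket: str, prefix: str = ""):
    """Lazily yield object metadata dicts, one page of results at a time."""
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages with no matching keys have no "Contents" entry, so use .get().
        yield from page.get("Contents", [])

for obj in iter_s3_objects("my-bucket", prefix="raw/"):  # hypothetical names
    print(obj["Key"], obj["Size"])
```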
@@ -190,7 +195,7 @@ class DatalakeSource(DatabaseServiceSource):
         kwargs = {"Bucket": bucket_name}
         if prefix:
             kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/"
-        for key in self.client.list_objects(**kwargs)["Contents"]:
+        for key in self._list_s3_objects(**kwargs):
             if filter_by_table(
                 self.config.sourceConfig.config.tableFilterPattern, key["Key"]
             ) or not self.check_valid_file_type(key["Key"]):
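Swapping `client.list_objects(**kwargs)["Contents"]` for the generator matters for more than memory: a single `list_objects` call returns at most 1,000 keys, and the old code materialised all of them before filtering, while the paginator walks every page and hands keys to `filter_by_table` one at a time. Because the helper is lazy, a caller can also cap how much of a bucket it touches. A small sketch, assuming `source` is an already configured `DatalakeSource` instance (hypothetical variable name):

```python
from itertools import islice

# Hypothetical: inspect only the first 10 keys under a prefix without
# listing the rest of the bucket.
first_ten = list(islice(source._list_s3_objects(Bucket="my-bucket", Prefix="raw/"), 10))
for entry in first_ten:
    print(entry["Key"])
```

The remaining hunk reworks the format-specific S3 reader helpers.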
@@ -9,41 +9,41 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import json
-from io import BytesIO, StringIO
+import os
+from itertools import islice
 from typing import Any
 
 import pandas as pd
 from pandas import DataFrame
+from pyarrow import fs
+from pyarrow.parquet import ParquetFile
-def read_csv_from_s3(client: Any, key: str, bucket_name: str) -> DataFrame:
-    csv_obj = client.get_object(Bucket=bucket_name, Key=key)
-    body = csv_obj["Body"]
-    csv_string = body.read().decode("utf-8")
-    df = pd.read_csv(StringIO(csv_string))
-    return df
+def read_csv_from_s3(
+    client: Any, key: str, bucket_name: str, sep: str = ",", sample_size: int = 100
+) -> DataFrame:
+    stream = client.get_object(Bucket=bucket_name, Key=key)["Body"]
+    return pd.read_csv(stream, sep=sep, nrows=sample_size + 1)
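The CSV reader no longer downloads the whole object, decodes it, and wraps it in a `StringIO`; it hands the botocore streaming body directly to pandas and stops after `sample_size + 1` rows, so only the sampled rows are ever parsed. A standalone sketch of the same idea, with hypothetical bucket and key names:

```python
import boto3
import pandas as pd

s3_client = boto3.client("s3")  # assumes credentials are configured

def sample_csv(bucket: str, key: str, nrows: int = 100, sep: str = ",") -> pd.DataFrame:
    # The StreamingBody is file-like, so pandas reads from it incrementally
    # instead of buffering the entire object as a decoded string.
    body = s3_client.get_object(Bucket=bucket, Key=key)["Body"]
    return pd.read_csv(body, sep=sep, nrows=nrows)

df = sample_csv("my-bucket", "data/orders.csv")  # hypothetical names
```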
-def read_tsv_from_s3(client: Any, key: str, bucket_name: str) -> DataFrame:
-    tsv_obj = client.get_object(Bucket=bucket_name, Key=key)
-    body = tsv_obj["Body"]
-    tsv_string = body.read().decode("utf-8")
-    df = pd.read_csv(StringIO(tsv_string), sep="\t")
-    return df
+def read_tsv_from_gcs(
+    client, key: str, bucket_name: str, sample_size: int = 100
+) -> DataFrame:
+    read_csv_from_s3(client, key, bucket_name, sep="\t", sample_size=sample_size)
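The TSV reader becomes a thin wrapper that delegates to `read_csv_from_s3` with `sep="\t"`. As committed, the wrapper is named `read_tsv_from_gcs` and does not return the frame it builds. A hedged sketch, under a hypothetical helper name, of an equivalent wrapper that propagates the sampled DataFrame to the caller:

```python
from pandas import DataFrame

def read_tsv_sample(client, key: str, bucket_name: str, sample_size: int = 100) -> DataFrame:
    # Hypothetical helper: same delegation as the committed wrapper,
    # but returning the sampled DataFrame.
    return read_csv_from_s3(client, key, bucket_name, sep="\t", sample_size=sample_size)
```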
-def read_json_from_s3(client: Any, key: str, bucket_name: str) -> DataFrame:
-    obj = client.get_object(Bucket=bucket_name, Key=key)
-    json_text = obj["Body"].read().decode("utf-8")
-    data = json.loads(json_text)
-    if isinstance(data, list):
-        df = pd.DataFrame.from_dict(data)
-    else:
-        df = pd.DataFrame.from_dict(dict([(k, pd.Series(v)) for k, v in data.items()]))
-    return df
+def read_json_from_s3(
+    client: Any, key: str, bucket_name: str, sample_size=100
+) -> DataFrame:
+    line_stream = client.get_object(Bucket=bucket_name, Key=key)["Body"].iter_lines()
+    return pd.DataFrame.from_records(map(json.loads, line_stream), nrows=sample_size)
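The JSON reader drops the load-everything-then-branch logic and treats the object as JSON Lines, matching the "Add jsonl parser" note in the commit message: `iter_lines()` yields one raw line at a time, and `DataFrame.from_records` stops after `sample_size` records. A standalone sketch with hypothetical names:

```python
import json
import boto3
import pandas as pd

s3_client = boto3.client("s3")  # assumes credentials are configured

def sample_jsonl(bucket: str, key: str, nrows: int = 100) -> pd.DataFrame:
    # Each line is parsed independently; only `nrows` records are materialised.
    lines = s3_client.get_object(Bucket=bucket, Key=key)["Body"].iter_lines()
    return pd.DataFrame.from_records(map(json.loads, lines), nrows=nrows)

df = sample_jsonl("my-bucket", "events/2022-07-01.jsonl")  # hypothetical names
```

The trade-off is that a file holding a single multi-line JSON document, which the old reader accepted, would no longer parse with this line-by-line approach.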
 def read_parquet_from_s3(client: Any, key: str, bucket_name: str) -> DataFrame:
-    obj = client.get_object(Bucket=bucket_name, Key=key)
-    df = pd.read_parquet(BytesIO(obj["Body"].read()))
-    return df
+    s3 = fs.S3FileSystem(region=client.meta.region_name)
+    return (
+        ParquetFile(s3.open_input_file(os.path.join(bucket_name, key)))
+        .schema.to_arrow_schema()
+        .empty_table()
+        .to_pandas()
+    )
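For Parquet, the reader no longer downloads the object and parses it with `pd.read_parquet`; it opens the file through pyarrow's `S3FileSystem`, reads only the Parquet metadata, and returns an empty DataFrame that carries the file's column schema, presumably all the downstream schema extraction needs. A standalone sketch, assuming the pyarrow S3 filesystem can reach the bucket and using hypothetical names:

```python
import os
import pandas as pd
from pyarrow import fs
from pyarrow.parquet import ParquetFile

def parquet_schema_frame(bucket: str, key: str, region: str = "us-east-1") -> pd.DataFrame:
    # Only the Parquet footer/metadata is fetched; no row groups are downloaded.
    s3 = fs.S3FileSystem(region=region)
    parquet_file = ParquetFile(s3.open_input_file(os.path.join(bucket, key)))
    return parquet_file.schema.to_arrow_schema().empty_table().to_pandas()

columns_only = parquet_schema_frame("my-bucket", "warehouse/orders.parquet")  # hypothetical
print(list(columns_only.columns))  # column names, zero rows
```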