feat-22574: Datalake ingestion fix for larger files (#22575)

This commit is contained in:
harshsoni2024 2025-07-29 20:40:19 +05:30 committed by harshsoni2024
parent c199002d0a
commit 4bd4d6fe53
4 changed files with 517 additions and 15 deletions

View File

@ -22,6 +22,8 @@ from metadata.readers.file.config_source_factory import get_reader
from metadata.readers.models import ConfigSource from metadata.readers.models import ConfigSource
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
MAX_FILE_SIZE_FOR_PREVIEW = 50 * 1024 * 1024 # 50MB
logger = ingestion_logger() logger = ingestion_logger()

View File

@ -14,6 +14,8 @@ Generic Delimiter-Separated-Values implementation
""" """
from functools import singledispatchmethod from functools import singledispatchmethod
from pyarrow.parquet import ParquetFile
from metadata.generated.schema.entity.services.connections.database.datalake.azureConfig import ( from metadata.generated.schema.entity.services.connections.database.datalake.azureConfig import (
AzureConfig, AzureConfig,
) )
@ -26,11 +28,18 @@ from metadata.generated.schema.entity.services.connections.database.datalake.s3C
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import ( from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
LocalConfig, LocalConfig,
) )
from metadata.readers.dataframe.base import DataFrameReader, FileFormatException from metadata.readers.dataframe.base import (
MAX_FILE_SIZE_FOR_PREVIEW,
DataFrameReader,
FileFormatException,
)
from metadata.readers.dataframe.common import dataframe_to_chunks from metadata.readers.dataframe.common import dataframe_to_chunks
from metadata.readers.dataframe.models import DatalakeColumnWrapper from metadata.readers.dataframe.models import DatalakeColumnWrapper
from metadata.readers.file.adls import AZURE_PATH, return_azure_storage_options from metadata.readers.file.adls import AZURE_PATH, return_azure_storage_options
from metadata.readers.models import ConfigSource from metadata.readers.models import ConfigSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class ParquetDataFrameReader(DataFrameReader): class ParquetDataFrameReader(DataFrameReader):
@ -39,6 +48,90 @@ class ParquetDataFrameReader(DataFrameReader):
from any source based on its init client. from any source based on its init client.
""" """
def _read_parquet_in_batches(
self, parquet_file: ParquetFile, batch_size: int = 10000
):
"""
Read a large parquet file in batches to avoid memory issues.
Includes multiple fallback strategies for older PyArrow versions.
Args:
parquet_file: PyArrow ParquetFile or similar object
batch_size: Number of rows to read per batch
Returns:
List of DataFrame chunks
"""
chunks = []
batch_count = 0
try:
# Method 1: iter_batches (PyArrow >= 3.0 - preferred)
if hasattr(parquet_file, "iter_batches"):
logger.info(
"Reading large parquet file in batches to avoid memory issues"
)
for batch in parquet_file.iter_batches(batch_size=batch_size):
df_batch = batch.to_pandas()
if not df_batch.empty:
chunks.extend(dataframe_to_chunks(df_batch))
batch_count += 1
logger.info(
f"Successfully processed {batch_count} batches from large parquet file"
)
return chunks
# Method 2: Row group reading (PyArrow >= 0.15.0)
elif hasattr(parquet_file, "num_row_groups") and hasattr(
parquet_file, "read_row_group"
):
logger.warning(
"iter_batches not available, using row group reading as fallback"
)
for i in range(parquet_file.num_row_groups):
try:
row_group_table = parquet_file.read_row_group(i)
df_chunk = row_group_table.to_pandas()
if not df_chunk.empty:
# Further chunk if row group is still too large
if len(df_chunk) > batch_size:
chunks.extend(dataframe_to_chunks(df_chunk))
else:
chunks.append(df_chunk)
batch_count += 1
except Exception as row_exc:
logger.warning(f"Failed to read row group {i}: {row_exc}")
continue
if chunks:
logger.info(
f"Successfully processed {batch_count} row groups from large parquet file"
)
return chunks
# Method 3: Regular reading (final fallback)
logger.warning(
"No chunking methods available, falling back to regular reading"
)
df = parquet_file.read().to_pandas()
chunks.extend(dataframe_to_chunks(df))
except Exception as exc:
# If all chunking fails, try regular reading as final fallback
logger.warning(
f"Batched reading failed: {exc}. Falling back to regular reading - this may cause memory issues for large files"
)
try:
df = parquet_file.read().to_pandas()
chunks.extend(dataframe_to_chunks(df))
except Exception as fallback_exc:
logger.error(f"Failed to read parquet file: {fallback_exc}")
raise fallback_exc
return chunks
@singledispatchmethod @singledispatchmethod
def _read_parquet_dispatch( def _read_parquet_dispatch(
self, config_source: ConfigSource, key: str, bucket_name: str self, config_source: ConfigSource, key: str, bucket_name: str
@ -48,24 +141,46 @@ class ParquetDataFrameReader(DataFrameReader):
@_read_parquet_dispatch.register @_read_parquet_dispatch.register
def _(self, _: GCSConfig, key: str, bucket_name: str) -> DatalakeColumnWrapper: def _(self, _: GCSConfig, key: str, bucket_name: str) -> DatalakeColumnWrapper:
""" """
Read the CSV file from the gcs bucket and return a dataframe Read the Parquet file from the gcs bucket and return a dataframe
""" """
# pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel
from gcsfs import GCSFileSystem from gcsfs import GCSFileSystem
from pyarrow.parquet import ParquetFile
gcs = GCSFileSystem() gcs = GCSFileSystem()
file = gcs.open(f"gs://{bucket_name}/{key}") file_path = f"gs://{bucket_name}/{key}"
dataframe_response = (
ParquetFile(file).read().to_pandas(split_blocks=True, self_destruct=True) # Check file size to determine reading strategy
) try:
return dataframe_to_chunks(dataframe_response) file_info = gcs.info(file_path)
file_size = file_info.get("size", 0)
file = gcs.open(file_path)
parquet_file = ParquetFile(file)
if self._should_use_chunking(file_size):
# Use batched reading for large files
return self._read_parquet_in_batches(parquet_file)
else:
# Use regular reading for smaller files
dataframe_response = parquet_file.read().to_pandas(
split_blocks=True, self_destruct=True
)
return dataframe_to_chunks(dataframe_response)
except Exception:
# Fallback to regular reading if size check fails
file = gcs.open(file_path)
parquet_file = ParquetFile(file)
dataframe_response = parquet_file.read().to_pandas(
split_blocks=True, self_destruct=True
)
return dataframe_to_chunks(dataframe_response)
@_read_parquet_dispatch.register @_read_parquet_dispatch.register
def _(self, _: S3Config, key: str, bucket_name: str) -> DatalakeColumnWrapper: def _(self, _: S3Config, key: str, bucket_name: str) -> DatalakeColumnWrapper:
# pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel
from pyarrow.fs import S3FileSystem from pyarrow.fs import S3FileSystem
from pyarrow.parquet import ParquetDataset from pyarrow.parquet import ParquetDataset, ParquetFile
client_kwargs = { client_kwargs = {
"endpoint_override": ( "endpoint_override": (
@ -90,13 +205,40 @@ class ParquetDataFrameReader(DataFrameReader):
s3_fs = S3FileSystem(**client_kwargs) s3_fs = S3FileSystem(**client_kwargs)
bucket_uri = f"{bucket_name}/{key}" bucket_uri = f"{bucket_name}/{key}"
dataset = ParquetDataset(bucket_uri, filesystem=s3_fs)
return dataframe_to_chunks(dataset.read_pandas().to_pandas()) # Check file size to determine reading strategy
try:
file_info = s3_fs.get_file_info(bucket_uri)
file_size = file_info.size if hasattr(file_info, "size") else 0
if self._should_use_chunking(file_size):
# Use ParquetFile for batched reading of large files
logger.info(
f"Large parquet file detected ({file_size} bytes > {MAX_FILE_SIZE_FOR_PREVIEW} bytes). "
f"Using batched reading for file: {bucket_uri}"
)
parquet_file = ParquetFile(bucket_uri, filesystem=s3_fs)
return self._read_parquet_in_batches(parquet_file)
else:
# Use ParquetDataset for regular reading of smaller files
logger.debug(
f"Reading small parquet file ({file_size} bytes): {bucket_uri}"
)
dataset = ParquetDataset(bucket_uri, filesystem=s3_fs)
return dataframe_to_chunks(dataset.read_pandas().to_pandas())
except Exception as exc:
# Fallback to regular reading if size check fails
logger.warning(
f"Could not determine file size for {bucket_uri}: {exc}. Using regular reading"
)
dataset = ParquetDataset(bucket_uri, filesystem=s3_fs)
return dataframe_to_chunks(dataset.read_pandas().to_pandas())
@_read_parquet_dispatch.register @_read_parquet_dispatch.register
def _(self, _: AzureConfig, key: str, bucket_name: str) -> DatalakeColumnWrapper: def _(self, _: AzureConfig, key: str, bucket_name: str) -> DatalakeColumnWrapper:
import pandas as pd # pylint: disable=import-outside-toplevel import pandas as pd # pylint: disable=import-outside-toplevel
import pyarrow.fs as fs
storage_options = return_azure_storage_options(self.config_source) storage_options = return_azure_storage_options(self.config_source)
account_url = AZURE_PATH.format( account_url = AZURE_PATH.format(
@ -104,8 +246,33 @@ class ParquetDataFrameReader(DataFrameReader):
account_name=self.config_source.securityConfig.accountName, account_name=self.config_source.securityConfig.accountName,
key=key, key=key,
) )
dataframe = pd.read_parquet(account_url, storage_options=storage_options)
return dataframe_to_chunks(dataframe) # Check file size to determine reading strategy
try:
# Try to get file size from Azure
azure_fs = fs.SubTreeFileSystem(
account_url, fs.AzureFileSystem(**storage_options)
)
file_info = azure_fs.get_file_info("/")
file_size = file_info.size if hasattr(file_info, "size") else 0
if self._should_use_chunking(file_size):
# Use PyArrow ParquetFile for batched reading of large files
parquet_file = ParquetFile(
account_url, filesystem=fs.AzureFileSystem(**storage_options)
)
return self._read_parquet_in_batches(parquet_file)
else:
# Use pandas for regular reading of smaller files
dataframe = pd.read_parquet(
account_url, storage_options=storage_options
)
return dataframe_to_chunks(dataframe)
except Exception:
# Fallback to regular pandas reading if size check or batching fails
dataframe = pd.read_parquet(account_url, storage_options=storage_options)
return dataframe_to_chunks(dataframe)
@_read_parquet_dispatch.register @_read_parquet_dispatch.register
def _( def _(
@ -114,10 +281,27 @@ class ParquetDataFrameReader(DataFrameReader):
key: str, key: str,
bucket_name: str, # pylint: disable=unused-argument bucket_name: str, # pylint: disable=unused-argument
) -> DatalakeColumnWrapper: ) -> DatalakeColumnWrapper:
import os
import pandas as pd # pylint: disable=import-outside-toplevel import pandas as pd # pylint: disable=import-outside-toplevel
dataframe = pd.read_parquet(key) # Check file size to determine reading strategy
return dataframe_to_chunks(dataframe) try:
file_size = os.path.getsize(key)
if self._should_use_chunking(file_size):
# Use PyArrow ParquetFile for batched reading of large files
parquet_file = ParquetFile(key)
return self._read_parquet_in_batches(parquet_file)
else:
# Use pandas for regular reading of smaller files
dataframe = pd.read_parquet(key)
return dataframe_to_chunks(dataframe)
except Exception:
# Fallback to regular pandas reading if size check fails
dataframe = pd.read_parquet(key)
return dataframe_to_chunks(dataframe)
def _read(self, *, key: str, bucket_name: str, **__) -> DatalakeColumnWrapper: def _read(self, *, key: str, bucket_name: str, **__) -> DatalakeColumnWrapper:
return DatalakeColumnWrapper( return DatalakeColumnWrapper(
@ -125,3 +309,6 @@ class ParquetDataFrameReader(DataFrameReader):
self.config_source, key=key, bucket_name=bucket_name self.config_source, key=key, bucket_name=bucket_name
) )
) )
def _should_use_chunking(self, file_size: int) -> bool:
return file_size > MAX_FILE_SIZE_FOR_PREVIEW or file_size == 0

View File

@ -0,0 +1,313 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tests for ParquetDataFrameReader _read_parquet_in_batches method
"""
import unittest
from unittest.mock import Mock, patch
import pandas as pd
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
LocalConfig,
)
from metadata.readers.dataframe.common import dataframe_to_chunks
from metadata.readers.dataframe.parquet import ParquetDataFrameReader
class TestParquetBatchReading(unittest.TestCase):
"""
Test the _read_parquet_in_batches method functionality
"""
def setUp(self):
"""Set up test fixtures"""
self.config_source = LocalConfig()
self.reader = ParquetDataFrameReader(self.config_source, None)
def test_successful_batch_reading_with_iter_batches(self):
"""Test successful batched reading when iter_batches is available"""
# Create mock parquet file with iter_batches capability
mock_parquet_file = Mock()
mock_parquet_file.iter_batches = Mock()
# Create sample data for two batches
batch1_data = pd.DataFrame(
{"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"], "age": [25, 30, 35]}
)
batch2_data = pd.DataFrame(
{"id": [4, 5], "name": ["David", "Eve"], "age": [40, 45]}
)
# Create mock arrow table batches
mock_batch1 = Mock()
mock_batch1.to_pandas.return_value = batch1_data
mock_batch2 = Mock()
mock_batch2.to_pandas.return_value = batch2_data
mock_parquet_file.iter_batches.return_value = iter([mock_batch1, mock_batch2])
with patch(
"metadata.readers.dataframe.parquet.dataframe_to_chunks"
) as mock_chunks:
# Mock dataframe_to_chunks to return list of chunks per dataframe
mock_chunks.side_effect = lambda df: [df] if not df.empty else []
# Test the method
result = self.reader._read_parquet_in_batches(
mock_parquet_file, batch_size=1000
)
# Verify iter_batches was called with correct batch_size
mock_parquet_file.iter_batches.assert_called_once_with(batch_size=1000)
# Verify dataframe_to_chunks was called twice (once per batch)
self.assertEqual(mock_chunks.call_count, 2)
# Verify result contains chunks from both batches
self.assertEqual(len(result), 2)
pd.testing.assert_frame_equal(result[0], batch1_data)
pd.testing.assert_frame_equal(result[1], batch2_data)
def test_batch_reading_with_empty_batches(self):
"""Test that empty batches are properly skipped"""
mock_parquet_file = Mock()
mock_parquet_file.iter_batches = Mock()
# Create one non-empty and one empty batch
non_empty_batch = pd.DataFrame({"id": [1], "name": ["Alice"]})
empty_batch = pd.DataFrame()
mock_batch1 = Mock()
mock_batch1.to_pandas.return_value = non_empty_batch
mock_batch2 = Mock()
mock_batch2.to_pandas.return_value = empty_batch
mock_parquet_file.iter_batches.return_value = iter([mock_batch1, mock_batch2])
with patch(
"metadata.readers.dataframe.parquet.dataframe_to_chunks"
) as mock_chunks:
mock_chunks.side_effect = lambda df: [df] if not df.empty else []
result = self.reader._read_parquet_in_batches(mock_parquet_file)
# Should only call dataframe_to_chunks once for non-empty batch
mock_chunks.assert_called_once_with(non_empty_batch)
# Result should only contain the non-empty batch
self.assertEqual(len(result), 1)
pd.testing.assert_frame_equal(result[0], non_empty_batch)
def test_exception_handling_with_successful_fallback(self):
"""Test exception handling when batching fails but fallback succeeds"""
mock_parquet_file = Mock()
mock_parquet_file.iter_batches = Mock()
# Make iter_batches raise an exception
mock_parquet_file.iter_batches.side_effect = Exception("Batching failed")
# But make regular read work
sample_data = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
mock_arrow_table = Mock()
mock_arrow_table.to_pandas.return_value = sample_data
mock_parquet_file.read.return_value = mock_arrow_table
with patch(
"metadata.readers.dataframe.parquet.dataframe_to_chunks"
) as mock_chunks:
mock_chunks.return_value = [sample_data]
with patch("metadata.readers.dataframe.parquet.logger") as mock_logger:
result = self.reader._read_parquet_in_batches(mock_parquet_file)
# Verify warning was logged about fallback
mock_logger.warning.assert_called_with(
"Batched reading failed: Batching failed. Falling back to regular reading - "
"this may cause memory issues for large files"
)
# Verify regular read was used as fallback
mock_parquet_file.read.assert_called_once()
# Verify result
self.assertEqual(result, [sample_data])
def test_exception_handling_with_failed_fallback(self):
"""Test when both batching and fallback fail"""
mock_parquet_file = Mock()
mock_parquet_file.iter_batches = Mock()
# Make both iter_batches and read fail
mock_parquet_file.iter_batches.side_effect = Exception("Batching failed")
mock_parquet_file.read.side_effect = Exception("Regular read failed")
with patch("metadata.readers.dataframe.parquet.logger") as mock_logger:
# Should raise the fallback exception
with self.assertRaises(Exception) as context:
self.reader._read_parquet_in_batches(mock_parquet_file)
self.assertEqual(str(context.exception), "Regular read failed")
# Verify error was logged
mock_logger.error.assert_called_with(
"Failed to read parquet file: Regular read failed"
)
def test_custom_batch_size(self):
"""Test that custom batch size is properly passed to iter_batches"""
mock_parquet_file = Mock()
mock_parquet_file.iter_batches = Mock()
# Create a simple batch
batch_data = pd.DataFrame({"id": [1], "name": ["Alice"]})
mock_batch = Mock()
mock_batch.to_pandas.return_value = batch_data
mock_parquet_file.iter_batches.return_value = iter([mock_batch])
with patch(
"metadata.readers.dataframe.parquet.dataframe_to_chunks"
) as mock_chunks:
mock_chunks.return_value = [batch_data]
# Test with custom batch size
custom_batch_size = 5000
self.reader._read_parquet_in_batches(
mock_parquet_file, batch_size=custom_batch_size
)
# Verify custom batch size was used
mock_parquet_file.iter_batches.assert_called_once_with(
batch_size=custom_batch_size
)
def test_batch_counting_and_logging(self):
"""Test that batch counting and logging work correctly"""
mock_parquet_file = Mock()
mock_parquet_file.iter_batches = Mock()
# Create multiple batches
batches_data = [
pd.DataFrame({"id": [1], "name": ["Alice"]}),
pd.DataFrame({"id": [2], "name": ["Bob"]}),
pd.DataFrame({"id": [3], "name": ["Charlie"]}),
]
mock_batches = []
for data in batches_data:
mock_batch = Mock()
mock_batch.to_pandas.return_value = data
mock_batches.append(mock_batch)
mock_parquet_file.iter_batches.return_value = iter(mock_batches)
with patch(
"metadata.readers.dataframe.parquet.dataframe_to_chunks"
) as mock_chunks:
mock_chunks.side_effect = lambda df: [df] if not df.empty else []
with patch("metadata.readers.dataframe.parquet.logger") as mock_logger:
result = self.reader._read_parquet_in_batches(mock_parquet_file)
# Verify info logs were called
mock_logger.info.assert_any_call(
"Reading large parquet file in batches to avoid memory issues"
)
mock_logger.info.assert_any_call(
"Successfully processed 3 batches from large parquet file"
)
# Verify all batches were processed
self.assertEqual(len(result), 3)
def test_dataframe_to_chunks_integration(self):
"""Test integration with dataframe_to_chunks function"""
mock_parquet_file = Mock()
mock_parquet_file.iter_batches = Mock()
# Create a batch that would generate multiple chunks
batch_data = pd.DataFrame(
{
"id": list(
range(1000)
), # Large enough to potentially create multiple chunks
"name": [f"User{i}" for i in range(1000)],
}
)
mock_batch = Mock()
mock_batch.to_pandas.return_value = batch_data
mock_parquet_file.iter_batches.return_value = iter([mock_batch])
# Use real dataframe_to_chunks function
result = self.reader._read_parquet_in_batches(mock_parquet_file)
# Verify that result is a list of dataframes (chunks)
self.assertIsInstance(result, list)
self.assertTrue(len(result) > 0)
# Verify that all chunks are DataFrames
for chunk in result:
self.assertIsInstance(chunk, pd.DataFrame)
# Verify that concatenating all chunks gives us back the original data
if len(result) > 1:
concatenated = pd.concat(result, ignore_index=True)
pd.testing.assert_frame_equal(concatenated, batch_data)
else:
pd.testing.assert_frame_equal(result[0], batch_data)
def test_real_parquet_file_batch_processing(self):
"""Test batch processing with real flights-1m.parquet file (7MB)"""
import os
from pyarrow.parquet import ParquetFile
# Path to the test parquet file
test_file_path = os.path.join(
os.path.dirname(__file__), "test_files", "flights-1m.parquet"
)
# Skip test if file doesn't exist
if not os.path.exists(test_file_path):
self.skipTest(f"Test file not found: {test_file_path}")
try:
# Create ParquetFile object from real file
parquet_file = ParquetFile(test_file_path)
# Get some basic info about the file
file_size = os.path.getsize(test_file_path)
total_rows = parquet_file.metadata.num_rows
print(
f"Testing with real parquet file: {file_size} bytes, {total_rows} rows"
)
result = self.reader._read_parquet_in_batches(parquet_file)
fallback_method_result = dataframe_to_chunks(
parquet_file.read().to_pandas()
)
result_processed_rows = sum(len(chunk) for chunk in result)
fallback_method_processed_rows = sum(
len(chunk) for chunk in fallback_method_result
)
self.assertEqual(result_processed_rows, fallback_method_processed_rows)
except Exception as e:
self.fail(f"Real parquet file test failed: {e}")
if __name__ == "__main__":
unittest.main()