diff --git a/ingestion/src/metadata/readers/dataframe/base.py b/ingestion/src/metadata/readers/dataframe/base.py index 9f5562dbb3c..a5e2cf22121 100644 --- a/ingestion/src/metadata/readers/dataframe/base.py +++ b/ingestion/src/metadata/readers/dataframe/base.py @@ -22,6 +22,8 @@ from metadata.readers.file.config_source_factory import get_reader from metadata.readers.models import ConfigSource from metadata.utils.logger import ingestion_logger +MAX_FILE_SIZE_FOR_PREVIEW = 50 * 1024 * 1024 # 50MB + logger = ingestion_logger() diff --git a/ingestion/src/metadata/readers/dataframe/parquet.py b/ingestion/src/metadata/readers/dataframe/parquet.py index 3b6ea30e90a..b5873e67ccc 100644 --- a/ingestion/src/metadata/readers/dataframe/parquet.py +++ b/ingestion/src/metadata/readers/dataframe/parquet.py @@ -14,6 +14,8 @@ Generic Delimiter-Separated-Values implementation """ from functools import singledispatchmethod +from pyarrow.parquet import ParquetFile + from metadata.generated.schema.entity.services.connections.database.datalake.azureConfig import ( AzureConfig, ) @@ -26,11 +28,18 @@ from metadata.generated.schema.entity.services.connections.database.datalake.s3C from metadata.generated.schema.entity.services.connections.database.datalakeConnection import ( LocalConfig, ) -from metadata.readers.dataframe.base import DataFrameReader, FileFormatException +from metadata.readers.dataframe.base import ( + MAX_FILE_SIZE_FOR_PREVIEW, + DataFrameReader, + FileFormatException, +) from metadata.readers.dataframe.common import dataframe_to_chunks from metadata.readers.dataframe.models import DatalakeColumnWrapper from metadata.readers.file.adls import AZURE_PATH, return_azure_storage_options from metadata.readers.models import ConfigSource +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() class ParquetDataFrameReader(DataFrameReader): @@ -39,6 +48,90 @@ class ParquetDataFrameReader(DataFrameReader): from any source based on its init client. """ + def _read_parquet_in_batches( + self, parquet_file: ParquetFile, batch_size: int = 10000 + ): + """ + Read a large parquet file in batches to avoid memory issues. + Includes multiple fallback strategies for older PyArrow versions. 
+ + Args: + parquet_file: PyArrow ParquetFile or similar object + batch_size: Number of rows to read per batch + + Returns: + List of DataFrame chunks + """ + chunks = [] + batch_count = 0 + + try: + # Method 1: iter_batches (PyArrow >= 3.0 - preferred) + if hasattr(parquet_file, "iter_batches"): + logger.info( + "Reading large parquet file in batches to avoid memory issues" + ) + for batch in parquet_file.iter_batches(batch_size=batch_size): + df_batch = batch.to_pandas() + if not df_batch.empty: + chunks.extend(dataframe_to_chunks(df_batch)) + batch_count += 1 + + logger.info( + f"Successfully processed {batch_count} batches from large parquet file" + ) + return chunks + + # Method 2: Row group reading (PyArrow >= 0.15.0) + elif hasattr(parquet_file, "num_row_groups") and hasattr( + parquet_file, "read_row_group" + ): + logger.warning( + "iter_batches not available, using row group reading as fallback" + ) + + for i in range(parquet_file.num_row_groups): + try: + row_group_table = parquet_file.read_row_group(i) + df_chunk = row_group_table.to_pandas() + if not df_chunk.empty: + # Further chunk if row group is still too large + if len(df_chunk) > batch_size: + chunks.extend(dataframe_to_chunks(df_chunk)) + else: + chunks.append(df_chunk) + batch_count += 1 + except Exception as row_exc: + logger.warning(f"Failed to read row group {i}: {row_exc}") + continue + + if chunks: + logger.info( + f"Successfully processed {batch_count} row groups from large parquet file" + ) + return chunks + + # Method 3: Regular reading (final fallback) + logger.warning( + "No chunking methods available, falling back to regular reading" + ) + df = parquet_file.read().to_pandas() + chunks.extend(dataframe_to_chunks(df)) + + except Exception as exc: + # If all chunking fails, try regular reading as final fallback + logger.warning( + f"Batched reading failed: {exc}. 
Falling back to regular reading - this may cause memory issues for large files" + ) + try: + df = parquet_file.read().to_pandas() + chunks.extend(dataframe_to_chunks(df)) + except Exception as fallback_exc: + logger.error(f"Failed to read parquet file: {fallback_exc}") + raise fallback_exc + + return chunks + @singledispatchmethod def _read_parquet_dispatch( self, config_source: ConfigSource, key: str, bucket_name: str @@ -48,24 +141,46 @@ class ParquetDataFrameReader(DataFrameReader): @_read_parquet_dispatch.register def _(self, _: GCSConfig, key: str, bucket_name: str) -> DatalakeColumnWrapper: """ - Read the CSV file from the gcs bucket and return a dataframe + Read the Parquet file from the gcs bucket and return a dataframe """ # pylint: disable=import-outside-toplevel from gcsfs import GCSFileSystem - from pyarrow.parquet import ParquetFile gcs = GCSFileSystem() - file = gcs.open(f"gs://{bucket_name}/{key}") - dataframe_response = ( - ParquetFile(file).read().to_pandas(split_blocks=True, self_destruct=True) - ) - return dataframe_to_chunks(dataframe_response) + file_path = f"gs://{bucket_name}/{key}" + + # Check file size to determine reading strategy + try: + file_info = gcs.info(file_path) + file_size = file_info.get("size", 0) + + file = gcs.open(file_path) + parquet_file = ParquetFile(file) + + if self._should_use_chunking(file_size): + # Use batched reading for large files + return self._read_parquet_in_batches(parquet_file) + else: + # Use regular reading for smaller files + dataframe_response = parquet_file.read().to_pandas( + split_blocks=True, self_destruct=True + ) + return dataframe_to_chunks(dataframe_response) + + except Exception: + # Fallback to regular reading if size check fails + file = gcs.open(file_path) + parquet_file = ParquetFile(file) + dataframe_response = parquet_file.read().to_pandas( + split_blocks=True, self_destruct=True + ) + return dataframe_to_chunks(dataframe_response) @_read_parquet_dispatch.register def _(self, _: S3Config, key: str, bucket_name: str) -> DatalakeColumnWrapper: # pylint: disable=import-outside-toplevel from pyarrow.fs import S3FileSystem - from pyarrow.parquet import ParquetDataset + from pyarrow.parquet import ParquetDataset, ParquetFile client_kwargs = { "endpoint_override": ( @@ -90,13 +205,40 @@ class ParquetDataFrameReader(DataFrameReader): s3_fs = S3FileSystem(**client_kwargs) bucket_uri = f"{bucket_name}/{key}" - dataset = ParquetDataset(bucket_uri, filesystem=s3_fs) - return dataframe_to_chunks(dataset.read_pandas().to_pandas()) + # Check file size to determine reading strategy + try: + file_info = s3_fs.get_file_info(bucket_uri) + file_size = file_info.size if hasattr(file_info, "size") else 0 + + if self._should_use_chunking(file_size): + # Use ParquetFile for batched reading of large files + logger.info( + f"Large parquet file detected ({file_size} bytes > {MAX_FILE_SIZE_FOR_PREVIEW} bytes). " + f"Using batched reading for file: {bucket_uri}" + ) + parquet_file = ParquetFile(bucket_uri, filesystem=s3_fs) + return self._read_parquet_in_batches(parquet_file) + else: + # Use ParquetDataset for regular reading of smaller files + logger.debug( + f"Reading small parquet file ({file_size} bytes): {bucket_uri}" + ) + dataset = ParquetDataset(bucket_uri, filesystem=s3_fs) + return dataframe_to_chunks(dataset.read_pandas().to_pandas()) + + except Exception as exc: + # Fallback to regular reading if size check fails + logger.warning( + f"Could not determine file size for {bucket_uri}: {exc}. 
Using regular reading" + ) + dataset = ParquetDataset(bucket_uri, filesystem=s3_fs) + return dataframe_to_chunks(dataset.read_pandas().to_pandas()) @_read_parquet_dispatch.register def _(self, _: AzureConfig, key: str, bucket_name: str) -> DatalakeColumnWrapper: import pandas as pd # pylint: disable=import-outside-toplevel + import pyarrow.fs as fs storage_options = return_azure_storage_options(self.config_source) account_url = AZURE_PATH.format( @@ -104,8 +246,33 @@ class ParquetDataFrameReader(DataFrameReader): account_name=self.config_source.securityConfig.accountName, key=key, ) - dataframe = pd.read_parquet(account_url, storage_options=storage_options) - return dataframe_to_chunks(dataframe) + + # Check file size to determine reading strategy + try: + # Try to get file size from Azure + azure_fs = fs.SubTreeFileSystem( + account_url, fs.AzureFileSystem(**storage_options) + ) + file_info = azure_fs.get_file_info("/") + file_size = file_info.size if hasattr(file_info, "size") else 0 + + if self._should_use_chunking(file_size): + # Use PyArrow ParquetFile for batched reading of large files + parquet_file = ParquetFile( + account_url, filesystem=fs.AzureFileSystem(**storage_options) + ) + return self._read_parquet_in_batches(parquet_file) + else: + # Use pandas for regular reading of smaller files + dataframe = pd.read_parquet( + account_url, storage_options=storage_options + ) + return dataframe_to_chunks(dataframe) + + except Exception: + # Fallback to regular pandas reading if size check or batching fails + dataframe = pd.read_parquet(account_url, storage_options=storage_options) + return dataframe_to_chunks(dataframe) @_read_parquet_dispatch.register def _( @@ -114,10 +281,27 @@ class ParquetDataFrameReader(DataFrameReader): key: str, bucket_name: str, # pylint: disable=unused-argument ) -> DatalakeColumnWrapper: + import os + import pandas as pd # pylint: disable=import-outside-toplevel - dataframe = pd.read_parquet(key) - return dataframe_to_chunks(dataframe) + # Check file size to determine reading strategy + try: + file_size = os.path.getsize(key) + + if self._should_use_chunking(file_size): + # Use PyArrow ParquetFile for batched reading of large files + parquet_file = ParquetFile(key) + return self._read_parquet_in_batches(parquet_file) + else: + # Use pandas for regular reading of smaller files + dataframe = pd.read_parquet(key) + return dataframe_to_chunks(dataframe) + + except Exception: + # Fallback to regular pandas reading if size check fails + dataframe = pd.read_parquet(key) + return dataframe_to_chunks(dataframe) def _read(self, *, key: str, bucket_name: str, **__) -> DatalakeColumnWrapper: return DatalakeColumnWrapper( @@ -125,3 +309,6 @@ class ParquetDataFrameReader(DataFrameReader): self.config_source, key=key, bucket_name=bucket_name ) ) + + def _should_use_chunking(self, file_size: int) -> bool: + return file_size > MAX_FILE_SIZE_FOR_PREVIEW or file_size == 0 diff --git a/ingestion/tests/unit/readers/test_files/flights-1m.parquet b/ingestion/tests/unit/readers/test_files/flights-1m.parquet new file mode 100644 index 00000000000..950d0a7205a Binary files /dev/null and b/ingestion/tests/unit/readers/test_files/flights-1m.parquet differ diff --git a/ingestion/tests/unit/readers/test_parquet_batches.py b/ingestion/tests/unit/readers/test_parquet_batches.py new file mode 100644 index 00000000000..f68691c5631 --- /dev/null +++ b/ingestion/tests/unit/readers/test_parquet_batches.py @@ -0,0 +1,313 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community 
License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for ParquetDataFrameReader _read_parquet_in_batches method +""" +import unittest +from unittest.mock import Mock, patch + +import pandas as pd + +from metadata.generated.schema.entity.services.connections.database.datalakeConnection import ( + LocalConfig, +) +from metadata.readers.dataframe.common import dataframe_to_chunks +from metadata.readers.dataframe.parquet import ParquetDataFrameReader + + +class TestParquetBatchReading(unittest.TestCase): + """ + Test the _read_parquet_in_batches method functionality + """ + + def setUp(self): + """Set up test fixtures""" + self.config_source = LocalConfig() + self.reader = ParquetDataFrameReader(self.config_source, None) + + def test_successful_batch_reading_with_iter_batches(self): + """Test successful batched reading when iter_batches is available""" + # Create mock parquet file with iter_batches capability + mock_parquet_file = Mock() + mock_parquet_file.iter_batches = Mock() + + # Create sample data for two batches + batch1_data = pd.DataFrame( + {"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"], "age": [25, 30, 35]} + ) + batch2_data = pd.DataFrame( + {"id": [4, 5], "name": ["David", "Eve"], "age": [40, 45]} + ) + + # Create mock arrow table batches + mock_batch1 = Mock() + mock_batch1.to_pandas.return_value = batch1_data + mock_batch2 = Mock() + mock_batch2.to_pandas.return_value = batch2_data + + mock_parquet_file.iter_batches.return_value = iter([mock_batch1, mock_batch2]) + + with patch( + "metadata.readers.dataframe.parquet.dataframe_to_chunks" + ) as mock_chunks: + # Mock dataframe_to_chunks to return list of chunks per dataframe + mock_chunks.side_effect = lambda df: [df] if not df.empty else [] + + # Test the method + result = self.reader._read_parquet_in_batches( + mock_parquet_file, batch_size=1000 + ) + + # Verify iter_batches was called with correct batch_size + mock_parquet_file.iter_batches.assert_called_once_with(batch_size=1000) + + # Verify dataframe_to_chunks was called twice (once per batch) + self.assertEqual(mock_chunks.call_count, 2) + + # Verify result contains chunks from both batches + self.assertEqual(len(result), 2) + pd.testing.assert_frame_equal(result[0], batch1_data) + pd.testing.assert_frame_equal(result[1], batch2_data) + + def test_batch_reading_with_empty_batches(self): + """Test that empty batches are properly skipped""" + mock_parquet_file = Mock() + mock_parquet_file.iter_batches = Mock() + + # Create one non-empty and one empty batch + non_empty_batch = pd.DataFrame({"id": [1], "name": ["Alice"]}) + empty_batch = pd.DataFrame() + + mock_batch1 = Mock() + mock_batch1.to_pandas.return_value = non_empty_batch + mock_batch2 = Mock() + mock_batch2.to_pandas.return_value = empty_batch + + mock_parquet_file.iter_batches.return_value = iter([mock_batch1, mock_batch2]) + + with patch( + "metadata.readers.dataframe.parquet.dataframe_to_chunks" + ) as mock_chunks: + mock_chunks.side_effect = lambda df: [df] if not df.empty else [] + + result = 
self.reader._read_parquet_in_batches(mock_parquet_file) + + # Should only call dataframe_to_chunks once for non-empty batch + mock_chunks.assert_called_once_with(non_empty_batch) + + # Result should only contain the non-empty batch + self.assertEqual(len(result), 1) + pd.testing.assert_frame_equal(result[0], non_empty_batch) + + def test_exception_handling_with_successful_fallback(self): + """Test exception handling when batching fails but fallback succeeds""" + mock_parquet_file = Mock() + mock_parquet_file.iter_batches = Mock() + + # Make iter_batches raise an exception + mock_parquet_file.iter_batches.side_effect = Exception("Batching failed") + + # But make regular read work + sample_data = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]}) + mock_arrow_table = Mock() + mock_arrow_table.to_pandas.return_value = sample_data + mock_parquet_file.read.return_value = mock_arrow_table + + with patch( + "metadata.readers.dataframe.parquet.dataframe_to_chunks" + ) as mock_chunks: + mock_chunks.return_value = [sample_data] + + with patch("metadata.readers.dataframe.parquet.logger") as mock_logger: + result = self.reader._read_parquet_in_batches(mock_parquet_file) + + # Verify warning was logged about fallback + mock_logger.warning.assert_called_with( + "Batched reading failed: Batching failed. Falling back to regular reading - " + "this may cause memory issues for large files" + ) + + # Verify regular read was used as fallback + mock_parquet_file.read.assert_called_once() + + # Verify result + self.assertEqual(result, [sample_data]) + + def test_exception_handling_with_failed_fallback(self): + """Test when both batching and fallback fail""" + mock_parquet_file = Mock() + mock_parquet_file.iter_batches = Mock() + + # Make both iter_batches and read fail + mock_parquet_file.iter_batches.side_effect = Exception("Batching failed") + mock_parquet_file.read.side_effect = Exception("Regular read failed") + + with patch("metadata.readers.dataframe.parquet.logger") as mock_logger: + # Should raise the fallback exception + with self.assertRaises(Exception) as context: + self.reader._read_parquet_in_batches(mock_parquet_file) + + self.assertEqual(str(context.exception), "Regular read failed") + + # Verify error was logged + mock_logger.error.assert_called_with( + "Failed to read parquet file: Regular read failed" + ) + + def test_custom_batch_size(self): + """Test that custom batch size is properly passed to iter_batches""" + mock_parquet_file = Mock() + mock_parquet_file.iter_batches = Mock() + + # Create a simple batch + batch_data = pd.DataFrame({"id": [1], "name": ["Alice"]}) + mock_batch = Mock() + mock_batch.to_pandas.return_value = batch_data + + mock_parquet_file.iter_batches.return_value = iter([mock_batch]) + + with patch( + "metadata.readers.dataframe.parquet.dataframe_to_chunks" + ) as mock_chunks: + mock_chunks.return_value = [batch_data] + + # Test with custom batch size + custom_batch_size = 5000 + self.reader._read_parquet_in_batches( + mock_parquet_file, batch_size=custom_batch_size + ) + + # Verify custom batch size was used + mock_parquet_file.iter_batches.assert_called_once_with( + batch_size=custom_batch_size + ) + + def test_batch_counting_and_logging(self): + """Test that batch counting and logging work correctly""" + mock_parquet_file = Mock() + mock_parquet_file.iter_batches = Mock() + + # Create multiple batches + batches_data = [ + pd.DataFrame({"id": [1], "name": ["Alice"]}), + pd.DataFrame({"id": [2], "name": ["Bob"]}), + pd.DataFrame({"id": [3], "name": ["Charlie"]}), + ] 
+ + mock_batches = [] + for data in batches_data: + mock_batch = Mock() + mock_batch.to_pandas.return_value = data + mock_batches.append(mock_batch) + + mock_parquet_file.iter_batches.return_value = iter(mock_batches) + + with patch( + "metadata.readers.dataframe.parquet.dataframe_to_chunks" + ) as mock_chunks: + mock_chunks.side_effect = lambda df: [df] if not df.empty else [] + + with patch("metadata.readers.dataframe.parquet.logger") as mock_logger: + result = self.reader._read_parquet_in_batches(mock_parquet_file) + + # Verify info logs were called + mock_logger.info.assert_any_call( + "Reading large parquet file in batches to avoid memory issues" + ) + mock_logger.info.assert_any_call( + "Successfully processed 3 batches from large parquet file" + ) + + # Verify all batches were processed + self.assertEqual(len(result), 3) + + def test_dataframe_to_chunks_integration(self): + """Test integration with dataframe_to_chunks function""" + mock_parquet_file = Mock() + mock_parquet_file.iter_batches = Mock() + + # Create a batch that would generate multiple chunks + batch_data = pd.DataFrame( + { + "id": list( + range(1000) + ), # Large enough to potentially create multiple chunks + "name": [f"User{i}" for i in range(1000)], + } + ) + + mock_batch = Mock() + mock_batch.to_pandas.return_value = batch_data + mock_parquet_file.iter_batches.return_value = iter([mock_batch]) + + # Use real dataframe_to_chunks function + result = self.reader._read_parquet_in_batches(mock_parquet_file) + + # Verify that result is a list of dataframes (chunks) + self.assertIsInstance(result, list) + self.assertTrue(len(result) > 0) + + # Verify that all chunks are DataFrames + for chunk in result: + self.assertIsInstance(chunk, pd.DataFrame) + + # Verify that concatenating all chunks gives us back the original data + if len(result) > 1: + concatenated = pd.concat(result, ignore_index=True) + pd.testing.assert_frame_equal(concatenated, batch_data) + else: + pd.testing.assert_frame_equal(result[0], batch_data) + + def test_real_parquet_file_batch_processing(self): + """Test batch processing with real flights-1m.parquet file (7MB)""" + import os + + from pyarrow.parquet import ParquetFile + + # Path to the test parquet file + test_file_path = os.path.join( + os.path.dirname(__file__), "test_files", "flights-1m.parquet" + ) + + # Skip test if file doesn't exist + if not os.path.exists(test_file_path): + self.skipTest(f"Test file not found: {test_file_path}") + + try: + # Create ParquetFile object from real file + parquet_file = ParquetFile(test_file_path) + + # Get some basic info about the file + file_size = os.path.getsize(test_file_path) + total_rows = parquet_file.metadata.num_rows + + print( + f"Testing with real parquet file: {file_size} bytes, {total_rows} rows" + ) + + result = self.reader._read_parquet_in_batches(parquet_file) + fallback_method_result = dataframe_to_chunks( + parquet_file.read().to_pandas() + ) + result_processed_rows = sum(len(chunk) for chunk in result) + fallback_method_processed_rows = sum( + len(chunk) for chunk in fallback_method_result + ) + + self.assertEqual(result_processed_rows, fallback_method_processed_rows) + + except Exception as e: + self.fail(f"Real parquet file test failed: {e}") + + +if __name__ == "__main__": + unittest.main()
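
Reviewer note (not part of the patch): the core of this change is a size-gated read path, where MAX_FILE_SIZE_FOR_PREVIEW (50MB) decides between a single ParquetFile.read() and streaming ParquetFile.iter_batches(). Below is a minimal standalone sketch for trying that behaviour locally against the flights-1m.parquet fixture added by this PR; the function name preview_chunks and the THRESHOLD constant are illustrative only, the real entry point remains ParquetDataFrameReader._read.

# Minimal sketch of the size-gated strategy introduced by this PR.
# `preview_chunks` and `THRESHOLD` are illustrative names, not PR code.
import os
from typing import List

import pandas as pd
from pyarrow.parquet import ParquetFile

THRESHOLD = 50 * 1024 * 1024  # mirrors MAX_FILE_SIZE_FOR_PREVIEW (50MB)


def preview_chunks(path: str, batch_size: int = 10_000) -> List[pd.DataFrame]:
    """Return DataFrame chunks, batching only when the file is large."""
    file_size = os.path.getsize(path)
    parquet_file = ParquetFile(path)

    if file_size > THRESHOLD or file_size == 0:
        # Large or unknown-size file: stream record batches so the whole
        # table is never materialised in memory at once.
        return [
            batch.to_pandas()
            for batch in parquet_file.iter_batches(batch_size=batch_size)
        ]

    # Small file: a single read is simpler and cheap enough.
    return [parquet_file.read().to_pandas()]


if __name__ == "__main__":
    chunks = preview_chunks(
        "ingestion/tests/unit/readers/test_files/flights-1m.parquet"
    )
    print(f"{len(chunks)} chunk(s), {sum(len(c) for c in chunks)} rows")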
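
A second, equally hedged check (again not part of the patch) for the "Method 2" row-group fallback used when iter_batches is unavailable on older PyArrow: reading row group by row group should reproduce the same row count as a full read of the fixture file.

# Illustrative equivalence check for the row-group fallback ("Method 2").
# Assumes the flights-1m.parquet fixture added by this PR is present.
import pandas as pd
from pyarrow.parquet import ParquetFile

pf = ParquetFile("ingestion/tests/unit/readers/test_files/flights-1m.parquet")

per_group = pd.concat(
    [pf.read_row_group(i).to_pandas() for i in range(pf.num_row_groups)],
    ignore_index=True,
)
full = pf.read().to_pandas()

# Row-group reads, concatenated, should match the full table row count.
assert len(per_group) == len(full) == pf.metadata.num_rows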