mirror of
https://github.com/deepset-ai/haystack.git
synced 2025-12-01 09:27:28 +00:00
feat: Add component CSVDocumentCleaner for removing empty rows and columns (#8816)
* Initial commit for csv cleaner * Add release notes * Update lineterminator * Update releasenotes/notes/csv-document-cleaner-8eca67e884684c56.yaml Co-authored-by: David S. Batista <dsbatista@gmail.com> * alphabetize * Use lazy import * Some refactoring * Some refactoring --------- Co-authored-by: David S. Batista <dsbatista@gmail.com>
This commit is contained in:
parent
1f257944a6
commit
1785ea622e
@ -1,7 +1,7 @@
|
||||
loaders:
|
||||
- type: haystack_pydoc_tools.loaders.CustomPythonLoader
|
||||
search_path: [../../../haystack/components/preprocessors]
|
||||
modules: ["document_cleaner", "document_splitter", "recursive_splitter", "text_cleaner"]
|
||||
modules: ["csv_document_cleaner", "document_cleaner", "document_splitter", "recursive_splitter", "text_cleaner"]
|
||||
ignore_when_discovered: ["__init__"]
|
||||
processors:
|
||||
- type: filter
|
||||
|
||||
@ -2,9 +2,10 @@
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from .csv_document_cleaner import CSVDocumentCleaner
|
||||
from .document_cleaner import DocumentCleaner
|
||||
from .document_splitter import DocumentSplitter
|
||||
from .recursive_splitter import RecursiveDocumentSplitter
|
||||
from .text_cleaner import TextCleaner
|
||||
|
||||
__all__ = ["DocumentSplitter", "DocumentCleaner", "RecursiveDocumentSplitter", "TextCleaner"]
|
||||
__all__ = ["CSVDocumentCleaner", "DocumentCleaner", "DocumentSplitter", "RecursiveDocumentSplitter", "TextCleaner"]
|
||||
|
||||
116
haystack/components/preprocessors/csv_document_cleaner.py
Normal file
116
haystack/components/preprocessors/csv_document_cleaner.py
Normal file
@ -0,0 +1,116 @@
|
||||
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from io import StringIO
|
||||
from typing import Dict, List
|
||||
|
||||
from haystack import Document, component, logging
|
||||
from haystack.lazy_imports import LazyImport
|
||||
|
||||
with LazyImport("Run 'pip install pandas'") as pandas_import:
|
||||
import pandas as pd
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@component
class CSVDocumentCleaner:
    """
    A component for cleaning CSV documents by removing empty rows and columns.

    This component processes CSV content stored in Documents, allowing
    for the optional ignoring of a specified number of rows and columns before performing
    the cleaning operation.
    """

    def __init__(self, ignore_rows: int = 0, ignore_columns: int = 0) -> None:
        """
        Initializes the CSVDocumentCleaner component.

        :param ignore_rows: Number of rows to ignore from the top of the CSV table before processing.
        :param ignore_columns: Number of columns to ignore from the left of the CSV table before processing.
        :raises ValueError: If `ignore_rows` or `ignore_columns` is negative.

        Rows and columns ignored using these parameters are preserved in the final output, meaning
        they are not considered when removing empty rows and columns.
        """
        # Negative values would silently flip the meaning of the iloc slices used in
        # run() (e.g. df.iloc[-1:, :] keeps the *last* row) — reject them up front.
        if ignore_rows < 0 or ignore_columns < 0:
            raise ValueError("ignore_rows and ignore_columns must be non-negative integers.")
        self.ignore_rows = ignore_rows
        self.ignore_columns = ignore_columns
        pandas_import.check()

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]) -> Dict[str, List[Document]]:
        """
        Cleans CSV documents by removing empty rows and columns while preserving specified ignored rows and columns.

        :param documents: List of Documents containing CSV-formatted content.
        :returns: A dictionary with a single key `"documents"` mapping to the list of cleaned Documents.

        Processing steps:
        1. Reads each document's content as a CSV table.
        2. Retains the specified number of `ignore_rows` from the top and `ignore_columns` from the left.
        3. Drops any rows and columns that are entirely empty (all NaN values).
        4. Reattaches the ignored rows and columns to maintain their original positions.
        5. Returns the cleaned CSV content as a new `Document` object.

        Documents whose content cannot be parsed as CSV, or whose table is smaller than the
        configured number of rows/columns to ignore, are passed through unchanged.
        """
        ignore_rows = self.ignore_rows
        ignore_columns = self.ignore_columns

        cleaned_documents = []
        for document in documents:
            try:
                # header=None: treat every row as data; dtype=object: avoid numeric coercion
                # so the round-trip back to CSV preserves the original cell text.
                df = pd.read_csv(StringIO(document.content), header=None, dtype=object)  # type: ignore
            except Exception as e:
                # Best-effort: keep the unparseable document rather than dropping it.
                logger.error(
                    "Error processing document {id}. Keeping it, but skipping cleaning. Error: {error}",
                    id=document.id,
                    error=e,
                )
                cleaned_documents.append(document)
                continue

            if ignore_rows > df.shape[0] or ignore_columns > df.shape[1]:
                logger.warning(
                    "Document {id} has fewer rows {df_rows} or columns {df_cols} "
                    "than the number of rows {rows} or columns {cols} to ignore. "
                    "Keeping the entire document.",
                    id=document.id,
                    df_rows=df.shape[0],
                    df_cols=df.shape[1],
                    rows=ignore_rows,
                    cols=ignore_columns,
                )
                cleaned_documents.append(document)
                continue

            # Save ignored rows (top of the table) so they survive the cleaning untouched.
            ignored_rows = None
            if ignore_rows > 0:
                ignored_rows = df.iloc[:ignore_rows, :]

            # Save ignored columns (left of the table) for the same reason.
            ignored_columns = None
            if ignore_columns > 0:
                ignored_columns = df.iloc[:, :ignore_columns]

            # Drop rows and columns that are entirely empty in the non-ignored region.
            remaining_df = df.iloc[ignore_rows:, ignore_columns:]
            final_df = remaining_df.dropna(axis=0, how="all").dropna(axis=1, how="all")

            # Reattach ignored rows, restricted to the columns that survived cleaning.
            if ignore_rows > 0 and ignored_rows is not None:
                # Keep only relevant columns
                ignored_rows = ignored_rows.loc[:, final_df.columns]
                final_df = pd.concat([ignored_rows, final_df], axis=0)

            # Reattach ignored columns, restricted to the rows that survived cleaning.
            if ignore_columns > 0 and ignored_columns is not None:
                # Keep only relevant rows
                ignored_columns = ignored_columns.loc[final_df.index, :]
                final_df = pd.concat([ignored_columns, final_df], axis=1)

            cleaned_documents.append(
                Document(
                    content=final_df.to_csv(index=False, header=False, lineterminator="\n"), meta=document.meta.copy()
                )
            )
        return {"documents": cleaned_documents}
|
||||
@ -0,0 +1,6 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Introduced `CSVDocumentCleaner` component for cleaning CSV documents.
|
||||
- Removes empty rows and columns, while preserving specified ignored rows and columns.
|
||||
- Customizable number of rows and columns to ignore during processing.
|
||||
146
test/components/preprocessors/test_csv_document_cleaner.py
Normal file
146
test/components/preprocessors/test_csv_document_cleaner.py
Normal file
@ -0,0 +1,146 @@
|
||||
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from haystack import Document
|
||||
|
||||
from haystack.components.preprocessors.csv_document_cleaner import CSVDocumentCleaner
|
||||
|
||||
|
||||
def test_empty_column() -> None:
    """A fully empty leading column is removed while data cells are kept."""
    doc = Document(content=",A,B,C\n,1,2,3\n,4,5,6\n")
    output = CSVDocumentCleaner().run([doc])
    assert output["documents"][0].content == "A,B,C\n1,2,3\n4,5,6\n"
|
||||
|
||||
|
||||
def test_empty_row() -> None:
    """A fully empty interior row is removed while surrounding rows are kept."""
    doc = Document(content="A,B,C\n1,2,3\n,,\n4,5,6\n")
    output = CSVDocumentCleaner().run([doc])
    assert output["documents"][0].content == "A,B,C\n1,2,3\n4,5,6\n"
|
||||
|
||||
|
||||
def test_empty_column_and_row() -> None:
    """An empty column and an empty row are both removed in one pass."""
    doc = Document(content=",A,B,C\n,1,2,3\n,,,\n,4,5,6\n")
    output = CSVDocumentCleaner().run([doc])
    assert output["documents"][0].content == "A,B,C\n1,2,3\n4,5,6\n"
|
||||
|
||||
|
||||
def test_ignore_rows() -> None:
    """An empty top row survives cleaning when ignore_rows=1; meta is preserved."""
    doc = Document(content=",,\nA,B,C\n4,5,6\n7,8,9\n", meta={"name": "test.csv"})
    cleaner = CSVDocumentCleaner(ignore_rows=1)
    result_doc = cleaner.run([doc])["documents"][0]
    assert result_doc.content == ",,\nA,B,C\n4,5,6\n7,8,9\n"
    assert result_doc.meta == {"name": "test.csv"}
|
||||
|
||||
|
||||
def test_ignore_rows_2() -> None:
    """With ignore_rows=1 the header is kept and a later empty row is still dropped."""
    doc = Document(content="A,B,C\n,,\n4,5,6\n7,8,9\n", meta={"name": "test.csv"})
    cleaner = CSVDocumentCleaner(ignore_rows=1)
    result_doc = cleaner.run([doc])["documents"][0]
    assert result_doc.content == "A,B,C\n4,5,6\n7,8,9\n"
    assert result_doc.meta == {"name": "test.csv"}
|
||||
|
||||
|
||||
def test_ignore_rows_3() -> None:
    """A column empty below the ignored header row is dropped, including its header cell."""
    doc = Document(content="A,B,C\n4,,6\n7,,9\n", meta={"name": "test.csv"})
    cleaner = CSVDocumentCleaner(ignore_rows=1)
    result_doc = cleaner.run([doc])["documents"][0]
    assert result_doc.content == "A,C\n4,6\n7,9\n"
    assert result_doc.meta == {"name": "test.csv"}
|
||||
|
||||
|
||||
def test_ignore_columns() -> None:
    """An empty leftmost column survives cleaning when ignore_columns=1."""
    doc = Document(content=",,A,B\n,2,3,4\n,7,8,9\n")
    output = CSVDocumentCleaner(ignore_columns=1).run([doc])
    assert output["documents"][0].content == ",,A,B\n,2,3,4\n,7,8,9\n"
|
||||
|
||||
|
||||
def test_too_many_ignore_rows() -> None:
    """When ignore_rows exceeds the row count, the document is returned unchanged."""
    doc = Document(content=",,\nA,B,C\n4,5,6\n")
    output = CSVDocumentCleaner(ignore_rows=4).run([doc])
    assert output["documents"][0].content == ",,\nA,B,C\n4,5,6\n"
|
||||
|
||||
|
||||
def test_too_many_ignore_columns() -> None:
    """When ignore_columns exceeds the column count, the document is returned unchanged."""
    doc = Document(content=",,\nA,B,C\n4,5,6\n")
    output = CSVDocumentCleaner(ignore_columns=4).run([doc])
    assert output["documents"][0].content == ",,\nA,B,C\n4,5,6\n"
|
||||
|
||||
|
||||
def test_ignore_rows_and_columns() -> None:
    """Ignored first row/column are preserved; the trailing empty column is dropped."""
    doc = Document(content=",A,B,C\n1,item,s,\n2,item2,fd,\n")
    cleaner = CSVDocumentCleaner(ignore_columns=1, ignore_rows=1)
    output = cleaner.run([doc])
    assert output["documents"][0].content == ",A,B\n1,item,s\n2,item2,fd\n"
|
||||
|
||||
|
||||
def test_zero_ignore_rows_and_columns() -> None:
    """With nothing ignored and no fully empty rows/columns, content is unchanged."""
    doc = Document(content=",A,B,C\n1,item,s,\n2,item2,fd,\n")
    cleaner = CSVDocumentCleaner(ignore_columns=0, ignore_rows=0)
    output = cleaner.run([doc])
    assert output["documents"][0].content == ",A,B,C\n1,item,s,\n2,item2,fd,\n"
|
||||
Loading…
x
Reference in New Issue
Block a user