Add PreProcessor to simplify splitting and cleaning of docs (#473)

* Add PreProcessing

* Adjust PDF conversion tests

* Add tests for Preprocessing

* Add requirement

* Fix tests

* Ignore decoding errors for TextConverter

* Rename split_size to split_length

* Adjust tests

Co-authored-by: Malte Pietsch <malte.pietsch@deepset.ai>
Tanay Soni 2020-10-15 10:42:08 +02:00 committed by GitHub
parent 2e9f3c1512
commit 974b37eded
10 changed files with 317 additions and 220 deletions


@ -1,8 +1,6 @@
from abc import abstractmethod
from functools import partial, reduce
from itertools import chain
from pathlib import Path
from typing import List, Optional, Tuple, Generator, Set, Dict, Any
from typing import List, Optional, Dict, Any
import langdetect
@ -12,26 +10,13 @@ class BaseConverter:
Base class for implementing file converters to transform input documents to text format for ingestion in DocumentStore.
"""
def __init__(
self,
remove_numeric_tables: Optional[bool] = None,
remove_header_footer: Optional[bool] = None,
remove_whitespace: Optional[bool] = None,
remove_empty_lines: Optional[bool] = None,
valid_languages: Optional[List[str]] = None,
):
def __init__(self, remove_numeric_tables: Optional[bool] = None, valid_languages: Optional[List[str]] = None):
"""
:param remove_numeric_tables: This option uses heuristics to remove numeric rows from the tables.
The tabular structures in documents might be noise for the reader model if it
does not have table parsing capability for finding answers. However, tables
may also have long strings that could be possible candidates for answers.
The rows containing strings are thus retained in this option.
:param remove_header_footer: use heuristic to remove footers and headers across different pages by searching
for the longest common string. This heuristic uses exact matches and therefore
works well for footers like "Copyright 2019 by XXX", but won't detect "Page 3 of 4"
or similar.
:param remove_whitespace: strip whitespaces before or after each line in the text.
:param remove_empty_lines: remove more than two empty lines in the text.
:param valid_languages: validate languages from a list of languages specified in the ISO 639-1
(https://en.wikipedia.org/wiki/ISO_639-1) format.
This option can be used to add a test for encoding errors. If the extracted text is
@ -39,9 +24,6 @@ class BaseConverter:
in garbled text.
"""
self.remove_numeric_tables = remove_numeric_tables
self.remove_header_footer = remove_header_footer
self.remove_whitespace = remove_whitespace
self.remove_empty_lines = remove_empty_lines
self.valid_languages = valid_languages
@abstractmethod
@ -74,81 +56,4 @@ class BaseConverter:
else:
return False
def find_and_remove_header_footer(
self, pages: List[str], n_chars: int, n_first_pages_to_ignore: int, n_last_pages_to_ignore: int
) -> Tuple[List[str], Optional[str], Optional[str]]:
"""
Heuristic to find footers and headers across different pages by searching for the longest common string.
For headers we only search in the first n_chars characters (for footer: last n_chars).
Note: This heuristic uses exact matches and therefore works well for footers like "Copyright 2019 by XXX",
but won't detect "Page 3 of 4" or similar.
:param pages: list of strings, one string per page
:param n_chars: number of first/last characters where the header/footer shall be searched in
:param n_first_pages_to_ignore: number of first pages to ignore (e.g. TOCs often don't contain footer/header)
:param n_last_pages_to_ignore: number of last pages to ignore
:return: (cleaned pages, found_header_str, found_footer_str)
"""
# header
start_of_pages = [p[:n_chars] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
found_header = self._find_longest_common_ngram(start_of_pages)
if found_header:
pages = [page.replace(found_header, "") for page in pages]
# footer
end_of_pages = [p[-n_chars:] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
found_footer = self._find_longest_common_ngram(end_of_pages)
if found_footer:
pages = [page.replace(found_footer, "") for page in pages]
return pages, found_header, found_footer
def _ngram(self, seq: str, n: int) -> Generator[str, None, None]:
"""
Return ngram (of tokens - currently split by whitespace)
:param seq: str, string from which the ngram shall be created
:param n: int, n of ngram
:return: str, ngram as string
"""
# In order to maintain the original whitespace, but still consider \n and \t for n-gram tokenization,
# we add a space here and remove it after creation of the ngrams again (see below)
seq = seq.replace("\n", " \n")
seq = seq.replace("\t", " \t")
words = seq.split(" ")
ngrams = (
" ".join(words[i: i + n]).replace(" \n", "\n").replace(" \t", "\t") for i in range(0, len(words) - n + 1)
)
return ngrams
def _allngram(self, seq: str, min_ngram: int, max_ngram: int) -> Set[str]:
lengths = range(min_ngram, max_ngram) if max_ngram else range(min_ngram, len(seq))
ngrams = map(partial(self._ngram, seq), lengths)
res = set(chain.from_iterable(ngrams))
return res
def _find_longest_common_ngram(self, sequences: List[str], max_ngram: int = 30, min_ngram: int = 3) -> Optional[str]:
"""
Find the longest common ngram across different text sequences (e.g. start of pages).
All ngrams within the specified range are considered. Helpful for finding footers, headers etc.
:param sequences: list[str], list of strings that shall be searched for common n_grams
:param max_ngram: int, maximum length of ngram to consider
:param min_ngram: minimum length of ngram to consider
:return: str, common string of all sections
"""
sequences = [s for s in sequences if s] # filter empty sequences
if not sequences:
return None
seqs_ngrams = map(partial(self._allngram, min_ngram=min_ngram, max_ngram=max_ngram), sequences)
intersection = reduce(set.intersection, seqs_ngrams)
try:
longest = max(intersection, key=len)
except ValueError:
# no common sequence found
longest = ""
return longest if longest.strip() else None
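For illustration, a simplified, self-contained sketch of the header-detection idea above (plain whitespace tokenization only, without the \n/\t handling of the real _ngram helper; the sample pages and helper names are made up):

# Simplified sketch of the longest-common-ngram heuristic (illustrative only).
pages = [
    "Copyright 2019 by ACME\nIntroduction and overview ...",
    "Copyright 2019 by ACME\nCompletely different body text ...",
]
start_of_pages = [p[:300] for p in pages]

def ngrams(text, n):
    words = text.split()
    return {" ".join(words[i:i + n]) for i in range(len(words) - n + 1)}

def all_ngrams(text, min_n=3, max_n=30):
    return set().union(*(ngrams(text, n) for n in range(min_n, max_n)))

common = set.intersection(*(all_ngrams(s) for s in start_of_pages))
header = max(common, key=len) if common else None
print(header)  # -> "Copyright 2019 by ACME"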


@ -1,5 +1,4 @@
import logging
import re
import subprocess
from pathlib import Path
from typing import List, Optional, Dict, Any
@ -10,26 +9,13 @@ logger = logging.getLogger(__name__)
class PDFToTextConverter(BaseConverter):
def __init__(
self,
remove_numeric_tables: Optional[bool] = False,
remove_whitespace: Optional[bool] = None,
remove_empty_lines: Optional[bool] = None,
remove_header_footer: Optional[bool] = None,
valid_languages: Optional[List[str]] = None,
):
def __init__(self, remove_numeric_tables: Optional[bool] = False, valid_languages: Optional[List[str]] = None):
"""
:param remove_numeric_tables: This option uses heuristics to remove numeric rows from the tables.
The tabular structures in documents might be noise for the reader model if it
does not have table parsing capability for finding answers. However, tables
may also have long strings that could be possible candidates for answers.
The rows containing strings are thus retained in this option.
:param remove_whitespace: strip whitespaces before or after each line in the text.
:param remove_empty_lines: remove more than two empty lines in the text.
:param remove_header_footer: use heuristic to remove footers and headers across different pages by searching
for the longest common string. This heuristic uses exact matches and therefore
works well for footers like "Copyright 2019 by XXX", but won't detect "Page 3 of 4"
or similar.
:param valid_languages: validate languages from a list of languages specified in the ISO 639-1
(https://en.wikipedia.org/wiki/ISO_639-1) format.
This option can be used to add a test for encoding errors. If the extracted text is
@ -52,13 +38,7 @@ class PDFToTextConverter(BaseConverter):
"""
)
super().__init__(
remove_numeric_tables=remove_numeric_tables,
remove_whitespace=remove_whitespace,
remove_empty_lines=remove_empty_lines,
remove_header_footer=remove_header_footer,
valid_languages=valid_languages,
)
super().__init__(remove_numeric_tables=remove_numeric_tables, valid_languages=valid_languages)
def convert(self, file_path: Path, meta: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
@ -87,17 +67,9 @@ class PDFToTextConverter(BaseConverter):
if words and len(digits) / len(words) > 0.4 and not line.strip().endswith("."):
logger.debug(f"Removing line '{line}' from {file_path}")
continue
if self.remove_whitespace:
line = line.strip()
cleaned_lines.append(line)
page = "\n".join(cleaned_lines)
if self.remove_empty_lines:
page = re.sub(r"\n\n+", "\n\n", page)
cleaned_pages.append(page)
if self.valid_languages:
@ -108,12 +80,6 @@ class PDFToTextConverter(BaseConverter):
f"been decoded in the correct text format."
)
if self.remove_header_footer:
cleaned_pages, header, footer = self.find_and_remove_header_footer(
cleaned_pages, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
)
logger.info(f"Removed header '{header}' and footer '{footer}' in {file_path}")
text = "\f".join(cleaned_pages)
document = {"text": text, "meta": meta}
return document
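The numeric-table check in the loop above can be read as a small predicate. A standalone sketch under stated assumptions: the helper name and sample lines are made up, and the definition of `digits` (words containing a digit) is assumed, since it sits outside this hunk; the 0.4 threshold and the trailing-period condition come from the code above.

def looks_like_numeric_table_row(line: str) -> bool:
    # Assumed reading of the heuristic: drop a line if more than 40% of its
    # words contain digits and the line does not end with a full stop.
    words = line.split()
    digits = [w for w in words if any(ch.isdigit() for ch in w)]
    return bool(words) and len(digits) / len(words) > 0.4 and not line.strip().endswith(".")

print(looks_like_numeric_table_row("2019 2020 2021 revenue 1.2 3.4"))  # True
print(looks_like_numeric_table_row("Revenue grew by 4% in 2020."))     # False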


@ -1,8 +1,7 @@
import logging
import re
from html.parser import HTMLParser
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any
from typing import List, Optional, Dict, Any
import requests
from tika import parser as tikaparser
@ -44,10 +43,7 @@ class TikaConverter(BaseConverter):
self,
tika_url: str = "http://localhost:9998/tika",
remove_numeric_tables: Optional[bool] = False,
remove_whitespace: Optional[bool] = None,
remove_empty_lines: Optional[bool] = None,
remove_header_footer: Optional[bool] = None,
valid_languages: Optional[List[str]] = None,
valid_languages: Optional[List[str]] = None
):
"""
:param tika_url: URL of the Tika server
@ -56,12 +52,6 @@ class TikaConverter(BaseConverter):
does not have table parsing capability for finding answers. However, tables
may also have long strings that could be possible candidates for answers.
The rows containing strings are thus retained in this option.
:param remove_whitespace: strip whitespaces before or after each line in the text.
:param remove_empty_lines: remove more than two empty lines in the text.
:param remove_header_footer: use heuristic to remove footers and headers across different pages by searching
for the longest common string. This heuristic uses exact matches and therefore
works well for footers like "Copyright 2019 by XXX", but won't detect "Page 3 of 4"
or similar.
:param valid_languages: validate languages from a list of languages specified in the ISO 639-1
(https://en.wikipedia.org/wiki/ISO_639-1) format.
This option can be used to add a test for encoding errors. If the extracted text is
@ -73,13 +63,7 @@ class TikaConverter(BaseConverter):
raise Exception(f"Apache Tika server is not reachable at the URL '{tika_url}'. To run it locally "
f"with Docker, execute: 'docker run -p 9998:9998 apache/tika:1.24.1'")
self.tika_url = tika_url
super().__init__(
remove_numeric_tables=remove_numeric_tables,
remove_whitespace=remove_whitespace,
remove_empty_lines=remove_empty_lines,
remove_header_footer=remove_header_footer,
valid_languages=valid_languages,
)
super().__init__(remove_numeric_tables=remove_numeric_tables, valid_languages=valid_languages)
def convert(self, file_path: Path, meta: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""
@ -106,16 +90,9 @@ class TikaConverter(BaseConverter):
logger.debug(f"Removing line '{line}' from {file_path}")
continue
if self.remove_whitespace:
line = line.strip()
cleaned_lines.append(line)
page = "\n".join(cleaned_lines)
# always clean up empty lines:
page = re.sub(r"\n\n+", "\n\n", page)
cleaned_pages.append(page)
if self.valid_languages:
@ -126,12 +103,6 @@ class TikaConverter(BaseConverter):
f"been decoded in the correct text format."
)
if self.remove_header_footer:
cleaned_pages, header, footer = self.find_and_remove_header_footer(
cleaned_pages, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
)
logger.info(f"Removed header '{header}' and footer '{footer}' in {file_path}")
text = "\f".join(cleaned_pages)
document = {"text": text, "meta": {**parsed["metadata"], **(meta or {})}}
return document
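A hedged usage sketch of the slimmed-down converter API: the module path haystack.file_converter.tika is assumed by analogy with the pdf import used in the tests, and a running Tika server is assumed, as suggested by the error message above.

# Start Tika first, e.g.: docker run -p 9998:9998 apache/tika:1.24.1
from pathlib import Path
from haystack.file_converter.tika import TikaConverter  # module path assumed

converter = TikaConverter(tika_url="http://localhost:9998/tika", valid_languages=["en"])
document = converter.convert(file_path=Path("samples/pdf/sample_pdf_1.pdf"))
print(document["text"][:200])
print(document["meta"])  # Tika metadata merged with any meta passed to convert()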


@ -1,5 +1,4 @@
import logging
import re
from pathlib import Path
from typing import List, Optional, Any, Dict
@ -9,39 +8,22 @@ logger = logging.getLogger(__name__)
class TextConverter(BaseConverter):
def __init__(
self,
remove_numeric_tables: Optional[bool] = False,
remove_whitespace: Optional[bool] = None,
remove_empty_lines: Optional[bool] = None,
remove_header_footer: Optional[bool] = None,
valid_languages: Optional[List[str]] = None,
):
def __init__(self, remove_numeric_tables: Optional[bool] = False, valid_languages: Optional[List[str]] = None):
"""
:param remove_numeric_tables: This option uses heuristics to remove numeric rows from the tables.
The tabular structures in documents might be noise for the reader model if it
does not have table parsing capability for finding answers. However, tables
may also have long strings that could be possible candidates for answers.
The rows containing strings are thus retained in this option.
:param remove_whitespace: strip whitespaces before or after each line in the text.
:param remove_empty_lines: remove more than two empty lines in the text.
:param remove_header_footer: use heuristic to remove footers and headers across different pages by searching
for the longest common string. This heuristic uses exact matches and therefore
works well for footers like "Copyright 2019 by XXX", but won't detect "Page 3 of 4"
or similar.
:param valid_languages: validate languages from a list of languages specified in the ISO 639-1
(https://en.wikipedia.org/wiki/ISO_639-1) format.
This option can be used to add a test for encoding errors. If the extracted text is
not one of the valid languages, then it is likely an encoding error resulting
in garbled text.
"""
super().__init__(
remove_numeric_tables=remove_numeric_tables,
remove_whitespace=remove_whitespace,
remove_empty_lines=remove_empty_lines,
remove_header_footer=remove_header_footer,
valid_languages=valid_languages,
)
super().__init__(remove_numeric_tables=remove_numeric_tables,
valid_languages=valid_languages)
def convert(self,
file_path: Path,
@ -57,7 +39,7 @@ class TextConverter(BaseConverter):
:return: Dict of format {"text": "The text from file", "meta": meta}
"""
with open(file_path, encoding=encoding) as f:
with open(file_path, encoding=encoding, errors="ignore") as f:
text = f.read()
pages = text.split("\f")
@ -75,16 +57,9 @@ class TextConverter(BaseConverter):
logger.debug(f"Removing line '{line}' from {file_path}")
continue
if self.remove_whitespace:
line = line.strip()
cleaned_lines.append(line)
page = "\n".join(cleaned_lines)
if self.remove_empty_lines:
page = re.sub(r"\n\n+", "\n\n", page)
cleaned_pages.append(page)
if self.valid_languages:
@ -95,12 +70,6 @@ class TextConverter(BaseConverter):
f"been decoded in the correct text format."
)
if self.remove_header_footer:
cleaned_pages, header, footer = self.find_and_remove_header_footer(
cleaned_pages, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
)
logger.info(f"Removed header '{header}' and footer '{footer}' in {file_path}")
text = "".join(pages)
document = {"text": text, "meta": meta}
return document
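The errors="ignore" change above makes the text converter skip undecodable bytes instead of raising. A tiny illustration of that standard Python behaviour (the sample bytes are made up):

# A Latin-1 encoded "café" read with a UTF-8 codec:
raw = b"caf\xe9 latte"
print(raw.decode("utf-8", errors="ignore"))  # -> "caf latte" (the bad byte is dropped)
# Without errors="ignore", the same decode raises UnicodeDecodeError.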


@ -0,0 +1,14 @@
from typing import List, Dict, Any
class BasePreProcessor:
def process(self, document: dict) -> List[dict]:
cleaned_document = self.clean(document)
split_documents = self.split(cleaned_document)
return split_documents
def clean(self, document: Dict[str, Any]) -> Dict[str, Any]:
raise NotImplementedError
def split(self, document: Dict[str, Any]) -> List[Dict[str, Any]]:
raise NotImplementedError
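BasePreProcessor only fixes the contract: process runs clean and then split. A minimal, hypothetical subclass satisfying that contract could look like the sketch below; the class name and behaviour are illustrative, not part of this commit.

from typing import Any, Dict, List

from haystack.preprocessor.base import BasePreProcessor


class WhitespacePreProcessor(BasePreProcessor):
    # Illustrative example: trim surrounding whitespace, then split on blank lines.
    def clean(self, document: Dict[str, Any]) -> Dict[str, Any]:
        document["text"] = document["text"].strip()
        return document

    def split(self, document: Dict[str, Any]) -> List[Dict[str, Any]]:
        return [{**document, "text": part} for part in document["text"].split("\n\n")]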


@ -0,0 +1,222 @@
import logging
import re
from copy import deepcopy
from functools import partial, reduce
from itertools import chain
from typing import List, Optional, Generator, Set
import nltk
from more_itertools import windowed
from haystack.preprocessor.base import BasePreProcessor
logger = logging.getLogger(__name__)
class PreProcessor(BasePreProcessor):
def __init__(
self,
clean_whitespace: Optional[bool] = True,
clean_header_footer: Optional[bool] = False,
clean_empty_lines: Optional[bool] = True,
split_by: Optional[str] = "passage",
split_length: Optional[int] = 10,
split_stride: Optional[int] = None,
split_respect_sentence_boundary: Optional[bool] = False,
):
"""
:param clean_header_footer: use heuristic to remove footers and headers across different pages by searching
for the longest common string. This heuristic uses exact matches and therefore
works well for footers like "Copyright 2019 by XXX", but won't detect "Page 3 of 4"
or similar.
:param clean_whitespace: strip leading and trailing whitespace from each line in the text.
:param clean_empty_lines: collapse runs of blank lines, keeping at most one empty line between blocks of text.
:param split_by: split the document by "word", "sentence", or "passage". Set to None to disable splitting.
:param split_length: number of split units to merge into a single output document. For instance, if split_length -> 10 & split_by ->
"sentence", then each output document will have 10 sentences.
:param split_stride: length of striding window over the splits. For example, if split_by -> `word`,
split_length -> 5 & split_stride -> 2, then the splits would be like:
[w1 w2 w3 w4 w5, w4 w5 w6 w7 w8, w7 w8 w9 w10 w11].
Set the value to None to disable striding behaviour.
:param split_respect_sentence_boundary: whether to avoid splitting inside sentences when split_by -> `word`. If set
to True, each individual split will always contain complete sentences &
the number of words will be less than or equal to the split_length.
"""
nltk.download("punkt")
self.clean_whitespace = clean_whitespace
self.clean_header_footer = clean_header_footer
self.clean_empty_lines = clean_empty_lines
self.split_by = split_by
self.split_length = split_length
self.split_stride = split_stride
self.split_respect_sentence_boundary = split_respect_sentence_boundary
def clean(self, document: dict) -> dict:
text = document["text"]
if self.clean_header_footer:
text = self._find_and_remove_header_footer(
text, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
)
if self.clean_whitespace:
lines = text.splitlines()
cleaned_lines = []
for line in lines:
line = line.strip()
cleaned_lines.append(line)
text = "\n".join(cleaned_lines)
if self.clean_empty_lines:
text = re.sub(r"\n\n+", "\n\n", text)
document["text"] = text
return document
def split(self, document: dict) -> List[dict]:
if not self.split_by:
return [document]
if not self.split_length:
raise Exception("split_length needs to be set when using split_by.")
text = document["text"]
if self.split_respect_sentence_boundary:  # split by words, ensuring no sub-sentence splits
if self.split_by == "word":
sentences = nltk.tokenize.sent_tokenize(text)
word_count = 0
text_splits = []
current_slice = ""
for sen in sentences:
current_word_count = len(sen.split(" "))
if current_word_count > self.split_length:
logger.warning("A sentence was found with a word count higher than the split length.")
if word_count + current_word_count > self.split_length:
text_splits.append(current_slice)
current_slice = ""
word_count = 0
current_slice += sen
word_count += len(sen.split(" "))
if current_slice:
text_splits.append(current_slice)
else:
raise NotImplementedError(
"'split_respect_sentence_boundary' parameter is only compatible with " "split_by='word'."
)
else:
# create individual "elements" of passage, sentence, or word
if self.split_by == "passage":
elements = text.split("\n\n")
elif self.split_by == "sentence":
elements = nltk.tokenize.sent_tokenize(text)
elif self.split_by == "word":
elements = text.split(" ")
else:
raise NotImplementedError("PreProcessor only supports 'passage', 'sentence' or 'word' split_by options.")
# concatenate individual elements based on split_length & split_stride
if self.split_stride:
segments = windowed(elements, n=self.split_length, step=self.split_length - self.split_stride)
else:
segments = windowed(elements, n=self.split_length, step=self.split_length)
text_splits = []
for seg in segments:
txt = " ".join([t for t in seg if t])
text_splits.append(txt)
# create new document dicts for each text split
documents = []
for i, txt in enumerate(text_splits):
doc = deepcopy(document)
doc["text"] = txt
if "meta" not in doc.keys():
doc["meta"] = {}
doc["meta"]["_split_id"] = i
documents.append(doc)
return documents
def _find_and_remove_header_footer(
self, text: str, n_chars: int, n_first_pages_to_ignore: int, n_last_pages_to_ignore: int
) -> str:
"""
Heuristic to find footers and headers across different pages by searching for the longest common string.
For headers we only search in the first n_chars characters (for footer: last n_chars).
Note: This heuristic uses exact matches and therefore works well for footers like "Copyright 2019 by XXX",
but won't detect "Page 3 of 4" or similar.
:param n_chars: number of first/last characters where the header/footer shall be searched in
:param n_first_pages_to_ignore: number of first pages to ignore (e.g. TOCs often don't contain footer/header)
:param n_last_pages_to_ignore: number of last pages to ignore
:return: text with the detected header and footer removed
"""
pages = text.split("\f")
# header
start_of_pages = [p[:n_chars] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
found_header = self._find_longest_common_ngram(start_of_pages)
if found_header:
pages = [page.replace(found_header, "") for page in pages]
# footer
end_of_pages = [p[-n_chars:] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
found_footer = self._find_longest_common_ngram(end_of_pages)
if found_footer:
pages = [page.replace(found_footer, "") for page in pages]
logger.debug(f"Removed header '{found_header}' and footer '{found_footer}' in document")
text = "\f".join(pages)
return text
def _ngram(self, seq: str, n: int) -> Generator[str, None, None]:
"""
Return ngram (of tokens - currently split by whitespace)
:param seq: str, string from which the ngram shall be created
:param n: int, n of ngram
:return: str, ngram as string
"""
# In order to maintain the original whitespace, but still consider \n and \t for n-gram tokenization,
# we add a space here and remove it after creation of the ngrams again (see below)
seq = seq.replace("\n", " \n")
seq = seq.replace("\t", " \t")
words = seq.split(" ")
ngrams = (
" ".join(words[i : i + n]).replace(" \n", "\n").replace(" \t", "\t") for i in range(0, len(words) - n + 1)
)
return ngrams
def _allngram(self, seq: str, min_ngram: int, max_ngram: int) -> Set[str]:
lengths = range(min_ngram, max_ngram) if max_ngram else range(min_ngram, len(seq))
ngrams = map(partial(self._ngram, seq), lengths)
res = set(chain.from_iterable(ngrams))
return res
def _find_longest_common_ngram(
self, sequences: List[str], max_ngram: int = 30, min_ngram: int = 3
) -> Optional[str]:
"""
Find the longest common ngram across different text sequences (e.g. start of pages).
All ngrams within the specified range are considered. Helpful for finding footers, headers etc.
:param sequences: list[str], list of strings that shall be searched for common n_grams
:param max_ngram: int, maximum length of ngram to consider
:param min_ngram: minimum length of ngram to consider
:return: str, common string of all sections
"""
sequences = [s for s in sequences if s] # filter empty sequences
if not sequences:
return None
seqs_ngrams = map(partial(self._allngram, min_ngram=min_ngram, max_ngram=max_ngram), sequences)
intersection = reduce(set.intersection, seqs_ngrams)
try:
longest = max(intersection, key=len)
except ValueError:
# no common sequence found
longest = ""
return longest if longest.strip() else None
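As a usage sketch of the striding behaviour documented above: the sample document dict and the printed output are illustrative, following from the windowing logic in split (windows of split_length words advancing by split_length - split_stride words), not from the commit's tests.

from haystack.preprocessor.preprocessor import PreProcessor

doc = {"text": "w1 w2 w3 w4 w5 w6 w7 w8 w9 w10 w11", "meta": {"name": "demo.txt"}}

# Word windows of length 5 with stride 2 advance by 5 - 2 = 3 words per split.
preprocessor = PreProcessor(split_by="word", split_length=5, split_stride=2,
                            split_respect_sentence_boundary=False)
for d in preprocessor.process(doc):
    print(d["meta"]["_split_id"], d["text"])
# 0 w1 w2 w3 w4 w5
# 1 w4 w5 w6 w7 w8
# 2 w7 w8 w9 w10 w11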


@ -153,7 +153,7 @@ def tika_convert_files_to_dicts(
:return: None
"""
converter = TikaConverter(remove_header_footer=True)
converter = TikaConverter()
paths = [p for p in Path(dir_path).glob("**/*")]
allowed_suffixes = [".pdf", ".txt"]
file_paths: List[Path] = []


@ -20,3 +20,5 @@ faiss-cpu; sys_platform != 'win32' and sys_platform != 'cygwin'
tika
uvloop; sys_platform != 'win32' and sys_platform != 'cygwin'
httptools
nltk
more_itertools


@ -40,21 +40,3 @@ def test_language_validation(Converter, xpdf_fixture, caplog):
assert "The language for samples/pdf/sample_pdf_1.pdf is not one of ['de']." in caplog.text
@pytest.mark.parametrize("Converter", [PDFToTextConverter, TikaConverter])
def test_header_footer_removal(Converter, xpdf_fixture):
converter = Converter(remove_header_footer=True)
converter_no_removal = Converter(remove_header_footer=False)
document1 = converter.convert(file_path=Path("samples/pdf/sample_pdf_1.pdf")) # file contains no header/footer
document2 = converter_no_removal.convert(file_path=Path("samples/pdf/sample_pdf_1.pdf")) # file contains no header/footer
pages1 = document1["text"].split("\f")
pages2 = document2["text"].split("\f")
for p1, p2 in zip(pages1, pages2):
assert p1 == p2
document = converter.convert(file_path=Path("samples/pdf/sample_pdf_2.pdf")) # file contains header and footer
pages = document["text"].split("\f")
assert len(pages) == 4
for page in pages:
assert "This is a header." not in page
assert "footer" not in page

test/test_preprocessor.py

@ -0,0 +1,66 @@
from pathlib import Path
from haystack.file_converter.pdf import PDFToTextConverter
from haystack.preprocessor.preprocessor import PreProcessor
TEXT = """
This is a sample sentence in paragraph_1. This is a sample sentence in paragraph_1. This is a sample sentence in
paragraph_1. This is a sample sentence in paragraph_1. This is a sample sentence in paragraph_1.
This is a sample sentence in paragraph_2. This is a sample sentence in paragraph_2. This is a sample sentence in
paragraph_2. This is a sample sentence in paragraph_2. This is a sample sentence in paragraph_2.
This is a sample sentence in paragraph_3. This is a sample sentence in paragraph_3. This is a sample sentence in
paragraph_3. This is a sample sentence in paragraph_3. This is to trick the test with using an abbreviation like Dr.
in the sentence.
"""
def test_preprocess_sentence_split():
document = {"text": TEXT}
preprocessor = PreProcessor(split_length=1, split_stride=0, split_by="sentence")
documents = preprocessor.process(document)
assert len(documents) == 15
preprocessor = PreProcessor(
split_length=10, split_stride=0, split_by="sentence"
)
documents = preprocessor.process(document)
assert len(documents) == 2
def test_preprocess_word_split():
document = {"text": TEXT}
preprocessor = PreProcessor(split_length=10, split_stride=0, split_by="word", split_respect_sentence_boundary=False)
documents = preprocessor.process(document)
assert len(documents) == 11
preprocessor = PreProcessor(split_length=10, split_stride=0, split_by="word", split_respect_sentence_boundary=True)
documents = preprocessor.process(document)
for doc in documents:
assert len(doc["text"].split(" ")) <= 10 or doc["text"].startswith("This is to trick")
assert len(documents) == 15
def test_preprocess_passage_split():
document = {"text": TEXT}
preprocessor = PreProcessor(split_length=1, split_stride=0, split_by="passage")
documents = preprocessor.process(document)
assert len(documents) == 3
preprocessor = PreProcessor(split_length=2, split_stride=0, split_by="passage")
documents = preprocessor.process(document)
assert len(documents) == 2
def test_clean_header_footer():
converter = PDFToTextConverter()
document = converter.convert(file_path=Path("samples/pdf/sample_pdf_2.pdf")) # file contains header/footer
preprocessor = PreProcessor(clean_header_footer=True, split_by=None)
documents = preprocessor.process(document)
assert len(documents) == 1
assert "This is a header." not in documents[0]["text"]
assert "footer" not in documents[0]["text"]
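Taken together, the converters now only extract text while the PreProcessor handles cleaning and splitting. A combined sketch along the lines of the tests above, assuming the pdftotext binary required by PDFToTextConverter is installed; the resulting document count is not asserted here, since it depends on the sample PDF.

from pathlib import Path

from haystack.file_converter.pdf import PDFToTextConverter
from haystack.preprocessor.preprocessor import PreProcessor

converter = PDFToTextConverter(remove_numeric_tables=True, valid_languages=["en"])
document = converter.convert(file_path=Path("samples/pdf/sample_pdf_2.pdf"))

preprocessor = PreProcessor(clean_header_footer=True, split_by="sentence", split_length=5)
documents = preprocessor.process(document)
print(len(documents))  # one dict per 5-sentence chunk, each carrying meta["_split_id"]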