Feat: parsing supports jsonl or ldjson format (#9087)

### What problem does this PR solve?

Supports the JSONL (JSON Lines) and LDJSON formats. Feature requested in
[discussion](https://github.com/orgs/infiniflow/discussions/8774).

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Yongteng Lei 2025-07-30 09:48:20 +08:00 committed by GitHub
parent ba563f8095
commit 39ef2ffba9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 64 additions and 18 deletions

View File

@ -155,7 +155,7 @@ def filename_type(filename):
if re.match(r".*\.pdf$", filename):
return FileType.PDF.value
if re.match(r".*\.(eml|doc|docx|ppt|pptx|yml|xml|htm|json|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md|py|js|java|c|cpp|h|php|go|ts|sh|cs|kt|html|sql)$", filename):
if re.match(r".*\.(eml|doc|docx|ppt|pptx|yml|xml|htm|json|jsonl|ldjson|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md|py|js|java|c|cpp|h|php|go|ts|sh|cs|kt|html|sql)$", filename):
return FileType.DOC.value
if re.match(r".*\.(wav|flac|ape|alac|wavpack|wv|mp3|aac|ogg|vorbis|opus)$", filename):

View File

@ -22,24 +22,22 @@ import json
from typing import Any
from rag.nlp import find_codec
class RAGFlowJsonParser:
def __init__(
self, max_chunk_size: int = 2000, min_chunk_size: int | None = None
):
def __init__(self, max_chunk_size: int = 2000, min_chunk_size: int | None = None):
super().__init__()
self.max_chunk_size = max_chunk_size * 2
self.min_chunk_size = (
min_chunk_size
if min_chunk_size is not None
else max(max_chunk_size - 200, 50)
)
self.min_chunk_size = min_chunk_size if min_chunk_size is not None else max(max_chunk_size - 200, 50)
def __call__(self, binary):
    """Decode *binary* text and split it into serialized JSON chunk strings.

    Detects the payload's encoding, then dispatches: content that looks like
    JSON Lines goes through the line-by-line parser, anything else is treated
    as a single JSON document.

    Args:
        binary: Raw bytes of the uploaded file.

    Returns:
        A list of JSON strings, one per chunk.
    """
    # NOTE(review): the rendered diff kept the pre-change body (an
    # unconditional json.loads) above the post-change dispatch; executed
    # as-is it would raise on JSONL input before dispatching. This is the
    # post-change version only.
    encoding = find_codec(binary)
    txt = binary.decode(encoding, errors="ignore")
    if self.is_jsonl_format(txt):
        sections = self._parse_jsonl(txt)
    else:
        sections = self._parse_json(txt)
    return sections
@staticmethod
@ -60,14 +58,11 @@ class RAGFlowJsonParser:
return {k: self._list_to_dict_preprocessing(v) for k, v in data.items()}
elif isinstance(data, list):
# Convert the list to a dictionary with index-based keys
return {
str(i): self._list_to_dict_preprocessing(item)
for i, item in enumerate(data)
}
return {str(i): self._list_to_dict_preprocessing(item) for i, item in enumerate(data)}
else:
# Base case: the item is neither a dict nor a list, so return it unchanged
return data
def _json_split(
self,
data,
@ -131,3 +126,54 @@ class RAGFlowJsonParser:
# Convert to string
return [json.dumps(chunk, ensure_ascii=ensure_ascii) for chunk in chunks]
def _parse_json(self, content: str) -> list[str]:
    """Split one JSON document into serialized chunk strings.

    Returns an empty list when *content* is not valid JSON (best-effort:
    unparseable input is not an error at this layer).
    """
    try:
        parsed = json.loads(content)
        pieces = self.split_json(parsed, True)
        return [json.dumps(piece, ensure_ascii=False) for piece in pieces if piece]
    except json.JSONDecodeError:
        return []
def _parse_jsonl(self, content: str) -> list[str]:
    """Split JSON-Lines content (one JSON document per line) into chunks.

    Blank lines and lines that fail to parse are silently skipped; chunks
    produced from every valid line are concatenated in order.
    """
    collected: list[str] = []
    for raw_line in content.strip().splitlines():
        if not raw_line.strip():
            continue
        try:
            record = json.loads(raw_line)
            for piece in self.split_json(record, convert_lists=True):
                if piece:
                    collected.append(json.dumps(piece, ensure_ascii=False))
        except json.JSONDecodeError:
            # Best-effort: a malformed line must not abort the whole file.
            continue
    return collected
def is_jsonl_format(self, txt: str, sample_limit: int = 10, threshold: float = 0.8) -> bool:
lines = [line.strip() for line in txt.strip().splitlines() if line.strip()]
if not lines:
return False
try:
json.loads(txt)
return False
except json.JSONDecodeError:
pass
sample_limit = min(len(lines), sample_limit)
sample_lines = lines[:sample_limit]
valid_lines = sum(1 for line in sample_lines if self._is_valid_json(line))
if not valid_lines:
return False
return (valid_lines / len(sample_lines)) >= threshold
def _is_valid_json(self, line: str) -> bool:
try:
json.loads(line)
return True
except json.JSONDecodeError:
return False

View File

@ -499,7 +499,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
sections = [(_, "") for _ in sections if _]
callback(0.8, "Finish parsing.")
elif re.search(r"\.json$", filename, re.IGNORECASE):
elif re.search(r"\.(json|jsonl|ldjson)$", filename, re.IGNORECASE):
callback(0.1, "Start to parse.")
chunk_token_num = int(parser_config.get("chunk_token_num", 128))
sections = JsonParser(chunk_token_num)(binary)