feat(ingest): Support for JSONL in s3 source with max_rows support (#9921)

Co-authored-by: Aditya <aditya.malik@quillbot.com>
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
Authored by Aditya Malik on 2024-02-28 19:35:30 +05:30, committed by GitHub
parent f399a872ad
commit 92b1cfa194
5 changed files with 42 additions and 8 deletions

View File

@@ -21,13 +21,14 @@ Supported file types are as follows:
 - CSV
 - TSV
+- JSONL
 - JSON
 - Parquet
 - Apache Avro

 Schemas for Parquet and Avro files are extracted as provided.
-Schemas for schemaless formats (CSV, TSV, JSON) are inferred. For CSV and TSV files, we consider the first 100 rows by default, which can be controlled via the `max_rows` recipe parameter (see [below](#config-details))
+Schemas for schemaless formats (CSV, TSV, JSONL, JSON) are inferred. For CSV, TSV and JSONL files, we consider the first 100 rows by default, which can be controlled via the `max_rows` recipe parameter (see [below](#config-details))
 JSON file schemas are inferred on the basis of the entire file (given the difficulty in extracting only the first few objects of the file), which may impact performance.
 We are working on using iterator-based JSON parsers to avoid reading in the entire JSON object.

View File

@@ -19,13 +19,14 @@ Supported file types are as follows:
 - CSV (*.csv)
 - TSV (*.tsv)
+- JSONL (*.jsonl)
 - JSON (*.json)
 - Parquet (*.parquet)
 - Apache Avro (*.avro)

 Schemas for Parquet and Avro files are extracted as provided.
-Schemas for schemaless formats (CSV, TSV, JSON) are inferred. For CSV and TSV files, we consider the first 100 rows by default, which can be controlled via the `max_rows` recipe parameter (see [below](#config-details))
+Schemas for schemaless formats (CSV, TSV, JSONL, JSON) are inferred. For CSV, TSV and JSONL files, we consider the first 100 rows by default, which can be controlled via the `max_rows` recipe parameter (see [below](#config-details))
 JSON file schemas are inferred on the basis of the entire file (given the difficulty in extracting only the first few objects of the file), which may impact performance.
 We are working on using iterator-based JSON parsers to avoid reading in the entire JSON object.
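Both doc changes point at the `max_rows` recipe parameter. As a rough sketch only, a programmatic recipe using the s3 source might set it as below; the bucket, path pattern, and sink settings are made-up placeholders rather than values from this commit, and the full path_spec syntax is covered in the source docs.

# Hypothetical recipe sketch: only `max_rows` relates to this change; every other
# value (bucket, path pattern, sink server) is a placeholder.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "s3",
            "config": {
                "path_specs": [{"include": "s3://my-bucket/events/*.jsonl"}],
                "max_rows": 50,  # infer schemas from the first 50 rows instead of the default 100
            },
        },
        "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
    }
)
pipeline.run()
pipeline.raise_from_status()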

View File

@@ -377,7 +377,7 @@ class S3Source(StatefulIngestionSourceBase):
                 ignoreLeadingWhiteSpace=True,
                 ignoreTrailingWhiteSpace=True,
             )
-        elif ext.endswith(".json"):
+        elif ext.endswith(".json") or ext.endswith(".jsonl"):
             df = self.spark.read.json(file)
         elif ext.endswith(".avro"):
             try:
@@ -441,6 +441,10 @@ class S3Source(StatefulIngestionSourceBase):
             fields = csv_tsv.TsvInferrer(
                 max_rows=self.source_config.max_rows
             ).infer_schema(file)
+        elif extension == ".jsonl":
+            fields = json.JsonInferrer(
+                max_rows=self.source_config.max_rows, format="jsonl"
+            ).infer_schema(file)
         elif extension == ".json":
             fields = json.JsonInferrer().infer_schema(file)
         elif extension == ".avro":
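The first hunk above routes `.jsonl` files through the same Spark code path as `.json`. That works because Spark's `DataFrameReader.json` treats its input as newline-delimited JSON by default. A minimal standalone sketch of that behavior (the paths are placeholders, not anything from this commit):

# Sketch only: shows the Spark default the branch above relies on.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jsonl-sketch").getOrCreate()

# One JSON object per line is the default expectation, so a .jsonl file needs no options.
df = spark.read.json("s3a://my-bucket/events/part-0000.jsonl")

# A single multi-line JSON document would need multiLine=True instead.
df_doc = spark.read.option("multiLine", "true").json("s3a://my-bucket/report.json")

df.printSchema()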

View File

@@ -1,3 +1,4 @@
+import itertools
 import logging
 from typing import IO, Dict, List, Type, Union
@@ -33,14 +34,28 @@ logger = logging.getLogger(__name__)
 class JsonInferrer(SchemaInferenceBase):
+    def __init__(self, max_rows: int = 100, format: str = "json"):
+        self.max_rows = max_rows
+        self.format = format
+
     def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
-        try:
-            datastore = ujson.load(file)
-        except ujson.JSONDecodeError as e:
-            logger.info(f"Got ValueError: {e}. Retry with jsonlines")
-            file.seek(0)
-            reader = jsl.Reader(file)
-            datastore = [obj for obj in reader.iter(type=dict, skip_invalid=True)]
+        if self.format == "jsonl":
+            file.seek(0)
+            reader = jsl.Reader(file)
+            datastore = [
+                obj
+                for obj in itertools.islice(
+                    reader.iter(type=dict, skip_invalid=True), self.max_rows
+                )
+            ]
+        else:
+            try:
+                datastore = ujson.load(file)
+            except ujson.JSONDecodeError as e:
+                logger.info(f"Got ValueError: {e}. Retry with jsonlines")
+                file.seek(0)
+                reader = jsl.Reader(file)
+                datastore = [obj for obj in reader.iter(type=dict, skip_invalid=True)]

         if not isinstance(datastore, list):
             datastore = [datastore]
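Taken together, the new constructor and the `jsonl` branch cap schema inference at `max_rows` records via `itertools.islice`. A small usage sketch, assuming the module path used by the tests below and a made-up two-column payload:

# Usage sketch; the import path mirrors what the tests use, and the sample
# records are invented for illustration.
import io

from datahub.ingestion.source.schema_inference import json as json_inference

payload = b'{"id": 1, "name": "a"}\n{"id": 2, "name": "b"}\n{"id": 3, "name": "c"}\n'

# With format="jsonl", only the first max_rows lines are parsed.
fields = json_inference.JsonInferrer(max_rows=2, format="jsonl").infer_schema(
    io.BytesIO(payload)
)
print([f.fieldPath for f in fields])  # expect the two inferred columns, e.g. id and name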

View File

@@ -74,6 +74,19 @@ def test_infer_schema_tsv():
     assert_field_types_match(fields, expected_field_types)


+def test_infer_schema_jsonl():
+    with tempfile.TemporaryFile(mode="w+b") as file:
+        file.write(
+            bytes(test_table.to_json(orient="records", lines=True), encoding="utf-8")
+        )
+        file.seek(0)
+
+        fields = json.JsonInferrer(max_rows=100, format="jsonl").infer_schema(file)
+
+        assert_field_paths_match(fields, expected_field_paths)
+        assert_field_types_match(fields, expected_field_types)
+
+
 def test_infer_schema_json():
     with tempfile.TemporaryFile(mode="w+b") as file:
         file.write(bytes(test_table.to_json(orient="records"), encoding="utf-8"))