diff --git a/metadata-ingestion/docs/sources/gcs/README.md b/metadata-ingestion/docs/sources/gcs/README.md
index d6bb8147f0..47a6b731dc 100644
--- a/metadata-ingestion/docs/sources/gcs/README.md
+++ b/metadata-ingestion/docs/sources/gcs/README.md
@@ -21,13 +21,14 @@ Supported file types are as follows:
 
 - CSV
 - TSV
+- JSONL
 - JSON
 - Parquet
 - Apache Avro
 
 Schemas for Parquet and Avro files are extracted as provided.
 
-Schemas for schemaless formats (CSV, TSV, JSON) are inferred. For CSV and TSV files, we consider the first 100 rows by default, which can be controlled via the `max_rows` recipe parameter (see [below](#config-details))
+Schemas for schemaless formats (CSV, TSV, JSONL, JSON) are inferred. For CSV, TSV and JSONL files, we consider the first 100 rows by default, which can be controlled via the `max_rows` recipe parameter (see [below](#config-details))
 JSON file schemas are inferred on the basis of the entire file (given the difficulty in extracting only the first few objects of the file), which may impact performance.
 We are working on using iterator-based JSON parsers to avoid reading in the entire JSON object.
 
diff --git a/metadata-ingestion/docs/sources/s3/README.md b/metadata-ingestion/docs/sources/s3/README.md
index 8d65e1cf8b..7944f78280 100644
--- a/metadata-ingestion/docs/sources/s3/README.md
+++ b/metadata-ingestion/docs/sources/s3/README.md
@@ -19,13 +19,14 @@ Supported file types are as follows:
 
 - CSV (*.csv)
 - TSV (*.tsv)
+- JSONL (*.jsonl)
 - JSON (*.json)
 - Parquet (*.parquet)
 - Apache Avro (*.avro)
 
 Schemas for Parquet and Avro files are extracted as provided.
 
-Schemas for schemaless formats (CSV, TSV, JSON) are inferred. For CSV and TSV files, we consider the first 100 rows by default, which can be controlled via the `max_rows` recipe parameter (see [below](#config-details))
+Schemas for schemaless formats (CSV, TSV, JSONL, JSON) are inferred. For CSV, TSV and JSONL files, we consider the first 100 rows by default, which can be controlled via the `max_rows` recipe parameter (see [below](#config-details))
 JSON file schemas are inferred on the basis of the entire file (given the difficulty in extracting only the first few objects of the file), which may impact performance.
 We are working on using iterator-based JSON parsers to avoid reading in the entire JSON object.
 
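The README changes above distinguish JSONL (one JSON object per line, sampled up to `max_rows`) from plain JSON (parsed as a whole file). As a quick illustration of the two on-disk layouts the docs describe, here is a minimal sketch; the file names and records are made up:

```python
import json

# Hypothetical sample records, just to show the two layouts side by side.
records = [{"id": i, "name": f"user_{i}"} for i in range(3)]

# JSON (*.json): a single document, so schema inference must read the whole file.
with open("users.json", "w") as f:
    json.dump(records, f)

# JSONL (*.jsonl): one object per line, so inference can stop after `max_rows` lines.
with open("users.jsonl", "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")
```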
diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
index 41fc578235..43a1bcd06d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -377,7 +377,7 @@ class S3Source(StatefulIngestionSourceBase):
                 ignoreLeadingWhiteSpace=True,
                 ignoreTrailingWhiteSpace=True,
             )
-        elif ext.endswith(".json"):
+        elif ext.endswith(".json") or ext.endswith(".jsonl"):
             df = self.spark.read.json(file)
         elif ext.endswith(".avro"):
             try:
@@ -441,6 +441,10 @@ class S3Source(StatefulIngestionSourceBase):
                 fields = csv_tsv.TsvInferrer(
                     max_rows=self.source_config.max_rows
                 ).infer_schema(file)
+            elif extension == ".jsonl":
+                fields = json.JsonInferrer(
+                    max_rows=self.source_config.max_rows, format="jsonl"
+                ).infer_schema(file)
             elif extension == ".json":
                 fields = json.JsonInferrer().infer_schema(file)
             elif extension == ".avro":
diff --git a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
index 251d136fe9..1f2c73a252 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
@@ -1,3 +1,4 @@
+import itertools
 import logging
 from typing import IO, Dict, List, Type, Union
 
@@ -33,14 +34,28 @@ logger = logging.getLogger(__name__)
 
 
 class JsonInferrer(SchemaInferenceBase):
+    def __init__(self, max_rows: int = 100, format: str = "json"):
+        self.max_rows = max_rows
+        self.format = format
+
     def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
-        try:
-            datastore = ujson.load(file)
-        except ujson.JSONDecodeError as e:
-            logger.info(f"Got ValueError: {e}. Retry with jsonlines")
+        if self.format == "jsonl":
             file.seek(0)
             reader = jsl.Reader(file)
-            datastore = [obj for obj in reader.iter(type=dict, skip_invalid=True)]
+            datastore = [
+                obj
+                for obj in itertools.islice(
+                    reader.iter(type=dict, skip_invalid=True), self.max_rows
+                )
+            ]
+        else:
+            try:
+                datastore = ujson.load(file)
+            except ujson.JSONDecodeError as e:
+                logger.info(f"Got ValueError: {e}. Retry with jsonlines")
+                file.seek(0)
+                reader = jsl.Reader(file)
+                datastore = [obj for obj in reader.iter(type=dict, skip_invalid=True)]
 
         if not isinstance(datastore, list):
             datastore = [datastore]
diff --git a/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py b/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py
index de88deec9b..a1ef02c27e 100644
--- a/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py
+++ b/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py
@@ -74,6 +74,19 @@ def test_infer_schema_tsv():
     assert_field_types_match(fields, expected_field_types)
 
 
+def test_infer_schema_jsonl():
+    with tempfile.TemporaryFile(mode="w+b") as file:
+        file.write(
+            bytes(test_table.to_json(orient="records", lines=True), encoding="utf-8")
+        )
+        file.seek(0)
+
+        fields = json.JsonInferrer(max_rows=100, format="jsonl").infer_schema(file)
+
+        assert_field_paths_match(fields, expected_field_paths)
+        assert_field_types_match(fields, expected_field_types)
+
+
 def test_infer_schema_json():
     with tempfile.TemporaryFile(mode="w+b") as file:
         file.write(bytes(test_table.to_json(orient="records"), encoding="utf-8"))
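For anyone who wants to exercise the new code path locally, the following is a rough usage sketch, not part of the patch. It assumes a local `sample.jsonl` file exists and that the returned `SchemaField` objects expose `fieldPath` and `type` attributes, as they do elsewhere in DataHub; the import path mirrors the module touched in this diff.

```python
from datahub.ingestion.source.schema_inference.json import JsonInferrer

# Infer a schema from the first 100 lines of a JSONL file, mirroring how
# S3Source now calls the inferrer when the extension is ".jsonl".
with open("sample.jsonl", "rb") as f:
    fields = JsonInferrer(max_rows=100, format="jsonl").infer_schema(f)

for field in fields:
    print(field.fieldPath, field.type)
```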