Mirror of https://github.com/open-metadata/OpenMetadata.git, synced 2025-11-01 19:18:05 +00:00
MINOR: support JSONL datalake file types (#16614)
* fix: support JSONL datalake file types
* add jsonl zip file types
* update fileFormat enum in table schema
* add tests
* fix test data ref
* reformat
* fix tests

Co-authored-by: Matthew Chamberlin <mchamberlin@ginkgobioworks.com>
Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
This commit is contained in:
parent
7411f9e0e1
commit
ac6ddbf6c4
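For context: a JSONL (JSON Lines) file holds one JSON object per line, so the file as a whole is not a valid JSON document and cannot be handed to a plain JSON parser. A minimal standard-library sketch of the difference, independent of the reader classes touched in this commit:

    import json

    jsonl_payload = '{"id": "1"}\n{"id": "2"}'

    # Parsing the whole payload as one JSON document fails with "Extra data",
    # because a second top-level value follows the first object.
    try:
        json.loads(jsonl_payload)
    except json.JSONDecodeError as exc:
        print(f"plain JSON parse fails: {exc}")

    # JSONL is instead parsed one line (one record) at a time.
    records = [json.loads(line) for line in jsonl_payload.splitlines()]
    print(records)  # [{'id': '1'}, {'id': '2'}]

This is why the change below registers "jsonl" (and its compressed variants) as distinct file types rather than reusing the plain "json" entries.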
@@ -42,6 +42,9 @@ class SupportedTypes(Enum):
     JSON = "json"
     JSONGZ = "json.gz"
     JSONZIP = "json.zip"
+    JSONL = "jsonl"
+    JSONLGZ = "jsonl.gz"
+    JSONLZIP = "jsonl.zip"


 DF_READER_MAP = {
@@ -52,6 +55,9 @@ DF_READER_MAP = {
     SupportedTypes.JSON.value: JSONDataFrameReader,
     SupportedTypes.JSONGZ.value: JSONDataFrameReader,
     SupportedTypes.JSONZIP.value: JSONDataFrameReader,
+    SupportedTypes.JSONL.value: JSONDataFrameReader,
+    SupportedTypes.JSONLGZ.value: JSONDataFrameReader,
+    SupportedTypes.JSONLZIP.value: JSONDataFrameReader,
 }
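The enum values and the DF_READER_MAP entries work as a pair: a file's extension string selects the reader class. Below is a self-contained sketch of that suffix-to-reader dispatch pattern; read_json, read_jsonl, and pick_reader are hypothetical stand-ins for illustration, not OpenMetadata's actual factory API:

    from enum import Enum
    from typing import Callable, List


    class SupportedTypes(Enum):
        JSON = "json"
        JSONL = "jsonl"
        JSONLGZ = "jsonl.gz"


    def read_json(path: str) -> List[dict]:  # hypothetical reader
        raise NotImplementedError


    def read_jsonl(path: str) -> List[dict]:  # hypothetical reader
        raise NotImplementedError


    READER_MAP: dict = {
        SupportedTypes.JSON.value: read_json,
        SupportedTypes.JSONL.value: read_jsonl,
        SupportedTypes.JSONLGZ.value: read_jsonl,
    }


    def pick_reader(filename: str) -> Callable:
        # Check the longest suffixes first so "data.jsonl.gz" matches the
        # "jsonl.gz" entry rather than stopping at plain "jsonl".
        for suffix in sorted(READER_MAP, key=len, reverse=True):
            if filename.endswith("." + suffix):
                return READER_MAP[suffix]
        raise ValueError(f"unsupported file type: {filename}")


    print(pick_reader("data.jsonl.gz").__name__)  # read_jsonl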
(new file: the names.jsonl fixture referenced by the integration test below)
@@ -0,0 +1,2 @@
+{"id": "1", "first_name": "John", "last_name": "Doe", "city": "Los Angeles", "country": "US", "birthdate": "1980-01-01", "age": "40", "json_data": {"foo": {"bar": "baz"}}}
+{"id": "2", "first_name": "James", "last_name": "Doe", "city": "Los Angeles", "country": "US", "birthdate": "1980-01-01", "age": "40", "json_data": {"foo": {"bar": "baz"}}}
@@ -37,9 +37,11 @@ class TestDatalake:
         )  # type: ignore

         entities = resp.entities
-        assert len(entities) == 3
+        assert len(entities) == 4
         names = [entity.name.root for entity in entities]
-        assert {"names.json", "new_users.parquet", "users.csv"} == set(names)
+        assert {"names.json", "names.jsonl", "new_users.parquet", "users.csv"} == set(
+            names
+        )

         for entity in entities:
             columns = entity.columns
@@ -64,10 +66,19 @@ class TestDatalake:
             fqn='datalake_for_integration_tests.default.MyBucket."names.json"',
             fields=["tableProfilerConfig"],
         )

+        jsonl_ = self.metadata.get_by_name(
+            entity=Table,
+            fqn='datalake_for_integration_tests.default.MyBucket."names.jsonl"',
+            fields=["tableProfilerConfig"],
+        )
+
         csv_sample_data = self.metadata.get_sample_data(csv_)
         # parquet_sample_data = self.metadata.get_sample_data(parquet_)
         json_sample_data = self.metadata.get_sample_data(json_)
+        jsonl_sample_data = self.metadata.get_sample_data(jsonl_)

         assert csv_sample_data.sampleData.rows
         # assert parquet_sample_data.sampleData.rows
         assert json_sample_data.sampleData.rows
+        assert jsonl_sample_data.sampleData.rows
@@ -104,6 +104,24 @@ class TestDataFrameReader(TestCase):
             ["name", "id", "version", "Company"],
         )

+    def test_jsonl_reader(self):
+        key = ROOT_PATH / "employees.jsonl"
+
+        df_list = fetch_dataframe(
+            config_source=LocalConfig(),
+            client=None,
+            file_fqn=DatalakeTableSchemaWrapper(key=str(key), bucket_name="unused"),
+        )
+
+        self.assertIsNotNone(df_list)
+        self.assertTrue(len(df_list))
+
+        self.assertEqual(df_list[0].shape, (4, 4))
+        self.assertEqual(
+            list(df_list[0].columns),
+            ["name", "id", "version", "Company"],
+        )
+
     def test_avro_reader(self):
         key = ROOT_PATH / "example.avro"
ingestion/tests/unit/resources/datalake/employees.jsonl (new file, 4 lines)
@@ -0,0 +1,4 @@
+{"name": "Name1", "id": "EMP1", "version": 1, "Company": "Collate Inc."}
+{"name": "Name2", "id": "EMP2", "version": 1, "Company": "Collate Inc."}
+{"name": "Name3", "id": "EMP3", "version": 1, "Company": "Collate Inc."}
+{"name": "Name4", "id": "EMP4", "version": 1, "Company": "Collate Inc."}
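Given these four fixture rows, the (4, 4) shape the unit test asserts can be sanity-checked directly with pandas. A small sketch, assuming pandas is available and the path is resolved from the repo root:

    import pandas as pd

    # Load the fixture as line-delimited JSON and confirm the shape and
    # column order the unit test above asserts.
    df = pd.read_json(
        "ingestion/tests/unit/resources/datalake/employees.jsonl", lines=True
    )
    assert df.shape == (4, 4)
    assert list(df.columns) == ["name", "id", "version", "Company"]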
@@ -906,7 +906,7 @@
     "fileFormat": {
       "description": "File format in case of file/datalake tables.",
       "type": "string",
-      "enum": ["csv", "tsv", "avro", "parquet", "json", "json.gz", "json.zip"]
+      "enum": ["csv", "tsv", "avro", "parquet", "json", "json.gz", "json.zip", "jsonl", "jsonl.gz", "jsonl.zip"]
     }
   },
   "properties": {
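With the widened enum, any of the three new strings passes wherever fileFormat is validated against this schema. A quick sketch using the jsonschema package against the enum fragment above:

    from jsonschema import ValidationError, validate

    # Schema fragment copied from the hunk above.
    file_format_schema = {
        "type": "string",
        "enum": [
            "csv", "tsv", "avro", "parquet",
            "json", "json.gz", "json.zip",
            "jsonl", "jsonl.gz", "jsonl.zip",
        ],
    }

    validate("jsonl.gz", file_format_schema)  # accepted after this change

    try:
        validate("jsonl.bz2", file_format_schema)  # not in the enum
    except ValidationError as exc:
        print(exc.message)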