From 392107bc4ac5f635b1d58e65d47b61c99ced263c Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Wed, 8 Feb 2023 23:01:25 +0530 Subject: [PATCH] Datalake Avro & Json Lines Support (#10129) --- .../source/database/datalake/metadata.py | 40 +++- .../source/database/datalake/models.py | 31 +++ .../source/database/datalake/utils.py | 108 ++++++++++ ingestion/src/metadata/parsers/avro_parser.py | 92 +++++--- .../metadata/parsers/json_schema_parser.py | 9 +- .../src/metadata/parsers/protobuf_parser.py | 8 +- ingestion/src/metadata/utils/azure_utils.py | 53 ++--- ingestion/src/metadata/utils/gcs_utils.py | 50 ++--- ingestion/src/metadata/utils/s3_utils.py | 39 ++-- .../resources/datasets/avro_data_file.avro | Bin 0 -> 543 bytes .../resources/datasets/avro_schema_file.avro | 51 +++++ ingestion/tests/unit/test_avro_parser.py | 12 +- .../tests/unit/test_json_schema_parser.py | 14 +- ingestion/tests/unit/test_protobuf_parser.py | 12 +- .../unit/topology/database/test_datalake.py | 203 ++++++++++++++++++ .../json/schema/entity/data/table.json | 6 +- 16 files changed, 573 insertions(+), 155 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/source/database/datalake/models.py create mode 100644 ingestion/src/metadata/ingestion/source/database/datalake/utils.py create mode 100644 ingestion/tests/unit/resources/datasets/avro_data_file.avro create mode 100644 ingestion/tests/unit/resources/datasets/avro_schema_file.avro diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index 7299aa56e7d..61f9817dcb7 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -52,6 +52,7 @@ from metadata.ingestion.source.database.database_service import ( DatabaseServiceSource, SQLSourceStatus, ) +from metadata.ingestion.source.database.datalake.models import DatalakeColumnWrapper from metadata.utils import fqn from metadata.utils.filters import filter_by_schema, filter_by_table from metadata.utils.logger import ingestion_logger @@ -70,23 +71,34 @@ DATALAKE_DATA_TYPES = { JSON_SUPPORTED_TYPES = (".json", ".json.gz", ".json.zip") -DATALAKE_SUPPORTED_FILE_TYPES = (".csv", ".tsv", ".parquet") + JSON_SUPPORTED_TYPES +DATALAKE_SUPPORTED_FILE_TYPES = ( + ".csv", + ".tsv", + ".parquet", + ".avro", +) + JSON_SUPPORTED_TYPES def ometa_to_dataframe(config_source, client, table): + """ + Method to get dataframe for profiling + """ + data = None if isinstance(config_source, GCSConfig): - return DatalakeSource.get_gcs_files( + data = DatalakeSource.get_gcs_files( client=client, key=table.name.__root__, bucket_name=table.databaseSchema.name, ) if isinstance(config_source, S3Config): - return DatalakeSource.get_s3_files( + data = DatalakeSource.get_s3_files( client=client, key=table.name.__root__, bucket_name=table.databaseSchema.name, ) - return None + if isinstance(data, DatalakeColumnWrapper): + data = data.dataframes + return data class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-methods @@ -418,8 +430,10 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- ) if isinstance(data_frame, DataFrame): columns = self.get_columns(data_frame) - if isinstance(data_frame, list): + if isinstance(data_frame, list) and data_frame: columns = self.get_columns(data_frame[0]) + if isinstance(data_frame, DatalakeColumnWrapper): + 
columns = data_frame.columns if columns: table_request = CreateTableRequest( name=table_name, @@ -448,6 +462,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- Fetch GCS Bucket files """ from metadata.utils.gcs_utils import ( # pylint: disable=import-outside-toplevel + read_avro_from_gcs, read_csv_from_gcs, read_json_from_gcs, read_parquet_from_gcs, @@ -467,6 +482,9 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- if key.endswith(".parquet"): return read_parquet_from_gcs(key, bucket_name) + if key.endswith(".avro"): + return read_avro_from_gcs(client, key, bucket_name) + except Exception as exc: logger.debug(traceback.format_exc()) logger.error( @@ -480,6 +498,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- Fetch Azure Storage files """ from metadata.utils.azure_utils import ( # pylint: disable=import-outside-toplevel + read_avro_from_azure, read_csv_from_azure, read_json_from_azure, read_parquet_from_azure, @@ -490,9 +509,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- return read_csv_from_azure(client, key, container_name, storage_options) if key.endswith(JSON_SUPPORTED_TYPES): - return read_json_from_azure( - client, key, container_name, storage_options - ) + return read_json_from_azure(client, key, container_name) if key.endswith(".parquet"): return read_parquet_from_azure( @@ -504,6 +521,9 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- client, key, container_name, storage_options, sep="\t" ) + if key.endswith(".avro"): + return read_avro_from_azure(client, key, container_name) + except Exception as exc: logger.debug(traceback.format_exc()) logger.error( @@ -517,6 +537,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- Fetch S3 Bucket files """ from metadata.utils.s3_utils import ( # pylint: disable=import-outside-toplevel + read_avro_from_s3, read_csv_from_s3, read_json_from_s3, read_parquet_from_s3, @@ -536,6 +557,9 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- if key.endswith(".parquet"): return read_parquet_from_s3(client_kwargs, key, bucket_name) + if key.endswith(".avro"): + return read_avro_from_s3(client, key, bucket_name) + except Exception as exc: logger.debug(traceback.format_exc()) logger.error( diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/models.py b/ingestion/src/metadata/ingestion/source/database/datalake/models.py new file mode 100644 index 00000000000..c48b270b2eb --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/datalake/models.py @@ -0,0 +1,31 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+"""
+Module to define pydantic models related to datalake
+"""
+from typing import Any, List, Optional
+
+from pydantic import BaseModel
+
+from metadata.generated.schema.entity.data.table import Column
+
+
+class DatalakeColumnWrapper(BaseModel):
+    """
+    In case of avro files we can get the column details directly from the
+    schema and do not need the dataframe to parse the metadata, but the
+    profiler still needs the dataframes; this model binds the column details
+    and the dataframes so that both metadata ingestion and the profiler can use them
+    """
+
+    columns: Optional[List[Column]]
+    dataframes: Optional[List[Any]]  # pandas.DataFrame does not have any validators
diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/utils.py b/ingestion/src/metadata/ingestion/source/database/datalake/utils.py
new file mode 100644
index 00000000000..ad85a293367
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/database/datalake/utils.py
@@ -0,0 +1,108 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Module to define helper methods for datalake
+"""
+
+import gzip
+import io
+import json
+import zipfile
+from typing import List, Union
+
+import pandas as pd
+from avro.datafile import DataFileReader
+from avro.io import DatumReader
+
+from metadata.generated.schema.entity.data.table import Column
+from metadata.generated.schema.type.schema import DataTypeTopic
+from metadata.ingestion.source.database.datalake.models import DatalakeColumnWrapper
+from metadata.parsers.avro_parser import parse_avro_schema
+from metadata.utils.constants import UTF_8
+from metadata.utils.logger import utils_logger
+
+logger = utils_logger()
+
+PD_AVRO_FIELD_MAP = {
+    DataTypeTopic.BOOLEAN.value: "bool",
+    DataTypeTopic.INT.value: "int",
+    DataTypeTopic.LONG.value: "float",
+    DataTypeTopic.FLOAT.value: "float",
+    DataTypeTopic.DOUBLE.value: "float",
+    DataTypeTopic.TIMESTAMP.value: "float",
+    DataTypeTopic.TIMESTAMPZ.value: "float",
+}
+
+AVRO_SCHEMA = "avro.schema"
+
+
+def read_from_avro(
+    avro_text: bytes,
+) -> Union[DatalakeColumnWrapper, List[pd.DataFrame]]:
+    """
+    Method to parse the avro data from storage sources
+    """
+    try:
+        elements = DataFileReader(io.BytesIO(avro_text), DatumReader())
+        if elements.meta.get(AVRO_SCHEMA):
+            return DatalakeColumnWrapper(
+                columns=parse_avro_schema(
+                    schema=elements.meta.get(AVRO_SCHEMA).decode(UTF_8), cls=Column
+                ),
+                dataframes=[pd.DataFrame.from_records(elements)],
+            )
+        return [pd.DataFrame.from_records(elements)]
+    except AssertionError:
+        columns = parse_avro_schema(schema=avro_text, cls=Column)
+        field_map = {
+            col.name.__root__: pd.Series(
+                PD_AVRO_FIELD_MAP.get(col.dataType.value, "str")
+            )
+            for col in columns
+        }
+        return DatalakeColumnWrapper(
+            columns=columns, dataframes=[pd.DataFrame(field_map)]
+        )
+
+
+def _get_json_text(key: str, text: bytes, decode: bool) -> str:
+    if key.endswith(".gz"):
+        return gzip.decompress(text)
+    if key.endswith(".zip"):
+        with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
+            return zip_file.read(zip_file.infolist()[0]).decode(UTF_8)
+    if decode:
+        return text.decode(UTF_8)
+    return text
+
+
+def read_from_json(
+    key: str, json_text: str, sample_size: int = 100, decode: bool = False
+) -> List[pd.DataFrame]:
+    """
+    Read the json file from the storage source and return a dataframe
+    """
+    json_text = _get_json_text(key, json_text, decode)
+    try:
+        data = json.loads(json_text)
+    except json.decoder.JSONDecodeError:
+        logger.debug("Failed to read as JSON object trying to read as JSON Lines")
+        data = [
+            json.loads(json_obj)
+            for json_obj in json_text.strip().split("\n")[:sample_size]
+        ]
+
+    if isinstance(data, list):
+        return [pd.DataFrame.from_dict(data[:sample_size])]
+    return [
+        pd.DataFrame.from_dict({key: pd.Series(value) for key, value in data.items()})
+    ]
diff --git a/ingestion/src/metadata/parsers/avro_parser.py b/ingestion/src/metadata/parsers/avro_parser.py
index 0cbad6b3eec..4db90946592 100644
--- a/ingestion/src/metadata/parsers/avro_parser.py
+++ b/ingestion/src/metadata/parsers/avro_parser.py
@@ -14,38 +14,87 @@ Utils module to parse the avro schema
 """
 
 import traceback
-from typing import List, Optional
+from typing import List, Optional, Union
 
 import avro.schema as avroschema
 from avro.schema import ArraySchema
+from pydantic.main import ModelMetaclass
 
+from metadata.generated.schema.entity.data.table import Column, DataType
 from metadata.generated.schema.type.schema import FieldModel
 from metadata.utils.logger import ingestion_logger
 
 logger = ingestion_logger()
 
 
-def parse_avro_schema(schema: str) -> Optional[List[FieldModel]]:
+def parse_array_fields(
+    field, cls: ModelMetaclass = FieldModel
+) -> Optional[List[Union[FieldModel, Column]]]:
+    """
+    Parse array field for avro schema
+    """
+    field_items = field.type.items
+    child_obj = cls(
+        name=field_items.name,
+        dataType=str(field_items.type).upper(),
+        children=get_avro_fields(field.type.items, cls),
+    )
+
+    obj = cls(
+        name=field.name,
+        dataType=str(field.type.type).upper(),
+    )
+
+    if cls == Column:
+        if str(field_items.type).upper() == DataType.ARRAY.value:
+            child_obj.arrayDataType = str(field.type.items.type).upper()
+            child_obj.dataTypeDisplay = f"{field_items.type}<{field.type.items.type}>"
+        else:
+            child_obj.dataTypeDisplay = str(field_items.type)
+        if str(field.type.type).upper() == DataType.ARRAY.value:
+            obj.arrayDataType = str(field_items.type).upper()
+            obj.dataTypeDisplay = f"{field.type.type}<{field_items.type}>"
+        else:
+            obj.dataTypeDisplay = str(field.type.type)
+
+    obj.children = [child_obj]
+
+    return obj
+
+
+def parse_single_field(
+    field, cls: ModelMetaclass = FieldModel
+) -> Optional[List[Union[FieldModel, Column]]]:
+    """
+    Parse primitive field for avro schema
+    """
+    obj = cls(
+        name=field.name,
+        dataType=str(field.type.type).upper(),
+    )
+    if cls == Column:
+        obj.dataTypeDisplay = str(field.type.type)
+    return obj
+
+
+def parse_avro_schema(
+    schema: str, cls: ModelMetaclass = FieldModel
+) -> Optional[List[Union[FieldModel, Column]]]:
     """
     Method to parse the avro schema
     """
     try:
         parsed_schema = avroschema.parse(schema)
-        field_models = [
-            FieldModel(
-                name=parsed_schema.name,
-                dataType=str(parsed_schema.type).upper(),
-                children=get_avro_fields(parsed_schema),
-            )
-        ]
-        return field_models
+        return get_avro_fields(parsed_schema, cls)
     except Exception as exc:  # pylint: disable=broad-except
         logger.debug(traceback.format_exc())
         logger.warning(f"Unable to parse the avro schema: {exc}")
         return None
 
 
-def get_avro_fields(parsed_schema) ->
Optional[List[FieldModel]]: +def get_avro_fields( + parsed_schema, cls: ModelMetaclass = FieldModel +) -> Optional[List[Union[FieldModel, Column]]]: """ Recursively convert the parsed schema into required models """ @@ -54,26 +103,9 @@ def get_avro_fields(parsed_schema) -> Optional[List[FieldModel]]: for field in parsed_schema.fields: try: if isinstance(field.type, ArraySchema): - field_items = field.type.items - field_models.append( - FieldModel( - name=field.name, - dataType=str(field.type.type).upper(), - children=[ - FieldModel( - name=field_items.name, - dataType=str(field_items.type).upper(), - children=get_avro_fields(field.type.items), - ) - ], - ) - ) + field_models.append(parse_array_fields(field, cls=cls)) else: - field_models.append( - FieldModel( - name=field.name, dataType=str(field.type.fullname).upper() - ) - ) + field_models.append(parse_single_field(field, cls=cls)) except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) logger.warning(f"Unable to parse the avro schema into models: {exc}") diff --git a/ingestion/src/metadata/parsers/json_schema_parser.py b/ingestion/src/metadata/parsers/json_schema_parser.py index 818fc27fbce..7f268f5425d 100644 --- a/ingestion/src/metadata/parsers/json_schema_parser.py +++ b/ingestion/src/metadata/parsers/json_schema_parser.py @@ -44,14 +44,7 @@ def parse_json_schema(schema_text: str) -> Optional[List[FieldModel]]: """ try: json_schema_data = json.loads(schema_text) - field_models = [ - FieldModel( - name=json_schema_data.get("title", "default"), - dataType=JsonSchemaDataTypes(json_schema_data.get("type")).name, - description=json_schema_data.get("description"), - children=get_json_schema_fields(json_schema_data.get("properties")), - ) - ] + field_models = get_json_schema_fields(json_schema_data.get("properties")) return field_models except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/parsers/protobuf_parser.py b/ingestion/src/metadata/parsers/protobuf_parser.py index 3b7f69367f5..a39d6f9705c 100644 --- a/ingestion/src/metadata/parsers/protobuf_parser.py +++ b/ingestion/src/metadata/parsers/protobuf_parser.py @@ -176,13 +176,7 @@ class ProtobufParser: proto_path=proto_path, file_path=file_path ) - field_models = [ - FieldModel( - name=instance.DESCRIPTOR.name, - dataType="RECORD", - children=self.get_protobuf_fields(instance.DESCRIPTOR.fields), - ) - ] + field_models = self.get_protobuf_fields(instance.DESCRIPTOR.fields) # Clean up the tmp folder if Path(self.config.base_file_path).exists(): diff --git a/ingestion/src/metadata/utils/azure_utils.py b/ingestion/src/metadata/utils/azure_utils.py index 30822dd2651..ff9874a50e3 100644 --- a/ingestion/src/metadata/utils/azure_utils.py +++ b/ingestion/src/metadata/utils/azure_utils.py @@ -14,22 +14,37 @@ Utils module to convert different file types from azure file system into a dataf """ import gzip +import io import traceback +import zipfile from typing import Any import pandas as pd +from metadata.ingestion.source.database.datalake.utils import ( + read_from_avro, + read_from_json, +) from metadata.utils.logger import utils_logger logger = utils_logger() -def _get_json_text(key: str, text: bytes) -> str: +def _get_json_text(key: str, text: str) -> str: if key.endswith(".gz"): return gzip.decompress(text) + if key.endswith(".zip"): + with zipfile.ZipFile(io.BytesIO(text)) as zip_file: + return zip_file.read(zip_file.infolist()[0]).decode("utf-8") return text +def 
get_file_text(client: Any, key: str, container_name: str):
+    container_client = client.get_container_client(container_name)
+    blob_client = container_client.get_blob_client(key)
+    return blob_client.download_blob().readall()
+
 
 def read_csv_from_azure(
     client: Any, key: str, container_name: str, storage_options: dict, sep: str = ","
 ):
@@ -48,33 +63,12 @@ def read_csv_from_azure(
     return None
 
 
-def read_json_from_azure(
-    client: Any, key: str, container_name: str, storage_options, sample_size=100
-):
+def read_json_from_azure(client: Any, key: str, container_name: str, sample_size=100):
     """
     Read the json file from the azure container and return a dataframe
     """
-    try:
-        account_url = (
-            f"abfs://{container_name}@{client.account_name}.dfs.core.windows.net/{key}"
-        )
-        dataframe = pd.read_json(
-            account_url, storage_options=storage_options, typ="series"
-        )
-
-        data = _get_json_text(key, dataframe.to_dict())
-
-        if isinstance(data, list):
-            return [pd.DataFrame.from_dict(data[:sample_size])]
-        return [
-            pd.DataFrame.from_dict(
-                {key: pd.Series(value) for key, value in data.items()}
-            )
-        ]
-    except Exception as exc:
-        logger.debug(traceback.format_exc())
-        logger.warning(f"Error reading parquet file from azure - {exc}")
-        return None
+    json_text = get_file_text(client=client, key=key, container_name=container_name)
+    return read_from_json(key=key, json_text=json_text, sample_size=sample_size)
 
 
 def read_parquet_from_azure(
@@ -93,3 +87,12 @@ def read_parquet_from_azure(
     logger.debug(traceback.format_exc())
     logger.warning(f"Error reading parquet file from azure - {exc}")
     return None
+
+
+def read_avro_from_azure(client: Any, key: str, container_name: str):
+    """
+    Read the avro file from the azure container and return a dataframe
+    """
+    return read_from_avro(
+        get_file_text(client=client, key=key, container_name=container_name)
+    )
diff --git a/ingestion/src/metadata/utils/gcs_utils.py b/ingestion/src/metadata/utils/gcs_utils.py
index 6eb0a6a47ce..d9d636e40ae 100644
--- a/ingestion/src/metadata/utils/gcs_utils.py
+++ b/ingestion/src/metadata/utils/gcs_utils.py
@@ -13,11 +13,7 @@ Utils module to convert different file types from gcs buckets into a dataframe
 """
 
-import gzip
-import io
-import json
 import traceback
-import zipfile
 from typing import Any
 
 import gcsfs
@@ -25,19 +21,19 @@ import pandas as pd
 from pandas import DataFrame
 from pyarrow.parquet import ParquetFile
 
+from metadata.ingestion.source.database.datalake.utils import (
+    read_from_avro,
+    read_from_json,
+)
 from metadata.utils.constants import CHUNKSIZE
 from metadata.utils.logger import utils_logger
 
 logger = utils_logger()
 
 
-def _get_json_text(key: str, text: str) -> str:
-    if key.endswith(".gz"):
-        return gzip.decompress(text)
-    if key.endswith(".zip"):
-        with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
-            return zip_file.read(zip_file.infolist()[0]).decode("utf-8")
-    return text
+def get_file_text(client: Any, key: str, bucket_name: str):
+    bucket = client.get_bucket(bucket_name)
+    return bucket.get_blob(key).download_as_string()
 
 
 def read_csv_from_gcs(  # pylint: disable=inconsistent-return-statements
@@ -79,30 +75,12 @@ def read_tsv_from_gcs(  # pylint: disable=inconsistent-return-statements
     logger.warning(f"Error reading CSV from GCS - {exc}")
 
 
-def read_json_from_gcs(  # pylint: disable=inconsistent-return-statements
-    client: Any, key: str, bucket_name: str
-) -> DataFrame:
+def read_json_from_gcs(client: Any, key: str, bucket_name: str) -> DataFrame:
     """
     Read the json file from the gcs bucket and return a dataframe
     """
-
-
try: - bucket = client.get_bucket(bucket_name) - text = bucket.get_blob(key).download_as_string() - data = json.loads(_get_json_text(key, text)) - if isinstance(data, list): - return [pd.DataFrame.from_records(data)] - return [ - pd.DataFrame.from_dict( - dict( # pylint: disable=consider-using-dict-comprehension - [(k, pd.Series(v)) for k, v in data.items()] - ) - ) - ] - - except ValueError as verr: - logger.debug(traceback.format_exc()) - logger.warning(f"Error reading JSON from GCS - {verr}") + json_text = get_file_text(client=client, key=key, bucket_name=bucket_name) + return read_from_json(key=key, json_text=json_text) def read_parquet_from_gcs(key: str, bucket_name: str) -> DataFrame: @@ -113,3 +91,11 @@ def read_parquet_from_gcs(key: str, bucket_name: str) -> DataFrame: gcs = gcsfs.GCSFileSystem() file = gcs.open(f"gs://{bucket_name}/{key}") return [ParquetFile(file).read().to_pandas()] + + +def read_avro_from_gcs(client: Any, key: str, bucket_name: str) -> DataFrame: + """ + Read the avro file from the gcs bucket and return a dataframe + """ + avro_text = get_file_text(client=client, key=key, bucket_name=bucket_name) + return read_from_avro(avro_text) diff --git a/ingestion/src/metadata/utils/s3_utils.py b/ingestion/src/metadata/utils/s3_utils.py index 12536eef213..7456ad5663b 100644 --- a/ingestion/src/metadata/utils/s3_utils.py +++ b/ingestion/src/metadata/utils/s3_utils.py @@ -13,30 +13,26 @@ Utils module to convert different file types from s3 buckets into a dataframe """ -import gzip -import io -import json import traceback -import zipfile from typing import Any import pandas as pd import pyarrow.parquet as pq import s3fs +from metadata.ingestion.source.database.datalake.utils import ( + read_from_avro, + read_from_json, +) from metadata.utils.constants import CHUNKSIZE from metadata.utils.logger import utils_logger logger = utils_logger() -def _get_json_text(key: str, text: bytes) -> str: - if key.endswith(".gz"): - return gzip.decompress(text) - if key.endswith(".zip"): - with zipfile.ZipFile(io.BytesIO(text)) as zip_file: - return zip_file.read(zip_file.infolist()[0]).decode("utf-8") - return text.decode("utf-8") +def get_file_text(client: Any, key: str, bucket_name: str): + obj = client.get_object(Bucket=bucket_name, Key=key) + return obj["Body"].read() def read_csv_from_s3( @@ -81,14 +77,10 @@ def read_json_from_s3(client: Any, key: str, bucket_name: str, sample_size=100): """ Read the json file from the s3 bucket and return a dataframe """ - obj = client.get_object(Bucket=bucket_name, Key=key) - json_text = obj["Body"].read() - data = json.loads(_get_json_text(key, json_text)) - if isinstance(data, list): - return [pd.DataFrame.from_dict(data[:sample_size])] - return [ - pd.DataFrame.from_dict({key: pd.Series(value) for key, value in data.items()}) - ] + json_text = get_file_text(client=client, key=key, bucket_name=bucket_name) + return read_from_json( + key=key, json_text=json_text, sample_size=sample_size, decode=True + ) def read_parquet_from_s3(client: Any, key: str, bucket_name: str): @@ -105,3 +97,12 @@ def read_parquet_from_s3(client: Any, key: str, bucket_name: str): bucket_uri = f"s3://{bucket_name}/{key}" dataset = pq.ParquetDataset(bucket_uri, filesystem=s3_fs) return [dataset.read_pandas().to_pandas()] + + +def read_avro_from_s3(client: Any, key: str, bucket_name: str): + """ + Read the avro file from the s3 bucket and return a dataframe + """ + return read_from_avro( + get_file_text(client=client, key=key, bucket_name=bucket_name) + ) diff --git 
a/ingestion/tests/unit/resources/datasets/avro_data_file.avro b/ingestion/tests/unit/resources/datasets/avro_data_file.avro
new file mode 100644
index 0000000000000000000000000000000000000000..0065d684ce7dfd39650ba2700d954683a51d73b2
Binary files /dev/null and b/ingestion/tests/unit/resources/datasets/avro_data_file.avro differ
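
Note for reviewers: the snippet below is an illustrative usage sketch of the helpers introduced in this patch, not part of the patch itself. The local file path, object key, and JSON records are assumed placeholders.

# Illustrative sketch only -- exercises the new datalake helpers with assumed
# local sample data; paths, keys, and field names are hypothetical.
from metadata.ingestion.source.database.datalake.models import DatalakeColumnWrapper
from metadata.ingestion.source.database.datalake.utils import read_from_avro, read_from_json

# Avro object container files embed their schema, so read_from_avro can return
# the parsed Column metadata and the record dataframes bound together.
with open("/tmp/sample.avro", "rb") as avro_file:  # hypothetical fixture
    result = read_from_avro(avro_file.read())
if isinstance(result, DatalakeColumnWrapper):
    print([column.name.__root__ for column in result.columns])
    print(result.dataframes[0].head())

# JSON Lines: json.loads() fails on the multi-object payload, so read_from_json
# falls back to parsing one JSON object per line and returns a list holding one dataframe.
json_lines = b'{"id": 1, "name": "alpha"}\n{"id": 2, "name": "beta"}'
dataframes = read_from_json(key="sample.json", json_text=json_lines, decode=True)
print(dataframes[0])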