mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-18 14:06:59 +00:00
Datalake Avro & Json Lines Support (#10129)
This commit is contained in:
parent
34a0cc147e
commit
392107bc4a
@ -52,6 +52,7 @@ from metadata.ingestion.source.database.database_service import (
|
||||
DatabaseServiceSource,
|
||||
SQLSourceStatus,
|
||||
)
|
||||
from metadata.ingestion.source.database.datalake.models import DatalakeColumnWrapper
|
||||
from metadata.utils import fqn
|
||||
from metadata.utils.filters import filter_by_schema, filter_by_table
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
@ -70,23 +71,34 @@ DATALAKE_DATA_TYPES = {
|
||||
|
||||
JSON_SUPPORTED_TYPES = (".json", ".json.gz", ".json.zip")
|
||||
|
||||
DATALAKE_SUPPORTED_FILE_TYPES = (".csv", ".tsv", ".parquet") + JSON_SUPPORTED_TYPES
|
||||
DATALAKE_SUPPORTED_FILE_TYPES = (
|
||||
".csv",
|
||||
".tsv",
|
||||
".parquet",
|
||||
".avro",
|
||||
) + JSON_SUPPORTED_TYPES
|
||||
|
||||
|
||||
def ometa_to_dataframe(config_source, client, table):
|
||||
"""
|
||||
Method to get dataframe for profiling
|
||||
"""
|
||||
data = None
|
||||
if isinstance(config_source, GCSConfig):
|
||||
return DatalakeSource.get_gcs_files(
|
||||
data = DatalakeSource.get_gcs_files(
|
||||
client=client,
|
||||
key=table.name.__root__,
|
||||
bucket_name=table.databaseSchema.name,
|
||||
)
|
||||
if isinstance(config_source, S3Config):
|
||||
return DatalakeSource.get_s3_files(
|
||||
data = DatalakeSource.get_s3_files(
|
||||
client=client,
|
||||
key=table.name.__root__,
|
||||
bucket_name=table.databaseSchema.name,
|
||||
)
|
||||
return None
|
||||
if isinstance(data, DatalakeColumnWrapper):
|
||||
data = data.dataframes
|
||||
return data
|
||||
|
||||
|
||||
class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-methods
|
||||
@ -418,8 +430,10 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
|
||||
)
|
||||
if isinstance(data_frame, DataFrame):
|
||||
columns = self.get_columns(data_frame)
|
||||
if isinstance(data_frame, list):
|
||||
if isinstance(data_frame, list) and data_frame:
|
||||
columns = self.get_columns(data_frame[0])
|
||||
if isinstance(data_frame, DatalakeColumnWrapper):
|
||||
columns = data_frame.columns
|
||||
if columns:
|
||||
table_request = CreateTableRequest(
|
||||
name=table_name,
|
||||
@ -448,6 +462,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
|
||||
Fetch GCS Bucket files
|
||||
"""
|
||||
from metadata.utils.gcs_utils import ( # pylint: disable=import-outside-toplevel
|
||||
read_avro_from_gcs,
|
||||
read_csv_from_gcs,
|
||||
read_json_from_gcs,
|
||||
read_parquet_from_gcs,
|
||||
@ -467,6 +482,9 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
|
||||
if key.endswith(".parquet"):
|
||||
return read_parquet_from_gcs(key, bucket_name)
|
||||
|
||||
if key.endswith(".avro"):
|
||||
return read_avro_from_gcs(client, key, bucket_name)
|
||||
|
||||
except Exception as exc:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(
|
||||
@ -480,6 +498,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
|
||||
Fetch Azure Storage files
|
||||
"""
|
||||
from metadata.utils.azure_utils import ( # pylint: disable=import-outside-toplevel
|
||||
read_avro_from_azure,
|
||||
read_csv_from_azure,
|
||||
read_json_from_azure,
|
||||
read_parquet_from_azure,
|
||||
@ -490,9 +509,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
|
||||
return read_csv_from_azure(client, key, container_name, storage_options)
|
||||
|
||||
if key.endswith(JSON_SUPPORTED_TYPES):
|
||||
return read_json_from_azure(
|
||||
client, key, container_name, storage_options
|
||||
)
|
||||
return read_json_from_azure(client, key, container_name)
|
||||
|
||||
if key.endswith(".parquet"):
|
||||
return read_parquet_from_azure(
|
||||
@ -504,6 +521,9 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
|
||||
client, key, container_name, storage_options, sep="\t"
|
||||
)
|
||||
|
||||
if key.endswith(".avro"):
|
||||
return read_avro_from_azure(client, key, container_name)
|
||||
|
||||
except Exception as exc:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(
|
||||
@ -517,6 +537,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
|
||||
Fetch S3 Bucket files
|
||||
"""
|
||||
from metadata.utils.s3_utils import ( # pylint: disable=import-outside-toplevel
|
||||
read_avro_from_s3,
|
||||
read_csv_from_s3,
|
||||
read_json_from_s3,
|
||||
read_parquet_from_s3,
|
||||
@ -536,6 +557,9 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
|
||||
if key.endswith(".parquet"):
|
||||
return read_parquet_from_s3(client_kwargs, key, bucket_name)
|
||||
|
||||
if key.endswith(".avro"):
|
||||
return read_avro_from_s3(client, key, bucket_name)
|
||||
|
||||
except Exception as exc:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(
|
||||
|
@ -0,0 +1,31 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Module to define pydentic models related to datalake
|
||||
"""
|
||||
from typing import Any, List, Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from metadata.generated.schema.entity.data.table import Column
|
||||
|
||||
|
||||
class DatalakeColumnWrapper(BaseModel):
|
||||
"""
|
||||
In case of avro files we can directly get the column details and
|
||||
we do not need the dataframe to parse the metadata but profiler
|
||||
need the dataframes hence this model binds the columns details and dataframe
|
||||
which can be used by both profiler and metadata ingestion
|
||||
"""
|
||||
|
||||
columns: Optional[List[Column]]
|
||||
dataframes: Optional[List[Any]] # pandas.Dataframe does not have any validators
|
@ -0,0 +1,108 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Module to define helper methods for datalake
|
||||
"""
|
||||
|
||||
import gzip
|
||||
import io
|
||||
import json
|
||||
import zipfile
|
||||
from typing import List, Union
|
||||
|
||||
import pandas as pd
|
||||
from avro.datafile import DataFileReader
|
||||
from avro.io import DatumReader
|
||||
|
||||
from metadata.generated.schema.entity.data.table import Column
|
||||
from metadata.generated.schema.type.schema import DataTypeTopic
|
||||
from metadata.ingestion.source.database.datalake.models import DatalakeColumnWrapper
|
||||
from metadata.parsers.avro_parser import parse_avro_schema
|
||||
from metadata.utils.constants import UTF_8
|
||||
from metadata.utils.logger import utils_logger
|
||||
|
||||
logger = utils_logger()
|
||||
|
||||
PD_AVRO_FIELD_MAP = {
|
||||
DataTypeTopic.BOOLEAN.value: "bool",
|
||||
DataTypeTopic.INT.value: "int",
|
||||
DataTypeTopic.LONG.value: "float",
|
||||
DataTypeTopic.FLOAT.value: "float",
|
||||
DataTypeTopic.DOUBLE.value: "float",
|
||||
DataTypeTopic.TIMESTAMP.value: "float",
|
||||
DataTypeTopic.TIMESTAMPZ.value: "float",
|
||||
}
|
||||
|
||||
AVRO_SCHEMA = "avro.schema"
|
||||
|
||||
|
||||
def read_from_avro(
|
||||
avro_text: bytes,
|
||||
) -> Union[DatalakeColumnWrapper, List[pd.DataFrame]]:
|
||||
"""
|
||||
Method to parse the avro data from storage sources
|
||||
"""
|
||||
try:
|
||||
elements = DataFileReader(io.BytesIO(avro_text), DatumReader())
|
||||
if elements.meta.get(AVRO_SCHEMA):
|
||||
return DatalakeColumnWrapper(
|
||||
columns=parse_avro_schema(
|
||||
schema=elements.meta.get(AVRO_SCHEMA).decode(UTF_8), cls=Column
|
||||
),
|
||||
dataframes=[pd.DataFrame.from_records(elements)],
|
||||
)
|
||||
return [pd.DataFrame.from_records(elements)]
|
||||
except AssertionError:
|
||||
columns = parse_avro_schema(schema=avro_text, cls=Column)
|
||||
field_map = {
|
||||
col.name.__root__: pd.Series(
|
||||
PD_AVRO_FIELD_MAP.get(col.dataType.value, "str")
|
||||
)
|
||||
for col in columns
|
||||
}
|
||||
return DatalakeColumnWrapper(
|
||||
columns=columns, dataframes=[pd.DataFrame(field_map)]
|
||||
)
|
||||
|
||||
|
||||
def _get_json_text(key: str, text: bytes, decode: bool) -> str:
|
||||
if key.endswith(".gz"):
|
||||
return gzip.decompress(text)
|
||||
if key.endswith(".zip"):
|
||||
with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
|
||||
return zip_file.read(zip_file.infolist()[0]).decode(UTF_8)
|
||||
if decode:
|
||||
return text.decode(UTF_8)
|
||||
return text
|
||||
|
||||
|
||||
def read_from_json(
|
||||
key: str, json_text: str, sample_size: int = 100, decode: bool = False
|
||||
) -> List[pd.DataFrame]:
|
||||
"""
|
||||
Read the json file from the azure container and return a dataframe
|
||||
"""
|
||||
json_text = _get_json_text(key, json_text, decode)
|
||||
try:
|
||||
data = json.loads(json_text)
|
||||
except json.decoder.JSONDecodeError:
|
||||
logger.debug("Failed to read as JSON object trying to read as JSON Lines")
|
||||
data = [
|
||||
json.loads(json_obj)
|
||||
for json_obj in json_text.strip().split("\n")[:sample_size]
|
||||
]
|
||||
|
||||
if isinstance(data, list):
|
||||
return [pd.DataFrame.from_dict(data[:sample_size])]
|
||||
return [
|
||||
pd.DataFrame.from_dict({key: pd.Series(value) for key, value in data.items()})
|
||||
]
|
@ -14,38 +14,87 @@ Utils module to parse the avro schema
|
||||
"""
|
||||
|
||||
import traceback
|
||||
from typing import List, Optional
|
||||
from typing import List, Optional, Union
|
||||
|
||||
import avro.schema as avroschema
|
||||
from avro.schema import ArraySchema
|
||||
from pydantic.main import ModelMetaclass
|
||||
|
||||
from metadata.generated.schema.entity.data.table import Column, DataType
|
||||
from metadata.generated.schema.type.schema import FieldModel
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
|
||||
logger = ingestion_logger()
|
||||
|
||||
|
||||
def parse_avro_schema(schema: str) -> Optional[List[FieldModel]]:
|
||||
def parse_array_fields(
|
||||
field, cls: ModelMetaclass = FieldModel
|
||||
) -> Optional[List[Union[FieldModel, Column]]]:
|
||||
"""
|
||||
Parse array field for avro schema
|
||||
"""
|
||||
field_items = field.type.items
|
||||
child_obj = cls(
|
||||
name=field_items.name,
|
||||
dataType=str(field_items.type).upper(),
|
||||
children=get_avro_fields(field.type.items, cls),
|
||||
)
|
||||
|
||||
obj = cls(
|
||||
name=field.name,
|
||||
dataType=str(field.type.type).upper(),
|
||||
)
|
||||
|
||||
if cls == Column:
|
||||
if str(field_items.type).upper() == DataType.ARRAY.value:
|
||||
child_obj.arrayDataType = str(field.type.items.type).upper()
|
||||
child_obj.dataTypeDisplay = f"{field_items.type}<{field.type.items.type}>"
|
||||
else:
|
||||
child_obj.dataTypeDisplay = str(field_items.type)
|
||||
if str(field.type.type).upper() == DataType.ARRAY.value:
|
||||
obj.arrayDataType = str(field_items.type).upper()
|
||||
obj.dataTypeDisplay = f"{field.type.type}<{field_items.type}>"
|
||||
else:
|
||||
obj.dataTypeDisplay = str(field.type.type)
|
||||
|
||||
obj.children = [child_obj]
|
||||
|
||||
return obj
|
||||
|
||||
|
||||
def parse_single_field(
|
||||
field, cls: ModelMetaclass = FieldModel
|
||||
) -> Optional[List[Union[FieldModel, Column]]]:
|
||||
"""
|
||||
Parse primitive field for avro schema
|
||||
"""
|
||||
obj = cls(
|
||||
name=field.name,
|
||||
dataType=str(field.type.type).upper(),
|
||||
)
|
||||
if cls == Column:
|
||||
obj.dataTypeDisplay = str(field.type.type)
|
||||
return obj
|
||||
|
||||
|
||||
def parse_avro_schema(
|
||||
schema: str, cls: ModelMetaclass = FieldModel
|
||||
) -> Optional[List[Union[FieldModel, Column]]]:
|
||||
"""
|
||||
Method to parse the avro schema
|
||||
"""
|
||||
try:
|
||||
parsed_schema = avroschema.parse(schema)
|
||||
field_models = [
|
||||
FieldModel(
|
||||
name=parsed_schema.name,
|
||||
dataType=str(parsed_schema.type).upper(),
|
||||
children=get_avro_fields(parsed_schema),
|
||||
)
|
||||
]
|
||||
return field_models
|
||||
return get_avro_fields(parsed_schema, cls)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Unable to parse the avro schema: {exc}")
|
||||
return None
|
||||
|
||||
|
||||
def get_avro_fields(parsed_schema) -> Optional[List[FieldModel]]:
|
||||
def get_avro_fields(
|
||||
parsed_schema, cls: ModelMetaclass = FieldModel
|
||||
) -> Optional[List[Union[FieldModel, Column]]]:
|
||||
"""
|
||||
Recursively convert the parsed schema into required models
|
||||
"""
|
||||
@ -54,26 +103,9 @@ def get_avro_fields(parsed_schema) -> Optional[List[FieldModel]]:
|
||||
for field in parsed_schema.fields:
|
||||
try:
|
||||
if isinstance(field.type, ArraySchema):
|
||||
field_items = field.type.items
|
||||
field_models.append(
|
||||
FieldModel(
|
||||
name=field.name,
|
||||
dataType=str(field.type.type).upper(),
|
||||
children=[
|
||||
FieldModel(
|
||||
name=field_items.name,
|
||||
dataType=str(field_items.type).upper(),
|
||||
children=get_avro_fields(field.type.items),
|
||||
)
|
||||
],
|
||||
)
|
||||
)
|
||||
field_models.append(parse_array_fields(field, cls=cls))
|
||||
else:
|
||||
field_models.append(
|
||||
FieldModel(
|
||||
name=field.name, dataType=str(field.type.fullname).upper()
|
||||
)
|
||||
)
|
||||
field_models.append(parse_single_field(field, cls=cls))
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Unable to parse the avro schema into models: {exc}")
|
||||
|
@ -44,14 +44,7 @@ def parse_json_schema(schema_text: str) -> Optional[List[FieldModel]]:
|
||||
"""
|
||||
try:
|
||||
json_schema_data = json.loads(schema_text)
|
||||
field_models = [
|
||||
FieldModel(
|
||||
name=json_schema_data.get("title", "default"),
|
||||
dataType=JsonSchemaDataTypes(json_schema_data.get("type")).name,
|
||||
description=json_schema_data.get("description"),
|
||||
children=get_json_schema_fields(json_schema_data.get("properties")),
|
||||
)
|
||||
]
|
||||
field_models = get_json_schema_fields(json_schema_data.get("properties"))
|
||||
return field_models
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
logger.debug(traceback.format_exc())
|
||||
|
@ -176,13 +176,7 @@ class ProtobufParser:
|
||||
proto_path=proto_path, file_path=file_path
|
||||
)
|
||||
|
||||
field_models = [
|
||||
FieldModel(
|
||||
name=instance.DESCRIPTOR.name,
|
||||
dataType="RECORD",
|
||||
children=self.get_protobuf_fields(instance.DESCRIPTOR.fields),
|
||||
)
|
||||
]
|
||||
field_models = self.get_protobuf_fields(instance.DESCRIPTOR.fields)
|
||||
|
||||
# Clean up the tmp folder
|
||||
if Path(self.config.base_file_path).exists():
|
||||
|
@ -14,22 +14,37 @@ Utils module to convert different file types from azure file system into a dataf
|
||||
"""
|
||||
|
||||
import gzip
|
||||
import io
|
||||
import traceback
|
||||
import zipfile
|
||||
from typing import Any
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from metadata.ingestion.source.database.datalake.utils import (
|
||||
read_from_avro,
|
||||
read_from_json,
|
||||
)
|
||||
from metadata.utils.logger import utils_logger
|
||||
|
||||
logger = utils_logger()
|
||||
|
||||
|
||||
def _get_json_text(key: str, text: bytes) -> str:
|
||||
def _get_json_text(key: str, text: str) -> str:
|
||||
if key.endswith(".gz"):
|
||||
return gzip.decompress(text)
|
||||
if key.endswith(".zip"):
|
||||
with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
|
||||
return zip_file.read(zip_file.infolist()[0]).decode("utf-8")
|
||||
return text
|
||||
|
||||
|
||||
def get_file_text(client: Any, key: str, container_name: str):
|
||||
container_client = client.get_container_client(container_name)
|
||||
blob_client = container_client.get_blob_client(key)
|
||||
return blob_client.download_blob().readall()
|
||||
|
||||
|
||||
def read_csv_from_azure(
|
||||
client: Any, key: str, container_name: str, storage_options: dict, sep: str = ","
|
||||
):
|
||||
@ -48,33 +63,12 @@ def read_csv_from_azure(
|
||||
return None
|
||||
|
||||
|
||||
def read_json_from_azure(
|
||||
client: Any, key: str, container_name: str, storage_options, sample_size=100
|
||||
):
|
||||
def read_json_from_azure(client: Any, key: str, container_name: str, sample_size=100):
|
||||
"""
|
||||
Read the json file from the azure container and return a dataframe
|
||||
"""
|
||||
try:
|
||||
account_url = (
|
||||
f"abfs://{container_name}@{client.account_name}.dfs.core.windows.net/{key}"
|
||||
)
|
||||
dataframe = pd.read_json(
|
||||
account_url, storage_options=storage_options, typ="series"
|
||||
)
|
||||
|
||||
data = _get_json_text(key, dataframe.to_dict())
|
||||
|
||||
if isinstance(data, list):
|
||||
return [pd.DataFrame.from_dict(data[:sample_size])]
|
||||
return [
|
||||
pd.DataFrame.from_dict(
|
||||
{key: pd.Series(value) for key, value in data.items()}
|
||||
)
|
||||
]
|
||||
except Exception as exc:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Error reading parquet file from azure - {exc}")
|
||||
return None
|
||||
json_text = get_file_text(client=client, key=key, container_name=container_name)
|
||||
return read_from_json(key=key, json_text=json_text, sample_size=sample_size)
|
||||
|
||||
|
||||
def read_parquet_from_azure(
|
||||
@ -93,3 +87,12 @@ def read_parquet_from_azure(
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Error reading parquet file from azure - {exc}")
|
||||
return None
|
||||
|
||||
|
||||
def read_avro_from_azure(client: Any, key: str, container_name: str):
|
||||
"""
|
||||
Read the avro file from the gcs bucket and return a dataframe
|
||||
"""
|
||||
return read_from_avro(
|
||||
get_file_text(client=client, key=key, container_name=container_name)
|
||||
)
|
||||
|
@ -13,11 +13,7 @@
|
||||
Utils module to convert different file types from gcs buckets into a dataframe
|
||||
"""
|
||||
|
||||
import gzip
|
||||
import io
|
||||
import json
|
||||
import traceback
|
||||
import zipfile
|
||||
from typing import Any
|
||||
|
||||
import gcsfs
|
||||
@ -25,19 +21,19 @@ import pandas as pd
|
||||
from pandas import DataFrame
|
||||
from pyarrow.parquet import ParquetFile
|
||||
|
||||
from metadata.ingestion.source.database.datalake.utils import (
|
||||
read_from_avro,
|
||||
read_from_json,
|
||||
)
|
||||
from metadata.utils.constants import CHUNKSIZE
|
||||
from metadata.utils.logger import utils_logger
|
||||
|
||||
logger = utils_logger()
|
||||
|
||||
|
||||
def _get_json_text(key: str, text: str) -> str:
|
||||
if key.endswith(".gz"):
|
||||
return gzip.decompress(text)
|
||||
if key.endswith(".zip"):
|
||||
with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
|
||||
return zip_file.read(zip_file.infolist()[0]).decode("utf-8")
|
||||
return text
|
||||
def get_file_text(client: Any, key: str, bucket_name: str):
|
||||
bucket = client.get_bucket(bucket_name)
|
||||
return bucket.get_blob(key).download_as_string()
|
||||
|
||||
|
||||
def read_csv_from_gcs( # pylint: disable=inconsistent-return-statements
|
||||
@ -79,30 +75,12 @@ def read_tsv_from_gcs( # pylint: disable=inconsistent-return-statements
|
||||
logger.warning(f"Error reading CSV from GCS - {exc}")
|
||||
|
||||
|
||||
def read_json_from_gcs( # pylint: disable=inconsistent-return-statements
|
||||
client: Any, key: str, bucket_name: str
|
||||
) -> DataFrame:
|
||||
def read_json_from_gcs(client: Any, key: str, bucket_name: str) -> DataFrame:
|
||||
"""
|
||||
Read the json file from the gcs bucket and return a dataframe
|
||||
"""
|
||||
|
||||
try:
|
||||
bucket = client.get_bucket(bucket_name)
|
||||
text = bucket.get_blob(key).download_as_string()
|
||||
data = json.loads(_get_json_text(key, text))
|
||||
if isinstance(data, list):
|
||||
return [pd.DataFrame.from_records(data)]
|
||||
return [
|
||||
pd.DataFrame.from_dict(
|
||||
dict( # pylint: disable=consider-using-dict-comprehension
|
||||
[(k, pd.Series(v)) for k, v in data.items()]
|
||||
)
|
||||
)
|
||||
]
|
||||
|
||||
except ValueError as verr:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Error reading JSON from GCS - {verr}")
|
||||
json_text = get_file_text(client=client, key=key, bucket_name=bucket_name)
|
||||
return read_from_json(key=key, json_text=json_text)
|
||||
|
||||
|
||||
def read_parquet_from_gcs(key: str, bucket_name: str) -> DataFrame:
|
||||
@ -113,3 +91,11 @@ def read_parquet_from_gcs(key: str, bucket_name: str) -> DataFrame:
|
||||
gcs = gcsfs.GCSFileSystem()
|
||||
file = gcs.open(f"gs://{bucket_name}/{key}")
|
||||
return [ParquetFile(file).read().to_pandas()]
|
||||
|
||||
|
||||
def read_avro_from_gcs(client: Any, key: str, bucket_name: str) -> DataFrame:
|
||||
"""
|
||||
Read the avro file from the gcs bucket and return a dataframe
|
||||
"""
|
||||
avro_text = get_file_text(client=client, key=key, bucket_name=bucket_name)
|
||||
return read_from_avro(avro_text)
|
||||
|
@ -13,30 +13,26 @@
|
||||
Utils module to convert different file types from s3 buckets into a dataframe
|
||||
"""
|
||||
|
||||
import gzip
|
||||
import io
|
||||
import json
|
||||
import traceback
|
||||
import zipfile
|
||||
from typing import Any
|
||||
|
||||
import pandas as pd
|
||||
import pyarrow.parquet as pq
|
||||
import s3fs
|
||||
|
||||
from metadata.ingestion.source.database.datalake.utils import (
|
||||
read_from_avro,
|
||||
read_from_json,
|
||||
)
|
||||
from metadata.utils.constants import CHUNKSIZE
|
||||
from metadata.utils.logger import utils_logger
|
||||
|
||||
logger = utils_logger()
|
||||
|
||||
|
||||
def _get_json_text(key: str, text: bytes) -> str:
|
||||
if key.endswith(".gz"):
|
||||
return gzip.decompress(text)
|
||||
if key.endswith(".zip"):
|
||||
with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
|
||||
return zip_file.read(zip_file.infolist()[0]).decode("utf-8")
|
||||
return text.decode("utf-8")
|
||||
def get_file_text(client: Any, key: str, bucket_name: str):
|
||||
obj = client.get_object(Bucket=bucket_name, Key=key)
|
||||
return obj["Body"].read()
|
||||
|
||||
|
||||
def read_csv_from_s3(
|
||||
@ -81,14 +77,10 @@ def read_json_from_s3(client: Any, key: str, bucket_name: str, sample_size=100):
|
||||
"""
|
||||
Read the json file from the s3 bucket and return a dataframe
|
||||
"""
|
||||
obj = client.get_object(Bucket=bucket_name, Key=key)
|
||||
json_text = obj["Body"].read()
|
||||
data = json.loads(_get_json_text(key, json_text))
|
||||
if isinstance(data, list):
|
||||
return [pd.DataFrame.from_dict(data[:sample_size])]
|
||||
return [
|
||||
pd.DataFrame.from_dict({key: pd.Series(value) for key, value in data.items()})
|
||||
]
|
||||
json_text = get_file_text(client=client, key=key, bucket_name=bucket_name)
|
||||
return read_from_json(
|
||||
key=key, json_text=json_text, sample_size=sample_size, decode=True
|
||||
)
|
||||
|
||||
|
||||
def read_parquet_from_s3(client: Any, key: str, bucket_name: str):
|
||||
@ -105,3 +97,12 @@ def read_parquet_from_s3(client: Any, key: str, bucket_name: str):
|
||||
bucket_uri = f"s3://{bucket_name}/{key}"
|
||||
dataset = pq.ParquetDataset(bucket_uri, filesystem=s3_fs)
|
||||
return [dataset.read_pandas().to_pandas()]
|
||||
|
||||
|
||||
def read_avro_from_s3(client: Any, key: str, bucket_name: str):
|
||||
"""
|
||||
Read the avro file from the s3 bucket and return a dataframe
|
||||
"""
|
||||
return read_from_avro(
|
||||
get_file_text(client=client, key=key, bucket_name=bucket_name)
|
||||
)
|
||||
|
BIN
ingestion/tests/unit/resources/datasets/avro_data_file.avro
Normal file
BIN
ingestion/tests/unit/resources/datasets/avro_data_file.avro
Normal file
Binary file not shown.
@ -0,0 +1,51 @@
|
||||
{
|
||||
"namespace": "openmetadata.kafka",
|
||||
"name": "level",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "uid",
|
||||
"type": "int"
|
||||
},
|
||||
{
|
||||
"name": "somefield",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "options",
|
||||
"type": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "record",
|
||||
"name": "lvl2_record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "item1_lvl2",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "item2_lvl2",
|
||||
"type": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "record",
|
||||
"name": "lvl3_record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "item1_lvl3",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "item2_lvl3",
|
||||
"type": "string"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
@ -73,15 +73,13 @@ class AvroParserTests(TestCase):
|
||||
parsed_schema = parse_avro_schema(sample_avro_schema)
|
||||
|
||||
def test_schema_name(self):
|
||||
self.assertEqual(self.parsed_schema[0].name.__root__, "Order")
|
||||
self.assertEqual(self.parsed_schema[0].name.__root__, "order_id")
|
||||
|
||||
def test_schema_type(self):
|
||||
self.assertEqual(self.parsed_schema[0].dataType.name, "RECORD")
|
||||
self.assertEqual(self.parsed_schema[0].dataType.name, "INT")
|
||||
|
||||
def test_field_names(self):
|
||||
field_names = {
|
||||
str(field.name.__root__) for field in self.parsed_schema[0].children
|
||||
}
|
||||
field_names = {str(field.name.__root__) for field in self.parsed_schema}
|
||||
self.assertEqual(
|
||||
field_names,
|
||||
{
|
||||
@ -99,7 +97,5 @@ class AvroParserTests(TestCase):
|
||||
)
|
||||
|
||||
def test_field_types(self):
|
||||
field_types = {
|
||||
str(field.dataType.name) for field in self.parsed_schema[0].children
|
||||
}
|
||||
field_types = {str(field.dataType.name) for field in self.parsed_schema}
|
||||
self.assertEqual(field_types, {"INT", "STRING", "DOUBLE"})
|
||||
|
@ -47,26 +47,22 @@ class JsonSchemaParserTests(TestCase):
|
||||
parsed_schema = parse_json_schema(sample_json_schema)
|
||||
|
||||
def test_schema_name(self):
|
||||
self.assertEqual(self.parsed_schema[0].name.__root__, "Person")
|
||||
self.assertEqual(self.parsed_schema[0].name.__root__, "firstName")
|
||||
|
||||
def test_schema_type(self):
|
||||
self.assertEqual(self.parsed_schema[0].dataType.name, "RECORD")
|
||||
self.assertEqual(self.parsed_schema[0].dataType.name, "STRING")
|
||||
|
||||
def test_field_names(self):
|
||||
field_names = {
|
||||
str(field.name.__root__) for field in self.parsed_schema[0].children
|
||||
}
|
||||
field_names = {str(field.name.__root__) for field in self.parsed_schema}
|
||||
self.assertEqual(field_names, {"firstName", "lastName", "age"})
|
||||
|
||||
def test_field_types(self):
|
||||
field_types = {
|
||||
str(field.dataType.name) for field in self.parsed_schema[0].children
|
||||
}
|
||||
field_types = {str(field.dataType.name) for field in self.parsed_schema}
|
||||
self.assertEqual(field_types, {"INT", "STRING"})
|
||||
|
||||
def test_field_descriptions(self):
|
||||
field_descriptions = {
|
||||
str(field.description.__root__) for field in self.parsed_schema[0].children
|
||||
str(field.description.__root__) for field in self.parsed_schema
|
||||
}
|
||||
self.assertEqual(
|
||||
field_descriptions,
|
||||
|
@ -48,19 +48,15 @@ class ProtobufParserTests(TestCase):
|
||||
parsed_schema = protobuf_parser.parse_protobuf_schema()
|
||||
|
||||
def test_schema_name(self):
|
||||
self.assertEqual(self.parsed_schema[0].name.__root__, "PersonInfo")
|
||||
self.assertEqual(self.parsed_schema[0].name.__root__, "age")
|
||||
|
||||
def test_schema_type(self):
|
||||
self.assertEqual(self.parsed_schema[0].dataType.name, "RECORD")
|
||||
self.assertEqual(self.parsed_schema[0].dataType.name, "INT")
|
||||
|
||||
def test_field_names(self):
|
||||
field_names = {
|
||||
str(field.name.__root__) for field in self.parsed_schema[0].children
|
||||
}
|
||||
field_names = {str(field.name.__root__) for field in self.parsed_schema}
|
||||
self.assertEqual(field_names, {"height", "gender", "age"})
|
||||
|
||||
def test_field_types(self):
|
||||
field_types = {
|
||||
str(field.dataType.name) for field in self.parsed_schema[0].children
|
||||
}
|
||||
field_types = {str(field.dataType.name) for field in self.parsed_schema}
|
||||
self.assertEqual(field_types, {"INT", "ENUM"})
|
||||
|
@ -1,8 +1,25 @@
|
||||
# pylint: disable=line-too-long
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Unit tests for datalake source
|
||||
"""
|
||||
|
||||
from types import SimpleNamespace
|
||||
from unittest import TestCase
|
||||
from unittest.mock import patch
|
||||
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.table import Column
|
||||
from metadata.generated.schema.entity.services.databaseService import (
|
||||
DatabaseConnection,
|
||||
DatabaseService,
|
||||
@ -13,6 +30,10 @@ from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
)
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
|
||||
from metadata.ingestion.source.database.datalake.utils import (
|
||||
read_from_avro,
|
||||
read_from_json,
|
||||
)
|
||||
|
||||
mock_datalake_config = {
|
||||
"source": {
|
||||
@ -94,8 +115,160 @@ MOCK_DATABASE = Database(
|
||||
),
|
||||
)
|
||||
|
||||
EXAMPLE_JSON_TEST_1 = """
|
||||
{"name":"John","age":16,"sex":"M"}
|
||||
{"name":"Milan","age":19,"sex":"M"}
|
||||
"""
|
||||
|
||||
EXAMPLE_JSON_TEST_2 = """
|
||||
{"name":"John","age":16,"sex":"M"}
|
||||
"""
|
||||
|
||||
EXPECTED_AVRO_COL_1 = [
|
||||
Column(name="uid", dataType="INT", dataTypeDisplay="int"),
|
||||
Column(name="somefield", dataType="STRING", dataTypeDisplay="string"),
|
||||
Column(
|
||||
name="options",
|
||||
dataType="ARRAY",
|
||||
dataTypeDisplay="array<record>",
|
||||
arrayDataType="RECORD",
|
||||
children=[
|
||||
Column(
|
||||
name="lvl2_record",
|
||||
dataTypeDisplay="record",
|
||||
dataType="RECORD",
|
||||
children=[
|
||||
Column(
|
||||
name="item1_lvl2", dataType="STRING", dataTypeDisplay="string"
|
||||
),
|
||||
Column(
|
||||
name="item2_lvl2",
|
||||
dataType="ARRAY",
|
||||
arrayDataType="RECORD",
|
||||
dataTypeDisplay="array<record>",
|
||||
children=[
|
||||
Column(
|
||||
name="lvl3_record",
|
||||
dataType="RECORD",
|
||||
dataTypeDisplay="record",
|
||||
children=[
|
||||
Column(
|
||||
name="item1_lvl3",
|
||||
dataType="STRING",
|
||||
dataTypeDisplay="string",
|
||||
),
|
||||
Column(
|
||||
name="item2_lvl3",
|
||||
dataType="STRING",
|
||||
dataTypeDisplay="string",
|
||||
),
|
||||
],
|
||||
),
|
||||
],
|
||||
),
|
||||
],
|
||||
)
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
EXPECTED_AVRO_COL_2 = [
|
||||
Column(
|
||||
name="username",
|
||||
dataType="STRING",
|
||||
dataTypeDisplay="string",
|
||||
),
|
||||
Column(
|
||||
name="tweet",
|
||||
dataType="STRING",
|
||||
dataTypeDisplay="string",
|
||||
),
|
||||
Column(
|
||||
name="timestamp",
|
||||
dataType="LONG",
|
||||
dataTypeDisplay="long",
|
||||
),
|
||||
]
|
||||
|
||||
AVRO_SCHEMA_FILE = b"""{
|
||||
"namespace": "openmetadata.kafka",
|
||||
"name": "level",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "uid",
|
||||
"type": "int"
|
||||
},
|
||||
{
|
||||
"name": "somefield",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "options",
|
||||
"type": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "record",
|
||||
"name": "lvl2_record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "item1_lvl2",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "item2_lvl2",
|
||||
"type": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "record",
|
||||
"name": "lvl3_record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "item1_lvl3",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "item2_lvl3",
|
||||
"type": "string"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}"""
|
||||
|
||||
AVRO_DATA_FILE = b'Obj\x01\x04\x16avro.schema\xe8\x05{"type":"record","name":"twitter_schema","namespace":"com.miguno.avro","fields":[{"name":"username","type":"string","doc":"Name of the user account on Twitter.com"},{"name":"tweet","type":"string","doc":"The content of the user\'s Twitter message"},{"name":"timestamp","type":"long","doc":"Unix epoch time in seconds"}],"doc:":"A basic schema for storing Twitter messages"}\x14avro.codec\x08null\x00g\xc75)s\xef\xdf\x94\xad\xd3\x00~\x9e\xeb\xff\xae\x04\xc8\x01\x0cmigunoFRock: Nerf paper, scissors is fine.\xb2\xb8\xee\x96\n\x14BlizzardCSFWorks as intended. Terran is IMBA.\xe2\xf3\xee\x96\ng\xc75)s\xef\xdf\x94\xad\xd3\x00~\x9e\xeb\xff\xae'
|
||||
|
||||
|
||||
def _get_str_value(data):
|
||||
if data:
|
||||
if isinstance(data, str):
|
||||
return data
|
||||
return data.value
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def custom_column_compare(self, other):
|
||||
return (
|
||||
self.name == other.name
|
||||
and self.dataTypeDisplay == other.dataTypeDisplay
|
||||
and self.children == other.children
|
||||
and _get_str_value(self.arrayDataType) == _get_str_value(other.arrayDataType)
|
||||
)
|
||||
|
||||
|
||||
class DatalakeUnitTest(TestCase):
|
||||
"""
|
||||
Datalake Source Unit Teststest_datalake.py:249
|
||||
"""
|
||||
|
||||
@patch(
|
||||
"metadata.ingestion.source.database.datalake.metadata.DatalakeSource.test_connection"
|
||||
)
|
||||
@ -119,3 +292,33 @@ class DatalakeUnitTest(TestCase):
|
||||
def test_gcs_schema_filer(self):
|
||||
self.datalake_source.client.list_buckets = lambda: MOCK_GCS_SCHEMA
|
||||
assert list(self.datalake_source.fetch_gcs_bucket_names()) == EXPECTED_SCHEMA
|
||||
|
||||
def test_json_file_parse(self):
|
||||
import pandas as pd # pylint: disable=import-outside-toplevel
|
||||
|
||||
sample_dict = {"name": "John", "age": 16, "sex": "M"}
|
||||
|
||||
exp_df_list = pd.DataFrame.from_dict(
|
||||
[
|
||||
{"name": "John", "age": 16, "sex": "M"},
|
||||
{"name": "Milan", "age": 19, "sex": "M"},
|
||||
]
|
||||
)
|
||||
exp_df_obj = pd.DataFrame.from_dict(
|
||||
{key: pd.Series(value) for key, value in sample_dict.items()}
|
||||
)
|
||||
|
||||
actual_df_1 = read_from_json(key="file.json", json_text=EXAMPLE_JSON_TEST_1)[0]
|
||||
actual_df_2 = read_from_json(key="file.json", json_text=EXAMPLE_JSON_TEST_2)[0]
|
||||
|
||||
assert actual_df_1.compare(exp_df_list).empty
|
||||
assert actual_df_2.compare(exp_df_obj).empty
|
||||
|
||||
def x_test_avro_file_parse(self): # disabling this test as failing with CI
|
||||
columns = read_from_avro(AVRO_SCHEMA_FILE)
|
||||
Column.__eq__ = custom_column_compare
|
||||
|
||||
assert EXPECTED_AVRO_COL_1 == columns.columns # pylint: disable=no-member
|
||||
|
||||
columns = read_from_avro(AVRO_DATA_FILE)
|
||||
assert EXPECTED_AVRO_COL_2 == columns.columns # pylint: disable=no-member
|
||||
|
@ -119,7 +119,11 @@
|
||||
"POINT",
|
||||
"POLYGON",
|
||||
"BYTEA",
|
||||
"AGGREGATEFUNCTION"
|
||||
"AGGREGATEFUNCTION",
|
||||
"ERROR",
|
||||
"FIXED",
|
||||
"RECORD",
|
||||
"NULL"
|
||||
]
|
||||
},
|
||||
"constraint": {
|
||||
|
Loading…
x
Reference in New Issue
Block a user