# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module to define helper methods for datalake and to fetch data and metadata
from different auths and different file systems.
"""
import ast
import json
import random
import traceback
from typing import Any, Dict, List, Optional, Union, cast

from metadata.generated.schema.entity.data.table import Column, DataType
from metadata.ingestion.source.database.column_helpers import truncate_column_name
from metadata.parsers.json_schema_parser import parse_json_schema
from metadata.readers.dataframe.models import (
    DatalakeColumnWrapper,
    DatalakeTableSchemaWrapper,
)
from metadata.readers.dataframe.reader_factory import SupportedTypes, get_df_reader
from metadata.utils.logger import utils_logger

logger = utils_logger()


def fetch_dataframe(
    config_source,
    client,
    file_fqn: DatalakeTableSchemaWrapper,
    fetch_raw_data: bool = False,
    **kwargs,
) -> Optional[List["DataFrame"]]:
    """
    Method to get the dataframe for profiling
    """
    # dispatch to handle fetching of data from multiple file formats (csv, tsv, json, avro and parquet)
    key: str = file_fqn.key
    bucket_name: str = file_fqn.bucket_name
    try:
        file_extension: Optional[SupportedTypes] = file_fqn.file_extension or next(
            supported_type or None
            for supported_type in SupportedTypes
            if key.endswith(supported_type.value)
        )
        if file_extension and not key.endswith("/"):
            df_reader = get_df_reader(
                type_=file_extension,
                config_source=config_source,
                client=client,
                separator=file_fqn.separator,
            )
            try:
                df_wrapper: DatalakeColumnWrapper = df_reader.read(
                    key=key, bucket_name=bucket_name, **kwargs
                )
                if fetch_raw_data:
                    return df_wrapper.dataframes, df_wrapper.raw_data
                return df_wrapper.dataframes
            except Exception as err:
                logger.error(
                    f"Error fetching file [{bucket_name}/{key}] using "
                    f"[{config_source.__class__.__name__}] due to: [{err}]"
                )
    except Exception as err:
        logger.error(
            f"Error fetching file [{bucket_name}/{key}] using [{config_source.__class__.__name__}] due to: [{err}]"
        )
        # Here we need to blow things up. Without the dataframe we cannot move forward
        raise err

    if fetch_raw_data:
        return None, None
    return None
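
# Illustrative usage sketch (not part of the original module; the config and
# client objects are hypothetical placeholders for whatever the connector
# wiring provides, and the exact DatalakeTableSchemaWrapper fields are assumed):
#
#   dataframes = fetch_dataframe(
#       config_source=my_config_source,   # hypothetical connector config
#       client=my_storage_client,         # hypothetical storage client
#       file_fqn=DatalakeTableSchemaWrapper(
#           key="path/to/users.csv", bucket_name="my-bucket"
#       ),
#   )
#
# With fetch_raw_data=True the call returns a (dataframes, raw_data) tuple
# instead of a plain list, so callers must unpack accordingly.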


def get_file_format_type(key_name, metadata_entry=None):
    for supported_types in SupportedTypes:
        if key_name.endswith(supported_types.value):
            return supported_types
        if metadata_entry:
            entry: list = [
                entry for entry in metadata_entry.entries if key_name == entry.dataPath
            ]
            if entry and supported_types.value == entry[0].structureFormat:
                return supported_types
    return False
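
# Illustrative sketch (assumed file names, not part of the original module):
# the lookup first matches on the key suffix and, failing that, on any manual
# metadata entry whose dataPath equals the key.
#
#   get_file_format_type("raw/events.parquet")  # -> SupportedTypes.PARQUET
#   get_file_format_type("raw/events")          # -> False (no extension match, no metadata entry)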


# pylint: disable=import-outside-toplevel
class DataFrameColumnParser:
    """A column parser object. This serves as a Creator class for the appropriate column parser
    for datalake types. It allows us to implement different schema parsers for different datalake types
    without writing many conditional statements.

    e.g. if we want to implement a column parser for parquet files, we can simply implement a
    ParquetDataFrameColumnParser class and add it as part of the `create` method. The `create` method will then return
    the appropriate parser based on the file type. The `DataFrameColumnParser` class has a single entry point,
    `get_columns`, which will call the `get_columns` method of the appropriate parser.
    """

    def __init__(self, parser):
        """Initialize the column parser object"""
        self.parser = parser

    @classmethod
    def create(
        cls,
        data_frame: "DataFrame",
        file_type: Optional[SupportedTypes] = None,
        sample: bool = True,
        shuffle: bool = False,
        raw_data: Any = None,
    ):
        """Instantiate a column parser object with the appropriate parser

        Args:
            data_frame: the dataframe object
            file_type: the file type of the dataframe. Will be used to determine the appropriate parser.
            sample: whether to sample the dataframe or not if we have a list of dataframes.
                If sample is False, we will concatenate the dataframes, which can cause OOM errors
                for large datasets. (default: True)
            shuffle: whether to shuffle the dataframe list or not if sample is True. (default: False)
        """
        data_frame = cls._get_data_frame(data_frame, sample, shuffle)
        if file_type in {
            SupportedTypes.PARQUET,
            SupportedTypes.PARQUET_PQ,
            SupportedTypes.PARQUET_PQT,
            SupportedTypes.PARQUET_PARQ,
            SupportedTypes.PARQUET_SNAPPY,
        }:
            parser = ParquetDataFrameColumnParser(data_frame)
        elif file_type in {
            SupportedTypes.JSON,
            SupportedTypes.JSONGZ,
            SupportedTypes.JSONZIP,
        }:
            parser = JsonDataFrameColumnParser(data_frame, raw_data=raw_data)
        else:
            parser = GenericDataFrameColumnParser(data_frame)
        return cls(parser)

    @staticmethod
    def _get_data_frame(
        data_frame: Union[List["DataFrame"], "DataFrame"], sample: bool, shuffle: bool
    ):
        """Return the dataframe to use for parsing"""
        import pandas as pd

        if not isinstance(data_frame, list):
            return data_frame

        if sample:
            if shuffle:
                random.shuffle(data_frame)
            return data_frame[0]

        return pd.concat(data_frame)

    def get_columns(self):
        """Get the columns from the parser"""
        return self.parser.get_columns()
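
# Illustrative sketch of the Creator pattern above (sample dataframe assumed,
# not part of the original module): the factory picks the concrete parser from
# the file type and exposes `get_columns` as the single entry point.
#
#   import pandas as pd
#   df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
#   parser = DataFrameColumnParser.create(df, file_type=SupportedTypes.CSV)
#   columns = parser.get_columns()  # list of Column objects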


class GenericDataFrameColumnParser:
    """Given a dataframe object, parse the columns and return a list of Column objects.

    # TODO: We should consider making the functions above part of the `GenericDataFrameColumnParser` class,
    # though we need to do a thorough review of where they are used to avoid unnecessary coupling.
    """

    _data_formats = {
        **dict.fromkeys(["int64", "int", "int32"], DataType.INT),
        "dict": DataType.JSON,
        "list": DataType.ARRAY,
        **dict.fromkeys(["float64", "float32", "float"], DataType.FLOAT),
        "bool": DataType.BOOLEAN,
        **dict.fromkeys(
            ["datetime64[ns]", "datetime"],
            DataType.DATETIME,
        ),
        "timedelta[ns]": DataType.TIME,
        "str": DataType.STRING,
        "bytes": DataType.BYTES,
    }

    def __init__(self, data_frame: "DataFrame", raw_data: Any = None):
        self.data_frame = data_frame
        self.raw_data = raw_data

    def get_columns(self):
        """
        method to process column details
        """
        return self._get_columns(self.data_frame)

    @classmethod
    def _get_columns(cls, data_frame: "DataFrame"):
        """
        method to process column details.

        Note this was moved from a function to a class method to bring it closer to the
        `GenericDataFrameColumnParser` class. Should be rethought as part of the TODO.
        """
        cols = []
        if hasattr(data_frame, "columns"):
            df_columns = list(data_frame.columns)
            for column in df_columns:
                # use String by default
                data_type = DataType.STRING
                try:
                    if hasattr(data_frame[column], "dtypes"):
                        data_type = cls.fetch_col_types(data_frame, column_name=column)

                    parsed_string = {
                        "dataTypeDisplay": data_type.value,
                        "dataType": data_type,
                        "name": truncate_column_name(column),
                        "displayName": column,
                    }
                    if data_type == DataType.ARRAY:
                        parsed_string["arrayDataType"] = DataType.UNKNOWN

                    if data_type == DataType.JSON:
                        parsed_string["children"] = cls.get_children(
                            data_frame[column].dropna()[:100]
                        )

                    cols.append(Column(**parsed_string))
                except Exception as exc:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Unexpected exception parsing column [{column}]: {exc}"
                    )
        return cols

    @classmethod
    def fetch_col_types(cls, data_frame, column_name):
        """fetch_col_types: Fetch the column type for the given column name.

        Note this was moved from a function to a class method to bring it closer to the
        `GenericDataFrameColumnParser` class. Should be rethought as part of the TODO.

        Args:
            data_frame (DataFrame)
            column_name (string)
        """
        data_type = None  # falls back to string below if no type can be inferred
        try:
            if data_frame[column_name].dtypes.name == "object" and any(
                data_frame[column_name].dropna().values
            ):
                try:
                    # Safely evaluate the input strings for a sample of rows and
                    # determine the data type of the parsed objects
                    df_row_val_list = data_frame[column_name].dropna().values[:1000]
                    parsed_object_datatype_list = []
                    for df_row_val in df_row_val_list:
                        try:
                            parsed_object_datatype_list.append(
                                type(ast.literal_eval(str(df_row_val))).__name__.lower()
                            )
                        except (ValueError, SyntaxError):
                            # we try to parse the value as a datetime; if that fails, we fall back
                            # to string, as literal_eval will fail for plain strings
                            from datetime import datetime

                            from dateutil.parser import ParserError, parse

                            try:
                                dtype_ = "int64"
                                if not str(df_row_val).isnumeric():
                                    # check if the row value is a time
                                    try:
                                        datetime.strptime(df_row_val, "%H:%M:%S").time()
                                        dtype_ = "timedelta[ns]"
                                    except (ValueError, TypeError):
                                        # check if the row value is a date / time / datetime
                                        type(parse(df_row_val)).__name__.lower()
                                        dtype_ = "datetime64[ns]"
                                parsed_object_datatype_list.append(dtype_)
                            except (ParserError, TypeError):
                                parsed_object_datatype_list.append("str")
                        except Exception as err:
                            logger.debug(
                                f"Failed to parse datatype for column {column_name}, exc: {err}, "
                                "falling back to string."
                            )
                            parsed_object_datatype_list.append("str")

                    data_type = max(parsed_object_datatype_list)

                except (ValueError, SyntaxError):
                    # Handle any exceptions that may occur
                    data_type = "string"

            data_type = cls._data_formats.get(
                data_type or data_frame[column_name].dtypes.name,
            )
            if not data_type:
                logger.debug(
                    f"unknown data type {data_frame[column_name].dtypes.name}. resolving to string."
                )
            data_type = data_type or DataType.STRING
        except Exception as err:
            logger.warning(
                f"Failed to distinguish data type for column {column_name}, falling back to {data_type}, exc: {err}"
            )
            logger.debug(traceback.format_exc())
        return data_type or DataType.STRING
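
    # Illustrative sketch (assumed sample data, not part of the original module):
    # object-typed columns are inspected row by row, so a column of date strings
    # resolves to DATETIME while plain text falls back to STRING.
    #
    #   import pandas as pd
    #   df = pd.DataFrame({"created": ["2024-01-01"], "note": ["foo"]})
    #   GenericDataFrameColumnParser.fetch_col_types(df, "created")  # -> DataType.DATETIME
    #   GenericDataFrameColumnParser.fetch_col_types(df, "note")     # -> DataType.STRING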

    @classmethod
    def unique_json_structure(cls, dicts: List[Dict]) -> Dict:
        """Given a sample of `n` json objects, return a json object that represents the unique
        structure of all `n` objects. Note that the type of the key will be that of
        the last object seen in the sample.

        Args:
            dicts: list of json objects
        """
        result = {}
        for dict_ in dicts:
            for key, value in dict_.items():
                if isinstance(value, dict):
                    nested_json = result.get(key, {})
                    # the `isinstance(nested_json, dict)` check handles the case where we first see a
                    # non-dict value for a key but later see a dict value: the key is then treated as a dict.
                    result[key] = cls.unique_json_structure(
                        [nested_json if isinstance(nested_json, dict) else {}, value]
                    )
                else:
                    result[key] = value
        return result
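
    # Illustrative sketch (assumed sample rows, not part of the original module):
    # keys from all sampled rows are merged, and leaf values keep the type of the
    # last row in which the key was seen.
    #
    #   GenericDataFrameColumnParser.unique_json_structure(
    #       [{"a": 1, "b": {"c": "x"}}, {"a": 2.5, "b": {"d": True}}]
    #   )
    #   # -> {"a": 2.5, "b": {"c": "x", "d": True}}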

    @classmethod
    def construct_json_column_children(cls, json_column: Dict) -> List[Dict]:
        """Construct a dict representation of a Column object

        Args:
            json_column: unique json structure of a column
        """
        children = []
        for key, value in json_column.items():
            column = {}
            type_ = type(value).__name__.lower()
            column["dataTypeDisplay"] = cls._data_formats.get(
                type_, DataType.UNKNOWN
            ).value
            column["dataType"] = cls._data_formats.get(type_, DataType.UNKNOWN).value
            column["name"] = truncate_column_name(key)
            column["displayName"] = key
            if isinstance(value, dict):
                column["children"] = cls.construct_json_column_children(value)
            children.append(column)

        return children
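
    # Illustrative sketch (assumed structure, not part of the original module):
    # each key becomes a child column dict, with nested dicts recursing into
    # further `children` entries.
    #
    #   GenericDataFrameColumnParser.construct_json_column_children(
    #       {"id": 1, "geo": {"lat": 1.0}}
    #   )
    #   # -> "id" maps to an INT child, "geo" to a JSON child whose own
    #   #    children contain a FLOAT entry for "lat"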

    @classmethod
    def get_children(cls, json_column) -> List[Dict]:
        """Get children of json column.

        Args:
            json_column (pandas.Series): column with 100 sample rows.
                Sample rows will be used to infer children.
        """
        from pandas import Series  # pylint: disable=import-outside-toplevel

        json_column = cast(Series, json_column)
        try:
            json_column = json_column.apply(json.loads)
        except TypeError:
            # if values are not strings, we will assume they are already json objects
            # based on the read class logic
            pass
        json_structure = cls.unique_json_structure(json_column.values.tolist())

        return cls.construct_json_column_children(json_structure)


# pylint: disable=import-outside-toplevel
class ParquetDataFrameColumnParser:
    """Given a dataframe object generated from a parquet file, parse the columns and return a list of Column objects."""

    def __init__(self, data_frame: "DataFrame"):
        import pyarrow as pa

        self._data_formats = {
            **dict.fromkeys(
                ["int8", "int16", "int32", "int64", "int", pa.DurationType],
                DataType.INT,
            ),
            **dict.fromkeys(
                ["uint8", "uint16", "uint32", "uint64", "uint"], DataType.UINT
            ),
            pa.StructType: DataType.STRUCT,
            **dict.fromkeys([pa.ListType, pa.LargeListType], DataType.ARRAY),
            **dict.fromkeys(
                ["halffloat", "float32", "float64", "double", "float"], DataType.FLOAT
            ),
            "bool": DataType.BOOLEAN,
            **dict.fromkeys(
                [
                    "datetime64",
                    "timedelta[ns]",
                    "datetime64[ns]",
                    "time32[s]",
                    "time32[ms]",
                    "time64[ns]",
                    "time64[us]",
                    pa.TimestampType,
                    "date64",
                ],
                DataType.DATETIME,
            ),
            "date32[day]": DataType.DATE,
            "string": DataType.STRING,
            **dict.fromkeys(
                ["binary", "large_binary", pa.FixedSizeBinaryType], DataType.BINARY
            ),
            **dict.fromkeys([pa.Decimal128Type, pa.Decimal256Type], DataType.DECIMAL),
        }

        self.data_frame = data_frame
        self._arrow_table = pa.Table.from_pandas(self.data_frame)

    def get_columns(self):
        """
        method to process column details for parquet files
        """
        import pyarrow as pa

        schema: List[pa.Field] = self._arrow_table.schema
        columns = []
        for column in schema:
            parsed_column = {
                "dataTypeDisplay": str(column.type),
                "dataType": self._get_pq_data_type(column),
                "name": truncate_column_name(column.name),
                "displayName": column.name,
            }

            if parsed_column["dataType"] == DataType.ARRAY:
                try:
                    item_field = column.type.value_field
                    parsed_column["arrayDataType"] = self._get_pq_data_type(item_field)
                except AttributeError:
                    # if the value field is not specified, we will set it to UNKNOWN
                    parsed_column["arrayDataType"] = DataType.UNKNOWN

            if parsed_column["dataType"] == DataType.BINARY:
                try:
                    # Either an int byte width or -1
                    data_length = int(type(column.type).byte_width)
                except Exception as exc:
                    # if the byte width is not specified, we will set it to -1
                    # following pyarrow convention
                    data_length = -1
                    logger.debug("Could not extract binary field length due to %s", exc)
                parsed_column["dataLength"] = data_length

            if parsed_column["dataType"] == DataType.STRUCT:
                parsed_column["children"] = self._get_children(column)
            columns.append(Column(**parsed_column))

        return columns
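
    # Illustrative sketch (assumed sample dataframe, not part of the original
    # module): the pyarrow schema inferred from the pandas dataframe drives the
    # column typing, so list columns keep their arrow item type.
    #
    #   import pandas as pd
    #   df = pd.DataFrame({"id": [1, 2], "tags": [["a"], ["b", "c"]]})
    #   ParquetDataFrameColumnParser(df).get_columns()
    #   # -> "id" parsed as INT, "tags" as ARRAY with arrayDataType STRING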

    def _get_children(self, column):
        """For struct types, get the children of the column

        Args:
            column (pa.Field): pa column
        """
        field_idx = column.type.num_fields

        children = []
        for idx in range(field_idx):
            child = column.type.field(idx)
            data_type = self._get_pq_data_type(child)

            child_column = {
                "dataTypeDisplay": str(child.type),
                "dataType": data_type,
                "name": truncate_column_name(child.name),
                "displayName": child.name,
            }
            if data_type == DataType.STRUCT:
                child_column["children"] = self._get_children(child)
            children.append(child_column)

        return children

    def _get_pq_data_type(self, column):
        """Given a column return the type of the column

        Args:
            column (pa.Field): pa column
        """
        import pyarrow as pa

        if isinstance(
            column.type,
            (
                pa.DurationType,
                pa.StructType,
                pa.ListType,
                pa.LargeListType,
                pa.TimestampType,
                pa.Decimal128Type,
                pa.Decimal256Type,
                pa.FixedSizeBinaryType,
            ),
        ):
            # the above types can take many shapes
            # (e.g. pa.ListType(pa.StructType([pa.column("a", pa.int64())])), etc.)
            # so we'll use their type to determine the data type
            data_type = self._data_formats.get(type(column.type), DataType.UNKNOWN)
        else:
            # for the other types we need to use their string representation
            # to determine the data type as `type(column.type)` will return
            # a generic `pyarrow.lib.DataType`
            data_type = self._data_formats.get(str(column.type), DataType.UNKNOWN)

        return data_type
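
    # Illustrative sketch (not part of the original module): parametrized arrow
    # types are looked up by their Python class, everything else by the string
    # representation of the type.
    #
    #   import pyarrow as pa
    #   parser = ParquetDataFrameColumnParser(df)  # df is an assumed pandas dataframe
    #   parser._get_pq_data_type(pa.field("ts", pa.timestamp("ms")))  # class lookup -> DATETIME
    #   parser._get_pq_data_type(pa.field("name", pa.string()))       # string lookup -> STRING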


class JsonDataFrameColumnParser(GenericDataFrameColumnParser):
    """Given a dataframe object generated from a json file, parse the columns and return a list of Column objects."""

    def get_columns(self):
        """
        method to process column details for json files
        """
        if self.raw_data:
            try:
                return parse_json_schema(schema_text=self.raw_data, cls=Column)
            except Exception as exc:
                logger.warning(f"Unable to parse the json schema: {exc}")
                logger.debug(traceback.format_exc())
        return self._get_columns(self.data_frame)
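
    # Illustrative sketch (not part of the original module; `df` and
    # `json_schema_text` are assumed inputs): when the reader also returned the
    # raw file content and it is a JSON Schema document, the schema text takes
    # precedence over dataframe-based inference.
    #
    #   parser = JsonDataFrameColumnParser(df, raw_data=json_schema_text)
    #   parser.get_columns()  # parsed via parse_json_schema; falls back to the dataframe on failure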