Mirror of https://github.com/open-metadata/OpenMetadata.git, synced 2025-08-09 01:28:11 +00:00
MINOR: Add support for csv.gz in datalake (#22666)

* MINOR: Add support for csv.gz in datalake
* fileformat change
* Update generated TypeScript types
* pyformat

parent 1228e9647d
commit fe28faa13f
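For context before the diff: the change leans on pandas' built-in gzip handling. Once the ".gz" extension is recognised, read_csv can stream the compressed file in chunks exactly like a plain CSV. Below is a minimal, self-contained sketch of that pattern; the path, sample data, and chunk size are illustrative and not taken from the codebase.

# Sketch of the pattern this PR relies on: pandas streams a gzipped CSV
# in chunks when compression="gzip" is passed (or inferred from the path).
# The file name, sample rows, and chunk size are illustrative only.
import gzip

import pandas as pd

path = "/tmp/example.csv.gz"  # hypothetical sample file

# Write a small gzipped CSV so the example is self-contained.
with gzip.open(path, "wt", encoding="utf-8") as handle:
    handle.write("id,name\n1,alpha\n2,beta\n")

compression = "gzip" if path.endswith(".gz") else None

chunks = []
with pd.read_csv(path, chunksize=1, compression=compression) as reader:
    for chunk in reader:
        chunks.append(chunk)

df = pd.concat(chunks, ignore_index=True)
print(df.shape)  # (2, 2)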
@@ -54,16 +54,24 @@ class DSVDataFrameReader(DataFrameReader):
         super().__init__(config_source, client)
 
     def read_from_pandas(
-        self, path: str, storage_options: Optional[Dict[str, Any]] = None
+        self,
+        path: str,
+        storage_options: Optional[Dict[str, Any]] = None,
+        compression: Optional[str] = None,
     ) -> DatalakeColumnWrapper:
         import pandas as pd  # pylint: disable=import-outside-toplevel
 
+        # Determine compression based on file extension if not provided
+        if compression is None and path.endswith(".gz"):
+            compression = "gzip"
+
         chunk_list = []
         with pd.read_csv(
             path,
             sep=self.separator,
             chunksize=CHUNKSIZE,
             storage_options=storage_options,
+            compression=compression,
         ) as reader:
             for chunks in reader:
                 chunk_list.append(chunks)
@@ -81,16 +89,47 @@ class DSVDataFrameReader(DataFrameReader):
         """
         Read the CSV file from the gcs bucket and return a dataframe
         """
+        # Determine compression based on file extension
+        compression = None
+        if key.endswith(".gz"):
+            compression = "gzip"
+
         path = f"gs://{bucket_name}/{key}"
-        return self.read_from_pandas(path=path)
+        return self.read_from_pandas(path=path, compression=compression)
 
     @_read_dsv_dispatch.register
     def _(self, _: S3Config, key: str, bucket_name: str) -> DatalakeColumnWrapper:
-        path = self.client.get_object(Bucket=bucket_name, Key=key)["Body"]
-        return self.read_from_pandas(path=path)
+        import pandas as pd  # pylint: disable=import-outside-toplevel
+
+        # Determine compression based on file extension
+        compression = None
+        if key.endswith(".gz"):
+            compression = "gzip"
+
+        # Get the file content from S3
+        response = self.client.get_object(Bucket=bucket_name, Key=key)
+        file_content = response["Body"]
+
+        # Read the CSV data directly from the StreamingBody
+        chunk_list = []
+        with pd.read_csv(
+            file_content,
+            sep=self.separator,
+            chunksize=CHUNKSIZE,
+            compression=compression,
+        ) as reader:
+            for chunks in reader:
+                chunk_list.append(chunks)
+
+        return DatalakeColumnWrapper(dataframes=chunk_list)
 
     @_read_dsv_dispatch.register
     def _(self, _: AzureConfig, key: str, bucket_name: str) -> DatalakeColumnWrapper:
+        # Determine compression based on file extension
+        compression = None
+        if key.endswith(".gz"):
+            compression = "gzip"
+
         storage_options = return_azure_storage_options(self.config_source)
         path = AZURE_PATH.format(
             bucket_name=bucket_name,
@@ -100,13 +139,19 @@ class DSVDataFrameReader(DataFrameReader):
         return self.read_from_pandas(
             path=path,
             storage_options=storage_options,
+            compression=compression,
         )
 
     @_read_dsv_dispatch.register
     def _(  # pylint: disable=unused-argument
         self, _: LocalConfig, key: str, bucket_name: str
     ) -> DatalakeColumnWrapper:
-        return self.read_from_pandas(path=key)
+        # Determine compression based on file extension
+        compression = None
+        if key.endswith(".gz"):
+            compression = "gzip"
+
+        return self.read_from_pandas(path=key, compression=compression)
 
     def _read(self, *, key: str, bucket_name: str, **__) -> DatalakeColumnWrapper:
         return self._read_dsv_dispatch(
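One detail worth calling out in the S3 branch above: instead of building a path, it now passes the boto3 StreamingBody straight to pandas. That works because read_csv accepts any binary file-like object and will gunzip it when compression="gzip" is given. A small sketch with an in-memory buffer standing in for the S3 response body (the sample data is illustrative):

# Sketch: pandas can decompress a gzipped CSV from any binary file-like
# object, which is why the S3 StreamingBody can be handed to read_csv
# directly. io.BytesIO stands in for the boto3 response["Body"] here.
import gzip
import io

import pandas as pd

payload = gzip.compress(b"id,name\n1,alpha\n2,beta\n")
file_content = io.BytesIO(payload)  # stand-in for response["Body"]

chunk_list = []
with pd.read_csv(file_content, sep=",", chunksize=1, compression="gzip") as reader:
    for chunk in reader:
        chunk_list.append(chunk)

print(pd.concat(chunk_list, ignore_index=True).to_dict(orient="records"))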
@@ -36,6 +36,7 @@ logger = utils_logger()
 
 class SupportedTypes(Enum):
     CSV = "csv"
+    CSVGZ = "csv.gz"
     TSV = "tsv"
     AVRO = "avro"
     PARQUET = "parquet"
@@ -53,6 +54,7 @@ class SupportedTypes(Enum):
 
 DF_READER_MAP = {
     SupportedTypes.CSV.value: CSVDataFrameReader,
+    SupportedTypes.CSVGZ.value: CSVDataFrameReader,
     SupportedTypes.TSV.value: TSVDataFrameReader,
     SupportedTypes.AVRO.value: AvroDataFrameReader,
     SupportedTypes.PARQUET.value: ParquetDataFrameReader,
@@ -79,7 +81,10 @@ def get_df_reader(
     Load the File Reader based on the Config Source
     """
    # If we have a DSV file, build a reader dynamically based on the received separator
-    if type_ in {SupportedTypes.CSV, SupportedTypes.TSV} and separator:
+    if (
+        type_ in {SupportedTypes.CSV, SupportedTypes.CSVGZ, SupportedTypes.TSV}
+        and separator
+    ):
         return get_dsv_reader_by_separator(separator=separator)(
             config_source=config_source, client=client
         )
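The net effect of the factory changes is that "csv" and "csv.gz" resolve to the same comma-separated reader. A self-contained sketch of that wiring, using simplified stand-ins rather than the project's actual reader classes:

# Self-contained sketch of the factory wiring shown above: "csv" and
# "csv.gz" both map to a comma-separated reader. The classes here are
# simplified stand-ins, not OpenMetadata's real reader classes.
from enum import Enum
from functools import partial


class SupportedTypes(Enum):
    CSV = "csv"
    CSVGZ = "csv.gz"
    TSV = "tsv"


class DSVReader:
    def __init__(self, separator: str):
        self.separator = separator


# Both plain and gzipped CSV resolve to the same reader with sep=",".
DF_READER_MAP = {
    SupportedTypes.CSV.value: partial(DSVReader, separator=","),
    SupportedTypes.CSVGZ.value: partial(DSVReader, separator=","),
    SupportedTypes.TSV.value: partial(DSVReader, separator="\t"),
}

reader = DF_READER_MAP[SupportedTypes.CSVGZ.value]()
print(reader.separator)  # ","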
@@ -66,6 +66,7 @@ def fetch_dataframe(
             return df_wrapper.dataframes, df_wrapper.raw_data
         return df_wrapper.dataframes
     except Exception as err:
+        logger.debug(traceback.format_exc())
         logger.error(
             f"Error fetching file [{bucket_name}/{key}] using "
             f"[{config_source.__class__.__name__}] due to: [{err}]"
@@ -23,6 +23,7 @@ from metadata.utils.datalake.datalake_utils import (
     DataFrameColumnParser,
     GenericDataFrameColumnParser,
     ParquetDataFrameColumnParser,
+    get_file_format_type,
 )
 
 STRUCTURE = {
@@ -451,3 +452,141 @@ class TestParquetDataFrameColumnParser(TestCase):
             with self.subTest(validation=validation):
                 expected_col, actual_col = validation
                 self._validate_parsed_column(expected_col, actual_col)
+
+    def test_get_file_format_type_csv_gz(self):
+        """test get_file_format_type function for csv.gz files"""
+        # Test csv.gz file detection
+        result = get_file_format_type("data.csv.gz")
+        self.assertEqual(result, SupportedTypes.CSVGZ)
+
+        # Test regular csv file detection (should still work)
+        result = get_file_format_type("data.csv")
+        self.assertEqual(result, SupportedTypes.CSV)
+
+        # Test other gzipped files
+        result = get_file_format_type("data.json.gz")
+        self.assertEqual(result, SupportedTypes.JSONGZ)
+
+        # Test unsupported gzipped format
+        result = get_file_format_type("data.txt.gz")
+        self.assertEqual(result, False)
+
+    def test_csv_gz_file_format_detection_edge_cases(self):
+        """test edge cases for csv.gz file format detection"""
+        # Test with nested paths
+        result = get_file_format_type("folder/subfolder/data.csv.gz")
+        self.assertEqual(result, SupportedTypes.CSVGZ)
+
+        # Test with multiple dots
+        result = get_file_format_type("data.backup.csv.gz")
+        self.assertEqual(result, SupportedTypes.CSVGZ)
+
+        # Test with no extension
+        result = get_file_format_type("data")
+        self.assertEqual(result, False)
+
+        # Test with just .gz
+        result = get_file_format_type("data.gz")
+        self.assertEqual(result, False)
+
+    def test_csv_gz_compression_detection(self):
+        """test compression detection for various file types"""
+        # Test csv.gz compression detection
+        test_cases = [
+            ("data.csv.gz", SupportedTypes.CSVGZ),
+            ("data.csv", SupportedTypes.CSV),
+            ("data.json.gz", SupportedTypes.JSONGZ),
+            ("data.json", SupportedTypes.JSON),
+            ("data.jsonl.gz", SupportedTypes.JSONLGZ),
+            ("data.jsonl", SupportedTypes.JSONL),
+            ("data.parquet", SupportedTypes.PARQUET),
+            ("data.txt.gz", False),  # Unsupported
+            ("data.unknown.gz", False),  # Unsupported
+        ]
+
+        for filename, expected in test_cases:
+            with self.subTest(filename=filename):
+                result = get_file_format_type(filename)
+                self.assertEqual(result, expected, f"Failed for {filename}")
+
+    def test_csv_gz_reader_factory_integration(self):
+        """test that csv.gz is properly integrated with reader factory"""
+        from metadata.readers.dataframe.reader_factory import SupportedTypes
+
+        # Test that CSVGZ is properly handled
+        try:
+            # Test that the enum value exists
+            self.assertEqual(SupportedTypes.CSVGZ.value, "csv.gz")
+
+            # Test that it's different from regular CSV
+            self.assertNotEqual(SupportedTypes.CSVGZ, SupportedTypes.CSV)
+            self.assertNotEqual(SupportedTypes.CSVGZ.value, SupportedTypes.CSV.value)
+
+        except Exception as e:
+            self.fail(f"CSVGZ enum test failed: {e}")
+
+    def test_csv_gz_supported_types_enum(self):
+        """test that CSVGZ is properly defined in SupportedTypes enum"""
+        # Test that CSVGZ exists in the enum
+        self.assertIn(SupportedTypes.CSVGZ, SupportedTypes)
+        self.assertEqual(SupportedTypes.CSVGZ.value, "csv.gz")
+
+        # Test that it's different from regular CSV
+        self.assertNotEqual(SupportedTypes.CSVGZ, SupportedTypes.CSV)
+        self.assertNotEqual(SupportedTypes.CSVGZ.value, SupportedTypes.CSV.value)
+
+    def test_csv_gz_dsv_reader_compression_detection(self):
+        """test that DSV reader properly detects compression for csv.gz files"""
+        from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
+            LocalConfig,
+        )
+        from metadata.readers.dataframe.dsv import DSVDataFrameReader
+
+        # Create a mock config
+        local_config = LocalConfig()
+
+        # Create DSV reader
+        reader = DSVDataFrameReader(config_source=local_config, client=None)
+
+        # Test compression detection logic (this is the same logic used in the dispatch methods)
+        test_cases = [
+            ("data.csv.gz", "gzip"),
+            ("data.csv", None),
+            ("data.json.gz", "gzip"),
+            ("data.txt.gz", "gzip"),
+            ("data.unknown.gz", "gzip"),
+        ]
+
+        for filename, expected_compression in test_cases:
+            with self.subTest(filename=filename):
+                # Simulate the compression detection logic from the dispatch methods
+                compression = None
+                if filename.endswith(".gz"):
+                    compression = "gzip"
+
+                self.assertEqual(
+                    compression,
+                    expected_compression,
+                    f"Compression detection failed for {filename}",
+                )
+
+    def test_csv_gz_integration_completeness(self):
+        """test that csv.gz support is complete across all components"""
+        # Test that CSVGZ is in the reader factory mapping
+        from metadata.readers.dataframe.reader_factory import (
+            DF_READER_MAP,
+            SupportedTypes,
+        )
+
+        # Check that CSVGZ is mapped to CSVDataFrameReader
+        self.assertIn(SupportedTypes.CSVGZ.value, DF_READER_MAP)
+
+        # Test that the get_df_reader function includes CSVGZ in DSV handling
+
+        # This should not raise an exception for CSVGZ
+        try:
+            # Test that CSVGZ is included in the DSV types
+            dsv_types = {SupportedTypes.CSV, SupportedTypes.CSVGZ, SupportedTypes.TSV}
+            self.assertIn(SupportedTypes.CSVGZ, dsv_types)
+        except Exception as e:
+            self.fail(f"CSVGZ integration test failed: {e}")
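The tests above pin down the expected behaviour of get_file_format_type without showing its body. A minimal sketch that is consistent with those assertions, and is not the project's actual implementation, could look like this:

# Minimal sketch consistent with the assertions above, NOT the project's
# actual implementation: resolve a file key to a SupportedTypes member by
# longest matching suffix, returning False when nothing matches.
from enum import Enum


class SupportedTypes(Enum):
    CSV = "csv"
    CSVGZ = "csv.gz"
    JSON = "json"
    JSONGZ = "json.gz"
    JSONL = "jsonl"
    JSONLGZ = "jsonl.gz"
    PARQUET = "parquet"


def get_file_format_type(key: str):
    # Check longer values first so "csv.gz" wins over "csv".
    for member in sorted(SupportedTypes, key=lambda m: len(m.value), reverse=True):
        if key.endswith(f".{member.value}"):
            return member
    return False


assert get_file_format_type("data.csv.gz") is SupportedTypes.CSVGZ
assert get_file_format_type("data.csv") is SupportedTypes.CSV
assert get_file_format_type("data.txt.gz") is False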
@@ -996,7 +996,7 @@
     "fileFormat": {
       "description": "File format in case of file/datalake tables.",
       "type": "string",
-      "enum": ["csv", "tsv", "avro", "parquet", "pq", "pqt", "parq", "parquet.snappy", "json", "json.gz", "json.zip", "jsonl", "jsonl.gz", "jsonl.zip"]
+      "enum": ["csv", "csv.gz", "tsv", "avro", "parquet", "pq", "pqt", "parq", "parquet.snappy", "json", "json.gz", "json.zip", "jsonl", "jsonl.gz", "jsonl.zip"]
     }
   },
   "properties": {
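To see the schema change in effect, here is a quick check with the jsonschema package; the schema fragment below is a stripped-down stand-in for the fileFormat definition, not the full table.json:

# Quick check of the enum change using the jsonschema package. The schema
# fragment is a stripped-down stand-in, not the full table.json definition.
from jsonschema import ValidationError, validate

file_format_schema = {
    "type": "string",
    "enum": ["csv", "csv.gz", "tsv", "avro", "parquet", "json", "json.gz"],
}

validate("csv.gz", file_format_schema)  # accepted after this change

try:
    validate("txt.gz", file_format_schema)
except ValidationError:
    print("txt.gz is not an accepted fileFormat")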
@@ -668,6 +668,7 @@ export enum ModelType {
 export enum FileFormat {
     Avro = "avro",
     CSV = "csv",
+    CSVGz = "csv.gz",
     JSON = "json",
     JSONGz = "json.gz",
     JSONZip = "json.zip",
@@ -867,6 +867,7 @@ export enum ModelType {
 export enum FileFormat {
     Avro = "avro",
     CSV = "csv",
+    CSVGz = "csv.gz",
     JSON = "json",
     JSONGz = "json.gz",
     JSONZip = "json.zip",