mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-06 06:16:21 +00:00
Feature: MF4 File Reader (#23308)
* feat: mf4 file reader * refactor: removed schema_from_data implementation * test: added tests for mf4 files
This commit is contained in:
parent
721cdfa51e
commit
6b7262a8ea
@ -33,7 +33,7 @@ VERSIONS = {
|
|||||||
"grpc-tools": "grpcio-tools>=1.47.2",
|
"grpc-tools": "grpcio-tools>=1.47.2",
|
||||||
"msal": "msal~=1.2",
|
"msal": "msal~=1.2",
|
||||||
"neo4j": "neo4j~=5.3",
|
"neo4j": "neo4j~=5.3",
|
||||||
"pandas": "pandas~=2.0.0",
|
"pandas": "pandas~=2.0.3",
|
||||||
"pyarrow": "pyarrow~=16.0",
|
"pyarrow": "pyarrow~=16.0",
|
||||||
"pydantic": "pydantic~=2.0,>=2.7.0",
|
"pydantic": "pydantic~=2.0,>=2.7.0",
|
||||||
"pydantic-settings": "pydantic-settings~=2.0,>=2.7.0",
|
"pydantic-settings": "pydantic-settings~=2.0,>=2.7.0",
|
||||||
@ -69,10 +69,12 @@ VERSIONS = {
|
|||||||
"pyathena": "pyathena~=3.0",
|
"pyathena": "pyathena~=3.0",
|
||||||
"sqlalchemy-bigquery": "sqlalchemy-bigquery>=1.2.2",
|
"sqlalchemy-bigquery": "sqlalchemy-bigquery>=1.2.2",
|
||||||
"presidio-analyzer": "presidio-analyzer==2.2.358",
|
"presidio-analyzer": "presidio-analyzer==2.2.358",
|
||||||
|
"asammdf": "asammdf~=7.4.5",
|
||||||
}
|
}
|
||||||
|
|
||||||
COMMONS = {
|
COMMONS = {
|
||||||
"datalake": {
|
"datalake": {
|
||||||
|
VERSIONS["asammdf"],
|
||||||
VERSIONS["avro"],
|
VERSIONS["avro"],
|
||||||
VERSIONS["boto3"],
|
VERSIONS["boto3"],
|
||||||
VERSIONS["pandas"],
|
VERSIONS["pandas"],
|
||||||
|
75
ingestion/src/metadata/readers/dataframe/mf4.py
Normal file
75
ingestion/src/metadata/readers/dataframe/mf4.py
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
# Copyright 2025 Collate
|
||||||
|
# Licensed under the Collate Community License, Version 1.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
MF4 DataFrame reader for processing MF4 (Measurement Data Format) files
|
||||||
|
"""
|
||||||
|
|
||||||
|
import io
from typing import Optional

import pandas as pd
from asammdf import MDF

from metadata.readers.dataframe.base import DataFrameReader
from metadata.readers.dataframe.common import dataframe_to_chunks
from metadata.readers.dataframe.models import DatalakeColumnWrapper
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
|
logger = ingestion_logger()
|
||||||
|
|
||||||
|
|
||||||
|
class MF4DataFrameReader(DataFrameReader):
    """
    MF4 file reader implementation for extracting schema and data
    from MF4 (Measurement Data Format) files.

    The schema is derived from the MDF header's common properties only;
    channel/signal data is not read by this reader.
    """

    @staticmethod
    def extract_schema_from_header(
        mf4_bytes: bytes,
    ) -> Optional[DatalakeColumnWrapper]:
        """
        Extract schema from MF4 header common properties.
        This method uses the MDF header metadata instead of actual data extraction.

        Args:
            mf4_bytes: raw content of the MF4 file.

        Returns:
            A DatalakeColumnWrapper holding a single-row dataframe with one
            column per common property (the properties mapping itself is
            exposed as ``raw_data``), or ``None`` when the header carries no
            common properties.
        """
        mdf = MDF(io.BytesIO(mf4_bytes))
        try:
            # `_common_properties` is a private asammdf attribute, so look it
            # up defensively instead of assuming it is always present.
            header = getattr(mdf, "header", None)
            common_props = getattr(header, "_common_properties", None)
        finally:
            # Always release the resources held by the MDF object, even if
            # reading the header fails.
            mdf.close()

        if not common_props:
            logger.debug("No _common_properties found in header.")
            return None

        # Wrap every value in a one-element list so that each property maps to
        # exactly one cell of a single row. Non-scalar values (lists, nested
        # dicts) are kept intact instead of being truncated or turned into NaN.
        schema_df = pd.DataFrame(
            {key: [value] for key, value in common_props.items()}
        )
        logger.info(f"Extracted {len(common_props)} properties from MF4 header")
        return DatalakeColumnWrapper(
            dataframes=dataframe_to_chunks(schema_df), raw_data=common_props
        )

    @staticmethod
    def read_from_mf4(mf4_bytes: bytes) -> Optional[DatalakeColumnWrapper]:
        """
        Convert MF4 file content to DatalakeColumnWrapper using header schema extraction.

        Returns ``None`` when the header exposes no common properties.
        """
        return MF4DataFrameReader.extract_schema_from_header(mf4_bytes)

    def _read(
        self, *, key: str, bucket_name: str, **__
    ) -> Optional[DatalakeColumnWrapper]:
        """
        Read MF4 file and extract schema information.

        Args:
            key: path of the MF4 file, passed to the configured reader.
            bucket_name: bucket/container the file is read from.

        Returns:
            The wrapper built from the header's common properties, or ``None``
            when the header exposes none.
        """
        file_content = self.reader.read(key, bucket_name=bucket_name)
        return self.read_from_mf4(file_content)
|
@ -27,6 +27,7 @@ from metadata.readers.dataframe.dsv import (
|
|||||||
get_dsv_reader_by_separator,
|
get_dsv_reader_by_separator,
|
||||||
)
|
)
|
||||||
from metadata.readers.dataframe.json import JSONDataFrameReader
|
from metadata.readers.dataframe.json import JSONDataFrameReader
|
||||||
|
from metadata.readers.dataframe.mf4 import MF4DataFrameReader
|
||||||
from metadata.readers.dataframe.parquet import ParquetDataFrameReader
|
from metadata.readers.dataframe.parquet import ParquetDataFrameReader
|
||||||
from metadata.readers.models import ConfigSource
|
from metadata.readers.models import ConfigSource
|
||||||
from metadata.utils.logger import utils_logger
|
from metadata.utils.logger import utils_logger
|
||||||
@ -50,6 +51,7 @@ class SupportedTypes(Enum):
|
|||||||
JSONL = "jsonl"
|
JSONL = "jsonl"
|
||||||
JSONLGZ = "jsonl.gz"
|
JSONLGZ = "jsonl.gz"
|
||||||
JSONLZIP = "jsonl.zip"
|
JSONLZIP = "jsonl.zip"
|
||||||
|
MF4 = "MF4"
|
||||||
|
|
||||||
|
|
||||||
DF_READER_MAP = {
|
DF_READER_MAP = {
|
||||||
@ -68,6 +70,7 @@ DF_READER_MAP = {
|
|||||||
SupportedTypes.JSONL.value: JSONDataFrameReader,
|
SupportedTypes.JSONL.value: JSONDataFrameReader,
|
||||||
SupportedTypes.JSONLGZ.value: JSONDataFrameReader,
|
SupportedTypes.JSONLGZ.value: JSONDataFrameReader,
|
||||||
SupportedTypes.JSONLZIP.value: JSONDataFrameReader,
|
SupportedTypes.JSONLZIP.value: JSONDataFrameReader,
|
||||||
|
SupportedTypes.MF4.value: MF4DataFrameReader,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
93
ingestion/tests/unit/test_mf4_reader.py
Normal file
93
ingestion/tests/unit/test_mf4_reader.py
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
# Copyright 2025 Collate
|
||||||
|
# Licensed under the Collate Community License, Version 1.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
MF4 reader tests
|
||||||
|
"""
|
||||||
|
from unittest import TestCase
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
from metadata.readers.dataframe.mf4 import MF4DataFrameReader
|
||||||
|
from metadata.readers.dataframe.models import DatalakeColumnWrapper
|
||||||
|
|
||||||
|
|
||||||
|
class TestMF4DataFrameReader(TestCase):
    """
    Unit tests for the header-based schema extraction of MF4DataFrameReader
    """

    def setUp(self):
        """Create a reader whose config source, client and file reader are mocks"""
        # get_reader is only needed while the reader is being constructed,
        # so the patch is scoped to the constructor call.
        with patch("metadata.readers.dataframe.base.get_reader") as mock_get_reader:
            self.mock_reader = MagicMock()
            mock_get_reader.return_value = self.mock_reader
            self.reader = MF4DataFrameReader(
                config_source=MagicMock(), client=MagicMock()
            )

    def test_extract_schema_from_header_with_common_properties(self):
        """
        A header exposing _common_properties yields a populated wrapper
        """
        header_props = {
            "measurement_id": "TEST_001",
            "vehicle_id": "VEH_123",
            "test_date": "2025-01-01",
            "sample_rate": 1000.0,
            "channels_count": 10,
            "duration": 3600.0,
        }

        # Fake MDF instance whose header carries the properties above
        fake_header = MagicMock()
        fake_header._common_properties = header_props

        with patch("metadata.readers.dataframe.mf4.MDF") as mdf_cls:
            mdf_cls.return_value = MagicMock(header=fake_header)
            wrapper = MF4DataFrameReader.extract_schema_from_header(
                b"mock_mf4_content"
            )

        self.assertIsInstance(wrapper, DatalakeColumnWrapper)
        self.assertIsNotNone(wrapper.dataframes)
        self.assertEqual(wrapper.raw_data, header_props)

    def test_extract_schema_from_header_without_common_properties(self):
        """
        A header lacking _common_properties yields no wrapper at all
        """
        # Deleting the attribute makes attribute lookups on the mock fail,
        # mimicking a header that has no common properties.
        bare_header = MagicMock()
        del bare_header._common_properties

        with patch("metadata.readers.dataframe.mf4.MDF") as mdf_cls:
            mdf_cls.return_value = MagicMock(header=bare_header)
            wrapper = MF4DataFrameReader.extract_schema_from_header(
                b"mock_mf4_content"
            )

        # Nothing to extract, so the reader reports None
        self.assertIsNone(wrapper)
|
@ -36,18 +36,49 @@
|
|||||||
"fileFormat": {
|
"fileFormat": {
|
||||||
"javaType": "org.openmetadata.schema.type.ContainerFileFormat",
|
"javaType": "org.openmetadata.schema.type.ContainerFileFormat",
|
||||||
"description": "This schema defines the file formats for the object/files within a container.",
|
"description": "This schema defines the file formats for the object/files within a container.",
|
||||||
"javaInterfaces": ["org.openmetadata.schema.EnumInterface"],
|
"javaInterfaces": [
|
||||||
|
"org.openmetadata.schema.EnumInterface"
|
||||||
|
],
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"enum": ["zip", "gz", "zstd", "csv", "tsv", "json", "parquet", "avro"],
|
"enum": [
|
||||||
|
"zip",
|
||||||
|
"gz",
|
||||||
|
"zstd",
|
||||||
|
"csv",
|
||||||
|
"tsv",
|
||||||
|
"json",
|
||||||
|
"parquet",
|
||||||
|
"avro",
|
||||||
|
"MF4"
|
||||||
|
],
|
||||||
"javaEnums": [
|
"javaEnums": [
|
||||||
{"name": "Zip"},
|
{
|
||||||
{"name": "Gz"},
|
"name": "Zip"
|
||||||
{"name": "Zstd"},
|
},
|
||||||
{"name": "Csv"},
|
{
|
||||||
{"name": "Tsv"},
|
"name": "Gz"
|
||||||
{"name": "Json"},
|
},
|
||||||
{"name": "Parquet"},
|
{
|
||||||
{"name": "Avro"}
|
"name": "Zstd"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Csv"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Tsv"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Json"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Parquet"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Avro"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "MF4"
|
||||||
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -161,7 +192,7 @@
|
|||||||
"type": "boolean",
|
"type": "boolean",
|
||||||
"default": false
|
"default": false
|
||||||
},
|
},
|
||||||
"retentionPeriod" : {
|
"retentionPeriod": {
|
||||||
"description": "Retention period of the data in the Container. Period is expressed as duration in ISO 8601 format in UTC. Example - `P23DT23H`.",
|
"description": "Retention period of the data in the Container. Period is expressed as duration in ISO 8601 format in UTC. Example - `P23DT23H`.",
|
||||||
"$ref": "../../type/basic.json#/definitions/duration"
|
"$ref": "../../type/basic.json#/definitions/duration"
|
||||||
},
|
},
|
||||||
@ -177,7 +208,7 @@
|
|||||||
"description": "Full path of the container/file.",
|
"description": "Full path of the container/file.",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
"domains" : {
|
"domains": {
|
||||||
"description": "Domains the Container belongs to. When not set, the Container inherits the domain from the storage service it belongs to.",
|
"description": "Domains the Container belongs to. When not set, the Container inherits the domain from the storage service it belongs to.",
|
||||||
"$ref": "../../type/entityReferenceList.json"
|
"$ref": "../../type/entityReferenceList.json"
|
||||||
},
|
},
|
||||||
|
@ -660,6 +660,7 @@ export enum FileFormat {
|
|||||||
CSV = "csv",
|
CSV = "csv",
|
||||||
Gz = "gz",
|
Gz = "gz",
|
||||||
JSON = "json",
|
JSON = "json",
|
||||||
|
Mf4 = "MF4",
|
||||||
Parquet = "parquet",
|
Parquet = "parquet",
|
||||||
Tsv = "tsv",
|
Tsv = "tsv",
|
||||||
Zip = "zip",
|
Zip = "zip",
|
||||||
|
@ -825,6 +825,7 @@ export enum FileFormat {
|
|||||||
CSV = "csv",
|
CSV = "csv",
|
||||||
Gz = "gz",
|
Gz = "gz",
|
||||||
JSON = "json",
|
JSON = "json",
|
||||||
|
Mf4 = "MF4",
|
||||||
Parquet = "parquet",
|
Parquet = "parquet",
|
||||||
Tsv = "tsv",
|
Tsv = "tsv",
|
||||||
Zip = "zip",
|
Zip = "zip",
|
||||||
|
Loading…
x
Reference in New Issue
Block a user