diff --git a/ingestion/setup.py b/ingestion/setup.py index cda5ac62487..e3793d3810a 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -33,7 +33,7 @@ VERSIONS = { "grpc-tools": "grpcio-tools>=1.47.2", "msal": "msal~=1.2", "neo4j": "neo4j~=5.3", - "pandas": "pandas~=2.0.0", + "pandas": "pandas~=2.0.3", "pyarrow": "pyarrow~=16.0", "pydantic": "pydantic~=2.0,>=2.7.0", "pydantic-settings": "pydantic-settings~=2.0,>=2.7.0", @@ -69,10 +69,12 @@ VERSIONS = { "pyathena": "pyathena~=3.0", "sqlalchemy-bigquery": "sqlalchemy-bigquery>=1.2.2", "presidio-analyzer": "presidio-analyzer==2.2.358", + "asammdf": "asammdf~=7.4.5", } COMMONS = { "datalake": { + VERSIONS["asammdf"], VERSIONS["avro"], VERSIONS["boto3"], VERSIONS["pandas"], diff --git a/ingestion/src/metadata/readers/dataframe/mf4.py b/ingestion/src/metadata/readers/dataframe/mf4.py new file mode 100644 index 00000000000..ece60439629 --- /dev/null +++ b/ingestion/src/metadata/readers/dataframe/mf4.py @@ -0,0 +1,75 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +MF4 DataFrame reader for processing MF4 (Measurement Data Format) files +""" + +import io + +import pandas as pd +from asammdf import MDF + +from metadata.readers.dataframe.base import DataFrameReader +from metadata.readers.dataframe.common import dataframe_to_chunks +from metadata.readers.dataframe.models import DatalakeColumnWrapper +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class MF4DataFrameReader(DataFrameReader): + """ + MF4 file reader implementation for extracting schema and data + from MF4 (Measurement Data Format) files. + """ + + @staticmethod + def extract_schema_from_header(mf4_bytes: bytes) -> DatalakeColumnWrapper: + """ + Extract schema from MF4 header common properties. + This method uses the MDF header metadata instead of actual data extraction. + """ + + file_obj = io.BytesIO(mf4_bytes) + + mdf = MDF(file_obj) + + if hasattr(mdf, "header") and hasattr(mdf.header, "_common_properties"): + common_props = mdf.header._common_properties + + schema_dict = {} + + for key, value in common_props.items(): + schema_dict[key] = pd.Series(value) + + if schema_dict: + schema_df = pd.DataFrame(schema_dict, index=[0]) + logger.info(f"Extracted {len(schema_dict)} properties from MF4 header") + return DatalakeColumnWrapper( + dataframes=dataframe_to_chunks(schema_df), raw_data=common_props + ) + + logger.debug("No _common_properties found in header.") + + @staticmethod + def read_from_mf4(mf4_bytes: bytes) -> DatalakeColumnWrapper: + """ + Convert MF4 file content to DatalakeColumnWrapper using header schema extraction. + """ + return MF4DataFrameReader.extract_schema_from_header(mf4_bytes) + + def _read(self, *, key: str, bucket_name: str, **__) -> DatalakeColumnWrapper: + """ + Read MF4 file and extract schema information. 
+ """ + file_content = self.reader.read(key, bucket_name=bucket_name) + return self.read_from_mf4(file_content) diff --git a/ingestion/src/metadata/readers/dataframe/reader_factory.py b/ingestion/src/metadata/readers/dataframe/reader_factory.py index 1c2fc97e92a..1232fcb579d 100644 --- a/ingestion/src/metadata/readers/dataframe/reader_factory.py +++ b/ingestion/src/metadata/readers/dataframe/reader_factory.py @@ -27,6 +27,7 @@ from metadata.readers.dataframe.dsv import ( get_dsv_reader_by_separator, ) from metadata.readers.dataframe.json import JSONDataFrameReader +from metadata.readers.dataframe.mf4 import MF4DataFrameReader from metadata.readers.dataframe.parquet import ParquetDataFrameReader from metadata.readers.models import ConfigSource from metadata.utils.logger import utils_logger @@ -50,6 +51,7 @@ class SupportedTypes(Enum): JSONL = "jsonl" JSONLGZ = "jsonl.gz" JSONLZIP = "jsonl.zip" + MF4 = "MF4" DF_READER_MAP = { @@ -68,6 +70,7 @@ DF_READER_MAP = { SupportedTypes.JSONL.value: JSONDataFrameReader, SupportedTypes.JSONLGZ.value: JSONDataFrameReader, SupportedTypes.JSONLZIP.value: JSONDataFrameReader, + SupportedTypes.MF4.value: MF4DataFrameReader, } diff --git a/ingestion/tests/unit/test_mf4_reader.py b/ingestion/tests/unit/test_mf4_reader.py new file mode 100644 index 00000000000..762f0bbac66 --- /dev/null +++ b/ingestion/tests/unit/test_mf4_reader.py @@ -0,0 +1,93 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +MF4 reader tests +""" +from unittest import TestCase +from unittest.mock import MagicMock, patch + +from metadata.readers.dataframe.mf4 import MF4DataFrameReader +from metadata.readers.dataframe.models import DatalakeColumnWrapper + + +class TestMF4DataFrameReader(TestCase): + """ + Test MF4DataFrameReader functionality + """ + + @patch("metadata.readers.dataframe.base.get_reader") + def setUp(self, mock_get_reader): + """Set up test fixtures""" + # MF4DataFrameReader requires config_source and client + mock_config_source = MagicMock() + mock_client = MagicMock() + + # Mock the reader that get_reader returns + mock_reader = MagicMock() + mock_get_reader.return_value = mock_reader + + self.reader = MF4DataFrameReader( + config_source=mock_config_source, client=mock_client + ) + self.mock_reader = mock_reader + + def test_extract_schema_from_header_with_common_properties(self): + """ + Test extracting schema from MF4 header with _common_properties + """ + # Mock MF4 bytes + mock_mf4_bytes = b"mock_mf4_content" + + # Expected common properties + expected_common_props = { + "measurement_id": "TEST_001", + "vehicle_id": "VEH_123", + "test_date": "2025-01-01", + "sample_rate": 1000.0, + "channels_count": 10, + "duration": 3600.0, + } + + # Create mock MDF object with header + with patch("metadata.readers.dataframe.mf4.MDF") as mock_mdf_class: + mock_mdf = MagicMock() + mock_header = MagicMock() + mock_header._common_properties = expected_common_props + mock_mdf.header = mock_header + mock_mdf_class.return_value = mock_mdf + + # Call the method + result = MF4DataFrameReader.extract_schema_from_header(mock_mf4_bytes) + # Validate result + self.assertIsInstance(result, DatalakeColumnWrapper) + self.assertIsNotNone(result.dataframes) + self.assertEqual(result.raw_data, expected_common_props) + + def test_extract_schema_from_header_without_common_properties(self): 
+ """ + Test extracting schema when no _common_properties exist + """ + mock_mf4_bytes = b"mock_mf4_content" + + with patch("metadata.readers.dataframe.mf4.MDF") as mock_mdf_class: + mock_mdf = MagicMock() + mock_header = MagicMock() + # No _common_properties attribute + del mock_header._common_properties + mock_mdf.header = mock_header + mock_mdf_class.return_value = mock_mdf + + # Call the method + result = MF4DataFrameReader.extract_schema_from_header(mock_mf4_bytes) + + # Should return None when no properties found + self.assertIsNone(result) diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/container.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/container.json index 157bf6a978c..73aa303fd30 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/container.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/container.json @@ -36,18 +36,49 @@ "fileFormat": { "javaType": "org.openmetadata.schema.type.ContainerFileFormat", "description": "This schema defines the file formats for the object/files within a container.", - "javaInterfaces": ["org.openmetadata.schema.EnumInterface"], + "javaInterfaces": [ + "org.openmetadata.schema.EnumInterface" + ], "type": "string", - "enum": ["zip", "gz", "zstd", "csv", "tsv", "json", "parquet", "avro"], + "enum": [ + "zip", + "gz", + "zstd", + "csv", + "tsv", + "json", + "parquet", + "avro", + "MF4" + ], "javaEnums": [ - {"name": "Zip"}, - {"name": "Gz"}, - {"name": "Zstd"}, - {"name": "Csv"}, - {"name": "Tsv"}, - {"name": "Json"}, - {"name": "Parquet"}, - {"name": "Avro"} + { + "name": "Zip" + }, + { + "name": "Gz" + }, + { + "name": "Zstd" + }, + { + "name": "Csv" + }, + { + "name": "Tsv" + }, + { + "name": "Json" + }, + { + "name": "Parquet" + }, + { + "name": "Avro" + }, + { + "name": "MF4" + } ] } }, @@ -161,7 +192,7 @@ "type": "boolean", "default": false }, - "retentionPeriod" : { + "retentionPeriod": { "description": "Retention period of the 
data in the Container. Period is expressed as duration in ISO 8601 format in UTC. Example - `P23DT23H`.", "$ref": "../../type/basic.json#/definitions/duration" }, @@ -177,7 +208,7 @@ "description": "Full path of the container/file.", "type": "string" }, - "domains" : { + "domains": { "description": "Domains the Container belongs to. When not set, the Container inherits the domain from the storage service it belongs to.", "$ref": "../../type/entityReferenceList.json" }, @@ -214,4 +245,4 @@ "service" ], "additionalProperties": false -} +} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createContainer.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createContainer.ts index c7fdadaf9de..552adc53633 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createContainer.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createContainer.ts @@ -660,6 +660,7 @@ export enum FileFormat { CSV = "csv", Gz = "gz", JSON = "json", + Mf4 = "MF4", Parquet = "parquet", Tsv = "tsv", Zip = "zip", diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/container.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/container.ts index e3b0dccbb56..4cff69989f1 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/container.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/container.ts @@ -825,6 +825,7 @@ export enum FileFormat { CSV = "csv", Gz = "gz", JSON = "json", + Mf4 = "MF4", Parquet = "parquet", Tsv = "tsv", Zip = "zip",