diff --git a/metadata-ingestion/src/datahub/ingestion/fs/fs_base.py b/metadata-ingestion/src/datahub/ingestion/fs/fs_base.py
index 88dcf68788..b099d4d332 100644
--- a/metadata-ingestion/src/datahub/ingestion/fs/fs_base.py
+++ b/metadata-ingestion/src/datahub/ingestion/fs/fs_base.py
@@ -31,16 +31,6 @@ class FileSystem(metaclass=ABCMeta):
     def list(self, path: str) -> Iterable[FileInfo]:
         pass
 
-    @abstractmethod
-    def write(self, path: str, content: str, **kwargs: Any) -> None:
-        """Write content to a file at the given path."""
-        pass
-
-    @abstractmethod
-    def exists(self, path: str) -> bool:
-        """Check if a file exists at the given path."""
-        pass
-
 
 def get_path_schema(path: str) -> str:
     scheme = parse.urlparse(path).scheme
diff --git a/metadata-ingestion/src/datahub/ingestion/fs/http_fs.py b/metadata-ingestion/src/datahub/ingestion/fs/http_fs.py
index 52d07e121e..a915335269 100644
--- a/metadata-ingestion/src/datahub/ingestion/fs/http_fs.py
+++ b/metadata-ingestion/src/datahub/ingestion/fs/http_fs.py
@@ -26,15 +26,3 @@ class HttpFileSystem(FileSystem):
     def list(self, path: str) -> Iterable[FileInfo]:
         status = self.file_status(path)
         return [status]
-
-    def write(self, path: str, content: str, **kwargs: Any) -> None:
-        """HTTP file system does not support writing."""
-        raise NotImplementedError("HTTP file system does not support write operations")
-
-    def exists(self, path: str) -> bool:
-        """Check if an HTTP resource exists."""
-        try:
-            head = requests.head(path)
-            return head.ok
-        except Exception:
-            return False
diff --git a/metadata-ingestion/src/datahub/ingestion/fs/local_fs.py b/metadata-ingestion/src/datahub/ingestion/fs/local_fs.py
index be0e94aecd..8a546650a3 100644
--- a/metadata-ingestion/src/datahub/ingestion/fs/local_fs.py
+++ b/metadata-ingestion/src/datahub/ingestion/fs/local_fs.py
@@ -27,17 +27,3 @@ class LocalFileSystem(FileSystem):
             return FileInfo(path, os.path.getsize(path), is_file=True)
         else:
             return FileInfo(path, 0, is_file=False)
-
-    def write(self, path: str, content: str, **kwargs: Any) -> None:
-        """Write content to a local file."""
-        # Create parent directories if they don't exist
-        p = pathlib.Path(path)
-        p.parent.mkdir(parents=True, exist_ok=True)
-
-        # Write the content
-        with p.open("w", **kwargs) as f:
-            f.write(content)
-
-    def exists(self, path: str) -> bool:
-        """Check if a file exists locally."""
-        return pathlib.Path(path).exists()
diff --git a/metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py b/metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
index d26f0be420..5848230d75 100644
--- a/metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
+++ b/metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
@@ -105,32 +105,3 @@ class S3FileSystem(FileSystem):
     def list(self, path: str) -> Iterable[FileInfo]:
         s3_path = parse_s3_path(path)
         return S3ListIterator(self.s3, s3_path.bucket, s3_path.key)
-
-    def write(self, path: str, content: str, **kwargs: Any) -> None:
-        """Write content to S3."""
-        s3_path = parse_s3_path(path)
-
-        # Convert string content to bytes for S3
-        content_bytes = content.encode("utf-8")
-
-        # Upload to S3
-        response = self.s3.put_object(
-            Bucket=s3_path.bucket, Key=s3_path.key, Body=content_bytes, **kwargs
-        )
-        assert_ok_status(response)
-
-    def exists(self, path: str) -> bool:
-        """Check if an object exists in S3."""
-        s3_path = parse_s3_path(path)
-        try:
-            self.s3.head_object(Bucket=s3_path.bucket, Key=s3_path.key)
-            return True
-        except Exception as e:
-            if (
-                hasattr(e, "response")
-                and e.response["ResponseMetadata"]["HTTPStatusCode"] == 404
-            ):
-                return False
-            else:
-                # Re-raise other exceptions (access denied, etc.)
-                raise e
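For reference, the removed S3 `exists()` above duck-types the 404 check by probing an untyped `Exception` for a `response` attribute. The same logic is usually written against botocore's typed `ClientError`; a minimal sketch (the `s3_object_exists` helper is illustrative, not part of this codebase):

```python
import boto3
from botocore.exceptions import ClientError


def s3_object_exists(bucket: str, key: str) -> bool:
    """Return True if the object exists; re-raise non-404 errors."""
    s3 = boto3.client("s3")
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as e:
        if e.response["ResponseMetadata"]["HTTPStatusCode"] == 404:
            return False
        raise  # access denied and other errors propagate
```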
diff --git a/metadata-ingestion/src/datahub/ingestion/sink/file.py b/metadata-ingestion/src/datahub/ingestion/sink/file.py
index 0ae7453ad6..c4f34d780f 100644
--- a/metadata-ingestion/src/datahub/ingestion/sink/file.py
+++ b/metadata-ingestion/src/datahub/ingestion/sink/file.py
@@ -1,36 +1,34 @@
 import json
 import logging
 import pathlib
-from typing import Iterable, List, Union
+from typing import Iterable, Union
 
 from datahub.configuration.common import ConfigModel
 from datahub.emitter.aspect import JSON_CONTENT_TYPE, JSON_PATCH_CONTENT_TYPE
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import RecordEnvelope
 from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
-from datahub.ingestion.fs.fs_base import get_path_schema
-from datahub.ingestion.fs.fs_registry import fs_registry
-from datahub.metadata.schema_classes import (
-    MetadataChangeEventClass,
-    MetadataChangeProposalClass,
-    UsageAggregationClass,
+from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
+    MetadataChangeEvent,
+    MetadataChangeProposal,
 )
+from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation
 
 logger = logging.getLogger(__name__)
 
 
 def _to_obj_for_file(
     obj: Union[
-        MetadataChangeEventClass,
-        MetadataChangeProposalClass,
+        MetadataChangeEvent,
+        MetadataChangeProposal,
         MetadataChangeProposalWrapper,
-        UsageAggregationClass,
+        UsageAggregation,
    ],
     simplified_structure: bool = True,
 ) -> dict:
     if isinstance(obj, MetadataChangeProposalWrapper):
         return obj.to_obj(simplified_structure=simplified_structure)
-    elif isinstance(obj, MetadataChangeProposalClass) and simplified_structure:
+    elif isinstance(obj, MetadataChangeProposal) and simplified_structure:
         serialized = obj.to_obj()
         if serialized.get("aspect") and serialized["aspect"].get("contentType") in [
             JSON_CONTENT_TYPE,
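The `simplified_structure` flag handled above controls the shape of serialized MCP aspects. A sketch of the default (simplified) shape, adapted from the assertions in the unit tests deleted further down:

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import DatasetPropertiesClass

mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
    aspect=DatasetPropertiesClass(description="test dataset"),
)

# Simplified structure: the aspect is emitted as parsed JSON under "json".
obj = mcp.to_obj(simplified_structure=True)
assert obj["aspect"]["json"]["description"] == "test dataset"
```

In legacy mode (`legacy_nested_json_string=True` on the sink config) the aspect instead stays a JSON-escaped string; the `contentType` branch above handles raw `MetadataChangeProposal`s whose aspect is still such an escaped string.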
@@ -48,28 +46,18 @@ class FileSinkConfig(ConfigModel):
 
 
 class FileSink(Sink[FileSinkConfig, SinkReport]):
-    """
-    File sink that supports writing to various backends (local, S3, etc.)
-    using the pluggable file system architecture.
-    """
-
     def __post_init__(self) -> None:
-        self.filename = self.config.filename
-
-        # Determine file system based on path schema
-        schema = get_path_schema(self.filename)
-        fs_class = fs_registry.get(schema)
-        self.fs = fs_class.create()
-
-        # Initialize the records list
-        self.records: List[dict] = []
+        fpath = pathlib.Path(self.config.filename)
+        self.file = fpath.open("w")
+        self.file.write("[\n")
+        self.wrote_something = False
 
     def write_record_async(
         self,
         record_envelope: RecordEnvelope[
             Union[
-                MetadataChangeEventClass,
-                MetadataChangeProposalClass,
+                MetadataChangeEvent,
+                MetadataChangeProposal,
                 MetadataChangeProposalWrapper,
             ]
         ],
@@ -80,74 +68,40 @@ class FileSink(Sink[FileSinkConfig, SinkReport]):
             record, simplified_structure=not self.config.legacy_nested_json_string
         )
 
-        # Store records in memory until close()
-        self.records.append(obj)
+        if self.wrote_something:
+            self.file.write(",\n")
+
+        json.dump(obj, self.file, indent=4)
+        self.wrote_something = True
 
         self.report.report_record_written(record_envelope)
         if write_callback:
             write_callback.on_success(record_envelope, {})
 
     def close(self):
-        """Write all records to the file system as a JSON array."""
-        if not self.records:
-            # Write empty array if no records
-            content = "[]"
-        else:
-            # Convert records to JSON string
-            content = "[\n"
-            for i, record in enumerate(self.records):
-                if i > 0:
-                    content += ",\n"
-                content += json.dumps(record, indent=4)
-            content += "\n]"
-
-        # Write to file system
-        try:
-            self.fs.write(self.filename, content)
-            logger.info(
-                f"Successfully wrote {len(self.records)} records to {self.filename}"
-            )
-        except Exception as e:
-            logger.error(f"Failed to write to {self.filename}: {e}")
-            raise
+        self.file.write("\n]")
+        self.file.close()
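The reverted sink streams each record straight to the open file handle, managing JSON array commas with the `wrote_something` flag instead of buffering records in memory. A condensed usage sketch, based on the deleted `test_file_sink.py` (`ctx` is mocked here just as the tests did):

```python
from unittest.mock import MagicMock

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import RecordEnvelope
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from datahub.metadata.schema_classes import DatasetPropertiesClass

sink = FileSink(config=FileSinkConfig(filename="out.json"), ctx=MagicMock())
mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
    aspect=DatasetPropertiesClass(description="test dataset"),
)
# Each record is dumped as it arrives; "," separators precede all but the first.
sink.write_record_async(RecordEnvelope(record=mcp, metadata={}), MagicMock())
sink.close()  # writes the closing "\n]" and closes the handle
```

A sink closed without any writes produces `[\n\n]`, which still parses as an empty JSON array.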
 
 
 def write_metadata_file(
-    file_path: Union[str, pathlib.Path],
+    file: pathlib.Path,
     records: Iterable[
         Union[
-            MetadataChangeEventClass,
-            MetadataChangeProposalClass,
+            MetadataChangeEvent,
+            MetadataChangeProposal,
             MetadataChangeProposalWrapper,
-            UsageAggregationClass,
+            UsageAggregation,
             dict,  # Serialized MCE or MCP
         ]
     ],
 ) -> None:
-    """
-    Write metadata records to any supported file system (local, S3, etc.).
-    This function uses the pluggable file system architecture.
-    """
-    # Convert Path to string if needed
-    file_path_str = str(file_path)
-
-    # Determine file system based on path schema
-    schema = get_path_schema(file_path_str)
-    fs_class = fs_registry.get(schema)
-    fs = fs_class.create()
-
-    # Convert records to JSON string
-    content = "[\n"
-    record_list = list(records)  # Convert iterable to list
-
-    for i, record in enumerate(record_list):
-        if i > 0:
-            content += ",\n"
-        if not isinstance(record, dict):
-            record = _to_obj_for_file(record)
-        content += json.dumps(record, indent=4)
-    content += "\n]"
-
-    # Write to file system
-    fs.write(file_path_str, content)
-    logger.info(f"Successfully wrote {len(record_list)} records to {file_path_str}")
+    # This simplified version of the FileSink can be used for testing purposes.
+    with file.open("w") as f:
+        f.write("[\n")
+        for i, record in enumerate(records):
+            if i > 0:
+                f.write(",\n")
+            if not isinstance(record, dict):
+                record = _to_obj_for_file(record)
+            json.dump(record, f, indent=4)
+        f.write("\n]")
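`write_metadata_file` likewise goes back to local-only output: it takes a `pathlib.Path`, so `s3://` destinations are no longer routed through `fs_registry`. A quick usage sketch (the dict record is a placeholder; non-dict records go through `_to_obj_for_file`):

```python
import pathlib

from datahub.ingestion.sink.file import write_metadata_file

write_metadata_file(
    pathlib.Path("out.json"),
    records=[
        {"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"}
    ],
)
```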
diff --git a/metadata-ingestion/tests/unit/test_file_sink.py b/metadata-ingestion/tests/unit/test_file_sink.py
deleted file mode 100644
index bf411931dc..0000000000
--- a/metadata-ingestion/tests/unit/test_file_sink.py
+++ /dev/null
@@ -1,216 +0,0 @@
-import json
-from typing import Union
-from unittest.mock import MagicMock, Mock, patch
-
-import pytest
-
-from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.ingestion.api.common import RecordEnvelope
-from datahub.ingestion.sink.file import FileSink, FileSinkConfig
-from datahub.metadata.schema_classes import (
-    DatasetPropertiesClass,
-    MetadataChangeEventClass,
-    MetadataChangeProposalClass,
-)
-
-
-def test_file_sink_local_write_backward_compatibility(tmp_path):
-    """Test that FileSink writes to local files maintaining backward compatibility."""
-    output_file = tmp_path / "test_output.json"
-    config = FileSinkConfig(filename=str(output_file))
-    sink = FileSink(config=config, ctx=MagicMock())
-
-    # Create a MetadataChangeProposalWrapper
-    mcp = MetadataChangeProposalWrapper(
-        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
-        aspect=DatasetPropertiesClass(description="test dataset"),
-    )
-
-    # Create record envelope with proper type
-    record_envelope: RecordEnvelope[
-        Union[
-            MetadataChangeEventClass,
-            MetadataChangeProposalClass,
-            MetadataChangeProposalWrapper,
-        ]
-    ] = RecordEnvelope(record=mcp, metadata={})
-
-    sink.write_record_async(record_envelope, write_callback=MagicMock())
-    sink.close()
-
-    # Verify file was created and has correct content
-    assert output_file.exists()
-    with output_file.open() as f:
-        content = json.load(f)
-
-    assert len(content) == 1
-    assert (
-        content[0]["entityUrn"]
-        == "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"
-    )
-    assert content[0]["aspect"]["json"]["description"] == "test dataset"
-
-
-def test_file_sink_empty_records(tmp_path):
-    """Test that FileSink handles empty records correctly."""
-    output_file = tmp_path / "empty_output.json"
-    config = FileSinkConfig(filename=str(output_file))
-    sink = FileSink(config=config, ctx=MagicMock())
-
-    # Don't write any records, just close
-    sink.close()
-
-    # Verify file contains empty array
-    assert output_file.exists()
-    with output_file.open() as f:
-        content = json.load(f)
-    assert content == []
-
-
-def test_file_sink_legacy_nested_json_string(tmp_path):
-    """Test that FileSink supports legacy nested JSON string format."""
-    output_file = tmp_path / "legacy_output.json"
-    config = FileSinkConfig(filename=str(output_file), legacy_nested_json_string=True)
-    sink = FileSink(config=config, ctx=MagicMock())
-
-    # Create a MetadataChangeProposalWrapper
-    mcp = MetadataChangeProposalWrapper(
-        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
-        aspect=DatasetPropertiesClass(description="test dataset"),
-    )
-
-    # Create record envelope with proper type
-    record_envelope: RecordEnvelope[
-        Union[
-            MetadataChangeEventClass,
-            MetadataChangeProposalClass,
-            MetadataChangeProposalWrapper,
-        ]
-    ] = RecordEnvelope(record=mcp, metadata={})
-
-    sink.write_record_async(record_envelope, write_callback=MagicMock())
-    sink.close()
-
-    # Verify file was created and has correct content
-    assert output_file.exists()
-    with output_file.open() as f:
-        content = json.load(f)
-
-    # In legacy mode, the aspect should be a nested string
-    assert len(content) == 1
-    assert "aspect" in content[0]
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_file_sink_s3_write(mock_boto3, tmp_path):
-    """Test that FileSink can write to S3."""
-    # Mock the S3 client
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-
-    # Mock successful put_object response
-    mock_s3_client.put_object.return_value = {
-        "ResponseMetadata": {"HTTPStatusCode": 200}
-    }
-
-    config = FileSinkConfig(filename="s3://test-bucket/metadata/output.json")
-    sink = FileSink(config=config, ctx=MagicMock())
-
-    # Create a MetadataChangeProposalWrapper
-    mcp = MetadataChangeProposalWrapper(
-        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
-        aspect=DatasetPropertiesClass(description="test dataset"),
-    )
-
-    # Create record envelope with proper type
-    record_envelope: RecordEnvelope[
-        Union[
-            MetadataChangeEventClass,
-            MetadataChangeProposalClass,
-            MetadataChangeProposalWrapper,
-        ]
-    ] = RecordEnvelope(record=mcp, metadata={})
-
-    sink.write_record_async(record_envelope, write_callback=MagicMock())
-    sink.close()
-
-    # Verify S3 client was called correctly
-    mock_boto3.client.assert_called_once_with("s3")
-    mock_s3_client.put_object.assert_called_once()
-
-    # Check the call arguments
-    call_args = mock_s3_client.put_object.call_args
-    assert call_args.kwargs["Bucket"] == "test-bucket"
-    assert call_args.kwargs["Key"] == "metadata/output.json"
-
-    # Verify the content is valid JSON
-    content = call_args.kwargs["Body"].decode("utf-8")
-    parsed_content = json.loads(content)
-    assert len(parsed_content) == 1
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_file_sink_s3_write_failure(mock_boto3, tmp_path):
-    """Test that FileSink handles S3 write failures gracefully."""
-    # Mock the S3 client to raise an exception
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-    mock_s3_client.put_object.side_effect = Exception("S3 write failed")
-
-    config = FileSinkConfig(filename="s3://test-bucket/metadata/output.json")
-    sink = FileSink(config=config, ctx=MagicMock())
-
-    # Create a MetadataChangeProposalWrapper
-    mcp = MetadataChangeProposalWrapper(
-        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
-        aspect=DatasetPropertiesClass(description="test dataset"),
-    )
-
-    # Create record envelope with proper type
-    record_envelope: RecordEnvelope[
-        Union[
-            MetadataChangeEventClass,
-            MetadataChangeProposalClass,
-            MetadataChangeProposalWrapper,
-        ]
-    ] = RecordEnvelope(record=mcp, metadata={})
-
-    sink.write_record_async(record_envelope, write_callback=MagicMock())
-
-    # close() should raise the exception
-    with pytest.raises(Exception, match="S3 write failed"):
-        sink.close()
-
-
-def test_file_sink_creates_parent_directories(tmp_path):
-    """Test that FileSink creates parent directories when writing to local files."""
-    nested_output_file = tmp_path / "nested" / "dir" / "output.json"
-    config = FileSinkConfig(filename=str(nested_output_file))
-    sink = FileSink(config=config, ctx=MagicMock())
-
-    # Create a MetadataChangeProposalWrapper
-    mcp = MetadataChangeProposalWrapper(
-        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
-        aspect=DatasetPropertiesClass(description="test dataset"),
-    )
-
-    # Create record envelope with proper type
-    record_envelope: RecordEnvelope[
-        Union[
-            MetadataChangeEventClass,
-            MetadataChangeProposalClass,
-            MetadataChangeProposalWrapper,
-        ]
-    ] = RecordEnvelope(record=mcp, metadata={})
-
-    sink.write_record_async(record_envelope, write_callback=MagicMock())
-    sink.close()
-
-    # Verify nested directories were created
-    assert nested_output_file.exists()
-    assert nested_output_file.parent.exists()
-
-    # Verify content
-    with nested_output_file.open() as f:
-        content = json.load(f)
-    assert len(content) == 1
diff --git a/metadata-ingestion/tests/unit/test_file_systems.py b/metadata-ingestion/tests/unit/test_file_systems.py
deleted file mode 100644
index 1f5ac7b194..0000000000
--- a/metadata-ingestion/tests/unit/test_file_systems.py
+++ /dev/null
@@ -1,228 +0,0 @@
-from unittest.mock import Mock, patch
-
-import pytest
-
-from datahub.ingestion.fs.fs_base import get_path_schema
-from datahub.ingestion.fs.http_fs import HttpFileSystem
-from datahub.ingestion.fs.local_fs import LocalFileSystem
-from datahub.ingestion.fs.s3_fs import S3FileSystem
-
-
-def test_local_filesystem_write_and_exists(tmp_path):
-    """Test LocalFileSystem write and exists functionality."""
-    fs = LocalFileSystem.create()
-
-    test_file = tmp_path / "test_file.txt"
-    test_content = "Hello, World!"
-
-    # Test write
-    fs.write(str(test_file), test_content)
-
-    # Test exists
-    assert fs.exists(str(test_file))
-    assert not fs.exists(str(tmp_path / "nonexistent.txt"))
-
-    # Verify content
-    with test_file.open() as f:
-        assert f.read() == test_content
-
-
-def test_local_filesystem_write_creates_directories(tmp_path):
-    """Test that LocalFileSystem creates parent directories when writing."""
-    fs = LocalFileSystem.create()
-
-    nested_file = tmp_path / "nested" / "dir" / "test_file.txt"
-    test_content = "Hello, World!"
-
-    # Write to nested path
-    fs.write(str(nested_file), test_content)
-
-    # Verify file and directories were created
-    assert nested_file.exists()
-    assert nested_file.parent.exists()
-
-    # Verify content
-    with nested_file.open() as f:
-        assert f.read() == test_content
-
-
-def test_local_filesystem_write_with_kwargs(tmp_path):
-    """Test LocalFileSystem write with additional kwargs."""
-    fs = LocalFileSystem.create()
-
-    test_file = tmp_path / "test_file.txt"
-    test_content = "Hello, World!"
-
-    # Test write with encoding kwarg
-    fs.write(str(test_file), test_content, encoding="utf-8")
-
-    # Verify content
-    with test_file.open() as f:
-        assert f.read() == test_content
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_s3_filesystem_write(mock_boto3):
-    """Test S3FileSystem write functionality."""
-    # Mock the S3 client
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-
-    # Mock successful put_object response
-    mock_s3_client.put_object.return_value = {
-        "ResponseMetadata": {"HTTPStatusCode": 200}
-    }
-
-    fs = S3FileSystem.create()
-
-    test_path = "s3://test-bucket/path/to/file.txt"
-    test_content = "Hello, S3!"
-
-    # Test write
-    fs.write(test_path, test_content)
-
-    # Verify S3 client was called correctly
-    mock_s3_client.put_object.assert_called_once_with(
-        Bucket="test-bucket", Key="path/to/file.txt", Body=test_content.encode("utf-8")
-    )
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_s3_filesystem_write_with_kwargs(mock_boto3):
-    """Test S3FileSystem write with additional kwargs."""
-    # Mock the S3 client
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-
-    # Mock successful put_object response
-    mock_s3_client.put_object.return_value = {
-        "ResponseMetadata": {"HTTPStatusCode": 200}
-    }
-
-    fs = S3FileSystem.create()
-
-    test_path = "s3://test-bucket/path/to/file.txt"
-    test_content = "Hello, S3!"
-
-    # Test write with additional kwargs
-    fs.write(
-        test_path, test_content, ContentType="text/plain", Metadata={"author": "test"}
-    )
-
-    # Verify S3 client was called with additional kwargs
-    mock_s3_client.put_object.assert_called_once_with(
-        Bucket="test-bucket",
-        Key="path/to/file.txt",
-        Body=test_content.encode("utf-8"),
-        ContentType="text/plain",
-        Metadata={"author": "test"},
-    )
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_s3_filesystem_exists_true(mock_boto3):
-    """Test S3FileSystem exists functionality when file exists."""
-    # Mock the S3 client
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-
-    # Mock successful head_object response
-    mock_s3_client.head_object.return_value = {
-        "ResponseMetadata": {"HTTPStatusCode": 200}
-    }
-
-    fs = S3FileSystem.create()
-
-    test_path = "s3://test-bucket/path/to/file.txt"
-
-    # Test exists
-    assert fs.exists(test_path)
-
-    # Verify S3 client was called correctly
-    mock_s3_client.head_object.assert_called_once_with(
-        Bucket="test-bucket", Key="path/to/file.txt"
-    )
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_s3_filesystem_exists_false(mock_boto3):
-    """Test S3FileSystem exists functionality when file doesn't exist."""
-    # Mock the S3 client
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-
-    # Mock 404 response with proper structure
-    error = Exception("Not found")
-    error.response = {"ResponseMetadata": {"HTTPStatusCode": 404}}  # type: ignore
-    mock_s3_client.head_object.side_effect = error
-
-    fs = S3FileSystem.create()
-
-    test_path = "s3://test-bucket/path/to/nonexistent.txt"
-
-    # Test exists
-    assert not fs.exists(test_path)
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_s3_filesystem_exists_error(mock_boto3):
-    """Test S3FileSystem exists functionality with non-404 errors."""
-    # Mock the S3 client
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-
-    # Mock access denied response
-    error = Exception("Access denied")
-    mock_s3_client.head_object.side_effect = error
-
-    fs = S3FileSystem.create()
-
-    test_path = "s3://test-bucket/path/to/file.txt"
-
-    # Test exists - should re-raise non-404 errors
-    with pytest.raises(Exception, match="Access denied"):
-        fs.exists(test_path)
-
-
-def test_http_filesystem_write_not_supported():
-    """Test that HttpFileSystem write operation raises NotImplementedError."""
-    fs = HttpFileSystem.create()
-
-    with pytest.raises(
-        NotImplementedError, match="HTTP file system does not support write operations"
-    ):
-        fs.write("http://example.com/file.txt", "content")
-
-
-@patch("datahub.ingestion.fs.http_fs.requests")
-def test_http_filesystem_exists_true(mock_requests):
-    """Test HttpFileSystem exists functionality when resource exists."""
-    # Mock successful HEAD response
-    mock_response = Mock()
-    mock_response.ok = True
-    mock_requests.head.return_value = mock_response
-
-    fs = HttpFileSystem.create()
-
-    assert fs.exists("http://example.com/file.txt")
-    mock_requests.head.assert_called_once_with("http://example.com/file.txt")
-
-
-@patch("datahub.ingestion.fs.http_fs.requests")
-def test_http_filesystem_exists_false(mock_requests):
-    """Test HttpFileSystem exists functionality when resource doesn't exist."""
-    # Mock failed HEAD response
-    mock_requests.head.side_effect = Exception("Request failed")
-
-    fs = HttpFileSystem.create()
-
-    assert not fs.exists("http://example.com/nonexistent.txt")
-
-
-def test_get_path_schema():
-    """Test path schema detection."""
-    assert get_path_schema("s3://bucket/file.txt") == "s3"
-    assert get_path_schema("http://example.com/file.txt") == "http"
-    assert get_path_schema("https://example.com/file.txt") == "https"
-    assert get_path_schema("/local/path/file.txt") == "file"
-    assert get_path_schema("relative/path/file.txt") == "file"
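Since the revert drops `write()`/`exists()` from every `FileSystem`, pipelines that pointed the file sink at `s3://` destinations have to fall back to writing locally and uploading out of band. One way to do that with plain boto3 (a sketch only; the bucket and key names are placeholders):

```python
import pathlib

import boto3

from datahub.ingestion.sink.file import write_metadata_file

# Write the metadata JSON locally, then push it to S3 as a separate step.
out = pathlib.Path("metadata.json")
write_metadata_file(out, records=[])  # records elided for brevity
boto3.client("s3").upload_file(str(out), "my-bucket", "metadata/metadata.json")
```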