revert(ingest): Extending file sink to support writing to S3 (#14160) (#14248)

Harshal Sheth 2025-07-29 03:13:26 -07:00 committed by GitHub
parent af98667272
commit fe3ffc1c27
7 changed files with 36 additions and 591 deletions

datahub/ingestion/fs/fs_base.py

@@ -31,16 +31,6 @@ class FileSystem(metaclass=ABCMeta):
     def list(self, path: str) -> Iterable[FileInfo]:
         pass
 
-    @abstractmethod
-    def write(self, path: str, content: str, **kwargs: Any) -> None:
-        """Write content to a file at the given path."""
-        pass
-
-    @abstractmethod
-    def exists(self, path: str) -> bool:
-        """Check if a file exists at the given path."""
-        pass
 
 
 def get_path_schema(path: str) -> str:
     scheme = parse.urlparse(path).scheme
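For orientation (not part of this commit): the removed write/exists methods were reached through scheme-based dispatch, sketched below from code that is removed elsewhere in this diff. The helper name filesystem_for_path is illustrative; get_path_schema, fs_registry, and the create() factory all appear in the removed FileSink code further down.

from datahub.ingestion.fs.fs_base import get_path_schema
from datahub.ingestion.fs.fs_registry import fs_registry


def filesystem_for_path(path: str):
    # get_path_schema returns "s3", "http", "https", or "file" for local paths.
    schema = get_path_schema(path)
    fs_class = fs_registry.get(schema)
    return fs_class.create()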

datahub/ingestion/fs/http_fs.py

@@ -26,15 +26,3 @@ class HttpFileSystem(FileSystem):
     def list(self, path: str) -> Iterable[FileInfo]:
         status = self.file_status(path)
         return [status]
-
-    def write(self, path: str, content: str, **kwargs: Any) -> None:
-        """HTTP file system does not support writing."""
-        raise NotImplementedError("HTTP file system does not support write operations")
-
-    def exists(self, path: str) -> bool:
-        """Check if an HTTP resource exists."""
-        try:
-            head = requests.head(path)
-            return head.ok
-        except Exception:
-            return False

datahub/ingestion/fs/local_fs.py

@@ -27,17 +27,3 @@ class LocalFileSystem(FileSystem):
             return FileInfo(path, os.path.getsize(path), is_file=True)
         else:
             return FileInfo(path, 0, is_file=False)
-
-    def write(self, path: str, content: str, **kwargs: Any) -> None:
-        """Write content to a local file."""
-        # Create parent directories if they don't exist
-        p = pathlib.Path(path)
-        p.parent.mkdir(parents=True, exist_ok=True)
-
-        # Write the content
-        with p.open("w", **kwargs) as f:
-            f.write(content)
-
-    def exists(self, path: str) -> bool:
-        """Check if a file exists locally."""
-        return pathlib.Path(path).exists()

datahub/ingestion/fs/s3_fs.py

@@ -105,32 +105,3 @@ class S3FileSystem(FileSystem):
     def list(self, path: str) -> Iterable[FileInfo]:
         s3_path = parse_s3_path(path)
         return S3ListIterator(self.s3, s3_path.bucket, s3_path.key)
-
-    def write(self, path: str, content: str, **kwargs: Any) -> None:
-        """Write content to S3."""
-        s3_path = parse_s3_path(path)
-
-        # Convert string content to bytes for S3
-        content_bytes = content.encode("utf-8")
-
-        # Upload to S3
-        response = self.s3.put_object(
-            Bucket=s3_path.bucket, Key=s3_path.key, Body=content_bytes, **kwargs
-        )
-        assert_ok_status(response)
-
-    def exists(self, path: str) -> bool:
-        """Check if an object exists in S3."""
-        s3_path = parse_s3_path(path)
-        try:
-            self.s3.head_object(Bucket=s3_path.bucket, Key=s3_path.key)
-            return True
-        except Exception as e:
-            if (
-                hasattr(e, "response")
-                and e.response["ResponseMetadata"]["HTTPStatusCode"] == 404
-            ):
-                return False
-            else:
-                # Re-raise other exceptions (access denied, etc.)
-                raise e
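Side note, not from this commit: the removed exists() inspects e.response on a bare Exception; an equivalent check against botocore's ClientError is sketched below. The function name is illustrative, and the bucket/key are passed directly rather than via parse_s3_path.

import boto3
from botocore.exceptions import ClientError


def s3_object_exists(bucket: str, key: str) -> bool:
    # HEAD the object; a 404 means the key is absent, anything else propagates.
    s3 = boto3.client("s3")
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as e:
        if e.response["ResponseMetadata"]["HTTPStatusCode"] == 404:
            return False
        raise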

datahub/ingestion/sink/file.py

@@ -1,36 +1,34 @@
 import json
 import logging
 import pathlib
-from typing import Iterable, List, Union
+from typing import Iterable, Union
 
 from datahub.configuration.common import ConfigModel
 from datahub.emitter.aspect import JSON_CONTENT_TYPE, JSON_PATCH_CONTENT_TYPE
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import RecordEnvelope
 from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
-from datahub.ingestion.fs.fs_base import get_path_schema
-from datahub.ingestion.fs.fs_registry import fs_registry
-from datahub.metadata.schema_classes import (
-    MetadataChangeEventClass,
-    MetadataChangeProposalClass,
-    UsageAggregationClass,
-)
+from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
+    MetadataChangeEvent,
+    MetadataChangeProposal,
+)
+from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation
 
 logger = logging.getLogger(__name__)
 
 
 def _to_obj_for_file(
     obj: Union[
-        MetadataChangeEventClass,
-        MetadataChangeProposalClass,
+        MetadataChangeEvent,
+        MetadataChangeProposal,
         MetadataChangeProposalWrapper,
-        UsageAggregationClass,
+        UsageAggregation,
     ],
     simplified_structure: bool = True,
 ) -> dict:
     if isinstance(obj, MetadataChangeProposalWrapper):
         return obj.to_obj(simplified_structure=simplified_structure)
-    elif isinstance(obj, MetadataChangeProposalClass) and simplified_structure:
+    elif isinstance(obj, MetadataChangeProposal) and simplified_structure:
         serialized = obj.to_obj()
         if serialized.get("aspect") and serialized["aspect"].get("contentType") in [
             JSON_CONTENT_TYPE,
@@ -48,28 +46,18 @@ class FileSinkConfig(ConfigModel):
 class FileSink(Sink[FileSinkConfig, SinkReport]):
-    """
-    File sink that supports writing to various backends (local, S3, etc.)
-    using the pluggable file system architecture.
-    """
-
     def __post_init__(self) -> None:
-        self.filename = self.config.filename
-
-        # Determine file system based on path schema
-        schema = get_path_schema(self.filename)
-        fs_class = fs_registry.get(schema)
-        self.fs = fs_class.create()
-
-        # Initialize the records list
-        self.records: List[dict] = []
+        fpath = pathlib.Path(self.config.filename)
+        self.file = fpath.open("w")
+        self.file.write("[\n")
+        self.wrote_something = False
 
     def write_record_async(
         self,
         record_envelope: RecordEnvelope[
             Union[
-                MetadataChangeEventClass,
-                MetadataChangeProposalClass,
+                MetadataChangeEvent,
+                MetadataChangeProposal,
                 MetadataChangeProposalWrapper,
             ]
         ],
@@ -80,74 +68,40 @@ class FileSink(Sink[FileSinkConfig, SinkReport]):
             record, simplified_structure=not self.config.legacy_nested_json_string
         )
-
-        # Store records in memory until close()
-        self.records.append(obj)
+        if self.wrote_something:
+            self.file.write(",\n")
+        json.dump(obj, self.file, indent=4)
+        self.wrote_something = True
 
         self.report.report_record_written(record_envelope)
         if write_callback:
             write_callback.on_success(record_envelope, {})
 
     def close(self):
-        """Write all records to the file system as a JSON array."""
-        if not self.records:
-            # Write empty array if no records
-            content = "[]"
-        else:
-            # Convert records to JSON string
-            content = "[\n"
-            for i, record in enumerate(self.records):
-                if i > 0:
-                    content += ",\n"
-                content += json.dumps(record, indent=4)
-            content += "\n]"
-
-        # Write to file system
-        try:
-            self.fs.write(self.filename, content)
-            logger.info(
-                f"Successfully wrote {len(self.records)} records to {self.filename}"
-            )
-        except Exception as e:
-            logger.error(f"Failed to write to {self.filename}: {e}")
-            raise
+        self.file.write("\n]")
+        self.file.close()
 
 
 def write_metadata_file(
-    file_path: Union[str, pathlib.Path],
+    file: pathlib.Path,
     records: Iterable[
         Union[
-            MetadataChangeEventClass,
-            MetadataChangeProposalClass,
+            MetadataChangeEvent,
+            MetadataChangeProposal,
             MetadataChangeProposalWrapper,
-            UsageAggregationClass,
+            UsageAggregation,
             dict,  # Serialized MCE or MCP
         ]
     ],
 ) -> None:
-    """
-    Write metadata records to any supported file system (local, S3, etc.).
-    This function uses the pluggable file system architecture.
-    """
-    # Convert Path to string if needed
-    file_path_str = str(file_path)
-
-    # Determine file system based on path schema
-    schema = get_path_schema(file_path_str)
-    fs_class = fs_registry.get(schema)
-    fs = fs_class.create()
-
-    # Convert records to JSON string
-    content = "[\n"
-    record_list = list(records)  # Convert iterable to list
-    for i, record in enumerate(record_list):
-        if i > 0:
-            content += ",\n"
-        if not isinstance(record, dict):
-            record = _to_obj_for_file(record)
-        content += json.dumps(record, indent=4)
-    content += "\n]"
-
-    # Write to file system
-    fs.write(file_path_str, content)
-    logger.info(f"Successfully wrote {len(record_list)} records to {file_path_str}")
+    # This simplified version of the FileSink can be used for testing purposes.
+    with file.open("w") as f:
+        f.write("[\n")
+        for i, record in enumerate(records):
+            if i > 0:
+                f.write(",\n")
+            if not isinstance(record, dict):
+                record = _to_obj_for_file(record)
+            json.dump(record, f, indent=4)
+        f.write("\n]")
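With the revert applied, the sink and this helper write local files only. A hedged usage sketch of the restored write_metadata_file follows; the output path is illustrative, and the URN/aspect values are copied from the deleted tests below.

import pathlib

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.sink.file import write_metadata_file
from datahub.metadata.schema_classes import DatasetPropertiesClass

records = [
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
        aspect=DatasetPropertiesClass(description="test dataset"),
    )
]
# Streams the records into a JSON array at the given local path.
write_metadata_file(pathlib.Path("metadata_output.json"), records)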

Deleted test module covering the FileSink local and S3 write paths

@@ -1,216 +0,0 @@
-import json
-from typing import Union
-from unittest.mock import MagicMock, Mock, patch
-
-import pytest
-
-from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.ingestion.api.common import RecordEnvelope
-from datahub.ingestion.sink.file import FileSink, FileSinkConfig
-from datahub.metadata.schema_classes import (
-    DatasetPropertiesClass,
-    MetadataChangeEventClass,
-    MetadataChangeProposalClass,
-)
-
-
-def test_file_sink_local_write_backward_compatibility(tmp_path):
-    """Test that FileSink writes to local files maintaining backward compatibility."""
-    output_file = tmp_path / "test_output.json"
-    config = FileSinkConfig(filename=str(output_file))
-    sink = FileSink(config=config, ctx=MagicMock())
-
-    # Create a MetadataChangeProposalWrapper
-    mcp = MetadataChangeProposalWrapper(
-        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
-        aspect=DatasetPropertiesClass(description="test dataset"),
-    )
-
-    # Create record envelope with proper type
-    record_envelope: RecordEnvelope[
-        Union[
-            MetadataChangeEventClass,
-            MetadataChangeProposalClass,
-            MetadataChangeProposalWrapper,
-        ]
-    ] = RecordEnvelope(record=mcp, metadata={})
-
-    sink.write_record_async(record_envelope, write_callback=MagicMock())
-    sink.close()
-
-    # Verify file was created and has correct content
-    assert output_file.exists()
-    with output_file.open() as f:
-        content = json.load(f)
-    assert len(content) == 1
-    assert (
-        content[0]["entityUrn"]
-        == "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"
-    )
-    assert content[0]["aspect"]["json"]["description"] == "test dataset"
-
-
-def test_file_sink_empty_records(tmp_path):
-    """Test that FileSink handles empty records correctly."""
-    output_file = tmp_path / "empty_output.json"
-    config = FileSinkConfig(filename=str(output_file))
-    sink = FileSink(config=config, ctx=MagicMock())
-
-    # Don't write any records, just close
-    sink.close()
-
-    # Verify file contains empty array
-    assert output_file.exists()
-    with output_file.open() as f:
-        content = json.load(f)
-    assert content == []
-
-
-def test_file_sink_legacy_nested_json_string(tmp_path):
-    """Test that FileSink supports legacy nested JSON string format."""
-    output_file = tmp_path / "legacy_output.json"
-    config = FileSinkConfig(filename=str(output_file), legacy_nested_json_string=True)
-    sink = FileSink(config=config, ctx=MagicMock())
-
-    # Create a MetadataChangeProposalWrapper
-    mcp = MetadataChangeProposalWrapper(
-        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
-        aspect=DatasetPropertiesClass(description="test dataset"),
-    )
-
-    # Create record envelope with proper type
-    record_envelope: RecordEnvelope[
-        Union[
-            MetadataChangeEventClass,
-            MetadataChangeProposalClass,
-            MetadataChangeProposalWrapper,
-        ]
-    ] = RecordEnvelope(record=mcp, metadata={})
-
-    sink.write_record_async(record_envelope, write_callback=MagicMock())
-    sink.close()
-
-    # Verify file was created and has correct content
-    assert output_file.exists()
-    with output_file.open() as f:
-        content = json.load(f)
-    # In legacy mode, the aspect should be a nested string
-    assert len(content) == 1
-    assert "aspect" in content[0]
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_file_sink_s3_write(mock_boto3, tmp_path):
-    """Test that FileSink can write to S3."""
-    # Mock the S3 client
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-
-    # Mock successful put_object response
-    mock_s3_client.put_object.return_value = {
-        "ResponseMetadata": {"HTTPStatusCode": 200}
-    }
-
-    config = FileSinkConfig(filename="s3://test-bucket/metadata/output.json")
-    sink = FileSink(config=config, ctx=MagicMock())
-
-    # Create a MetadataChangeProposalWrapper
-    mcp = MetadataChangeProposalWrapper(
-        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
-        aspect=DatasetPropertiesClass(description="test dataset"),
-    )
-
-    # Create record envelope with proper type
-    record_envelope: RecordEnvelope[
-        Union[
-            MetadataChangeEventClass,
-            MetadataChangeProposalClass,
-            MetadataChangeProposalWrapper,
-        ]
-    ] = RecordEnvelope(record=mcp, metadata={})
-
-    sink.write_record_async(record_envelope, write_callback=MagicMock())
-    sink.close()
-
-    # Verify S3 client was called correctly
-    mock_boto3.client.assert_called_once_with("s3")
-    mock_s3_client.put_object.assert_called_once()
-
-    # Check the call arguments
-    call_args = mock_s3_client.put_object.call_args
-    assert call_args.kwargs["Bucket"] == "test-bucket"
-    assert call_args.kwargs["Key"] == "metadata/output.json"
-
-    # Verify the content is valid JSON
-    content = call_args.kwargs["Body"].decode("utf-8")
-    parsed_content = json.loads(content)
-    assert len(parsed_content) == 1
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_file_sink_s3_write_failure(mock_boto3, tmp_path):
-    """Test that FileSink handles S3 write failures gracefully."""
-    # Mock the S3 client to raise an exception
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-    mock_s3_client.put_object.side_effect = Exception("S3 write failed")
-
-    config = FileSinkConfig(filename="s3://test-bucket/metadata/output.json")
-    sink = FileSink(config=config, ctx=MagicMock())
-
-    # Create a MetadataChangeProposalWrapper
-    mcp = MetadataChangeProposalWrapper(
-        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
-        aspect=DatasetPropertiesClass(description="test dataset"),
-    )
-
-    # Create record envelope with proper type
-    record_envelope: RecordEnvelope[
-        Union[
-            MetadataChangeEventClass,
-            MetadataChangeProposalClass,
-            MetadataChangeProposalWrapper,
-        ]
-    ] = RecordEnvelope(record=mcp, metadata={})
-
-    sink.write_record_async(record_envelope, write_callback=MagicMock())
-
-    # close() should raise the exception
-    with pytest.raises(Exception, match="S3 write failed"):
-        sink.close()
-
-
-def test_file_sink_creates_parent_directories(tmp_path):
-    """Test that FileSink creates parent directories when writing to local files."""
-    nested_output_file = tmp_path / "nested" / "dir" / "output.json"
-    config = FileSinkConfig(filename=str(nested_output_file))
-    sink = FileSink(config=config, ctx=MagicMock())
-
-    # Create a MetadataChangeProposalWrapper
-    mcp = MetadataChangeProposalWrapper(
-        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
-        aspect=DatasetPropertiesClass(description="test dataset"),
-    )
-
-    # Create record envelope with proper type
-    record_envelope: RecordEnvelope[
-        Union[
-            MetadataChangeEventClass,
-            MetadataChangeProposalClass,
-            MetadataChangeProposalWrapper,
-        ]
-    ] = RecordEnvelope(record=mcp, metadata={})
-
-    sink.write_record_async(record_envelope, write_callback=MagicMock())
-    sink.close()
-
-    # Verify nested directories were created
-    assert nested_output_file.exists()
-    assert nested_output_file.parent.exists()
-
-    # Verify content
-    with nested_output_file.open() as f:
-        content = json.load(f)
-    assert len(content) == 1

Deleted test module covering the LocalFileSystem, S3FileSystem, and HttpFileSystem write/exists methods

@@ -1,228 +0,0 @@
-from unittest.mock import Mock, patch
-
-import pytest
-
-from datahub.ingestion.fs.fs_base import get_path_schema
-from datahub.ingestion.fs.http_fs import HttpFileSystem
-from datahub.ingestion.fs.local_fs import LocalFileSystem
-from datahub.ingestion.fs.s3_fs import S3FileSystem
-
-
-def test_local_filesystem_write_and_exists(tmp_path):
-    """Test LocalFileSystem write and exists functionality."""
-    fs = LocalFileSystem.create()
-    test_file = tmp_path / "test_file.txt"
-    test_content = "Hello, World!"
-
-    # Test write
-    fs.write(str(test_file), test_content)
-
-    # Test exists
-    assert fs.exists(str(test_file))
-    assert not fs.exists(str(tmp_path / "nonexistent.txt"))
-
-    # Verify content
-    with test_file.open() as f:
-        assert f.read() == test_content
-
-
-def test_local_filesystem_write_creates_directories(tmp_path):
-    """Test that LocalFileSystem creates parent directories when writing."""
-    fs = LocalFileSystem.create()
-    nested_file = tmp_path / "nested" / "dir" / "test_file.txt"
-    test_content = "Hello, World!"
-
-    # Write to nested path
-    fs.write(str(nested_file), test_content)
-
-    # Verify file and directories were created
-    assert nested_file.exists()
-    assert nested_file.parent.exists()
-
-    # Verify content
-    with nested_file.open() as f:
-        assert f.read() == test_content
-
-
-def test_local_filesystem_write_with_kwargs(tmp_path):
-    """Test LocalFileSystem write with additional kwargs."""
-    fs = LocalFileSystem.create()
-    test_file = tmp_path / "test_file.txt"
-    test_content = "Hello, World!"
-
-    # Test write with encoding kwarg
-    fs.write(str(test_file), test_content, encoding="utf-8")
-
-    # Verify content
-    with test_file.open() as f:
-        assert f.read() == test_content
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_s3_filesystem_write(mock_boto3):
-    """Test S3FileSystem write functionality."""
-    # Mock the S3 client
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-
-    # Mock successful put_object response
-    mock_s3_client.put_object.return_value = {
-        "ResponseMetadata": {"HTTPStatusCode": 200}
-    }
-
-    fs = S3FileSystem.create()
-    test_path = "s3://test-bucket/path/to/file.txt"
-    test_content = "Hello, S3!"
-
-    # Test write
-    fs.write(test_path, test_content)
-
-    # Verify S3 client was called correctly
-    mock_s3_client.put_object.assert_called_once_with(
-        Bucket="test-bucket", Key="path/to/file.txt", Body=test_content.encode("utf-8")
-    )
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_s3_filesystem_write_with_kwargs(mock_boto3):
-    """Test S3FileSystem write with additional kwargs."""
-    # Mock the S3 client
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-
-    # Mock successful put_object response
-    mock_s3_client.put_object.return_value = {
-        "ResponseMetadata": {"HTTPStatusCode": 200}
-    }
-
-    fs = S3FileSystem.create()
-    test_path = "s3://test-bucket/path/to/file.txt"
-    test_content = "Hello, S3!"
-
-    # Test write with additional kwargs
-    fs.write(
-        test_path, test_content, ContentType="text/plain", Metadata={"author": "test"}
-    )
-
-    # Verify S3 client was called with additional kwargs
-    mock_s3_client.put_object.assert_called_once_with(
-        Bucket="test-bucket",
-        Key="path/to/file.txt",
-        Body=test_content.encode("utf-8"),
-        ContentType="text/plain",
-        Metadata={"author": "test"},
-    )
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_s3_filesystem_exists_true(mock_boto3):
-    """Test S3FileSystem exists functionality when file exists."""
-    # Mock the S3 client
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-
-    # Mock successful head_object response
-    mock_s3_client.head_object.return_value = {
-        "ResponseMetadata": {"HTTPStatusCode": 200}
-    }
-
-    fs = S3FileSystem.create()
-    test_path = "s3://test-bucket/path/to/file.txt"
-
-    # Test exists
-    assert fs.exists(test_path)
-
-    # Verify S3 client was called correctly
-    mock_s3_client.head_object.assert_called_once_with(
-        Bucket="test-bucket", Key="path/to/file.txt"
-    )
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_s3_filesystem_exists_false(mock_boto3):
-    """Test S3FileSystem exists functionality when file doesn't exist."""
-    # Mock the S3 client
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-
-    # Mock 404 response with proper structure
-    error = Exception("Not found")
-    error.response = {"ResponseMetadata": {"HTTPStatusCode": 404}}  # type: ignore
-    mock_s3_client.head_object.side_effect = error
-
-    fs = S3FileSystem.create()
-    test_path = "s3://test-bucket/path/to/nonexistent.txt"
-
-    # Test exists
-    assert not fs.exists(test_path)
-
-
-@patch("datahub.ingestion.fs.s3_fs.boto3")
-def test_s3_filesystem_exists_error(mock_boto3):
-    """Test S3FileSystem exists functionality with non-404 errors."""
-    # Mock the S3 client
-    mock_s3_client = Mock()
-    mock_boto3.client.return_value = mock_s3_client
-
-    # Mock access denied response
-    error = Exception("Access denied")
-    mock_s3_client.head_object.side_effect = error
-
-    fs = S3FileSystem.create()
-    test_path = "s3://test-bucket/path/to/file.txt"
-
-    # Test exists - should re-raise non-404 errors
-    with pytest.raises(Exception, match="Access denied"):
-        fs.exists(test_path)
-
-
-def test_http_filesystem_write_not_supported():
-    """Test that HttpFileSystem write operation raises NotImplementedError."""
-    fs = HttpFileSystem.create()
-
-    with pytest.raises(
-        NotImplementedError, match="HTTP file system does not support write operations"
-    ):
-        fs.write("http://example.com/file.txt", "content")
-
-
-@patch("datahub.ingestion.fs.http_fs.requests")
-def test_http_filesystem_exists_true(mock_requests):
-    """Test HttpFileSystem exists functionality when resource exists."""
-    # Mock successful HEAD response
-    mock_response = Mock()
-    mock_response.ok = True
-    mock_requests.head.return_value = mock_response
-
-    fs = HttpFileSystem.create()
-    assert fs.exists("http://example.com/file.txt")
-    mock_requests.head.assert_called_once_with("http://example.com/file.txt")
-
-
-@patch("datahub.ingestion.fs.http_fs.requests")
-def test_http_filesystem_exists_false(mock_requests):
-    """Test HttpFileSystem exists functionality when resource doesn't exist."""
-    # Mock failed HEAD response
-    mock_requests.head.side_effect = Exception("Request failed")
-
-    fs = HttpFileSystem.create()
-    assert not fs.exists("http://example.com/nonexistent.txt")
-
-
-def test_get_path_schema():
-    """Test path schema detection."""
-    assert get_path_schema("s3://bucket/file.txt") == "s3"
-    assert get_path_schema("http://example.com/file.txt") == "http"
-    assert get_path_schema("https://example.com/file.txt") == "https"
-    assert get_path_schema("/local/path/file.txt") == "file"
-    assert get_path_schema("relative/path/file.txt") == "file"