fix(ingest/hive): Fix hive storage path formats (#13536)

This commit is contained in:
Tamas Nemeth 2025-05-19 16:17:28 +02:00 committed by GitHub
parent 1dec8d8ccb
commit 0eca4dfde2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 94 additions and 3 deletions

View File

@ -139,7 +139,7 @@ class StoragePathParser:
path = f"{parsed.netloc}/{parsed.path.lstrip('/')}"
elif platform == StoragePlatform.AZURE:
if scheme in ("abfs", "abfss"):
if scheme in ("abfs", "abfss", "wasbs"):
# Format: abfss://container@account.dfs.core.windows.net/path
container = parsed.netloc.split("@")[0]
path = f"{container}/{parsed.path.lstrip('/')}"
@ -153,7 +153,7 @@ class StoragePathParser:
elif platform == StoragePlatform.DBFS:
# For DBFS, use path as-is
path = parsed.path.lstrip("/")
path = "/" + parsed.path.lstrip("/")
elif platform == StoragePlatform.LOCAL:
# For local files, use full path
@ -169,7 +169,6 @@ class StoragePathParser:
# Clean up the path
path = path.rstrip("/") # Remove trailing slashes
path = re.sub(r"/+", "/", path) # Normalize multiple slashes
path = f"/{path}"
return platform, path

View File

@ -0,0 +1,92 @@
from datahub.ingestion.source.sql.hive import StoragePathParser, StoragePlatform
# Mock logger
class MockLogger:
def warning(self, msg):
pass
logger = MockLogger()
class TestStoragePathParser:
def test_local_file_no_scheme(self):
"""Test parsing a local file path with no scheme."""
result = StoragePathParser.parse_storage_location("/path/to/file")
assert result == (StoragePlatform.LOCAL, "/path/to/file")
def test_s3_path(self):
"""Test parsing an S3 path."""
result = StoragePathParser.parse_storage_location("s3://my-bucket/path/to/file")
assert result == (StoragePlatform.S3, "my-bucket/path/to/file")
def test_azure_abfss_path(self):
"""Test parsing an Azure ABFSS path."""
result = StoragePathParser.parse_storage_location(
"abfss://container@account.dfs.core.windows.net/path/to/file"
)
assert result == (StoragePlatform.AZURE, "container/path/to/file")
def test_azure_abfs_path(self):
"""Test parsing an Azure ABFS path."""
result = StoragePathParser.parse_storage_location(
"abfs://container@account.dfs.core.windows.net/path/to/file"
)
assert result == (StoragePlatform.AZURE, "container/path/to/file")
def test_azure_wasbs_path(self):
"""Test parsing an Azure WASBS path."""
result = StoragePathParser.parse_storage_location(
"wasbs://container@account.blob.core.windows.net/path/to/file"
)
assert result == (StoragePlatform.AZURE, "container/path/to/file")
def test_gcs_path(self):
"""Test parsing a Google Cloud Storage path."""
result = StoragePathParser.parse_storage_location("gs://my-bucket/path/to/file")
assert result == (StoragePlatform.GCS, "my-bucket/path/to/file")
def test_dbfs_path(self):
"""Test parsing a Databricks File System path."""
result = StoragePathParser.parse_storage_location("dbfs:/path/to/file")
assert result == (StoragePlatform.DBFS, "/path/to/file")
def test_local_file_with_scheme(self):
"""Test parsing a local file with scheme."""
result = StoragePathParser.parse_storage_location("file:///path/to/file")
assert result == (StoragePlatform.LOCAL, "/path/to/file")
def test_hdfs_path(self):
"""Test parsing an HDFS path."""
result = StoragePathParser.parse_storage_location(
"hdfs://namenode:8020/path/to/file"
)
assert result == (StoragePlatform.HDFS, "namenode:8020/path/to/file")
def test_empty_string(self):
"""Test parsing an empty string."""
result = StoragePathParser.parse_storage_location("")
assert result is None
def test_invalid_scheme(self):
"""Test parsing a URI with an invalid scheme."""
result = StoragePathParser.parse_storage_location("invalid://path/to/file")
assert result is None
def test_no_scheme(self):
"""Test parsing a URI with no scheme."""
result = StoragePathParser.parse_storage_location("path/to/file")
assert result is None
def test_normalize_multiple_slashes(self):
"""Test normalization of multiple slashes."""
result = StoragePathParser.parse_storage_location(
"s3://bucket//path///to////file"
)
assert result == (StoragePlatform.S3, "bucket/path/to/file")
def test_remove_trailing_slashes(self):
"""Test removal of trailing slashes."""
result = StoragePathParser.parse_storage_location("s3://bucket/path/")
assert result == (StoragePlatform.S3, "bucket/path")