MINOR: Add Unstructured Formats Support to GCS Storage Connector (#23158)
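Adds support for ingesting unstructured files from GCS: manifest entries can now declare unstructuredFormats (a list of extensions, or the "*" wildcard), and the connector lists matching objects under the entry's dataPath, creates the intermediate folder hierarchy as parent containers, and registers each matching file as a leaf container with its size and a console source URL. Structured ingestion also gains depth-based container discovery, and bucket/object console URLs are corrected.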

(cherry picked from commit 08ee62a1981ab242db02c119ed92edc58bca42d4)
Mayur Singal 2025-09-02 18:22:39 +05:30 committed by OpenMetadata Release Bot
parent 4a81cf9a9c
commit 9eb6cfc04a
4 changed files with 757 additions and 34 deletions

File: metadata/ingestion/source/storage/gcs/metadata.py

@@ -12,9 +12,10 @@
import json
import secrets
import traceback
from copy import deepcopy
from datetime import datetime, timedelta
from enum import Enum
-from typing import Dict, Iterable, List, Optional
+from typing import Dict, Iterable, List, Optional, Tuple
from google.cloud.exceptions import NotFound
from google.cloud.monitoring_v3.types import TimeInterval
@@ -42,6 +43,7 @@ from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
@@ -63,6 +65,8 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
WILD_CARD = "*"
class GCSMetric(Enum):
NUMBER_OF_OBJECTS = "storage.googleapis.com/storage/object_count"
@@ -82,6 +86,7 @@ class GcsSource(StorageServiceSource):
for project_id, client in self.gcs_clients.storage_client.clients.items()
}
self._bucket_cache: Dict[str, Container] = {}
self._unstructured_container_cache: Dict[str, Tuple[str, str]] = {}
@classmethod
def create(
@@ -115,6 +120,10 @@ class GcsSource(StorageServiceSource):
entity=Container, fqn=container_fqn
)
self._bucket_cache[bucket_name] = container_entity
self._unstructured_container_cache[container_fqn] = (
container_entity.id.root,
"", # Empty string for bucket root (no relative path)
)
parent_entity: EntityReference = EntityReference(
id=self._bucket_cache[bucket_name].id.root, type="container"
)
@@ -132,6 +141,11 @@
entries=manifest_entries_for_current_bucket,
parent=parent_entity,
)
yield from self._generate_unstructured_containers(
bucket_response=bucket_response,
entries=manifest_entries_for_current_bucket,
parent=parent_entity,
)
# nothing else to do for the current bucket, skipping to the next
continue
# If no global file, or no valid entries in the manifest, check for bucket level metadata file
@@ -142,6 +156,14 @@
entries=metadata_config.entries,
parent=parent_entity,
)
yield from self._generate_unstructured_containers(
bucket_response=bucket_response,
entries=metadata_config.entries,
parent=parent_entity,
)
# clean up the cache after each bucket
self._unstructured_container_cache.clear()
except ValidationError as err:
self.status.failed(
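Both call sites above feed the same manifest entries that drive structured ingestion; entries without a structureFormat are picked up by the new unstructured path. A minimal sketch of what such entries could look like, assuming the MetadataEntry layout exercised in the tests below (paths and formats are illustrative):

    entries = [
        # structured: handled by _generate_structured_containers
        {"dataPath": "tables", "structureFormat": "parquet", "isPartitioned": False},
        # unstructured, explicit extensions: expanded by listing the prefix
        {"dataPath": "documents", "unstructuredFormats": [".pdf", ".txt"]},
        # unstructured, wildcard: every object under the prefix is accepted
        {"dataPath": "media", "unstructuredFormats": ["*"]},
    ]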
@@ -164,12 +186,12 @@
self, container_details: GCSContainerDetails
) -> Iterable[Either[CreateContainerRequest]]:
container_request = CreateContainerRequest(
-name=container_details.name,
+name=EntityName(container_details.name),
prefix=container_details.prefix,
numberOfObjects=container_details.number_of_objects,
size=container_details.size,
dataModel=container_details.data_model,
-service=self.context.get().objectstore_service,
+service=FullyQualifiedEntityName(self.context.get().objectstore_service),
parent=container_details.parent,
sourceUrl=container_details.sourceUrl,
fileFormats=container_details.file_formats,
@@ -178,6 +200,33 @@
yield Either(right=container_request)
self.register_record(container_request=container_request)
def get_size(
self, bucket_name: str, project_id: str, file_path: str
) -> Optional[float]:
"""
Method to get the size of the file
"""
try:
client = self.gcs_clients.storage_client.clients[project_id]
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(file_path)
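# reload() issues a GET for the object so metadata such as size is populated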
blob.reload()
return blob.size
except Exception as exc:
logger.debug(f"Failed to get size of file due to {exc}")
logger.debug(traceback.format_exc())
return None
def is_valid_unstructured_file(self, accepted_extensions: List, key: str) -> bool:
if WILD_CARD in accepted_extensions:
return True
for ext in accepted_extensions:
if key.endswith(ext):
return True
return False
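# Semantics of the suffix check above (mirrored in the unit tests below):
#   is_valid_unstructured_file(["*"], "anything.xyz")     -> True
#   is_valid_unstructured_file([".pdf", ".txt"], "a.pdf") -> True
#   is_valid_unstructured_file(["pdf"], "a.pdf")          -> True (plain endswith)
#   is_valid_unstructured_file([".pdf"], "a.doc")         -> False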
def _generate_container_details(
self,
bucket_response: GCSBucketResponse,
@@ -185,6 +234,10 @@
parent: Optional[EntityReference] = None,
) -> Optional[GCSContainerDetails]:
bucket_name = bucket_response.name
if not metadata_entry.structureFormat:
return None
sample_key = self._get_sample_file_path(
bucket=bucket_response, metadata_entry=metadata_entry
)
@@ -226,33 +279,80 @@
sourceUrl=self._get_object_source_url(
bucket=bucket_response,
prefix=metadata_entry.dataPath.strip(KEY_SEPARATOR),
is_file=False, # Structured containers are directories
),
)
return None
def _generate_structured_containers_by_depth(
self,
bucket_response: GCSBucketResponse,
metadata_entry: MetadataEntry,
parent: Optional[EntityReference] = None,
) -> Iterable[GCSContainerDetails]:
try:
prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry)
if prefix:
client = self.gcs_clients.storage_client.clients[
bucket_response.project_id
]
response = client.list_blobs(
bucket_response.name,
prefix=prefix,
max_results=1000,
)
# total depth is depth of prefix + depth of the metadata entry
total_depth = metadata_entry.depth + len(prefix[:-1].split("/"))
candidate_keys = {
"/".join(entry.name.split("/")[:total_depth]) + "/"
for entry in response
if entry and entry.name and len(entry.name.split("/")) > total_depth
}
for key in candidate_keys:
metadata_entry_copy = deepcopy(metadata_entry)
metadata_entry_copy.dataPath = key.strip(KEY_SEPARATOR)
structured_container: Optional[
GCSContainerDetails
] = self._generate_container_details(
bucket_response=bucket_response,
metadata_entry=metadata_entry_copy,
parent=parent,
)
if structured_container:
yield structured_container
except Exception as err:
logger.warning(
f"Error while generating structured containers by depth for {metadata_entry.dataPath} - {err}"
)
logger.debug(traceback.format_exc())
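# Worked example of the depth grouping above: with prefix "data/" and
# metadata_entry.depth == 2, the prefix contributes len("data".split("/")) == 1,
# so total_depth == 3 and each key is cut to its first three segments:
#   "data/2025/01/sales.parquet" -> "data/2025/01/"
#   "data/2024/12/sales.parquet" -> "data/2024/12/"
# Each unique truncated key then yields one structured container.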
def _generate_structured_containers(
self,
bucket_response: GCSBucketResponse,
entries: List[MetadataEntry],
parent: Optional[EntityReference] = None,
-) -> List[GCSContainerDetails]:
-result: List[GCSContainerDetails] = []
+) -> Iterable[GCSContainerDetails]:
for metadata_entry in entries:
logger.info(
f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} "
f"and generating structured container"
)
-structured_container: Optional[
-GCSContainerDetails
-] = self._generate_container_details(
-bucket_response=bucket_response,
-metadata_entry=metadata_entry,
-parent=parent,
-)
-if structured_container:
-result.append(structured_container)
-return result
+if metadata_entry.depth == 0:
+structured_container: Optional[
+GCSContainerDetails
+] = self._generate_container_details(
+bucket_response=bucket_response,
+metadata_entry=metadata_entry,
+parent=parent,
+)
+if structured_container:
+yield structured_container
+else:
+yield from self._generate_structured_containers_by_depth(
+bucket_response=bucket_response,
+metadata_entry=metadata_entry,
+parent=parent,
+)
def _fetch_bucket(self, bucket_name: str) -> GCSBucketResponse:
for project_id, client in self.gcs_clients.storage_client.clients.items():
@@ -409,31 +509,190 @@
Method to get the source url of GCS bucket
"""
try:
-return (
-f"https://console.cloud.google.com/storage/browser/{bucket.name}"
-f";tab=objects?project={bucket.project_id}"
-)
+return f"https://console.cloud.google.com/storage/browser/{bucket.name}?project={bucket.project_id}"
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Unable to get source url: {exc}")
return None
def _get_object_source_url(
-self, bucket: GCSBucketResponse, prefix: str
+self, bucket: GCSBucketResponse, prefix: str, is_file: bool = False
) -> Optional[str]:
"""
-Method to get the source url of GCS object
+Method to get the source url of GCS object or directory
"""
try:
-return (
-f"https://console.cloud.google.com/storage/browser/_details/{bucket.name}/{prefix}"
-f";tab=live_object?project={bucket.project_id}"
-)
+# Remove trailing slash from prefix if present
+clean_prefix = prefix.rstrip("/")
+if is_file:
+# For files, use the _details path with tab=live_object
+return f"https://console.cloud.google.com/storage/browser/_details/{bucket.name}/{clean_prefix};tab=live_object"
+else:
+# For directories/prefixes, use the browser view
+return f"https://console.cloud.google.com/storage/browser/{bucket.name}/{clean_prefix}?project={bucket.project_id}"
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Unable to get source url: {exc}")
return None
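# Illustrative outputs for bucket "my-bucket" in project "my-project":
#   is_file=False -> https://console.cloud.google.com/storage/browser/my-bucket/docs?project=my-project
#   is_file=True  -> https://console.cloud.google.com/storage/browser/_details/my-bucket/docs/a.pdf;tab=live_object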
def _yield_parents_of_unstructured_container(
self,
bucket_name: str,
project_id: str,
list_of_parent: List[str],
parent: Optional[EntityReference] = None,
):
relative_path = "" # Path relative to bucket for URLs
sub_parent = parent
for i in range(len(list_of_parent) - 1):
container_fqn = fqn._build( # pylint: disable=protected-access
*(
self.context.get().objectstore_service,
bucket_name,
*list_of_parent[: i + 1],
)
)
if container_fqn in self._unstructured_container_cache:
parent_id, relative_path = self._unstructured_container_cache[
container_fqn
]
sub_parent = EntityReference(id=parent_id, type="container")
continue
# Build the relative path (no gs:// prefix)
current_relative_path = KEY_SEPARATOR.join(list_of_parent[: i + 1])
yield GCSContainerDetails(
name=list_of_parent[i],
prefix=KEY_SEPARATOR + current_relative_path,
file_formats=[],
parent=sub_parent,
fullPath=self._get_full_path(bucket_name, current_relative_path),
sourceUrl=self._get_object_source_url(
bucket=GCSBucketResponse(
name=bucket_name, project_id=project_id, creation_date=None
),
prefix=current_relative_path,
is_file=False, # Parent containers are directories
),
)
container_entity = self.metadata.get_by_name(
entity=Container, fqn=container_fqn
)
relative_path = current_relative_path
self._unstructured_container_cache[container_fqn] = (
container_entity.id.root,
relative_path,
)
sub_parent = EntityReference(id=container_entity.id.root, type="container")
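# Example: for key "documents/2025/january/report.pdf" this yields parent
# containers "documents", "documents/2025" and "documents/2025/january"
# (the leaf "report.pdf" is emitted by the caller); each parent's id and
# relative path are cached by FQN so sibling files reuse the same entities.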
def _yield_nested_unstructured_containers(
self,
bucket_response: GCSBucketResponse,
metadata_entry: MetadataEntry,
parent: Optional[EntityReference] = None,
):
bucket_name = bucket_response.name
client = self.gcs_clients.storage_client.clients[bucket_response.project_id]
response = client.list_blobs(
bucket_name,
prefix=metadata_entry.dataPath,
max_results=1000,
)
candidate_keys = [
entry.name
for entry in response
if entry and entry.name and not entry.name.endswith("/")
]
for key in candidate_keys:
if self.is_valid_unstructured_file(metadata_entry.unstructuredFormats, key):
logger.debug(
f"Extracting metadata from path {key.strip(KEY_SEPARATOR)} "
f"and generating unstructured container"
)
list_of_parent = key.strip(KEY_SEPARATOR).split(KEY_SEPARATOR)
yield from self._yield_parents_of_unstructured_container(
bucket_name, bucket_response.project_id, list_of_parent, parent
)
parent_fqn = fqn._build( # pylint: disable=protected-access
*(
self.context.get().objectstore_service,
bucket_name,
*list_of_parent[:-1],
)
)
parent_id, parent_relative_path = self._unstructured_container_cache[
parent_fqn
]
container_fqn = fqn._build( # pylint: disable=protected-access
*(
self.context.get().objectstore_service,
bucket_name,
*list_of_parent,
)
)
size = self.get_size(bucket_name, bucket_response.project_id, key)
yield GCSContainerDetails(
name=list_of_parent[-1],
prefix=KEY_SEPARATOR + parent_relative_path
if parent_relative_path
else KEY_SEPARATOR,
file_formats=[],
size=size,
container_fqn=container_fqn,
leaf_container=True,
parent=EntityReference(id=parent_id, type="container"),
fullPath=self._get_full_path(bucket_name, key),
sourceUrl=self._get_object_source_url(
bucket=bucket_response,
prefix=key, # Use the full key path for the file
is_file=True, # Leaf containers are files
),
)
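# Example from the unit tests: with dataPath "data/" and unstructuredFormats
# [".csv", ".json"], blobs data/2025/01/sales.csv, data/2025/01/products.json
# and data/2025/02/sales.csv produce parent containers data, data/2025,
# data/2025/01 and data/2025/02 plus three leaf containers; data/temp.txt is
# filtered out by the extension check.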
def _generate_unstructured_containers(
self,
bucket_response: GCSBucketResponse,
entries: List[MetadataEntry],
parent: Optional[EntityReference] = None,
) -> Iterable[GCSContainerDetails]:
bucket_name = bucket_response.name
for metadata_entry in entries:
if metadata_entry.structureFormat:
continue
if metadata_entry.unstructuredFormats:
yield from self._yield_nested_unstructured_containers(
bucket_response=bucket_response,
metadata_entry=metadata_entry,
parent=parent,
)
else:
logger.debug(
f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} "
f"and generating unstructured container"
)
prefix = (
f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
)
yield GCSContainerDetails(
name=metadata_entry.dataPath.strip(KEY_SEPARATOR),
prefix=prefix,
file_formats=[],
data_model=None,
parent=parent,
size=self.get_size(
bucket_name=bucket_name,
project_id=bucket_response.project_id,
file_path=metadata_entry.dataPath.strip(KEY_SEPARATOR),
),
fullPath=self._get_full_path(bucket_name, prefix),
sourceUrl=self._get_object_source_url(
bucket=bucket_response,
prefix=metadata_entry.dataPath.strip(KEY_SEPARATOR),
is_file=True, # Individual unstructured files
),
)
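# Dispatch summary: entries with structureFormat are left to the structured
# path; entries with unstructuredFormats are expanded into a hierarchy via
# prefix listing; entries with neither are treated as a single file at dataPath.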
def _load_metadata_file(
self, bucket: GCSBucketResponse
) -> Optional[StorageContainerConfig]:

File: metadata/ingestion/source/storage/gcs/models.py

@@ -53,21 +53,21 @@ class GCSContainerDetails(BaseModel):
description: Optional[basic.Markdown] = Field(
None, description="Description of the container instance."
)
-number_of_objects: float = Field(
-...,
+number_of_objects: Optional[float] = Field(
+None,
description="Total nr. of objects",
)
-size: float = Field(
-...,
+size: Optional[float] = Field(
+None,
description="Total size in bytes of all objects",
title="Total size(bytes) of objects",
)
file_formats: Optional[List[FileFormat]] = Field(
-...,
+None,
description="File formats",
)
data_model: Optional[ContainerDataModel] = Field(
-...,
+None,
description="Data Model of the container",
)
creation_date: Optional[str] = Field(
@@ -84,3 +84,9 @@
fullPath: Optional[str] = Field(
None, description="Full path of the container/file."
)
container_fqn: Optional[str] = Field(
None, description="Fully qualified name of the container."
)
leaf_container: Optional[bool] = Field(
None, description="Whether this is a leaf container."
)
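With number_of_objects, size, file_formats and data_model now optional, a leaf container for a single unstructured file can be built with only the fields that apply. A minimal sketch, mirroring how the new tests construct the model (values are illustrative):

    from metadata.ingestion.source.storage.gcs.models import GCSContainerDetails

    leaf = GCSContainerDetails(
        name="report.pdf",
        prefix="/documents/2025/january",
        file_formats=[],
        size=1024,
        leaf_container=True,
    )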

File: existing GCS storage unit tests (class StorageUnitTest)

@@ -259,7 +259,7 @@
data_model=None,
creation_date=bucket_response.creation_date.isoformat(),
sourceUrl=SourceUrl(
"https://console.cloud.google.com/storage/browser/test_bucket;tab=objects?project=my-gcp-project"
"https://console.cloud.google.com/storage/browser/test_bucket?project=my-gcp-project"
),
fullPath="gs://test_bucket",
),
@@ -304,7 +304,7 @@
creation_date=datetime.datetime(2000, 1, 1).isoformat(),
parent=entity_ref,
sourceUrl=SourceUrl(
f"https://console.cloud.google.com/storage/browser/_details/test_bucket/transactions;tab=live_object?project=my-gcp-project"
f"https://console.cloud.google.com/storage/browser/test_bucket/transactions?project=my-gcp-project"
),
fullPath="gs://test_bucket/transactions",
),

File: new test module for unstructured formats support (class TestGCSUnstructuredFormats)

@@ -0,0 +1,458 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unit tests for GCS Object store source - Unstructured Formats Support
"""
import datetime
import uuid
from collections import namedtuple
from unittest import TestCase
from unittest.mock import MagicMock, patch
from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig import (
MetadataEntry,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.storage.gcs.metadata import GcsSource
from metadata.ingestion.source.storage.gcs.models import (
GCSBucketResponse,
GCSContainerDetails,
)
MockBucketResponse = namedtuple("MockBucketResponse", ["name", "time_created"])
MockBlob = namedtuple("MockBlob", ["name", "size"])
MOCK_GCS_CONFIG = {
"source": {
"type": "gcs",
"serviceName": "gcs_test",
"serviceConnection": {
"config": {
"type": "GCS",
"credentials": {
"gcpConfig": {
"type": "service_account",
"projectId": "my-gcp-project",
"privateKeyId": "private_key_id",
"privateKey": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDPZPtpdoNsg5rH\n8b443iQ5FCAXYbFQEEk/nne/b+6iOzf8zGSYH6cmXmvYNIuw3pqcEAwiUStC8SPt\nbskdbdSf2miJ8DsbZoJPV2wN4zw71If/rhn+VUVgzAXUrYFEJX73gQs7Qhm0QlPL\nUUxcYoboxIkIzbn11SlX6K/IvveGzch7AYhgVGB9nWWZFdBwJaVeGRpdEjtoB/KR\nsXrydUtqzoxvJ6YIGxdV7os4GQlEbs/8mdDnC63+jfqDLzrfWQgSfuyUA2YrZFrX\n74/IhjLH9fA8rrWO0BoEzrfjxxRfXygw+V+nfl1fjPhzkUaOACsh4XaIvAZT8gt7\nzsUVycE1AgMBAAECggEABTatKUwM5rvyNqeJmCVhdLvgSKYwjmoyZTKHIR1fut1R\nPX4n4zkcexbxkBhWEKav7U9r9qriswab2BqqXJ6Hs1MKwLj0y24KxZLViQ3W1Ew1\n9QP77ExZd6L5XIzWDJACvpcUYLN7MPBf6eMLz+C8MnrKVRnS3G604NxdGudOEqnr\nB/H2IRj1G368IApSljV8uHZ7wHMq4nEeMkDYlwWbF8EkxXGc2BkQaq2VbNw4F+Ky\n1KG+G6evzT63D2T03TnB5WstOVMeZng+NOMIP2eiVBF8Pmc8nMekAJ+A35ecQDfL\njK8nlgNdvbDa0BuHSupD4XjNuCCwjVzmnbeWwrQL+wKBgQD8L+LK9K0vuiz7nxpO\noULsz8wU6lNj0qAniZRIomzmNzzgO9O3g81J2TdHOgmhOU/LP0Z26xpptzIFXZ0E\nfMFJNEuZy0R4MUyQ8fGBKIOU5GU6OECxoqZJUfj73KsIXXHOo5TkDycbXB5nDmCF\nEjqjldrOym8QQPsszprsQdJDlwKBgQDSh7jfSSTJ+3sKD1s0p1QCS3PpkQ74RovS\nRvYkHYBp9mM1PPrZqYiJY9qZno1utXUloVg1e+98ihhlAiPs29p3ISkzeTZPPEP4\nBzVGusFRaDeMUC8Ux4fHzPBiRuX/IxhZtf6VrIxwO28cePMBR1HMRNSDZLv1Dk9d\nr+PTf9rLEwKBgQDKEwzll/2WOtaEoU6RJwLbgv6C2+kKGeIfDEz3jr84EZcEDqxt\nZn1+6UE0H7tLpwLbV858x5KYlyTLy+FfkR/IKtFRYOFydf5mPphH6FDXY9QBPMYK\nEMyx/69FEeMyhr4E2Gsb+1BYyg3Kgmiw+JRoNFHqVad9HLSniL33Bh8X7QKBgFl8\nyU9X1uRoGc+X4WvLKEFlcxq3xwYvbmVuNlf5lkj0Kw1JI1N75YaIxDWCGJoTVX0u\nTMFHMe/c/yuIMl8OwJjcppezkSsy8a0u2y16WovQ4bOpramGeqep7A/KFR9S+pm/\nazyRwIxAJyWSH7DOcO2D4FUNb3tlnsSy7ANNmGGzAoGBAPVwNP+I9CSTFYJwYUCZ\neeFkJkafjfEjuN2dFJiISMd34+UO4MH45dmGB6dJsKgSpsi2HbP8mfL6Wjkmpk1w\nt496flAMznTkgegF2oRNx21c7Cz6pVB88Z8UUoBudSVBoASahdev9Ch4YU/etZOI\nCuSfmiAKdSNbly8ZHZV4Ew8b\n-----END PRIVATE KEY-----\n",
"clientEmail": "gcpuser@project_id.iam.gserviceaccount.com",
"clientId": "client_id",
"authUri": "https://accounts.google.com/o/oauth2/auth",
"tokenUri": "https://oauth2.googleapis.com/token",
"authProviderX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
"clientX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
}
},
}
},
"sourceConfig": {
"config": {
"type": "StorageMetadata",
}
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {"jwtToken": "token"},
}
},
}
class TestGCSUnstructuredFormats(TestCase):
"""
Test GCS unstructured formats support
"""
@patch(
"metadata.ingestion.source.storage.storage_service.StorageServiceSource.test_connection"
)
def setUp(self, test_connection):
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.model_validate(MOCK_GCS_CONFIG)
self.gcs_source = GcsSource.create(
MOCK_GCS_CONFIG["source"],
self.config.workflowConfig.openMetadataServerConfig,
)
# Mock the context
mock_context = MagicMock()
mock_context.get.return_value = MagicMock(
objectstore_service="test_service", container="test_container"
)
self.gcs_source.context = mock_context
# Mock metadata client
self.gcs_source.metadata = MagicMock()
def test_is_valid_unstructured_file(self):
"""Test file validation for unstructured formats"""
# Test with wildcard
self.assertTrue(self.gcs_source.is_valid_unstructured_file(["*"], "test.pdf"))
self.assertTrue(
self.gcs_source.is_valid_unstructured_file(["*"], "anything.txt")
)
# Test with specific extensions
self.assertTrue(
self.gcs_source.is_valid_unstructured_file([".pdf", ".txt"], "test.pdf")
)
self.assertTrue(
self.gcs_source.is_valid_unstructured_file([".pdf", ".txt"], "test.txt")
)
self.assertFalse(
self.gcs_source.is_valid_unstructured_file([".pdf", ".txt"], "test.doc")
)
# Test without extension dot
self.assertTrue(
self.gcs_source.is_valid_unstructured_file(["pdf", "txt"], "test.pdf")
)
def test_get_size(self):
"""Test getting file size from GCS"""
# Mock the GCS client and blob
mock_client = MagicMock()
mock_bucket = MagicMock()
mock_blob = MagicMock()
mock_blob.size = 1024
mock_client.get_bucket.return_value = mock_bucket
mock_bucket.blob.return_value = mock_blob
self.gcs_source.gcs_clients.storage_client.clients = {
"my-gcp-project": mock_client
}
size = self.gcs_source.get_size("test-bucket", "my-gcp-project", "test.pdf")
self.assertEqual(size, 1024)
# Test error handling
mock_client.get_bucket.side_effect = Exception("Error")
size = self.gcs_source.get_size("test-bucket", "my-gcp-project", "test.pdf")
self.assertIsNone(size)
def test_generate_unstructured_containers(self):
"""Test generating unstructured containers"""
bucket_response = GCSBucketResponse(
name="test-bucket",
project_id="my-gcp-project",
creation_date=datetime.datetime(2025, 1, 1),
)
# Test with unstructuredFormats specified
metadata_entry = MetadataEntry(
dataPath="documents/", unstructuredFormats=[".pdf", ".txt"]
)
# Mock list_blobs response
mock_client = MagicMock()
mock_blobs = [
MockBlob(name="documents/file1.pdf", size=1024),
MockBlob(name="documents/file2.txt", size=2048),
MockBlob(name="documents/file3.doc", size=512), # Should be filtered
MockBlob(name="documents/subdir/file4.pdf", size=4096),
]
mock_client.list_blobs.return_value = mock_blobs
self.gcs_source.gcs_clients.storage_client.clients = {
"my-gcp-project": mock_client
}
# Mock the container entity lookup
mock_container = MagicMock()
mock_container.id.root = str(uuid.uuid4())
self.gcs_source.metadata.get_by_name.return_value = mock_container
# Mock get_size
self.gcs_source.get_size = MagicMock(return_value=1024)
parent = EntityReference(id=uuid.uuid4(), type="container")
containers = list(
self.gcs_source._generate_unstructured_containers(
bucket_response, [metadata_entry], parent
)
)
# Check we got the right number of containers (files + intermediate directories)
self.assertGreater(len(containers), 0)
# Check that we only processed valid extensions
for container in containers:
if container.leaf_container:
self.assertTrue(
container.name.endswith(".pdf") or container.name.endswith(".txt")
)
def test_generate_unstructured_containers_wildcard(self):
"""Test generating unstructured containers with wildcard"""
bucket_response = GCSBucketResponse(
name="test-bucket",
project_id="my-gcp-project",
creation_date=datetime.datetime(2025, 1, 1),
)
# Test with wildcard to accept all files
metadata_entry = MetadataEntry(dataPath="files/", unstructuredFormats=["*"])
# Mock list_blobs response
mock_client = MagicMock()
mock_blobs = [
MockBlob(name="files/file1.pdf", size=1024),
MockBlob(name="files/file2.docx", size=2048),
MockBlob(
name="files/file3.xyz", size=512
), # Should be accepted with wildcard
]
mock_client.list_blobs.return_value = mock_blobs
self.gcs_source.gcs_clients.storage_client.clients = {
"my-gcp-project": mock_client
}
# Mock the container entity lookup
mock_container = MagicMock()
mock_container.id.root = str(uuid.uuid4())
self.gcs_source.metadata.get_by_name.return_value = mock_container
# Mock get_size
self.gcs_source.get_size = MagicMock(return_value=1024)
parent = EntityReference(id=uuid.uuid4(), type="container")
containers = list(
self.gcs_source._generate_unstructured_containers(
bucket_response, [metadata_entry], parent
)
)
# With wildcard, all files should be processed
leaf_containers = [c for c in containers if c.leaf_container]
self.assertEqual(len(leaf_containers), 3)
def test_yield_parents_of_unstructured_container(self):
"""Test creating parent container hierarchy"""
bucket_name = "test-bucket"
project_id = "my-gcp-project"
list_of_parent = ["documents", "2025", "january", "report.pdf"]
# Mock container entity
mock_container = MagicMock()
mock_container.id.root = str(uuid.uuid4())
self.gcs_source.metadata.get_by_name.return_value = mock_container
parent = EntityReference(id=uuid.uuid4(), type="container")
# Generate parent containers
parents = list(
self.gcs_source._yield_parents_of_unstructured_container(
bucket_name, project_id, list_of_parent, parent
)
)
# Should create containers for: documents, 2025, january (not report.pdf as it's the leaf)
self.assertEqual(len(parents), 3)
self.assertEqual(parents[0].name, "documents")
self.assertEqual(parents[1].name, "2025")
self.assertEqual(parents[2].name, "january")
# Check full paths are constructed correctly
self.assertIn("documents", parents[0].fullPath)
self.assertIn("2025", parents[1].fullPath)
self.assertIn("january", parents[2].fullPath)
def test_yield_nested_unstructured_containers(self):
"""Test yielding nested unstructured containers"""
bucket_response = GCSBucketResponse(
name="test-bucket",
project_id="my-gcp-project",
creation_date=datetime.datetime(2025, 1, 1),
)
metadata_entry = MetadataEntry(
dataPath="data/", unstructuredFormats=[".csv", ".json"]
)
# Mock list_blobs response with nested structure
mock_client = MagicMock()
mock_blobs = [
MockBlob(name="data/2025/01/sales.csv", size=1024),
MockBlob(name="data/2025/01/products.json", size=2048),
MockBlob(name="data/2025/02/sales.csv", size=3072),
MockBlob(name="data/temp.txt", size=512), # Should be filtered
]
mock_client.list_blobs.return_value = mock_blobs
self.gcs_source.gcs_clients.storage_client.clients = {
"my-gcp-project": mock_client
}
# Mock container entity
mock_container = MagicMock()
mock_container.id.root = str(uuid.uuid4())
self.gcs_source.metadata.get_by_name.return_value = mock_container
# Mock get_size
self.gcs_source.get_size = MagicMock(return_value=1024)
parent = EntityReference(id=uuid.uuid4(), type="container")
containers = list(
self.gcs_source._yield_nested_unstructured_containers(
bucket_response, metadata_entry, parent
)
)
# Check that containers were created for the nested structure
self.assertGreater(len(containers), 0)
# Check leaf containers are properly marked
leaf_containers = [c for c in containers if c.leaf_container]
self.assertEqual(len(leaf_containers), 3) # 3 valid files
# Verify leaf containers have correct names
leaf_names = [c.name for c in leaf_containers]
self.assertIn("sales.csv", leaf_names)
self.assertIn("products.json", leaf_names)
def test_integration_structured_and_unstructured(self):
"""Test integration with both structured and unstructured formats"""
bucket_response = GCSBucketResponse(
name="test-bucket",
project_id="my-gcp-project",
creation_date=datetime.datetime(2025, 1, 1),
)
entries = [
# Structured entry
MetadataEntry(
dataPath="tables/", structureFormat="parquet", isPartitioned=False
),
# Unstructured entry
MetadataEntry(dataPath="documents/", unstructuredFormats=[".pdf", ".docx"]),
# Mixed - should only process as unstructured
MetadataEntry(
dataPath="reports/",
structureFormat="csv",
unstructuredFormats=[
".txt"
], # Should be ignored when structureFormat is set
),
]
# Mock list_blobs for unstructured
mock_client = MagicMock()
mock_blobs = [
MockBlob(name="documents/report1.pdf", size=1024),
MockBlob(name="documents/report2.docx", size=2048),
]
mock_client.list_blobs.return_value = mock_blobs
self.gcs_source.gcs_clients.storage_client.clients = {
"my-gcp-project": mock_client
}
# Mock container entity
mock_container = MagicMock()
mock_container.id.root = str(uuid.uuid4())
self.gcs_source.metadata.get_by_name.return_value = mock_container
# Mock get_size
self.gcs_source.get_size = MagicMock(return_value=1024)
parent = EntityReference(id=uuid.uuid4(), type="container")
# Test _generate_unstructured_containers
unstructured_containers = list(
self.gcs_source._generate_unstructured_containers(
bucket_response, entries, parent
)
)
# Should only process the second entry (documents/) as unstructured
# The first is structured (skipped), the third has structureFormat so skipped
self.assertGreater(len(unstructured_containers), 0)
def test_unstructured_container_cache_management(self):
"""Test that unstructured container cache is properly managed"""
# Initially cache should be empty
self.assertEqual(len(self.gcs_source._unstructured_container_cache), 0)
# Add some entries to cache
self.gcs_source._unstructured_container_cache["test.service.bucket.folder1"] = (
"id1",
"/folder1",
)
self.gcs_source._unstructured_container_cache["test.service.bucket.folder2"] = (
"id2",
"/folder2",
)
self.assertEqual(len(self.gcs_source._unstructured_container_cache), 2)
# Simulate cache clear (happens after each bucket in get_containers)
self.gcs_source._unstructured_container_cache.clear()
self.assertEqual(len(self.gcs_source._unstructured_container_cache), 0)
def test_generate_structured_containers_by_depth(self):
"""Test generating structured containers with depth specification"""
bucket_response = GCSBucketResponse(
name="test-bucket",
project_id="my-gcp-project",
creation_date=datetime.datetime(2025, 1, 1),
)
metadata_entry = MetadataEntry(
dataPath="data/",
structureFormat="parquet",
depth=2, # Look 2 levels deep
isPartitioned=False,
)
# Mock list_blobs response
mock_client = MagicMock()
mock_blobs = [
MockBlob(name="data/2025/01/sales.parquet", size=1024),
MockBlob(name="data/2025/02/sales.parquet", size=2048),
MockBlob(name="data/2024/12/sales.parquet", size=3072),
]
mock_client.list_blobs.return_value = mock_blobs
self.gcs_source.gcs_clients.storage_client.clients = {
"my-gcp-project": mock_client
}
# Mock _get_sample_file_prefix
self.gcs_source._get_sample_file_prefix = MagicMock(return_value="data/")
# Mock _generate_container_details to return valid containers
mock_container_details = GCSContainerDetails(
name="test-container", prefix="/data/2025/01/", file_formats=[], size=1024
)
self.gcs_source._generate_container_details = MagicMock(
return_value=mock_container_details
)
parent = EntityReference(id=uuid.uuid4(), type="container")
containers = list(
self.gcs_source._generate_structured_containers_by_depth(
bucket_response, metadata_entry, parent
)
)
# Should create containers for unique paths at depth 2
self.assertGreater(len(containers), 0)