Fixes 16652: add GCS storage service (#16917)

* FEAT-16652: add GCS storage service

* reformat

* update connection tests

* fix tests

* relax google-cloud-storage version constraint

* fix GCP config in tests

---------

Co-authored-by: Matthew Chamberlin <mchamberlin@ginkgobioworks.com>
Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
Matt Chamberlin authored on 2024-07-10 08:03:28 -04:00, committed by GitHub
parent 0740b01624
commit d757aa9d77
12 changed files with 1301 additions and 31 deletions

@@ -24,6 +24,7 @@ VERSIONS = {
"avro": "avro>=1.11.3,<1.12",
"boto3": "boto3>=1.20,<2.0", # No need to add botocore separately. It's a dep from boto3
"geoalchemy2": "GeoAlchemy2~=0.12",
"google-cloud-monitoring": "google-cloud-monitoring>=2.0.0",
"google-cloud-storage": "google-cloud-storage==1.43.0",
"gcsfs": "gcsfs>=2023.1.0",
"great-expectations": "great-expectations>=0.18.0,<0.18.14",
@@ -198,6 +199,7 @@ plugins: Dict[str, Set[str]] = {
*COMMONS["datalake"],
},
"datalake-gcs": {
VERSIONS["google-cloud-monitoring"],
VERSIONS["google-cloud-storage"],
VERSIONS["gcsfs"],
*COMMONS["datalake"],

@@ -0,0 +1,63 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A client for Google Cloud Storage that supports multiple projects."""
from functools import partial
from typing import List, Optional, Type, Union
from google import auth
from google.cloud.monitoring_v3 import MetricServiceClient
from google.cloud.storage import Client
NoProject = object()
class MultiProjectClient:
"""Google Cloud Client does not support ad-hoc project switching. This class wraps the client and allows
switching between projects. If no project is specified, the client will not have a project set and will try
to resolve it from ADC.
Example usage:
```
from google.cloud.storage import Client
client = MultiProjectClient(Client, project_ids=["project1", "project2"])
buckets_project1 = client.list_buckets("project1")
buckets_project2 = client.list_buckets("project2")
```
"""
def __init__(
self,
client_class: Union[Type[Client], Type[MetricServiceClient]],
project_ids: Optional[List[str]] = None,
**client_kwargs,
):
if project_ids:
self.clients = {
project_id: client_class(project=project_id, **client_kwargs)
for project_id in project_ids
}
else:
self.clients = {NoProject: client_class(**client_kwargs)}
def project_ids(self):
if NoProject in self.clients:
_, project_id = auth.default()
return [project_id]
return list(self.clients.keys())
def __getattr__(self, client_method):
"""Return the underlying client method as a partial function so we can inject the project_id."""
return partial(self._call, client_method)
def _call(self, method, project_id, *args, **kwargs):
"""Call the method on the client for the given project_id. The args and kwargs are passed through."""
client = self.clients.get(project_id, self.clients.get(NoProject))
if not client:
raise ValueError(f"Project {project_id} not found")
return getattr(client, method)(*args, **kwargs)
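
For context, a minimal usage sketch of the wrapper above (not part of the diff; the project IDs are hypothetical and running it requires GCP credentials):

```python
# Sketch: delegation through MultiProjectClient. Attribute lookups that miss
# (e.g. list_buckets) fall through to __getattr__, which returns
# partial(self._call, "list_buckets"); the first positional argument selects
# the project-specific client, the rest is forwarded unchanged.
from google.cloud.storage import Client

from metadata.ingestion.source.storage.gcs.client import MultiProjectClient

multi = MultiProjectClient(client_class=Client, project_ids=["project-a", "project-b"])

# Forwarded to the google.cloud.storage.Client bound to "project-a".
for bucket in multi.list_buckets("project-a"):
    print(bucket.name)

# With no project_ids, a single client is stored under the NoProject sentinel
# and project_ids() resolves the project from Application Default Credentials.
adc_client = MultiProjectClient(client_class=Client)
print(adc_client.project_ids())
```

Keeping the no-project case behind the `NoProject` sentinel lets `_call` resolve the right client with a single dictionary lookup in both modes.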

@@ -0,0 +1,159 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GCS storage connection"""
from dataclasses import dataclass
from typing import Optional
from google.cloud.exceptions import NotFound
from google.cloud.monitoring_v3 import MetricServiceClient
from google.cloud.storage import Client
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.storage.gcsConnection import (
GcsConnection,
)
from metadata.generated.schema.security.credentials.gcpValues import (
GcpCredentialsValues,
SingleProjectId,
)
from metadata.ingestion.connections.test_connections import (
SourceConnectionException,
test_connection_steps,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.storage.gcs.client import MultiProjectClient
from metadata.utils.credentials import set_google_credentials
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@dataclass
class GcsObjectStoreClient:
storage_client: MultiProjectClient
metrics_client: MetricServiceClient
def get_connection(connection: GcsConnection) -> GcsObjectStoreClient:
set_google_credentials(connection.credentials)
project_ids = None
if isinstance(connection.credentials.gcpConfig, GcpCredentialsValues):
project_ids = (
[connection.credentials.gcpConfig.projectId.root]
if isinstance(connection.credentials.gcpConfig.projectId, SingleProjectId)
else connection.credentials.gcpConfig.projectId.root
)
return GcsObjectStoreClient(
storage_client=MultiProjectClient(client_class=Client, project_ids=project_ids),
metrics_client=MetricServiceClient(),
)
@dataclass
class BucketTestState:
project_id: str
bucket_name: str
blob_name: Optional[str] = None
class Tester:
"""
A wrapper class that holds state. We need it because the different testing stages
are not independent of each other. For example, we need to list buckets before we can list
blobs within a bucket.
"""
def __init__(self, client: GcsObjectStoreClient, connection: GcsConnection):
self.client = client
self.connection = connection
self.bucket_tests = []
def list_buckets(self):
if self.connection.bucketNames:
for bucket_name in self.connection.bucketNames:
for project_id, client in self.client.storage_client.clients.items():
try:
client.get_bucket(bucket_name)
except NotFound:
continue
else:
self.bucket_tests.append(
BucketTestState(project_id, bucket_name)
)
break
else:
raise SourceConnectionException(
f"Bucket {bucket_name} not found in provided projects."
)
return
else:
for project_id, client in self.client.storage_client.clients.items():
bucket = next(client.list_buckets())
self.bucket_tests.append(BucketTestState(project_id, bucket.name))
def get_bucket(self):
if not self.bucket_tests:
raise SourceConnectionException("No buckets found in provided projects")
for bucket_test in self.bucket_tests:
client = self.client.storage_client.clients[bucket_test.project_id]
client.get_bucket(bucket_test.bucket_name)
def list_blobs(self):
if not self.bucket_tests:
raise SourceConnectionException("No buckets found in provided projects")
for bucket_test in self.bucket_tests:
client = self.client.storage_client.clients[bucket_test.project_id]
blob = next(client.list_blobs(bucket_test.bucket_name))
bucket_test.blob_name = blob.name
def get_blob(self):
if not self.bucket_tests:
raise SourceConnectionException("No buckets found in provided projects")
for bucket_test in self.bucket_tests:
client = self.client.storage_client.clients[bucket_test.project_id]
bucket = client.get_bucket(bucket_test.bucket_name)
bucket.get_blob(bucket_test.blob_name)
def get_metrics(self):
for project_id in self.client.storage_client.clients.keys():
self.client.metrics_client.list_metric_descriptors(
name=f"projects/{project_id}"
)
def test_connection(
metadata: OpenMetadata,
client: GcsObjectStoreClient,
service_connection: GcsConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
tester = Tester(client, service_connection)
test_fn = {
"ListBuckets": tester.list_buckets,
"GetBucket": tester.get_bucket,
"ListBlobs": tester.list_blobs,
"GetBlob": tester.get_blob,
"GetMetrics": tester.get_metrics,
}
test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
)
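
As a rough usage sketch of the helpers above (not part of the diff): `metadata` is assumed to be an already-configured OpenMetadata client and `connection` a parsed `GcsConnection`; building them is out of scope here.

```python
# Sketch only: `metadata` (OpenMetadata client) and `connection` (GcsConnection)
# are assumed to exist and are not constructed in this snippet.
from metadata.ingestion.source.storage.gcs.connection import (
    get_connection,
    test_connection,
)

client = get_connection(connection)  # GcsObjectStoreClient: storage + metrics clients
test_connection(
    metadata=metadata,
    client=client,
    service_connection=connection,
    automation_workflow=None,  # i.e. run as part of a metadata workflow
)
# The step names mirror the connection test definition (ListBuckets, GetBucket,
# ListBlobs, GetBlob, GetMetrics); only GetMetrics is marked non-mandatory.
```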

@@ -0,0 +1,461 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GCS object store extraction metadata"""
import json
import secrets
import traceback
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, Iterable, List, Optional
from google.cloud.exceptions import NotFound
from google.cloud.monitoring_v3.types import TimeInterval
from pydantic import ValidationError
from metadata.generated.schema.api.data.createContainer import CreateContainerRequest
from metadata.generated.schema.entity.data import container
from metadata.generated.schema.entity.data.container import (
Container,
ContainerDataModel,
)
from metadata.generated.schema.entity.services.connections.database.datalake.gcsConfig import (
GCSConfig,
)
from metadata.generated.schema.entity.services.connections.storage.gcsConnection import (
GcsConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig import (
MetadataEntry,
StorageContainerConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.storage.gcs.models import (
GCSBucketResponse,
GCSContainerDetails,
)
from metadata.ingestion.source.storage.storage_service import (
KEY_SEPARATOR,
OPENMETADATA_TEMPLATE_FILE_NAME,
StorageServiceSource,
)
from metadata.readers.file.base import ReadException
from metadata.readers.file.config_source_factory import get_reader
from metadata.utils import fqn
from metadata.utils.filters import filter_by_container
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class GCSMetric(Enum):
NUMBER_OF_OBJECTS = "storage.googleapis.com/storage/object_count"
BUCKET_SIZE_BYTES = "storage.googleapis.com/storage/total_bytes"
class GcsSource(StorageServiceSource):
"""
Source implementation to ingest GCS bucket data.
"""
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
super().__init__(config, metadata)
self.gcs_clients = self.connection
self.gcs_readers = {
project_id: get_reader(config_source=GCSConfig(), client=client)
for project_id, client in self.gcs_clients.storage_client.clients.items()
}
self._bucket_cache: Dict[str, Container] = {}
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: GcsConnection = config.serviceConnection.root.config
if not isinstance(connection, GcsConnection):
raise InvalidSourceException(
f"Expected GcsConnection, but got {connection}"
)
return cls(config, metadata)
def get_containers(self) -> Iterable[GCSContainerDetails]:
bucket_results = self.fetch_buckets()
for bucket_response in bucket_results:
bucket_name = bucket_response.name
try:
# We always generate the parent container (the bucket)
yield self._generate_unstructured_container(
bucket_response=bucket_response
)
container_fqn = fqn._build( # pylint: disable=protected-access
*(
self.context.get().objectstore_service,
self.context.get().container,
)
)
container_entity = self.metadata.get_by_name(
entity=Container, fqn=container_fqn
)
self._bucket_cache[bucket_name] = container_entity
parent_entity: EntityReference = EntityReference(
id=self._bucket_cache[bucket_name].id.root, type="container"
)
if self.global_manifest:
manifest_entries_for_current_bucket = (
self._manifest_entries_to_metadata_entries_by_container(
container_name=bucket_name, manifest=self.global_manifest
)
)
# Check if we have entries in the manifest file belonging to this bucket
if manifest_entries_for_current_bucket:
# ingest all the relevant valid paths from it
yield from self._generate_structured_containers(
bucket_response=bucket_response,
entries=manifest_entries_for_current_bucket,
parent=parent_entity,
)
# nothing else to do for the current bucket, skipping to the next
continue
# If no global file, or no valid entries in the manifest, check for bucket level metadata file
metadata_config = self._load_metadata_file(bucket=bucket_response)
if metadata_config:
yield from self._generate_structured_containers(
bucket_response=bucket_response,
entries=metadata_config.entries,
parent=parent_entity,
)
except ValidationError as err:
self.status.failed(
StackTraceError(
name=bucket_response.name,
error=f"Validation error while creating Container from bucket details - {err}",
stackTrace=traceback.format_exc(),
)
)
except Exception as err:
self.status.failed(
StackTraceError(
name=bucket_response.name,
error=f"Wild error while creating Container from bucket details - {err}",
stackTrace=traceback.format_exc(),
)
)
def yield_create_container_requests(
self, container_details: GCSContainerDetails
) -> Iterable[Either[CreateContainerRequest]]:
container_request = CreateContainerRequest(
name=container_details.name,
prefix=container_details.prefix,
numberOfObjects=container_details.number_of_objects,
size=container_details.size,
dataModel=container_details.data_model,
service=self.context.get().objectstore_service,
parent=container_details.parent,
sourceUrl=container_details.sourceUrl,
fileFormats=container_details.file_formats,
fullPath=container_details.fullPath,
)
yield Either(right=container_request)
self.register_record(container_request=container_request)
def _generate_container_details(
self,
bucket_response: GCSBucketResponse,
metadata_entry: MetadataEntry,
parent: Optional[EntityReference] = None,
) -> Optional[GCSContainerDetails]:
bucket_name = bucket_response.name
sample_key = self._get_sample_file_path(
bucket=bucket_response, metadata_entry=metadata_entry
)
# if we have a sample file to fetch a schema from
if sample_key:
columns = self._get_columns(
container_name=bucket_name,
sample_key=sample_key,
metadata_entry=metadata_entry,
config_source=GCSConfig(
securityConfig=self.service_connection.credentials
),
client=self.gcs_clients.storage_client.clients[
bucket_response.project_id
],
)
if columns:
prefix = (
f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
)
return GCSContainerDetails(
name=metadata_entry.dataPath.strip(KEY_SEPARATOR),
prefix=prefix,
creation_date=bucket_response.creation_date.isoformat()
if bucket_response.creation_date
else None,
number_of_objects=self._fetch_metric(
bucket=bucket_response, metric=GCSMetric.NUMBER_OF_OBJECTS
),
size=self._fetch_metric(
bucket=bucket_response, metric=GCSMetric.BUCKET_SIZE_BYTES
),
file_formats=[container.FileFormat(metadata_entry.structureFormat)],
data_model=ContainerDataModel(
isPartitioned=metadata_entry.isPartitioned, columns=columns
),
parent=parent,
fullPath=self._get_full_path(bucket_name, prefix),
sourceUrl=self._get_object_source_url(
bucket=bucket_response,
prefix=metadata_entry.dataPath.strip(KEY_SEPARATOR),
),
)
return None
def _generate_structured_containers(
self,
bucket_response: GCSBucketResponse,
entries: List[MetadataEntry],
parent: Optional[EntityReference] = None,
) -> List[GCSContainerDetails]:
result: List[GCSContainerDetails] = []
for metadata_entry in entries:
logger.info(
f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} "
f"and generating structured container"
)
structured_container: Optional[
GCSContainerDetails
] = self._generate_container_details(
bucket_response=bucket_response,
metadata_entry=metadata_entry,
parent=parent,
)
if structured_container:
result.append(structured_container)
return result
def _fetch_bucket(self, bucket_name: str) -> Optional[GCSBucketResponse]:
for project_id, client in self.gcs_clients.storage_client.clients.items():
try:
bucket = client.get_bucket(bucket_name)
except NotFound:
continue
return GCSBucketResponse(
name=bucket.name,
project_id=project_id,
creation_date=bucket.time_created,
)
def fetch_buckets(self) -> List[GCSBucketResponse]:
results: List[GCSBucketResponse] = []
try:
if self.service_connection.bucketNames:
for bucket_name in self.service_connection.bucketNames:
bucket = self._fetch_bucket(bucket_name)
if bucket:
results.append(bucket)
return results
for project_id, client in self.gcs_clients.storage_client.clients.items():
for bucket in client.list_buckets():
if filter_by_container(
self.source_config.containerFilterPattern,
container_name=bucket.name,
):
self.status.filter(bucket.name, "Bucket Filtered Out")
else:
results.append(
GCSBucketResponse(
name=bucket.name,
project_id=project_id,
creation_date=bucket.time_created,
)
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(f"Failed to fetch buckets list - {err}")
return results
def _get_time_interval(self, days: int = 2):
end = datetime.now()
start = end - timedelta(days=days)
start_time = start.isoformat(timespec="seconds") + "Z"
end_time = end.isoformat(timespec="seconds") + "Z"
return TimeInterval(end_time=end_time, start_time=start_time)
def _fetch_metric(self, bucket: GCSBucketResponse, metric: GCSMetric) -> float:
try:
filters = [
f'resource.labels.bucket_name="{bucket.name}"',
f'metric.type="{metric.value}"',
]
filter_ = " AND ".join(filters)
interval = self._get_time_interval()
timeseries = self.gcs_clients.metrics_client.list_time_series(
name=f"projects/{bucket.project_id}", filter=filter_, interval=interval
)
point = list(timeseries)[-1].points[-1]
return point.value.int64_value
except Exception:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed fetching metric {metric.value} for bucket {bucket.name}, returning 0"
)
return 0
def _generate_unstructured_container(
self, bucket_response: GCSBucketResponse
) -> GCSContainerDetails:
return GCSContainerDetails(
name=bucket_response.name,
prefix=KEY_SEPARATOR,
creation_date=bucket_response.creation_date.isoformat()
if bucket_response.creation_date
else None,
number_of_objects=self._fetch_metric(
bucket=bucket_response, metric=GCSMetric.NUMBER_OF_OBJECTS
),
size=self._fetch_metric(
bucket=bucket_response, metric=GCSMetric.BUCKET_SIZE_BYTES
),
file_formats=[],
data_model=None,
fullPath=self._get_full_path(bucket_name=bucket_response.name),
sourceUrl=self._get_bucket_source_url(bucket=bucket_response),
)
def _clean_path(self, path: str) -> str:
return path.strip(KEY_SEPARATOR)
def _get_full_path(self, bucket_name: str, prefix: str = None) -> Optional[str]:
"""
Method to get the full path of the file
"""
if bucket_name is None:
return None
full_path = f"gs://{self._clean_path(bucket_name)}"
if prefix:
full_path += f"/{self._clean_path(prefix)}"
return full_path
def _get_sample_file_path(
self, bucket: GCSBucketResponse, metadata_entry: MetadataEntry
) -> Optional[str]:
"""
Given a bucket and a metadata entry, returns the full path key to a file that can be used to infer the schema,
or None for a non-structured metadata entry or when no such key can be found.
"""
prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry)
try:
if prefix:
client = self.gcs_clients.storage_client.clients[bucket.project_id]
response = client.list_blobs(
bucket.name,
prefix=prefix,
max_results=1000,
)
candidate_keys = [
entry.name
for entry in response
if entry.name.endswith(metadata_entry.structureFormat)
]
# pick a random key out of the candidates if any were returned
if candidate_keys:
result_key = secrets.choice(candidate_keys)
logger.info(
f"File {result_key} was picked to infer data structure from."
)
return result_key
logger.warning(
f"No sample files found in {prefix} with {metadata_entry.structureFormat} extension"
)
return None
except Exception:
logger.debug(traceback.format_exc())
logger.warning(
f"Error when trying to list objects in GCS bucket {bucket.name} at prefix {prefix}"
)
return None
def _get_bucket_source_url(self, bucket: GCSBucketResponse) -> Optional[str]:
"""
Method to get the source url of GCS bucket
"""
try:
return (
f"https://console.cloud.google.com/storage/browser/{bucket.name}"
f";tab=objects?project={bucket.project_id}"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Unable to get source url: {exc}")
return None
def _get_object_source_url(
self, bucket: GCSBucketResponse, prefix: str
) -> Optional[str]:
"""
Method to get the source url of GCS object
"""
try:
return (
f"https://console.cloud.google.com/storage/browser/_details/{bucket.name}/{prefix}"
f";tab=live_object?project={bucket.project_id}"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Unable to get source url: {exc}")
return None
def _load_metadata_file(
self, bucket: GCSBucketResponse
) -> Optional[StorageContainerConfig]:
"""
Load the metadata template file from the root of the bucket, if it exists
"""
try:
logger.info(
f"Looking for metadata template file at - gs://{bucket.name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
)
reader = self.gcs_readers.get(bucket.project_id)
response_object = reader.read(
path=OPENMETADATA_TEMPLATE_FILE_NAME,
bucket_name=bucket.name,
verbose=False,
)
content = json.loads(response_object)
metadata_config = StorageContainerConfig.parse_obj(content)
return metadata_config
except ReadException:
logger.warning(
f"No metadata file found at gs://{bucket.name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed loading metadata file gs://{bucket.name}/{OPENMETADATA_TEMPLATE_FILE_NAME}-{exc}"
)
return None
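
To make the Cloud Monitoring path concrete, here is a standalone sketch of the query `_fetch_metric` builds (bucket and project names are hypothetical; it needs GCP credentials to actually return data):

```python
# Sketch of the object-count query assembled by GcsSource._fetch_metric.
from datetime import datetime, timedelta

from google.cloud.monitoring_v3 import MetricServiceClient
from google.cloud.monitoring_v3.types import TimeInterval

end = datetime.now()
start = end - timedelta(days=2)
interval = TimeInterval(
    end_time=end.isoformat(timespec="seconds") + "Z",
    start_time=start.isoformat(timespec="seconds") + "Z",
)
filter_ = " AND ".join(
    [
        'resource.labels.bucket_name="my-bucket"',  # hypothetical bucket
        'metric.type="storage.googleapis.com/storage/object_count"',
    ]
)
timeseries = MetricServiceClient().list_time_series(
    name="projects/my-gcp-project", filter=filter_, interval=interval
)
# The newest point of the newest series holds the current object count.
print(list(timeseries)[-1].points[-1].value.int64_value)
```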

@@ -0,0 +1,86 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
GCS custom pydantic models
"""
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Extra, Field
from metadata.generated.schema.entity.data.container import (
ContainerDataModel,
FileFormat,
)
from metadata.generated.schema.type import basic
from metadata.generated.schema.type.entityReference import EntityReference
class GCSBucketResponse(BaseModel):
"""
Class modelling a response received from gcs_client.list_buckets operation
"""
class Config:
extra = Extra.forbid
name: str = Field(..., description="Bucket name")
project_id: str = Field(..., description="Project ID")
creation_date: Optional[datetime] = Field(
None,
description="Timestamp of Bucket creation in ISO format",
)
class GCSContainerDetails(BaseModel):
"""
Class mapping container details used to create the container requests
"""
class Config:
extra = Extra.forbid
name: str = Field(..., description="Bucket name")
prefix: str = Field(..., description="Prefix for the container")
description: Optional[basic.Markdown] = Field(
None, description="Description of the container instance."
)
number_of_objects: float = Field(
...,
description="Total nr. of objects",
)
size: float = Field(
...,
description="Total size in bytes of all objects",
title="Total size(bytes) of objects",
)
file_formats: Optional[List[FileFormat]] = Field(
...,
description="File formats",
)
data_model: Optional[ContainerDataModel] = Field(
...,
description="Data Model of the container",
)
creation_date: Optional[str] = Field(
None,
description="Timestamp of Bucket creation in ISO format",
)
parent: Optional[EntityReference] = Field(
None,
description="Reference to the parent container",
)
sourceUrl: Optional[basic.SourceUrl] = Field(
None, description="Source URL of the container."
)
fullPath: Optional[str] = Field(
None, description="Full path of the container/file."
)

@@ -99,7 +99,7 @@ class GCPSecretsManager(ExternalSecretsManager, ABC):
# Build the resource name of the secret version.
project_id = self.credentials.gcpConfig.projectId.__root__
project_id = self.credentials.gcpConfig.projectId.root
secret_id = (
f"projects/{project_id}/secrets/{secret_id}/versions/{FIXED_VERSION_ID}"
)

@@ -0,0 +1,455 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unit tests for GCS Object store source
"""
import datetime
import uuid
from collections import namedtuple
from typing import List
from unittest import TestCase
from unittest.mock import patch
import pandas as pd
from metadata.generated.schema.entity.data.container import (
ContainerDataModel,
FileFormat,
)
from metadata.generated.schema.entity.data.table import Column, ColumnName, DataType
from metadata.generated.schema.entity.services.connections.database.datalake.gcsConfig import (
GCSConfig,
)
from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig import (
MetadataEntry,
StorageContainerConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import SourceUrl
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.source.storage.gcs.metadata import (
GCSBucketResponse,
GCSContainerDetails,
GcsSource,
)
from metadata.ingestion.source.storage.storage_service import (
OPENMETADATA_TEMPLATE_FILE_NAME,
)
from metadata.readers.file.base import ReadException
from metadata.readers.file.config_source_factory import get_reader
MockBucketResponse = namedtuple("MockBucketResponse", ["name", "time_created"])
MockObjectFilePath = namedtuple("MockObjectFilePath", ["name"])
MOCK_OBJECT_STORE_CONFIG = {
"source": {
"type": "gcs",
"serviceName": "gcs_test",
"serviceConnection": {
"config": {
"type": "GCS",
"credentials": {
"gcpConfig": {
"type": "service_account",
"projectId": "my-gcp-project",
"privateKeyId": "private_key_id",
# this is a valid key that was generated on a local machine and is not used for any real project
"privateKey": "-----BEGIN RSA PRIVATE KEY-----\nMIIEpQIBAAKCAQEAw3vHG9fDIkcYB0xi2Mv4fS2gUzKR9ZRrcVNeKkqGFTT71AVB\nOzgIqYVe8b2aWODuNye6sipcrqTqOt05Esj+sxhk5McM9bE2RlxXC5QH/Bp9zxMP\n/Yksv9Ov7fdDt/loUk7sTXvI+7LDJfmRYU6MtVjyyLs7KpQIB2xBWEToU1xZY+v0\ndRC1NA+YWc+FjXbAiFAf9d4gXkYO8VmU5meixVh4C8nsjokEXk0T/HEItpZCxadk\ndZ7LKUE/HDmWCO2oNG6sCf4ET2crjSdYIfXuREopX1aQwnk7KbI4/YIdlRz1I369\nAz3+Hxlf9lLJVH3+itN4GXrR9yWWKWKDnwDPbQIDAQABAoIBAQC3X5QuTR7SN8iV\niBUtc2D84+ECSmza5shG/UJW/6N5n0Mf53ICgBS4GNEwiYCRISa0/ILIgK6CcVb7\nsuvH8F3kWNzEMui4TO0x4YsR5GH9HkioCCS224frxkLBQnL20HIIy9ok8Rpe6Zjg\nNZUnp4yczPyqSeA9l7FUbTt69uDM2Cx61m8REOpFukpnYLyZGbmNPYmikEO+rq9r\nwNID5dkSeVuQYo4MQdRavOGFUWvUYXzkEQ0A6vPyraVBfolESX8WaLNVjic7nIa3\nujdSNojnJqGJ3gslntcmN1d4JOfydc4bja4/NdNlcOHpWDGLzY1QnaDe0Koxn8sx\nLT9MVD2NAoGBAPy7r726bKVGWcwqTzUuq1OWh5c9CAc4N2zWBBldSJyUdllUq52L\nWTyva6GRoRzCcYa/dKLLSM/k4eLf9tpxeIIfTOMsvzGtbAdm257ndMXNvfYpxCfU\nK/gUFfAUGHZ3MucTHRY6DTkJg763Sf6PubA2fqv3HhVZDK/1HGDtHlTPAoGBAMYC\npdV7O7lAyXS/d9X4PQZ4BM+P8MbXEdGBbPPlzJ2YIb53TEmYfSj3z41u9+BNnhGP\n4uzUyAR/E4sxrA2+Ll1lPSCn+KY14WWiVGfWmC5j1ftdpkbrXstLN8NpNYzrKZwx\njdR0ZkwvZ8B5+kJ1hK96giwWS+SJxJR3TohcQ18DAoGAJSfmv2r//BBqtURnHrd8\nwq43wvlbC8ytAVg5hA0d1r9Q4vM6w8+vz+cuWLOTTyobDKdrG1/tlXrd5r/sh9L0\n15SIdkGm3kPTxQbPNP5sQYRs8BrV1tEvoao6S3B45DnEBwrdVN42AXOvpcNGoqE4\nuHpahyeuiY7s+ZV8lZdmxSsCgYEAolr5bpmk1rjwdfGoaKEqKGuwRiBX5DHkQkxE\n8Zayt2VOBcX7nzyRI05NuEIMrLX3rZ61CktN1aH8fF02He6aRaoE/Qm9L0tujM8V\nNi8WiLMDeR/Ifs3u4/HAv1E8v1byv0dCa7klR8J257McJ/ID4X4pzcxaXgE4ViOd\nGOHNu9ECgYEApq1zkZthEQymTUxs+lSFcubQpaXyf5ZC61cJewpWkqGDtSC+8DxE\nF/jydybWuoNHXymnvY6QywxuIooivbuib6AlgpEJeybmnWlDOZklFOD0abNZ+aNO\ndUk7XVGffCakXQ0jp1kmZA4lGsYK1h5dEU5DgXqu4UYJ88Vttax2W+Y=\n-----END RSA PRIVATE KEY-----\n",
"clientEmail": "gcpuser@project_id.iam.gserviceaccount.com",
"clientId": "client_id",
"authUri": "https://accounts.google.com/o/oauth2/auth",
"tokenUri": "https://oauth2.googleapis.com/token",
"authProviderX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
"clientX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
}
},
}
},
"sourceConfig": {
"config": {
"type": "StorageMetadata",
"containerFilterPattern": {"includes": ["^test_*"]},
"storageMetadataConfigSource": {
"prefixConfig": {
"containerName": "test_bucket",
"objectPrefix": "manifest",
},
},
}
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {"jwtToken": "token"},
}
},
}
MOCK_BUCKETS_RESPONSE = [
MockBucketResponse(
name="test_transactions", time_created=datetime.datetime(2000, 1, 1)
),
MockBucketResponse(name="test_sales", time_created=datetime.datetime(2000, 2, 2)),
MockBucketResponse(name="events", time_created=datetime.datetime(2000, 3, 3)),
]
MOCK_METADATA_FILE_RESPONSE = {
"entries": [
{
"dataPath": "transactions",
"structureFormat": "csv",
"isPartitioned": False,
}
]
}
EXPECTED_BUCKETS: List[GCSBucketResponse] = [
GCSBucketResponse(
name="test_transactions",
project_id="my-gcp-project",
creation_date=datetime.datetime(2000, 1, 1),
),
GCSBucketResponse(
name="test_sales",
project_id="my-gcp-project",
creation_date=datetime.datetime(2000, 2, 2),
),
]
MOCK_OBJECT_FILE_PATHS = [
MockObjectFilePath(name="transactions/transactions_1.csv"),
MockObjectFilePath(name="transactions/transactions_2.csv"),
]
def _get_str_value(data):
if data:
if isinstance(data, str):
return data
return data.value
return None
def custom_column_compare(self, other):
return (
self.name == other.name
and self.displayName == other.displayName
and self.description == other.description
and self.dataTypeDisplay == other.dataTypeDisplay
and self.children == other.children
and _get_str_value(self.arrayDataType) == _get_str_value(other.arrayDataType)
)
class StorageUnitTest(TestCase):
"""
Validate how we work with object store metadata
"""
@patch(
"metadata.ingestion.source.storage.storage_service.StorageServiceSource.test_connection"
)
def __init__(self, method_name: str, test_connection) -> None:
super().__init__(method_name)
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.model_validate(
MOCK_OBJECT_STORE_CONFIG
)
# This already validates that the source can be initialized
self.object_store_source = GcsSource.create(
MOCK_OBJECT_STORE_CONFIG["source"],
self.config.workflowConfig.openMetadataServerConfig,
)
self.gcs_reader = get_reader(
config_source=GCSConfig(),
client=self.object_store_source.gcs_clients.storage_client.clients[
"my-gcp-project"
],
)
def test_create_from_invalid_source(self):
"""
An invalid config raises an error
"""
not_object_store_source = {
"type": "gcs",
"serviceName": "mysql_local",
"serviceConnection": {
"config": {
"type": "Mysql",
"username": "openmetadata_user",
"authType": {"password": "openmetadata_password"},
"hostPort": "localhost:3306",
"databaseSchema": "openmetadata_db",
}
},
"sourceConfig": {
"config": {
"type": "StorageMetadata",
"storageMetadataConfigSource": {
"prefixConfig": {
"containerName": "test_bucket",
"objectPrefix": "manifest",
},
},
}
},
}
self.assertRaises(
InvalidSourceException,
GcsSource.create,
not_object_store_source,
self.config.workflowConfig.openMetadataServerConfig,
)
def test_gcs_buckets_fetching(self):
self.object_store_source.gcs_clients.storage_client.clients[
"my-gcp-project"
].list_buckets = lambda: MOCK_BUCKETS_RESPONSE
self.assertListEqual(self.object_store_source.fetch_buckets(), EXPECTED_BUCKETS)
def test_load_metadata_file_gcs(self):
metadata_entry: List[MetadataEntry] = self.return_metadata_entry()
self.assertEqual(1, len(metadata_entry))
self.assertEqual(
MetadataEntry(
dataPath="transactions",
structureFormat="csv",
isPartitioned=False,
),
metadata_entry[0],
)
def test_no_metadata_file_returned_when_file_not_present(self):
with self.assertRaises(ReadException):
self.gcs_reader.read(
path=OPENMETADATA_TEMPLATE_FILE_NAME,
bucket_name="test",
verbose=False,
)
def test_generate_unstructured_container(self):
bucket_response = GCSBucketResponse(
name="test_bucket",
project_id="my-gcp-project",
creation_date=datetime.datetime(2000, 1, 1),
)
self.object_store_source._fetch_metric = lambda bucket, metric: 100.0
self.assertEqual(
GCSContainerDetails(
name=bucket_response.name,
prefix="/",
number_of_objects=100,
size=100,
file_formats=[],
data_model=None,
creation_date=bucket_response.creation_date.isoformat(),
sourceUrl=SourceUrl(
"https://console.cloud.google.com/storage/browser/test_bucket;tab=objects?project=my-gcp-project"
),
fullPath="gs://test_bucket",
),
self.object_store_source._generate_unstructured_container(
bucket_response=bucket_response
),
)
def test_generate_structured_container(self):
self.object_store_source._get_sample_file_path = (
lambda bucket, metadata_entry: "transactions/file_1.csv"
)
self.object_store_source._fetch_metric = lambda bucket, metric: 100.0
columns: List[Column] = [
Column(
name=ColumnName("transaction_id"),
dataType=DataType.INT,
dataTypeDisplay="INT",
displayName="transaction_id",
),
Column(
name=ColumnName("transaction_value"),
dataType=DataType.INT,
dataTypeDisplay="INT",
displayName="transaction_value",
),
]
self.object_store_source.extract_column_definitions = (
lambda bucket_name, sample_key, config_source, client, metadata_entry: columns
)
entity_ref = EntityReference(id=uuid.uuid4(), type="container")
self.assertEqual(
GCSContainerDetails(
name="transactions",
prefix="/transactions",
number_of_objects=100,
size=100,
file_formats=[FileFormat.csv],
data_model=ContainerDataModel(isPartitioned=False, columns=columns),
creation_date=datetime.datetime(2000, 1, 1).isoformat(),
parent=entity_ref,
sourceUrl=SourceUrl(
f"https://console.cloud.google.com/storage/browser/_details/test_bucket/transactions;tab=live_object?project=my-gcp-project"
),
fullPath="gs://test_bucket/transactions",
),
self.object_store_source._generate_container_details(
GCSBucketResponse(
name="test_bucket",
project_id="my-gcp-project",
creation_date=datetime.datetime(2000, 1, 1),
),
MetadataEntry(
dataPath="transactions",
structureFormat="csv",
isPartitioned=False,
),
parent=entity_ref,
),
)
# Most of the parsing support is covered in the test_datalake unit tests related to the Data Lake implementation
def test_extract_column_definitions(self):
with patch(
"metadata.ingestion.source.storage.storage_service.fetch_dataframe",
return_value=(
[
pd.DataFrame.from_dict(
[
{"transaction_id": 1, "transaction_value": 100},
{"transaction_id": 2, "transaction_value": 200},
{"transaction_id": 3, "transaction_value": 300},
]
)
],
None,
),
):
Column.__eq__ = custom_column_compare
self.assertListEqual(
[
Column(
name=ColumnName("transaction_id"),
dataType=DataType.INT,
dataTypeDisplay="INT",
displayName="transaction_id",
),
Column(
name=ColumnName("transaction_value"),
dataType=DataType.INT,
dataTypeDisplay="INT",
displayName="transaction_value",
),
],
self.object_store_source.extract_column_definitions(
bucket_name="test_bucket",
sample_key="test.json",
config_source=None,
client=None,
metadata_entry=self.return_metadata_entry()[0],
),
)
def test_get_sample_file_prefix_for_structured_and_partitioned_metadata(self):
input_metadata = MetadataEntry(
dataPath="transactions",
structureFormat="parquet",
isPartitioned=True,
partitionColumns=[Column(name="date", dataType=DataType.DATE)],
)
self.assertEqual(
"transactions/",
self.object_store_source._get_sample_file_prefix(
metadata_entry=input_metadata
),
)
def test_get_sample_file_prefix_for_unstructured_metadata(self):
input_metadata = MetadataEntry(dataPath="transactions")
self.assertIsNone(
self.object_store_source._get_sample_file_prefix(
metadata_entry=input_metadata
)
)
def test_get_sample_file_prefix_for_structured_and_not_partitioned_metadata(self):
input_metadata = MetadataEntry(
dataPath="transactions",
structureFormat="csv",
isPartitioned=False,
)
self.assertEqual(
"transactions/",
self.object_store_source._get_sample_file_prefix(
metadata_entry=input_metadata
),
)
def test_get_sample_file_path_with_invalid_prefix(self):
self.object_store_source._get_sample_file_prefix = (
lambda metadata_entry: "/transactions"
)
self.assertIsNone(
self.object_store_source._get_sample_file_path(
bucket=GCSBucketResponse(
name="test_bucket",
project_id="my-gcp-project",
creation_date=datetime.datetime(2000, 1, 1),
),
metadata_entry=MetadataEntry(
dataPath="invalid_path",
structureFormat="csv",
isPartitioned=False,
),
)
)
def test_get_sample_file_path_randomly(self):
self.object_store_source._get_sample_file_prefix = (
lambda metadata_entry: "/transactions"
)
self.object_store_source.gcs_clients.storage_client.clients[
"my-gcp-project"
].list_blobs = lambda bucket, prefix, max_results: MOCK_OBJECT_FILE_PATHS
candidate = self.object_store_source._get_sample_file_path(
bucket=GCSBucketResponse(
name="test_bucket",
project_id="my-gcp-project",
creation_date=datetime.datetime(2000, 1, 1),
),
metadata_entry=MetadataEntry(
dataPath="/transactions",
structureFormat="csv",
isPartitioned=False,
),
)
self.assertTrue(
candidate
in [
"transactions/transactions_1.csv",
"transactions/transactions_2.csv",
"transactions/",
]
)
def return_metadata_entry(self):
container_config = StorageContainerConfig.model_validate(
MOCK_METADATA_FILE_RESPONSE
)
return container_config.entries

@@ -11,16 +11,16 @@
"mandatory": true
},
{
"name": "ListBlobs",
"description": "List all the blobs available to the user.",
"errorMessage": "Failed to fetch blobs, please validate the credentials if the user has access to list blobs",
"name": "GetBucket",
"description": "Get the bucket available to the user.",
"errorMessage": "Failed to fetch bucket, please validate the credentials if the user has access to get bucket",
"shortCircuit": true,
"mandatory": true
},
{
"name": "GetBucket",
"description": "Get the bucket available to the user.",
"errorMessage": "Failed to fetch bucket, please validate the credentials if the user has access to get bucket",
"name": "ListBlobs",
"description": "List all the blobs available to the user.",
"errorMessage": "Failed to fetch blobs, please validate the credentials if the user has access to list blobs",
"shortCircuit": true,
"mandatory": true
},
@@ -30,6 +30,12 @@
"errorMessage": "Failed to fetch blobs, please validate the credentials if the user has access to get blob",
"shortCircuit": true,
"mandatory": true
},
{
"name": "GetMetrics",
"description": "Get Google Cloud bucket resource metrics.",
"errorMessage": "Failed to fetch Google Cloud bucket resource metrics, please validate if user has access to fetch these metrics",
"mandatory": false
}
]
}

@@ -25,6 +25,15 @@
"description": "GCP Credentials",
"$ref": "../../../../security/credentials/gcpCredentials.json"
},
"bucketNames": {
"title": "Bucket Names",
"description": "Bucket Names of the data source.",
"type": "array",
"items": {
"type": "string"
},
"default": null
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"

@@ -11,7 +11,7 @@
* limitations under the License.
*/
import { capitalize, cloneDeep, toLower } from 'lodash';
import { capitalize, toLower } from 'lodash';
import {
AIRBYTE,
AIRFLOW,
@@ -23,7 +23,6 @@ import {
BIGQUERY,
BIGTABLE,
CLICKHOUSE,
COMMON_UI_SCHEMA,
COUCHBASE,
CUSTOM_STORAGE_DEFAULT,
DAGSTER,
@@ -41,6 +40,7 @@ import {
ELASTIC_SEARCH,
FIVETRAN,
FLINK,
GCS,
GLUE,
GREENPLUM,
HIVE,
@@ -114,8 +114,6 @@ import { SearchServiceType } from '../generated/entity/data/searchIndex';
import { MessagingServiceType } from '../generated/entity/data/topic';
import { MetadataServiceType } from '../generated/entity/services/metadataService';
import { SearchSourceAlias } from '../interface/search.interface';
import customConnection from '../jsons/connectionSchemas/connections/storage/customStorageConnection.json';
import s3Connection from '../jsons/connectionSchemas/connections/storage/s3Connection.json';
import { getDashboardConfig } from './DashboardServiceUtils';
import { getDatabaseConfig } from './DatabaseServiceUtils';
import { getMessagingConfig } from './MessagingServiceUtils';
@@ -123,6 +121,7 @@ import { getMetadataConfig } from './MetadataServiceUtils';
import { getMlmodelConfig } from './MlmodelServiceUtils';
import { getPipelineConfig } from './PipelineServiceUtils';
import { getSearchServiceConfig } from './SearchServiceUtils';
import { getStorageConfig } from './StorageServiceUtils';
import { customServiceComparator } from './StringsUtils';
class ServiceUtilClassBase {
@@ -130,7 +129,6 @@ class ServiceUtilClassBase {
StorageServiceType.Adls,
DatabaseServiceType.QueryLog,
DatabaseServiceType.Dbt,
StorageServiceType.Gcs,
MetadataServiceType.Alation,
];
@@ -438,6 +436,9 @@ class ServiceUtilClassBase {
case this.StorageServiceTypeSmallCase.S3:
return AMAZON_S3;
case this.StorageServiceTypeSmallCase.Gcs:
return GCS;
case this.SearchServiceTypeSmallCase.ElasticSearch:
return ELASTIC_SEARCH;
@@ -594,25 +595,6 @@ class ServiceUtilClassBase {
}
};
public getStorageServiceConfig(type: StorageServiceType) {
let schema = {};
const uiSchema = { ...COMMON_UI_SCHEMA };
switch (type) {
case StorageServiceType.S3: {
schema = s3Connection;
break;
}
case StorageServiceType.CustomStorage: {
schema = customConnection;
break;
}
}
return cloneDeep({ schema, uiSchema });
}
public getPipelineServiceConfig(type: PipelineServiceType) {
return getPipelineConfig(type);
}
@@ -637,6 +619,10 @@ class ServiceUtilClassBase {
return getSearchServiceConfig(type);
}
public getStorageServiceConfig(type: StorageServiceType) {
return getStorageConfig(type);
}
public getMetadataServiceConfig(type: MetadataServiceType) {
return getMetadataConfig(type);
}

@@ -0,0 +1,43 @@
/*
* Copyright 2024 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { cloneDeep } from 'lodash';
import { COMMON_UI_SCHEMA } from '../constants/Services.constant';
import { StorageServiceType } from '../generated/entity/services/storageService';
import customConnection from '../jsons/connectionSchemas/connections/storage/customStorageConnection.json';
import gcsConnection from '../jsons/connectionSchemas/connections/storage/gcsConnection.json';
import s3Connection from '../jsons/connectionSchemas/connections/storage/s3Connection.json';
export const getStorageConfig = (type: StorageServiceType) => {
let schema = {};
const uiSchema = { ...COMMON_UI_SCHEMA };
switch (type) {
case StorageServiceType.S3: {
schema = s3Connection;
break;
}
case StorageServiceType.Gcs: {
schema = gcsConnection;
break;
}
case StorageServiceType.CustomStorage: {
schema = customConnection;
break;
}
}
return cloneDeep({ schema, uiSchema });
};