diff --git a/ingestion/setup.py b/ingestion/setup.py
index a7b7df1d18c..1e5eb8d3a11 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -24,6 +24,7 @@ VERSIONS = {
     "avro": "avro>=1.11.3,<1.12",
     "boto3": "boto3>=1.20,<2.0",  # No need to add botocore separately. It's a dep from boto3
     "geoalchemy2": "GeoAlchemy2~=0.12",
+    "google-cloud-monitoring": "google-cloud-monitoring>=2.0.0",
     "google-cloud-storage": "google-cloud-storage==1.43.0",
     "gcsfs": "gcsfs>=2023.1.0",
     "great-expectations": "great-expectations>=0.18.0,<0.18.14",
@@ -198,6 +199,7 @@ plugins: Dict[str, Set[str]] = {
         *COMMONS["datalake"],
     },
     "datalake-gcs": {
+        VERSIONS["google-cloud-monitoring"],
         VERSIONS["google-cloud-storage"],
         VERSIONS["gcsfs"],
         *COMMONS["datalake"],
diff --git a/ingestion/src/metadata/ingestion/source/storage/gcs/client.py b/ingestion/src/metadata/ingestion/source/storage/gcs/client.py
new file mode 100644
index 00000000000..0f6f6119019
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/storage/gcs/client.py
@@ -0,0 +1,63 @@
+# Copyright 2024 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""A client for Google Cloud Storage that supports multiple projects."""
+from functools import partial
+from typing import List, Optional, Type, Union
+
+from google import auth
+from google.cloud.monitoring_v3 import MetricServiceClient
+from google.cloud.storage import Client
+
+NoProject = object()
+
+
+class MultiProjectClient:
+    """Google Cloud Client does not support ad-hoc project switching. This class wraps the client and allows
+    switching between projects. If no project is specified, the client will not have a project set and will try
+    to resolve it from ADC.
+    Example usage:
+    ```
+    from google.cloud.storage import Client
+    client = MultiProjectClient(Client, project_ids=["project1", "project2"])
+    buckets_project1 = client.list_buckets("project1")
+    buckets_project2 = client.list_buckets("project2")
+    ```
+    """
+
+    def __init__(
+        self,
+        client_class: Union[Type[Client], Type[MetricServiceClient]],
+        project_ids: Optional[List[str]] = None,
+        **client_kwargs,
+    ):
+        if project_ids:
+            self.clients = {
+                project_id: client_class(project=project_id, **client_kwargs)
+                for project_id in project_ids
+            }
+        else:
+            self.clients = {NoProject: client_class(**client_kwargs)}
+
+    def project_ids(self):
+        if NoProject in self.clients:
+            _, project_id = auth.default()
+            return [project_id]
+        return list(self.clients.keys())
+
+    def __getattr__(self, client_method):
+        """Return the underlying client method as a partial function so we can inject the project_id."""
+        return partial(self._call, client_method)
+
+    def _call(self, method, project_id, *args, **kwargs):
+        """Call the method on the client for the given project_id.
+        The args and kwargs are passed through."""
+        client = self.clients.get(project_id, self.clients.get(NoProject))
+        if not client:
+            raise ValueError(f"Project {project_id} not found")
+        return getattr(client, method)(*args, **kwargs)
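Since `__getattr__` hands back `partial(self._call, client_method)`, every proxied call takes the project id as its first positional argument. A minimal usage sketch (not part of the diff; assumes ADC credentials valid for both made-up projects):

```python
from google.cloud.storage import Client

from metadata.ingestion.source.storage.gcs.client import MultiProjectClient

# One storage.Client per project; "project-a"/"project-b" are placeholders.
client = MultiProjectClient(client_class=Client, project_ids=["project-a", "project-b"])

for project_id in client.project_ids():
    # Resolves via __getattr__ to _call("list_buckets", project_id)
    for bucket in client.list_buckets(project_id):
        print(project_id, bucket.name)
```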
diff --git a/ingestion/src/metadata/ingestion/source/storage/gcs/connection.py b/ingestion/src/metadata/ingestion/source/storage/gcs/connection.py
new file mode 100644
index 00000000000..a11d156c838
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/storage/gcs/connection.py
@@ -0,0 +1,159 @@
+# Copyright 2024 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""GCS storage connection"""
+from dataclasses import dataclass
+from typing import Optional
+
+from google.cloud.exceptions import NotFound
+from google.cloud.monitoring_v3 import MetricServiceClient
+from google.cloud.storage import Client
+
+from metadata.generated.schema.entity.automations.workflow import (
+    Workflow as AutomationWorkflow,
+)
+from metadata.generated.schema.entity.services.connections.storage.gcsConnection import (
+    GcsConnection,
+)
+from metadata.generated.schema.security.credentials.gcpValues import (
+    GcpCredentialsValues,
+    SingleProjectId,
+)
+from metadata.ingestion.connections.test_connections import (
+    SourceConnectionException,
+    test_connection_steps,
+)
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.storage.gcs.client import MultiProjectClient
+from metadata.utils.credentials import set_google_credentials
+from metadata.utils.logger import ingestion_logger
+
+logger = ingestion_logger()
+
+
+@dataclass
+class GcsObjectStoreClient:
+    storage_client: MultiProjectClient
+    metrics_client: MetricServiceClient
+
+
+def get_connection(connection: GcsConnection):
+    set_google_credentials(connection.credentials)
+    project_ids = None
+    if isinstance(connection.credentials.gcpConfig, GcpCredentialsValues):
+        project_ids = (
+            [connection.credentials.gcpConfig.projectId.root]
+            if isinstance(connection.credentials.gcpConfig.projectId, SingleProjectId)
+            else connection.credentials.gcpConfig.projectId.root
+        )
+    return GcsObjectStoreClient(
+        storage_client=MultiProjectClient(client_class=Client, project_ids=project_ids),
+        metrics_client=MetricServiceClient(),
+    )
+
+
+@dataclass
+class BucketTestState:
+    project_id: str
+    bucket_name: str
+    blob_name: Optional[str] = None
+
+
+class Tester:
+    """
+    A wrapper class that holds state. We need it because the different testing stages
+    are not independent of each other. For example, we need to list buckets before we can list
+    blobs within a bucket.
+    """
+
+    def __init__(self, client: GcsObjectStoreClient, connection: GcsConnection):
+        self.client = client
+        self.connection = connection
+        self.bucket_tests = []
+
+    def list_buckets(self):
+        if self.connection.bucketNames:
+            for bucket_name in self.connection.bucketNames:
+                for project_id, client in self.client.storage_client.clients.items():
+                    try:
+                        client.get_bucket(bucket_name)
+                    except NotFound:
+                        continue
+                    else:
+                        self.bucket_tests.append(
+                            BucketTestState(project_id, bucket_name)
+                        )
+                        break
+                else:
+                    raise SourceConnectionException(
+                        f"Bucket {bucket_name} not found in provided projects."
+                    )
+            return
+        else:
+            for project_id, client in self.client.storage_client.clients.items():
+                bucket = next(client.list_buckets())
+                self.bucket_tests.append(BucketTestState(project_id, bucket.name))
+
+    def get_bucket(self):
+        if not self.bucket_tests:
+            raise SourceConnectionException("No buckets found in provided projects")
+        for bucket_test in self.bucket_tests:
+            client = self.client.storage_client.clients[bucket_test.project_id]
+            client.get_bucket(bucket_test.bucket_name)
+
+    def list_blobs(self):
+        if not self.bucket_tests:
+            raise SourceConnectionException("No buckets found in provided projects")
+        for bucket_test in self.bucket_tests:
+            client = self.client.storage_client.clients[bucket_test.project_id]
+            blob = next(client.list_blobs(bucket_test.bucket_name))
+            bucket_test.blob_name = blob.name
+
+    def get_blob(self):
+        if not self.bucket_tests:
+            raise SourceConnectionException("No buckets found in provided projects")
+        for bucket_test in self.bucket_tests:
+            client = self.client.storage_client.clients[bucket_test.project_id]
+            bucket = client.get_bucket(bucket_test.bucket_name)
+            bucket.get_blob(bucket_test.blob_name)
+
+    def get_metrics(self):
+        for project_id in self.client.storage_client.clients.keys():
+            self.client.metrics_client.list_metric_descriptors(
+                name=f"projects/{project_id}"
+            )
+
+
+def test_connection(
+    metadata: OpenMetadata,
+    client: GcsObjectStoreClient,
+    service_connection: GcsConnection,
+    automation_workflow: Optional[AutomationWorkflow] = None,
+) -> None:
+    """
+    Test connection. This can be executed either as part
+    of a metadata workflow or during an Automation Workflow
+    """
+    tester = Tester(client, service_connection)
+
+    test_fn = {
+        "ListBuckets": tester.list_buckets,
+        "GetBucket": tester.get_bucket,
+        "ListBlobs": tester.list_blobs,
+        "GetBlob": tester.get_blob,
+        "GetMetrics": tester.get_metrics,
+    }
+
+    test_connection_steps(
+        metadata=metadata,
+        test_fn=test_fn,
+        service_type=service_connection.type.value,
+        automation_workflow=automation_workflow,
+    )
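Worth noting: the new `GetMetrics` step only lists metric descriptors, so it exercises Cloud Monitoring read access without pulling any time series. A hedged standalone equivalent, useful when debugging a failing step outside the workflow (function name and error handling are illustrative, not part of the diff):

```python
from google.cloud.monitoring_v3 import MetricServiceClient


def can_read_metrics(project_id: str) -> bool:
    """Mirror of the GetMetrics test step: succeeds only if monitoring reads are allowed."""
    client = MetricServiceClient()
    try:
        # Force the first page fetch; permission errors surface here.
        next(iter(client.list_metric_descriptors(name=f"projects/{project_id}")))
        return True
    except Exception:
        return False
```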
diff --git a/ingestion/src/metadata/ingestion/source/storage/gcs/metadata.py b/ingestion/src/metadata/ingestion/source/storage/gcs/metadata.py
new file mode 100644
index 00000000000..bd89d3a4321
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/storage/gcs/metadata.py
@@ -0,0 +1,461 @@
+# Copyright 2024 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""GCS object store extraction metadata"""
+import json
+import secrets
+import traceback
+from datetime import datetime, timedelta
+from enum import Enum
+from typing import Dict, Iterable, List, Optional
+
+from google.cloud.exceptions import NotFound
+from google.cloud.monitoring_v3.types import TimeInterval
+from pydantic import ValidationError
+
+from metadata.generated.schema.api.data.createContainer import CreateContainerRequest
+from metadata.generated.schema.entity.data import container
+from metadata.generated.schema.entity.data.container import (
+    Container,
+    ContainerDataModel,
+)
+from metadata.generated.schema.entity.services.connections.database.datalake.gcsConfig import (
+    GCSConfig,
+)
+from metadata.generated.schema.entity.services.connections.storage.gcsConnection import (
+    GcsConnection,
+)
+from metadata.generated.schema.entity.services.ingestionPipelines.status import (
+    StackTraceError,
+)
+from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig import (
+    MetadataEntry,
+    StorageContainerConfig,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.api.models import Either
+from metadata.ingestion.api.steps import InvalidSourceException
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.storage.gcs.models import (
+    GCSBucketResponse,
+    GCSContainerDetails,
+)
+from metadata.ingestion.source.storage.storage_service import (
+    KEY_SEPARATOR,
+    OPENMETADATA_TEMPLATE_FILE_NAME,
+    StorageServiceSource,
+)
+from metadata.readers.file.base import ReadException
+from metadata.readers.file.config_source_factory import get_reader
+from metadata.utils import fqn
+from metadata.utils.filters import filter_by_container
+from metadata.utils.logger import ingestion_logger
+
+logger = ingestion_logger()
+
+
+class GCSMetric(Enum):
+    NUMBER_OF_OBJECTS = "storage.googleapis.com/storage/object_count"
+    BUCKET_SIZE_BYTES = "storage.googleapis.com/storage/total_bytes"
+
+
+class GcsSource(StorageServiceSource):
+    """
+    Source implementation to ingest GCS bucket data.
+    """
+
+    def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
+        super().__init__(config, metadata)
+        self.gcs_clients = self.connection
+        self.gcs_readers = {
+            project_id: get_reader(config_source=GCSConfig(), client=client)
+            for project_id, client in self.gcs_clients.storage_client.clients.items()
+        }
+        self._bucket_cache: Dict[str, Container] = {}
+
+    @classmethod
+    def create(
+        cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
+    ):
+        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
+        connection: GcsConnection = config.serviceConnection.root.config
+        if not isinstance(connection, GcsConnection):
+            raise InvalidSourceException(
+                f"Expected GcsConnection, but got {connection}"
+            )
+        return cls(config, metadata)
+
+    def get_containers(self) -> Iterable[GCSContainerDetails]:
+        bucket_results = self.fetch_buckets()
+
+        for bucket_response in bucket_results:
+            bucket_name = bucket_response.name
+            try:
+                # We always generate the parent container (the bucket)
+                yield self._generate_unstructured_container(
+                    bucket_response=bucket_response
+                )
+                container_fqn = fqn._build(  # pylint: disable=protected-access
+                    *(
+                        self.context.get().objectstore_service,
+                        self.context.get().container,
+                    )
+                )
+                container_entity = self.metadata.get_by_name(
+                    entity=Container, fqn=container_fqn
+                )
+                self._bucket_cache[bucket_name] = container_entity
+                parent_entity: EntityReference = EntityReference(
+                    id=self._bucket_cache[bucket_name].id.root, type="container"
+                )
+                if self.global_manifest:
+                    manifest_entries_for_current_bucket = (
+                        self._manifest_entries_to_metadata_entries_by_container(
+                            container_name=bucket_name, manifest=self.global_manifest
+                        )
+                    )
+                    # Check if we have entries in the manifest file belonging to this bucket
+                    if manifest_entries_for_current_bucket:
+                        # ingest all the relevant valid paths from it
+                        yield from self._generate_structured_containers(
+                            bucket_response=bucket_response,
+                            entries=manifest_entries_for_current_bucket,
+                            parent=parent_entity,
+                        )
+                        # nothing else to do for the current bucket, skipping to the next
+                        continue
+                # If no global file, or no valid entries in the manifest, check for bucket level metadata file
+                metadata_config = self._load_metadata_file(bucket=bucket_response)
+                if metadata_config:
+                    yield from self._generate_structured_containers(
+                        bucket_response=bucket_response,
+                        entries=metadata_config.entries,
+                        parent=parent_entity,
+                    )
+
+            except ValidationError as err:
+                self.status.failed(
+                    StackTraceError(
+                        name=bucket_response.name,
+                        error=f"Validation error while creating Container from bucket details - {err}",
+                        stackTrace=traceback.format_exc(),
+                    )
+                )
+            except Exception as err:
+                self.status.failed(
+                    StackTraceError(
+                        name=bucket_response.name,
+                        error=f"Wild error while creating Container from bucket details - {err}",
+                        stackTrace=traceback.format_exc(),
+                    )
+                )
+
+    def yield_create_container_requests(
+        self, container_details: GCSContainerDetails
+    ) -> Iterable[Either[CreateContainerRequest]]:
+        container_request = CreateContainerRequest(
+            name=container_details.name,
+            prefix=container_details.prefix,
+            numberOfObjects=container_details.number_of_objects,
+            size=container_details.size,
+            dataModel=container_details.data_model,
+            service=self.context.get().objectstore_service,
+            parent=container_details.parent,
+            sourceUrl=container_details.sourceUrl,
+            fileFormats=container_details.file_formats,
+            fullPath=container_details.fullPath,
+        )
+        yield Either(right=container_request)
+        self.register_record(container_request=container_request)
+
+    def _generate_container_details(
+        self,
+        bucket_response: GCSBucketResponse,
+        metadata_entry: MetadataEntry,
+        parent: Optional[EntityReference] = None,
+    ) -> Optional[GCSContainerDetails]:
+        bucket_name = bucket_response.name
+        sample_key = self._get_sample_file_path(
+            bucket=bucket_response, metadata_entry=metadata_entry
+        )
+        # if we have a sample file to fetch a schema from
+        if sample_key:
+            columns = self._get_columns(
+                container_name=bucket_name,
+                sample_key=sample_key,
+                metadata_entry=metadata_entry,
+                config_source=GCSConfig(
+                    securityConfig=self.service_connection.credentials
+                ),
+                client=self.gcs_clients.storage_client.clients[
+                    bucket_response.project_id
+                ],
+            )
+            if columns:
+                prefix = (
+                    f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
+                )
+                return GCSContainerDetails(
+                    name=metadata_entry.dataPath.strip(KEY_SEPARATOR),
+                    prefix=prefix,
+                    creation_date=bucket_response.creation_date.isoformat()
+                    if bucket_response.creation_date
+                    else None,
+                    number_of_objects=self._fetch_metric(
+                        bucket=bucket_response, metric=GCSMetric.NUMBER_OF_OBJECTS
+                    ),
+                    size=self._fetch_metric(
+                        bucket=bucket_response, metric=GCSMetric.BUCKET_SIZE_BYTES
+                    ),
+                    file_formats=[container.FileFormat(metadata_entry.structureFormat)],
+                    data_model=ContainerDataModel(
+                        isPartitioned=metadata_entry.isPartitioned, columns=columns
+                    ),
+                    parent=parent,
+                    fullPath=self._get_full_path(bucket_name, prefix),
+                    sourceUrl=self._get_object_source_url(
+                        bucket=bucket_response,
+                        prefix=metadata_entry.dataPath.strip(KEY_SEPARATOR),
+                    ),
+                )
+        return None
+
+    def _generate_structured_containers(
+        self,
+        bucket_response: GCSBucketResponse,
+        entries: List[MetadataEntry],
+        parent: Optional[EntityReference] = None,
+    ) -> List[GCSContainerDetails]:
+        result: List[GCSContainerDetails] = []
+        for metadata_entry in entries:
+            logger.info(
+                f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} "
+                f"and generating structured container"
+            )
+            structured_container: Optional[
+                GCSContainerDetails
+            ] = self._generate_container_details(
+                bucket_response=bucket_response,
+                metadata_entry=metadata_entry,
+                parent=parent,
+            )
+            if structured_container:
+                result.append(structured_container)
+
+        return result
+
+    def _fetch_bucket(self, bucket_name: str) -> GCSBucketResponse:
+        for project_id, client in self.gcs_clients.storage_client.clients.items():
+            try:
+                bucket = client.get_bucket(bucket_name)
+            except NotFound:
+                continue
+            return GCSBucketResponse(
+                name=bucket.name,
+                project_id=project_id,
+                creation_date=bucket.time_created,
+            )
+
+    def fetch_buckets(self) -> List[GCSBucketResponse]:
+        results: List[GCSBucketResponse] = []
+        try:
+            if self.service_connection.bucketNames:
+                for bucket_name in self.service_connection.bucketNames:
+                    bucket = self._fetch_bucket(bucket_name)
+                    if bucket:
+                        results.append(bucket)
+                return results
+            for project_id, client in self.gcs_clients.storage_client.clients.items():
+                for bucket in client.list_buckets():
+                    if filter_by_container(
+                        self.source_config.containerFilterPattern,
+                        container_name=bucket.name,
+                    ):
+                        self.status.filter(bucket.name, "Bucket Filtered Out")
+                    else:
+                        results.append(
+                            GCSBucketResponse(
+                                name=bucket.name,
+                                project_id=project_id,
+                                creation_date=bucket.time_created,
+                            )
+                        )
+        except Exception as err:
+            logger.debug(traceback.format_exc())
+            logger.error(f"Failed to fetch buckets list - {err}")
+        return results
+
+    def _get_time_interval(self, days: int = 2):
+        end = datetime.now()
+        start = end - timedelta(days=days)
+
+        start_time = start.isoformat(timespec="seconds") + "Z"
+        end_time = end.isoformat(timespec="seconds") + "Z"
+        return TimeInterval(end_time=end_time, start_time=start_time)
+
+    def _fetch_metric(self, bucket: GCSBucketResponse, metric: GCSMetric) -> float:
+        try:
+            filters = [
+                f'resource.labels.bucket_name="{bucket.name}"',
+                f'metric.type="{metric.value}"',
+            ]
+            filter_ = " AND ".join(filters)
+            interval = self._get_time_interval()
+            timeseries = self.gcs_clients.metrics_client.list_time_series(
+                name=f"projects/{bucket.project_id}", filter=filter_, interval=interval
+            )
+            point = list(timeseries)[-1].points[-1]
+            return point.value.int64_value
+        except Exception:
+            logger.debug(traceback.format_exc())
+            logger.warning(
+                f"Failed fetching metric {metric.value} for bucket {bucket.name}, returning 0"
+            )
+            return 0
+
+    def _generate_unstructured_container(
+        self, bucket_response: GCSBucketResponse
+    ) -> GCSContainerDetails:
+        return GCSContainerDetails(
+            name=bucket_response.name,
+            prefix=KEY_SEPARATOR,
+            creation_date=bucket_response.creation_date.isoformat()
+            if bucket_response.creation_date
+            else None,
+            number_of_objects=self._fetch_metric(
+                bucket=bucket_response, metric=GCSMetric.NUMBER_OF_OBJECTS
+            ),
+            size=self._fetch_metric(
+                bucket=bucket_response, metric=GCSMetric.BUCKET_SIZE_BYTES
+            ),
+            file_formats=[],
+            data_model=None,
+            fullPath=self._get_full_path(bucket_name=bucket_response.name),
+            sourceUrl=self._get_bucket_source_url(bucket=bucket_response),
+        )
+
+    def _clean_path(self, path: str) -> str:
+        return path.strip(KEY_SEPARATOR)
+
+    def _get_full_path(self, bucket_name: str, prefix: str = None) -> Optional[str]:
+        """
+        Method to get the full path of the file
+        """
+        if bucket_name is None:
+            return None
+
+        full_path = f"gs://{self._clean_path(bucket_name)}"
+
+        if prefix:
+            full_path += f"/{self._clean_path(prefix)}"
+
+        return full_path
+
+    def _get_sample_file_path(
+        self, bucket: GCSBucketResponse, metadata_entry: MetadataEntry
+    ) -> Optional[str]:
+        """
+        Given a bucket and a metadata entry, returns the full path key to a file which can then be used to infer schema
+        or None in the case of a non-structured metadata entry, or if no such keys can be found
+        """
+        prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry)
+        try:
+            if prefix:
+                client = self.gcs_clients.storage_client.clients[bucket.project_id]
+                response = client.list_blobs(
+                    bucket.name,
+                    prefix=prefix,
+                    max_results=1000,
+                )
+                candidate_keys = [
+                    entry.name
+                    for entry in response
+                    if entry.name.endswith(metadata_entry.structureFormat)
+                ]
+                # pick a random key out of the candidates if any were returned
+                if candidate_keys:
+                    result_key = secrets.choice(candidate_keys)
+                    logger.info(
+                        f"File {result_key} was picked to infer data structure from."
+                    )
+                    return result_key
+            logger.warning(
+                f"No sample files found in {prefix} with {metadata_entry.structureFormat} extension"
+            )
+            return None
+        except Exception:
+            logger.debug(traceback.format_exc())
+            logger.warning(
+                f"Error when trying to list objects in GCS bucket {bucket.name} at prefix {prefix}"
+            )
+            return None
+
+    def _get_bucket_source_url(self, bucket: GCSBucketResponse) -> Optional[str]:
+        """
+        Method to get the source url of GCS bucket
+        """
+        try:
+            return (
+                f"https://console.cloud.google.com/storage/browser/{bucket.name}"
+                f";tab=objects?project={bucket.project_id}"
+            )
+        except Exception as exc:
+            logger.debug(traceback.format_exc())
+            logger.error(f"Unable to get source url: {exc}")
+            return None
+
+    def _get_object_source_url(
+        self, bucket: GCSBucketResponse, prefix: str
+    ) -> Optional[str]:
+        """
+        Method to get the source url of GCS object
+        """
+        try:
+            return (
+                f"https://console.cloud.google.com/storage/browser/_details/{bucket.name}/{prefix}"
+                f";tab=live_object?project={bucket.project_id}"
+            )
+        except Exception as exc:
+            logger.debug(traceback.format_exc())
+            logger.error(f"Unable to get source url: {exc}")
+            return None
+
+    def _load_metadata_file(
+        self, bucket: GCSBucketResponse
+    ) -> Optional[StorageContainerConfig]:
+        """
+        Load the metadata template file from the root of the bucket, if it exists
+        """
+        try:
+            logger.info(
+                f"Looking for metadata template file at - gs://{bucket.name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
+            )
+            reader = self.gcs_readers.get(bucket.project_id)
+            response_object = reader.read(
+                path=OPENMETADATA_TEMPLATE_FILE_NAME,
+                bucket_name=bucket.name,
+                verbose=False,
+            )
+            content = json.loads(response_object)
+            metadata_config = StorageContainerConfig.parse_obj(content)
+            return metadata_config
+        except ReadException:
+            logger.warning(
+                f"No metadata file found at gs://{bucket.name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
+            )
+        except Exception as exc:
+            logger.debug(traceback.format_exc())
+            logger.warning(
+                f"Failed loading metadata file gs://{bucket.name}/{OPENMETADATA_TEMPLATE_FILE_NAME} - {exc}"
+            )
+        return None
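The object-count and size numbers come from Cloud Monitoring rather than from listing objects: GCS exports `storage/object_count` and `storage/total_bytes` as time series sampled roughly once a day, which is presumably why `_get_time_interval` defaults to a two-day window and the newest point wins. A condensed sketch of the same query outside the source class (function name is illustrative, not part of the diff):

```python
from datetime import datetime, timedelta

from google.cloud.monitoring_v3 import MetricServiceClient
from google.cloud.monitoring_v3.types import TimeInterval


def bucket_object_count(project_id: str, bucket_name: str) -> int:
    client = MetricServiceClient()
    end = datetime.now()
    interval = TimeInterval(
        start_time=(end - timedelta(days=2)).isoformat(timespec="seconds") + "Z",
        end_time=end.isoformat(timespec="seconds") + "Z",
    )
    series = list(
        client.list_time_series(
            name=f"projects/{project_id}",
            filter=(
                f'resource.labels.bucket_name="{bucket_name}" AND '
                'metric.type="storage.googleapis.com/storage/object_count"'
            ),
            interval=interval,
        )
    )
    # Same convention as _fetch_metric: newest series, newest point, 0 if absent.
    return series[-1].points[-1].value.int64_value if series else 0
```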
diff --git a/ingestion/src/metadata/ingestion/source/storage/gcs/models.py b/ingestion/src/metadata/ingestion/source/storage/gcs/models.py
new file mode 100644
index 00000000000..69d9fc9a760
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/storage/gcs/models.py
@@ -0,0 +1,86 @@
+# Copyright 2024 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+GCS custom pydantic models
+"""
+from datetime import datetime
+from typing import List, Optional
+
+from pydantic import BaseModel, Extra, Field
+
+from metadata.generated.schema.entity.data.container import (
+    ContainerDataModel,
+    FileFormat,
+)
+from metadata.generated.schema.type import basic
+from metadata.generated.schema.type.entityReference import EntityReference
+
+
+class GCSBucketResponse(BaseModel):
+    """
+    Class modelling a response received from gcs_client.list_buckets operation
+    """
+
+    class Config:
+        extra = Extra.forbid
+
+    name: str = Field(..., description="Bucket name")
+    project_id: str = Field(..., description="Project ID")
+    creation_date: Optional[datetime] = Field(
+        None,
+        description="Timestamp of Bucket creation in ISO format",
+    )
+
+
+class GCSContainerDetails(BaseModel):
+    """
+    Class mapping container details used to create the container requests
+    """
+
+    class Config:
+        extra = Extra.forbid
+
+    name: str = Field(..., description="Bucket name")
+    prefix: str = Field(..., description="Prefix for the container")
+    description: Optional[basic.Markdown] = Field(
+        None, description="Description of the container instance."
+    )
+    number_of_objects: float = Field(
+        ...,
+        description="Total nr. of objects",
+    )
+    size: float = Field(
+        ...,
+        description="Total size in bytes of all objects",
+        title="Total size(bytes) of objects",
+    )
+    file_formats: Optional[List[FileFormat]] = Field(
+        ...,
+        description="File formats",
+    )
+    data_model: Optional[ContainerDataModel] = Field(
+        ...,
+        description="Data Model of the container",
+    )
+    creation_date: Optional[str] = Field(
+        None,
+        description="Timestamp of Bucket creation in ISO format",
+    )
+    parent: Optional[EntityReference] = Field(
+        None,
+        description="Reference to the parent container",
+    )
+    sourceUrl: Optional[basic.SourceUrl] = Field(
+        None, description="Source URL of the container."
+    )
+    fullPath: Optional[str] = Field(
+        None, description="Full path of the container/file."
+    )
diff --git a/ingestion/src/metadata/utils/secrets/gcp_secrets_manager.py b/ingestion/src/metadata/utils/secrets/gcp_secrets_manager.py
index c4688da8dd1..08cce3cb8c5 100644
--- a/ingestion/src/metadata/utils/secrets/gcp_secrets_manager.py
+++ b/ingestion/src/metadata/utils/secrets/gcp_secrets_manager.py
@@ -99,7 +99,7 @@ class GCPSecretsManager(ExternalSecretsManager, ABC):
         # Build the resource name of the secret version.
-        project_id = self.credentials.gcpConfig.projectId.__root__
+        project_id = self.credentials.gcpConfig.projectId.root
         secret_id = (
             f"projects/{project_id}/secrets/{secret_id}/versions/{FIXED_VERSION_ID}"
         )
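The one-line `gcp_secrets_manager.py` change, like `projectId.root` in `get_connection` above, tracks the pydantic v1 to v2 migration: generated single-value wrappers are now `RootModel`s whose payload is `.root` instead of `.__root__`. A hedged illustration (the real `SingleProjectId` is code-generated; this stand-in only shows the attribute rename):

```python
from pydantic import RootModel


class ProjectId(RootModel[str]):
    """Stand-in for the generated SingleProjectId wrapper."""


assert ProjectId(root="my-gcp-project").root == "my-gcp-project"
```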
diff --git a/ingestion/tests/unit/topology/storage/test_gcs_storage.py b/ingestion/tests/unit/topology/storage/test_gcs_storage.py
new file mode 100644
index 00000000000..3b0447e8611
--- /dev/null
+++ b/ingestion/tests/unit/topology/storage/test_gcs_storage.py
@@ -0,0 +1,455 @@
+# Copyright 2024 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Unit tests for GCS Object store source
+"""
+import datetime
+import uuid
+from collections import namedtuple
+from typing import List
+from unittest import TestCase
+from unittest.mock import patch
+
+import pandas as pd
+
+from metadata.generated.schema.entity.data.container import (
+    ContainerDataModel,
+    FileFormat,
+)
+from metadata.generated.schema.entity.data.table import Column, ColumnName, DataType
+from metadata.generated.schema.entity.services.connections.database.datalake.gcsConfig import (
+    GCSConfig,
+)
+from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig import (
+    MetadataEntry,
+    StorageContainerConfig,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.generated.schema.type.basic import SourceUrl
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.api.steps import InvalidSourceException
+from metadata.ingestion.source.storage.gcs.metadata import (
+    GCSBucketResponse,
+    GCSContainerDetails,
+    GcsSource,
+)
+from metadata.ingestion.source.storage.storage_service import (
+    OPENMETADATA_TEMPLATE_FILE_NAME,
+)
+from metadata.readers.file.base import ReadException
+from metadata.readers.file.config_source_factory import get_reader
+
+MockBucketResponse = namedtuple("MockBucketResponse", ["name", "time_created"])
+MockObjectFilePath = namedtuple("MockObjectFilePath", ["name"])
+
+MOCK_OBJECT_STORE_CONFIG = {
+    "source": {
+        "type": "gcs",
+        "serviceName": "gcs_test",
+        "serviceConnection": {
+            "config": {
+                "type": "GCS",
+                "credentials": {
+                    "gcpConfig": {
+                        "type": "service_account",
+                        "projectId": "my-gcp-project",
+                        "privateKeyId": "private_key_id",
+                        # this is a valid key that was generated on a local machine and is not used for any real project
+                        "privateKey": "-----BEGIN RSA PRIVATE KEY-----\nMIIEpQIBAAKCAQEAw3vHG9fDIkcYB0xi2Mv4fS2gUzKR9ZRrcVNeKkqGFTT71AVB\nOzgIqYVe8b2aWODuNye6sipcrqTqOt05Esj+sxhk5McM9bE2RlxXC5QH/Bp9zxMP\n/Yksv9Ov7fdDt/loUk7sTXvI+7LDJfmRYU6MtVjyyLs7KpQIB2xBWEToU1xZY+v0\ndRC1NA+YWc+FjXbAiFAf9d4gXkYO8VmU5meixVh4C8nsjokEXk0T/HEItpZCxadk\ndZ7LKUE/HDmWCO2oNG6sCf4ET2crjSdYIfXuREopX1aQwnk7KbI4/YIdlRz1I369\nAz3+Hxlf9lLJVH3+itN4GXrR9yWWKWKDnwDPbQIDAQABAoIBAQC3X5QuTR7SN8iV\niBUtc2D84+ECSmza5shG/UJW/6N5n0Mf53ICgBS4GNEwiYCRISa0/ILIgK6CcVb7\nsuvH8F3kWNzEMui4TO0x4YsR5GH9HkioCCS224frxkLBQnL20HIIy9ok8Rpe6Zjg\nNZUnp4yczPyqSeA9l7FUbTt69uDM2Cx61m8REOpFukpnYLyZGbmNPYmikEO+rq9r\nwNID5dkSeVuQYo4MQdRavOGFUWvUYXzkEQ0A6vPyraVBfolESX8WaLNVjic7nIa3\nujdSNojnJqGJ3gslntcmN1d4JOfydc4bja4/NdNlcOHpWDGLzY1QnaDe0Koxn8sx\nLT9MVD2NAoGBAPy7r726bKVGWcwqTzUuq1OWh5c9CAc4N2zWBBldSJyUdllUq52L\nWTyva6GRoRzCcYa/dKLLSM/k4eLf9tpxeIIfTOMsvzGtbAdm257ndMXNvfYpxCfU\nK/gUFfAUGHZ3MucTHRY6DTkJg763Sf6PubA2fqv3HhVZDK/1HGDtHlTPAoGBAMYC\npdV7O7lAyXS/d9X4PQZ4BM+P8MbXEdGBbPPlzJ2YIb53TEmYfSj3z41u9+BNnhGP\n4uzUyAR/E4sxrA2+Ll1lPSCn+KY14WWiVGfWmC5j1ftdpkbrXstLN8NpNYzrKZwx\njdR0ZkwvZ8B5+kJ1hK96giwWS+SJxJR3TohcQ18DAoGAJSfmv2r//BBqtURnHrd8\nwq43wvlbC8ytAVg5hA0d1r9Q4vM6w8+vz+cuWLOTTyobDKdrG1/tlXrd5r/sh9L0\n15SIdkGm3kPTxQbPNP5sQYRs8BrV1tEvoao6S3B45DnEBwrdVN42AXOvpcNGoqE4\nuHpahyeuiY7s+ZV8lZdmxSsCgYEAolr5bpmk1rjwdfGoaKEqKGuwRiBX5DHkQkxE\n8Zayt2VOBcX7nzyRI05NuEIMrLX3rZ61CktN1aH8fF02He6aRaoE/Qm9L0tujM8V\nNi8WiLMDeR/Ifs3u4/HAv1E8v1byv0dCa7klR8J257McJ/ID4X4pzcxaXgE4ViOd\nGOHNu9ECgYEApq1zkZthEQymTUxs+lSFcubQpaXyf5ZC61cJewpWkqGDtSC+8DxE\nF/jydybWuoNHXymnvY6QywxuIooivbuib6AlgpEJeybmnWlDOZklFOD0abNZ+aNO\ndUk7XVGffCakXQ0jp1kmZA4lGsYK1h5dEU5DgXqu4UYJ88Vttax2W+Y=\n-----END RSA PRIVATE KEY-----\n",
+                        "clientEmail": "gcpuser@project_id.iam.gserviceaccount.com",
+                        "clientId": "client_id",
+                        "authUri": "https://accounts.google.com/o/oauth2/auth",
+                        "tokenUri": "https://oauth2.googleapis.com/token",
+                        "authProviderX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
+                        "clientX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
+                    }
+                },
+            }
+        },
+        "sourceConfig": {
+            "config": {
+                "type": "StorageMetadata",
+                "containerFilterPattern": {"includes": ["^test_*"]},
+                "storageMetadataConfigSource": {
+                    "prefixConfig": {
+                        "containerName": "test_bucket",
+                        "objectPrefix": "manifest",
+                    },
+                },
+            }
+        },
+    },
+    "sink": {"type": "metadata-rest", "config": {}},
+    "workflowConfig": {
+        "openMetadataServerConfig": {
+            "hostPort": "http://localhost:8585/api",
+            "authProvider": "openmetadata",
+            "securityConfig": {"jwtToken": "token"},
+        }
+    },
+}
+MOCK_BUCKETS_RESPONSE = [
+    MockBucketResponse(
+        name="test_transactions", time_created=datetime.datetime(2000, 1, 1)
+    ),
+    MockBucketResponse(name="test_sales", time_created=datetime.datetime(2000, 2, 2)),
+    MockBucketResponse(name="events", time_created=datetime.datetime(2000, 3, 3)),
+]
+
+MOCK_METADATA_FILE_RESPONSE = {
+    "entries": [
+        {
+            "dataPath": "transactions",
+            "structureFormat": "csv",
+            "isPartitioned": False,
+        }
+    ]
+}
+EXPECTED_BUCKETS: List[GCSBucketResponse] = [
+    GCSBucketResponse(
+        name="test_transactions",
+        project_id="my-gcp-project",
+        creation_date=datetime.datetime(2000, 1, 1),
+    ),
+    GCSBucketResponse(
+        name="test_sales",
+        project_id="my-gcp-project",
+        creation_date=datetime.datetime(2000, 2, 2),
+    ),
+]
+MOCK_OBJECT_FILE_PATHS = [
+    MockObjectFilePath(name="transactions/transactions_1.csv"),
+    MockObjectFilePath(name="transactions/transactions_2.csv"),
+]
+
+
+def _get_str_value(data):
+    if data:
+        if isinstance(data, str):
+            return data
+        return data.value
+
+    return None
+
+
+def custom_column_compare(self, other):
+    return (
+        self.name == other.name
+        and self.displayName == other.displayName
+        and self.description == other.description
+        and self.dataTypeDisplay == other.dataTypeDisplay
+        and self.children == other.children
+        and _get_str_value(self.arrayDataType) == _get_str_value(other.arrayDataType)
+    )
+
+
+class StorageUnitTest(TestCase):
+    """
+    Validate how we work with object store metadata
+    """
+
+    @patch(
+        "metadata.ingestion.source.storage.storage_service.StorageServiceSource.test_connection"
+    )
+    def __init__(self, method_name: str, test_connection) -> None:
+        super().__init__(method_name)
+        test_connection.return_value = False
+        self.config = OpenMetadataWorkflowConfig.model_validate(
+            MOCK_OBJECT_STORE_CONFIG
+        )
+
+        # This already validates that the source can be initialized
+        self.object_store_source = GcsSource.create(
+            MOCK_OBJECT_STORE_CONFIG["source"],
+            self.config.workflowConfig.openMetadataServerConfig,
+        )
+        self.gcs_reader = get_reader(
+            config_source=GCSConfig(),
+            client=self.object_store_source.gcs_clients.storage_client.clients[
+                "my-gcp-project"
+            ],
+        )
+
+    def test_create_from_invalid_source(self):
+        """
+        An invalid config raises an error
+        """
+        not_object_store_source = {
+            "type": "gcs",
+            "serviceName": "mysql_local",
+            "serviceConnection": {
+                "config": {
+                    "type": "Mysql",
+                    "username": "openmetadata_user",
+                    "authType": {"password": "openmetadata_password"},
+                    "hostPort": "localhost:3306",
+                    "databaseSchema": "openmetadata_db",
+                }
+            },
+            "sourceConfig": {
+                "config": {
+                    "type": "StorageMetadata",
+                    "storageMetadataConfigSource": {
"prefixConfig": { + "containerName": "test_bucket", + "objectPrefix": "manifest", + }, + }, + } + }, + } + self.assertRaises( + InvalidSourceException, + GcsSource.create, + not_object_store_source, + self.config.workflowConfig.openMetadataServerConfig, + ) + + def test_gcs_buckets_fetching(self): + self.object_store_source.gcs_clients.storage_client.clients[ + "my-gcp-project" + ].list_buckets = lambda: MOCK_BUCKETS_RESPONSE + self.assertListEqual(self.object_store_source.fetch_buckets(), EXPECTED_BUCKETS) + + def test_load_metadata_file_gcs(self): + metadata_entry: List[MetadataEntry] = self.return_metadata_entry() + + self.assertEqual(1, len(metadata_entry)) + self.assertEqual( + MetadataEntry( + dataPath="transactions", + structureFormat="csv", + isPartitioned=False, + ), + metadata_entry[0], + ) + + def test_no_metadata_file_returned_when_file_not_present(self): + with self.assertRaises(ReadException): + self.gcs_reader.read( + path=OPENMETADATA_TEMPLATE_FILE_NAME, + bucket_name="test", + verbose=False, + ) + + def test_generate_unstructured_container(self): + bucket_response = GCSBucketResponse( + name="test_bucket", + project_id="my-gcp-project", + creation_date=datetime.datetime(2000, 1, 1), + ) + self.object_store_source._fetch_metric = lambda bucket, metric: 100.0 + self.assertEqual( + GCSContainerDetails( + name=bucket_response.name, + prefix="/", + number_of_objects=100, + size=100, + file_formats=[], + data_model=None, + creation_date=bucket_response.creation_date.isoformat(), + sourceUrl=SourceUrl( + "https://console.cloud.google.com/storage/browser/test_bucket;tab=objects?project=my-gcp-project" + ), + fullPath="gs://test_bucket", + ), + self.object_store_source._generate_unstructured_container( + bucket_response=bucket_response + ), + ) + + def test_generate_structured_container(self): + self.object_store_source._get_sample_file_path = ( + lambda bucket, metadata_entry: "transactions/file_1.csv" + ) + self.object_store_source._fetch_metric = lambda bucket, metric: 100.0 + columns: List[Column] = [ + Column( + name=ColumnName("transaction_id"), + dataType=DataType.INT, + dataTypeDisplay="INT", + displayName="transaction_id", + ), + Column( + name=ColumnName("transaction_value"), + dataType=DataType.INT, + dataTypeDisplay="INT", + displayName="transaction_value", + ), + ] + self.object_store_source.extract_column_definitions = ( + lambda bucket_name, sample_key, config_source, client, metadata_entry: columns + ) + + entity_ref = EntityReference(id=uuid.uuid4(), type="container") + + self.assertEqual( + GCSContainerDetails( + name="transactions", + prefix="/transactions", + number_of_objects=100, + size=100, + file_formats=[FileFormat.csv], + data_model=ContainerDataModel(isPartitioned=False, columns=columns), + creation_date=datetime.datetime(2000, 1, 1).isoformat(), + parent=entity_ref, + sourceUrl=SourceUrl( + f"https://console.cloud.google.com/storage/browser/_details/test_bucket/transactions;tab=live_object?project=my-gcp-project" + ), + fullPath="gs://test_bucket/transactions", + ), + self.object_store_source._generate_container_details( + GCSBucketResponse( + name="test_bucket", + project_id="my-gcp-project", + creation_date=datetime.datetime(2000, 1, 1), + ), + MetadataEntry( + dataPath="transactions", + structureFormat="csv", + isPartitioned=False, + ), + parent=entity_ref, + ), + ) + + # Most of the parsing support are covered in test_datalake unit tests related to the Data lake implementation + def test_extract_column_definitions(self): + with patch( + 
"metadata.ingestion.source.storage.storage_service.fetch_dataframe", + return_value=( + [ + pd.DataFrame.from_dict( + [ + {"transaction_id": 1, "transaction_value": 100}, + {"transaction_id": 2, "transaction_value": 200}, + {"transaction_id": 3, "transaction_value": 300}, + ] + ) + ], + None, + ), + ): + Column.__eq__ = custom_column_compare + self.assertListEqual( + [ + Column( + name=ColumnName("transaction_id"), + dataType=DataType.INT, + dataTypeDisplay="INT", + displayName="transaction_id", + ), + Column( + name=ColumnName("transaction_value"), + dataType=DataType.INT, + dataTypeDisplay="INT", + displayName="transaction_value", + ), + ], + self.object_store_source.extract_column_definitions( + bucket_name="test_bucket", + sample_key="test.json", + config_source=None, + client=None, + metadata_entry=self.return_metadata_entry()[0], + ), + ) + + def test_get_sample_file_prefix_for_structured_and_partitioned_metadata(self): + input_metadata = MetadataEntry( + dataPath="transactions", + structureFormat="parquet", + isPartitioned=True, + partitionColumns=[Column(name="date", dataType=DataType.DATE)], + ) + self.assertEqual( + "transactions/", + self.object_store_source._get_sample_file_prefix( + metadata_entry=input_metadata + ), + ) + + def test_get_sample_file_prefix_for_unstructured_metadata(self): + input_metadata = MetadataEntry(dataPath="transactions") + self.assertIsNone( + self.object_store_source._get_sample_file_prefix( + metadata_entry=input_metadata + ) + ) + + def test_get_sample_file_prefix_for_structured_and_not_partitioned_metadata(self): + input_metadata = MetadataEntry( + dataPath="transactions", + structureFormat="csv", + isPartitioned=False, + ) + self.assertEqual( + "transactions/", + self.object_store_source._get_sample_file_prefix( + metadata_entry=input_metadata + ), + ) + + def test_get_sample_file_path_with_invalid_prefix(self): + self.object_store_source._get_sample_file_prefix = ( + lambda metadata_entry: "/transactions" + ) + self.assertIsNone( + self.object_store_source._get_sample_file_path( + bucket=GCSBucketResponse( + name="test_bucket", + project_id="my-gcp-project", + creation_date=datetime.datetime(2000, 1, 1), + ), + metadata_entry=MetadataEntry( + dataPath="invalid_path", + structureFormat="csv", + isPartitioned=False, + ), + ) + ) + + def test_get_sample_file_path_randomly(self): + self.object_store_source._get_sample_file_prefix = ( + lambda metadata_entry: "/transactions" + ) + self.object_store_source.gcs_clients.storage_client.clients[ + "my-gcp-project" + ].list_blobs = lambda bucket, prefix, max_results: MOCK_OBJECT_FILE_PATHS + + candidate = self.object_store_source._get_sample_file_path( + bucket=GCSBucketResponse( + name="test_bucket", + project_id="my-gcp-project", + creation_date=datetime.datetime(2000, 1, 1), + ), + metadata_entry=MetadataEntry( + dataPath="/transactions", + structureFormat="csv", + isPartitioned=False, + ), + ) + self.assertTrue( + candidate + in [ + "transactions/transactions_1.csv", + "transactions/transactions_2.csv", + "transactions/", + ] + ) + + def return_metadata_entry(self): + container_config = StorageContainerConfig.model_validate( + MOCK_METADATA_FILE_RESPONSE + ) + return container_config.entries diff --git a/ingestion/tests/unit/topology/storage/test_storage.py b/ingestion/tests/unit/topology/storage/test_s3_storage.py similarity index 100% rename from ingestion/tests/unit/topology/storage/test_storage.py rename to ingestion/tests/unit/topology/storage/test_s3_storage.py diff --git 
diff --git a/openmetadata-service/src/main/resources/json/data/testConnections/storage/gcs.json b/openmetadata-service/src/main/resources/json/data/testConnections/storage/gcs.json
index ad07409324c..f9853b384e4 100644
--- a/openmetadata-service/src/main/resources/json/data/testConnections/storage/gcs.json
+++ b/openmetadata-service/src/main/resources/json/data/testConnections/storage/gcs.json
@@ -11,16 +11,16 @@
       "mandatory": true
     },
     {
-      "name": "ListBlobs",
-      "description": "List all the blobs available to the user.",
-      "errorMessage": "Failed to fetch blobs, please validate the credentials if the user has access to list blobs",
+      "name": "GetBucket",
+      "description": "Get the bucket available to the user.",
+      "errorMessage": "Failed to fetch bucket, please validate the credentials if the user has access to get bucket",
       "shortCircuit": true,
       "mandatory": true
     },
     {
-      "name": "GetBucket",
-      "description": "Get the bucket available to the user.",
-      "errorMessage": "Failed to fetch bucket, please validate the credentials if the user has access to get bucket",
+      "name": "ListBlobs",
+      "description": "List all the blobs available to the user.",
+      "errorMessage": "Failed to fetch blobs, please validate the credentials if the user has access to list blobs",
       "shortCircuit": true,
       "mandatory": true
     },
@@ -30,6 +30,12 @@
       "errorMessage": "Failed to fetch blobs, please validate the credentials if the user has access to get blob",
       "shortCircuit": true,
       "mandatory": true
+    },
+    {
+      "name": "GetMetrics",
+      "description": "Get Google Cloud bucket resource metrics.",
+      "errorMessage": "Failed to fetch Google Cloud bucket resource metrics, please validate if user has access to fetch these metrics",
+      "mandatory": false
     }
   ]
 }
\ No newline at end of file
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/storage/gcsConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/storage/gcsConnection.json
index 8482e9789e2..3915525e0fe 100644
--- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/storage/gcsConnection.json
+++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/storage/gcsConnection.json
@@ -25,6 +25,15 @@
       "description": "GCP Credentials",
       "$ref": "../../../../security/credentials/gcpCredentials.json"
     },
+    "bucketNames": {
+      "title": "Bucket Names",
+      "description": "Bucket Names of the data source.",
+      "type": "array",
+      "items": {
+        "type": "string"
+      },
+      "default": null
+    },
     "connectionOptions": {
       "title": "Connection Options",
       "$ref": "../connectionBasicType.json#/definitions/connectionOptions"
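With the schema addition above, a service connection can pin ingestion to an explicit bucket list instead of enumerating every bucket the credentials can see. A hedged config fragment (bucket names invented; credentials elided):

```python
GCS_SERVICE_CONNECTION = {
    "type": "GCS",
    # New optional field: when omitted or null, all visible buckets are listed.
    "bucketNames": ["raw-events", "curated-sales"],
    "credentials": {"gcpConfig": {...}},  # service-account values as in the tests
}
```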
diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/ServiceUtilClassBase.ts b/openmetadata-ui/src/main/resources/ui/src/utils/ServiceUtilClassBase.ts
index 8852e075801..b1e555493e5 100644
--- a/openmetadata-ui/src/main/resources/ui/src/utils/ServiceUtilClassBase.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/utils/ServiceUtilClassBase.ts
@@ -11,7 +11,7 @@
  * limitations under the License.
  */
-import { capitalize, cloneDeep, toLower } from 'lodash';
+import { capitalize, toLower } from 'lodash';
 import {
   AIRBYTE,
   AIRFLOW,
@@ -23,7 +23,6 @@ import {
   BIGQUERY,
   BIGTABLE,
   CLICKHOUSE,
-  COMMON_UI_SCHEMA,
   COUCHBASE,
   CUSTOM_STORAGE_DEFAULT,
   DAGSTER,
@@ -41,6 +40,7 @@ import {
   ELASTIC_SEARCH,
   FIVETRAN,
   FLINK,
+  GCS,
   GLUE,
   GREENPLUM,
   HIVE,
@@ -114,8 +114,6 @@ import { SearchServiceType } from '../generated/entity/data/searchIndex';
 import { MessagingServiceType } from '../generated/entity/data/topic';
 import { MetadataServiceType } from '../generated/entity/services/metadataService';
 import { SearchSourceAlias } from '../interface/search.interface';
-import customConnection from '../jsons/connectionSchemas/connections/storage/customStorageConnection.json';
-import s3Connection from '../jsons/connectionSchemas/connections/storage/s3Connection.json';
 import { getDashboardConfig } from './DashboardServiceUtils';
 import { getDatabaseConfig } from './DatabaseServiceUtils';
 import { getMessagingConfig } from './MessagingServiceUtils';
@@ -123,6 +121,7 @@ import { getMetadataConfig } from './MetadataServiceUtils';
 import { getMlmodelConfig } from './MlmodelServiceUtils';
 import { getPipelineConfig } from './PipelineServiceUtils';
 import { getSearchServiceConfig } from './SearchServiceUtils';
+import { getStorageConfig } from './StorageServiceUtils';
 import { customServiceComparator } from './StringsUtils';
 
 class ServiceUtilClassBase {
@@ -130,7 +129,6 @@ class ServiceUtilClassBase {
     StorageServiceType.Adls,
     DatabaseServiceType.QueryLog,
     DatabaseServiceType.Dbt,
-    StorageServiceType.Gcs,
     MetadataServiceType.Alation,
   ];
 
@@ -438,6 +436,9 @@ class ServiceUtilClassBase {
       case this.StorageServiceTypeSmallCase.S3:
         return AMAZON_S3;
 
+      case this.StorageServiceTypeSmallCase.Gcs:
+        return GCS;
+
       case this.SearchServiceTypeSmallCase.ElasticSearch:
         return ELASTIC_SEARCH;
 
@@ -594,25 +595,6 @@ class ServiceUtilClassBase {
     }
   };
 
-  public getStorageServiceConfig(type: StorageServiceType) {
-    let schema = {};
-    const uiSchema = { ...COMMON_UI_SCHEMA };
-    switch (type) {
-      case StorageServiceType.S3: {
-        schema = s3Connection;
-
-        break;
-      }
-      case StorageServiceType.CustomStorage: {
-        schema = customConnection;
-
-        break;
-      }
-    }
-
-    return cloneDeep({ schema, uiSchema });
-  }
-
   public getPipelineServiceConfig(type: PipelineServiceType) {
     return getPipelineConfig(type);
   }
@@ -637,6 +619,10 @@ class ServiceUtilClassBase {
     return getSearchServiceConfig(type);
   }
 
+  public getStorageServiceConfig(type: StorageServiceType) {
+    return getStorageConfig(type);
+  }
+
   public getMetadataServiceConfig(type: MetadataServiceType) {
     return getMetadataConfig(type);
   }
diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/StorageServiceUtils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/StorageServiceUtils.ts
new file mode 100644
index 00000000000..9ccb4e64c52
--- /dev/null
+++ b/openmetadata-ui/src/main/resources/ui/src/utils/StorageServiceUtils.ts
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2024 Collate.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { cloneDeep } from 'lodash';
+import { COMMON_UI_SCHEMA } from '../constants/Services.constant';
+import { StorageServiceType } from '../generated/entity/services/storageService';
+import customConnection from '../jsons/connectionSchemas/connections/storage/customStorageConnection.json';
+import gcsConnection from '../jsons/connectionSchemas/connections/storage/gcsConnection.json';
+import s3Connection from '../jsons/connectionSchemas/connections/storage/s3Connection.json';
+
+export const getStorageConfig = (type: StorageServiceType) => {
+  let schema = {};
+  const uiSchema = { ...COMMON_UI_SCHEMA };
+  switch (type) {
+    case StorageServiceType.S3: {
+      schema = s3Connection;
+
+      break;
+    }
+    case StorageServiceType.Gcs: {
+      schema = gcsConnection;
+
+      break;
+    }
+    case StorageServiceType.CustomStorage: {
+      schema = customConnection;
+
+      break;
+    }
+  }
+
+  return cloneDeep({ schema, uiSchema });
+};