diff --git a/ingestion/src/metadata/clients/aws_client.py b/ingestion/src/metadata/clients/aws_client.py index c8647f361e7..57152ef2705 100644 --- a/ingestion/src/metadata/clients/aws_client.py +++ b/ingestion/src/metadata/clients/aws_client.py @@ -26,6 +26,8 @@ logger = utils_logger() class AWSServices(Enum): + S3 = "s3" + CLOUDWATCH = "cloudwatch" DYNAMO_DB = "dynamodb" GLUE = "glue" SAGEMAKER = "sagemaker" @@ -157,6 +159,12 @@ class AWSClient: ) return session.resource(service_name=service_name) + def get_s3_client(self): + return self.get_client(AWSServices.S3.value) + + def get_cloudwatch_client(self): + return self.get_client(AWSServices.CLOUDWATCH.value) + def get_dynamo_client(self): return self.get_resource(AWSServices.DYNAMO_DB.value) diff --git a/ingestion/src/metadata/ingestion/source/objectstore/__init__.py b/ingestion/src/metadata/ingestion/source/objectstore/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/ingestion/source/objectstore/objectstore_service.py b/ingestion/src/metadata/ingestion/source/objectstore/objectstore_service.py new file mode 100644 index 00000000000..be3a36e10b3 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/objectstore/objectstore_service.py @@ -0,0 +1,156 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
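+# The ObjectStoreServiceTopology defined below drives ingestion: each TopologyNode
+# declares a "producer" method that yields raw items and a list of stages whose
+# "processor" methods turn those items into create requests. Each stage stores its
+# result under its "context" key, which is how the child "container" node consumes
+# the parent "objectstore_service" entry.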
+""" +Base class for ingesting Object Storage services +""" +from abc import ABC, abstractmethod +from typing import Any, Iterable + +from metadata.generated.schema.api.data.createContainer import CreateContainerRequest +from metadata.generated.schema.entity.data.container import Container +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.objectstoreService import ( + ObjectStoreConnection, + ObjectStoreService, +) +from metadata.generated.schema.metadataIngestion.objectstoreServiceMetadataPipeline import ( + ObjectStoreServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.api.topology_runner import TopologyRunnerMixin +from metadata.ingestion.models.topology import ( + NodeStage, + ServiceTopology, + TopologyNode, + create_source_context, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.connections import get_connection, get_test_connection_fn +from metadata.ingestion.source.objectstore.s3.connection import S3ObjectStoreClient +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class ObjectStoreServiceTopology(ServiceTopology): + + root = TopologyNode( + producer="get_services", + stages=[ + NodeStage( + type_=ObjectStoreService, + context="objectstore_service", + processor="yield_create_request_objectstore_service", + overwrite=False, + must_return=True, + ), + ], + children=["container"], + ) + + container = TopologyNode( + producer="get_containers", + stages=[ + NodeStage( + type_=Container, + context="containers", + processor="yield_create_container_requests", + consumer=["objectstore_service"], + nullable=True, + ) + ], + ) + + +class ObjectStoreSourceStatus(SourceStatus): + """ + Reports the source status after ingestion + """ + + def scanned(self, record: str) -> None: + self.success.append(record) + logger.debug(f"Scanned: {record}") + + def filter(self, key: str, reason: str) -> None: + self.filtered.append(key) + logger.debug(f"Filtered {key}: {reason}") + + +class ObjectStoreServiceSource(TopologyRunnerMixin, Source, ABC): + """ + Base class for Object Store Services. + It implements the topology and context. 
+ """ + + status: ObjectStoreSourceStatus + source_config: ObjectStoreServiceMetadataPipeline + config: WorkflowSource + metadata: OpenMetadata + # Big union of types we want to fetch dynamically + service_connection: ObjectStoreConnection.__fields__["config"].type_ + + topology = ObjectStoreServiceTopology() + context = create_source_context(topology) + + def __init__( + self, + config: WorkflowSource, + metadata_config: OpenMetadataConnection, + ): + super().__init__() + self.config = config + self.metadata_config = metadata_config + self.metadata = OpenMetadata(metadata_config) + self.service_connection = self.config.serviceConnection.__root__.config + self.source_config: ObjectStoreServiceMetadataPipeline = ( + self.config.sourceConfig.config + ) + self.connection: S3ObjectStoreClient = get_connection(self.service_connection) + self.test_connection() + self.status = ObjectStoreSourceStatus() + + @abstractmethod + def get_containers(self) -> Iterable[Any]: + """ + Retrieve all containers for the service + """ + + @abstractmethod + def yield_create_container_requests( + self, container_details: Any + ) -> Iterable[CreateContainerRequest]: + """Generate the create container requests based on the received details""" + + def get_status(self) -> SourceStatus: + return self.status + + def close(self): + pass + + def get_services(self) -> Iterable[WorkflowSource]: + yield self.config + + def prepare(self): + pass + + def test_connection(self) -> None: + test_connection_fn = get_test_connection_fn(self.service_connection) + test_connection_fn(self.connection) + + def yield_create_request_objectstore_service(self, config: WorkflowSource): + yield self.metadata.get_create_service_from_source( + entity=ObjectStoreService, config=config + ) diff --git a/ingestion/src/metadata/ingestion/source/objectstore/s3/__init__.py b/ingestion/src/metadata/ingestion/source/objectstore/s3/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/ingestion/source/objectstore/s3/connection.py b/ingestion/src/metadata/ingestion/source/objectstore/s3/connection.py new file mode 100644 index 00000000000..8764aa5e37b --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/objectstore/s3/connection.py @@ -0,0 +1,70 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Source connection handler for S3 object store. For this to work, it requires the following S3 permissions for all +the buckets which require ingestion: s3:ListBucket, s3:GetObject and s3:GetBucketLocation +The cloudwatch client is used to fetch the total size in bytes for a bucket, and the total nr of files. 
This requires +the cloudwatch:GetMetricData permissions + +""" +from dataclasses import dataclass +from functools import partial + +from botocore.client import BaseClient + +from metadata.clients.aws_client import AWSClient +from metadata.generated.schema.entity.services.connections.objectstore.s3ObjectStoreConnection import ( + S3StoreConnection, +) +from metadata.ingestion.connections.test_connections import ( + TestConnectionResult, + TestConnectionStep, + test_connection_steps, +) + + +@dataclass +class S3ObjectStoreClient: + s3_client: BaseClient + cloudwatch_client: BaseClient + + +def get_connection(connection: S3StoreConnection) -> S3ObjectStoreClient: + """ + Returns 2 clients - the s3 client and the cloudwatch client needed for total nr of objects and total size + """ + aws_client = AWSClient(connection.awsConfig) + return S3ObjectStoreClient( + s3_client=aws_client.get_client(service_name="s3"), + cloudwatch_client=aws_client.get_client(service_name="cloudwatch"), + ) + + +def test_connection(client: S3ObjectStoreClient) -> TestConnectionResult: + """ + Test connection + """ + steps = [ + TestConnectionStep( + function=client.s3_client.list_buckets, + name="List buckets", + ), + TestConnectionStep( + function=partial( + client.cloudwatch_client.list_metrics, + Namespace="AWS/S3", + ), + name="Get Cloudwatch AWS/S3 metrics", + mandatory=False, + ), + ] + + return test_connection_steps(steps) diff --git a/ingestion/src/metadata/ingestion/source/objectstore/s3/metadata.py b/ingestion/src/metadata/ingestion/source/objectstore/s3/metadata.py new file mode 100644 index 00000000000..d50eb3073f5 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/objectstore/s3/metadata.py @@ -0,0 +1,402 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
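+# A bucket may optionally carry an "openmetadata.json" template at its root; when
+# present, each entry in it is turned into a structured container with an inferred
+# schema. An illustrative template (field names follow the MetadataEntry model used
+# below; the paths, formats and partition column are examples only):
+#
+# {
+#     "entries": [
+#         {"dataPath": "transactions", "structureFormat": "csv", "isPartitioned": false},
+#         {"dataPath": "sales", "structureFormat": "parquet", "isPartitioned": true,
+#          "partitionColumn": "date"}
+#     ]
+# }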
+"""S3 object store extraction metadata""" +import json +import secrets +import traceback +from datetime import datetime, timedelta +from enum import Enum +from typing import Iterable, List, Optional + +from pandas import DataFrame +from pydantic import Extra, Field, ValidationError +from pydantic.main import BaseModel + +from metadata.generated.schema.api.data.createContainer import CreateContainerRequest +from metadata.generated.schema.entity.data import container +from metadata.generated.schema.entity.data.container import ContainerDataModel +from metadata.generated.schema.entity.data.table import Column +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.connections.objectstore.s3ObjectStoreConnection import ( + S3StoreConnection, +) +from metadata.generated.schema.metadataIngestion.objectstore.containerMetadataConfig import ( + MetadataEntry, + ObjectStoreContainerConfig, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.source.database.datalake.metadata import DatalakeSource +from metadata.ingestion.source.database.datalake.models import DatalakeColumnWrapper +from metadata.ingestion.source.objectstore.objectstore_service import ( + ObjectStoreServiceSource, +) +from metadata.utils.filters import filter_by_container +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + +S3_CLIENT_ROOT_RESPONSE = "Contents" +OPENMETADATA_TEMPLATE_FILE_NAME = "openmetadata.json" +S3_KEY_SEPARATOR = "/" + + +class S3Metric(Enum): + NUMBER_OF_OBJECTS = "NumberOfObjects" + BUCKET_SIZE_BYTES = "BucketSizeBytes" + + +class S3BucketResponse(BaseModel): + """ + Class modelling a response received from s3_client.list_buckets operation + """ + + class Config: + extra = Extra.forbid + + name: str = Field(..., description="Bucket name", title="Bucket Name", alias="Name") + creation_date: datetime = Field( + ..., + description="Timestamp of Bucket creation in ISO format", + title="Creation Timestamp", + alias="CreationDate", + ) + + +class S3ContainerDetails(BaseModel): + """Class mapping container details used to create the container requests""" + + class Config: + extra = Extra.forbid + + name: str = Field(..., description="Bucket name", title="Bucket Name") + prefix: str = Field(..., description="Prefix for the container", title="Prefix") + number_of_objects: float = Field( + ..., description="Total nr. of objects", title="Nr. of objects" + ) + size: float = Field( + ..., + description="Total size in bytes of all objects", + title="Total size(bytes) of objects", + ) + file_formats: Optional[List[container.FileFormat]] = Field( + ..., description="File formats", title="File formats" + ) + data_model: Optional[ContainerDataModel] = Field( + ..., description="Data Model of the container", title="Data Model" + ) + creation_date: str = Field( + ..., + description="Timestamp of Bucket creation in ISO format", + title="Creation Timestamp", + ) + + +class S3Source(ObjectStoreServiceSource): + """ + Source implementation to ingest S3 buckets data. 
+ """ + + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): + super().__init__(config, metadata_config) + self.s3_client = self.connection.s3_client + self.cloudwatch_client = self.connection.cloudwatch_client + + @classmethod + def create(cls, config_dict, metadata_config: OpenMetadataConnection): + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: S3StoreConnection = config.serviceConnection.__root__.config + if not isinstance(connection, S3StoreConnection): + raise InvalidSourceException( + f"Expected S3StoreConnection, but got {connection}" + ) + return cls(config, metadata_config) + + def get_containers(self) -> Iterable[S3ContainerDetails]: + bucket_results = self.fetch_buckets() + try: + for bucket_response in bucket_results: + metadata_config = self._load_metadata_file( + bucket_name=bucket_response.name + ) + if metadata_config: + for metadata_entry in metadata_config.entries: + logger.info( + f"Extracting metadata from path {metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)} " + f"and generating structured container" + ) + structured_container: Optional[ + S3ContainerDetails + ] = self._generate_container_details( + bucket_response=bucket_response, + metadata_entry=metadata_entry, + ) + if structured_container: + yield structured_container + else: + logger.info( + f"No metadata found for bucket {bucket_response.name}, generating unstructured container.." + ) + yield self._generate_unstructured_container( + bucket_response=bucket_response + ) + except ValidationError as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Validation error while creating Container from bucket details - {err}" + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Wild error while creating Container from bucket details - {err}" + ) + + def yield_create_container_requests( + self, container_details: S3ContainerDetails + ) -> Iterable[CreateContainerRequest]: + + yield CreateContainerRequest( + name=container_details.name, + prefix=container_details.prefix, + numberOfObjects=container_details.number_of_objects, + size=container_details.size, + dataModel=container_details.data_model, + service=self.context.objectstore_service.fullyQualifiedName, + ) + + def _generate_container_details( + self, bucket_response: S3BucketResponse, metadata_entry: MetadataEntry + ) -> Optional[S3ContainerDetails]: + bucket_name = bucket_response.name + sample_key = self._get_sample_file_path( + bucket_name=bucket_name, metadata_entry=metadata_entry + ) + # if we have a sample file to fetch a schema from + if sample_key: + columns = self.extract_column_definitions(bucket_name, sample_key) + if columns: + return S3ContainerDetails( + name=f"{bucket_name}.{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}", + prefix=f"{S3_KEY_SEPARATOR}{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}", + creation_date=bucket_response.creation_date.isoformat(), + number_of_objects=self._fetch_metric( + bucket_name=bucket_name, metric=S3Metric.NUMBER_OF_OBJECTS + ), + size=self._fetch_metric( + bucket_name=bucket_name, metric=S3Metric.BUCKET_SIZE_BYTES + ), + file_formats=[container.FileFormat(metadata_entry.structureFormat)], + data_model=ContainerDataModel( + isPartitioned=metadata_entry.isPartitioned, columns=columns + ), + ) + return None + + def extract_column_definitions( + self, bucket_name: str, sample_key: str + ) -> List[Column]: + client_args = self.service_connection.awsConfig + data_structure_details = 
DatalakeSource.get_s3_files( + self.s3_client, + key=sample_key, + bucket_name=bucket_name, + client_kwargs=client_args, + ) + columns = [] + if isinstance(data_structure_details, DataFrame): + columns = DatalakeSource.get_columns(data_structure_details) + if isinstance(data_structure_details, list) and data_structure_details: + columns = DatalakeSource.get_columns(data_structure_details[0]) + if isinstance(data_structure_details, DatalakeColumnWrapper): + columns = data_structure_details.columns # pylint: disable=no-member + return columns + + def fetch_buckets(self) -> List[S3BucketResponse]: + results: List[S3BucketResponse] = [] + try: + # No pagination required, as there is a hard 1000 limit on nr of buckets per aws account + for bucket in self.s3_client.list_buckets().get("Buckets") or []: + if filter_by_container( + self.source_config.containerFilterPattern, + container_name=bucket["Name"], + ): + self.status.filter(bucket["Name"], "Bucket Filtered Out") + else: + results.append(S3BucketResponse.parse_obj(bucket)) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(f"Failed to fetch buckets list - {err}") + return results + + def _fetch_metric(self, bucket_name: str, metric: S3Metric) -> float: + try: + raw_result = self.cloudwatch_client.get_metric_data( + MetricDataQueries=[ + { + "Id": "total_nr_of_object_request", + "MetricStat": { + "Metric": { + "Namespace": "AWS/S3", + "MetricName": metric.value, + "Dimensions": [ + {"Name": "BucketName", "Value": bucket_name}, + { + "Name": "StorageType", + # StandardStorage-only support for BucketSizeBytes for now + "Value": "StandardStorage" + if metric == S3Metric.BUCKET_SIZE_BYTES + else "AllStorageTypes", + }, + ], + }, + "Period": 60, + "Stat": "Average", + "Unit": "Bytes" + if metric == S3Metric.BUCKET_SIZE_BYTES + else "Count", + }, + }, + ], + StartTime=datetime.now() - timedelta(days=2), + # metrics generated daily, ensure there is at least 1 entry + EndTime=datetime.now(), + ScanBy="TimestampDescending", + ) + if raw_result["MetricDataResults"]: + first_metric = raw_result["MetricDataResults"][0] + if first_metric["StatusCode"] == "Complete" and first_metric["Values"]: + return int(first_metric["Values"][0]) + except Exception: + logger.debug(traceback.format_exc()) + logger.warning( + f"Failed fetching metric {metric.value} for bucket {bucket_name}, returning 0" + ) + return 0 + + def _load_metadata_file( + self, bucket_name: str + ) -> Optional[ObjectStoreContainerConfig]: + """ + Load the metadata template file from the root of the bucket, if it exists + """ + if self._is_metadata_file_present(bucket_name=bucket_name): + try: + logger.info( + f"Found metadata template file at - s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}" + ) + response_object = self.s3_client.get_object( + Bucket=bucket_name, Key=OPENMETADATA_TEMPLATE_FILE_NAME + ) + content = json.load(response_object["Body"]) + metadata_config = ObjectStoreContainerConfig.parse_obj(content) + return metadata_config + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Failed loading metadata file s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}-{exc}" + ) + return None + + def _is_metadata_file_present(self, bucket_name: str): + return self.prefix_exits( + bucket_name=bucket_name, + prefix=OPENMETADATA_TEMPLATE_FILE_NAME, + ) + + def _generate_unstructured_container( + self, bucket_response: S3BucketResponse + ) -> S3ContainerDetails: + return S3ContainerDetails( + name=bucket_response.name, + 
prefix=S3_KEY_SEPARATOR, + creation_date=bucket_response.creation_date.isoformat(), + number_of_objects=self._fetch_metric( + bucket_name=bucket_response.name, metric=S3Metric.NUMBER_OF_OBJECTS + ), + size=self._fetch_metric( + bucket_name=bucket_response.name, metric=S3Metric.BUCKET_SIZE_BYTES + ), + file_formats=[], # TODO should we fetch some random files by extension here? Would it be valuable info? + data_model=None, + ) + + @staticmethod + def _get_sample_file_prefix(metadata_entry: MetadataEntry) -> Optional[str]: + result = f"{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}" + if not metadata_entry.structureFormat: + logger.warning(f"Ignoring un-structured metadata entry {result}") + return None + if metadata_entry.isPartitioned and metadata_entry.partitionColumn: + result = ( + f"{result}/{metadata_entry.partitionColumn.strip(S3_KEY_SEPARATOR)}" + ) + return result + + def _get_sample_file_path( + self, bucket_name: str, metadata_entry: MetadataEntry + ) -> Optional[str]: + """ + Given a bucket and a metadata entry, returns the full path key to a file which can then be used to infer schema + or None in the case of a non-structured metadata entry, or if no such keys can be found + """ + prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry) + # no objects found in the data path + if not self.prefix_exits(bucket_name=bucket_name, prefix=prefix): + logger.warning(f"Ignoring metadata entry {prefix} - no files found") + return None + # this will look only in the first 1000 files under that path (default for list_objects_v2). + # We'd rather not do pagination here as it would incur unwanted costs + try: + response = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) + candidate_keys = [ + entry["Key"] + for entry in response[S3_CLIENT_ROOT_RESPONSE] + if entry + and entry.get("Key") + and entry["Key"].endswith(metadata_entry.structureFormat) + ] + # pick a random key out of the candidates if any were returned + if candidate_keys: + result_key = secrets.choice(candidate_keys) + logger.info( + f"File {result_key} was picked to infer data structure from." 
+ ) + return result_key + logger.warning( + f"No sample files found in {prefix} with {metadata_entry.structureFormat} extension" + ) + return None + except Exception: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error when trying to list objects in S3 bucket {bucket_name} at prefix {prefix}" + ) + return None + + def prefix_exits(self, bucket_name: str, prefix: str) -> bool: + """ + Checks if a given prefix exists in a bucket + """ + try: + res = self.s3_client.list_objects_v2( + Bucket=bucket_name, Prefix=prefix, MaxKeys=1 + ) + return S3_CLIENT_ROOT_RESPONSE in res + except Exception: + logger.debug(traceback.format_exc()) + logger.warning( + f"Failed when trying to check if S3 prefix {prefix} exists in bucket {bucket_name}" + ) + return False diff --git a/ingestion/src/metadata/utils/filters.py b/ingestion/src/metadata/utils/filters.py index 7c21cdc6c35..18cd80525df 100644 --- a/ingestion/src/metadata/utils/filters.py +++ b/ingestion/src/metadata/utils/filters.py @@ -208,3 +208,18 @@ def filter_by_mlmodel( :return: True for filtering, False otherwise """ return _filter(mlmodel_filter_pattern, mlmodel_name) + + +def filter_by_container( + container_filter_pattern: Optional[FilterPattern], container_name: str +) -> bool: + """ + Return True if the container needs to be filtered, False otherwise + + Include takes precedence over exclude + + :param container_filter_pattern: Container defining the container filtering logic + :param container_name: container name + :return: True for filtering, False otherwise + """ + return _filter(container_filter_pattern, container_name) diff --git a/ingestion/tests/unit/topology/objectstore/__init__.py b/ingestion/tests/unit/topology/objectstore/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/tests/unit/topology/objectstore/test_objectstore.py b/ingestion/tests/unit/topology/objectstore/test_objectstore.py new file mode 100644 index 00000000000..45bfdbb32f1 --- /dev/null +++ b/ingestion/tests/unit/topology/objectstore/test_objectstore.py @@ -0,0 +1,350 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
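+# These tests never touch AWS: the boto3 s3/cloudwatch calls used by the source
+# (list_buckets, get_object, list_objects_v2) are stubbed with lambdas or canned
+# responses, and test_connection is patched out before the source is created.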
+""" +Unit tests for Object store source +""" +import datetime +import io +import json +from typing import List +from unittest import TestCase +from unittest.mock import patch + +import pandas as pd +from botocore.response import StreamingBody + +from metadata.generated.schema.entity.data.container import ( + ContainerDataModel, + FileFormat, +) +from metadata.generated.schema.entity.data.table import Column, ColumnName, DataType +from metadata.generated.schema.metadataIngestion.objectstore.containerMetadataConfig import ( + MetadataEntry, + ObjectStoreContainerConfig, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.source.database.datalake.metadata import DatalakeSource +from metadata.ingestion.source.objectstore.s3.metadata import ( + S3BucketResponse, + S3ContainerDetails, + S3Source, +) + +MOCK_OBJECT_STORE_CONFIG = { + "source": { + "type": "s3", + "serviceName": "s3_test", + "serviceConnection": { + "config": {"type": "S3", "awsConfig": {"awsRegion": "us-east-1"}} + }, + "sourceConfig": { + "config": { + "type": "ObjectStoreMetadata", + "containerFilterPattern": {"includes": ["^test_*"]}, + } + }, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": {"jwtToken": "token"}, + } + }, +} +MOCK_S3_BUCKETS_RESPONSE = { + "Buckets": [ + {"Name": "test_transactions", "CreationDate": datetime.datetime(2000, 1, 1)}, + {"Name": "test_sales", "CreationDate": datetime.datetime(2000, 2, 2)}, + {"Name": "events", "CreationDate": datetime.datetime(2000, 3, 3)}, + ] +} +MOCK_S3_METADATA_FILE_RESPONSE = { + "entries": [ + { + "dataPath": "transactions", + "structureFormat": "csv", + "isPartitioned": False, + } + ] +} +EXPECTED_S3_BUCKETS: List[S3BucketResponse] = [ + S3BucketResponse( + Name="test_transactions", CreationDate=datetime.datetime(2000, 1, 1) + ), + S3BucketResponse(Name="test_sales", CreationDate=datetime.datetime(2000, 2, 2)), +] +MOCK_S3_OBJECT_FILE_PATHS = { + "Contents": [ + {"Key": "transactions/", "Size": 0}, + {"Key": "transactions/transactions_1.csv", "Size": 69}, + {"Key": "transactions/transactions_2.csv", "Size": 55}, + ] +} + + +class ObjectStoreUnitTest(TestCase): + """ + Validate how we work with object store metadata + """ + + @patch( + "metadata.ingestion.source.objectstore.objectstore_service.ObjectStoreServiceSource.test_connection" + ) + def __init__(self, method_name: str, test_connection) -> None: + super().__init__(method_name) + test_connection.return_value = False + self.config = OpenMetadataWorkflowConfig.parse_obj(MOCK_OBJECT_STORE_CONFIG) + + # This already validates that the source can be initialized + self.object_store_source = S3Source.create( + MOCK_OBJECT_STORE_CONFIG["source"], + self.config.workflowConfig.openMetadataServerConfig, + ) + + def test_create_from_invalid_source(self): + """ + An invalid config raises an error + """ + not_object_store_source = { + "type": "s3", + "serviceName": "mysql_local", + "serviceConnection": { + "config": { + "type": "Mysql", + "username": "openmetadata_user", + "password": "openmetadata_password", + "hostPort": "localhost:3306", + "databaseSchema": "openmetadata_db", + } + }, + "sourceConfig": { + "config": { + "type": "ObjectStoreMetadata", + } + }, + } + self.assertRaises( + InvalidSourceException, + S3Source.create, + 
not_object_store_source, + self.config.workflowConfig.openMetadataServerConfig, + ) + + def test_s3_buckets_fetching(self): + self.object_store_source.s3_client.list_buckets = ( + lambda: MOCK_S3_BUCKETS_RESPONSE + ) + self.assertListEqual( + self.object_store_source.fetch_buckets(), EXPECTED_S3_BUCKETS + ) + + def test_load_metadata_file(self): + self.object_store_source._is_metadata_file_present = lambda bucket_name: True + self.object_store_source.s3_client.get_object = ( + lambda Bucket, Key: self._compute_mocked_metadata_file_response() + ) + container_config: ObjectStoreContainerConfig = ( + self.object_store_source._load_metadata_file(bucket_name="test") + ) + + self.assertEqual(1, len(container_config.entries)) + self.assertEqual( + MetadataEntry( + dataPath="transactions", + structureFormat="csv", + isPartitioned=False, + ), + container_config.entries[0], + ) + + def test_no_metadata_file_returned_when_file_not_present(self): + self.object_store_source._is_metadata_file_present = lambda bucket_name: False + self.assertIsNone( + self.object_store_source._load_metadata_file(bucket_name="test") + ) + + def test_generate_unstructured_container(self): + bucket_response = S3BucketResponse( + Name="test_bucket", CreationDate=datetime.datetime(2000, 1, 1) + ) + self.object_store_source._fetch_metric = lambda bucket_name, metric: 100.0 + self.assertEqual( + S3ContainerDetails( + name=bucket_response.name, + prefix="/", + number_of_objects=100, + size=100, + file_formats=[], + data_model=None, + creation_date=bucket_response.creation_date.isoformat(), + ), + self.object_store_source._generate_unstructured_container( + bucket_response=bucket_response + ), + ) + + def test_generate_structured_container(self): + self.object_store_source._get_sample_file_path = ( + lambda bucket_name, metadata_entry: "transactions/file_1.csv" + ) + self.object_store_source._fetch_metric = lambda bucket_name, metric: 100.0 + columns: List[Column] = [ + Column( + name=ColumnName(__root__="transaction_id"), + dataType=DataType.INT, + dataTypeDisplay="INT", + dataLength=1, + ), + Column( + name=ColumnName(__root__="transaction_value"), + dataType=DataType.INT, + dataTypeDisplay="INT", + dataLength=1, + ), + ] + self.object_store_source.extract_column_definitions = ( + lambda bucket_name, sample_key: columns + ) + self.assertEquals( + S3ContainerDetails( + name="test_bucket.transactions", + prefix="/transactions", + number_of_objects=100, + size=100, + file_formats=[FileFormat.csv], + data_model=ContainerDataModel(isPartitioned=False, columns=columns), + creation_date=datetime.datetime(2000, 1, 1).isoformat(), + ), + self.object_store_source._generate_container_details( + S3BucketResponse( + Name="test_bucket", CreationDate=datetime.datetime(2000, 1, 1) + ), + MetadataEntry( + dataPath="transactions", + structureFormat="csv", + isPartitioned=False, + ), + ), + ) + + # Most of the parsing support are covered in test_datalake unit tests related to the Data lake implementation + def test_extract_column_definitions(self): + DatalakeSource.get_s3_files = lambda client, key, bucket_name, client_kwargs: [ + pd.DataFrame.from_dict( + [ + {"transaction_id": 1, "transaction_value": 100}, + {"transaction_id": 2, "transaction_value": 200}, + {"transaction_id": 3, "transaction_value": 300}, + ] + ) + ] + self.assertListEqual( + [ + Column( + name=ColumnName(__root__="transaction_id"), + dataType=DataType.INT, + dataTypeDisplay="INT", + dataLength=1, + ), + Column( + name=ColumnName(__root__="transaction_value"), + 
dataType=DataType.INT, + dataTypeDisplay="INT", + dataLength=1, + ), + ], + self.object_store_source.extract_column_definitions( + bucket_name="test_bucket", sample_key="test.json" + ), + ) + + def test_get_sample_file_prefix_for_structured_and_partitioned_metadata(self): + input_metadata = MetadataEntry( + dataPath="transactions", + structureFormat="parquet", + isPartitioned=True, + partitionColumn="date", + ) + self.assertEquals( + "transactions/date", + self.object_store_source._get_sample_file_prefix( + metadata_entry=input_metadata + ), + ) + + def test_get_sample_file_prefix_for_unstructured_metadata(self): + input_metadata = MetadataEntry(dataPath="transactions") + self.assertIsNone( + self.object_store_source._get_sample_file_prefix( + metadata_entry=input_metadata + ) + ) + + def test_get_sample_file_prefix_for_structured_and_not_partitioned_metadata(self): + input_metadata = MetadataEntry( + dataPath="transactions", + structureFormat="csv", + isPartitioned=False, + ) + self.assertEquals( + "transactions", + self.object_store_source._get_sample_file_prefix( + metadata_entry=input_metadata + ), + ) + + def test_get_sample_file_path_with_invalid_prefix(self): + self.object_store_source._get_sample_file_prefix = ( + lambda metadata_entry: "/transactions" + ) + self.assertIsNone( + self.object_store_source._get_sample_file_path( + bucket_name="test_bucket", + metadata_entry=MetadataEntry( + dataPath="invalid_path", + structureFormat="csv", + isPartitioned=False, + ), + ) + ) + + def test_get_sample_file_path_randomly(self): + self.object_store_source._get_sample_file_prefix = ( + lambda metadata_entry: "/transactions" + ) + self.object_store_source.prefix_exits = lambda bucket_name, prefix: True + self.object_store_source.s3_client.list_objects_v2 = ( + lambda Bucket, Prefix: MOCK_S3_OBJECT_FILE_PATHS + ) + + candidate = self.object_store_source._get_sample_file_path( + bucket_name="test_bucket", + metadata_entry=MetadataEntry( + dataPath="/transactions", + structureFormat="csv", + isPartitioned=False, + ), + ) + self.assertTrue( + candidate + in ["transactions/transactions_1.csv", "transactions/transactions_2.csv"] + ) + + @staticmethod + def _compute_mocked_metadata_file_response(): + body_encoded = json.dumps(MOCK_S3_METADATA_FILE_RESPONSE).encode() + body = StreamingBody(io.BytesIO(body_encoded), len(body_encoded)) + return {"Body": body} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/objectstores/ContainerResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/objectstores/ContainerResourceTest.java index 794cf30ff27..53ffe6ee585 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/objectstores/ContainerResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/objectstores/ContainerResourceTest.java @@ -178,9 +178,9 @@ public class ContainerResourceTest extends EntityResourceTest