Fixes #9064 - Ingestion for S3 bucket containers (#10515)

* Prep skeleton for object store connector

* First pass of the ingestion pipeline for bucket containers.

* addressed PR review comments.

* swapped random with secrets due to security warning.

* linter issues.

* added prefix population for containers.

* more linting

* removed temp file.

* object store unit tests, PR comments and minor refactorings

* docs update and new TestConnection approach.

* linting

* removed isStructured field from container metadata schema

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
Cristian Calugaru 2023-03-26 10:35:34 +01:00 committed by GitHub
parent 0f3e929b75
commit 3ffde9a293
15 changed files with 1098 additions and 14 deletions

View File

@ -26,6 +26,8 @@ logger = utils_logger()
class AWSServices(Enum):
S3 = "s3"
CLOUDWATCH = "cloudwatch"
DYNAMO_DB = "dynamodb"
GLUE = "glue"
SAGEMAKER = "sagemaker"
@ -157,6 +159,12 @@ class AWSClient:
)
return session.resource(service_name=service_name)
def get_s3_client(self):
return self.get_client(AWSServices.S3.value)
def get_cloudwatch_client(self):
return self.get_client(AWSServices.CLOUDWATCH.value)
def get_dynamo_client(self):
return self.get_resource(AWSServices.DYNAMO_DB.value)

View File

@ -0,0 +1,156 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base class for ingesting Object Storage services
"""
from abc import ABC, abstractmethod
from typing import Any, Iterable
from metadata.generated.schema.api.data.createContainer import CreateContainerRequest
from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.objectstoreService import (
ObjectStoreConnection,
ObjectStoreService,
)
from metadata.generated.schema.metadataIngestion.objectstoreServiceMetadataPipeline import (
ObjectStoreServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.ingestion.source.objectstore.s3.connection import S3ObjectStoreClient
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class ObjectStoreServiceTopology(ServiceTopology):
root = TopologyNode(
producer="get_services",
stages=[
NodeStage(
type_=ObjectStoreService,
context="objectstore_service",
processor="yield_create_request_objectstore_service",
overwrite=False,
must_return=True,
),
],
children=["container"],
)
container = TopologyNode(
producer="get_containers",
stages=[
NodeStage(
type_=Container,
context="containers",
processor="yield_create_container_requests",
consumer=["objectstore_service"],
nullable=True,
)
],
)
class ObjectStoreSourceStatus(SourceStatus):
"""
Reports the source status after ingestion
"""
def scanned(self, record: str) -> None:
self.success.append(record)
logger.debug(f"Scanned: {record}")
def filter(self, key: str, reason: str) -> None:
self.filtered.append(key)
logger.debug(f"Filtered {key}: {reason}")
class ObjectStoreServiceSource(TopologyRunnerMixin, Source, ABC):
"""
Base class for Object Store Services.
It implements the topology and context.
"""
status: ObjectStoreSourceStatus
source_config: ObjectStoreServiceMetadataPipeline
config: WorkflowSource
metadata: OpenMetadata
# Big union of types we want to fetch dynamically
service_connection: ObjectStoreConnection.__fields__["config"].type_
topology = ObjectStoreServiceTopology()
context = create_source_context(topology)
def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.service_connection = self.config.serviceConnection.__root__.config
self.source_config: ObjectStoreServiceMetadataPipeline = (
self.config.sourceConfig.config
)
self.connection: S3ObjectStoreClient = get_connection(self.service_connection)
self.test_connection()
self.status = ObjectStoreSourceStatus()
@abstractmethod
def get_containers(self) -> Iterable[Any]:
"""
Retrieve all containers for the service
"""
@abstractmethod
def yield_create_container_requests(
self, container_details: Any
) -> Iterable[CreateContainerRequest]:
"""Generate the create container requests based on the received details"""
def get_status(self) -> SourceStatus:
return self.status
def close(self):
pass
def get_services(self) -> Iterable[WorkflowSource]:
yield self.config
def prepare(self):
pass
def test_connection(self) -> None:
test_connection_fn = get_test_connection_fn(self.service_connection)
test_connection_fn(self.connection)
def yield_create_request_objectstore_service(self, config: WorkflowSource):
yield self.metadata.get_create_service_from_source(
entity=ObjectStoreService, config=config
)
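The two abstract hooks above are all a concrete connector has to supply; the topology runner drives get_services -> get_containers -> yield_create_container_requests. A minimal sketch of what a subclass could look like (the DummyObjectStoreSource name and its hard-coded container details are hypothetical, and the create factory plus connection wiring are omitted):

# Hypothetical sketch of a connector built on the base class above.
from typing import Any, Iterable

from metadata.generated.schema.api.data.createContainer import CreateContainerRequest
from metadata.ingestion.source.objectstore.objectstore_service import (
    ObjectStoreServiceSource,
)


class DummyObjectStoreSource(ObjectStoreServiceSource):
    def get_containers(self) -> Iterable[Any]:
        # One raw description per container found in the store
        yield {"name": "my-bucket", "prefix": "/"}

    def yield_create_container_requests(
        self, container_details: Any
    ) -> Iterable[CreateContainerRequest]:
        # Map the raw details onto the OpenMetadata create request
        yield CreateContainerRequest(
            name=container_details["name"],
            prefix=container_details["prefix"],
            service=self.context.objectstore_service.fullyQualifiedName,
        )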

View File

@ -0,0 +1,70 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Source connection handler for the S3 object store. For this to work, the following S3 permissions are required for all
the buckets to be ingested: s3:ListBucket, s3:GetObject and s3:GetBucketLocation.
The CloudWatch client is used to fetch the total size in bytes and the total number of files for a bucket. This requires
the cloudwatch:GetMetricData permission.
"""
from dataclasses import dataclass
from functools import partial
from botocore.client import BaseClient
from metadata.clients.aws_client import AWSClient
from metadata.generated.schema.entity.services.connections.objectstore.s3ObjectStoreConnection import (
S3StoreConnection,
)
from metadata.ingestion.connections.test_connections import (
TestConnectionResult,
TestConnectionStep,
test_connection_steps,
)
@dataclass
class S3ObjectStoreClient:
s3_client: BaseClient
cloudwatch_client: BaseClient
def get_connection(connection: S3StoreConnection) -> S3ObjectStoreClient:
"""
Returns two clients - the S3 client and the CloudWatch client, needed for the total number of objects and total size
"""
aws_client = AWSClient(connection.awsConfig)
return S3ObjectStoreClient(
s3_client=aws_client.get_client(service_name="s3"),
cloudwatch_client=aws_client.get_client(service_name="cloudwatch"),
)
def test_connection(client: S3ObjectStoreClient) -> TestConnectionResult:
"""
Test connection
"""
steps = [
TestConnectionStep(
function=client.s3_client.list_buckets,
name="List buckets",
),
TestConnectionStep(
function=partial(
client.cloudwatch_client.list_metrics,
Namespace="AWS/S3",
),
name="Get Cloudwatch AWS/S3 metrics",
mandatory=False,
),
]
return test_connection_steps(steps)
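As a quick manual sanity check, the two helpers above can be wired together directly. A small sketch, assuming the awsRegion placeholder below and boto3's default credential chain:

# Hedged sketch: exercising get_connection / test_connection (defined above) by hand.
from metadata.generated.schema.entity.services.connections.objectstore.s3ObjectStoreConnection import (
    S3StoreConnection,
)

connection = S3StoreConnection.parse_obj(
    {"type": "S3", "awsConfig": {"awsRegion": "us-east-1"}}  # placeholder region
)
client = get_connection(connection)  # S3ObjectStoreClient with s3 + cloudwatch clients
result = test_connection(client)  # runs "List buckets" plus the optional CloudWatch step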

View File

@ -0,0 +1,402 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""S3 object store extraction metadata"""
import json
import secrets
import traceback
from datetime import datetime, timedelta
from enum import Enum
from typing import Iterable, List, Optional
from pandas import DataFrame
from pydantic import Extra, Field, ValidationError
from pydantic.main import BaseModel
from metadata.generated.schema.api.data.createContainer import CreateContainerRequest
from metadata.generated.schema.entity.data import container
from metadata.generated.schema.entity.data.container import ContainerDataModel
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.objectstore.s3ObjectStoreConnection import (
S3StoreConnection,
)
from metadata.generated.schema.metadataIngestion.objectstore.containerMetadataConfig import (
MetadataEntry,
ObjectStoreContainerConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
from metadata.ingestion.source.database.datalake.models import DatalakeColumnWrapper
from metadata.ingestion.source.objectstore.objectstore_service import (
ObjectStoreServiceSource,
)
from metadata.utils.filters import filter_by_container
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
S3_CLIENT_ROOT_RESPONSE = "Contents"
OPENMETADATA_TEMPLATE_FILE_NAME = "openmetadata.json"
S3_KEY_SEPARATOR = "/"
class S3Metric(Enum):
NUMBER_OF_OBJECTS = "NumberOfObjects"
BUCKET_SIZE_BYTES = "BucketSizeBytes"
class S3BucketResponse(BaseModel):
"""
Class modelling a response received from s3_client.list_buckets operation
"""
class Config:
extra = Extra.forbid
name: str = Field(..., description="Bucket name", title="Bucket Name", alias="Name")
creation_date: datetime = Field(
...,
description="Timestamp of Bucket creation in ISO format",
title="Creation Timestamp",
alias="CreationDate",
)
class S3ContainerDetails(BaseModel):
"""Class mapping container details used to create the container requests"""
class Config:
extra = Extra.forbid
name: str = Field(..., description="Bucket name", title="Bucket Name")
prefix: str = Field(..., description="Prefix for the container", title="Prefix")
number_of_objects: float = Field(
..., description="Total nr. of objects", title="Nr. of objects"
)
size: float = Field(
...,
description="Total size in bytes of all objects",
title="Total size(bytes) of objects",
)
file_formats: Optional[List[container.FileFormat]] = Field(
..., description="File formats", title="File formats"
)
data_model: Optional[ContainerDataModel] = Field(
..., description="Data Model of the container", title="Data Model"
)
creation_date: str = Field(
...,
description="Timestamp of Bucket creation in ISO format",
title="Creation Timestamp",
)
class S3Source(ObjectStoreServiceSource):
"""
Source implementation to ingest S3 buckets data.
"""
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.s3_client = self.connection.s3_client
self.cloudwatch_client = self.connection.cloudwatch_client
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: S3StoreConnection = config.serviceConnection.__root__.config
if not isinstance(connection, S3StoreConnection):
raise InvalidSourceException(
f"Expected S3StoreConnection, but got {connection}"
)
return cls(config, metadata_config)
def get_containers(self) -> Iterable[S3ContainerDetails]:
bucket_results = self.fetch_buckets()
try:
for bucket_response in bucket_results:
metadata_config = self._load_metadata_file(
bucket_name=bucket_response.name
)
if metadata_config:
for metadata_entry in metadata_config.entries:
logger.info(
f"Extracting metadata from path {metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)} "
f"and generating structured container"
)
structured_container: Optional[
S3ContainerDetails
] = self._generate_container_details(
bucket_response=bucket_response,
metadata_entry=metadata_entry,
)
if structured_container:
yield structured_container
else:
logger.info(
f"No metadata found for bucket {bucket_response.name}, generating unstructured container.."
)
yield self._generate_unstructured_container(
bucket_response=bucket_response
)
except ValidationError as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Validation error while creating Container from bucket details - {err}"
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Wild error while creating Container from bucket details - {err}"
)
def yield_create_container_requests(
self, container_details: S3ContainerDetails
) -> Iterable[CreateContainerRequest]:
yield CreateContainerRequest(
name=container_details.name,
prefix=container_details.prefix,
numberOfObjects=container_details.number_of_objects,
size=container_details.size,
dataModel=container_details.data_model,
service=self.context.objectstore_service.fullyQualifiedName,
)
def _generate_container_details(
self, bucket_response: S3BucketResponse, metadata_entry: MetadataEntry
) -> Optional[S3ContainerDetails]:
bucket_name = bucket_response.name
sample_key = self._get_sample_file_path(
bucket_name=bucket_name, metadata_entry=metadata_entry
)
# if we have a sample file to fetch a schema from
if sample_key:
columns = self.extract_column_definitions(bucket_name, sample_key)
if columns:
return S3ContainerDetails(
name=f"{bucket_name}.{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}",
prefix=f"{S3_KEY_SEPARATOR}{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}",
creation_date=bucket_response.creation_date.isoformat(),
number_of_objects=self._fetch_metric(
bucket_name=bucket_name, metric=S3Metric.NUMBER_OF_OBJECTS
),
size=self._fetch_metric(
bucket_name=bucket_name, metric=S3Metric.BUCKET_SIZE_BYTES
),
file_formats=[container.FileFormat(metadata_entry.structureFormat)],
data_model=ContainerDataModel(
isPartitioned=metadata_entry.isPartitioned, columns=columns
),
)
return None
def extract_column_definitions(
self, bucket_name: str, sample_key: str
) -> List[Column]:
client_args = self.service_connection.awsConfig
data_structure_details = DatalakeSource.get_s3_files(
self.s3_client,
key=sample_key,
bucket_name=bucket_name,
client_kwargs=client_args,
)
columns = []
if isinstance(data_structure_details, DataFrame):
columns = DatalakeSource.get_columns(data_structure_details)
if isinstance(data_structure_details, list) and data_structure_details:
columns = DatalakeSource.get_columns(data_structure_details[0])
if isinstance(data_structure_details, DatalakeColumnWrapper):
columns = data_structure_details.columns # pylint: disable=no-member
return columns
def fetch_buckets(self) -> List[S3BucketResponse]:
results: List[S3BucketResponse] = []
try:
# No pagination required, as there is a hard limit of 1000 buckets per AWS account
for bucket in self.s3_client.list_buckets().get("Buckets") or []:
if filter_by_container(
self.source_config.containerFilterPattern,
container_name=bucket["Name"],
):
self.status.filter(bucket["Name"], "Bucket Filtered Out")
else:
results.append(S3BucketResponse.parse_obj(bucket))
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(f"Failed to fetch buckets list - {err}")
return results
def _fetch_metric(self, bucket_name: str, metric: S3Metric) -> float:
try:
raw_result = self.cloudwatch_client.get_metric_data(
MetricDataQueries=[
{
"Id": "total_nr_of_object_request",
"MetricStat": {
"Metric": {
"Namespace": "AWS/S3",
"MetricName": metric.value,
"Dimensions": [
{"Name": "BucketName", "Value": bucket_name},
{
"Name": "StorageType",
# StandardStorage-only support for BucketSizeBytes for now
"Value": "StandardStorage"
if metric == S3Metric.BUCKET_SIZE_BYTES
else "AllStorageTypes",
},
],
},
"Period": 60,
"Stat": "Average",
"Unit": "Bytes"
if metric == S3Metric.BUCKET_SIZE_BYTES
else "Count",
},
},
],
StartTime=datetime.now() - timedelta(days=2),
# metrics generated daily, ensure there is at least 1 entry
EndTime=datetime.now(),
ScanBy="TimestampDescending",
)
if raw_result["MetricDataResults"]:
first_metric = raw_result["MetricDataResults"][0]
if first_metric["StatusCode"] == "Complete" and first_metric["Values"]:
return int(first_metric["Values"][0])
except Exception:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed fetching metric {metric.value} for bucket {bucket_name}, returning 0"
)
return 0
def _load_metadata_file(
self, bucket_name: str
) -> Optional[ObjectStoreContainerConfig]:
"""
Load the metadata template file from the root of the bucket, if it exists
"""
if self._is_metadata_file_present(bucket_name=bucket_name):
try:
logger.info(
f"Found metadata template file at - s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
)
response_object = self.s3_client.get_object(
Bucket=bucket_name, Key=OPENMETADATA_TEMPLATE_FILE_NAME
)
content = json.load(response_object["Body"])
metadata_config = ObjectStoreContainerConfig.parse_obj(content)
return metadata_config
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed loading metadata file s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}-{exc}"
)
return None
def _is_metadata_file_present(self, bucket_name: str):
return self.prefix_exits(
bucket_name=bucket_name,
prefix=OPENMETADATA_TEMPLATE_FILE_NAME,
)
def _generate_unstructured_container(
self, bucket_response: S3BucketResponse
) -> S3ContainerDetails:
return S3ContainerDetails(
name=bucket_response.name,
prefix=S3_KEY_SEPARATOR,
creation_date=bucket_response.creation_date.isoformat(),
number_of_objects=self._fetch_metric(
bucket_name=bucket_response.name, metric=S3Metric.NUMBER_OF_OBJECTS
),
size=self._fetch_metric(
bucket_name=bucket_response.name, metric=S3Metric.BUCKET_SIZE_BYTES
),
file_formats=[], # TODO should we fetch some random files by extension here? Would it be valuable info?
data_model=None,
)
@staticmethod
def _get_sample_file_prefix(metadata_entry: MetadataEntry) -> Optional[str]:
result = f"{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}"
if not metadata_entry.structureFormat:
logger.warning(f"Ignoring un-structured metadata entry {result}")
return None
if metadata_entry.isPartitioned and metadata_entry.partitionColumn:
result = (
f"{result}/{metadata_entry.partitionColumn.strip(S3_KEY_SEPARATOR)}"
)
return result
def _get_sample_file_path(
self, bucket_name: str, metadata_entry: MetadataEntry
) -> Optional[str]:
"""
Given a bucket and a metadata entry, returns the full path key to a file which can then be used to infer schema
or None in the case of a non-structured metadata entry, or if no such keys can be found
"""
prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry)
# no objects found in the data path
if not self.prefix_exits(bucket_name=bucket_name, prefix=prefix):
logger.warning(f"Ignoring metadata entry {prefix} - no files found")
return None
# this will look only in the first 1000 files under that path (default for list_objects_v2).
# We'd rather not do pagination here as it would incur unwanted costs
try:
response = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
candidate_keys = [
entry["Key"]
for entry in response[S3_CLIENT_ROOT_RESPONSE]
if entry
and entry.get("Key")
and entry["Key"].endswith(metadata_entry.structureFormat)
]
# pick a random key out of the candidates if any were returned
if candidate_keys:
result_key = secrets.choice(candidate_keys)
logger.info(
f"File {result_key} was picked to infer data structure from."
)
return result_key
logger.warning(
f"No sample files found in {prefix} with {metadata_entry.structureFormat} extension"
)
return None
except Exception:
logger.debug(traceback.format_exc())
logger.warning(
f"Error when trying to list objects in S3 bucket {bucket_name} at prefix {prefix}"
)
return None
def prefix_exits(self, bucket_name: str, prefix: str) -> bool:
"""
Checks if a given prefix exists in a bucket
"""
try:
res = self.s3_client.list_objects_v2(
Bucket=bucket_name, Prefix=prefix, MaxKeys=1
)
return S3_CLIENT_ROOT_RESPONSE in res
except Exception:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed when trying to check if S3 prefix {prefix} exists in bucket {bucket_name}"
)
return False
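To exercise the source end to end, it can be run through the standard ingestion workflow. The config below mirrors the one used in the unit tests further down; the Workflow entry point and its create/execute/print_status/stop calls are assumed from the existing ingestion framework, and hostPort/jwtToken are placeholders:

# Hedged sketch: running the new S3 object store source via the ingestion workflow.
from metadata.ingestion.api.workflow import Workflow  # assumed entry point

config = {
    "source": {
        "type": "s3",
        "serviceName": "s3_demo",
        "serviceConnection": {
            "config": {"type": "S3", "awsConfig": {"awsRegion": "us-east-1"}}
        },
        "sourceConfig": {"config": {"type": "ObjectStoreMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",  # placeholder
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "<token>"},  # placeholder
        }
    },
}

workflow = Workflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()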

View File

@ -208,3 +208,18 @@ def filter_by_mlmodel(
:return: True for filtering, False otherwise
"""
return _filter(mlmodel_filter_pattern, mlmodel_name)
def filter_by_container(
container_filter_pattern: Optional[FilterPattern], container_name: str
) -> bool:
"""
Return True if the container needs to be filtered, False otherwise
Include takes precedence over exclude
:param container_filter_pattern: Container defining the container filtering logic
:param container_name: container name
:return: True for filtering, False otherwise
"""
return _filter(container_filter_pattern, container_name)
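For instance, with the include pattern used in the unit tests below, only buckets matching ^test_* survive. A small sketch, assuming FilterPattern lives at the usual generated path and exposes includes/excludes lists:

# Hedged sketch of the filtering semantics: True means "filter this container out".
from metadata.generated.schema.type.filterPattern import FilterPattern  # assumed path

pattern = FilterPattern(includes=["^test_*"])
filter_by_container(pattern, "test_transactions")  # False -> bucket is kept
filter_by_container(pattern, "events")  # True -> bucket is filtered out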

View File

@ -0,0 +1,350 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unit tests for Object store source
"""
import datetime
import io
import json
from typing import List
from unittest import TestCase
from unittest.mock import patch
import pandas as pd
from botocore.response import StreamingBody
from metadata.generated.schema.entity.data.container import (
ContainerDataModel,
FileFormat,
)
from metadata.generated.schema.entity.data.table import Column, ColumnName, DataType
from metadata.generated.schema.metadataIngestion.objectstore.containerMetadataConfig import (
MetadataEntry,
ObjectStoreContainerConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
from metadata.ingestion.source.objectstore.s3.metadata import (
S3BucketResponse,
S3ContainerDetails,
S3Source,
)
MOCK_OBJECT_STORE_CONFIG = {
"source": {
"type": "s3",
"serviceName": "s3_test",
"serviceConnection": {
"config": {"type": "S3", "awsConfig": {"awsRegion": "us-east-1"}}
},
"sourceConfig": {
"config": {
"type": "ObjectStoreMetadata",
"containerFilterPattern": {"includes": ["^test_*"]},
}
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {"jwtToken": "token"},
}
},
}
MOCK_S3_BUCKETS_RESPONSE = {
"Buckets": [
{"Name": "test_transactions", "CreationDate": datetime.datetime(2000, 1, 1)},
{"Name": "test_sales", "CreationDate": datetime.datetime(2000, 2, 2)},
{"Name": "events", "CreationDate": datetime.datetime(2000, 3, 3)},
]
}
MOCK_S3_METADATA_FILE_RESPONSE = {
"entries": [
{
"dataPath": "transactions",
"structureFormat": "csv",
"isPartitioned": False,
}
]
}
EXPECTED_S3_BUCKETS: List[S3BucketResponse] = [
S3BucketResponse(
Name="test_transactions", CreationDate=datetime.datetime(2000, 1, 1)
),
S3BucketResponse(Name="test_sales", CreationDate=datetime.datetime(2000, 2, 2)),
]
MOCK_S3_OBJECT_FILE_PATHS = {
"Contents": [
{"Key": "transactions/", "Size": 0},
{"Key": "transactions/transactions_1.csv", "Size": 69},
{"Key": "transactions/transactions_2.csv", "Size": 55},
]
}
class ObjectStoreUnitTest(TestCase):
"""
Validate how we work with object store metadata
"""
@patch(
"metadata.ingestion.source.objectstore.objectstore_service.ObjectStoreServiceSource.test_connection"
)
def __init__(self, method_name: str, test_connection) -> None:
super().__init__(method_name)
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.parse_obj(MOCK_OBJECT_STORE_CONFIG)
# This already validates that the source can be initialized
self.object_store_source = S3Source.create(
MOCK_OBJECT_STORE_CONFIG["source"],
self.config.workflowConfig.openMetadataServerConfig,
)
def test_create_from_invalid_source(self):
"""
An invalid config raises an error
"""
not_object_store_source = {
"type": "s3",
"serviceName": "mysql_local",
"serviceConnection": {
"config": {
"type": "Mysql",
"username": "openmetadata_user",
"password": "openmetadata_password",
"hostPort": "localhost:3306",
"databaseSchema": "openmetadata_db",
}
},
"sourceConfig": {
"config": {
"type": "ObjectStoreMetadata",
}
},
}
self.assertRaises(
InvalidSourceException,
S3Source.create,
not_object_store_source,
self.config.workflowConfig.openMetadataServerConfig,
)
def test_s3_buckets_fetching(self):
self.object_store_source.s3_client.list_buckets = (
lambda: MOCK_S3_BUCKETS_RESPONSE
)
self.assertListEqual(
self.object_store_source.fetch_buckets(), EXPECTED_S3_BUCKETS
)
def test_load_metadata_file(self):
self.object_store_source._is_metadata_file_present = lambda bucket_name: True
self.object_store_source.s3_client.get_object = (
lambda Bucket, Key: self._compute_mocked_metadata_file_response()
)
container_config: ObjectStoreContainerConfig = (
self.object_store_source._load_metadata_file(bucket_name="test")
)
self.assertEqual(1, len(container_config.entries))
self.assertEqual(
MetadataEntry(
dataPath="transactions",
structureFormat="csv",
isPartitioned=False,
),
container_config.entries[0],
)
def test_no_metadata_file_returned_when_file_not_present(self):
self.object_store_source._is_metadata_file_present = lambda bucket_name: False
self.assertIsNone(
self.object_store_source._load_metadata_file(bucket_name="test")
)
def test_generate_unstructured_container(self):
bucket_response = S3BucketResponse(
Name="test_bucket", CreationDate=datetime.datetime(2000, 1, 1)
)
self.object_store_source._fetch_metric = lambda bucket_name, metric: 100.0
self.assertEqual(
S3ContainerDetails(
name=bucket_response.name,
prefix="/",
number_of_objects=100,
size=100,
file_formats=[],
data_model=None,
creation_date=bucket_response.creation_date.isoformat(),
),
self.object_store_source._generate_unstructured_container(
bucket_response=bucket_response
),
)
def test_generate_structured_container(self):
self.object_store_source._get_sample_file_path = (
lambda bucket_name, metadata_entry: "transactions/file_1.csv"
)
self.object_store_source._fetch_metric = lambda bucket_name, metric: 100.0
columns: List[Column] = [
Column(
name=ColumnName(__root__="transaction_id"),
dataType=DataType.INT,
dataTypeDisplay="INT",
dataLength=1,
),
Column(
name=ColumnName(__root__="transaction_value"),
dataType=DataType.INT,
dataTypeDisplay="INT",
dataLength=1,
),
]
self.object_store_source.extract_column_definitions = (
lambda bucket_name, sample_key: columns
)
self.assertEquals(
S3ContainerDetails(
name="test_bucket.transactions",
prefix="/transactions",
number_of_objects=100,
size=100,
file_formats=[FileFormat.csv],
data_model=ContainerDataModel(isPartitioned=False, columns=columns),
creation_date=datetime.datetime(2000, 1, 1).isoformat(),
),
self.object_store_source._generate_container_details(
S3BucketResponse(
Name="test_bucket", CreationDate=datetime.datetime(2000, 1, 1)
),
MetadataEntry(
dataPath="transactions",
structureFormat="csv",
isPartitioned=False,
),
),
)
# Most of the parsing support is covered in the test_datalake unit tests for the Data Lake implementation
def test_extract_column_definitions(self):
DatalakeSource.get_s3_files = lambda client, key, bucket_name, client_kwargs: [
pd.DataFrame.from_dict(
[
{"transaction_id": 1, "transaction_value": 100},
{"transaction_id": 2, "transaction_value": 200},
{"transaction_id": 3, "transaction_value": 300},
]
)
]
self.assertListEqual(
[
Column(
name=ColumnName(__root__="transaction_id"),
dataType=DataType.INT,
dataTypeDisplay="INT",
dataLength=1,
),
Column(
name=ColumnName(__root__="transaction_value"),
dataType=DataType.INT,
dataTypeDisplay="INT",
dataLength=1,
),
],
self.object_store_source.extract_column_definitions(
bucket_name="test_bucket", sample_key="test.json"
),
)
def test_get_sample_file_prefix_for_structured_and_partitioned_metadata(self):
input_metadata = MetadataEntry(
dataPath="transactions",
structureFormat="parquet",
isPartitioned=True,
partitionColumn="date",
)
self.assertEquals(
"transactions/date",
self.object_store_source._get_sample_file_prefix(
metadata_entry=input_metadata
),
)
def test_get_sample_file_prefix_for_unstructured_metadata(self):
input_metadata = MetadataEntry(dataPath="transactions")
self.assertIsNone(
self.object_store_source._get_sample_file_prefix(
metadata_entry=input_metadata
)
)
def test_get_sample_file_prefix_for_structured_and_not_partitioned_metadata(self):
input_metadata = MetadataEntry(
dataPath="transactions",
structureFormat="csv",
isPartitioned=False,
)
self.assertEquals(
"transactions",
self.object_store_source._get_sample_file_prefix(
metadata_entry=input_metadata
),
)
def test_get_sample_file_path_with_invalid_prefix(self):
self.object_store_source._get_sample_file_prefix = (
lambda metadata_entry: "/transactions"
)
self.assertIsNone(
self.object_store_source._get_sample_file_path(
bucket_name="test_bucket",
metadata_entry=MetadataEntry(
dataPath="invalid_path",
structureFormat="csv",
isPartitioned=False,
),
)
)
def test_get_sample_file_path_randomly(self):
self.object_store_source._get_sample_file_prefix = (
lambda metadata_entry: "/transactions"
)
self.object_store_source.prefix_exits = lambda bucket_name, prefix: True
self.object_store_source.s3_client.list_objects_v2 = (
lambda Bucket, Prefix: MOCK_S3_OBJECT_FILE_PATHS
)
candidate = self.object_store_source._get_sample_file_path(
bucket_name="test_bucket",
metadata_entry=MetadataEntry(
dataPath="/transactions",
structureFormat="csv",
isPartitioned=False,
),
)
self.assertTrue(
candidate
in ["transactions/transactions_1.csv", "transactions/transactions_2.csv"]
)
@staticmethod
def _compute_mocked_metadata_file_response():
body_encoded = json.dumps(MOCK_S3_METADATA_FILE_RESPONSE).encode()
body = StreamingBody(io.BytesIO(body_encoded), len(body_encoded))
return {"Body": body}

View File

@ -178,9 +178,9 @@ public class ContainerResourceTest extends EntityResourceTest<Container, CreateC
new CreateContainer()
.withName("0_root")
.withService(S3_OBJECT_STORE_SERVICE_REFERENCE.getName())
.withNumberOfObjects(0)
.withNumberOfObjects(0.0)
.withOwner(USER_WITH_DATA_CONSUMER_ROLE.getEntityReference())
.withSize(0);
.withSize(0.0);
Container rootContainer = createAndCheckEntity(createRootContainer, ADMIN_AUTH_HEADERS);
CreateContainer createChildOneContainer =
@ -188,8 +188,8 @@ public class ContainerResourceTest extends EntityResourceTest<Container, CreateC
.withName("1_child_1")
.withService(S3_OBJECT_STORE_SERVICE_REFERENCE.getName())
.withParent(rootContainer.getEntityReference())
.withNumberOfObjects(0)
.withSize(0);
.withNumberOfObjects(0.0)
.withSize(0.0);
Container childOneContainer = createAndCheckEntity(createChildOneContainer, ADMIN_AUTH_HEADERS);
CreateContainer createChildTwoContainer =
@ -197,8 +197,8 @@ public class ContainerResourceTest extends EntityResourceTest<Container, CreateC
.withName("2_child_2")
.withService(S3_OBJECT_STORE_SERVICE_REFERENCE.getName())
.withParent(rootContainer.getEntityReference())
.withNumberOfObjects(0)
.withSize(0);
.withNumberOfObjects(0.0)
.withSize(0.0);
Container childTwoContainer = createAndCheckEntity(createChildTwoContainer, ADMIN_AUTH_HEADERS);
CreateContainer createChildThreeContainer =
@ -206,8 +206,8 @@ public class ContainerResourceTest extends EntityResourceTest<Container, CreateC
.withName("3_child_3")
.withService(S3_OBJECT_STORE_SERVICE_REFERENCE.getName())
.withParent(childOneContainer.getEntityReference())
.withNumberOfObjects(0)
.withSize(0);
.withNumberOfObjects(0.0)
.withSize(0.0);
Container childThreeContainer = createAndCheckEntity(createChildThreeContainer, ADMIN_AUTH_HEADERS);
// GET .../containers?fields=parent,children
@ -280,8 +280,8 @@ public class ContainerResourceTest extends EntityResourceTest<Container, CreateC
.withService(S3_OBJECT_STORE_SERVICE_REFERENCE.getFullyQualifiedName())
.withDataModel(PARTITIONED_DATA_MODEL)
.withFileFormats(FILE_FORMATS)
.withNumberOfObjects(3)
.withSize(4096);
.withNumberOfObjects(3.0)
.withSize(4096.0);
}
@Override

View File

@ -40,12 +40,12 @@
},
"numberOfObjects": {
"description": "The number of objects/files this container has.",
"type": "integer",
"type": "number",
"default": null
},
"size": {
"description": "The total size in KB this container has.",
"type": "integer",
"type": "number",
"default": null
},
"fileFormats": {

View File

@ -116,12 +116,12 @@
},
"numberOfObjects": {
"description": "The number of objects/files this container has.",
"type": "integer",
"type": "number",
"default": null
},
"size": {
"description": "The total size in KB this container has.",
"type": "integer",
"type": "number",
"default": null
},
"fileFormats": {

View File

@ -0,0 +1,54 @@
{
"$id": "https://open-metadata.org/schema/metadataIngestion/containerMetadataConfig.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ObjectStoreContainerConfig",
"description": "Definition of the properties contained by an object store container template config file",
"javaType": "org.openmetadata.schema.metadataIngestion.objectstore.ContainerMetadataConfig",
"definitions": {
"metadataEntry": {
"description": "Config properties for a container found in a user-supplied metadata config",
"javaType": "org.openmetadata.schema.metadataIngestion.objectstore.ContainerMetadataEntry",
"type": "object",
"properties": {
"dataPath": {
"title": "Data path",
"description": "The path where the data resides in the container, excluding the bucket name",
"type": "string"
},
"structureFormat": {
"title": "Schema format",
"description": "What's the schema format for the container, eg. avro, parquet, csv.",
"type": "string",
"default": null
},
"isPartitioned": {
"title": "Is Partitioned",
"description": "Flag indicating whether the container's data is partitioned",
"type": "boolean",
"default": false
},
"partitionColumn": {
"title": "Partition Column",
"description": "What is the partition column in case the container's data is partitioned",
"type": "string",
"default": null
}
},
"required": [
"dataPath"
]
}
},
"properties": {
"entries": {
"description": "List of metadata entries for the bucket containing information about where data resides and its structure",
"type": "array",
"items": {
"$ref": "#/definitions/metadataEntry"
},
"default": null
}
},
"required": ["entries"],
"additionalProperties": false
}
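A bucket opting into structured ingestion carries an openmetadata.json file at its root that matches this schema. The sketch below builds the same single-entry template used in the unit tests and validates it with the generated model (the dataPath and format values are illustrative):

# Hedged sketch: validating an openmetadata.json template against the generated model.
from metadata.generated.schema.metadataIngestion.objectstore.containerMetadataConfig import (
    ObjectStoreContainerConfig,
)

template = {
    "entries": [
        {
            "dataPath": "transactions",  # illustrative path inside the bucket
            "structureFormat": "csv",
            "isPartitioned": False,
        }
    ]
}
config = ObjectStoreContainerConfig.parse_obj(template)  # same call the S3 source makes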

View File

@ -0,0 +1,26 @@
{
"$id": "https://open-metadata.org/schema/metadataIngestion/objectStoreServiceMetadataPipeline.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ObjectStoreServiceMetadataPipeline",
"description": "ObjectStoreService Metadata Pipeline Configuration.",
"definitions": {
"objectstoreMetadataConfigType": {
"description": "Object Store Source Config Metadata Pipeline type",
"type": "string",
"enum": ["ObjectStoreMetadata"],
"default": "ObjectStoreMetadata"
}
},
"properties": {
"type": {
"description": "Pipeline type",
"$ref": "#/definitions/objectstoreMetadataConfigType",
"default": "ObjectStoreMetadata"
},
"containerFilterPattern": {
"description": "Regex to only fetch containers that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
}
},
"additionalProperties": false
}

View File

@ -36,6 +36,9 @@
{
"$ref": "mlmodelServiceMetadataPipeline.json"
},
{
"$ref": "objectstoreServiceMetadataPipeline.json"
},
{
"$ref": "testSuitePipeline.json"
},