Handle container parents (#11026)

Pere Miquel Brull 2023-04-12 18:36:04 +02:00 committed by GitHub
parent 17893f5bd9
commit 47cef52fa8
4 changed files with 117 additions and 61 deletions

View File

@@ -14,15 +14,17 @@ import secrets
 import traceback
 from datetime import datetime, timedelta
 from enum import Enum
-from typing import Iterable, List, Optional
+from typing import Dict, Iterable, List, Optional

 from pandas import DataFrame
-from pydantic import Extra, Field, ValidationError
-from pydantic.main import BaseModel
+from pydantic import ValidationError

 from metadata.generated.schema.api.data.createContainer import CreateContainerRequest
 from metadata.generated.schema.entity.data import container
-from metadata.generated.schema.entity.data.container import ContainerDataModel
+from metadata.generated.schema.entity.data.container import (
+    Container,
+    ContainerDataModel,
+)
 from metadata.generated.schema.entity.data.table import Column
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     OpenMetadataConnection,
@@ -37,9 +39,14 @@ from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
+from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.source import InvalidSourceException
 from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
 from metadata.ingestion.source.database.datalake.models import DatalakeColumnWrapper
+from metadata.ingestion.source.storage.s3.models import (
+    S3BucketResponse,
+    S3ContainerDetails,
+)
 from metadata.ingestion.source.storage.storage_service import StorageServiceSource
 from metadata.utils.filters import filter_by_container
 from metadata.utils.logger import ingestion_logger
@@ -56,52 +63,6 @@ class S3Metric(Enum):
     BUCKET_SIZE_BYTES = "BucketSizeBytes"


-class S3BucketResponse(BaseModel):
-    """
-    Class modelling a response received from s3_client.list_buckets operation
-    """
-
-    class Config:
-        extra = Extra.forbid
-
-    name: str = Field(..., description="Bucket name", title="Bucket Name", alias="Name")
-    creation_date: datetime = Field(
-        ...,
-        description="Timestamp of Bucket creation in ISO format",
-        title="Creation Timestamp",
-        alias="CreationDate",
-    )
-
-
-class S3ContainerDetails(BaseModel):
-    """Class mapping container details used to create the container requests"""
-
-    class Config:
-        extra = Extra.forbid
-
-    name: str = Field(..., description="Bucket name", title="Bucket Name")
-    prefix: str = Field(..., description="Prefix for the container", title="Prefix")
-    number_of_objects: float = Field(
-        ..., description="Total nr. of objects", title="Nr. of objects"
-    )
-    size: float = Field(
-        ...,
-        description="Total size in bytes of all objects",
-        title="Total size(bytes) of objects",
-    )
-    file_formats: Optional[List[container.FileFormat]] = Field(
-        ..., description="File formats", title="File formats"
-    )
-    data_model: Optional[ContainerDataModel] = Field(
-        ..., description="Data Model of the container", title="Data Model"
-    )
-    creation_date: str = Field(
-        ...,
-        description="Timestamp of Bucket creation in ISO format",
-        title="Creation Timestamp",
-    )
-
-
 class S3Source(StorageServiceSource):
     """
     Source implementation to ingest S3 buckets data.
@@ -112,6 +73,8 @@ class S3Source(StorageServiceSource):
         self.s3_client = self.connection.s3_client
         self.cloudwatch_client = self.connection.cloudwatch_client

+        self._bucket_cache: Dict[str, Container] = {}
+
     @classmethod
     def create(cls, config_dict, metadata_config: OpenMetadataConnection):
         config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@@ -127,6 +90,13 @@ class S3Source(StorageServiceSource):
         for bucket_response in bucket_results:
             try:
+                # We always try to generate the parent container (the bucket)
+                yield self._generate_unstructured_container(
+                    bucket_response=bucket_response
+                )
+                self._bucket_cache[bucket_response.name] = self.context.container
+
                 metadata_config = self._load_metadata_file(
                     bucket_name=bucket_response.name
                 )
@@ -141,16 +111,14 @@ class S3Source(StorageServiceSource):
                        ] = self._generate_container_details(
                            bucket_response=bucket_response,
                            metadata_entry=metadata_entry,
+                           parent=EntityReference(
+                               id=self._bucket_cache[bucket_response.name].id.__root__,
+                               type="container",
+                           ),
                        )
                        if structured_container:
                            yield structured_container
-                else:
-                    logger.info(
-                        f"No metadata found for bucket {bucket_response.name}, generating unstructured container.."
-                    )
-                    yield self._generate_unstructured_container(
-                        bucket_response=bucket_response
-                    )

             except ValidationError as err:
                 error = f"Validation error while creating Container from bucket details - {err}"
                 logger.debug(traceback.format_exc())
@@ -174,10 +142,14 @@ class S3Source(StorageServiceSource):
             size=container_details.size,
             dataModel=container_details.data_model,
             service=self.context.objectstore_service.fullyQualifiedName,
+            parent=container_details.parent,
         )

     def _generate_container_details(
-        self, bucket_response: S3BucketResponse, metadata_entry: MetadataEntry
+        self,
+        bucket_response: S3BucketResponse,
+        metadata_entry: MetadataEntry,
+        parent: Optional[EntityReference] = None,
     ) -> Optional[S3ContainerDetails]:
         bucket_name = bucket_response.name
         sample_key = self._get_sample_file_path(
@@ -188,7 +160,7 @@ class S3Source(StorageServiceSource):
         columns = self.extract_column_definitions(bucket_name, sample_key)
         if columns:
             return S3ContainerDetails(
-                name=f"{bucket_name}.{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}",
+                name=metadata_entry.dataPath.strip(S3_KEY_SEPARATOR),
                 prefix=f"{S3_KEY_SEPARATOR}{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}",
                 creation_date=bucket_response.creation_date.isoformat(),
                 number_of_objects=self._fetch_metric(
@@ -201,6 +173,7 @@ class S3Source(StorageServiceSource):
                 data_model=ContainerDataModel(
                     isPartitioned=metadata_entry.isPartitioned, columns=columns
                 ),
+                parent=parent,
             )
         return None
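
Taken together, the changes in this file mean each bucket is first emitted as its own top-level container and cached in _bucket_cache, and every structured container later built from the bucket's metadata file is linked back to it through a parent EntityReference. A minimal, runnable toy sketch of that ordering and caching pattern follows; ToyContainer and the ingest helper are illustrative stand-ins, not the real generated-schema Container or the actual S3Source methods.

from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional

# Sketch only: ToyContainer stands in for the generated Container entity, and
# bucket_cache mirrors S3Source._bucket_cache keyed by bucket name.
@dataclass
class ToyContainer:
    name: str
    parent: Optional[str] = None

def ingest(bucket_names: List[str], entries_per_bucket: Dict[str, List[str]]) -> Iterable[ToyContainer]:
    bucket_cache: Dict[str, ToyContainer] = {}
    for bucket in bucket_names:
        bucket_container = ToyContainer(name=bucket)       # the parent, yielded first
        yield bucket_container
        bucket_cache[bucket] = bucket_container             # cache the parent by bucket name
        for entry in entries_per_bucket.get(bucket, []):    # entries from the metadata file
            # every structured child points back at the cached bucket container
            yield ToyContainer(name=entry, parent=bucket_cache[bucket].name)

for c in ingest(["my-bucket"], {"my-bucket": ["transactions"]}):
    print(c.name, "parent:", c.parent)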

View File

@@ -0,0 +1,76 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+"""
+S3 custom pydantic models
+"""
+from datetime import datetime
+from typing import List, Optional
+
+from pydantic import BaseModel, Extra, Field
+
+from metadata.generated.schema.entity.data.container import (
+    ContainerDataModel,
+    FileFormat,
+)
+from metadata.generated.schema.type.entityReference import EntityReference
+
+
+class S3BucketResponse(BaseModel):
+    """
+    Class modelling a response received from s3_client.list_buckets operation
+    """
+
+    class Config:
+        extra = Extra.forbid
+
+    name: str = Field(..., description="Bucket name", alias="Name")
+    creation_date: datetime = Field(
+        ...,
+        description="Timestamp of Bucket creation in ISO format",
+        alias="CreationDate",
+    )
+
+
+class S3ContainerDetails(BaseModel):
+    """
+    Class mapping container details used to create the container requests
+    """
+
+    class Config:
+        extra = Extra.forbid
+
+    name: str = Field(..., description="Bucket name")
+    prefix: str = Field(..., description="Prefix for the container")
+    number_of_objects: float = Field(
+        ...,
+        description="Total nr. of objects",
+    )
+    size: float = Field(
+        ...,
+        description="Total size in bytes of all objects",
+        title="Total size(bytes) of objects",
+    )
+    file_formats: Optional[List[FileFormat]] = Field(
+        ...,
+        description="File formats",
+    )
+    data_model: Optional[ContainerDataModel] = Field(
+        ...,
+        description="Data Model of the container",
+    )
+    creation_date: str = Field(
+        ...,
+        description="Timestamp of Bucket creation in ISO format",
+    )
+    parent: Optional[EntityReference] = Field(
+        None,
+        description="Reference to the parent container",
+    )
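
Since both models keep Extra.forbid and most fields use the required "..." default, here is a hedged construction example; the values are made up, the module path comes from the import added in the first file, and S3BucketResponse has to be populated through its Name/CreationDate aliases.

from datetime import datetime
from uuid import uuid4

from metadata.generated.schema.entity.data.container import FileFormat
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.storage.s3.models import (
    S3BucketResponse,
    S3ContainerDetails,
)

# Populate via the pydantic aliases declared on the model (Name, CreationDate).
bucket = S3BucketResponse(Name="my-bucket", CreationDate=datetime(2023, 1, 1))

details = S3ContainerDetails(
    name="transactions",
    prefix="/transactions",
    number_of_objects=100,
    size=2048,
    file_formats=[FileFormat.csv],
    data_model=None,  # Optional in type, but still has to be passed explicitly
    creation_date=bucket.creation_date.isoformat(),
    parent=EntityReference(id=uuid4(), type="container"),  # omitted for the bucket itself
)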

View File

@@ -66,7 +66,7 @@ class StorageServiceTopology(ServiceTopology):
         stages=[
             NodeStage(
                 type_=Container,
-                context="containers",
+                context="container",
                 processor="yield_create_container_requests",
                 consumer=["objectstore_service"],
                 nullable=True,
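
After this rename the Container created from each yielded request is exposed on the singular attribute self.context.container, which is exactly what the S3 source reads right after yielding the bucket container (the _bucket_cache line in the first file). A toy, non-authoritative sketch of that read-back; this is not the real topology runner, just an illustration of the context key naming the attribute:

from types import SimpleNamespace

# Toy stand-in for the topology context: the stage's context key names the
# attribute where the processed entity is stored.
context = SimpleNamespace()
setattr(context, "container", "bucket-container-entity")  # context="container"

# Mirrors S3Source: self._bucket_cache[bucket_response.name] = self.context.container
bucket_cache = {"my-bucket": context.container}
print(bucket_cache["my-bucket"])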

View File

@@ -14,6 +14,7 @@ Unit tests for Object store source
 import datetime
 import io
 import json
+import uuid
 from typing import List
 from unittest import TestCase
 from unittest.mock import patch
@@ -33,6 +34,7 @@ from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataWorkflowConfig,
 )
+from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.source import InvalidSourceException
 from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
 from metadata.ingestion.source.storage.s3.metadata import (
@@ -218,15 +220,19 @@ class StorageUnitTest(TestCase):
         self.object_store_source.extract_column_definitions = (
             lambda bucket_name, sample_key: columns
         )

+        entity_ref = EntityReference(id=uuid.uuid4(), type="container")
+
         self.assertEquals(
             S3ContainerDetails(
-                name="test_bucket.transactions",
+                name="transactions",
                 prefix="/transactions",
                 number_of_objects=100,
                 size=100,
                 file_formats=[FileFormat.csv],
                 data_model=ContainerDataModel(isPartitioned=False, columns=columns),
                 creation_date=datetime.datetime(2000, 1, 1).isoformat(),
+                parent=entity_ref,
             ),
             self.object_store_source._generate_container_details(
                 S3BucketResponse(
@@ -237,6 +243,7 @@ class StorageUnitTest(TestCase):
                     structureFormat="csv",
                     isPartitioned=False,
                 ),
+                parent=entity_ref,
             ),
         )