Fix #17195: Support automated unstructured files ingestion & tags (#17196)

Mayur Singal 2024-07-31 00:05:58 +05:30 committed by GitHub
parent d2d0fba5c3
commit 840a102887
17 changed files with 395 additions and 45 deletions

View File

@ -14,7 +14,7 @@ import secrets
import traceback
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, Iterable, List, Optional
from typing import Dict, Iterable, List, Optional, Tuple
from pydantic import ValidationError
@ -42,12 +42,16 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.storage.s3.models import (
S3BucketResponse,
S3ContainerDetails,
S3Tag,
S3TagResponse,
)
from metadata.ingestion.source.storage.storage_service import (
KEY_SEPARATOR,
@ -59,11 +63,14 @@ from metadata.readers.file.config_source_factory import get_reader
from metadata.utils import fqn
from metadata.utils.filters import filter_by_container
from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_label
logger = ingestion_logger()
S3_CLIENT_ROOT_RESPONSE = "Contents"
WILD_CARD = "*"
class S3Metric(Enum):
NUMBER_OF_OBJECTS = "NumberOfObjects"
@ -81,6 +88,7 @@ class S3Source(StorageServiceSource):
self.cloudwatch_client = self.connection.cloudwatch_client
self._bucket_cache: Dict[str, Container] = {}
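# Maps container FQN -> (container id, prefix path) for unstructured parent
# containers already created; cleared after each bucket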
self._unstructured_container_cache: Dict[str, Tuple[str, str]] = {}
self.s3_reader = get_reader(config_source=S3Config(), client=self.s3_client)
@classmethod
@ -113,6 +121,10 @@ class S3Source(StorageServiceSource):
entity=Container, fqn=container_fqn
)
self._bucket_cache[bucket_name] = container_entity
self._unstructured_container_cache[container_fqn] = (
container_entity.id.root,
KEY_SEPARATOR,
)
parent_entity: EntityReference = EntityReference(
id=self._bucket_cache[bucket_name].id.root, type="container"
)
@ -130,6 +142,11 @@ class S3Source(StorageServiceSource):
entries=manifest_entries_for_current_bucket,
parent=parent_entity,
)
yield from self._generate_unstructured_containers(
bucket_response=bucket_response,
entries=manifest_entries_for_current_bucket,
parent=parent_entity,
)
# nothing else to do for the current bucket, skipping to the next
continue
# If no global file, or no valid entries in the manifest, check for bucket level metadata file
@ -140,6 +157,14 @@ class S3Source(StorageServiceSource):
entries=metadata_config.entries,
parent=parent_entity,
)
yield from self._generate_unstructured_containers(
bucket_response=bucket_response,
entries=metadata_config.entries,
parent=parent_entity,
)
# clean up the cache after each bucket
self._unstructured_container_cache.clear()
except ValidationError as err:
self.status.failed(
@ -158,6 +183,69 @@ class S3Source(StorageServiceSource):
)
)
def _get_bucket_name_and_key(self, full_path: str) -> Tuple[str, str]:
"""
Method to get the bucket name and key from the full path
"""
if full_path:
parts = full_path.removeprefix("s3://").split(KEY_SEPARATOR)
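# Only nested keys (s3://bucket/folder/.../object) can be split; shorter paths fall through to (None, None)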
if len(parts) > 2:
return parts[0], KEY_SEPARATOR.join(parts[1:])
return None, None
def get_tag_by_fqn(self, entity_fqn: str) -> Optional[List[TagLabel]]:
"""
Pick up the tags registered in the context
searching by entity FQN
"""
try:
tag_labels = []
for tag_and_category in self.context.get().tags or []:
if tag_and_category.fqn and tag_and_category.fqn.root == entity_fqn:
tag_label = get_tag_label(
metadata=self.metadata,
tag_name=tag_and_category.tag_request.name.root,
classification_name=tag_and_category.classification_request.name.root,
)
if tag_label:
tag_labels.append(tag_label)
return tag_labels or None
except Exception as exc:
logger.debug(f"Failed to ingest tags due to: {exc}")
logger.debug(traceback.format_exc())
return None
def yield_container_tags(
self, container_details: S3ContainerDetails
) -> Iterable[Either[OMetaTagAndClassification]]:
"""
From topology. To be run for each container
"""
try:
bucket_name, key = self._get_bucket_name_and_key(container_details.fullPath)
if (
container_details.leaf_container
and container_details.container_fqn
and bucket_name
and key
):
tags = self.s3_client.get_object_tagging(Bucket=bucket_name, Key=key)
tags_list: List[S3Tag] = S3TagResponse.model_validate(tags).TagSet
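# Each S3 object tag becomes a classification (from the tag Key) holding a single tag (the tag Value)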
for tag in tags_list:
yield from get_ometa_tag_and_classification(
tag_fqn=FullyQualifiedEntityName(
container_details.container_fqn
),
tags=[tag.Value],
classification_name=tag.Key,
tag_description="S3 TAG VALUE",
classification_description="S3 TAG KEY",
)
except Exception as exc:
logger.debug(f"Failed to ingest tags due to: {exc}")
logger.debug(traceback.format_exc())
def yield_create_container_requests(
self, container_details: S3ContainerDetails
) -> Iterable[Either[CreateContainerRequest]]:
@ -172,6 +260,7 @@ class S3Source(StorageServiceSource):
sourceUrl=container_details.sourceUrl,
fileFormats=container_details.file_formats,
fullPath=container_details.fullPath,
tags=self.get_tag_by_fqn(container_details.container_fqn),
)
yield Either(right=container_request)
self.register_record(container_request=container_request)
@ -195,31 +284,10 @@ class S3Source(StorageServiceSource):
parent: Optional[EntityReference] = None,
) -> Optional[S3ContainerDetails]:
bucket_name = bucket_response.name
object_size = self.get_size(
bucket_name=bucket_name,
file_path=metadata_entry.dataPath.strip(KEY_SEPARATOR),
)
if not metadata_entry.structureFormat and object_size:
prefix = f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
return S3ContainerDetails(
name=metadata_entry.dataPath.strip(KEY_SEPARATOR),
prefix=prefix,
creation_date=bucket_response.creation_date.isoformat()
if bucket_response.creation_date
else None,
file_formats=[],
data_model=None,
parent=parent,
size=self.get_size(
bucket_name=bucket_name,
file_path=metadata_entry.dataPath.strip(KEY_SEPARATOR),
),
fullPath=self._get_full_path(bucket_name, prefix),
sourceUrl=self._get_object_source_url(
bucket_name=bucket_name,
prefix=metadata_entry.dataPath.strip(KEY_SEPARATOR),
),
)
if not metadata_entry.structureFormat:
return None
sample_key = self._get_sample_file_path(
bucket_name=bucket_name, metadata_entry=metadata_entry
)
@ -244,12 +312,6 @@ class S3Source(StorageServiceSource):
creation_date=bucket_response.creation_date.isoformat()
if bucket_response.creation_date
else None,
number_of_objects=self._fetch_metric(
bucket_name=bucket_name, metric=S3Metric.NUMBER_OF_OBJECTS
),
size=self._fetch_metric(
bucket_name=bucket_name, metric=S3Metric.BUCKET_SIZE_BYTES
),
file_formats=[container.FileFormat(metadata_entry.structureFormat)],
data_model=ContainerDataModel(
isPartitioned=metadata_entry.isPartitioned, columns=columns
@ -268,8 +330,7 @@ class S3Source(StorageServiceSource):
bucket_response: S3BucketResponse,
entries: List[MetadataEntry],
parent: Optional[EntityReference] = None,
) -> List[S3ContainerDetails]:
result: List[S3ContainerDetails] = []
) -> Iterable[S3ContainerDetails]:
for metadata_entry in entries:
logger.info(
f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} "
@ -283,9 +344,160 @@ class S3Source(StorageServiceSource):
parent=parent,
)
if structured_container:
result.append(structured_container)
yield structured_container
return result
def is_valid_unstructured_file(self, accepted_extensions: List, key: str) -> bool:
# Accept everything when a wildcard is configured, otherwise match the key's extension suffix
if WILD_CARD in accepted_extensions:
return True
for ext in accepted_extensions:
if key.endswith(ext):
return True
return False
def _yield_parents_of_unstructured_container(
self,
bucket_name: str,
list_of_parent: List[str],
parent: Optional[EntityReference] = None,
):
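"""
Yield a container for each intermediate "folder" of the object key,
reusing the per-bucket cache so every parent is created only once.
"""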
full_path = self._get_full_path(bucket_name)
sub_parent = parent
for i in range(len(list_of_parent) - 1):
container_fqn = fqn._build( # pylint: disable=protected-access
*(
self.context.get().objectstore_service,
bucket_name,
*list_of_parent[: i + 1],
)
)
if container_fqn in self._unstructured_container_cache:
parent_id, full_path = self._unstructured_container_cache[container_fqn]
sub_parent = EntityReference(id=parent_id, type="container")
continue
yield S3ContainerDetails(
name=list_of_parent[i],
prefix=full_path,
file_formats=[],
parent=sub_parent,
fullPath=full_path + KEY_SEPARATOR + list_of_parent[i],
sourceUrl=self._get_object_source_url(
bucket_name=bucket_name,
prefix=self._clean_path(
full_path + KEY_SEPARATOR + list_of_parent[i]
),
),
)
container_entity = self.metadata.get_by_name(
entity=Container, fqn=container_fqn
)
full_path += KEY_SEPARATOR + list_of_parent[i]
self._unstructured_container_cache[container_fqn] = (
container_entity.id.root,
full_path,
)
sub_parent = EntityReference(id=container_entity.id.root, type="container")
def _yield_nested_unstructured_containers(
self,
bucket_response: S3BucketResponse,
metadata_entry: MetadataEntry,
parent: Optional[EntityReference] = None,
):
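"""
List the objects under the entry's dataPath and, for each key matching the
accepted unstructured formats, yield its parent folder containers followed
by a leaf container for the file itself.
"""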
bucket_name = bucket_response.name
response = self.s3_client.list_objects_v2(
Bucket=bucket_name, Prefix=metadata_entry.dataPath
)
candidate_keys = [
entry["Key"]
for entry in response[S3_CLIENT_ROOT_RESPONSE]
if entry and entry.get("Key")
]
for key in candidate_keys:
if self.is_valid_unstructured_file(metadata_entry.unstructuredFormats, key):
logger.info(
f"Extracting metadata from path {key.strip(KEY_SEPARATOR)} "
f"and generating unstructured container"
)
list_of_parent = key.strip(KEY_SEPARATOR).split(KEY_SEPARATOR)
yield from self._yield_parents_of_unstructured_container(
bucket_name, list_of_parent, parent
)
parent_fqn = fqn._build( # pylint: disable=protected-access
*(
self.context.get().objectstore_service,
bucket_name,
*list_of_parent[:-1],
)
)
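# Parent folder containers were just yielded and cached by _yield_parents_of_unstructured_container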
parent_id, parent_path = self._unstructured_container_cache[parent_fqn]
container_fqn = fqn._build( # pylint: disable=protected-access
*(
self.context.get().objectstore_service,
bucket_name,
*list_of_parent,
)
)
size = self.get_size(bucket_name, key)
yield S3ContainerDetails(
name=list_of_parent[-1],
prefix=self._clean_path(parent_path + KEY_SEPARATOR),
file_formats=[],
size=size,
container_fqn=container_fqn,
leaf_container=True,
parent=EntityReference(id=parent_id, type="container"),
fullPath=self._get_full_path(bucket_name, key),
sourceUrl=self._get_object_source_url(
bucket_name=bucket_name,
prefix=self._clean_path(
parent_path + KEY_SEPARATOR + list_of_parent[-1]
),
),
)
def _generate_unstructured_containers(
self,
bucket_response: S3BucketResponse,
entries: List[MetadataEntry],
parent: Optional[EntityReference] = None,
) -> Iterable[S3ContainerDetails]:
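"""
Handle manifest entries without a structureFormat: entries with
unstructuredFormats are expanded into nested containers, otherwise a
single container is emitted for the plain dataPath.
"""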
bucket_name = bucket_response.name
for metadata_entry in entries:
if metadata_entry.structureFormat:
continue
if metadata_entry.unstructuredFormats:
yield from self._yield_nested_unstructured_containers(
bucket_response=bucket_response,
metadata_entry=metadata_entry,
parent=parent,
)
else:
logger.info(
f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} "
f"and generating unstructured container"
)
prefix = (
f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
)
yield S3ContainerDetails(
name=metadata_entry.dataPath.strip(KEY_SEPARATOR),
prefix=prefix,
file_formats=[],
data_model=None,
parent=parent,
size=self.get_size(
bucket_name=bucket_name,
file_path=metadata_entry.dataPath.strip(KEY_SEPARATOR),
),
fullPath=self._get_full_path(bucket_name, prefix),
sourceUrl=self._get_object_source_url(
bucket_name=bucket_name,
prefix=metadata_entry.dataPath.strip(KEY_SEPARATOR),
),
)
def fetch_buckets(self) -> List[S3BucketResponse]:
results: List[S3BucketResponse] = []

View File

@ -41,6 +41,19 @@ class S3BucketResponse(BaseModel):
)
class S3Tag(BaseModel):
Key: str
Value: str
class S3TagResponse(BaseModel):
"""
Class modelling a response received from the s3_client.get_object_tagging operation
"""
TagSet: List[S3Tag] = Field([], description="List of tags")
class S3ContainerDetails(BaseModel):
"""
Class mapping container details used to create the container requests
@ -50,6 +63,10 @@ class S3ContainerDetails(BaseModel):
extra="forbid",
)
leaf_container: bool = Field(False, description="Leaf container")
container_fqn: Optional[str] = Field(
None, description="Fully qualified name of the container"
)
name: str = Field(..., description="Bucket name")
prefix: str = Field(..., description="Prefix for the container")
number_of_objects: Optional[float] = Field(

View File

@ -41,6 +41,7 @@ from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import Source
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
@ -99,6 +100,13 @@ class StorageServiceTopology(ServiceTopology):
] = TopologyNode(
producer="get_containers",
stages=[
NodeStage(
type_=OMetaTagAndClassification,
context="tags",
processor="yield_tag_details",
nullable=True,
store_all_in_context=True,
),
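# Tag stage runs first and stores its results in context so the container
# stage can attach them later (see get_tag_by_fqn in the S3 source)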
NodeStage(
type_=Container,
context="container",
@ -106,7 +114,7 @@ class StorageServiceTopology(ServiceTopology):
consumer=["objectstore_service"],
nullable=True,
use_cache=True,
)
),
],
)
@ -188,6 +196,22 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC):
def prepare(self):
"""By default, nothing needs to be taken care of when loading the source"""
def yield_container_tags(
self, container_details: Any
) -> Iterable[Either[OMetaTagAndClassification]]:
"""
Hook to be overridden by each storage source to yield the tags of a given container
"""
def yield_tag_details(
self, container_details: Any
) -> Iterable[Either[OMetaTagAndClassification]]:
"""
From topology. To be run for each container
"""
if self.source_config.includeTags:
yield from self.yield_container_tags(container_details) or []
def register_record(self, container_request: CreateContainerRequest) -> None:
"""
Mark the container record as scanned and update

Binary files not shown: 5 new images added (210 KiB, 37 KiB, 24 KiB, 108 KiB, 22 KiB)

View File

@ -8,6 +8,10 @@
{
"dataPath": "solved.png"
},
{
"dataPath": "docs_images",
"unstructuredFormats": ["png", "webp"]
},
{
"dataPath": "transactions_separator",
"structureFormat": "csv",

View File

@ -34,7 +34,7 @@ def test_s3_ingestion(metadata, ingest_s3_storage, service_name):
entity=Container, fqn=f"{service_name}.test-bucket", fields=["*"]
)
# The bucket has children and no dataModel
assert 6 == len(bucket.children.root)
assert 7 == len(bucket.children.root)
assert not bucket.dataModel
# We can validate the children
@ -86,3 +86,57 @@ def test_s3_ingestion(metadata, ingest_s3_storage, service_name):
)
assert not png_file.dataModel
assert png_file.size > 1000
# validate unstructured parent containers
container1: Container = metadata.get_by_name(
entity=Container,
fqn=f"{service_name}.test-bucket.docs_images",
fields=["*"],
)
assert not container1.dataModel
container2: Container = metadata.get_by_name(
entity=Container,
fqn=f"{service_name}.test-bucket.docs_images.storage",
fields=["*"],
)
assert not container2.dataModel
container3: Container = metadata.get_by_name(
entity=Container,
fqn=f"{service_name}.test-bucket.docs_images.storage.s3",
fields=["*"],
)
assert not container3.dataModel
# validate images container
image1: Container = metadata.get_by_name(
entity=Container,
fqn=f'{service_name}.test-bucket.docs_images.storage.s3."add-new-service.png"',
fields=["*"],
)
assert not image1.dataModel
assert image1.size > 100
image1: Container = metadata.get_by_name(
entity=Container,
fqn=f'{service_name}.test-bucket.docs_images.storage."s3-demo.png"',
fields=["*"],
)
assert not image1.dataModel
assert image1.size > 100
image2: Container = metadata.get_by_name(
entity=Container,
fqn=f'{service_name}.test-bucket.docs_images.synapse."add-new-service.webp"',
fields=["*"],
)
assert not image2.dataModel
assert image2.size > 100
image3: Container = metadata.get_by_name(
entity=Container,
fqn=f'{service_name}.test-bucket.docs_images.domodatabase."scopes.jpeg"',
fields=["*"],
)
assert image3 is None

View File

@ -274,8 +274,6 @@ class StorageUnitTest(TestCase):
S3ContainerDetails(
name="transactions",
prefix="/transactions",
number_of_objects=100,
size=100,
file_formats=[FileFormat.csv],
data_model=ContainerDataModel(isPartitioned=False, columns=columns),
creation_date=datetime.datetime(2000, 1, 1).isoformat(),

View File

@ -57,6 +57,19 @@ Again, this information will be added on top of the inferred schema from the dat
{% /codeInfo %}
{% codeInfo srNumber=6 %}
**Unstructured Container**: OpenMetadata supports ingesting unstructured files such as images, PDFs, etc. We support fetching the file names, sizes, and tags associated with such files.
If you want to ingest a single unstructured file, specifying its full path in `dataPath` is enough.
If you want to ingest all unstructured files with specific extensions, for example `pdf` and `png`, provide the folder containing those files in `dataPath` and the list of extensions in the `unstructuredFormats` field.
If you want to ingest all unstructured files irrespective of their file type or extension, provide the folder containing those files in `dataPath` and `["*"]` in the `unstructuredFormats` field.
{% /codeInfo %}
{% /codeInfoContainer %}
{% codeBlock fileName="openmetadata.json" %}
@ -110,6 +123,19 @@ Again, this information will be added on top of the inferred schema from the dat
}
]
}
```
```json {% srNumber=6 %}
{
"dataPath": "path/to/solution.pdf",
},
{
"dataPath": "path/to/unstructured_folder_png_pdf",
"unstructuredFormats": ["png","pdf"]
},
{
"dataPath": "path/to/unstructured_folder_all",
"unstructuredFormats": ["*"]
}
]
}
```

View File

@ -7,8 +7,8 @@ slug: /connectors/storage/adls
name="ADLS"
stage="PROD"
platform="Collate"
availableFeatures=["Metadata"]
unavailableFeatures=[]
availableFeatures=["Metadata", "Structured Containers"]
unavailableFeatures=["Unstructured Containers"]
/ %}
This page contains the setup guide and reference information for the ADLS connector.

View File

@ -7,8 +7,8 @@ slug: /connectors/storage/gcs
name="GCS"
stage="PROD"
platform="Collate"
availableFeatures=["Metadata"]
unavailableFeatures=[]
availableFeatures=["Metadata", "Structured Containers"]
unavailableFeatures=["Unstructured Containers"]
/ %}
This page contains the setup guide and reference information for the GCS connector.

View File

@ -7,7 +7,7 @@ slug: /connectors/storage/s3
name="S3"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata"]
availableFeatures=["Metadata", "Structured Containers", "Unstructured Containers"]
unavailableFeatures=[]
/ %}

View File

@ -22,6 +22,15 @@
"type": "string",
"default": null
},
"unstructuredFormats": {
"title": "Unstructured format",
"description": "What the unstructured formats you want to ingest, eg. png, pdf, jpg.",
"type": "array",
"items": {
"type": "string"
},
"default": null
},
"separator": {
"title": "Separator",
"description": "For delimited files such as CSV, what is the separator being used?",

View File

@ -63,6 +63,12 @@
"description": "Set the 'Override Metadata' toggle to control whether to override the existing metadata in the OpenMetadata server with the metadata fetched from the source. If the toggle is set to true, the metadata fetched from the source will override the existing metadata in the OpenMetadata server. If the toggle is set to false, the metadata fetched from the source will not override the existing metadata in the OpenMetadata server. This is applicable for fields like description, tags, owner and displayName",
"type": "boolean",
"default": false
},
"includeTags": {
"description": "Optional configuration to toggle the tags ingestion.",
"type": "boolean",
"default": false,
"title": "Include Tags"
}
},
"additionalProperties": false