Add ingestion for S3 buckets

This commit is contained in:
Matt 2021-11-28 01:40:19 -08:00 committed by Alberto Miorin
parent 5c9c8612dd
commit ededd58e25
6 changed files with 134 additions and 6 deletions

View File

@ -1,7 +1,7 @@
{ {
"$id": "https://open-metadata.org/schema/entity/data/location.json", "$id": "https://open-metadata.org/schema/entity/data/location.json",
"$schema": "http://json-schema.org/draft-07/schema#", "$schema": "http://json-schema.org/draft-07/schema#",
"title": "Table", "title": "Location",
"description": "This schema defines the Location entity. A Location can contain the data of a table or group other sublocation together.", "description": "This schema defines the Location entity. A Location can contain the data of a table or group other sublocation together.",
"type": "object", "type": "object",
"javaType": "org.openmetadata.catalog.entity.data.Location", "javaType": "org.openmetadata.catalog.entity.data.Location",

View File

@ -17,7 +17,7 @@
2. Extract the distribution tar.gz file and run the following command 2. Extract the distribution tar.gz file and run the following command
``` ```
cd open-metadata-<version>/bootstrap cd openmetadata-<version>/bootstrap
sh bootstrap_storage.sh drop-create sh bootstrap_storage.sh drop-create
``` ```
* Bootstrap ES with indexes and load sample data into MySQL * Bootstrap ES with indexes and load sample data into MySQL
@ -26,8 +26,8 @@
2. Once the logs indicate that the instance is up, run the following commands from the top-level directory 2. Once the logs indicate that the instance is up, run the following commands from the top-level directory
``` ```
python3 -m venv /tmp/venv python3 -m venv venv
source /tmp/venv/bin/activate source venv/bin/activate
pip install -r ingestion/requirements.txt pip install -r ingestion/requirements.txt
make install_dev generate make install_dev generate
cd ingestion cd ingestion
@ -54,7 +54,7 @@ You can create a _distribution_ as follows.
$ mvn clean install $ mvn clean install
# Create the binary distribution. # Create the binary distribution.
$ cd dist && mvn package $ cd openmetadata-dist && mvn package
``` ```
The binaries will be created at: The binaries will be created at:

View File

@ -1,2 +1,4 @@
boto3==1.20.14
botocore==1.23.14
datamodel-code-generator==0.11.14 datamodel-code-generator==0.11.14
twine twine

View File

@ -8,6 +8,7 @@ from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.dbtmodel import DbtModel from metadata.generated.schema.entity.data.dbtmodel import DbtModel
from metadata.generated.schema.entity.data.location import Location
from metadata.generated.schema.entity.data.metrics import Metrics from metadata.generated.schema.entity.data.metrics import Metrics
from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.pipeline import Pipeline
@ -18,6 +19,7 @@ from metadata.generated.schema.entity.services.dashboardService import Dashboard
from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.pipelineService import PipelineService from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.services.storageService import StorageService
from metadata.generated.schema.entity.tags.tagCategory import Tag from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.entity.teams.user import User
@ -141,6 +143,11 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]):
): ):
return "/pipelines" return "/pipelines"
if issubclass(
entity, get_args(Union[Location, self.get_create_entity_type(Location)])
):
return "/locations"
if issubclass( if issubclass(
entity, get_args(Union[Table, self.get_create_entity_type(Table)]) entity, get_args(Union[Table, self.get_create_entity_type(Table)])
): ):
@ -207,6 +214,14 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]):
): ):
return "/services/pipelineServices" return "/services/pipelineServices"
if issubclass(
entity,
get_args(
Union[StorageService, self.get_create_entity_type(StorageService)]
),
):
return "/services/storageServices"
raise MissingEntityTypeException( raise MissingEntityTypeException(
f"Missing {entity} type when generating suffixes" f"Missing {entity} type when generating suffixes"
) )

View File

@ -15,7 +15,6 @@
import logging import logging
import traceback import traceback
from typing import List
from pydantic import ValidationError from pydantic import ValidationError
@ -30,6 +29,9 @@ from metadata.generated.schema.api.data.createDatabase import (
from metadata.generated.schema.api.data.createDbtModel import ( from metadata.generated.schema.api.data.createDbtModel import (
CreateDbtModelEntityRequest, CreateDbtModelEntityRequest,
) )
from metadata.generated.schema.api.data.createLocation import (
CreateLocationEntityRequest,
)
from metadata.generated.schema.api.data.createMlModel import CreateMlModelEntityRequest from metadata.generated.schema.api.data.createMlModel import CreateMlModelEntityRequest
from metadata.generated.schema.api.data.createPipeline import ( from metadata.generated.schema.api.data.createPipeline import (
CreatePipelineEntityRequest, CreatePipelineEntityRequest,
@ -40,6 +42,7 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.api.teams.createTeam import CreateTeamEntityRequest from metadata.generated.schema.api.teams.createTeam import CreateTeamEntityRequest
from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest
from metadata.generated.schema.entity.data.chart import ChartType from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.location import Location
from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
@ -114,6 +117,8 @@ class MetadataRestSink(Sink):
self.write_charts(record) self.write_charts(record)
elif isinstance(record, Dashboard): elif isinstance(record, Dashboard):
self.write_dashboards(record) self.write_dashboards(record)
elif isinstance(record, Location):
self.write_locations(record)
elif isinstance(record, Pipeline): elif isinstance(record, Pipeline):
self.write_pipelines(record) self.write_pipelines(record)
elif isinstance(record, AddLineage): elif isinstance(record, AddLineage):
@ -294,6 +299,24 @@ class MetadataRestSink(Sink):
chart_references.append(self.charts_dict[chart_id]) chart_references.append(self.charts_dict[chart_id])
return chart_references return chart_references
def write_locations(self, location: Location):
    """Push a single Location entity to the metadata REST API.

    Builds a CreateLocationEntityRequest from *location*, records the
    outcome in ``self.status``, and logs a failure instead of raising on
    API or validation errors (the sink is best-effort per record).
    """
    try:
        request = CreateLocationEntityRequest(
            name=location.name,
            description=location.description,
            locationType=location.locationType,
            tags=location.tags,
            owner=location.owner,
            service=location.service,
        )
        created = self.metadata.create_or_update(request)
        logger.info(f"Successfully ingested Location {created.name}")
        self.status.records_written(f"Location: {created.name}")
    except (APIError, ValidationError) as err:
        # Keep processing other records; surface the failure via status.
        logger.error(f"Failed to ingest Location {location.name}")
        logger.error(err)
        self.status.failure(f"Location: {location.name}")
def write_pipelines(self, pipeline: Pipeline): def write_pipelines(self, pipeline: Pipeline):
try: try:
pipeline_request = CreatePipelineEntityRequest( pipeline_request = CreatePipelineEntityRequest(

View File

@ -0,0 +1,88 @@
import logging
import os
import uuid
from typing import Iterable
import boto3
from metadata.generated.schema.api.services.createStorageService import (
CreateStorageServiceEntityRequest,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.storage import StorageServiceType
from metadata.ingestion.api.common import ConfigModel, Record, WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.generated.schema.entity.data.location import Location, LocationType
from metadata.generated.schema.entity.services.storageService import StorageService
# Module-level logger named after this module (standard library convention).
logger: logging.Logger = logging.getLogger(__name__)
class S3SourceConfig(ConfigModel):
    """Configuration for the S3 ingestion source."""

    # Name of the StorageService entity the discovered buckets belong to.
    # NOTE(review): defaults to "" — presumably the workflow config always
    # supplies a name; confirm an empty service name is rejected upstream.
    service_name: str = ""
    # AWS credentials; S3Source.__init__ exports these into the process
    # environment so boto3 can pick them up.
    aws_access_key_id: str
    aws_secret_access_key: str
class S3Source(Source):
    """Ingestion source that lists the account's S3 buckets and emits one
    Location record per bucket, attached to a StorageService entity."""

    config: S3SourceConfig
    status: SourceStatus

    def __init__(
        self, config: S3SourceConfig, metadata_config: MetadataServerConfig, ctx
    ):
        super().__init__(ctx)
        self.config = config
        self.metadata_config = metadata_config
        # Export the credentials so boto3 resolves them from the environment.
        os.environ["AWS_ACCESS_KEY_ID"] = self.config.aws_access_key_id
        os.environ["AWS_SECRET_ACCESS_KEY"] = self.config.aws_secret_access_key
        self.status = SourceStatus()
        self.service = get_storage_service_or_create(
            config.service_name, metadata_config
        )
        self.s3 = boto3.resource("s3")

    @classmethod
    def create(
        cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext
    ):
        """Factory used by the workflow runner: parse raw dicts into configs."""
        return cls(
            S3SourceConfig.parse_obj(config_dict),
            MetadataServerConfig.parse_obj(metadata_config_dict),
            ctx,
        )

    def prepare(self):
        """Nothing to prepare for a plain bucket listing."""

    def next_record(self) -> Iterable[Record]:
        """Yield a Location for every bucket visible to the credentials."""
        try:
            for bucket in self.s3.buckets.all():
                self.status.scanned(bucket.name)
                yield Location(
                    id=uuid.uuid4(),
                    name=bucket.name,
                    displayName=bucket.name,
                    locationType=LocationType.Bucket,
                    service=EntityReference(
                        id=self.service.id, type="storageService"
                    ),
                )
        except Exception as exc:
            # Broad catch: any boto3/API failure ends the scan and is
            # reported through the status object rather than raised.
            self.status.failure("error", str(exc))

    def get_status(self) -> SourceStatus:
        return self.status
def get_storage_service_or_create(
    service_name: str, metadata_config: MetadataServerConfig
) -> StorageService:
    """Return the StorageService named *service_name*, creating an S3-typed
    service via the metadata API when no such entity exists yet."""
    client = OpenMetadata(metadata_config)
    existing = client.get_by_name(entity=StorageService, fqdn=service_name)
    if existing is None:
        return client.create_or_update(
            CreateStorageServiceEntityRequest(
                name=service_name,
                serviceType=StorageServiceType.S3,
            )
        )
    return existing