diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/location.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/location.json index 10f5a15c94b..f336b185a48 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/location.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/location.json @@ -1,7 +1,7 @@ { "$id": "https://open-metadata.org/schema/entity/data/location.json", "$schema": "http://json-schema.org/draft-07/schema#", - "title": "Table", + "title": "Location", "description": "This schema defines the Location entity. A Location can contain the data of a table or group other sublocation together.", "type": "object", "javaType": "org.openmetadata.catalog.entity.data.Location", diff --git a/docs/open-source-community/developer/build-code-run-tests.md b/docs/open-source-community/developer/build-code-run-tests.md index bbedf0c329c..52a05242dbb 100644 --- a/docs/open-source-community/developer/build-code-run-tests.md +++ b/docs/open-source-community/developer/build-code-run-tests.md @@ -17,7 +17,7 @@ 2. Extract the distribution tar.gz file and run the following command ``` - cd open-metadata-/bootstrap + cd openmetadata-/bootstrap sh bootstrap_storage.sh drop-create ``` * Bootstrap ES with indexes and load sample data into MySQL @@ -26,8 +26,8 @@ 2. Once the logs indicate that the instance is up, run the following commands from the top-level directory ``` - python3 -m venv /tmp/venv - source /tmp/venv/bin/activate + python3 -m venv venv + source venv/bin/activate pip install -r ingestion/requirements.txt make install_dev generate cd ingestion @@ -54,7 +54,7 @@ You can create a _distribution_ as follows. $ mvn clean install # Create the binary distribution. -$ cd dist && mvn package +$ cd openmetadata-dist && mvn package ``` The binaries will be created at: diff --git a/ingestion/requirements-dev.txt b/ingestion/requirements-dev.txt index 73ac1326509..72b20747ed6 100644 --- a/ingestion/requirements-dev.txt +++ b/ingestion/requirements-dev.txt @@ -1,2 +1,4 @@ +boto3==1.20.14 +botocore==1.23.14 datamodel-code-generator==0.11.14 twine \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index 449f9872811..b8221783cbd 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -8,6 +8,7 @@ from metadata.generated.schema.entity.data.chart import Chart from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.dbtmodel import DbtModel +from metadata.generated.schema.entity.data.location import Location from metadata.generated.schema.entity.data.metrics import Metrics from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.pipeline import Pipeline @@ -18,6 +19,7 @@ from metadata.generated.schema.entity.services.dashboardService import Dashboard from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.messagingService import MessagingService from metadata.generated.schema.entity.services.pipelineService import PipelineService +from metadata.generated.schema.entity.services.storageService import StorageService from metadata.generated.schema.entity.tags.tagCategory import Tag from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.user import User @@ -141,6 +143,11 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]): ): return "/pipelines" + if issubclass( + entity, get_args(Union[Location, self.get_create_entity_type(Location)]) + ): + return "/locations" + if issubclass( entity, get_args(Union[Table, self.get_create_entity_type(Table)]) ): @@ -207,6 +214,14 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]): ): return "/services/pipelineServices" + if issubclass( + entity, + get_args( + Union[StorageService, self.get_create_entity_type(StorageService)] + ), + ): + return "/services/storageServices" + raise MissingEntityTypeException( f"Missing {entity} type when generating suffixes" ) diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 4110640bcc6..0f6a40677c2 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -15,7 +15,6 @@ import logging import traceback -from typing import List from pydantic import ValidationError @@ -30,6 +29,9 @@ from metadata.generated.schema.api.data.createDatabase import ( from metadata.generated.schema.api.data.createDbtModel import ( CreateDbtModelEntityRequest, ) +from metadata.generated.schema.api.data.createLocation import ( + CreateLocationEntityRequest, +) from metadata.generated.schema.api.data.createMlModel import CreateMlModelEntityRequest from metadata.generated.schema.api.data.createPipeline import ( CreatePipelineEntityRequest, @@ -40,6 +42,7 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineage from metadata.generated.schema.api.teams.createTeam import CreateTeamEntityRequest from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest from metadata.generated.schema.entity.data.chart import ChartType +from metadata.generated.schema.entity.data.location import Location from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.type.entityReference import EntityReference @@ -114,6 +117,8 @@ class MetadataRestSink(Sink): self.write_charts(record) elif isinstance(record, Dashboard): self.write_dashboards(record) + elif isinstance(record, Location): + self.write_locations(record) elif isinstance(record, Pipeline): self.write_pipelines(record) elif isinstance(record, AddLineage): @@ -294,6 +299,24 @@ class MetadataRestSink(Sink): chart_references.append(self.charts_dict[chart_id]) return chart_references + def write_locations(self, location: Location): + try: + location_request = CreateLocationEntityRequest( + name=location.name, + description=location.description, + locationType=location.locationType, + tags=location.tags, + owner=location.owner, + service=location.service, + ) + created_location = self.metadata.create_or_update(location_request) + logger.info(f"Successfully ingested Location {created_location.name}") + self.status.records_written(f"Location: {created_location.name}") + except (APIError, ValidationError) as err: + logger.error(f"Failed to ingest Location {location.name}") + logger.error(err) + self.status.failure(f"Location: {location.name}") + def write_pipelines(self, pipeline: Pipeline): try: pipeline_request = CreatePipelineEntityRequest( diff --git a/ingestion/src/metadata/ingestion/source/s3.py b/ingestion/src/metadata/ingestion/source/s3.py new file mode 100644 index 00000000000..a4046c7d046 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/s3.py @@ -0,0 +1,88 @@ +import logging +import os +import uuid +from typing import Iterable + +import boto3 + +from metadata.generated.schema.api.services.createStorageService import ( + CreateStorageServiceEntityRequest, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.storage import StorageServiceType +from metadata.ingestion.api.common import ConfigModel, Record, WorkflowContext +from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig +from metadata.generated.schema.entity.data.location import Location, LocationType +from metadata.generated.schema.entity.services.storageService import StorageService + +logger: logging.Logger = logging.getLogger(__name__) + + +class S3SourceConfig(ConfigModel): + service_name: str = "" + aws_access_key_id: str + aws_secret_access_key: str + + +class S3Source(Source): + config: S3SourceConfig + status: SourceStatus + + def __init__( + self, config: S3SourceConfig, metadata_config: MetadataServerConfig, ctx + ): + super().__init__(ctx) + self.config = config + self.metadata_config = metadata_config + os.environ["AWS_ACCESS_KEY_ID"] = self.config.aws_access_key_id + os.environ["AWS_SECRET_ACCESS_KEY"] = self.config.aws_secret_access_key + self.status = SourceStatus() + self.service = get_storage_service_or_create( + config.service_name, metadata_config + ) + self.s3 = boto3.resource("s3") + + @classmethod + def create( + cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext + ): + config = S3SourceConfig.parse_obj(config_dict) + metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) + return cls(config, metadata_config, ctx) + + def prepare(self): + pass + + def next_record(self) -> Iterable[Record]: + try: + for bucket in self.s3.buckets.all(): + self.status.scanned(bucket.name) + yield Location( + id=uuid.uuid4(), + name=bucket.name, + displayName=bucket.name, + locationType=LocationType.Bucket, + service=EntityReference(id=self.service.id, type="storageService"), + ) + except Exception as e: + self.status.failure("error", str(e)) + + def get_status(self) -> SourceStatus: + return self.status + + +def get_storage_service_or_create( + service_name: str, metadata_config: MetadataServerConfig +) -> StorageService: + metadata = OpenMetadata(metadata_config) + service = metadata.get_by_name(entity=StorageService, fqdn=service_name) + if service is not None: + return service + return metadata.create_or_update( + CreateStorageServiceEntityRequest( + name=service_name, + serviceType=StorageServiceType.S3, + ) + )