diff --git a/ingestion/examples/sample_data/objectcontainers/containers.json b/ingestion/examples/sample_data/objectcontainers/containers.json
new file mode 100644
index 00000000000..e323625dd32
--- /dev/null
+++ b/ingestion/examples/sample_data/objectcontainers/containers.json
@@ -0,0 +1,198 @@
+[
+  {
+    "name": "transactions",
+    "displayName": "Company Transactions",
+    "description": "Bucket containing all the company's transactions",
+    "parent": null,
+    "prefix": "/transactions/",
+    "dataModel": {
+      "isPartitioned": true,
+      "columns": [
+        {
+          "name": "transaction_id",
+          "dataType": "NUMERIC",
+          "dataTypeDisplay": "numeric",
+          "description": "The ID of the executed transaction. This column is the primary key for this table.",
+          "tags": [],
+          "constraint": "PRIMARY_KEY",
+          "ordinalPosition": 1
+        },
+        {
+          "name": "merchant",
+          "dataType": "VARCHAR",
+          "dataLength": 100,
+          "dataTypeDisplay": "varchar",
+          "description": "The merchant for this transaction.",
+          "tags": [],
+          "ordinalPosition": 2
+        },
+        {
+          "name": "transaction_time",
+          "dataType": "TIMESTAMP",
+          "dataTypeDisplay": "timestamp",
+          "description": "The time the transaction took place.",
+          "tags": [],
+          "ordinalPosition": 3
+        }
+      ]
+    },
+    "numberOfObjects": "50",
+    "size": "102400",
+    "fileFormats": [
+      "parquet"
+    ]
+  },
+  {
+    "name": "departments",
+    "displayName": "Company departments",
+    "description": "Bucket containing company department information",
+    "parent": null,
+    "prefix": "/departments/",
+    "dataModel": null,
+    "numberOfObjects": "2",
+    "size": "2048",
+    "fileFormats": [
+      "csv"
+    ]
+  },
+  {
+    "name": "engineering",
+    "displayName": "Engineering department",
+    "description": "Bucket containing engineering department information",
+    "parent": "s3_object_store_sample.departments",
+    "prefix": "/departments/engineering/",
+    "dataModel": null,
+    "numberOfObjects": "5",
+    "size": "14336",
+    "fileFormats": [
+      "zip"
+    ]
+  },
+  {
+    "name": "finance",
+    "displayName": "Finance department",
+    "description": "Bucket containing finance department information",
+    "parent": "s3_object_store_sample.departments",
+    "prefix": "/departments/finance/",
+    "dataModel": {
+      "isPartitioned": false,
+      "columns": [
+        {
+          "name": "department_id",
+          "dataType": "NUMERIC",
+          "dataTypeDisplay": "numeric",
+          "description": "The ID of the department. This column is the primary key for this table.",
+          "tags": [],
+          "constraint": "PRIMARY_KEY",
+          "ordinalPosition": 1
+        },
+        {
+          "name": "budget_total_value",
+          "dataType": "NUMERIC",
+          "dataTypeDisplay": "numeric",
+          "description": "The department's budget for the current year.",
+          "tags": [],
+          "ordinalPosition": 2
+        },
+        {
+          "name": "notes",
+          "dataType": "VARCHAR",
+          "dataLength": 100,
+          "dataTypeDisplay": "varchar",
+          "description": "Notes concerning sustainability for the budget.",
+          "tags": [],
+          "ordinalPosition": 3
+        },
+        {
+          "name": "budget_executor",
+          "dataType": "VARCHAR",
+          "dataTypeDisplay": "varchar",
+          "description": "The responsible finance lead for the budget execution",
+          "tags": [],
+          "ordinalPosition": 4
+        }
+      ]
+    },
+    "numberOfObjects": "75",
+    "size": "286720",
+    "fileFormats": [
+      "zip",
+      "csv"
+    ]
+  },
+  {
+    "name": "media",
+    "displayName": "Media department",
+    "description": "Bucket containing media department information",
+    "parent": "s3_object_store_sample.departments",
+    "prefix": "/departments/media/",
+    "dataModel": null,
+    "numberOfObjects": "123",
+    "size": "243712",
+    "fileFormats": null
+  },
+  {
+    "name": "movies",
+    "displayName": "Company official footage",
+    "description": "Bucket containing movies about the company",
+    "parent": "s3_object_store_sample.departments.media",
+    "prefix": "/departments/media/media2020/",
+    "dataModel": null,
+    "numberOfObjects": "500",
+    "size": "15360000",
+    "fileFormats": [
+      "gz"
+    ]
+  },
+  {
+    "name": "expenditures",
+    "displayName": "Expenditures for the current year",
+    "description": "Bucket containing finance expenditures information",
+    "parent": "s3_object_store_sample.departments.finance",
+    "prefix": "/departments/finance/expenditures-2023",
+    "dataModel": {
+      "isPartitioned": false,
+      "columns": [
+        {
+          "name": "department_id",
+          "dataType": "NUMERIC",
+          "dataTypeDisplay": "numeric",
+          "description": "The ID of the department. This column is the primary key for this table.",
+          "tags": [],
+          "constraint": "PRIMARY_KEY",
+          "ordinalPosition": 1
+        },
+        {
+          "name": "approved",
+          "dataType": "BOOLEAN",
+          "dataTypeDisplay": "boolean",
+          "description": "Whether this was already approved by upper management",
+          "tags": [],
+          "ordinalPosition": 2
+        },
+        {
+          "name": "fraudulent_claims",
+          "dataType": "BOOLEAN",
+          "dataTypeDisplay": "boolean",
+          "description": "Whether any claims were made for the expenditure at any point",
+          "tags": [],
+          "ordinalPosition": 3
+        },
+        {
+          "name": "total_value_for_current_month",
+          "dataType": "NUMERIC",
+          "dataTypeDisplay": "numeric",
+          "description": "The current total value spent for the expenditure as of beginning of the current month",
+          "tags": [],
+          "ordinalPosition": 4
+        }
+      ]
+    },
+    "numberOfObjects": "10",
+    "size": "65536",
+    "fileFormats": [
+      "zstd",
+      "tsv"
+    ]
+  }
+]
diff --git a/ingestion/examples/sample_data/objectcontainers/service.json b/ingestion/examples/sample_data/objectcontainers/service.json
new file mode 100644
index 00000000000..a9de0929cee
--- /dev/null
+++ b/ingestion/examples/sample_data/objectcontainers/service.json
@@ -0,0 +1,17 @@
+{
+  "type": "s3",
+  "serviceName": "s3_object_store_sample",
+  "serviceConnection": {
+    "config": {
+      "type": "S3",
+      "awsConfig": {
+        "awsAccessKeyId": "aws_access_key_id",
+        "awsSecretAccessKey": "aws_secret_access_key",
+        "awsRegion": "awsRegion",
+        "endPointURL": "https://endpoint.com/"
+      }
+    }
+  },
+  "sourceConfig": {
+  }
+}
\ No newline at end of file
diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py
index 3f1e8ba5b13..c276e5d2346 100644
--- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py
+++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py
@@ -36,6 +36,7 @@ from metadata.generated.schema.entity.classification.classification import (
 )
 from metadata.generated.schema.entity.classification.tag import Tag
 from metadata.generated.schema.entity.data.chart import Chart
+from metadata.generated.schema.entity.data.container import Container
 from metadata.generated.schema.entity.data.dashboard import Dashboard
 from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
@@ -60,6 +61,9 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel
 from metadata.generated.schema.entity.services.messagingService import MessagingService
 from metadata.generated.schema.entity.services.metadataService import MetadataService
 from metadata.generated.schema.entity.services.mlmodelService import MlModelService
+from metadata.generated.schema.entity.services.objectstoreService import (
+    ObjectStoreService,
+)
 from metadata.generated.schema.entity.services.pipelineService import PipelineService
 from metadata.generated.schema.entity.services.storageService import StorageService
 from metadata.generated.schema.entity.teams.role import Role
@@ -321,6 +325,11 @@ class OpenMetadata(
         if issubclass(entity, get_args(Union[User, self.get_create_entity_type(User)])):
             return "/users"
 
+        if issubclass(
+            entity, get_args(Union[Container, self.get_create_entity_type(Container)])
+        ):
+            return "/containers"
+
         # Services Schemas
         if issubclass(
             entity,
@@ -378,6 +387,16 @@ class OpenMetadata(
         ):
             return "/services/metadataServices"
 
+        if issubclass(
+            entity,
+            get_args(
+                Union[
+                    ObjectStoreService, self.get_create_entity_type(ObjectStoreService)
+                ]
+            ),
+        ):
+            return "/services/objectstoreServices"
+
         if issubclass(
             entity,
             IngestionPipeline,
diff --git a/ingestion/src/metadata/ingestion/source/database/sample_data.py b/ingestion/src/metadata/ingestion/source/database/sample_data.py
index 9c28742196e..b74a7a9f376 100644
--- a/ingestion/src/metadata/ingestion/source/database/sample_data.py
+++ b/ingestion/src/metadata/ingestion/source/database/sample_data.py
@@ -21,6 +21,7 @@ from typing import Any, Dict, Iterable, List, Union
 from pydantic import ValidationError
 
 from metadata.generated.schema.api.data.createChart import CreateChartRequest
+from metadata.generated.schema.api.data.createContainer import CreateContainerRequest
 from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
 from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
 from metadata.generated.schema.api.data.createDatabaseSchema import (
@@ -43,6 +44,7 @@ from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
 from metadata.generated.schema.api.teams.createUser import CreateUserRequest
 from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
 from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
+from metadata.generated.schema.entity.data.container import Container
 from metadata.generated.schema.entity.data.dashboard import Dashboard
 from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
@@ -71,6 +73,9 @@ from metadata.generated.schema.entity.services.dashboardService import Dashboard
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.entity.services.messagingService import MessagingService
 from metadata.generated.schema.entity.services.mlmodelService import MlModelService
+from metadata.generated.schema.entity.services.objectstoreService import (
+    ObjectStoreService,
+)
 from metadata.generated.schema.entity.services.pipelineService import PipelineService
 from metadata.generated.schema.entity.services.storageService import StorageService
 from metadata.generated.schema.entity.teams.team import Team
@@ -183,13 +188,14 @@ class SampleDataSourceStatus(SourceStatus):
 
 class SampleDataSource(
     Source[Entity]
-):  # pylint: disable=too-many-instance-attributes,too-many-public-methods
+):  # pylint: disable=too-many-instance-attributes,too-many-public-methods,too-many-lines
     """
     Loads JSON data and prepares the required
     python objects to be sent to the Sink.
     """
 
     def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
+        # pylint: disable=too-many-statements
         super().__init__()
         self.status = SampleDataSourceStatus()
         self.config = config
@@ -400,6 +406,20 @@ class SampleDataSource(
             entity=MlModelService,
             config=WorkflowSource(**self.model_service_json),
         )
+
+        self.object_service_json = json.load(
+            open(  # pylint: disable=consider-using-with
+                sample_data_folder + "/objectcontainers/service.json",
+                "r",
+                encoding="utf-8",
+            )
+        )
+
+        self.object_store_service = self.metadata.get_service_or_create(
+            entity=ObjectStoreService,
+            config=WorkflowSource(**self.object_service_json),
+        )
+
         self.models = json.load(
             open(  # pylint: disable=consider-using-with
                 sample_data_folder + "/models/models.json",
                 "r",
                 encoding="utf-8",
             )
         )
+
+        self.containers = json.load(
+            open(  # pylint: disable=consider-using-with
+                sample_data_folder + "/objectcontainers/containers.json",
+                "r",
+                encoding="utf-8",
+            )
+        )
+
         self.user_entity = {}
         self.table_tests = json.load(
             open(  # pylint: disable=consider-using-with
@@ -471,6 +500,7 @@ class SampleDataSource(
         yield from self.ingest_lineage()
         yield from self.ingest_pipeline_status()
         yield from self.ingest_mlmodels()
+        yield from self.ingest_containers()
         yield from self.ingest_profiles()
         yield from self.ingest_test_suite()
         yield from self.ingest_test_case()
@@ -843,6 +873,45 @@ class SampleDataSource(
                 logger.debug(traceback.format_exc())
                 logger.warning(f"Error ingesting MlModel [{model}]: {exc}")
 
+    def ingest_containers(self) -> Iterable[CreateContainerRequest]:
+        """
+        Convert sample containers data into a Container Entity
+        to feed the metastore
+        """
+
+        for container in self.containers:
+            try:
+                # Fetch the parent container from its fully qualified name, if any
+                parent_container_fqn = container.get("parent")
+                parent_container = None
+                if parent_container_fqn:
+                    parent_container = self.metadata.get_by_name(
+                        entity=Container, fqn=parent_container_fqn
+                    )
+                    if not parent_container:
+                        raise InvalidSampleDataException(
+                            f"Cannot find {parent_container_fqn} in Sample Containers"
+                        )
+
+                container_request = CreateContainerRequest(
+                    name=container["name"],
+                    displayName=container["displayName"],
+                    description=container["description"],
+                    parent=EntityReference(id=parent_container.id, type="container")
+                    if parent_container_fqn
+                    else None,
+                    prefix=container["prefix"],
+                    dataModel=container.get("dataModel"),
+                    numberOfObjects=container.get("numberOfObjects"),
+                    size=container.get("size"),
+                    fileFormats=container.get("fileFormats"),
+                    service=self.object_store_service.fullyQualifiedName,
+                )
+                yield container_request
+            except Exception as exc:
+                logger.debug(traceback.format_exc())
+                logger.warning(f"Error ingesting Container [{container}]: {exc}")
+
     def ingest_users(self) -> Iterable[OMetaUserProfile]:
         """
         Ingest Sample User data