diff --git a/Makefile b/Makefile index c7c5b467dac..c303e09389a 100644 --- a/Makefile +++ b/Makefile @@ -40,7 +40,7 @@ generate: run_ometa_integration_tests: cd ingestion; \ - pytest -c setup.cfg --override-ini=testpaths=tests/integration/ometa + pytest -c setup.cfg --override-ini=testpaths="tests/integration/ometa tests/unit/stage_test.py" publish: make install_dev generate diff --git a/ingestion/examples/sample_data/glue/database.json b/ingestion/examples/sample_data/glue/database.json new file mode 100644 index 00000000000..afec9763be3 --- /dev/null +++ b/ingestion/examples/sample_data/glue/database.json @@ -0,0 +1,9 @@ +{ + "id": null, + "name": "default", + "description": "This **mock** database contains tables related to the Glue service", + "service": { + "id": "b946d870-03b2-4d33-a075-13665a7a76b9", + "type": "GLUE" + } +} diff --git a/ingestion/examples/sample_data/glue/database_service.json b/ingestion/examples/sample_data/glue/database_service.json new file mode 100644 index 00000000000..a426501f84b --- /dev/null +++ b/ingestion/examples/sample_data/glue/database_service.json @@ -0,0 +1,11 @@ +{ + "id": "89229e26-7f74-4443-a568-85512eaeaa07", + "name": "glue", + "serviceType": "Glue", + "description": "Glue service used for table with location", + "href": "null", + "jdbc": { + "driverClass" : "jdbc", + "connectionUrl" : "jdbc://localhost" + } +} \ No newline at end of file diff --git a/ingestion/examples/sample_data/glue/storage_service.json b/ingestion/examples/sample_data/glue/storage_service.json new file mode 100644 index 00000000000..192ec91f1a0 --- /dev/null +++ b/ingestion/examples/sample_data/glue/storage_service.json @@ -0,0 +1,6 @@ +{ + "name": "glue_s3", + "serviceType": "S3", + "description": "S3 Object storage used for the Glue Catalog", + "href": "null" +} diff --git a/ingestion/examples/sample_data/glue/tables.json b/ingestion/examples/sample_data/glue/tables.json new file mode 100644 index 00000000000..de0cad527ba --- /dev/null +++ b/ingestion/examples/sample_data/glue/tables.json @@ -0,0 +1,25 @@ +{ + "tables": [ + { + "name": "sales", + "description": "Sales data", + "tableType": "Regular", + "columns": [ + { + "name": "SKU", + "dataType": "NUMERIC", + "dataTypeDisplay": "numeric", + "description": "Article SKU", + "ordinalPosition": 1 + }, + { + "name": "Quantity", + "dataType": "NUMERIC", + "dataTypeDisplay": "numeric", + "description": "Quantity of SKU", + "ordinalPosition": 2 + } + ] + } + ] +} \ No newline at end of file diff --git a/ingestion/examples/sample_data/locations/locations.json b/ingestion/examples/sample_data/locations/locations.json new file mode 100644 index 00000000000..9fd10ee8aa7 --- /dev/null +++ b/ingestion/examples/sample_data/locations/locations.json @@ -0,0 +1,22 @@ +{ + "locations": [ + { + "name": "s3://bucket-a", + "displayName": "s3://bucket-a", + "description": "Bucket A", + "locationType": "Bucket" + }, + { + "name": "s3://bucket-b", + "displayName": "s3://bucket-b", + "description": "Bucket B", + "locationType": "Bucket" + }, + { + "name": "s3://bucket-a/user/hive/dwh", + "displayName": "s3://bucket-a/user/hive/dwh", + "description": "Bucket A prefix", + "locationType": "Prefix" + } + ] +} \ No newline at end of file diff --git a/ingestion/examples/sample_data/locations/service.json b/ingestion/examples/sample_data/locations/service.json new file mode 100644 index 00000000000..f04902923da --- /dev/null +++ b/ingestion/examples/sample_data/locations/service.json @@ -0,0 +1,6 @@ +{ + "name": "sample_s3", + "serviceType": "S3", + "description": "S3 Object storage", + "href": "null" +} diff --git a/ingestion/requirements-test.txt b/ingestion/requirements-test.txt index e0da66a6ea4..0bfae50dce3 100644 --- a/ingestion/requirements-test.txt +++ b/ingestion/requirements-test.txt @@ -2,3 +2,4 @@ black isort pre-commit pylint +faker \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/models/ometa_table_db.py b/ingestion/src/metadata/ingestion/models/ometa_table_db.py index e0d2594a260..bcc688d0d1b 100644 --- a/ingestion/src/metadata/ingestion/models/ometa_table_db.py +++ b/ingestion/src/metadata/ingestion/models/ometa_table_db.py @@ -8,17 +8,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import Optional from pydantic import BaseModel from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.dbtmodel import DbtModel +from metadata.generated.schema.entity.data.location import Location from metadata.generated.schema.entity.data.table import Table class OMetaDatabaseAndTable(BaseModel): - table: Table database: Database + table: Table + location: Optional[Location] class OMetaDatabaseAndModel(BaseModel): diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/tableMixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/tableMixin.py index cb04316bfee..ce81cf925d3 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/tableMixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/tableMixin.py @@ -6,6 +6,7 @@ To be used by OpenMetadata class import logging from typing import List +from metadata.generated.schema.entity.data.location import Location from metadata.generated.schema.entity.data.table import ( Table, TableData, @@ -27,6 +28,18 @@ class OMetaTableMixin: client: REST + def add_location(self, table: Table, location: Location) -> None: + """ + PUT location for a table + + :param table: Table Entity to update + :param location: Location Entity to add + """ + resp = self.client.put( + f"{self.get_suffix(Table)}/{table.id.__root__}/location", + data=str(location.id.__root__), + ) + def ingest_table_sample_data( self, table: Table, sample_data: TableData ) -> TableData: diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 8d67da300b2..31661d1c077 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -130,65 +130,76 @@ class MetadataRestSink(Sink): f"Ignoring the record due to unknown Record type {type(record)}" ) - def write_tables(self, table_and_db: OMetaDatabaseAndTable): + def write_tables(self, db_and_table: OMetaDatabaseAndTable): try: db_request = CreateDatabaseEntityRequest( - name=table_and_db.database.name, - description=table_and_db.database.description, + name=db_and_table.database.name, + description=db_and_table.database.description, service=EntityReference( - id=table_and_db.database.service.id, type="databaseService" + id=db_and_table.database.service.id, + type="databaseService", ), ) db = self.metadata.create_or_update(db_request) table_request = CreateTableEntityRequest( - name=table_and_db.table.name, - tableType=table_and_db.table.tableType, - columns=table_and_db.table.columns, - description=table_and_db.table.description, + name=db_and_table.table.name, + tableType=db_and_table.table.tableType, + columns=db_and_table.table.columns, + description=db_and_table.table.description, database=db.id, ) - if ( - table_and_db.table.viewDefinition is not None - and table_and_db.table.viewDefinition != "" - ): + if db_and_table.table.viewDefinition: table_request.viewDefinition = ( - table_and_db.table.viewDefinition.__root__ + db_and_table.table.viewDefinition.__root__ ) created_table = self.metadata.create_or_update(table_request) - if table_and_db.table.sampleData is not None: - self.metadata.ingest_table_sample_data( - table=created_table, sample_data=table_and_db.table.sampleData + if db_and_table.location is not None: + location_request = CreateLocationEntityRequest( + name=db_and_table.location.name, + description=db_and_table.location.description, + service=EntityReference( + id=db_and_table.location.service.id, + type="storageService", + ), ) - if table_and_db.table.tableProfile is not None: - for tp in table_and_db.table.tableProfile: + location = self.metadata.create_or_update(location_request) + self.metadata.add_location(table=created_table, location=location) + if db_and_table.table.sampleData is not None: + self.metadata.ingest_table_sample_data( + table=created_table, + sample_data=db_and_table.table.sampleData, + ) + if db_and_table.table.tableProfile is not None: + for tp in db_and_table.table.tableProfile: for pd in tp: if pd[0] == "columnProfile": for col in pd[1]: col.name = col.name.replace(".", "_DOT_") self.metadata.ingest_table_profile_data( table=created_table, - table_profile=table_and_db.table.tableProfile, + table_profile=db_and_table.table.tableProfile, ) logger.info( "Successfully ingested table {}.{}".format( - table_and_db.database.name.__root__, created_table.name.__root__ + db_and_table.database.name.__root__, + created_table.name.__root__, ) ) self.status.records_written( - f"Table: {table_and_db.database.name.__root__}.{created_table.name.__root__}" + f"Table: {db_and_table.database.name.__root__}.{created_table.name.__root__}" ) except (APIError, ValidationError) as err: logger.error( "Failed to ingest table {} in database {} ".format( - table_and_db.table.name.__root__, - table_and_db.database.name.__root__, + db_and_table.table.name.__root__, + db_and_table.database.name.__root__, ) ) logger.error(err) - self.status.failure(f"Table: {table_and_db.table.name.__root__}") + self.status.failure(f"Table: {db_and_table.table.name.__root__}") def write_dbt_models(self, model_and_db: OMetaDatabaseAndModel): try: diff --git a/ingestion/src/metadata/ingestion/source/sample_data.py b/ingestion/src/metadata/ingestion/source/sample_data.py index 4c4a23f82a4..ab9b02ee789 100644 --- a/ingestion/src/metadata/ingestion/source/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/sample_data.py @@ -13,7 +13,6 @@ import csv import json import logging import os -import random import uuid from collections import namedtuple from dataclasses import dataclass, field @@ -25,8 +24,8 @@ from pydantic import ValidationError from metadata.config.common import ConfigModel from metadata.generated.schema.api.data.createTopic import CreateTopicEntityRequest from metadata.generated.schema.api.lineage.addLineage import AddLineage -from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.location import Location, LocationType from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.table import Table @@ -36,7 +35,9 @@ from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Record from metadata.ingestion.api.source import Source, SourceStatus -from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable +from metadata.ingestion.models.ometa_table_db import ( + OMetaDatabaseAndTable, +) from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig @@ -45,6 +46,8 @@ from metadata.utils.helpers import ( get_database_service_or_create, get_messaging_service_or_create, get_pipeline_service_or_create, + get_storage_service_or_create, + get_database_service_or_create_v2, ) logger: logging.Logger = logging.getLogger(__name__) @@ -221,6 +224,36 @@ class SampleDataSource(Source): self.config = config self.metadata_config = metadata_config self.metadata = OpenMetadata(metadata_config) + self.storage_service_json = json.load( + open(self.config.sample_data_folder + "/locations/service.json", "r") + ) + self.locations = json.load( + open(self.config.sample_data_folder + "/locations/locations.json", "r") + ) + self.storage_service = get_storage_service_or_create( + self.storage_service_json, + metadata_config, + ) + self.glue_storage_service_json = json.load( + open(self.config.sample_data_folder + "/glue/storage_service.json", "r") + ) + self.glue_database_service_json = json.load( + open(self.config.sample_data_folder + "/glue/database_service.json", "r") + ) + self.glue_database = json.load( + open(self.config.sample_data_folder + "/glue/database.json", "r") + ) + self.glue_tables = json.load( + open(self.config.sample_data_folder + "/glue/tables.json", "r") + ) + self.glue_database_service = get_database_service_or_create_v2( + self.glue_database_service_json, + metadata_config, + ) + self.glue_storage_service = get_storage_service_or_create( + self.glue_storage_service_json, + metadata_config, + ) self.database_service_json = json.load( open(self.config.sample_data_folder + "/datasets/service.json", "r") ) @@ -293,6 +326,8 @@ class SampleDataSource(Source): pass def next_record(self) -> Iterable[Record]: + yield from self.ingest_locations() + yield from self.ingest_glue() yield from self.ingest_tables() yield from self.ingest_topics() yield from self.ingest_charts() @@ -302,6 +337,52 @@ class SampleDataSource(Source): yield from self.ingest_users() yield from self.ingest_mlmodels() + def ingest_locations(self) -> Iterable[Location]: + for location in self.locations["locations"]: + location_ev = Location( + id=uuid.uuid4(), + name=location["name"], + displayName=location["displayName"], + description=location["description"], + locationType=location["locationType"], + service=EntityReference( + id=self.storage_service.id, type="storageService" + ), + ) + yield location_ev + + def ingest_glue(self) -> Iterable[OMetaDatabaseAndTable]: + db = Database( + id=uuid.uuid4(), + name=self.glue_database["name"], + description=self.glue_database["description"], + service=EntityReference( + id=self.glue_database_service.id, + type=self.glue_database_service.serviceType.value, + ), + ) + for table in self.glue_tables["tables"]: + if not table.get("sampleData"): + table["sampleData"] = GenerateFakeSampleData.check_columns( + table["columns"] + ) + table["id"] = uuid.uuid4() + table_metadata = Table(**table) + location_metadata = Location( + id=uuid.uuid4(), + name="s3://glue_bucket/dwh/schema/" + table["name"], + description=table["description"], + locationType=LocationType.Table, + service=EntityReference( + id=self.glue_storage_service.id, type="storageService" + ), + ) + db_table_location = OMetaDatabaseAndTable( + database=db, table=table_metadata, location=location_metadata + ) + self.status.scanned("table", table_metadata.name.__root__) + yield db_table_location + def ingest_tables(self) -> Iterable[OMetaDatabaseAndTable]: db = Database( id=uuid.uuid4(), diff --git a/ingestion/src/metadata/ingestion/stage/file.py b/ingestion/src/metadata/ingestion/stage/file.py index 619a779a987..d8dbae1bff9 100644 --- a/ingestion/src/metadata/ingestion/stage/file.py +++ b/ingestion/src/metadata/ingestion/stage/file.py @@ -14,7 +14,6 @@ import logging import pathlib from metadata.config.common import ConfigModel -from metadata.generated.schema.entity.data.table import Table from metadata.ingestion.api.common import WorkflowContext from metadata.ingestion.api.stage import Stage, StageStatus from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig @@ -52,7 +51,7 @@ class FileStage(Stage): metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) return cls(ctx, config, metadata_config) - def stage_record(self, record: Table) -> None: + def stage_record(self, record) -> None: json_record = json.loads(record.json()) self.file.write(json.dumps(json_record)) self.file.write("\n") diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index f0bf90a7561..afeb49a370f 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -24,10 +24,14 @@ from metadata.generated.schema.api.services.createMessagingService import ( from metadata.generated.schema.api.services.createPipelineService import ( CreatePipelineServiceEntityRequest, ) +from metadata.generated.schema.api.services.createStorageService import ( + CreateStorageServiceEntityRequest, +) from metadata.generated.schema.entity.services.dashboardService import DashboardService from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.messagingService import MessagingService from metadata.generated.schema.entity.services.pipelineService import PipelineService +from metadata.generated.schema.entity.services.storageService import StorageService from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -132,6 +136,30 @@ def get_pipeline_service_or_create(service_json, metadata_config) -> PipelineSer return created_service +def get_storage_service_or_create(service_json, metadata_config) -> StorageService: + metadata = OpenMetadata(metadata_config) + service = metadata.get_by_name(entity=StorageService, fqdn=service_json["name"]) + if service is not None: + return service + else: + created_service = metadata.create_or_update( + CreateStorageServiceEntityRequest(**service_json) + ) + return created_service + + +def get_database_service_or_create_v2(service_json, metadata_config) -> DatabaseService: + metadata = OpenMetadata(metadata_config) + service = metadata.get_by_name(entity=DatabaseService, fqdn=service_json["name"]) + if service is not None: + return service + else: + created_service = metadata.create_or_update( + CreateDatabaseServiceEntityRequest(**service_json) + ) + return created_service + + def convert_epoch_to_iso(seconds_since_epoch): dt = datetime.utcfromtimestamp(seconds_since_epoch) iso_format = dt.isoformat() + "Z" diff --git a/ingestion/tests/unit/stage_test.py b/ingestion/tests/unit/stage_test.py new file mode 100644 index 00000000000..c22ffdb448c --- /dev/null +++ b/ingestion/tests/unit/stage_test.py @@ -0,0 +1,41 @@ +import json +from unittest import TestCase + +from metadata.ingestion.api.workflow import Workflow + +config = """ +{ + "source": { + "type": "sample-data", + "config": { + "sample_data_folder": "examples/sample_data/" + } + }, + "stage": { + "type": "file", + "config": { + "filename": "/tmp/stage_test" + } + }, + "metadata_server": { + "type": "metadata-server", + "config": { + "api_endpoint": "http://localhost:8585/api", + "auth_provider_type": "no-auth" + } + } +} +""" + + +class WorkflowTest(TestCase): + def test_execute_200(self): + """ + stage/file.py must be compatible with source/sample_data.py, + this test try to catch if one becomes incompatible with the other + by running a workflow that includes both of them. + """ + workflow = Workflow.create(json.loads(config)) + workflow.execute() + workflow.stop() + self.assertTrue(True)