From 710675d51a399991e58e50d71af15d984b44dfcc Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Tue, 26 Oct 2021 23:18:43 +0200 Subject: [PATCH] OpenMetadata High Level API (#942) * update arg name for fqdn * fix suite setup and teardown * Update list logic * Add Chart tests * Add Dashboard tests * Add pipeline tests * Add table tests * Simplify create API * Formatting tests * Update create API * Add Tasks and Topics tests * Rename OpenMetadata * Add client close * Use Optional for GET id and name * Fix #931 fetching dashboards using client * Add tests * Fix sample users ingestion * Remove lineage test * Rename services for independence --- .../examples/sample_data/models/models.json | 4 +- .../src/metadata/ingestion/ometa/ometa_api.py | 73 ++++--- .../metadata/ingestion/sink/metadata_rest.py | 2 +- .../metadata/ingestion/source/sample_data.py | 37 +--- ingestion/tests/__init__.py | 1 - ingestion/tests/integration/__init__.py | 1 - .../tests/integration/hive/test_hive_crud.py | 69 +++--- .../tests/integration/ldap/test_ldap_crud.py | 65 +++--- .../integration/mssql/test_mssql_crud.py | 61 ++++-- .../integration/mysql/test_mysql_crud.py | 61 ++++-- .../integration/ometa/test_ometa_chart_api.py | 179 ++++++++++++++++ .../ometa/test_ometa_dashboard_api.py | 183 ++++++++++++++++ .../ometa/test_ometa_database_api.py | 72 ++++--- .../integration/ometa/test_ometa_model_api.py | 31 ++- .../ometa/test_ometa_pipeline_api.py | 183 ++++++++++++++++ .../integration/ometa/test_ometa_table_api.py | 200 ++++++++++++++++++ .../integration/ometa/test_ometa_task_api.py | 179 ++++++++++++++++ .../integration/ometa/test_ometa_topic_api.py | 184 ++++++++++++++++ .../postgres/test_postgres_crud.py | 74 ++++--- ingestion/tests/unit/__init__.py | 1 - ingestion/tests/unit/helpers_test.py | 129 +++++++---- ingestion/tests/unit/test_ometa_endpoints.py | 4 +- ingestion/tests/unit/workflow_test.py | 42 ++-- 23 files changed, 1535 insertions(+), 300 deletions(-) create mode 100644 ingestion/tests/integration/ometa/test_ometa_chart_api.py create mode 100644 ingestion/tests/integration/ometa/test_ometa_dashboard_api.py create mode 100644 ingestion/tests/integration/ometa/test_ometa_pipeline_api.py create mode 100644 ingestion/tests/integration/ometa/test_ometa_table_api.py create mode 100644 ingestion/tests/integration/ometa/test_ometa_task_api.py create mode 100644 ingestion/tests/integration/ometa/test_ometa_topic_api.py diff --git a/ingestion/examples/sample_data/models/models.json b/ingestion/examples/sample_data/models/models.json index 032c5781bf7..e7d4e1c0b25 100644 --- a/ingestion/examples/sample_data/models/models.json +++ b/ingestion/examples/sample_data/models/models.json @@ -4,13 +4,13 @@ "displayName": "ETA Predictions", "description": "ETA Predictions Model", "algorithm": "Neural Network", - "dashboard": "eta_predictions_performance" + "dashboard": "sample_superset.eta_predictions_performance" }, { "name": "forecast_sales", "displayName": "Sales Forecast Predictions", "description": "Sales Forecast Predictions Model", "algorithm": "Time Series", - "dashboard": "forecast_sales_performance" + "dashboard": "sample_superset.forecast_sales_performance" } ] diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index b8bcbda65af..d29d073c5e0 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -1,5 +1,5 @@ import logging -from typing import Generic, List, Type, TypeVar, Union, get_args +from typing import Generic, List, Optional, Type, TypeVar, Union, get_args from pydantic import BaseModel @@ -20,7 +20,7 @@ from metadata.generated.schema.entity.services.messagingService import Messaging from metadata.generated.schema.entity.services.pipelineService import PipelineService from metadata.generated.schema.entity.teams.user import User from metadata.ingestion.ometa.auth_provider import AuthenticationProvider -from metadata.ingestion.ometa.client import REST, ClientConfig +from metadata.ingestion.ometa.client import REST, APIError, ClientConfig from metadata.ingestion.ometa.openmetadata_rest import ( Auth0AuthenticationProvider, GoogleAuthenticationProvider, @@ -44,13 +44,19 @@ class MissingEntityTypeException(Exception): """ +class InvalidEntityException(Exception): + """ + We receive an entity not supported in an operation + """ + + class EntityList(Generic[T], BaseModel): entities: List[T] total: int after: str = None -class OMeta(Generic[T, C]): +class OpenMetadata(Generic[T, C]): """ Generic interface to the OpenMetadata API @@ -249,17 +255,14 @@ class OMeta(Generic[T, C]): ) return entity_class - def create_or_update(self, entity: Type[T], data: T) -> Type[C]: + def create_or_update(self, data: C) -> T: """ - We allow both Entity and CreateEntity for PUT - If Entity, no need to find response class mapping. + We allow CreateEntity for PUT, so we expect a type C. We PUT to the endpoint and return the Entity generated result - - Here the typing is a bit more weird. We will get a type T, be it - Entity or CreateEntity, and we are always going to return Entity """ + entity = data.__class__ is_create = "create" in entity.__name__.lower() is_service = "service" in entity.__name__.lower() @@ -267,7 +270,9 @@ class OMeta(Generic[T, C]): if is_create: entity_class = self.get_entity_from_create(entity) else: - entity_class = entity + raise InvalidEntityException( + f"PUT operations need a CrateEntity, not {entity}" + ) # Prepare the request method if is_service and is_create: @@ -279,13 +284,33 @@ class OMeta(Generic[T, C]): resp = method(self.get_suffix(entity), data=data.json()) return entity_class(**resp) - def get_by_name(self, entity: Type[T], name: str) -> Type[T]: - resp = self.client.get(f"{self.get_suffix(entity)}/name/{name}") - return entity(**resp) + def get_by_name(self, entity: Type[T], fqdn: str) -> Optional[T]: + """ + Return entity by name or None + """ - def get_by_id(self, entity: Type[T], entity_id: str) -> Type[T]: - resp = self.client.get(f"{self.get_suffix(entity)}/{entity_id}") - return entity(**resp) + try: + resp = self.client.get(f"{self.get_suffix(entity)}/name/{fqdn}") + return entity(**resp) + except APIError as err: + logger.error( + f"Error {err.status_code} trying to GET {entity.__class__.__name__} for FQDN {fqdn}" + ) + return None + + def get_by_id(self, entity: Type[T], entity_id: str) -> Optional[T]: + """ + Return entity by ID or None + """ + + try: + resp = self.client.get(f"{self.get_suffix(entity)}/{entity_id}") + return entity(**resp) + except APIError as err: + logger.error( + f"Error {err.status_code} trying to GET {entity.__class__.__name__} for ID {entity_id}" + ) + return None def list_entities( self, entity: Type[T], fields: str = None, after: str = None, limit: int = 1000 @@ -295,16 +320,11 @@ class OMeta(Generic[T, C]): """ suffix = self.get_suffix(entity) + url_limit = f"?limit={limit}" + url_after = f"&after={after}" if after else "" + url_fields = f"&fields={fields}" if fields else "" - if fields is None: - resp = self.client.get(suffix) - else: - if after is not None: - resp = self.client.get( - f"{suffix}?fields={fields}&after={after}&limit={limit}" - ) - else: - resp = self.client.get(f"{suffix}?fields={fields}&limit={limit}") + resp = self.client.get(f"{suffix}{url_limit}{url_after}{url_fields}") if self._use_raw_data: return resp @@ -327,3 +347,6 @@ class OMeta(Generic[T, C]): def delete(self, entity: Type[T], entity_id: str) -> None: self.client.delete(f"{self.get_suffix(entity)}/{entity_id}") + + def close(self): + self.client.close() diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 795c6a18f21..777654bfc5b 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -312,7 +312,7 @@ class MetadataRestSink(Sink): def _bootstrap_entities(self): team_response = self.api_client.get(self.api_team) for team in team_response["data"]: - self.team_entities[team["displayName"]] = team["id"] + self.team_entities[team["name"]] = team["id"] def _create_team(self, record: MetadataUser) -> None: team_name = record.team_name diff --git a/ingestion/src/metadata/ingestion/source/sample_data.py b/ingestion/src/metadata/ingestion/source/sample_data.py index 32e5f435058..01e169d027c 100644 --- a/ingestion/src/metadata/ingestion/source/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/sample_data.py @@ -57,6 +57,7 @@ from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.models.user import User +from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import ( MetadataServerConfig, OpenMetadataAPIClient, @@ -271,20 +272,6 @@ class GenerateFakeSampleData: colData.append(row) return {"columns": colList, "rows": colData} - @staticmethod - def generate_team(self): - return random.choice( - [ - "Data Platform", - "Cloud Infra", - "Payments", - "Legal", - "Customer Support", - "Finance", - "Marketplace", - ] - ) - class SampleDataSource(Source): def __init__( @@ -295,6 +282,7 @@ class SampleDataSource(Source): self.config = config self.metadata_config = metadata_config self.client = OpenMetadataAPIClient(metadata_config) + self.metadata = OpenMetadata(metadata_config) self.database_service_json = json.load( open(self.config.sample_data_folder + "/datasets/service.json", "r") ) @@ -487,25 +475,20 @@ class SampleDataSource(Source): Convert sample model data into a Model Entity to feed the metastore """ + from metadata.generated.schema.entity.data.dashboard import Dashboard + for model in self.models: # Fetch linked dashboard ID from name - dashboard_name = model["dashboard"] - dashboard_id = next( - iter( - [ - dash["id"] - for dash in self.dashboards["dashboards"] - if dash["name"] == dashboard_name - ] - ), - None, - ) + fqdn = model["dashboard"] + dashboard = self.metadata.get_by_name(entity=Dashboard, fqdn=fqdn) - if not dashboard_id: + if not dashboard: raise InvalidSampleDataException( - f"Cannot find {dashboard_name} in Sample Dashboards" + f"Cannot find {fqdn} in Sample Dashboards" ) + dashboard_id = str(dashboard.id.__root__) + model_ev = Model( id=uuid.uuid4(), name=model["name"], diff --git a/ingestion/tests/__init__.py b/ingestion/tests/__init__.py index dc408941a6d..da03fdb2aec 100644 --- a/ingestion/tests/__init__.py +++ b/ingestion/tests/__init__.py @@ -12,4 +12,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - diff --git a/ingestion/tests/integration/__init__.py b/ingestion/tests/integration/__init__.py index dc408941a6d..da03fdb2aec 100644 --- a/ingestion/tests/integration/__init__.py +++ b/ingestion/tests/integration/__init__.py @@ -12,4 +12,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - diff --git a/ingestion/tests/integration/hive/test_hive_crud.py b/ingestion/tests/integration/hive/test_hive_crud.py index 31c27ddff2c..dfdaf7167ae 100644 --- a/ingestion/tests/integration/hive/test_hive_crud.py +++ b/ingestion/tests/integration/hive/test_hive_crud.py @@ -13,20 +13,26 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time + import pytest import requests -import time -from metadata.ingestion.ometa.client import REST -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.entity.data.table import Column -from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest -from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest -from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest from sqlalchemy.engine import create_engine from sqlalchemy.inspection import inspect -headers = {'Content-type': 'application/json'} -url = 'http://localhost:8585/api/v1/' +from metadata.generated.schema.api.data.createDatabase import ( + CreateDatabaseEntityRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceEntityRequest, +) +from metadata.generated.schema.entity.data.table import Column +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.client import REST + +headers = {"Content-type": "application/json"} +url = "http://localhost:8585/api/v1/" def is_responsive(url): @@ -47,32 +53,39 @@ def status(r): def create_delete_table(client): databases = client.list_databases() - columns = [Column(name="id", columnDataType="INT"), - Column(name="name", columnDataType="VARCHAR")] + columns = [ + Column(name="id", columnDataType="INT"), + Column(name="name", columnDataType="VARCHAR"), + ] table = CreateTableEntityRequest( - name="test1", columns=columns, database=databases[0].id) + name="test1", columns=columns, database=databases[0].id + ) created_table = client.create_or_update_table(table) - if(table.name.__root__ == created_table.name.__root__): + if table.name.__root__ == created_table.name.__root__: requests.delete( - 'http://localhost:8585/api/v1/tables/{}'.format(created_table.id.__root__)) + "http://localhost:8585/api/v1/tables/{}".format(created_table.id.__root__) + ) return 1 else: requests.delete( - 'http://localhost:8585/api/v1/tables/{}'.format(created_table.id.__root__)) + "http://localhost:8585/api/v1/tables/{}".format(created_table.id.__root__) + ) return 0 def create_delete_database(client): - data = {'jdbc': {'connectionUrl': 'hive://localhost/default', 'driverClass': 'jdbc'}, - 'name': 'temp_local_hive', - 'serviceType': 'HIVE', - 'description': 'local hive env'} + data = { + "jdbc": {"connectionUrl": "hive://localhost/default", "driverClass": "jdbc"}, + "name": "temp_local_hive", + "serviceType": "HIVE", + "description": "local hive env", + } create_hive_service = CreateDatabaseServiceEntityRequest(**data) hive_service = client.create_database_service(create_hive_service) create_database_request = CreateDatabaseEntityRequest( - name="dwh", service=EntityReference(id=hive_service.id, type="databaseService")) - created_database = client.create_database( - create_database_request) + name="dwh", service=EntityReference(id=hive_service.id, type="databaseService") + ) + created_database = client.create_database(create_database_request) resp = create_delete_table(client) print(resp) client.delete_database(created_database.id.__root__) @@ -109,10 +122,12 @@ def test_check_schema(hive_service): def test_read_tables(hive_service): inspector = hive_service - check_tables = ["metadata_array_struct_test", - "metadata_struct_test", - "metadata_test_table", - "test_check"] + check_tables = [ + "metadata_array_struct_test", + "metadata_struct_test", + "metadata_test_table", + "test_check", + ] tables = [] for schema in inspector.get_schema_names(): for table in inspector.get_table_names(schema): @@ -124,7 +139,7 @@ def test_read_tables(hive_service): def test_check_table(): - client = REST("http://localhost:8585/api", 'test', 'test') + client = REST("http://localhost:8585/api", "test", "test") databases = client.list_databases() if len(databases) > 0: assert create_delete_table(client) diff --git a/ingestion/tests/integration/ldap/test_ldap_crud.py b/ingestion/tests/integration/ldap/test_ldap_crud.py index bb597c592d6..5729cbcc241 100644 --- a/ingestion/tests/integration/ldap/test_ldap_crud.py +++ b/ingestion/tests/integration/ldap/test_ldap_crud.py @@ -15,17 +15,19 @@ import logging from time import sleep -from metadata.ingestion.models.user import MetadataUser, User -import pytest -from ldap3 import Server, Connection, ALL -import requests -headers = {'Content-type': 'application/json'} -url = 'http://localhost:8585/api/v1/users' +import pytest +import requests +from ldap3 import ALL, Connection, Server + +from metadata.ingestion.models.user import MetadataUser, User + +headers = {"Content-type": "application/json"} +url = "http://localhost:8585/api/v1/users" def read_user_by_name(name: str): - r = requests.get(url + '/name/' + name) + r = requests.get(url + "/name/" + name) r.raise_for_status() bool = status(r) return [bool, r.json()] @@ -39,7 +41,7 @@ def status(r): def ldap_connection(): - s = Server('ldap://localhost:389', get_info=ALL) + s = Server("ldap://localhost:389", get_info=ALL) c = Connection(s, user="cn=admin,dc=example,dc=com", password="ldappassword") c.open() if not c.bind(): @@ -57,23 +59,28 @@ def openldap_service(docker_ip, docker_services): def test_ldap_connection(openldap_service): c = openldap_service - if 'listening' in str(c[1]): + if "listening" in str(c[1]): assert 1 def test_insert_user(openldap_service): c = openldap_service if c[0]: - user = User('john_doe@example.com', - 'John', - 'Doe', - 'John Doe', - 'john_doe', - '', '', '', True, - 0) - metadata_user = MetadataUser(name=user.github_username, - display_name=user.name, - email=user.email) + user = User( + "john_doe@example.com", + "John", + "Doe", + "John Doe", + "john_doe", + "", + "", + "", + True, + 0, + ) + metadata_user = MetadataUser( + name=user.github_username, display_name=user.name, email=user.email + ) r = requests.post(url, data=metadata_user.to_json(), headers=headers) r.raise_for_status() if r.status_code == 200 or r.status_code == 201: @@ -81,22 +88,24 @@ def test_insert_user(openldap_service): else: assert 0 else: - logging.error('OpenLDAP not running') + logging.error("OpenLDAP not running") assert 0 def test_read_user(): - assert read_user_by_name('john_doe')[0] + assert read_user_by_name("john_doe")[0] def test_update_user(openldap_service): c = openldap_service if c[0]: - user = read_user_by_name('john_doe') - user[1]['displayName'] = 'Jane Doe' - metadata_user = MetadataUser(name=user[1]['name'], - display_name=user[1]['displayName'], - email=user[1]['name']) + user = read_user_by_name("john_doe") + user[1]["displayName"] = "Jane Doe" + metadata_user = MetadataUser( + name=user[1]["name"], + display_name=user[1]["displayName"], + email=user[1]["name"], + ) r = requests.patch(url, data=metadata_user.to_json(), headers=headers) @@ -104,8 +113,8 @@ def test_update_user(openldap_service): def test_delete_user(openldap_service): c = openldap_service if c[0]: - r = read_user_by_name('john_doe') - r = requests.delete(url + '/{}'.format(r[1]['id'])) + r = read_user_by_name("john_doe") + r = requests.delete(url + "/{}".format(r[1]["id"])) r.raise_for_status() assert 1 else: diff --git a/ingestion/tests/integration/mssql/test_mssql_crud.py b/ingestion/tests/integration/mssql/test_mssql_crud.py index 8e8b5a48301..43c6d142e3a 100644 --- a/ingestion/tests/integration/mssql/test_mssql_crud.py +++ b/ingestion/tests/integration/mssql/test_mssql_crud.py @@ -13,21 +13,26 @@ # See the License for the specific language governing permissions and # limitations under the License. -from metadata.ingestion.ometa.client import REST -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.entity.data.table import Column -from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest -from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest -from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest +import time + import pytest import requests -import time from sqlalchemy.engine import create_engine from sqlalchemy.inspection import inspect +from metadata.generated.schema.api.data.createDatabase import ( + CreateDatabaseEntityRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceEntityRequest, +) +from metadata.generated.schema.entity.data.table import Column +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.client import REST -headers = {'Content-type': 'application/json'} -url = 'http://localhost:8585/api/v1/' +headers = {"Content-type": "application/json"} +url = "http://localhost:8585/api/v1/" def is_responsive(url): @@ -61,32 +66,42 @@ def mssql_service(docker_ip, docker_services): def create_delete_table(client): databases = client.list_databases() - columns = [Column(name="id", columnDataType="INT"), - Column(name="name", columnDataType="VARCHAR")] + columns = [ + Column(name="id", columnDataType="INT"), + Column(name="name", columnDataType="VARCHAR"), + ] table = CreateTableEntityRequest( - name="test1", columns=columns, database=databases[0].id) + name="test1", columns=columns, database=databases[0].id + ) created_table = client.create_or_update_table(table) - if(table.name.__root__ == created_table.name.__root__): + if table.name.__root__ == created_table.name.__root__: requests.delete( - 'http://localhost:8585/api/v1/tables/{}'.format(created_table.id.__root__)) + "http://localhost:8585/api/v1/tables/{}".format(created_table.id.__root__) + ) return 1 else: requests.delete( - 'http://localhost:8585/api/v1/tables/{}'.format(created_table.id.__root__)) + "http://localhost:8585/api/v1/tables/{}".format(created_table.id.__root__) + ) return 0 def create_delete_database(client): - data = {'jdbc': {'connectionUrl': 'mssql://localhost/catalog_test', 'driverClass': 'jdbc'}, - 'name': 'temp_local_mssql', - 'serviceType': 'MSSQL', - 'description': 'local mssql env'} + data = { + "jdbc": { + "connectionUrl": "mssql://localhost/catalog_test", + "driverClass": "jdbc", + }, + "name": "temp_local_mssql", + "serviceType": "MSSQL", + "description": "local mssql env", + } create_mssql_service = CreateDatabaseServiceEntityRequest(**data) mssql_service = client.create_database_service(create_mssql_service) create_database_request = CreateDatabaseEntityRequest( - name="dwh", service=EntityReference(id=mssql_service.id, type="databaseService")) - created_database = client.create_database( - create_database_request) + name="dwh", service=EntityReference(id=mssql_service.id, type="databaseService") + ) + created_database = client.create_database(create_database_request) resp = create_delete_table(client) print(resp) client.delete_database(created_database.id.__root__) @@ -95,7 +110,7 @@ def create_delete_database(client): def test_check_tables(mssql_service): - client = REST("{}/api".format(mssql_service), 'test', 'test') + client = REST("{}/api".format(mssql_service), "test", "test") databases = client.list_databases() if len(databases) > 0: assert create_delete_table(client) diff --git a/ingestion/tests/integration/mysql/test_mysql_crud.py b/ingestion/tests/integration/mysql/test_mysql_crud.py index afc1dfa3927..d773d4843f7 100644 --- a/ingestion/tests/integration/mysql/test_mysql_crud.py +++ b/ingestion/tests/integration/mysql/test_mysql_crud.py @@ -14,11 +14,6 @@ # limitations under the License. import time -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.entity.data.table import Column -from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest -from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest -from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest import pytest import requests @@ -26,7 +21,19 @@ from requests.exceptions import ConnectionError from sqlalchemy.engine import create_engine from sqlalchemy.inspection import inspect -from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig, OpenMetadataAPIClient +from metadata.generated.schema.api.data.createDatabase import ( + CreateDatabaseEntityRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceEntityRequest, +) +from metadata.generated.schema.entity.data.table import Column +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.openmetadata_rest import ( + MetadataServerConfig, + OpenMetadataAPIClient, +) def is_responsive(url): @@ -40,32 +47,42 @@ def is_responsive(url): def create_delete_table(client): databases = client.list_databases() - columns = [Column(name="id", dataType="INT", dataLength=1), - Column(name="name", dataType="VARCHAR", dataLength=1)] + columns = [ + Column(name="id", dataType="INT", dataLength=1), + Column(name="name", dataType="VARCHAR", dataLength=1), + ] table = CreateTableEntityRequest( - name="test1", columns=columns, database=databases[0].id) + name="test1", columns=columns, database=databases[0].id + ) created_table = client.create_or_update_table(table) - if(table.name.__root__ == created_table.name.__root__): + if table.name.__root__ == created_table.name.__root__: requests.delete( - 'http://localhost:8585/api/v1/tables/{}'.format(created_table.id.__root__)) + "http://localhost:8585/api/v1/tables/{}".format(created_table.id.__root__) + ) return 1 else: requests.delete( - 'http://localhost:8585/api/v1/tables/{}'.format(created_table.id.__root__)) + "http://localhost:8585/api/v1/tables/{}".format(created_table.id.__root__) + ) return 0 def create_delete_database(client): - data = {'jdbc': {'connectionUrl': 'mysql://localhost/catalog_db', 'driverClass': 'jdbc'}, - 'name': 'temp_local_mysql', - 'serviceType': 'MySQL', - 'description': 'local mysql env'} + data = { + "jdbc": { + "connectionUrl": "mysql://localhost/catalog_db", + "driverClass": "jdbc", + }, + "name": "temp_local_mysql", + "serviceType": "MySQL", + "description": "local mysql env", + } create_mysql_service = CreateDatabaseServiceEntityRequest(**data) mysql_service = client.create_database_service(create_mysql_service) create_database_request = CreateDatabaseEntityRequest( - name="dwh", service=EntityReference(id=mysql_service.id, type="databaseService")) - created_database = client.create_database( - create_database_request) + name="dwh", service=EntityReference(id=mysql_service.id, type="databaseService") + ) + created_database = client.create_database(create_database_request) resp = create_delete_table(client) print(resp) client.delete_database(created_database.id.__root__) @@ -88,15 +105,13 @@ def catalog_service(docker_ip, docker_services): def test_check_tables(catalog_service): metadata_config = MetadataServerConfig.parse_obj( - { - "api_endpoint": catalog_service + "/api", - "auth_provider_type": "no-auth" - } + {"api_endpoint": catalog_service + "/api", "auth_provider_type": "no-auth"} ) client = OpenMetadataAPIClient(metadata_config) databases = client.list_databases() assert create_delete_database(client) + def test_read_schema(): url = "mysql+pymysql://catalog_user:catalog_password@localhost:3307" # pool_recycle to avoid the occasional "Lost connection to MySQL server during query" error diff --git a/ingestion/tests/integration/ometa/test_ometa_chart_api.py b/ingestion/tests/integration/ometa/test_ometa_chart_api.py new file mode 100644 index 00000000000..0009b9234be --- /dev/null +++ b/ingestion/tests/integration/ometa/test_ometa_chart_api.py @@ -0,0 +1,179 @@ +""" +OpenMetadata high-level API Database test +""" +import uuid +from unittest import TestCase + +from metadata.generated.schema.api.data.createChart import CreateChartEntityRequest +from metadata.generated.schema.api.services.createDashboardService import ( + CreateDashboardServiceEntityRequest, +) +from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest +from metadata.generated.schema.entity.data.chart import Chart +from metadata.generated.schema.entity.services.dashboardService import ( + DashboardService, + DashboardServiceType, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig + + +class OMetaChartTest(TestCase): + """ + Run this integration test with the local API available + Install the ingestion package before running the tests + """ + + service_entity_id = None + + server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api") + metadata = OpenMetadata(server_config) + + user = metadata.create_or_update( + data=CreateUserEntityRequest(name="random-user", email="random@user.com"), + ) + owner = EntityReference(id=user.id, type="user") + + service = CreateDashboardServiceEntityRequest( + name="test-service-chart", + serviceType=DashboardServiceType.Superset, + dashboardUrl="https://localhost:1000", + ) + service_type = "dashboardService" + + @classmethod + def setUpClass(cls) -> None: + """ + Prepare ingredients + """ + cls.service_entity = cls.metadata.create_or_update(data=cls.service) + + cls.entity = Chart( + id=uuid.uuid4(), + name="test", + service=EntityReference(id=cls.service_entity.id, type=cls.service_type), + fullyQualifiedName="test-service-chart.test", + ) + + cls.create = CreateChartEntityRequest( + name="test", + service=EntityReference(id=cls.service_entity.id, type=cls.service_type), + ) + + @classmethod + def tearDownClass(cls) -> None: + """ + Clean up + """ + _id = str( + cls.metadata.get_by_name( + entity=Chart, fqdn="test-service-chart.test" + ).id.__root__ + ) + + service_id = str( + cls.metadata.get_by_name( + entity=DashboardService, fqdn="test-service-chart" + ).id.__root__ + ) + + cls.metadata.delete(entity=Chart, entity_id=_id) + cls.metadata.delete(entity=DashboardService, entity_id=service_id) + + def test_create(self): + """ + We can create a Chart and we receive it back as Entity + """ + + res = self.metadata.create_or_update(data=self.create) + + self.assertEqual(res.name, self.entity.name) + self.assertEqual(res.service.id, self.entity.service.id) + self.assertEqual(res.owner, None) + + def test_update(self): + """ + Updating it properly changes its properties + """ + + res_create = self.metadata.create_or_update(data=self.create) + + updated = self.create.dict(exclude_unset=True) + updated["owner"] = self.owner + updated_entity = CreateChartEntityRequest(**updated) + + res = self.metadata.create_or_update(data=updated_entity) + + # Same ID, updated algorithm + self.assertEqual(res.service.id, updated_entity.service.id) + self.assertEqual(res_create.id, res.id) + self.assertEqual(res.owner.id, self.user.id) + + def test_get_name(self): + """ + We can fetch a Chart by name and get it back as Entity + """ + + self.metadata.create_or_update(data=self.create) + + res = self.metadata.get_by_name( + entity=Chart, fqdn=self.entity.fullyQualifiedName + ) + self.assertEqual(res.name, self.entity.name) + + def test_get_id(self): + """ + We can fetch a Chart by ID and get it back as Entity + """ + + self.metadata.create_or_update(data=self.create) + + # First pick up by name + res_name = self.metadata.get_by_name( + entity=Chart, fqdn=self.entity.fullyQualifiedName + ) + # Then fetch by ID + res = self.metadata.get_by_id(entity=Chart, entity_id=str(res_name.id.__root__)) + + self.assertEqual(res_name.id, res.id) + + def test_list(self): + """ + We can list all our Charts + """ + + self.metadata.create_or_update(data=self.create) + + res = self.metadata.list_entities(entity=Chart, limit=100) + + # Fetch our test Database. We have already inserted it, so we should find it + data = next( + iter(ent for ent in res.entities if ent.name == self.entity.name), None + ) + assert data + + def test_delete(self): + """ + We can delete a Chart by ID + """ + + self.metadata.create_or_update(data=self.create) + + # Find by name + res_name = self.metadata.get_by_name( + entity=Chart, fqdn=self.entity.fullyQualifiedName + ) + # Then fetch by ID + res_id = self.metadata.get_by_id( + entity=Chart, entity_id=str(res_name.id.__root__) + ) + + # Delete + self.metadata.delete(entity=Chart, entity_id=str(res_id.id.__root__)) + + # Then we should not find it + res = self.metadata.list_entities(entity=Chart) + assert not next( + iter(ent for ent in res.entities if ent.name == self.entity.name), None + ) diff --git a/ingestion/tests/integration/ometa/test_ometa_dashboard_api.py b/ingestion/tests/integration/ometa/test_ometa_dashboard_api.py new file mode 100644 index 00000000000..767197a56c5 --- /dev/null +++ b/ingestion/tests/integration/ometa/test_ometa_dashboard_api.py @@ -0,0 +1,183 @@ +""" +OpenMetadata high-level API Database test +""" +import uuid +from unittest import TestCase + +from metadata.generated.schema.api.data.createDashboard import ( + CreateDashboardEntityRequest, +) +from metadata.generated.schema.api.services.createDashboardService import ( + CreateDashboardServiceEntityRequest, +) +from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest +from metadata.generated.schema.entity.data.dashboard import Dashboard +from metadata.generated.schema.entity.services.dashboardService import ( + DashboardService, + DashboardServiceType, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig + + +class OMetaDashboardTest(TestCase): + """ + Run this integration test with the local API available + Install the ingestion package before running the tests + """ + + service_entity_id = None + + server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api") + metadata = OpenMetadata(server_config) + + user = metadata.create_or_update( + data=CreateUserEntityRequest(name="random-user", email="random@user.com"), + ) + owner = EntityReference(id=user.id, type="user") + + service = CreateDashboardServiceEntityRequest( + name="test-service-dashboard", + serviceType=DashboardServiceType.Superset, + dashboardUrl="https://localhost:1000", + ) + service_type = "dashboardService" + + @classmethod + def setUpClass(cls) -> None: + """ + Prepare ingredients + """ + cls.service_entity = cls.metadata.create_or_update(data=cls.service) + + cls.entity = Dashboard( + id=uuid.uuid4(), + name="test", + service=EntityReference(id=cls.service_entity.id, type=cls.service_type), + fullyQualifiedName="test-service-dashboard.test", + ) + + cls.create = CreateDashboardEntityRequest( + name="test", + service=EntityReference(id=cls.service_entity.id, type=cls.service_type), + ) + + @classmethod + def tearDownClass(cls) -> None: + """ + Clean up + """ + _id = str( + cls.metadata.get_by_name( + entity=Dashboard, fqdn="test-service-dashboard.test" + ).id.__root__ + ) + + service_id = str( + cls.metadata.get_by_name( + entity=DashboardService, fqdn="test-service-dashboard" + ).id.__root__ + ) + + cls.metadata.delete(entity=Dashboard, entity_id=_id) + cls.metadata.delete(entity=DashboardService, entity_id=service_id) + + def test_create(self): + """ + We can create a Dashboard and we receive it back as Entity + """ + + res = self.metadata.create_or_update(data=self.create) + + self.assertEqual(res.name, self.entity.name) + self.assertEqual(res.service.id, self.entity.service.id) + self.assertEqual(res.owner, None) + + def test_update(self): + """ + Updating it properly changes its properties + """ + + res_create = self.metadata.create_or_update(data=self.create) + + updated = self.create.dict(exclude_unset=True) + updated["owner"] = self.owner + updated_entity = CreateDashboardEntityRequest(**updated) + + res = self.metadata.create_or_update(data=updated_entity) + + # Same ID, updated algorithm + self.assertEqual(res.service.id, updated_entity.service.id) + self.assertEqual(res_create.id, res.id) + self.assertEqual(res.owner.id, self.user.id) + + def test_get_name(self): + """ + We can fetch a Dashboard by name and get it back as Entity + """ + + self.metadata.create_or_update(data=self.create) + + res = self.metadata.get_by_name( + entity=Dashboard, fqdn=self.entity.fullyQualifiedName + ) + self.assertEqual(res.name, self.entity.name) + + def test_get_id(self): + """ + We can fetch a Dashboard by ID and get it back as Entity + """ + + self.metadata.create_or_update(data=self.create) + + # First pick up by name + res_name = self.metadata.get_by_name( + entity=Dashboard, fqdn=self.entity.fullyQualifiedName + ) + # Then fetch by ID + res = self.metadata.get_by_id( + entity=Dashboard, entity_id=str(res_name.id.__root__) + ) + + self.assertEqual(res_name.id, res.id) + + def test_list(self): + """ + We can list all our Dashboards + """ + + self.metadata.create_or_update(data=self.create) + + res = self.metadata.list_entities(entity=Dashboard, limit=100) + + # Fetch our test Database. We have already inserted it, so we should find it + data = next( + iter(ent for ent in res.entities if ent.name == self.entity.name), None + ) + assert data + + def test_delete(self): + """ + We can delete a Dashboard by ID + """ + + self.metadata.create_or_update(data=self.create) + + # Find by name + res_name = self.metadata.get_by_name( + entity=Dashboard, fqdn=self.entity.fullyQualifiedName + ) + # Then fetch by ID + res_id = self.metadata.get_by_id( + entity=Dashboard, entity_id=str(res_name.id.__root__) + ) + + # Delete + self.metadata.delete(entity=Dashboard, entity_id=str(res_id.id.__root__)) + + # Then we should not find it + res = self.metadata.list_entities(entity=Dashboard) + assert not next( + iter(ent for ent in res.entities if ent.name == self.entity.name), None + ) diff --git a/ingestion/tests/integration/ometa/test_ometa_database_api.py b/ingestion/tests/integration/ometa/test_ometa_database_api.py index e4decd4e805..95feda70ad2 100644 --- a/ingestion/tests/integration/ometa/test_ometa_database_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_database_api.py @@ -18,7 +18,7 @@ from metadata.generated.schema.entity.services.databaseService import ( ) from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.jdbcConnection import JdbcInfo -from metadata.ingestion.ometa.ometa_api import OMeta +from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig @@ -31,58 +31,67 @@ class OMetaDatabaseTest(TestCase): service_entity_id = None server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api") - metadata = OMeta(server_config) + metadata = OpenMetadata(server_config) user = metadata.create_or_update( - entity=CreateUserEntityRequest, data=CreateUserEntityRequest(name="random-user", email="random@user.com"), ) owner = EntityReference(id=user.id, type="user") service = CreateDatabaseServiceEntityRequest( - name="test-service", + name="test-service-db", serviceType=DatabaseServiceType.MySQL, jdbc=JdbcInfo(driverClass="jdbc", connectionUrl="jdbc://localhost"), ) - def setUp(self) -> None: + @classmethod + def setUpClass(cls) -> None: """ Prepare ingredients """ - self.service_entity = self.metadata.create_or_update( - entity=CreateDatabaseServiceEntityRequest, data=self.service - ) + cls.service_entity = cls.metadata.create_or_update(data=cls.service) - self.entity = Database( + cls.entity = Database( id=uuid.uuid4(), name="test-db", - service=EntityReference(id=self.service_entity.id, type="databaseService"), - fullyQualifiedName="test-service.test-db", + service=EntityReference(id=cls.service_entity.id, type="databaseService"), + fullyQualifiedName="test-service-db.test-db", ) - self.create = CreateDatabaseEntityRequest( + + cls.create = CreateDatabaseEntityRequest( name="test-db", - service=EntityReference(id=self.service_entity.id, type="databaseService"), + service=EntityReference(id=cls.service_entity.id, type="databaseService"), ) - self.service_entity_id = str(self.service_entity.id.__root__) - - def tearDown(self) -> None: + @classmethod + def tearDownClass(cls) -> None: """ Clean up """ - self.metadata.delete(entity=DatabaseService, entity_id=self.service_entity_id) + db_id = str( + cls.metadata.get_by_name( + entity=Database, fqdn="test-service-db.test-db" + ).id.__root__ + ) + + service_id = str( + cls.metadata.get_by_name( + entity=DatabaseService, fqdn="test-service-db" + ).id.__root__ + ) + + cls.metadata.delete(entity=Database, entity_id=db_id) + cls.metadata.delete(entity=DatabaseService, entity_id=service_id) def test_create(self): """ We can create a Database and we receive it back as Entity """ - res = self.metadata.create_or_update( - entity=CreateDatabaseEntityRequest, data=self.create - ) + res = self.metadata.create_or_update(data=self.create) - self.assertEqual(res.name, self.create.name) - self.assertEqual(res.service.id, self.create.service.id) + self.assertEqual(res.name, self.entity.name) + self.assertEqual(res.service.id, self.entity.service.id) self.assertEqual(res.owner, None) def test_update(self): @@ -90,15 +99,13 @@ class OMetaDatabaseTest(TestCase): Updating it properly changes its properties """ - res_create = self.metadata.create_or_update( - entity=CreateDatabaseEntityRequest, data=self.create - ) + res_create = self.metadata.create_or_update(data=self.create) - updated = self.entity.dict(exclude_unset=True) + updated = self.create.dict(exclude_unset=True) updated["owner"] = self.owner - updated_entity = Database(**updated) + updated_entity = CreateDatabaseEntityRequest(**updated) - res = self.metadata.create_or_update(entity=Database, data=updated_entity) + res = self.metadata.create_or_update(data=updated_entity) # Same ID, updated algorithm self.assertEqual(res.service.id, updated_entity.service.id) @@ -110,7 +117,7 @@ class OMetaDatabaseTest(TestCase): We can fetch a Database by name and get it back as Entity """ - self.metadata.create_or_update(entity=Database, data=self.entity) + self.metadata.create_or_update(data=self.create) res = self.metadata.get_by_name( entity=Database, fqdn=self.entity.fullyQualifiedName @@ -122,7 +129,7 @@ class OMetaDatabaseTest(TestCase): We can fetch a Database by ID and get it back as Entity """ - self.metadata.create_or_update(entity=Database, data=self.entity) + self.metadata.create_or_update(data=self.create) # First pick up by name res_name = self.metadata.get_by_name( @@ -140,7 +147,7 @@ class OMetaDatabaseTest(TestCase): We can list all our Database """ - self.metadata.create_or_update(entity=Database, data=self.entity) + self.metadata.create_or_update(data=self.create) res = self.metadata.list_entities(entity=Database) @@ -155,7 +162,7 @@ class OMetaDatabaseTest(TestCase): We can delete a Database by ID """ - self.metadata.create_or_update(entity=Database, data=self.entity) + self.metadata.create_or_update(data=self.create) # Find by name res_name = self.metadata.get_by_name( @@ -171,7 +178,6 @@ class OMetaDatabaseTest(TestCase): # Then we should not find it res = self.metadata.list_entities(entity=Database) - print(res) assert not next( iter(ent for ent in res.entities if ent.name == self.entity.name), None ) diff --git a/ingestion/tests/integration/ometa/test_ometa_model_api.py b/ingestion/tests/integration/ometa/test_ometa_model_api.py index d561e566fa1..6c13cc79836 100644 --- a/ingestion/tests/integration/ometa/test_ometa_model_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_model_api.py @@ -8,7 +8,7 @@ from metadata.generated.schema.api.data.createModel import CreateModelEntityRequ from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest from metadata.generated.schema.entity.data.model import Model from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.ometa.ometa_api import OMeta +from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig @@ -19,10 +19,9 @@ class OMetaModelTest(TestCase): """ server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api") - metadata = OMeta(server_config) + metadata = OpenMetadata(server_config) user = metadata.create_or_update( - entity=CreateUserEntityRequest, data=CreateUserEntityRequest(name="random-user", email="random@user.com"), ) owner = EntityReference(id=user.id, type="user") @@ -40,12 +39,10 @@ class OMetaModelTest(TestCase): We can create a Model and we receive it back as Entity """ - res = self.metadata.create_or_update( - entity=CreateModelEntityRequest, data=self.create - ) + res = self.metadata.create_or_update(data=self.create) - self.assertEqual(res.name, self.create.name) - self.assertEqual(res.algorithm, self.create.algorithm) + self.assertEqual(res.name, self.entity.name) + self.assertEqual(res.algorithm, self.entity.algorithm) self.assertEqual(res.owner, None) def test_update(self): @@ -53,15 +50,13 @@ class OMetaModelTest(TestCase): Updating it properly changes its properties """ - res_create = self.metadata.create_or_update( - entity=CreateModelEntityRequest, data=self.create - ) + res_create = self.metadata.create_or_update(data=self.create) - updated = self.entity.dict(exclude_unset=True) + updated = self.create.dict(exclude_unset=True) updated["owner"] = self.owner - updated_entity = Model(**updated) + updated_entity = CreateModelEntityRequest(**updated) - res = self.metadata.create_or_update(entity=Model, data=updated_entity) + res = self.metadata.create_or_update(data=updated_entity) # Same ID, updated algorithm self.assertEqual(res.algorithm, updated_entity.algorithm) @@ -73,7 +68,7 @@ class OMetaModelTest(TestCase): We can fetch a model by name and get it back as Entity """ - self.metadata.create_or_update(entity=Model, data=self.entity) + self.metadata.create_or_update(data=self.create) res = self.metadata.get_by_name( entity=Model, fqdn=self.entity.fullyQualifiedName @@ -85,7 +80,7 @@ class OMetaModelTest(TestCase): We can fetch a model by ID and get it back as Entity """ - self.metadata.create_or_update(entity=Model, data=self.entity) + self.metadata.create_or_update(data=self.create) # First pick up by name res_name = self.metadata.get_by_name( @@ -101,7 +96,7 @@ class OMetaModelTest(TestCase): We can list all our models """ - self.metadata.create_or_update(entity=Model, data=self.entity) + self.metadata.create_or_update(data=self.create) res = self.metadata.list_entities(entity=Model) @@ -116,7 +111,7 @@ class OMetaModelTest(TestCase): We can delete a model by ID """ - self.metadata.create_or_update(entity=Model, data=self.entity) + self.metadata.create_or_update(data=self.create) # Find by name res_name = self.metadata.get_by_name( diff --git a/ingestion/tests/integration/ometa/test_ometa_pipeline_api.py b/ingestion/tests/integration/ometa/test_ometa_pipeline_api.py new file mode 100644 index 00000000000..45dc0950a29 --- /dev/null +++ b/ingestion/tests/integration/ometa/test_ometa_pipeline_api.py @@ -0,0 +1,183 @@ +""" +OpenMetadata high-level API Database test +""" +import uuid +from unittest import TestCase + +from metadata.generated.schema.api.data.createPipeline import ( + CreatePipelineEntityRequest, +) +from metadata.generated.schema.api.services.createPipelineService import ( + CreatePipelineServiceEntityRequest, +) +from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest +from metadata.generated.schema.entity.data.pipeline import Pipeline +from metadata.generated.schema.entity.services.pipelineService import ( + PipelineService, + PipelineServiceType, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig + + +class OMetaPipelineTest(TestCase): + """ + Run this integration test with the local API available + Install the ingestion package before running the tests + """ + + service_entity_id = None + + server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api") + metadata = OpenMetadata(server_config) + + user = metadata.create_or_update( + data=CreateUserEntityRequest(name="random-user", email="random@user.com"), + ) + owner = EntityReference(id=user.id, type="user") + + service = CreatePipelineServiceEntityRequest( + name="test-service-pipeline", + serviceType=PipelineServiceType.Airflow, + pipelineUrl="https://localhost:1000", + ) + service_type = "pipelineService" + + @classmethod + def setUpClass(cls) -> None: + """ + Prepare ingredients + """ + cls.service_entity = cls.metadata.create_or_update(data=cls.service) + + cls.entity = Pipeline( + id=uuid.uuid4(), + name="test", + service=EntityReference(id=cls.service_entity.id, type=cls.service_type), + fullyQualifiedName="test-service-pipeline.test", + ) + + cls.create = CreatePipelineEntityRequest( + name="test", + service=EntityReference(id=cls.service_entity.id, type=cls.service_type), + ) + + @classmethod + def tearDownClass(cls) -> None: + """ + Clean up + """ + _id = str( + cls.metadata.get_by_name( + entity=Pipeline, fqdn="test-service-pipeline.test" + ).id.__root__ + ) + + service_id = str( + cls.metadata.get_by_name( + entity=PipelineService, fqdn="test-service-pipeline" + ).id.__root__ + ) + + cls.metadata.delete(entity=Pipeline, entity_id=_id) + cls.metadata.delete(entity=PipelineService, entity_id=service_id) + + def test_create(self): + """ + We can create a Pipeline and we receive it back as Entity + """ + + res = self.metadata.create_or_update(data=self.create) + + self.assertEqual(res.name, self.entity.name) + self.assertEqual(res.service.id, self.entity.service.id) + self.assertEqual(res.owner, None) + + def test_update(self): + """ + Updating it properly changes its properties + """ + + res_create = self.metadata.create_or_update(data=self.create) + + updated = self.create.dict(exclude_unset=True) + updated["owner"] = self.owner + updated_entity = CreatePipelineEntityRequest(**updated) + + res = self.metadata.create_or_update(data=updated_entity) + + # Same ID, updated algorithm + self.assertEqual(res.service.id, updated_entity.service.id) + self.assertEqual(res_create.id, res.id) + self.assertEqual(res.owner.id, self.user.id) + + def test_get_name(self): + """ + We can fetch a Pipeline by name and get it back as Entity + """ + + self.metadata.create_or_update(data=self.create) + + res = self.metadata.get_by_name( + entity=Pipeline, fqdn=self.entity.fullyQualifiedName + ) + self.assertEqual(res.name, self.entity.name) + + def test_get_id(self): + """ + We can fetch a Pipeline by ID and get it back as Entity + """ + + self.metadata.create_or_update(data=self.create) + + # First pick up by name + res_name = self.metadata.get_by_name( + entity=Pipeline, fqdn=self.entity.fullyQualifiedName + ) + # Then fetch by ID + res = self.metadata.get_by_id( + entity=Pipeline, entity_id=str(res_name.id.__root__) + ) + + self.assertEqual(res_name.id, res.id) + + def test_list(self): + """ + We can list all our Pipelines + """ + + self.metadata.create_or_update(data=self.create) + + res = self.metadata.list_entities(entity=Pipeline, limit=100) + + # Fetch our test Database. We have already inserted it, so we should find it + data = next( + iter(ent for ent in res.entities if ent.name == self.entity.name), None + ) + assert data + + def test_delete(self): + """ + We can delete a Pipeline by ID + """ + + self.metadata.create_or_update(data=self.create) + + # Find by name + res_name = self.metadata.get_by_name( + entity=Pipeline, fqdn=self.entity.fullyQualifiedName + ) + # Then fetch by ID + res_id = self.metadata.get_by_id( + entity=Pipeline, entity_id=str(res_name.id.__root__) + ) + + # Delete + self.metadata.delete(entity=Pipeline, entity_id=str(res_id.id.__root__)) + + # Then we should not find it + res = self.metadata.list_entities(entity=Pipeline) + assert not next( + iter(ent for ent in res.entities if ent.name == self.entity.name), None + ) diff --git a/ingestion/tests/integration/ometa/test_ometa_table_api.py b/ingestion/tests/integration/ometa/test_ometa_table_api.py new file mode 100644 index 00000000000..b0fa2cc3f2e --- /dev/null +++ b/ingestion/tests/integration/ometa/test_ometa_table_api.py @@ -0,0 +1,200 @@ +""" +OpenMetadata high-level API Database test +""" +import uuid +from unittest import TestCase + +from metadata.generated.schema.api.data.createDatabase import ( + CreateDatabaseEntityRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceEntityRequest, +) +from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.table import Column, DataType, Table +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseService, + DatabaseServiceType, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.jdbcConnection import JdbcInfo +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig + + +class OMetaTableTest(TestCase): + """ + Run this integration test with the local API available + Install the ingestion package before running the tests + """ + + service_entity_id = None + + server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api") + metadata = OpenMetadata(server_config) + + user = metadata.create_or_update( + data=CreateUserEntityRequest(name="random-user", email="random@user.com"), + ) + owner = EntityReference(id=user.id, type="user") + + service = CreateDatabaseServiceEntityRequest( + name="test-service", + serviceType=DatabaseServiceType.MySQL, + jdbc=JdbcInfo(driverClass="jdbc", connectionUrl="jdbc://localhost"), + ) + service_type = "databaseService" + + @classmethod + def setUpClass(cls) -> None: + """ + Prepare ingredients + """ + cls.service_entity = cls.metadata.create_or_update(data=cls.service) + + cls.create_db = CreateDatabaseEntityRequest( + name="test-db", + service=EntityReference(id=cls.service_entity.id, type="databaseService"), + ) + + cls.create_db_entity = cls.metadata.create_or_update(data=cls.create_db) + + cls.entity = Table( + id=uuid.uuid4(), + name="test", + database=EntityReference(id=cls.create_db_entity.id, type="database"), + fullyQualifiedName="test-service.test-db.test", + columns=[Column(name="id", dataType=DataType.BIGINT)], + ) + + cls.create = CreateTableEntityRequest( + name="test", + database=cls.create_db_entity.id, + columns=[Column(name="id", dataType=DataType.BIGINT)], + ) + + @classmethod + def tearDownClass(cls) -> None: + """ + Clean up + """ + _id = str( + cls.metadata.get_by_name( + entity=Table, fqdn="test-service.test-db.test" + ).id.__root__ + ) + + database_id = str( + cls.metadata.get_by_name( + entity=Database, fqdn="test-service.test-db" + ).id.__root__ + ) + + service_id = str( + cls.metadata.get_by_name( + entity=DatabaseService, fqdn="test-service" + ).id.__root__ + ) + + cls.metadata.delete(entity=Table, entity_id=_id) + cls.metadata.delete(entity=Database, entity_id=database_id) + cls.metadata.delete(entity=DatabaseService, entity_id=service_id) + + def test_create(self): + """ + We can create a Table and we receive it back as Entity + """ + + res = self.metadata.create_or_update(data=self.create) + + self.assertEqual(res.name, self.entity.name) + self.assertEqual(res.database.id, self.entity.database.id) + self.assertEqual(res.owner, None) + + def test_update(self): + """ + Updating it properly changes its properties + """ + + res_create = self.metadata.create_or_update(data=self.create) + + updated = self.create.dict(exclude_unset=True) + updated["owner"] = self.owner + updated_entity = CreateTableEntityRequest(**updated) + + res = self.metadata.create_or_update(data=updated_entity) + + # Same ID, updated algorithm + self.assertEqual(res.database.id, updated_entity.database) + self.assertEqual(res_create.id, res.id) + self.assertEqual(res.owner.id, self.user.id) + + def test_get_name(self): + """ + We can fetch a Table by name and get it back as Entity + """ + + self.metadata.create_or_update(data=self.create) + + res = self.metadata.get_by_name( + entity=Table, fqdn=self.entity.fullyQualifiedName + ) + self.assertEqual(res.name, self.entity.name) + + def test_get_id(self): + """ + We can fetch a Table by ID and get it back as Entity + """ + + self.metadata.create_or_update(data=self.create) + + # First pick up by name + res_name = self.metadata.get_by_name( + entity=Table, fqdn=self.entity.fullyQualifiedName + ) + # Then fetch by ID + res = self.metadata.get_by_id(entity=Table, entity_id=str(res_name.id.__root__)) + + self.assertEqual(res_name.id, res.id) + + def test_list(self): + """ + We can list all our Tables + """ + + self.metadata.create_or_update(data=self.create) + + res = self.metadata.list_entities(entity=Table, limit=100) + + # Fetch our test Database. We have already inserted it, so we should find it + data = next( + iter(ent for ent in res.entities if ent.name == self.entity.name), None + ) + assert data + + def test_delete(self): + """ + We can delete a Table by ID + """ + + self.metadata.create_or_update(data=self.create) + + # Find by name + res_name = self.metadata.get_by_name( + entity=Table, fqdn=self.entity.fullyQualifiedName + ) + # Then fetch by ID + res_id = self.metadata.get_by_id( + entity=Table, entity_id=str(res_name.id.__root__) + ) + + # Delete + self.metadata.delete(entity=Table, entity_id=str(res_id.id.__root__)) + + # Then we should not find it + res = self.metadata.list_entities(entity=Table) + assert not next( + iter(ent for ent in res.entities if ent.name == self.entity.name), None + ) diff --git a/ingestion/tests/integration/ometa/test_ometa_task_api.py b/ingestion/tests/integration/ometa/test_ometa_task_api.py new file mode 100644 index 00000000000..31d14bb7e8b --- /dev/null +++ b/ingestion/tests/integration/ometa/test_ometa_task_api.py @@ -0,0 +1,179 @@ +""" +OpenMetadata high-level API Database test +""" +import uuid +from unittest import TestCase + +from metadata.generated.schema.api.data.createTask import CreateTaskEntityRequest +from metadata.generated.schema.api.services.createPipelineService import ( + CreatePipelineServiceEntityRequest, +) +from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest +from metadata.generated.schema.entity.data.task import Task +from metadata.generated.schema.entity.services.pipelineService import ( + PipelineService, + PipelineServiceType, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig + + +class OMetaTaskTest(TestCase): + """ + Run this integration test with the local API available + Install the ingestion package before running the tests + """ + + service_entity_id = None + + server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api") + metadata = OpenMetadata(server_config) + + user = metadata.create_or_update( + data=CreateUserEntityRequest(name="random-user", email="random@user.com"), + ) + owner = EntityReference(id=user.id, type="user") + + service = CreatePipelineServiceEntityRequest( + name="test-service-task", + serviceType=PipelineServiceType.Airflow, + pipelineUrl="https://localhost:1000", + ) + service_type = "pipelineService" + + @classmethod + def setUpClass(cls) -> None: + """ + Prepare ingredients + """ + cls.service_entity = cls.metadata.create_or_update(data=cls.service) + + cls.entity = Task( + id=uuid.uuid4(), + name="test", + service=EntityReference(id=cls.service_entity.id, type=cls.service_type), + fullyQualifiedName="test-service-task.test", + ) + + cls.create = CreateTaskEntityRequest( + name="test", + service=EntityReference(id=cls.service_entity.id, type=cls.service_type), + ) + + @classmethod + def tearDownClass(cls) -> None: + """ + Clean up + """ + _id = str( + cls.metadata.get_by_name( + entity=Task, fqdn="test-service-task.test" + ).id.__root__ + ) + + service_id = str( + cls.metadata.get_by_name( + entity=PipelineService, fqdn="test-service-task" + ).id.__root__ + ) + + cls.metadata.delete(entity=Task, entity_id=_id) + cls.metadata.delete(entity=PipelineService, entity_id=service_id) + + def test_create(self): + """ + We can create a Task and we receive it back as Entity + """ + + res = self.metadata.create_or_update(data=self.create) + + self.assertEqual(res.name, self.entity.name) + self.assertEqual(res.service.id, self.entity.service.id) + self.assertEqual(res.owner, None) + + def test_update(self): + """ + Updating it properly changes its properties + """ + + res_create = self.metadata.create_or_update(data=self.create) + + updated = self.create.dict(exclude_unset=True) + updated["owner"] = self.owner + updated_entity = CreateTaskEntityRequest(**updated) + + res = self.metadata.create_or_update(data=updated_entity) + + # Same ID, updated algorithm + self.assertEqual(res.service.id, updated_entity.service.id) + self.assertEqual(res_create.id, res.id) + self.assertEqual(res.owner.id, self.user.id) + + def test_get_name(self): + """ + We can fetch a Task by name and get it back as Entity + """ + + self.metadata.create_or_update(data=self.create) + + res = self.metadata.get_by_name( + entity=Task, fqdn=self.entity.fullyQualifiedName + ) + self.assertEqual(res.name, self.entity.name) + + def test_get_id(self): + """ + We can fetch a Task by ID and get it back as Entity + """ + + self.metadata.create_or_update(data=self.create) + + # First pick up by name + res_name = self.metadata.get_by_name( + entity=Task, fqdn=self.entity.fullyQualifiedName + ) + # Then fetch by ID + res = self.metadata.get_by_id(entity=Task, entity_id=str(res_name.id.__root__)) + + self.assertEqual(res_name.id, res.id) + + def test_list(self): + """ + We can list all our Tasks + """ + + self.metadata.create_or_update(data=self.create) + + res = self.metadata.list_entities(entity=Task, limit=100) + + # Fetch our test Database. We have already inserted it, so we should find it + data = next( + iter(ent for ent in res.entities if ent.name == self.entity.name), None + ) + assert data + + def test_delete(self): + """ + We can delete a Task by ID + """ + + self.metadata.create_or_update(data=self.create) + + # Find by name + res_name = self.metadata.get_by_name( + entity=Task, fqdn=self.entity.fullyQualifiedName + ) + # Then fetch by ID + res_id = self.metadata.get_by_id( + entity=Task, entity_id=str(res_name.id.__root__) + ) + + # Delete + self.metadata.delete(entity=Task, entity_id=str(res_id.id.__root__)) + + # Then we should not find it + res = self.metadata.list_entities(entity=Task) + assert not next( + iter(ent for ent in res.entities if ent.name == self.entity.name), None + ) diff --git a/ingestion/tests/integration/ometa/test_ometa_topic_api.py b/ingestion/tests/integration/ometa/test_ometa_topic_api.py new file mode 100644 index 00000000000..dda8a2be675 --- /dev/null +++ b/ingestion/tests/integration/ometa/test_ometa_topic_api.py @@ -0,0 +1,184 @@ +""" +OpenMetadata high-level API Database test +""" +import uuid +from unittest import TestCase + +from ingestion.src.metadata.generated.schema.entity.services.messagingService import ( + Brokers, +) +from metadata.generated.schema.api.data.createTopic import CreateTopicEntityRequest +from metadata.generated.schema.api.services.createMessagingService import ( + CreateMessagingServiceEntityRequest, +) +from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest +from metadata.generated.schema.entity.data.topic import Topic +from metadata.generated.schema.entity.services.messagingService import ( + MessagingService, + MessagingServiceType, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig + + +class OMetaTopicTest(TestCase): + """ + Run this integration test with the local API available + Install the ingestion package before running the tests + """ + + service_entity_id = None + + server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api") + metadata = OpenMetadata(server_config) + + user = metadata.create_or_update( + data=CreateUserEntityRequest(name="random-user", email="random@user.com"), + ) + owner = EntityReference(id=user.id, type="user") + + service = CreateMessagingServiceEntityRequest( + name="test-service-topic", + serviceType=MessagingServiceType.Kafka, + brokers=["https://localhost:1000"], + ) + service_type = "messagingService" + + @classmethod + def setUpClass(cls) -> None: + """ + Prepare ingredients + """ + cls.service_entity = cls.metadata.create_or_update(data=cls.service) + + cls.entity = Topic( + id=uuid.uuid4(), + name="test", + service=EntityReference(id=cls.service_entity.id, type=cls.service_type), + fullyQualifiedName="test-service-topic.test", + partitions=2, + ) + + cls.create = CreateTopicEntityRequest( + name="test", + service=EntityReference(id=cls.service_entity.id, type=cls.service_type), + partitions=2, + ) + + @classmethod + def tearDownClass(cls) -> None: + """ + Clean up + """ + _id = str( + cls.metadata.get_by_name( + entity=Topic, fqdn="test-service-topic.test" + ).id.__root__ + ) + + service_id = str( + cls.metadata.get_by_name( + entity=MessagingService, fqdn="test-service-topic" + ).id.__root__ + ) + + cls.metadata.delete(entity=Topic, entity_id=_id) + cls.metadata.delete(entity=MessagingService, entity_id=service_id) + + def test_create(self): + """ + We can create a Topic and we receive it back as Entity + """ + + res = self.metadata.create_or_update(data=self.create) + + self.assertEqual(res.name, self.entity.name) + self.assertEqual(res.service.id, self.entity.service.id) + self.assertEqual(res.owner, None) + + def test_update(self): + """ + Updating it properly changes its properties + """ + + res_create = self.metadata.create_or_update(data=self.create) + + updated = self.create.dict(exclude_unset=True) + updated["owner"] = self.owner + updated_entity = CreateTopicEntityRequest(**updated) + + res = self.metadata.create_or_update(data=updated_entity) + + # Same ID, updated algorithm + self.assertEqual(res.service.id, updated_entity.service.id) + self.assertEqual(res_create.id, res.id) + self.assertEqual(res.owner.id, self.user.id) + + def test_get_name(self): + """ + We can fetch a Topic by name and get it back as Entity + """ + + self.metadata.create_or_update(data=self.create) + + res = self.metadata.get_by_name( + entity=Topic, fqdn=self.entity.fullyQualifiedName + ) + self.assertEqual(res.name, self.entity.name) + + def test_get_id(self): + """ + We can fetch a Topic by ID and get it back as Entity + """ + + self.metadata.create_or_update(data=self.create) + + # First pick up by name + res_name = self.metadata.get_by_name( + entity=Topic, fqdn=self.entity.fullyQualifiedName + ) + # Then fetch by ID + res = self.metadata.get_by_id(entity=Topic, entity_id=str(res_name.id.__root__)) + + self.assertEqual(res_name.id, res.id) + + def test_list(self): + """ + We can list all our Topics + """ + + self.metadata.create_or_update(data=self.create) + + res = self.metadata.list_entities(entity=Topic, limit=100) + + # Fetch our test Database. We have already inserted it, so we should find it + data = next( + iter(ent for ent in res.entities if ent.name == self.entity.name), None + ) + assert data + + def test_delete(self): + """ + We can delete a Topic by ID + """ + + self.metadata.create_or_update(data=self.create) + + # Find by name + res_name = self.metadata.get_by_name( + entity=Topic, fqdn=self.entity.fullyQualifiedName + ) + # Then fetch by ID + res_id = self.metadata.get_by_id( + entity=Topic, entity_id=str(res_name.id.__root__) + ) + + # Delete + self.metadata.delete(entity=Topic, entity_id=str(res_id.id.__root__)) + + # Then we should not find it + res = self.metadata.list_entities(entity=Topic) + assert not next( + iter(ent for ent in res.entities if ent.name == self.entity.name), None + ) diff --git a/ingestion/tests/integration/postgres/test_postgres_crud.py b/ingestion/tests/integration/postgres/test_postgres_crud.py index 2843c0300aa..9c33d572acd 100644 --- a/ingestion/tests/integration/postgres/test_postgres_crud.py +++ b/ingestion/tests/integration/postgres/test_postgres_crud.py @@ -13,17 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest -from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest -from metadata.ingestion.ometa.client import REST -from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest -from metadata.generated.schema.entity.data.table import Column -from metadata.generated.schema.type.entityReference import EntityReference import pytest import requests -headers = {'Content-type': 'application/json'} -service_name = 'temp_local_postgres' +from metadata.generated.schema.api.data.createDatabase import ( + CreateDatabaseEntityRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceEntityRequest, +) +from metadata.generated.schema.entity.data.table import Column +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.client import REST + +headers = {"Content-type": "application/json"} +service_name = "temp_local_postgres" database_name = "Test_Postgres" table_name = "test1" @@ -50,11 +55,16 @@ def catalog_service(docker_ip, docker_services): def test_create_database_service(catalog_service): - client = REST(catalog_service + "/api", 'test', 'test') - data = {'jdbc': {'connectionUrl': 'postgresql+psycopg2://catalog_user:catalog_password@localhost:5432/pagila', 'driverClass': 'jdbc'}, - 'name': 'temp_local_postgres', - 'serviceType': 'POSTGRES', - 'description': 'local postgres env'} + client = REST(catalog_service + "/api", "test", "test") + data = { + "jdbc": { + "connectionUrl": "postgresql+psycopg2://catalog_user:catalog_password@localhost:5432/pagila", + "driverClass": "jdbc", + }, + "name": "temp_local_postgres", + "serviceType": "POSTGRES", + "description": "local postgres env", + } create_postgres_service = CreateDatabaseServiceEntityRequest(**data) database_service = client.create_database_service(create_postgres_service) if database_service: @@ -64,17 +74,31 @@ def test_create_database_service(catalog_service): def test_create_table_service(catalog_service): - client = REST(catalog_service + "/api", 'test', 'test') + client = REST(catalog_service + "/api", "test", "test") postgres_dbservice = client.get_database_service(service_name) - columns = [Column(name='test', description='test_desc', - columnDataType='VARCHAR', ordinalPosition=0), - Column(name='test2', description='test_desc2', - columnDataType='INT', ordinalPosition=1)] + columns = [ + Column( + name="test", + description="test_desc", + columnDataType="VARCHAR", + ordinalPosition=0, + ), + Column( + name="test2", + description="test_desc2", + columnDataType="INT", + ordinalPosition=1, + ), + ] create_database_request = CreateDatabaseEntityRequest( - name=database_name, service=EntityReference(id=postgres_dbservice.id, type="databaseService")) + name=database_name, + service=EntityReference(id=postgres_dbservice.id, type="databaseService"), + ) created_database = client.create_database(create_database_request) - table = CreateTableEntityRequest(name=table_name, columns=columns, database=created_database.id.__root__) + table = CreateTableEntityRequest( + name=table_name, columns=columns, database=created_database.id.__root__ + ) created_table = client.create_or_update_table(table) if created_database and created_table: assert 1 @@ -83,11 +107,13 @@ def test_create_table_service(catalog_service): def test_check_and_delete_ingest(catalog_service): - client = REST(catalog_service + "/api", 'test', 'test') + client = REST(catalog_service + "/api", "test", "test") postgres_dbservice = client.get_database_service(service_name) - database = client.get_database_by_name('{}.{}'.format(service_name, database_name)) - table = client.get_table_by_name(f'{service_name}.{database_name}.{table_name}') - r = requests.delete('http://localhost:8585/api/v1/tables/{}'.format(table.id.__root__)) + database = client.get_database_by_name("{}.{}".format(service_name, database_name)) + table = client.get_table_by_name(f"{service_name}.{database_name}.{table_name}") + r = requests.delete( + "http://localhost:8585/api/v1/tables/{}".format(table.id.__root__) + ) r.raise_for_status() client.delete_database(database.id.__root__) client.delete_database_service(postgres_dbservice.id.__root__) diff --git a/ingestion/tests/unit/__init__.py b/ingestion/tests/unit/__init__.py index dc408941a6d..da03fdb2aec 100644 --- a/ingestion/tests/unit/__init__.py +++ b/ingestion/tests/unit/__init__.py @@ -12,4 +12,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - diff --git a/ingestion/tests/unit/helpers_test.py b/ingestion/tests/unit/helpers_test.py index b8fbca1168d..546c5eaee9f 100644 --- a/ingestion/tests/unit/helpers_test.py +++ b/ingestion/tests/unit/helpers_test.py @@ -15,23 +15,30 @@ import json from unittest import TestCase -from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest +from metadata.generated.schema.api.data.createDatabase import ( + CreateDatabaseEntityRequest, +) from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest -from metadata.generated.schema.api.services.createDatabaseService import \ - CreateDatabaseServiceEntityRequest +from metadata.generated.schema.api.services.createDashboardService import ( + CreateDashboardServiceEntityRequest, +) +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceEntityRequest, +) +from metadata.generated.schema.api.services.createMessagingService import ( + CreateMessagingServiceEntityRequest, +) from metadata.generated.schema.entity.data.table import Column from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.client import APIError -from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient -from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig -from metadata.generated.schema.api.services.createDashboardService import \ - CreateDashboardServiceEntityRequest -from metadata.generated.schema.api.services.createMessagingService import \ - CreateMessagingServiceEntityRequest +from metadata.ingestion.ometa.openmetadata_rest import ( + MetadataServerConfig, + OpenMetadataAPIClient, +) class RestTest(TestCase): - file_path = 'tests/unit/mysql_test.json' + file_path = "tests/unit/mysql_test.json" with open(file_path) as ingestionFile: ingestionData = ingestionFile.read() client_config = json.loads(ingestionData).get("metadata_server") @@ -42,66 +49,90 @@ class RestTest(TestCase): def test_1_create_service(self): data = { - 'jdbc': {'connectionUrl': 'mysql://localhost/openmetadata_db', 'driverClass': 'jdbc'}, - 'name': 'local_mysql_test', - 'serviceType': "MySQL", - 'description': 'local mysql env'} + "jdbc": { + "connectionUrl": "mysql://localhost/openmetadata_db", + "driverClass": "jdbc", + }, + "name": "local_mysql_test", + "serviceType": "MySQL", + "description": "local mysql env", + } create_mysql_service = CreateDatabaseServiceEntityRequest(**data) - mysql_service = self.openmetadata_client.create_database_service(create_mysql_service) + mysql_service = self.openmetadata_client.create_database_service( + create_mysql_service + ) self.assertEqual(mysql_service.name, create_mysql_service.name) def test_2_get_service(self): - mysql_service = self.openmetadata_client.get_database_service('local_mysql_test') - self.assertEqual(mysql_service.name, 'local_mysql_test') + mysql_service = self.openmetadata_client.get_database_service( + "local_mysql_test" + ) + self.assertEqual(mysql_service.name, "local_mysql_test") def test_3_get_service_by_id(self): - mysql_service = self.openmetadata_client.get_database_service('local_mysql_test') + mysql_service = self.openmetadata_client.get_database_service( + "local_mysql_test" + ) mysql_service_get_id = self.openmetadata_client.get_database_service_by_id( mysql_service.id.__root__ ) self.assertEqual(mysql_service.id, mysql_service_get_id.id) def test_4_create_update_databases(self): - mysql_service = self.openmetadata_client.get_database_service('local_mysql_test') - service_reference = EntityReference(id=mysql_service.id.__root__, type="databaseService") + mysql_service = self.openmetadata_client.get_database_service( + "local_mysql_test" + ) + service_reference = EntityReference( + id=mysql_service.id.__root__, type="databaseService" + ) create_database_request = CreateDatabaseEntityRequest( name="dwh", service=service_reference ) - created_database = self.openmetadata_client.create_database(create_database_request) + created_database = self.openmetadata_client.create_database( + create_database_request + ) created_database.description = "hello world" update_database_request = CreateDatabaseEntityRequest( - name=created_database.name, description=created_database.description, - service=service_reference + name=created_database.name, + description=created_database.description, + service=service_reference, + ) + updated_database = self.openmetadata_client.create_database( + update_database_request ) - updated_database = self.openmetadata_client.create_database(update_database_request) self.assertEqual(updated_database.description, created_database.description) def test_5_create_table(self): databases = self.openmetadata_client.list_databases() - columns = [Column(name="id", columnDataType="INT"), - Column(name="name", columnDataType="VARCHAR")] + columns = [ + Column(name="id", columnDataType="INT"), + Column(name="name", columnDataType="VARCHAR"), + ] table = CreateTableEntityRequest( name="test1", columns=columns, database=databases[0].id.__root__ - ) + ) created_table = self.openmetadata_client.create_or_update_table(table) self.client.delete(f"/tables/{created_table.id.__root__}") self.client.delete(f"/databases/{databases[0].id.__root__}") self.assertEqual(table.name, created_table.name) def test_6_delete_service(self): - mysql_service = self.openmetadata_client.get_database_service('local_mysql_test') + mysql_service = self.openmetadata_client.get_database_service( + "local_mysql_test" + ) self.openmetadata_client.delete_database_service(mysql_service.id.__root__) self.assertRaises( - APIError, self.openmetadata_client.get_database_service_by_id, - mysql_service.id.__root__ + APIError, + self.openmetadata_client.get_database_service_by_id, + mysql_service.id.__root__, ) def test_7_create_messaging_service(self): create_messaging_service = CreateMessagingServiceEntityRequest( - name='sample_kafka_test', - serviceType='Kafka', - brokers=['localhost:9092'], - schemaRegistry='http://localhost:8081' + name="sample_kafka_test", + serviceType="Kafka", + brokers=["localhost:9092"], + schemaRegistry="http://localhost:8081", ) messaging_service = self.openmetadata_client.create_messaging_service( create_messaging_service @@ -109,17 +140,21 @@ class RestTest(TestCase): self.assertEqual(create_messaging_service.name, messaging_service.name) def test_8_get_messaging_service(self): - messaging_service = self.openmetadata_client.get_messaging_service('sample_kafka_test') - self.client.delete(f"/services/messagingServices/{messaging_service.id.__root__}") - self.assertEqual(messaging_service.name, 'sample_kafka_test') + messaging_service = self.openmetadata_client.get_messaging_service( + "sample_kafka_test" + ) + self.client.delete( + f"/services/messagingServices/{messaging_service.id.__root__}" + ) + self.assertEqual(messaging_service.name, "sample_kafka_test") def test_9_create_dashboard_service(self): create_dashboard_service = CreateDashboardServiceEntityRequest( - name='sample_superset_test', - serviceType='Superset', - username='admin', - password='admin', - dashboardUrl='http://localhost:8088' + name="sample_superset_test", + serviceType="Superset", + username="admin", + password="admin", + dashboardUrl="http://localhost:8088", ) dashboard_service = None try: @@ -131,6 +166,10 @@ class RestTest(TestCase): self.assertEqual(create_dashboard_service.name, dashboard_service.name) def test_10_get_dashboard_service(self): - dashboard_service = self.openmetadata_client.get_dashboard_service('sample_superset_test') - self.client.delete(f"/services/dashboardServices/{dashboard_service.id.__root__}") - self.assertEqual(dashboard_service.name, 'sample_superset_test') + dashboard_service = self.openmetadata_client.get_dashboard_service( + "sample_superset_test" + ) + self.client.delete( + f"/services/dashboardServices/{dashboard_service.id.__root__}" + ) + self.assertEqual(dashboard_service.name, "sample_superset_test") diff --git a/ingestion/tests/unit/test_ometa_endpoints.py b/ingestion/tests/unit/test_ometa_endpoints.py index fb4626fcc13..1adb29f5eab 100644 --- a/ingestion/tests/unit/test_ometa_endpoints.py +++ b/ingestion/tests/unit/test_ometa_endpoints.py @@ -24,7 +24,7 @@ from metadata.generated.schema.entity.services.databaseService import DatabaseSe from metadata.generated.schema.entity.services.messagingService import MessagingService from metadata.generated.schema.entity.services.pipelineService import PipelineService from metadata.generated.schema.entity.teams.user import User -from metadata.ingestion.ometa.ometa_api import OMeta +from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig @@ -35,7 +35,7 @@ class OMetaEndpointTest(TestCase): """ server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api") - metadata = OMeta(server_config) + metadata = OpenMetadata(server_config) def test_entities_suffix(self): """ diff --git a/ingestion/tests/unit/workflow_test.py b/ingestion/tests/unit/workflow_test.py index 3f3286bfc98..0c543b39c8f 100644 --- a/ingestion/tests/unit/workflow_test.py +++ b/ingestion/tests/unit/workflow_test.py @@ -4,13 +4,15 @@ from unittest import TestCase from metadata.config.common import load_config_file from metadata.ingestion.api.workflow import Workflow -from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig +from metadata.ingestion.ometa.openmetadata_rest import ( + MetadataServerConfig, + OpenMetadataAPIClient, +) class WorkflowTest(TestCase): - def test_get_200(self): - key = 'metadata.ingestion.sink.metadata_rest.MetadataRestSink' + key = "metadata.ingestion.sink.metadata_rest.MetadataRestSink" if key.find(".") >= 0: module_name, class_name = key.rsplit(".", 1) my_class = getattr(importlib.import_module(module_name), class_name) @@ -18,7 +20,7 @@ class WorkflowTest(TestCase): def test_get_4xx(self): my_class = None - key = 'metadata.ingestion.sink.MYSQL.mysqlSINK' + key = "metadata.ingestion.sink.MYSQL.mysqlSINK" try: if key.find(".") >= 0: module_name, class_name = key.rsplit(".", 1) @@ -28,32 +30,34 @@ class WorkflowTest(TestCase): def test_title_typeClassFetch(self): is_file = True - file_type = 'query-parser' + file_type = "query-parser" if is_file: - replace = file_type.replace('-', '_') + replace = file_type.replace("-", "_") else: - replace = ''.join([i.title() for i in file_type.replace('-', '_').split('_')]) - self.assertEqual(replace, 'query_parser') + replace = "".join( + [i.title() for i in file_type.replace("-", "_").split("_")] + ) + self.assertEqual(replace, "query_parser") def test_title_typeClassFetch_4xx(self): is_file = False - file_type = 'query-parser' + file_type = "query-parser" if is_file: - replace = file_type.replace('-', '_') + replace = file_type.replace("-", "_") else: - replace = ''.join([i.title() for i in file_type.replace('-', '_').split('_')]) - self.assertEqual(replace, 'QueryParser') + replace = "".join( + [i.title() for i in file_type.replace("-", "_").split("_")] + ) + self.assertEqual(replace, "QueryParser") def test_execute_200(self): - config_file = pathlib.Path('tests/unit/mysql_test.json') + config_file = pathlib.Path("tests/unit/mysql_test.json") workflow_config = load_config_file(config_file) workflow = Workflow.create(workflow_config) workflow.execute() workflow.stop() config = MetadataServerConfig.parse_obj( - workflow_config.get('metadata_server').get( - 'config' - ) + workflow_config.get("metadata_server").get("config") ) client = OpenMetadataAPIClient(config).client @@ -61,17 +65,17 @@ class WorkflowTest(TestCase): f"/services/databaseServices/" f"{client.get('/services/databaseServices/name/local_mysql_test')['id']}" ) - file_path = '/tmp/mysql_test' + file_path = "/tmp/mysql_test" with open(file_path) as ingestionFile: ingestionData = ingestionFile.read() self.assertEqual(ingestionData is not None, True) def test_execute_4xx(self): - config_file = pathlib.Path('tests/unit/mysql_test.json') + config_file = pathlib.Path("tests/unit/mysql_test.json") workflow_config = load_config_file(config_file) ingestionData = None try: - file_path = '/tmp/mysql_test123' + file_path = "/tmp/mysql_test123" with open(file_path) as ingestionFile: ingestionData = ingestionFile.read() except FileNotFoundError: