diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json index 68215f9eb71..85ba9f91abb 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json @@ -20,7 +20,7 @@ } }, "properties": { - "serviceType": { + "type": { "description": "Service Type", "$ref": "#/definitions/athenaType", "default": "Athena" diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/sampleDataConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/sampleDataConnection.json index fcb635f4234..6490c04b869 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/sampleDataConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/sampleDataConnection.json @@ -16,7 +16,8 @@ "properties": { "type": { "description": "Service Type", - "$ref": "#/definitions/sampleDataType" + "$ref": "#/definitions/sampleDataType", + "default": "SampleData" }, "sampleDataFolder": { "description": "Sample Data File Path", diff --git a/ingestion/examples/sample_data/glue/database_schema.json b/ingestion/examples/sample_data/glue/database_schema.json index eef513f7df9..d2e7c1ae5ac 100644 --- a/ingestion/examples/sample_data/glue/database_schema.json +++ b/ingestion/examples/sample_data/glue/database_schema.json @@ -4,7 +4,7 @@ "description": "This **mock** database contains tables related to the Glue service", "service": { "id": "b946d870-03b2-4d33-a075-13665a7a76b9", - "type": "GLUE" + "type": "Glue" } } \ No newline at end of file diff --git a/ingestion/setup.py b/ingestion/setup.py index 9956fa2a1dc..ace9aa4633b 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -156,7 +156,7 @@ test = { build_options = {"includes": ["_cffi_backend"]} setup( name="openmetadata-ingestion", - version="0.9.2.dev1", + version="0.10.0.dev0", url="https://open-metadata.org/", author="OpenMetadata Committers", license="Apache License 2.0", diff --git a/ingestion/src/metadata/ingestion/source/bigquery.py b/ingestion/src/metadata/ingestion/source/bigquery.py index f5607d4c698..f78abbf2ee5 100644 --- a/ingestion/src/metadata/ingestion/source/bigquery.py +++ b/ingestion/src/metadata/ingestion/source/bigquery.py @@ -25,6 +25,7 @@ from sqlalchemy_bigquery._types import ( from metadata.generated.schema.api.tags.createTagCategory import ( CreateTagCategoryRequest, ) +from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import TableData from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( BigQueryConnection, @@ -35,6 +36,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.sql_source import SQLSource from metadata.utils.column_type_parser import create_sqlalchemy_type @@ -103,12 +105,15 @@ class BigquerySource(SQLSource): def create(cls, config_dict, metadata_config: OpenMetadataConnection): config: WorkflowSource = WorkflowSource.parse_obj(config_dict) connection: BigQueryConnection = config.serviceConnection.__root__.config - options = connection.connectionOptions.dict() if not isinstance(connection, BigQueryConnection): raise InvalidSourceException( f"Expected BigQueryConnection, but got {connection}" ) - if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"): + if ( + not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") + and connection.connectionOptions + ): + options = connection.connectionOptions.dict() if options.get("credentials_path"): os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = options[ "credentials_path" @@ -177,6 +182,16 @@ class BigquerySource(SQLSource): super().fetch_sample_data(schema, table) + def _get_database(self, database: Optional[str]) -> Database: + if not database: + database = self.service_connection.projectID + return Database( + name=database, + service=EntityReference( + id=self.service.id, type=self.service_connection.type.value + ), + ) + def parse_raw_data_type(self, raw_data_type): return raw_data_type.replace(", ", ",").replace(" ", ":").lower() diff --git a/ingestion/src/metadata/ingestion/source/oracle.py b/ingestion/src/metadata/ingestion/source/oracle.py index a05148fa48e..1fd793f46b7 100644 --- a/ingestion/src/metadata/ingestion/source/oracle.py +++ b/ingestion/src/metadata/ingestion/source/oracle.py @@ -10,6 +10,7 @@ # limitations under the License. # This import verifies that the dependencies are available. +from typing import Optional import cx_Oracle # noqa: F401 import pydantic @@ -47,7 +48,7 @@ class OracleSource(SQLSource): ) return cls(config, metadata_config) - def _get_database(self, database: str) -> Database: + def _get_database(self, database: Optional[str]) -> Database: if not database: database = self.service_connection.oracleServiceName return Database( diff --git a/ingestion/src/metadata/ingestion/source/postgres.py b/ingestion/src/metadata/ingestion/source/postgres.py index 20c39af4036..0c644aa8280 100644 --- a/ingestion/src/metadata/ingestion/source/postgres.py +++ b/ingestion/src/metadata/ingestion/source/postgres.py @@ -51,37 +51,34 @@ class PostgresSource(SQLSource): connection: PostgresConnection = config.serviceConnection.__root__.config if not isinstance(connection, PostgresConnection): raise InvalidSourceException( - f"Expected MysqlConnection, but got {connection}" + f"Expected PostgresConnection, but got {connection}" ) return cls(config, metadata_config) def get_databases(self) -> Iterable[Inspector]: - if self.config.database != None: + if self.service_connection.database: yield from super().get_databases() else: query = "select datname from pg_catalog.pg_database;" - results = self.connection.execute(query) - for res in results: - row = list(res) try: - logger.info(f"Ingesting from database: {row[0]}") - self.config.database = row[0] + self.service_connection.database = row[0] self.engine = get_engine(self.config.serviceConnection) - self.connection = self.engine.connect() + self.engine.connect() yield inspect(self.engine) - except Exception as err: logger.error(f"Failed to Connect: {row[0]} due to error {err}") - def _get_database(self, schema: str) -> Database: + def _get_database(self, database: str) -> Database: + if database: + self.service_connection.database = database return Database( - name=self.config.database + FQDN_SEPARATOR + schema, - service=EntityReference(id=self.service.id, type=self.config.service_type), + name=self.service_connection.database, + service=EntityReference(id=self.service.id, type="database"), ) def get_status(self) -> SourceStatus: diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index 7c61bb134a7..122217ae8a1 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -549,7 +549,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): return columns - def _get_database(self, database: str) -> Database: + def _get_database(self, database: Optional[str]) -> Database: if not database: database = "default" return Database( diff --git a/ingestion/tests/integration/ometa/test_ometa_api.py b/ingestion/tests/integration/ometa/test_ometa_api.py index 710225e60b8..756e3e8e831 100644 --- a/ingestion/tests/integration/ometa/test_ometa_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_api.py @@ -12,14 +12,15 @@ """ OpenMetadata API initialization """ - - +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) from metadata.ingestion.ometa.ometa_api import OpenMetadata def test_init_ometa(): - server_config = OpenMetadataServerConfig(hostPort="http://localhost:8585/api") + server_config = OpenMetadataConnection(hostPort="http://localhost:8585/api") metadata = OpenMetadata(server_config) assert metadata.health_check() diff --git a/ingestion/tests/integration/stage/stage_test.py b/ingestion/tests/integration/stage/stage_test.py index 1f3bb82910b..3d832a8dbc0 100644 --- a/ingestion/tests/integration/stage/stage_test.py +++ b/ingestion/tests/integration/stage/stage_test.py @@ -7,9 +7,14 @@ config = """ { "source": { "type": "sample-data", - "config": { - "sample_data_folder": "ingestion/examples/sample_data" - } + "serviceName": "sample_data", + "serviceConnection": { + "config": { + "type": "SampleData", + "sampleDataFolder": "ingestion/examples/sample_data" + } + }, + "sourceConfig": {} }, "stage": { "type": "file", @@ -17,11 +22,10 @@ config = """ "filename": "/tmp/stage_test" } }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } } diff --git a/ingestion/tests/unit/source/test_bigquery.py b/ingestion/tests/unit/source/test_bigquery.py index c857342963d..86e6c4c227e 100644 --- a/ingestion/tests/unit/source/test_bigquery.py +++ b/ingestion/tests/unit/source/test_bigquery.py @@ -27,57 +27,49 @@ CONFIG = """ { "source": { "type": "bigquery", - "config": { - "service_name": "test_bigquery", - "project_id": "project_id", - "host_port": "host_port", - "username": "username", - "service_name": "gcp_bigquery", - "connect_args":{}, - "partition_query": "select * from {}.{} WHERE DATE({}) = '{}' LIMIT 1000", - "enable_policy_tags": "True", - "duration":1, - "options":{ - "credentials":{ - "type": "service_account", - "project_id": "project_id", - "private_key_id": "private_key_id", - "private_key": "", - "client_email": "gcpuser@project_id.iam.gserviceaccount.com", - "client_id": "", - "auth_uri": "https://accounts.google.com/o/oauth2/auth", - "token_uri": "https://oauth2.googleapis.com/token", - "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", - "client_x509_cert_url": "" + "serviceName": "local_bigquery", + "serviceConnection": { + "config": { + "type": "BigQuery", + "projectID": "project_id", + "enablePolicyTagImport": true, + "connectionOptions": { + "credentials": { + "type": "service_account", + "project_id": "project_id", + "private_key_id": "private_key_id", + "private_key": "private_key", + "client_email": "gcpuser@project_id.iam.gserviceaccount.com", + "client_id": "client_id", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "" + } } } - } + }, + "sourceConfig": {"config": {"enableDataProfiler": false}} }, "sink": { "type": "file", "config": { "filename": "/var/tmp/datasets.json" } - }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + }, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } } - """ - -MOCK_GET_TABLE_NAMES = [ - "open_metadata.test1", - "open_metadata.cloudaudit_googleapis_com_data_access_20220122", -] +MOCK_GET_TABLE_NAMES = ["test_schema_1.test_table_1", "test_schema_1.test_table_2"] GET_TABLE_DESCRIPTIONS = {"text": "Test"} -MOCK_GET_SCHEMA_NAMES = ["open_metadata"] +MOCK_GET_SCHEMA_NAMES = ["test_schema_1"] MOCK_UNIQUE_CONSTRAINTS = [] MOCK_PK_CONSTRAINT = {"constrained_columns": []} MOCK_GET_COLUMN = [ @@ -209,7 +201,6 @@ MOCK_GET_COLUMN = [ }, ] - MOCK_GET_VIEW_NAMES = [] MOCK_GET_VIEW_DEFINITION = "" @@ -274,14 +265,18 @@ class BigQueryIngestionTest(TestCase): def test_file_sink(self): config = json.loads(CONFIG) file_data = open(config["sink"]["config"]["filename"]) - data = json.load(file_data) - for i in data: - table = i.get("table") - omdtable_obj: OMetaDatabaseAndTable = OMetaDatabaseAndTable.parse_obj(i) + file_sink = json.load(file_data) + for ometa_data in file_sink: + table = ometa_data.get("table") + omdtable_obj: OMetaDatabaseAndTable = OMetaDatabaseAndTable.parse_obj( + ometa_data + ) table_obj: Table = Table.parse_obj(table) assert table.get("description") == GET_TABLE_DESCRIPTIONS.get("text") - table_name = f"{i.get('database').get('name')}.{table.get('name')}" + table_name = ( + f"{ometa_data.get('database_schema').get('name')}.{table.get('name')}" + ) if table.get("tableType") == TableType.Regular.value: assert table_name in MOCK_GET_TABLE_NAMES diff --git a/ingestion/tests/unit/source/test_vertica.py b/ingestion/tests/unit/source/test_vertica.py index 95e2cffce76..6bda1774ff6 100644 --- a/ingestion/tests/unit/source/test_vertica.py +++ b/ingestion/tests/unit/source/test_vertica.py @@ -58,7 +58,7 @@ CONFIG = """ "sink": { "type": "file", "config": { - "filename": "/var/tmp/datasets.json" + "filename": "/var/tmp/vertica.json" } }, "workflowConfig": { @@ -213,7 +213,6 @@ class VerticaIngestionTest(TestCase): get_columns.return_value = MOCK_GET_COLUMN get_view_names.return_value = MOCK_GET_VIEW_NAMES get_view_definition.return_value = MOCK_GET_VIEW_DEFINITION - execute_workflow() def test_file_sink(self): diff --git a/ingestion/tests/unit/test_query_parser.py b/ingestion/tests/unit/test_query_parser.py index cdcdb8c2acf..b06a80ddb12 100644 --- a/ingestion/tests/unit/test_query_parser.py +++ b/ingestion/tests/unit/test_query_parser.py @@ -25,7 +25,7 @@ config = """ "serviceConnection": { "config": { "type": "SampleData", - "sampleDataFolder": "./examples/sample_data" + "sampleDataFolder": "ingestion/examples/sample_data" } }, "sourceConfig": {}