From c09dc3bff75b6c9e5e90bd52f93d1a06b232c75e Mon Sep 17 00:00:00 2001 From: codingwithabhi <63392662+codingwithabhi@users.noreply.github.com> Date: Thu, 21 Apr 2022 19:37:29 +0530 Subject: [PATCH] athena-source-connection-fixed (#4278) * athena-source-connection-fixed * ran-core_bump_version_dev * aws-cred-json-schema-support-added-in-athena Co-authored-by: Pere Miquel Brull --- .../database/athenaConnection.json | 14 +--- .../connections/database/mysqlConnection.json | 11 ++- ingestion-core/src/metadata/_version.py | 2 +- ingestion/examples/workflows/athena.json | 34 ++++++++++ .../src/metadata/ingestion/source/athena.py | 68 +++++++++++++------ .../src/metadata/utils/source_connections.py | 20 ++++++ .../database/athenaConnection.json | 22 +++--- .../connections/database/mysqlConnection.json | 6 +- 8 files changed, 121 insertions(+), 56 deletions(-) create mode 100644 ingestion/examples/workflows/athena.json diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json index 85ba9f91abb..50b502cd170 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json @@ -30,14 +30,8 @@ "$ref": "#/definitions/athenaScheme", "default": "awsathena+rest" }, - "username": { - "description": "username to connect to the Athena. This user should have privileges to read all the metadata in Athena.", - "type": "string" - }, - "password": { - "description": "password to connect to the Athena.", - "type": "string", - "format": "password" + "awsConfig": { + "$ref": "../../../../security/credentials/s3Credentials.json" }, "hostPort": { "description": "Host and port of the Athena", @@ -47,10 +41,6 @@ "description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank , OpenMetadata Ingestion attempts to scan all the databases in Athena.", "type": "string" }, - "awsRegion": { - "description": "AWS Athena AWS Region. ", - "type": "string" - }, "s3StagingDir": { "description": "S3 Staging Directory.", "type": "string" diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/mysqlConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/mysqlConnection.json index 7e18fb85c0b..675fbc1fc88 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/mysqlConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/mysqlConnection.json @@ -31,11 +31,11 @@ "default": "mysql+pymysql" }, "username": { - "description": "username to connect to the SingleStore. This user should have privileges to read all the metadata in SingleStore.", + "description": "username to connect to the Mysql. This user should have privileges to read all the metadata in Mysql.", "type": "string" }, "password": { - "description": "password to connect to the SingleStore.", + "description": "password to connect to the Mysql.", "type": "string", "format": "password" }, @@ -44,7 +44,7 @@ "type": "string" }, "database": { - "description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank , OpenMetadata Ingestion attempts to scan all the databases in SingleStore.", + "description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank , OpenMetadata Ingestion attempts to scan all the databases in Mysql.", "type": "string" }, "connectionOptions": { @@ -58,8 +58,5 @@ } }, "additionalProperties": false, - "required": [ - "hostPort", - "username" - ] + "required": ["hostPort", "username"] } diff --git a/ingestion-core/src/metadata/_version.py b/ingestion-core/src/metadata/_version.py index 13415497897..20ccf8b763c 100644 --- a/ingestion-core/src/metadata/_version.py +++ b/ingestion-core/src/metadata/_version.py @@ -7,5 +7,5 @@ Provides metadata version information. from incremental import Version -__version__ = Version("metadata", 0, 10, 0, dev=0) +__version__ = Version("metadata", 0, 10, 0, dev=1) __all__ = ["__version__"] diff --git a/ingestion/examples/workflows/athena.json b/ingestion/examples/workflows/athena.json new file mode 100644 index 00000000000..d26a32d2217 --- /dev/null +++ b/ingestion/examples/workflows/athena.json @@ -0,0 +1,34 @@ +{ + "source": { + "type": "athena", + "serviceName": "local_athena", + "serviceConnection": { + "config": { + "type": "Athena", + "database": "database_name", + "awsConfig": { + "awsAccessKeyId": "access key id", + "awsSecretAccessKey": "access secret key", + "awsRegion": "aws region name" + }, + "s3StagingDir": "s3 directory for datasource", + "workgroup": "workgroup name" + } + }, + "sourceConfig": { + "config": { + "enableDataProfiler": false + } + } + }, + "sink": { + "type": "metadata-rest", + "config": {} + }, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" + } + } +} \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/athena.py b/ingestion/src/metadata/ingestion/source/athena.py index d0818a6d582..6732784165d 100644 --- a/ingestion/src/metadata/ingestion/source/athena.py +++ b/ingestion/src/metadata/ingestion/source/athena.py @@ -9,33 +9,30 @@ # See the License for the specific language governing permissions and # limitations under the License. -from urllib.parse import quote_plus +import logging +from typing import Iterable +from sqlalchemy.inspection import inspect + +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.services.connections.database.athenaConnection import ( AthenaConnection, ) + +# This import verifies that the dependencies are available. from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.common import Entity +from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.sql_source import SQLSource +from metadata.utils.filters import filter_by_schema +from metadata.utils.fqdn_generator import get_fqdn - -class AthenaConfig(AthenaConnection): - def get_connection_url(self): - url = f"{self.scheme}://" - if self.username: - url += f"{quote_plus(self.username)}" - if self.password: - url += f":{quote_plus(self.password)}" - else: - url += ":" - url += f"@athena.{self.awsRegion}.amazonaws.com:443/" - if self.database: - url += f"{self.database}" - url += f"?s3_staging_dir={quote_plus(self.s3StagingDir)}" - url += f"&work_group={self.workgroup}" - - return url +logger: logging.Logger = logging.getLogger(__name__) class AthenaSource(SQLSource): @@ -44,5 +41,38 @@ class AthenaSource(SQLSource): @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): - config = AthenaConfig.parse_obj(config_dict) + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: AthenaConnection = config.serviceConnection.__root__.config + if not isinstance(connection, AthenaConnection): + raise InvalidSourceException( + f"Expected AthenaConnection, but got {connection}" + ) + return cls(config, metadata_config) + + def prepare(self): + self.inspector = inspect(self.engine) + self.service_connection.database = "default" + return super().prepare() + + def next_record(self) -> Iterable[Entity]: + for schema in self.inspector.get_schema_names(): + self.database_source_state.clear() + if filter_by_schema( + self.source_config.schemaFilterPattern, schema_name=schema + ): + self.status.filter(schema, "Schema pattern not allowed") + continue + + if self.source_config.includeTables: + yield from self.fetch_tables(self.inspector, schema) + if self.source_config.includeViews: + yield from self.fetch_views(self.inspector, schema) + if self.source_config.markDeletedTables: + schema_fqdn = get_fqdn( + DatabaseSchema, + self.config.serviceName, + self.service_connection.database, + schema, + ) + yield from self.delete_tables(schema_fqdn) diff --git a/ingestion/src/metadata/utils/source_connections.py b/ingestion/src/metadata/utils/source_connections.py index c2fc8d22e19..d25b7802604 100644 --- a/ingestion/src/metadata/utils/source_connections.py +++ b/ingestion/src/metadata/utils/source_connections.py @@ -16,6 +16,9 @@ from urllib.parse import quote_plus from requests import Session +from metadata.generated.schema.entity.services.connections.database.athenaConnection import ( + AthenaConnection, +) from metadata.generated.schema.entity.services.connections.database.azureSQLConnection import ( AzureSQLConnection, ) @@ -296,3 +299,20 @@ def _(connection: AzureSQLConnection): url = f"{url}?{params}" return url + + +@get_connection_url.register +def _(connection: AthenaConnection): + url = f"{connection.scheme.value}://" + if connection.awsConfig.awsAccessKeyId: + url += connection.awsConfig.awsAccessKeyId + if connection.awsConfig.awsSecretAccessKey: + url += f":{connection.awsConfig.awsSecretAccessKey.get_secret_value()}" + else: + url += ":" + url += f"@athena.{connection.awsConfig.awsRegion}.amazonaws.com:443" + if connection.database: + url += f"/{connection.database}" + url += f"?s3_staging_dir={quote_plus(connection.s3StagingDir)}" + url += f"&work_group={connection.workgroup}" + return url diff --git a/openmetadata-ui/src/main/resources/ui/src/jsons/connectionSchemas/connections/database/athenaConnection.json b/openmetadata-ui/src/main/resources/ui/src/jsons/connectionSchemas/connections/database/athenaConnection.json index 819724f986b..0dd9dff594b 100644 --- a/openmetadata-ui/src/main/resources/ui/src/jsons/connectionSchemas/connections/database/athenaConnection.json +++ b/openmetadata-ui/src/main/resources/ui/src/jsons/connectionSchemas/connections/database/athenaConnection.json @@ -9,13 +9,17 @@ "athenaType": { "description": "Service type.", "type": "string", - "enum": ["Athena"], + "enum": [ + "Athena" + ], "default": "Athena" }, "athenaScheme": { "description": "SQLAlchemy driver scheme options.", "type": "string", - "enum": ["awsathena+rest"], + "enum": [ + "awsathena+rest" + ], "default": "awsathena+rest" } }, @@ -30,15 +34,6 @@ "$ref": "#/definitions/athenaScheme", "default": "awsathena+rest" }, - "username": { - "description": "username to connect to the Athena. This user should have privileges to read all the metadata in Athena.", - "type": "string" - }, - "password": { - "description": "password to connect to the Athena.", - "type": "string", - "format": "password" - }, "hostPort": { "description": "Host and port of the Athena", "type": "string" @@ -47,9 +42,8 @@ "description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank , OpenMetadata Ingestion attempts to scan all the databases in Athena.", "type": "string" }, - "awsRegion": { - "description": "AWS Athena AWS Region. ", - "type": "string" + "awsConfig": { + "$ref": "../../../../../../../../../../../OpenMetadata/catalog-rest-service/src/main/resources/json/schema/security/credentials/s3Credentials.json" }, "s3StagingDir": { "description": "S3 Staging Directory.", diff --git a/openmetadata-ui/src/main/resources/ui/src/jsons/connectionSchemas/connections/database/mysqlConnection.json b/openmetadata-ui/src/main/resources/ui/src/jsons/connectionSchemas/connections/database/mysqlConnection.json index 1b8fcb29f97..59471da0b34 100644 --- a/openmetadata-ui/src/main/resources/ui/src/jsons/connectionSchemas/connections/database/mysqlConnection.json +++ b/openmetadata-ui/src/main/resources/ui/src/jsons/connectionSchemas/connections/database/mysqlConnection.json @@ -31,11 +31,11 @@ "default": "mysql+pymysql" }, "username": { - "description": "username to connect to the SingleStore. This user should have privileges to read all the metadata in SingleStore.", + "description": "username to connect to the Mysql. This user should have privileges to read all the metadata in Mysql.", "type": "string" }, "password": { - "description": "password to connect to the SingleStore.", + "description": "password to connect to the Mysql.", "type": "string", "format": "password" }, @@ -44,7 +44,7 @@ "type": "string" }, "database": { - "description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank , OpenMetadata Ingestion attempts to scan all the databases in SingleStore.", + "description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank , OpenMetadata Ingestion attempts to scan all the databases in Mysql.", "type": "string" }, "connectionOptions": {