athena-source-connection-fixed (#4278)

* athena-source-connection-fixed

* ran-core_bump_version_dev

* aws-cred-json-schema-support-added-in-athena

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
codingwithabhi 2022-04-21 19:37:29 +05:30 committed by GitHub
parent c7c3d153e9
commit c09dc3bff7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 121 additions and 56 deletions

View File

@ -30,14 +30,8 @@
"$ref": "#/definitions/athenaScheme", "$ref": "#/definitions/athenaScheme",
"default": "awsathena+rest" "default": "awsathena+rest"
}, },
"username": { "awsConfig": {
"description": "username to connect to the Athena. This user should have privileges to read all the metadata in Athena.", "$ref": "../../../../security/credentials/s3Credentials.json"
"type": "string"
},
"password": {
"description": "password to connect to the Athena.",
"type": "string",
"format": "password"
}, },
"hostPort": { "hostPort": {
"description": "Host and port of the Athena", "description": "Host and port of the Athena",
@ -47,10 +41,6 @@
"description": "Database of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases in Athena.", "description": "Database of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases in Athena.",
"type": "string" "type": "string"
}, },
"awsRegion": {
"description": "AWS Athena AWS Region. ",
"type": "string"
},
"s3StagingDir": { "s3StagingDir": {
"description": "S3 Staging Directory.", "description": "S3 Staging Directory.",
"type": "string" "type": "string"

View File

@ -31,11 +31,11 @@
"default": "mysql+pymysql" "default": "mysql+pymysql"
}, },
"username": { "username": {
"description": "username to connect to the SingleStore. This user should have privileges to read all the metadata in SingleStore.", "description": "username to connect to the Mysql. This user should have privileges to read all the metadata in Mysql.",
"type": "string" "type": "string"
}, },
"password": { "password": {
"description": "password to connect to the SingleStore.", "description": "password to connect to the Mysql.",
"type": "string", "type": "string",
"format": "password" "format": "password"
}, },
@ -44,7 +44,7 @@
"type": "string" "type": "string"
}, },
"database": { "database": {
"description": "Database of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases in SingleStore.", "description": "Database of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases in Mysql.",
"type": "string" "type": "string"
}, },
"connectionOptions": { "connectionOptions": {
@ -58,8 +58,5 @@
} }
}, },
"additionalProperties": false, "additionalProperties": false,
"required": [ "required": ["hostPort", "username"]
"hostPort",
"username"
]
} }

View File

@ -7,5 +7,5 @@ Provides metadata version information.
from incremental import Version from incremental import Version
__version__ = Version("metadata", 0, 10, 0, dev=0) __version__ = Version("metadata", 0, 10, 0, dev=1)
__all__ = ["__version__"] __all__ = ["__version__"]

View File

@ -0,0 +1,34 @@
{
"source": {
"type": "athena",
"serviceName": "local_athena",
"serviceConnection": {
"config": {
"type": "Athena",
"database": "database_name",
"awsConfig": {
"awsAccessKeyId": "access key id",
"awsSecretAccessKey": "access secret key",
"awsRegion": "aws region name"
},
"s3StagingDir": "s3 directory for datasource",
"workgroup": "workgroup name"
}
},
"sourceConfig": {
"config": {
"enableDataProfiler": false
}
}
},
"sink": {
"type": "metadata-rest",
"config": {}
},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "no-auth"
}
}
}

View File

@ -9,33 +9,30 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from urllib.parse import quote_plus import logging
from typing import Iterable
from sqlalchemy.inspection import inspect
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.services.connections.database.athenaConnection import ( from metadata.generated.schema.entity.services.connections.database.athenaConnection import (
AthenaConnection, AthenaConnection,
) )
# This import verifies that the dependencies are available.
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection, OpenMetadataConnection,
) )
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.sql_source import SQLSource from metadata.ingestion.source.sql_source import SQLSource
from metadata.utils.filters import filter_by_schema
from metadata.utils.fqdn_generator import get_fqdn
logger: logging.Logger = logging.getLogger(__name__)
class AthenaConfig(AthenaConnection):
def get_connection_url(self):
url = f"{self.scheme}://"
if self.username:
url += f"{quote_plus(self.username)}"
if self.password:
url += f":{quote_plus(self.password)}"
else:
url += ":"
url += f"@athena.{self.awsRegion}.amazonaws.com:443/"
if self.database:
url += f"{self.database}"
url += f"?s3_staging_dir={quote_plus(self.s3StagingDir)}"
url += f"&work_group={self.workgroup}"
return url
class AthenaSource(SQLSource): class AthenaSource(SQLSource):
@ -44,5 +41,38 @@ class AthenaSource(SQLSource):
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection): def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config = AthenaConfig.parse_obj(config_dict) config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: AthenaConnection = config.serviceConnection.__root__.config
if not isinstance(connection, AthenaConnection):
raise InvalidSourceException(
f"Expected AthenaConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self):
    """Set up the SQLAlchemy inspector and delegate to the base ``prepare``.

    The inspector is built from ``self.engine`` and stored on the instance,
    the service connection's database is set to ``"default"``, and the
    result of the parent class's ``prepare`` is returned.
    """
    engine_inspector = inspect(self.engine)
    self.inspector = engine_inspector
    # The database name is overwritten unconditionally before the parent
    # hook runs, regardless of what the connection config carried.
    self.service_connection.database = "default"
    parent_result = super().prepare()
    return parent_result
def next_record(self) -> Iterable[Entity]:
    """Yield entities for every schema reported by the inspector.

    For each schema name the per-database source state is cleared first.
    Schemas rejected by ``schemaFilterPattern`` are recorded on
    ``self.status`` and skipped. For the remaining schemas, tables, views
    and deleted-table markers are yielded according to the
    ``includeTables``, ``includeViews`` and ``markDeletedTables`` flags of
    the source config.
    """
    # NOTE(review): indentation is not visible in this view; the three
    # flag checks are taken to live inside the per-schema loop -- confirm.
    for schema_name in self.inspector.get_schema_names():
        self.database_source_state.clear()

        is_filtered_out = filter_by_schema(
            self.source_config.schemaFilterPattern, schema_name=schema_name
        )
        if is_filtered_out:
            self.status.filter(schema_name, "Schema pattern not allowed")
            continue

        if self.source_config.includeTables:
            yield from self.fetch_tables(self.inspector, schema_name)

        if self.source_config.includeViews:
            yield from self.fetch_views(self.inspector, schema_name)

        if not self.source_config.markDeletedTables:
            continue

        # Fully qualified schema name: service, database, then schema.
        yield from self.delete_tables(
            get_fqdn(
                DatabaseSchema,
                self.config.serviceName,
                self.service_connection.database,
                schema_name,
            )
        )

View File

@ -16,6 +16,9 @@ from urllib.parse import quote_plus
from requests import Session from requests import Session
from metadata.generated.schema.entity.services.connections.database.athenaConnection import (
AthenaConnection,
)
from metadata.generated.schema.entity.services.connections.database.azureSQLConnection import ( from metadata.generated.schema.entity.services.connections.database.azureSQLConnection import (
AzureSQLConnection, AzureSQLConnection,
) )
@ -296,3 +299,20 @@ def _(connection: AzureSQLConnection):
url = f"{url}?{params}" url = f"{url}?{params}"
return url return url
@get_connection_url.register
def _(connection: AthenaConnection):
    """Build the SQLAlchemy connection URL for an Athena connection.

    The URL has the form
    ``<scheme>://<key id>:<secret>@athena.<region>.amazonaws.com:443[/<database>]``
    followed by the ``s3_staging_dir`` query parameter and, when a
    workgroup is configured, ``work_group``.

    Args:
        connection: Athena connection config; ``awsConfig`` supplies the
            access key id, secret access key and region.

    Returns:
        The connection URL as a string.
    """
    aws_config = connection.awsConfig
    url = f"{connection.scheme.value}://"
    # NOTE(review): indentation is not visible in this view; the `else` is
    # taken to pair with the access-key check, so a connection without
    # explicit keys yields an empty "user:password" section -- confirm.
    if aws_config.awsAccessKeyId:
        # Credentials must be percent-encoded: AWS secret keys commonly
        # contain "/" and "+", which would otherwise corrupt the URL.
        url += quote_plus(aws_config.awsAccessKeyId)
        if aws_config.awsSecretAccessKey:
            secret = aws_config.awsSecretAccessKey.get_secret_value()
            url += f":{quote_plus(secret)}"
    else:
        url += ":"
    url += f"@athena.{aws_config.awsRegion}.amazonaws.com:443"
    if connection.database:
        url += f"/{connection.database}"
    url += f"?s3_staging_dir={quote_plus(connection.s3StagingDir)}"
    # Only emit work_group when one is set; formatting None would send the
    # literal string "None" as the workgroup name.
    if connection.workgroup:
        url += f"&work_group={quote_plus(connection.workgroup)}"
    return url

View File

@ -9,13 +9,17 @@
"athenaType": { "athenaType": {
"description": "Service type.", "description": "Service type.",
"type": "string", "type": "string",
"enum": ["Athena"], "enum": [
"Athena"
],
"default": "Athena" "default": "Athena"
}, },
"athenaScheme": { "athenaScheme": {
"description": "SQLAlchemy driver scheme options.", "description": "SQLAlchemy driver scheme options.",
"type": "string", "type": "string",
"enum": ["awsathena+rest"], "enum": [
"awsathena+rest"
],
"default": "awsathena+rest" "default": "awsathena+rest"
} }
}, },
@ -30,15 +34,6 @@
"$ref": "#/definitions/athenaScheme", "$ref": "#/definitions/athenaScheme",
"default": "awsathena+rest" "default": "awsathena+rest"
}, },
"username": {
"description": "username to connect to the Athena. This user should have privileges to read all the metadata in Athena.",
"type": "string"
},
"password": {
"description": "password to connect to the Athena.",
"type": "string",
"format": "password"
},
"hostPort": { "hostPort": {
"description": "Host and port of the Athena", "description": "Host and port of the Athena",
"type": "string" "type": "string"
@ -47,9 +42,8 @@
"description": "Database of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases in Athena.", "description": "Database of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases in Athena.",
"type": "string" "type": "string"
}, },
"awsRegion": { "awsConfig": {
"description": "AWS Athena AWS Region. ", "$ref": "../../../../../../../../../../../OpenMetadata/catalog-rest-service/src/main/resources/json/schema/security/credentials/s3Credentials.json"
"type": "string"
}, },
"s3StagingDir": { "s3StagingDir": {
"description": "S3 Staging Directory.", "description": "S3 Staging Directory.",

View File

@ -31,11 +31,11 @@
"default": "mysql+pymysql" "default": "mysql+pymysql"
}, },
"username": { "username": {
"description": "username to connect to the SingleStore. This user should have privileges to read all the metadata in SingleStore.", "description": "username to connect to the Mysql. This user should have privileges to read all the metadata in Mysql.",
"type": "string" "type": "string"
}, },
"password": { "password": {
"description": "password to connect to the SingleStore.", "description": "password to connect to the Mysql.",
"type": "string", "type": "string",
"format": "password" "format": "password"
}, },
@ -44,7 +44,7 @@
"type": "string" "type": "string"
}, },
"database": { "database": {
"description": "Database of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases in SingleStore.", "description": "Database of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases in Mysql.",
"type": "string" "type": "string"
}, },
"connectionOptions": { "connectionOptions": {