Modify Snowflake and Databricks schema (#4367)

This commit is contained in:
Ayush Shah 2022-04-22 20:07:06 +05:30 committed by GitHub
parent e4914cfae4
commit 32a16b059d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 11 deletions

View File

@ -59,6 +59,11 @@
"type": "string", "type": "string",
"format": "password" "format": "password"
}, },
"httpPath": {
"title": "Http Path",
"description": "Databricks compute resources URL",
"type": "string"
},
"connectionOptions": { "connectionOptions": {
"title": "Connection Options", "title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions" "$ref": "../connectionBasicType.json#/definitions/connectionOptions"
@ -77,8 +82,5 @@
} }
}, },
"additionalProperties": false, "additionalProperties": false,
"required": [ "required": ["hostPort", "token"]
"hostPort",
"token"
]
} }

View File

@ -68,6 +68,11 @@
"description": "Snowflake warehouse.", "description": "Snowflake warehouse.",
"type": "string" "type": "string"
}, },
"privateKey": {
"title": "Private Key",
"description": "Connection to Snowflake instance via Private Key",
"type": "string"
},
"connectionOptions": { "connectionOptions": {
"title": "Connection Options", "title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions" "$ref": "../connectionBasicType.json#/definitions/connectionOptions"
@ -89,9 +94,5 @@
} }
}, },
"additionalProperties": false, "additionalProperties": false,
"required": [ "required": ["hostPort", "username", "account"]
"hostPort",
"username",
"account"
]
} }

View File

@ -29,14 +29,17 @@ from metadata.generated.schema.entity.services.connections.connectionBasicType i
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection, BigQueryConnection,
) )
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import ( from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
DynamoDBConnection, DynamoDBConnection,
) )
from metadata.generated.schema.entity.services.connections.database.glueConnection import ( from metadata.generated.schema.entity.services.connections.database.glueConnection import (
GlueConnection, GlueConnection,
) )
from metadata.generated.schema.entity.services.connections.database.sampleDataConnection import ( from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SampleDataConnection, SnowflakeConnection,
) )
from metadata.utils.aws_client import AWSClient, DynamoClient, GlueClient from metadata.utils.aws_client import AWSClient, DynamoClient, GlueClient
from metadata.utils.credentials import set_google_credentials from metadata.utils.credentials import set_google_credentials
@ -83,6 +86,46 @@ def get_connection(
return create_generic_connection(connection, verbose) return create_generic_connection(connection, verbose)
@get_connection.register
def _(connection: DatabricksConnection, verbose: bool = False):
args = connection.connectionArguments
if not args:
connection.connectionArguments = dict()
connection.connectionArguments["http_path"] = connection.httpPath
return create_generic_connection(connection, verbose)
@get_connection.register
def _(connection: SnowflakeConnection, verbose: bool = False):
if connection.privateKey:
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
snowflake_private_key_passphrase = os.environ.get(
"SNOWFLAKE_PRIVATE_KEY_PASSPHRASE", ""
)
if not snowflake_private_key_passphrase:
logger.warning(
"Snowflake Private Key Passphrase not found, replacing it with empty string"
)
p_key = serialization.load_pem_private_key(
bytes(connection.privateKey, "utf-8"),
password=snowflake_private_key_passphrase.encode(),
backend=default_backend(),
)
pkb = p_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
if not connection.connectionArguments:
connection.connectionArguments = dict()
connection.connectionArguments["private_key"] = pkb
return create_generic_connection(connection, verbose)
@get_connection.register @get_connection.register
def _(connection: BigQueryConnection, verbose: bool = False): def _(connection: BigQueryConnection, verbose: bool = False):
""" """