Fix #4032 - Bigquery properties & GCS Credentials (#4202)

Fix #4032 - Bigquery properties & GCS Credentials (#4202)
This commit is contained in:
Pere Miquel Brull 2022-04-19 12:31:34 +02:00 committed by GitHub
parent c7bd445676
commit 256b16d877
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 312 additions and 128 deletions

View File

@ -39,10 +39,14 @@
"description": "Username to connect to BigQuery. This user should have privileges to read all the metadata in BigQuery.",
"type": "string"
},
"projectID": {
"description": "Google BigQuery project id.",
"projectId": {
"description": "BigQuery project ID. Set it here when the credentials are provided as a path to a credentials file.",
"type": "string"
},
"credentials": {
"description": "GCS Credentials",
"$ref": "../../../../security/credentials/gcsCredentials.json"
},
"enablePolicyTagImport": {
"description": "Enable importing policy tags of BigQuery into OpenMetadata",
"type": "boolean",
@ -87,6 +91,6 @@
},
"additionalProperties": false,
"required": [
"projectID"
"credentials"
]
}

View File

@ -0,0 +1,82 @@
{
"$id": "https://open-metadata.org/security/credentials/gcsCredentials.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "GCSCredentials",
"description": "GCS credentials configs.",
"type": "object",
"javaType": "org.openmetadata.catalog.security.credentials.GCSCredentials",
"definitions": {
"GCSValues": {
"description": "GCS Credentials.",
"type": "object",
"properties": {
"type": {
"description": "Google Cloud service account type.",
"type": "string"
},
"projectId": {
"description": "Google Cloud project id.",
"type": "string"
},
"privateKeyId": {
"description": "Google Cloud private key id.",
"type": "string"
},
"privateKey": {
"description": "Google Cloud private key.",
"type": "string"
},
"clientEmail": {
"description": "Google Cloud email.",
"type": "string"
},
"clientId": {
"description": "Google Cloud Client ID.",
"type": "string"
},
"authUri": {
"description": "Google Cloud auth uri.",
"type": "string",
"format": "uri"
},
"tokenUri": {
"description": "Google Cloud token uri.",
"type": "string",
"format": "uri"
},
"authProviderX509CertUrl": {
"description": "Google Cloud auth provider certificate.",
"type": "string",
"format": "uri"
},
"clientX509CertUrl": {
"description": "Google Cloud client certificate uri.",
"type": "string",
"format": "uri"
}
},
"additionalProperties": false
},
"GCSCredentialsPath": {
"description": "GCS Credentials Path.",
"type": "string"
}
},
"properties": {
"gcsConfig": {
"description": "GCS configs.",
"oneOf": [
{
"$ref": "#/definitions/GCSValues"
},
{
"$ref": "#/definitions/GCSCredentialsPath"
}
]
}
},
"additionalProperties": false,
"required": [
"gcsConfig"
]
}

View File

@ -364,7 +364,7 @@ public class DatabaseServiceResourceTest extends EntityResourceTest<DatabaseServ
public static void validateBigQueryConnection(
BigQueryConnection expectedBigQueryConnection, BigQueryConnection actualBigQueryConnection) {
assertEquals(expectedBigQueryConnection.getHostPort(), actualBigQueryConnection.getHostPort());
assertEquals(expectedBigQueryConnection.getProjectID(), actualBigQueryConnection.getProjectID());
assertEquals(expectedBigQueryConnection.getCredentials(), actualBigQueryConnection.getCredentials());
assertEquals(expectedBigQueryConnection.getUsername(), actualBigQueryConnection.getUsername());
assertEquals(expectedBigQueryConnection.getScheme(), actualBigQueryConnection.getScheme());
assertEquals(expectedBigQueryConnection.getDatabase(), actualBigQueryConnection.getDatabase());

View File

@ -5,7 +5,20 @@
"config": {
"type": "BigQuery",
"hostPort": "localhost:1234",
"projectID": "project_id"
"credentials": {
"gcsConfig": {
"type": "service_account",
"projectId": "projectID",
"privateKeyId": "privateKeyId",
"privateKey": "privateKey",
"clientEmail": "clientEmail",
"clientId": "clientId",
"authUri": "https://accounts.google.com/o/oauth2/auth",
"tokenUri": "https://oauth2.googleapis.com/token",
"authProviderX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
"clientX509CertUrl": "https://cert.url"
}
}
}
},
"sourceConfig": {

View File

@ -5,22 +5,17 @@
"serviceConnection": {
"config": {
"type": "BigQuery",
"projectID": "project_id",
"enablePolicyTagImport": true,
"connectionOptions": {
"credentials": {
"type": "service_account",
"project_id": "project_id",
"private_key_id": "private_key_id",
"private_key": "private_key",
"client_email": "gcpuser@project_id.iam.gserviceaccount.com",
"client_id": "client_id",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": ""
}
}
"credentialsType": "service_account",
"projectID": "projectID",
"privateKeyId": "privateKeyId",
"privateKey": "privateKey",
"clientEmail": "clientEmail",
"clientId": "clientId",
"authURI": "https://accounts.google.com/o/oauth2/auth",
"tokenURI": "https://oauth2.googleapis.com/token",
"authProviderX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
"clientX509CertUrl": "clientX509CertUrl"
}
},
"sourceConfig": {"config": {"enableDataProfiler": false}}

View File

@ -8,10 +8,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import tempfile
from typing import Optional, Tuple
from google.cloud.datacatalog_v1 import PolicyTagManagerClient
@ -40,6 +38,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.sql_source import SQLSource
from metadata.utils.column_type_parser import create_sqlalchemy_type
from metadata.utils.credentials import set_google_credentials
from metadata.utils.helpers import get_start_and_end
logger = logging.getLogger(__name__)
@ -84,7 +83,9 @@ _types.get_columns = get_columns
class BigquerySource(SQLSource):
def __init__(self, config, metadata_config):
super().__init__(config, metadata_config)
self.connection_config = self.config.serviceConnection.__root__.config
self.connection_config: BigQueryConnection = (
self.config.serviceConnection.__root__.config
)
self.temp_credentials = None
# and "policy_tags" in column and column["policy_tags"]
@ -109,35 +110,12 @@ class BigquerySource(SQLSource):
raise InvalidSourceException(
f"Expected BigQueryConnection, but got {connection}"
)
if (
not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
and connection.connectionOptions
):
options = connection.connectionOptions.dict()
if options.get("credentials_path"):
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = options[
"credentials_path"
]
del connection.connectionOptions.credentials_path
elif options.get("credentials"):
cls.temp_credentials = cls.create_credential_temp_file(
credentials=options["credentials"]
)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.temp_credentials
del connection.connectionOptions.credentials
else:
logger.warning(
"Please refer to the BigQuery connector documentation, especially the credentials part "
"https://docs.open-metadata.org/connectors/bigquery"
)
return cls(config, metadata_config)
@staticmethod
def create_credential_temp_file(credentials: dict) -> str:
with tempfile.NamedTemporaryFile(delete=False) as fp:
cred_json = json.dumps(credentials, indent=4, separators=(",", ": "))
fp.write(cred_json.encode())
return fp.name
set_google_credentials(
gcs_credentials=config.serviceConnection.__root__.config.credentials
)
return cls(config, metadata_config)
def standardize_schema_table_names(
self, schema: str, table: str
@ -184,7 +162,10 @@ class BigquerySource(SQLSource):
def _get_database(self, database: Optional[str]) -> Database:
if not database:
database = self.service_connection.projectID
database = (
self.connection_config.projectId
or self.connection_config.credentials.gcsConfig.projectId
)
return Database(
name=database,
service=EntityReference(

View File

@ -33,8 +33,8 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.source.bigquery import BigquerySource
from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
from metadata.utils.credentials import set_google_credentials
from metadata.utils.helpers import get_start_and_end
logger = log.getLogger(__name__)
@ -50,7 +50,13 @@ class BigqueryUsageSource(Source[TableQuery]):
self.metadata_config = metadata_config
self.config = config
self.service_connection = config.serviceConnection.__root__.config
self.project_id = self.service_connection.projectID
# Used as db
self.project_id = (
self.service_connection.projectId
or self.service_connection.credentials.gcsConfig.projectId
)
self.logger_name = "cloudaudit.googleapis.com%2Fdata_access"
self.status = SQLSourceStatus()
@ -58,28 +64,15 @@ class BigqueryUsageSource(Source[TableQuery]):
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: BigQueryConnection = config.serviceConnection.__root__.config
options = connection.connectionOptions.dict()
if not isinstance(connection, BigQueryConnection):
raise InvalidSourceException(
f"Expected BigQueryConnection, but got {connection}"
)
if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
if options.get("credentials_path"):
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = options[
"credentials_path"
]
del connection.connectionOptions.credentials_path
elif options.get("credentials"):
cls.temp_credentials = BigquerySource.create_credential_temp_file(
credentials=options["credentials"]
)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.temp_credentials
del connection.connectionOptions.credentials
else:
logger.warning(
"Please refer to the BigQuery connector documentation, especially the credentials part "
"https://docs.open-metadata.org/connectors/bigquery"
)
set_google_credentials(
gcs_credentials=config.serviceConnection.__root__.config.credentials
)
return cls(config, metadata_config)
def prepare(self):

View File

@ -18,9 +18,6 @@ from typing import Any, Dict, Iterable
from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import (
ClickhouseConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
@ -31,7 +28,7 @@ from metadata.ingestion.api.source import InvalidSourceException, Source, Source
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
from metadata.utils.engines import get_engine, test_connection
from metadata.utils.helpers import get_raw_extract_iter, get_start_and_end
from metadata.utils.helpers import get_start_and_end
from metadata.utils.sql_queries import CLICKHOUSE_SQL_USAGE_STATEMENT

View File

@ -17,9 +17,6 @@ from typing import Any, Dict, Iterable
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
MssqlConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
@ -30,7 +27,7 @@ from metadata.ingestion.api.source import InvalidSourceException, Source, Source
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
from metadata.utils.engines import get_engine, test_connection
from metadata.utils.helpers import get_raw_extract_iter, get_start_and_end
from metadata.utils.helpers import get_start_and_end
from metadata.utils.sql_queries import MSSQL_SQL_USAGE_STATEMENT

View File

@ -66,7 +66,9 @@ class PostgresSource(SQLSource):
try:
logger.info(f"Ingesting from database: {row[0]}")
self.service_connection.database = row[0]
self.engine = get_engine(self.config.serviceConnection)
self.engine = get_engine(
self.config.serviceConnection.__root__.config
)
self.engine.connect()
yield inspect(self.engine)
except Exception as err:

View File

@ -35,7 +35,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.sql_source import SQLSource
from metadata.utils.column_type_parser import create_sqlalchemy_type
from metadata.utils.engines import get_engine
GEOGRAPHY = create_sqlalchemy_type("GEOGRAPHY")
ischema_names["VARIANT"] = VARIANT
@ -82,7 +81,6 @@ class SnowflakeSource(SQLSource):
self.connection.execute(use_db_query)
logger.info(f"Ingesting from database: {row[1]}")
self.config.serviceConnection.__root__.config.database = row[1]
self.engine = get_engine(self.config.serviceConnection)
yield inspect(self.engine)
def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]:

View File

@ -134,7 +134,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
entity=DatabaseService, config=config
)
self.engine = get_engine(service_connection=self.config.serviceConnection)
self.engine = get_engine(self.service_connection)
self.test_connection()
self._session = None # We will instantiate this just if needed

View File

@ -0,0 +1,81 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Credentials helper module
"""
import json
import os
import tempfile
from metadata.generated.schema.security.credentials.gcsCredentials import (
GCSCredentials,
GCSCredentialsPath,
GCSValues,
)
GOOGLE_CREDENTIALS = "GOOGLE_APPLICATION_CREDENTIALS"
class InvalidGcsConfigException(Exception):
    """Error raised when the GCS credentials cannot be set from the given config."""
def create_credential_tmp_file(credentials: dict) -> str:
    """
    Dump a credentials dictionary as JSON into a temporary file.

    The file is created with ``delete=False``, so it remains on disk after
    this function returns.

    :param credentials: dictionary to store
    :return: path of the file that was written
    """
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        payload = json.dumps(credentials, indent=4, separators=(",", ": "))
        tmp_file.write(payload.encode())
    return tmp_file.name
def set_google_credentials(gcs_credentials: GCSCredentials) -> None:
    """
    Set the GCS credentials environment variable from the given config.

    Nothing is changed when the variable already holds a value. When the
    config is a credentials path, the variable is pointed at that path. When
    the config carries the credential values, they are written to a temporary
    JSON file and the variable is pointed at that file.

    :param gcs_credentials: GCSCredentials
    :raises InvalidGcsConfigException: if the config is neither a credentials
        path nor a set of credential values
    """
    if os.environ.get(GOOGLE_CREDENTIALS):
        return

    gcs_config = gcs_credentials.gcsConfig

    if isinstance(gcs_config, GCSCredentialsPath):
        os.environ[GOOGLE_CREDENTIALS] = str(gcs_config.__root__)
        return

    if isinstance(gcs_config, GCSValues):

        def _as_str(value):
            # Keep unset values as None so they are serialized as JSON null
            # instead of the literal text "None".
            return None if value is None else str(value)

        credentials_dict = {
            "type": gcs_config.type,
            "project_id": gcs_config.projectId,
            "private_key_id": gcs_config.privateKeyId,
            "private_key": gcs_config.privateKey,
            "client_email": gcs_config.clientEmail,
            "client_id": gcs_config.clientId,
            "auth_uri": _as_str(gcs_config.authUri),
            "token_uri": _as_str(gcs_config.tokenUri),
            "auth_provider_x509_cert_url": _as_str(
                gcs_config.authProviderX509CertUrl
            ),
            "client_x509_cert_url": _as_str(gcs_config.clientX509CertUrl),
        }

        tmp_credentials_file = create_credential_tmp_file(credentials=credentials_dict)
        os.environ[GOOGLE_CREDENTIALS] = tmp_credentials_file
        return

    raise InvalidGcsConfigException(
        f"Error trying to set GCS credentials with {gcs_credentials}."
        " Check https://docs.open-metadata.org/connectors/bigquery"
    )

View File

@ -23,9 +23,6 @@ from sqlalchemy.orm.session import Session
from metadata.generated.schema.entity.services.connections.connectionBasicType import (
ConnectionOptions,
)
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
from metadata.utils.source_connections import get_connection_args, get_connection_url
from metadata.utils.timeout import timeout
@ -38,20 +35,19 @@ class SourceConnectionException(Exception):
"""
def get_engine(service_connection: ServiceConnection, verbose: bool = False) -> Engine:
def get_engine(connection, verbose: bool = False) -> Engine:
"""
Given an SQL configuration, build the SQLAlchemy Engine
"""
service_connection_config = service_connection.__root__.config
options = service_connection_config.connectionOptions
options = connection.connectionOptions
if not options:
options = ConnectionOptions()
engine = create_engine(
get_connection_url(service_connection_config),
get_connection_url(connection),
**options.dict(),
connect_args=get_connection_args(service_connection_config),
connect_args=get_connection_args(connection),
echo=verbose,
)

View File

@ -73,6 +73,7 @@ from metadata.generated.schema.entity.services.connections.database.trinoConnect
from metadata.generated.schema.entity.services.connections.database.verticaConnection import (
VerticaConnection,
)
from metadata.generated.schema.security.credentials.gcsCredentials import GCSValues
def get_connection_url_common(connection):
@ -254,8 +255,13 @@ def _(connection: HiveConnection):
@get_connection_url.register
def _(connection: BigQueryConnection):
    """
    Build the connection URL for a BigQuery connection.

    The project is taken from the connection's own ``projectId`` when set.
    Otherwise, when the credentials are given as values (``GCSValues``), it is
    taken from the credentials' ``projectId``. Without a project, the URL
    contains only the scheme.
    """
    project_id = connection.projectId
    if not project_id and isinstance(connection.credentials.gcsConfig, GCSValues):
        project_id = connection.credentials.gcsConfig.projectId
    if project_id:
        # Use the resolved value: re-reading gcsConfig.projectId here would
        # ignore an explicit connection.projectId, and that attribute is only
        # known to exist when gcsConfig is a GCSValues.
        return f"{connection.scheme.value}://{project_id}"
    return f"{connection.scheme.value}://"

View File

@ -126,6 +126,50 @@ class OMetaServiceTest(TestCase):
# Clean
self.metadata.delete(entity=DatabaseService, entity_id=service.id)
def test_create_database_service_bigquery(self):
    """
    Create a BigQuery db service from WorkflowSource
    """
    data = {
        "type": "bigquery",
        "serviceName": "local_bigquery",
        "serviceConnection": {
            "config": {
                "type": "BigQuery",
                "enablePolicyTagImport": True,
                "credentials": {
                    "gcsConfig": {
                        "type": "service_account",
                        "projectId": "projectID",
                        "privateKeyId": "privateKeyId",
                        "privateKey": "privateKey",
                        "clientEmail": "clientEmail",
                        "clientId": "clientId",
                        "authUri": "https://accounts.google.com/o/oauth2/auth",
                        "tokenUri": "https://oauth2.googleapis.com/token",
                        "authProviderX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
                        "clientX509CertUrl": "https://cert.url",
                    }
                },
            }
        },
        "sourceConfig": {"config": {"enableDataProfiler": False}},
    }

    workflow_source = WorkflowSource(**data)

    # Create service
    service: DatabaseService = self.metadata.get_service_or_create(
        entity=DatabaseService, config=workflow_source
    )
    assert service
    assert service.serviceType == DatabaseServiceType.BigQuery

    # Check get
    assert service == self.metadata.get_service_or_create(
        entity=DatabaseService, config=workflow_source
    )

    # Clean: remove the service created above, as the preceding test does
    self.metadata.delete(entity=DatabaseService, entity_id=service.id)
def test_create_dashboard_service_looker(self):
"""
Create a db service from WorkflowSource

View File

@ -31,22 +31,21 @@ CONFIG = """
"serviceConnection": {
"config": {
"type": "BigQuery",
"projectID": "project_id",
"enablePolicyTagImport": true,
"connectionOptions": {
"credentials": {
"type": "service_account",
"project_id": "project_id",
"private_key_id": "private_key_id",
"private_key": "private_key",
"client_email": "gcpuser@project_id.iam.gserviceaccount.com",
"client_id": "client_id",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": ""
"credentials": {
"gcsConfig": {
"type": "service_account",
"projectId": "projectID",
"privateKeyId": "privateKeyId",
"privateKey": "privateKey",
"clientEmail": "clientEmail",
"clientId": "clientId",
"authUri": "https://accounts.google.com/o/oauth2/auth",
"tokenUri": "https://oauth2.googleapis.com/token",
"authProviderX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
"clientX509CertUrl": "https://cert.url"
}
}
}
}
},
"sourceConfig": {"config": {"enableDataProfiler": false}}

View File

@ -172,20 +172,19 @@ def test_bigquery():
"serviceConnection": {
"config": {
"type": "BigQuery",
"projectID": "project_id",
"enablePolicyTagImport": True,
"connectionOptions": {
"credentials": {
"credentials": {
"gcsConfig": {
"type": "service_account",
"project_id": "project_id",
"private_key_id": "private_key_id",
"private_key": "private_key",
"client_email": "gcpuser@project_id.iam.gserviceaccount.com",
"client_id": "client_id",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "",
"projectId": "projectID",
"privateKeyId": "privateKeyId",
"privateKey": "privateKey",
"clientEmail": "clientEmail",
"clientId": "clientId",
"authUri": "https://accounts.google.com/o/oauth2/auth",
"tokenUri": "https://oauth2.googleapis.com/token",
"authProviderX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
"clientX509CertUrl": "https://cert.url",
}
},
}

View File

@ -173,7 +173,9 @@ class REST_API(AppBuilderBaseView):
try:
service_connection_model = ServiceConnectionModel(**json_request)
response = test_source_connection(service_connection_model)
response = test_source_connection(
service_connection_model.serviceConnection.__root__.config
)
return response

View File

@ -15,9 +15,6 @@ from a WorkflowSource
from flask import Response
from openmetadata.api.response import ApiResponse
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnectionModel,
)
from metadata.utils.engines import (
SourceConnectionException,
get_engine,
@ -25,15 +22,13 @@ from metadata.utils.engines import (
)
def test_source_connection(
service_connection_model: ServiceConnectionModel,
) -> Response:
def test_source_connection(connection) -> Response:
"""
Create the engine and test the connection
:param workflow_source: Source to test
:return: None or exception
"""
engine = get_engine(service_connection_model.serviceConnection)
engine = get_engine(connection)
try:
test_connection(engine)