Bigquery Usage Fix (#4140)

This commit is contained in:
Mayur Singal 2022-04-15 02:21:22 +05:30 committed by GitHub
parent c4a6bbdc83
commit 0c27f16582
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 62 additions and 41 deletions

View File

@ -1,16 +1,29 @@
{
"source": {
"type": "bigquery-usage",
"config": {
"project_id": "project_id",
"host_port": "https://bigquery.googleapis.com",
"username": "gcpuser@project_id.iam.gserviceaccount.com",
"service_name": "gcp_bigquery",
"duration": 2,
"options": {
"credentials_path": "examples/creds/bigquery-cred.json"
"serviceName": "local_bigquery",
"serviceConnection": {
"config": {
"type": "BigQuery",
"projectID": "project_id",
"enablePolicyTagImport": true,
"connectionOptions": {
"credentials": {
"type": "service_account",
"project_id": "project_id",
"private_key_id": "private_key_id",
"private_key": "private_key",
"client_email": "gcpuser@project_id.iam.gserviceaccount.com",
"client_id": "client_id",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": ""
}
}
}
}
},
"sourceConfig": {"config": {"queryLogDuration": "1"}}
},
"processor": {
"type": "query-parser",
@ -24,17 +37,16 @@
"filename": "/tmp/bigquery_usage"
}
},
"bulk_sink": {
"bulkSink": {
"type": "metadata-usage",
"config": {
"filename": "/tmp/bigquery_usage"
}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "no-auth"
}
}
}

View File

@ -19,15 +19,21 @@ from typing import Iterable
from google.cloud import logging
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.source.bigquery import BigQueryConfig, BigquerySource
from metadata.ingestion.source.bigquery import BigquerySource
from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
from metadata.utils.helpers import get_start_and_end
@ -38,40 +44,42 @@ class BigqueryUsageSource(Source[TableQuery]):
SERVICE_TYPE = DatabaseServiceType.BigQuery.value
scheme = "bigquery"
def __init__(self, config, metadata_config):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__()
self.temp_credentials = None
self.metadata_config = metadata_config
self.config = config
self.project_id = self.config.project_id
self.service_connection = config.serviceConnection.__root__.config
self.project_id = self.service_connection.projectID
self.logger_name = "cloudaudit.googleapis.com%2Fdata_access"
self.status = SQLSourceStatus()
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: BigQueryConnection = config.serviceConnection.__root__.config
options = connection.connectionOptions.dict()
if not isinstance(connection, BigQueryConnection):
raise InvalidSourceException(
f"Expected BigQueryConnection, but got {connection}"
)
if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
if config.options.get("credentials_path"):
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = config.options[
if options.get("credentials_path"):
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = options[
"credentials_path"
]
elif config.options.get("credentials"):
self.temp_credentials = BigquerySource.create_credential_temp_file(
credentials=config.options.get("credentials")
del connection.connectionOptions.credentials_path
elif options.get("credentials"):
cls.temp_credentials = BigquerySource.create_credential_temp_file(
credentials=options["credentials"]
)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.temp_credentials
del config.options["credentials"]
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.temp_credentials
del connection.connectionOptions.credentials
else:
logger.warning(
"Please refer to the BigQuery connector documentation, especially the credentials part "
"https://docs.open-metadata.org/connectors/bigquery-usage"
"https://docs.open-metadata.org/connectors/bigquery"
)
def get_connection_url(self):
if self.project_id:
return f"{self.scheme}://{self.project_id}"
return f"{self.scheme}://"
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config = BigQueryConfig.parse_obj(config_dict)
return cls(config, metadata_config)
def prepare(self):
@ -81,7 +89,7 @@ class BigqueryUsageSource(Source[TableQuery]):
logging_client = logging.Client()
usage_logger = logging_client.logger(self.logger_name)
logger.debug("Listing entries for logger {}:".format(usage_logger.name))
start, end = get_start_and_end(self.config.duration)
start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
try:
entries = usage_logger.list_entries()
for entry in entries:
@ -107,9 +115,7 @@ class BigqueryUsageSource(Source[TableQuery]):
statementType = ""
if hasattr(queryConfig, "statementType"):
statementType = queryConfig["statementType"]
database = ""
if hasattr(queryConfig, "destinationTable"):
database = queryConfig["destinationTable"]
database = self.project_id
analysis_date = str(
datetime.strptime(
jobStats["startTime"][0:19], "%Y-%m-%dT%H:%M:%S"
@ -127,7 +133,7 @@ class BigqueryUsageSource(Source[TableQuery]):
aborted=0,
database=str(database),
sql=queryConfig["query"],
service_name=self.config.service_name,
service_name=self.config.serviceName,
)
yield tq
@ -137,6 +143,9 @@ class BigqueryUsageSource(Source[TableQuery]):
def get_status(self) -> SourceStatus:
return self.status
def test_connection(self) -> SourceStatus:
pass
def close(self):
super().close()
if self.temp_credentials: