Simplify bigquery usage (#5728)

Pere Miquel Brull 2022-06-29 13:58:41 +02:00 committed by GitHub
parent 7734adb3db
commit e3223f6a20
3 changed files with 37 additions and 67 deletions

View File: bigQueryConnection.json

@@ -73,6 +73,12 @@
"type": "string",
"default": "us"
},
"usageLocation": {
"title": "Usage Location",
"description": "Location used to query INFORMATION_SCHEMA.JOBS_BY_PROJECT to fetch usage data. You can pass multi-regions, such as `us` or `eu`, or you specific region. Australia and Asia multi-regions are not yet in GA.",
"type": "string",
"default": "us"
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"

View File: metadata/ingestion/source/database/bigquery_usage.py

@@ -8,16 +8,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
# This import verifies that the dependencies are available.
import os
"""
Handle BigQuery usage extraction
"""
from datetime import datetime
from typing import Any, Dict, Iterable, Optional
from google import auth
from google.cloud import logging
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
@@ -31,11 +27,11 @@ from metadata.generated.schema.entity.services.databaseService import (
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.utils.credentials import set_google_credentials
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_queries import BIGQUERY_USAGE_STATEMENT
logger = ingestion_logger()
@@ -46,14 +42,11 @@ class BigqueryUsageSource(UsageSource):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.temp_credentials = None
self.project_id = self.set_project_id()
self.database = self.project_id
self.logger_name = "cloudaudit.googleapis.com%2Fdata_access"
self.logging_client = logging.Client()
self.usage_logger = self.logging_client.logger(self.logger_name)
logger.debug("Listing entries for logger {}:".format(self.usage_logger.name))
self.sql_stmt = BIGQUERY_USAGE_STATEMENT
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@@ -70,61 +63,17 @@ class BigqueryUsageSource(UsageSource):
return cls(config, metadata_config)
def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str:
"""
Returns the SQL statement used to fetch query logs.
"""
return self.sql_stmt.format(
start_time=start_time,
end_time=end_time,
region=self.connection.usageLocation,
)
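For reference, a sketch (not part of the commit) of what `get_sql_statement` renders once the template is filled in; the time bounds are hypothetical:

from metadata.utils.sql_queries import BIGQUERY_USAGE_STATEMENT

sql = BIGQUERY_USAGE_STATEMENT.format(
    start_time="2022-06-28", end_time="2022-06-29", region="us"
)
# The rendered statement targets `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
# and filters on creation_time BETWEEN "2022-06-28" AND "2022-06-29".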
@staticmethod
def set_project_id():
_, project_id = auth.default()
return project_id
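`set_project_id` relies on Application Default Credentials. A self-contained sketch of the same lookup, assuming the `google-auth` package is installed and credentials are configured:

from google import auth

# auth.default() resolves Application Default Credentials and returns a
# (credentials, project_id) tuple; project_id can be None if the environment
# does not pin a project, so callers may want to check before using it.
credentials, project_id = auth.default()
print(project_id)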
def get_table_query(self, entries: Iterable):
query_list = []
for entry in entries:
timestamp = entry.timestamp.isoformat()
timestamp = datetime.strptime(timestamp[0:10], "%Y-%m-%d")
if timestamp >= self.start and timestamp <= self.end:
if ("query" in str(entry.payload)) and type(
entry.payload
) == collections.OrderedDict:
payload = list(entry.payload.items())[-1][1]
if "jobChange" in payload:
logger.debug(f"\nEntries: {payload}")
if "queryConfig" in payload["jobChange"]["job"]["jobConfig"]:
queryConfig = payload["jobChange"]["job"]["jobConfig"][
"queryConfig"
]
else:
continue
jobStats = payload["jobChange"]["job"]["jobStats"]
statementType = ""
if hasattr(queryConfig, "statementType"):
statementType = queryConfig["statementType"]
database = self.project_id
analysis_date = str(
datetime.strptime(
jobStats["startTime"][0:19], "%Y-%m-%dT%H:%M:%S"
).strftime("%Y-%m-%d %H:%M:%S")
)
logger.debug(f"Query :{statementType}:{queryConfig['query']}")
tq = TableQuery(
query=queryConfig["query"],
userName=entry.resource.labels["project_id"],
startTime=str(jobStats["startTime"]),
endTime=str(jobStats["endTime"]),
analysisDate=analysis_date,
aborted=0,
databaseName=str(database),
serviceName=self.config.serviceName,
databaseSchema=None,
)
query_list.append(tq)
return query_list
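For context on the parser being removed: the Cloud Logging audit entries carry a nested `jobChange` payload, and the code above indexes into it key by key. A sketch of that shape, with invented values and only the keys the code actually reads:

payload = {
    "jobChange": {
        "job": {
            "jobConfig": {
                "queryConfig": {"query": "SELECT 1", "statementType": "SELECT"}
            },
            "jobStats": {
                "startTime": "2022-06-28T10:00:00.000Z",
                "endTime": "2022-06-28T10:00:01.000Z",
            },
        }
    }
}
query_config = payload["jobChange"]["job"]["jobConfig"]["queryConfig"]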
def _get_raw_extract_iter(self) -> Optional[Iterable[Dict[str, Any]]]:
entries = self.usage_logger.list_entries()
yield TableQueries(
queries=self.get_table_query(entries),
)
def close(self):
super().close()
if self.temp_credentials:
os.unlink(self.temp_credentials)

View File: metadata/utils/sql_queries.py

@@ -340,3 +340,18 @@ SNOWFLAKE_GET_COMMENTS = """
WHERE TABLE_SCHEMA = '{schema_name}'
AND TABLE_NAME = '{table_name}'
"""
BIGQUERY_USAGE_STATEMENT = """
SELECT
project_id as database_name,
user_email as user_name,
statement_type as query_type,
start_time,
end_time,
query as query_text,
null as schema_name
FROM `region-{region}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time BETWEEN "{start_time}" AND "{end_time}"
AND job_type = "QUERY"
AND state = "DONE"
"""