From 128a69045dba4ffeb741b92c8c0522466f9bca40 Mon Sep 17 00:00:00 2001
From: Ayush Shah
Date: Fri, 3 Dec 2021 22:27:44 +0530
Subject: [PATCH] Fix 1477: Bigquery Usage - queryConfig (#1478)

* Fix 1477: Bigquery Usage - queryConfig

* Bigquery Usage Modified
---
 .../ingestion/source/bigquery_usage.py | 86 +++++++++++--------
 1 file changed, 48 insertions(+), 38 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/source/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
index 0c6ef3b57a0..28eae3d7d0d 100644
--- a/ingestion/src/metadata/ingestion/source/bigquery_usage.py
+++ b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
@@ -61,45 +61,55 @@ class BigqueryUsageSource(Source):
 
     def next_record(self) -> Iterable[TableQuery]:
         logging_client = logging.Client()
-        logger = logging_client.logger(self.logger_name)
-        print("Listing entries for logger {}:".format(logger.name))
+        usage_logger = logging_client.logger(self.logger_name)
+        logger.debug("Listing entries for logger {}:".format(usage_logger.name))
         start, end = get_start_and_end(self.config.duration)
-        for entry in logger.list_entries():
-            timestamp = entry.timestamp.isoformat()
-            timestamp = datetime.strptime(timestamp[0:10], "%Y-%m-%d")
-            if timestamp >= start and timestamp <= end:
-                if ("query" in str(entry.payload)) and type(
-                    entry.payload
-                ) == collections.OrderedDict:
-                    payload = list(entry.payload.items())[-1][1]
-                    if "jobChange" in payload:
-                        print(f"\nEntries: {payload}")
-                        queryConfig = payload["jobChange"]["job"]["jobConfig"][
-                            "queryConfig"
-                        ]
-                        jobStats = payload["jobChange"]["job"]["jobStats"]
-                        statementType = ""
-                        if hasattr(queryConfig, "statementType"):
-                            statementType = queryConfig["statementType"]
-                        database = ""
-                        if hasattr(queryConfig, "destinationTable"):
-                            database = queryConfig["destinationTable"]
-                        analysis_date = str(
-                            datetime.strptime(
-                                jobStats["startTime"][0:19], "%Y-%m-%dT%H:%M:%S"
-                            ).strftime("%Y-%m-%d %H:%M:%S")
-                        )
-                        tq = TableQuery(
-                            query=statementType,
-                            user_name=entry.resource.labels["project_id"],
-                            starttime=str(jobStats["startTime"]),
-                            endtime=str(jobStats["endTime"]),
-                            analysis_date=analysis_date,
-                            aborted=0,
-                            database=str(database),
-                            sql=queryConfig["query"],
-                        )
-                        yield tq
+        try:
+            entries = usage_logger.list_entries()
+            for entry in entries:
+                timestamp = entry.timestamp.isoformat()
+                timestamp = datetime.strptime(timestamp[0:10], "%Y-%m-%d")
+                if timestamp >= start and timestamp <= end:
+                    if ("query" in str(entry.payload)) and type(
+                        entry.payload
+                    ) == collections.OrderedDict:
+                        payload = list(entry.payload.items())[-1][1]
+                        if "jobChange" in payload:
+                            logger.debug(f"\nEntries: {payload}")
+                            if (
+                                "queryConfig"
+                                in payload["jobChange"]["job"]["jobConfig"]
+                            ):
+                                queryConfig = payload["jobChange"]["job"]["jobConfig"][
+                                    "queryConfig"
+                                ]
+                            else:
+                                continue
+                            jobStats = payload["jobChange"]["job"]["jobStats"]
+                            statementType = ""
+                            if hasattr(queryConfig, "statementType"):
+                                statementType = queryConfig["statementType"]
+                            database = ""
+                            if hasattr(queryConfig, "destinationTable"):
+                                database = queryConfig["destinationTable"]
+                            analysis_date = str(
+                                datetime.strptime(
+                                    jobStats["startTime"][0:19], "%Y-%m-%dT%H:%M:%S"
+                                ).strftime("%Y-%m-%d %H:%M:%S")
+                            )
+                            tq = TableQuery(
+                                query=statementType,
+                                user_name=entry.resource.labels["project_id"],
+                                starttime=str(jobStats["startTime"]),
+                                endtime=str(jobStats["endTime"]),
+                                analysis_date=analysis_date,
+                                aborted=0,
+                                database=str(database),
+                                sql=queryConfig["query"],
+                            )
+                            yield tq
+        except Exception as err:
+            logger.error(repr(err))
 
     def close(self):
         pass