mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-05 07:38:26 +00:00
Fix 1477: Bigquery Usage - queryConfig (#1478)
* Fix 1477: Bigquery Usage - queryConfig * Bigquery Usage Modified
This commit is contained in:
parent
341d2d5bf8
commit
128a69045d
@ -61,45 +61,55 @@ class BigqueryUsageSource(Source):
|
||||
|
||||
def next_record(self) -> Iterable[TableQuery]:
|
||||
logging_client = logging.Client()
|
||||
logger = logging_client.logger(self.logger_name)
|
||||
print("Listing entries for logger {}:".format(logger.name))
|
||||
usage_logger = logging_client.logger(self.logger_name)
|
||||
logger.debug("Listing entries for logger {}:".format(usage_logger.name))
|
||||
start, end = get_start_and_end(self.config.duration)
|
||||
for entry in logger.list_entries():
|
||||
timestamp = entry.timestamp.isoformat()
|
||||
timestamp = datetime.strptime(timestamp[0:10], "%Y-%m-%d")
|
||||
if timestamp >= start and timestamp <= end:
|
||||
if ("query" in str(entry.payload)) and type(
|
||||
entry.payload
|
||||
) == collections.OrderedDict:
|
||||
payload = list(entry.payload.items())[-1][1]
|
||||
if "jobChange" in payload:
|
||||
print(f"\nEntries: {payload}")
|
||||
queryConfig = payload["jobChange"]["job"]["jobConfig"][
|
||||
"queryConfig"
|
||||
]
|
||||
jobStats = payload["jobChange"]["job"]["jobStats"]
|
||||
statementType = ""
|
||||
if hasattr(queryConfig, "statementType"):
|
||||
statementType = queryConfig["statementType"]
|
||||
database = ""
|
||||
if hasattr(queryConfig, "destinationTable"):
|
||||
database = queryConfig["destinationTable"]
|
||||
analysis_date = str(
|
||||
datetime.strptime(
|
||||
jobStats["startTime"][0:19], "%Y-%m-%dT%H:%M:%S"
|
||||
).strftime("%Y-%m-%d %H:%M:%S")
|
||||
)
|
||||
tq = TableQuery(
|
||||
query=statementType,
|
||||
user_name=entry.resource.labels["project_id"],
|
||||
starttime=str(jobStats["startTime"]),
|
||||
endtime=str(jobStats["endTime"]),
|
||||
analysis_date=analysis_date,
|
||||
aborted=0,
|
||||
database=str(database),
|
||||
sql=queryConfig["query"],
|
||||
)
|
||||
yield tq
|
||||
try:
|
||||
entries = usage_logger.list_entries()
|
||||
for entry in entries:
|
||||
timestamp = entry.timestamp.isoformat()
|
||||
timestamp = datetime.strptime(timestamp[0:10], "%Y-%m-%d")
|
||||
if timestamp >= start and timestamp <= end:
|
||||
if ("query" in str(entry.payload)) and type(
|
||||
entry.payload
|
||||
) == collections.OrderedDict:
|
||||
payload = list(entry.payload.items())[-1][1]
|
||||
if "jobChange" in payload:
|
||||
logger.debug(f"\nEntries: {payload}")
|
||||
if (
|
||||
"queryConfig"
|
||||
in payload["jobChange"]["job"]["jobConfig"]
|
||||
):
|
||||
queryConfig = payload["jobChange"]["job"]["jobConfig"][
|
||||
"queryConfig"
|
||||
]
|
||||
else:
|
||||
continue
|
||||
jobStats = payload["jobChange"]["job"]["jobStats"]
|
||||
statementType = ""
|
||||
if hasattr(queryConfig, "statementType"):
|
||||
statementType = queryConfig["statementType"]
|
||||
database = ""
|
||||
if hasattr(queryConfig, "destinationTable"):
|
||||
database = queryConfig["destinationTable"]
|
||||
analysis_date = str(
|
||||
datetime.strptime(
|
||||
jobStats["startTime"][0:19], "%Y-%m-%dT%H:%M:%S"
|
||||
).strftime("%Y-%m-%d %H:%M:%S")
|
||||
)
|
||||
tq = TableQuery(
|
||||
query=statementType,
|
||||
user_name=entry.resource.labels["project_id"],
|
||||
starttime=str(jobStats["startTime"]),
|
||||
endtime=str(jobStats["endTime"]),
|
||||
analysis_date=analysis_date,
|
||||
aborted=0,
|
||||
database=str(database),
|
||||
sql=queryConfig["query"],
|
||||
)
|
||||
yield tq
|
||||
except Exception as err:
|
||||
logger.error(repr(err))
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
Loading…
x
Reference in New Issue
Block a user