mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-03 20:19:31 +00:00
Timestamp according to duration added
This commit is contained in:
parent
ef865ccf9f
commit
3f4b938b20
@ -16,7 +16,6 @@
|
||||
# This import verifies that the dependencies are available.
|
||||
import logging as log
|
||||
from metadata.ingestion.models.table_queries import TableQuery
|
||||
from sqlalchemy.engine import create_engine
|
||||
from google.cloud import logging
|
||||
import collections
|
||||
from datetime import datetime
|
||||
@ -39,7 +38,6 @@ class BigqueryUsageSource(Source):
|
||||
|
||||
self.config = config
|
||||
self.project_id = self.config.project_id
|
||||
self.engine = create_engine(self.get_connection_url(), **self.config.options).connect()
|
||||
self.logger_name = "cloudaudit.googleapis.com%2Fdata_access"
|
||||
self.status = SQLSourceStatus()
|
||||
|
||||
@ -66,22 +64,24 @@ class BigqueryUsageSource(Source):
|
||||
for entry in logger.list_entries():
|
||||
timestamp = entry.timestamp.isoformat()
|
||||
timestamp = datetime.strptime(timestamp[0:10], "%Y-%m-%d")
|
||||
if("query" in str(entry.payload)) and type(entry.payload) == collections.OrderedDict:
|
||||
payload = list(entry.payload.items())[-1][1]
|
||||
if "jobChange" in payload:
|
||||
queryConfig = payload['jobChange']['job']['jobConfig']['queryConfig']
|
||||
jobStats = payload['jobChange']['job']['jobStats']
|
||||
statementType = queryConfig['statementType'] if hasattr(queryConfig, 'statementType') else ''
|
||||
database = queryConfig['destinationTable'] if hasattr(queryConfig, 'destinationTable') is not None else ''
|
||||
analysis_date = str(datetime.strptime(jobStats['startTime'][0:19],"%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'))
|
||||
tq = TableQuery(statementType,
|
||||
queryConfig['priority'], 0, 0, 0, str(jobStats['startTime']),
|
||||
str(jobStats['endTime']), analysis_date, self.config.duration, str(
|
||||
database), 0, queryConfig['query'])
|
||||
yield tq
|
||||
if timestamp >= start and timestamp <= end:
|
||||
if("query" in str(entry.payload)) and type(entry.payload) == collections.OrderedDict:
|
||||
payload = list(entry.payload.items())[-1][1]
|
||||
if "jobChange" in payload:
|
||||
print(f"\nEntries: {payload}")
|
||||
queryConfig = payload['jobChange']['job']['jobConfig']['queryConfig']
|
||||
jobStats = payload['jobChange']['job']['jobStats']
|
||||
statementType = queryConfig['statementType'] if hasattr(queryConfig, 'statementType') else ''
|
||||
database = queryConfig['destinationTable'] if hasattr(queryConfig, 'destinationTable') is not None else ''
|
||||
analysis_date = str(datetime.strptime(jobStats['startTime'][0:19], "%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'))
|
||||
tq = TableQuery(statementType,
|
||||
queryConfig['priority'], 0, 0, 0, str(jobStats['startTime']),
|
||||
str(jobStats['endTime']), analysis_date, self.config.duration, str(
|
||||
database), 0, queryConfig['query'])
|
||||
yield tq
|
||||
|
||||
def close(self):
|
||||
self.engine.close()
|
||||
pass
|
||||
|
||||
def get_status(self) -> SourceStatus:
|
||||
return self.status
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user