From bd40ae45ce766887986cd75b22efa6651db9ae7e Mon Sep 17 00:00:00 2001
From: Ayush Shah
Date: Thu, 26 Aug 2021 01:15:02 +0530
Subject: [PATCH 1/6] Add BigQuery usage pipeline

---
 .../examples/workflows/bigquery_usage.json    |  48 +++++++
 .../src/metadata/ingestion/api/workflow.py    |   2 -
 .../ingestion/sink/metadata_rest_tables.py    |   4 +-
 .../src/metadata/ingestion/source/bigquery.py |   1 +
 .../ingestion/source/bigquery_usage.py        | 131 ++++++++++++++++++
 .../metadata/ingestion/source/sql_source.py   |  13 +-
 6 files changed, 189 insertions(+), 10 deletions(-)
 create mode 100644 ingestion/examples/workflows/bigquery_usage.json
 create mode 100644 ingestion/src/metadata/ingestion/source/bigquery_usage.py

diff --git a/ingestion/examples/workflows/bigquery_usage.json b/ingestion/examples/workflows/bigquery_usage.json
new file mode 100644
index 00000000000..cac249e1986
--- /dev/null
+++ b/ingestion/examples/workflows/bigquery_usage.json
@@ -0,0 +1,48 @@
+{
+  "source": {
+    "type": "bigquery-usage",
+    "config": {
+      "project_id": "arched-champion-319818",
+      "username": "gcpuser@arched-champion-319818.iam.gserviceaccount.com",
+      "host_port": "https://bigquery.googleapis.com",
+      "service_name": "gcp_bigquery",
+      "duration": 2,
+      "options": {
+        "credentials_path": "examples/creds/bigquery-cred.json"
+      },
+      "service_type": "BigQuery"
+    }
+  },
+  "processor": {
+    "type": "query-parser",
+    "config": {
+      "filter": ""
+    }
+  },
+  "stage": {
+    "type": "table-usage",
+    "config": {
+      "filename": "/tmp/bigquery_usage"
+    }
+  },
+  "bulk_sink": {
+    "type": "metadata-usage",
+    "config": {
+      "filename": "/tmp/bigquery_usage"
+    }
+  },
+  "metadata_server": {
+    "type": "metadata-server",
+    "config": {
+      "api_endpoint": "http://localhost:8585/api",
+      "auth_provider_type": "no-auth"
+    }
+  },
+  "cron": {
+    "minute": "*/5",
+    "hour": null,
+    "day": null,
+    "month": null,
+    "day_of_week": null
+  }
+}
diff --git a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py
index 8452223bb89..5347a384e0b 100644
--- a/ingestion/src/metadata/ingestion/api/workflow.py
+++ b/ingestion/src/metadata/ingestion/api/workflow.py
@@ -63,10 +63,8 @@ class Workflow:
         self.config = config
         self.ctx = WorkflowContext(workflow_id=self.config.run_id)
         source_type = self.config.source.type
-        print(source_type)
         source_class = self.get('metadata.ingestion.source.{}.{}Source'.format(
             self.typeClassFetch(source_type, True), self.typeClassFetch(source_type, False)))
-        print(source_class)
         metadata_config = self.config.metadata_server.dict().get("config", {})
         self.source: Source = source_class.create(
             self.config.source.dict().get("config", {}), metadata_config, self.ctx
diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py b/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py
index f367dc68a23..65736a56f61 100644
--- a/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py
+++ b/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py
@@ -77,9 +77,9 @@ class MetadataRestTablesSink(Sink):
                 '{}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
         except (APIError, ValidationError) as err:
             logger.error(
-                "Failed to ingest table {} in database {} ".format(table_and_db.table.name, table_and_db.database.name))
+                "Failed to ingest table {} in database {}".format(table_and_db.table.name.__root__, table_and_db.database.name.__root__))
             logger.error(err)
-            self.status.failure(table_and_db.table.name)
+            self.status.failure(table_and_db.table.name.__root__)
 
     def get_status(self):
         return self.status
diff --git a/ingestion/src/metadata/ingestion/source/bigquery.py b/ingestion/src/metadata/ingestion/source/bigquery.py
index 58fe12c6c04..78861f71b1a 100644
--- a/ingestion/src/metadata/ingestion/source/bigquery.py
+++ b/ingestion/src/metadata/ingestion/source/bigquery.py
@@ -26,6 +26,7 @@ from ..ometa.auth_provider import MetadataServerConfig
 class BigQueryConfig(SQLConnectionConfig, SQLSource):
     scheme = "bigquery"
     project_id: Optional[str] = None
+    duration: int = 1
 
     def get_connection_url(self):
         if self.project_id:
diff --git a/ingestion/src/metadata/ingestion/source/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
new file mode 100644
index 00000000000..96ab84212e4
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+# This import verifies that the dependencies are available.
+import logging
+from metadata.ingestion.models.table_queries import TableQuery
+from sqlalchemy.engine import create_engine
+from sqlalchemy.inspection import inspect
+from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
+from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
+from metadata.ingestion.api.source import Source, SourceStatus
+from typing import Iterator, Union, Dict, Any, Iterable
+from metadata.utils.helpers import get_start_and_end
+from metadata.ingestion.source.bigquery import BigQueryConfig
+
+logger = logging.getLogger(__name__)
+
+
+class BigqueryUsageSource(Source):
+    # SQL statement to extract query usage from the BigQuery audit-log tables
+    SQL_STATEMENT = """
+        DECLARE start_date, end_date DATE;
+        SET start_date = DATE("{start_date}");
+        SET end_date = DATE("{end_date}");
+
+        WITH
+        auditlog AS (
+            SELECT *
+            FROM `{project_id}.{dataset}.cloudaudit_googleapis_com_data*`
+            WHERE
+                _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', start_date)
+                    AND FORMAT_DATE('%Y%m%d', end_date)
+                AND resource.type = "bigquery_resource"
+        )
+        , job_completed AS (
+            SELECT *
+            FROM auditlog
+            WHERE
+                protopayload_auditlog.methodName = "jobservice.jobcompleted"
+                AND protopayload_auditlog.status.message = "DONE"
+        )
+        , insert AS (
+            SELECT *
+            FROM auditlog
+            WHERE
+                protopayload_auditlog.methodName = "jobservice.insert"
+                AND protopayload_auditlog.status.message = "DONE"
+        )
+        , unioned AS (
+            SELECT * FROM job_completed
+            UNION ALL
+            SELECT * FROM insert
+        )
+
+        SELECT * FROM unioned
+        ORDER BY timestamp
+        LIMIT 1000
+    """
+
+    SERVICE_TYPE = 'Bigquery'
+    scheme = "bigquery"
+
+    def __init__(self, config, metadata_config, ctx):
+        super().__init__(ctx)
+        start, end = get_start_and_end(config.duration)
+        self.project_id = config.project_id
+        self.alchemy_helper = create_engine(self.get_connection_url(), **config.options)
+        inspector = inspect(self.alchemy_helper)
+        for schema in inspector.get_schema_names():
+            dataset_name = schema
+            self.sql_stmt = BigqueryUsageSource.SQL_STATEMENT.format(
+                project_id=self.project_id,
+                start_date=start,
+                end_date=end,
+                dataset=dataset_name
+            )
+        self._extract_iter: Union[None, Iterator] = None
+        self._database = 'bigquery'
+        self.status = SQLSourceStatus()
+
+    def get_connection_url(self):
+        if self.project_id:
+            print(f"{self.scheme}://{self.project_id}")
+            return f"{self.scheme}://{self.project_id}"
+        return f"{self.scheme}://"
+
+    @classmethod
+    def create(cls, config_dict, metadata_config_dict, ctx):
+        config = BigQueryConfig.parse_obj(config_dict)
+        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
+        return cls(config, metadata_config, ctx)
+
+    def prepare(self):
+        pass
+
+    def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
+        """
+        Provides an iterator of result rows from the SQLAlchemy helper
+        :return:
+        """
+        try:
+            rows = self.alchemy_helper.connect().execute(self.sql_stmt).fetchall()
+            for row in rows:
+                yield row
+        except Exception as err:
+            logger.error(repr(err))
+
+    def next_record(self) -> Iterable[TableQuery]:
+        for row in self._get_raw_extract_iter():
+            tq = TableQuery(row['query'], row['label'], row['userid'], row['xid'], row['pid'], str(row['starttime']),
+                            str(row['endtime']), str(row['analysis_date']), row['duration'], row['database'],
+                            row['aborted'], row['sql'])
+            yield tq
+
+    def close(self):
+        self.alchemy_helper.connect().close()
+
+    def get_status(self) -> SourceStatus:
+        return self.status
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 78ba132fcf7..a8f4c40c56f 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -48,13 +48,14 @@ class SQLSourceStatus(SourceStatus):
     success: List[str] = field(default_factory=list)
     failures: List[str] = field(default_factory=list)
     warnings: List[str] = field(default_factory=list)
+    filtered: List[str] = field(default_factory=list)
 
     def scanned(self, table_name: str) -> None:
         self.success.append(table_name)
         logger.info('Table Scanned: {}'.format(table_name))
 
-    def filtered(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None:
-        self.warnings.append(table_name)
+    def filter(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None:
+        self.filtered.append(table_name)
         logger.warning("Dropped Table {} due to {}".format(table_name, err))
 
 
@@ -194,7 +195,7 @@ class SQLSource(Source):
         inspector = inspect(self.engine)
         for schema in inspector.get_schema_names():
             if not self.sql_config.filter_pattern.included(schema):
-                self.status.filtered(schema, "Schema pattern not allowed")
+                self.status.filter(schema, "Schema pattern not allowed")
                 continue
             logger.debug("total tables {}".format(inspector.get_table_names(schema)))
             if self.config.include_tables:
@@ -209,7 +210,7 @@ class SQLSource(Source):
             try:
                 schema, table_name = self.standardize_schema_table_names(schema, table_name)
                 if not self.sql_config.filter_pattern.included(table_name):
-                    self.status.filtered('{}.{}'.format(self.config.get_service_name(), table_name),
+                    self.status.filter('{}.{}'.format(self.config.get_service_name(), table_name),
                                          "Table pattern not allowed")
                     continue
                 self.status.scanned('{}.{}'.format(self.config.get_service_name(), table_name))
@@ -230,7 +231,7 @@ class SQLSource(Source):
                 yield table_and_db
             except ValidationError as err:
                 logger.error(err)
-                self.status.filtered('{}.{}'.format(self.config.service_name, table_name),
+                self.status.filter('{}.{}'.format(self.config.service_name, table_name),
                                      "Validation error")
                 continue
 
@@ -240,7 +241,7 @@ class SQLSource(Source):
         for view_name in inspector.get_view_names(schema):
             try:
                 if not self.sql_config.filter_pattern.included(view_name):
-                    self.status.filtered('{}.{}'.format(self.config.get_service_name(), view_name),
+                    self.status.filter('{}.{}'.format(self.config.get_service_name(), view_name),
                                          "View pattern not allowed")
                     continue
                 try:

From 2d1ad58e1b0b975e662262c0083af35be28ac401 Mon Sep 17 00:00:00 2001
From: Ayush Shah
Date: Thu, 26 Aug 2021 01:17:01 +0530
Subject: [PATCH 2/6] Pipeline code refactored

---
 .../metadata/ingestion/source/bigquery_usage.py | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/source/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
index 96ab84212e4..2d86950f8fd 100644
--- a/ingestion/src/metadata/ingestion/source/bigquery_usage.py
+++ b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
@@ -19,9 +19,9 @@ from metadata.ingestion.models.table_queries import TableQuery
 from sqlalchemy.engine import create_engine
 from sqlalchemy.inspection import inspect
 from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
-from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
 from metadata.ingestion.api.source import Source, SourceStatus
-from typing import Iterator, Union, Dict, Any, Iterable
+from typing import Dict, Any, Iterable
+from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
 from metadata.utils.helpers import get_start_and_end
 from metadata.ingestion.source.bigquery import BigQueryConfig
 
@@ -76,8 +76,8 @@ class BigqueryUsageSource(Source):
         super().__init__(ctx)
         start, end = get_start_and_end(config.duration)
         self.project_id = config.project_id
-        self.alchemy_helper = create_engine(self.get_connection_url(), **config.options)
-        inspector = inspect(self.alchemy_helper)
+        self.engine = create_engine(self.get_connection_url(), **config.options).connect()
+        inspector = inspect(self.engine)
         for schema in inspector.get_schema_names():
             dataset_name = schema
             self.sql_stmt = BigqueryUsageSource.SQL_STATEMENT.format(
@@ -86,8 +86,6 @@ class BigqueryUsageSource(Source):
                 end_date=end,
                 dataset=dataset_name
             )
-        self._extract_iter: Union[None, Iterator] = None
-        self._database = 'bigquery'
         self.status = SQLSourceStatus()
 
     def get_connection_url(self):
@@ -111,7 +109,7 @@ class BigqueryUsageSource(Source):
         :return:
         """
         try:
-            rows = self.alchemy_helper.connect().execute(self.sql_stmt).fetchall()
+            rows = self.engine.execute(self.sql_stmt).fetchall()
             for row in rows:
                 yield row
         except Exception as err:
@@ -125,7 +123,7 @@ class BigqueryUsageSource(Source):
             yield tq
 
     def close(self):
-        self.alchemy_helper.connect().close()
+        self.engine.close()
 
     def get_status(self) -> SourceStatus:
         return self.status

From a58cae7286dd64ec9bdd5b4c693c694d0a2d2907 Mon Sep 17 00:00:00 2001
From: Ayush Shah
Date: Thu, 26 Aug 2021 01:19:32 +0530
Subject: [PATCH 3/6] Pipeline config changes

---
 ingestion/examples/workflows/bigquery_usage.json | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/ingestion/examples/workflows/bigquery_usage.json b/ingestion/examples/workflows/bigquery_usage.json
index cac249e1986..4df0bc97128 100644
--- a/ingestion/examples/workflows/bigquery_usage.json
+++ b/ingestion/examples/workflows/bigquery_usage.json
@@ -2,9 +2,9 @@
   "source": {
     "type": "bigquery-usage",
     "config": {
-      "project_id": "arched-champion-319818",
-      "username": "gcpuser@arched-champion-319818.iam.gserviceaccount.com",
+      "project_id": "project_id",
       "host_port": "https://bigquery.googleapis.com",
+      "username": "gcpuser@project_id.iam.gserviceaccount.com",
       "service_name": "gcp_bigquery",
       "duration": 2,
       "options": {

From 820af4629ce42a2cdd5ac7ba2f8f195cdf123d0c Mon Sep 17 00:00:00 2001
From: Ayush Shah
Date: Thu, 26 Aug 2021 01:42:12 +0530
Subject: [PATCH 4/6] RedshiftConfig Filter Modified

---
 ingestion/examples/workflows/redshift.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ingestion/examples/workflows/redshift.json b/ingestion/examples/workflows/redshift.json
index ec1492219a6..8f4b60c9ef5 100644
--- a/ingestion/examples/workflows/redshift.json
+++ b/ingestion/examples/workflows/redshift.json
@@ -9,7 +9,7 @@
       "service_name": "aws_redshift",
       "service_type": "Redshift",
       "filter_pattern": {
-        "excludes": ["information_schema.*"]
+        "excludes": ["information_schema.*","[\\w]*event_vw.*"]
       }
     }
   },

From ef865ccf9fb3f38dfdf8b38da7e2de5159e69c13 Mon Sep 17 00:00:00 2001
From: Ayush Shah
Date: Fri, 27 Aug 2021 01:30:00 +0530
Subject: [PATCH 5/6] Bigquery Audit Log client implemented

---
 .../ingestion/source/bigquery_usage.py        | 102 ++++++------------
 .../metadata/ingestion/stage/table_usage.py   |   6 +-
 2 files changed, 35 insertions(+), 73 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/source/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
index 2d86950f8fd..048079604a3 100644
--- a/ingestion/src/metadata/ingestion/source/bigquery_usage.py
+++ b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
@@ -14,10 +14,12 @@
 # limitations under the License.
 
 # This import verifies that the dependencies are available.
-import logging
+import logging as log
 from metadata.ingestion.models.table_queries import TableQuery
 from sqlalchemy.engine import create_engine
-from sqlalchemy.inspection import inspect
+from google.cloud import logging
+import collections
+from datetime import datetime
 from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
 from metadata.ingestion.api.source import Source, SourceStatus
 from typing import Dict, Any, Iterable
@@ -25,67 +27,20 @@ from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
 from metadata.utils.helpers import get_start_and_end
 from metadata.ingestion.source.bigquery import BigQueryConfig
 
-logger = logging.getLogger(__name__)
+logger = log.getLogger(__name__)
 
 
 class BigqueryUsageSource(Source):
-    # SQL statement to extract query usage from the BigQuery audit-log tables
-    SQL_STATEMENT = """
-        DECLARE start_date, end_date DATE;
-        SET start_date = DATE("{start_date}");
-        SET end_date = DATE("{end_date}");
-
-        WITH
-        auditlog AS (
-            SELECT *
-            FROM `{project_id}.{dataset}.cloudaudit_googleapis_com_data*`
-            WHERE
-                _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', start_date)
-                    AND FORMAT_DATE('%Y%m%d', end_date)
-                AND resource.type = "bigquery_resource"
-        )
-        , job_completed AS (
-            SELECT *
-            FROM auditlog
-            WHERE
-                protopayload_auditlog.methodName = "jobservice.jobcompleted"
-                AND protopayload_auditlog.status.message = "DONE"
-        )
-        , insert AS (
-            SELECT *
-            FROM auditlog
-            WHERE
-                protopayload_auditlog.methodName = "jobservice.insert"
-                AND protopayload_auditlog.status.message = "DONE"
-        )
-        , unioned AS (
-            SELECT * FROM job_completed
-            UNION ALL
-            SELECT * FROM insert
-        )
-
-        SELECT * FROM unioned
-        ORDER BY timestamp
-        LIMIT 1000
-    """
-
     SERVICE_TYPE = 'Bigquery'
     scheme = "bigquery"
 
     def __init__(self, config, metadata_config, ctx):
         super().__init__(ctx)
-        start, end = get_start_and_end(config.duration)
-        self.project_id = config.project_id
-        self.engine = create_engine(self.get_connection_url(), **config.options).connect()
-        inspector = inspect(self.engine)
-        for schema in inspector.get_schema_names():
-            dataset_name = schema
-            self.sql_stmt = BigqueryUsageSource.SQL_STATEMENT.format(
-                project_id=self.project_id,
-                start_date=start,
-                end_date=end,
-                dataset=dataset_name
-            )
+
+        self.config = config
+        self.project_id = self.config.project_id
+        self.engine = create_engine(self.get_connection_url(), **self.config.options).connect()
+        self.logger_name = "cloudaudit.googleapis.com%2Fdata_access"
         self.status = SQLSourceStatus()
 
     def get_connection_url(self):
@@ -103,24 +58,27 @@ class BigqueryUsageSource(Source):
     def prepare(self):
         pass
 
-    def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
-        """
-        Provides an iterator of result rows from the SQLAlchemy helper
-        :return:
-        """
-        try:
-            rows = self.engine.execute(self.sql_stmt).fetchall()
-            for row in rows:
-                yield row
-        except Exception as err:
-            logger.error(repr(err))
-
-    def next_record(self) -> Iterable[TableQuery]:
-        for row in self._get_raw_extract_iter():
-            tq = TableQuery(row['query'], row['label'], row['userid'], row['xid'], row['pid'], str(row['starttime']),
-                            str(row['endtime']), str(row['analysis_date']), row['duration'], row['database'],
-                            row['aborted'], row['sql'])
-            yield tq
+    def next_record(self) -> Iterable[TableQuery]:
+        logging_client = logging.Client()
+        logger = logging_client.logger(self.logger_name)
+        print("Listing entries for logger {}:".format(logger.name))
+        start, end = get_start_and_end(self.config.duration)
+        for entry in logger.list_entries():
+            timestamp = entry.timestamp.isoformat()
+            timestamp = datetime.strptime(timestamp[0:10], "%Y-%m-%d")
+            if "query" in str(entry.payload) and isinstance(entry.payload, collections.OrderedDict):
+                payload = list(entry.payload.items())[-1][1]
+                if "jobChange" in payload:
+                    queryConfig = payload['jobChange']['job']['jobConfig']['queryConfig']
+                    jobStats = payload['jobChange']['job']['jobStats']
+                    statementType = queryConfig.get('statementType', '')
+                    database = queryConfig.get('destinationTable', '')
+                    analysis_date = str(datetime.strptime(jobStats['startTime'][0:19], "%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'))
+                    tq = TableQuery(statementType,
+                                    queryConfig['priority'], 0, 0, 0, str(jobStats['startTime']),
+                                    str(jobStats['endTime']), analysis_date, self.config.duration, str(
+                                        database), 0, queryConfig['query'])
+                    yield tq
 
     def close(self):
         self.engine.close()
diff --git a/ingestion/src/metadata/ingestion/stage/table_usage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py
index 2c970451237..b45a74963c7 100644
--- a/ingestion/src/metadata/ingestion/stage/table_usage.py
+++ b/ingestion/src/metadata/ingestion/stage/table_usage.py
@@ -32,7 +32,11 @@ def get_table_column_join(table, table_aliases, joins):
     for join in joins:
         try:
             if "." in join:
-                jtable, column = join.split(".")
+                if join.count(".") < 2:
+                    jtable, column = join.split(".")
+                else:
+                    jtable, column = join.split(".")[-2:]
+
                 if table == jtable or jtable in table_aliases:
                     table_column = TableColumn(table=table_aliases[jtable] if jtable in table_aliases else jtable,
                                                column=column)

From 3f4b938b2072671b4a2227066dfa61f1c2845571 Mon Sep 17 00:00:00 2001
From: Ayush Shah
Date: Fri, 27 Aug 2021 02:21:41 +0530
Subject: [PATCH 6/6] Filter log entries by the configured duration

---
 .../ingestion/source/bigquery_usage.py        | 32 +++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/source/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
index 048079604a3..d1995a15e55 100644
--- a/ingestion/src/metadata/ingestion/source/bigquery_usage.py
+++ b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
@@ -16,7 +16,6 @@
 # This import verifies that the dependencies are available.
 import logging as log
 from metadata.ingestion.models.table_queries import TableQuery
-from sqlalchemy.engine import create_engine
 from google.cloud import logging
 import collections
 from datetime import datetime
@@ -39,7 +38,6 @@ class BigqueryUsageSource(Source):
 
         self.config = config
         self.project_id = self.config.project_id
-        self.engine = create_engine(self.get_connection_url(), **self.config.options).connect()
         self.logger_name = "cloudaudit.googleapis.com%2Fdata_access"
         self.status = SQLSourceStatus()
 
@@ -66,22 +64,24 @@ class BigqueryUsageSource(Source):
         for entry in logger.list_entries():
             timestamp = entry.timestamp.isoformat()
             timestamp = datetime.strptime(timestamp[0:10], "%Y-%m-%d")
-            if "query" in str(entry.payload) and isinstance(entry.payload, collections.OrderedDict):
-                payload = list(entry.payload.items())[-1][1]
-                if "jobChange" in payload:
-                    queryConfig = payload['jobChange']['job']['jobConfig']['queryConfig']
-                    jobStats = payload['jobChange']['job']['jobStats']
-                    statementType = queryConfig.get('statementType', '')
-                    database = queryConfig.get('destinationTable', '')
-                    analysis_date = str(datetime.strptime(jobStats['startTime'][0:19], "%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'))
-                    tq = TableQuery(statementType,
-                                    queryConfig['priority'], 0, 0, 0, str(jobStats['startTime']),
-                                    str(jobStats['endTime']), analysis_date, self.config.duration, str(
-                                        database), 0, queryConfig['query'])
-                    yield tq
+            if start <= timestamp <= end:
+                if "query" in str(entry.payload) and isinstance(entry.payload, collections.OrderedDict):
+                    payload = list(entry.payload.items())[-1][1]
+                    if "jobChange" in payload:
+                        print(f"\nEntries: {payload}")
+                        queryConfig = payload['jobChange']['job']['jobConfig']['queryConfig']
+                        jobStats = payload['jobChange']['job']['jobStats']
+                        statementType = queryConfig.get('statementType', '')
+                        database = queryConfig.get('destinationTable', '')
+                        analysis_date = str(datetime.strptime(jobStats['startTime'][0:19], "%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'))
+                        tq = TableQuery(statementType,
+                                        queryConfig['priority'], 0, 0, 0, str(jobStats['startTime']),
+                                        str(jobStats['endTime']), analysis_date, self.config.duration, str(
+                                            database), 0, queryConfig['query'])
+                        yield tq
 
     def close(self):
-        self.engine.close()
+        pass
 
     def get_status(self) -> SourceStatus:
         return self.status