From bd40ae45ce766887986cd75b22efa6651db9ae7e Mon Sep 17 00:00:00 2001
From: Ayush Shah
Date: Thu, 26 Aug 2021 01:15:02 +0530
Subject: [PATCH] Bigquery Usage Pipeline added

---
 .../examples/workflows/bigquery_usage.json    |  48 +++++++
 .../src/metadata/ingestion/api/workflow.py    |   2 -
 .../ingestion/sink/metadata_rest_tables.py    |   4 +-
 .../src/metadata/ingestion/source/bigquery.py |   1 +
 .../ingestion/source/bigquery_usage.py        | 131 ++++++++++++++++++
 .../metadata/ingestion/source/sql_source.py   |  13 +-
 6 files changed, 189 insertions(+), 10 deletions(-)
 create mode 100644 ingestion/examples/workflows/bigquery_usage.json
 create mode 100644 ingestion/src/metadata/ingestion/source/bigquery_usage.py

diff --git a/ingestion/examples/workflows/bigquery_usage.json b/ingestion/examples/workflows/bigquery_usage.json
new file mode 100644
index 00000000000..cac249e1986
--- /dev/null
+++ b/ingestion/examples/workflows/bigquery_usage.json
@@ -0,0 +1,48 @@
+{
+  "source": {
+    "type": "bigquery-usage",
+    "config": {
+      "project_id": "arched-champion-319818",
+      "username": "gcpuser@arched-champion-319818.iam.gserviceaccount.com",
+      "host_port": "https://bigquery.googleapis.com",
+      "service_name": "gcp_bigquery",
+      "duration": 2,
+      "options": {
+        "credentials_path": "examples/creds/bigquery-cred.json"
+      },
+      "service_type": "BigQuery"
+    }
+  },
+  "processor": {
+    "type": "query-parser",
+    "config": {
+      "filter": ""
+    }
+  },
+  "stage": {
+    "type": "table-usage",
+    "config": {
+      "filename": "/tmp/bigquery_usage"
+    }
+  },
+  "bulk_sink": {
+    "type": "metadata-usage",
+    "config": {
+      "filename": "/tmp/bigquery_usage"
+    }
+  },
+  "metadata_server": {
+    "type": "metadata-server",
+    "config": {
+      "api_endpoint": "http://localhost:8585/api",
+      "auth_provider_type": "no-auth"
+    }
+  },
+  "cron": {
+    "minute": "*/5",
+    "hour": null,
+    "day": null,
+    "month": null,
+    "day_of_week": null
+  }
+}
diff --git a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py
index 8452223bb89..5347a384e0b 100644
--- a/ingestion/src/metadata/ingestion/api/workflow.py
+++ b/ingestion/src/metadata/ingestion/api/workflow.py
@@ -63,10 +63,8 @@ class Workflow:
         self.config = config
         self.ctx = WorkflowContext(workflow_id=self.config.run_id)
         source_type = self.config.source.type
-        print(source_type)
         source_class = self.get('metadata.ingestion.source.{}.{}Source'.format(
             self.typeClassFetch(source_type, True), self.typeClassFetch(source_type, False)))
-        print(source_class)
         metadata_config = self.config.metadata_server.dict().get("config", {})
         self.source: Source = source_class.create(
             self.config.source.dict().get("config", {}), metadata_config, self.ctx
diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py b/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py
index f367dc68a23..65736a56f61 100644
--- a/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py
+++ b/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py
@@ -77,9 +77,9 @@ class MetadataRestTablesSink(Sink):
                 '{}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
         except (APIError, ValidationError) as err:
             logger.error(
-                "Failed to ingest table {} in database {} ".format(table_and_db.table.name, table_and_db.database.name))
+                "Failed to ingest table {} in database {} ".format(table_and_db.table.name.__root__, table_and_db.database.name.__root__))
             logger.error(err)
-            self.status.failure(table_and_db.table.name)
+            self.status.failure(table_and_db.table.name.__root__)

     def get_status(self):
         return self.status
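The `.__root__` changes in the sink above are needed because OpenMetadata's generated entity models use pydantic custom root types, so the plain string sits one attribute deep and formatting the model directly would log a noisy repr. A standalone sketch of that behavior (assuming pydantic v1; `EntityName` here is a hypothetical stand-in for the generated name type):

    from pydantic import BaseModel

    class EntityName(BaseModel):
        __root__: str

    name = EntityName(__root__="orders")
    print(name)           # EntityName(__root__='orders') -- model repr, noisy in a log line
    print(name.__root__)  # orders -- the plain value the error message needs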
diff --git a/ingestion/src/metadata/ingestion/source/bigquery.py b/ingestion/src/metadata/ingestion/source/bigquery.py
index 58fe12c6c04..78861f71b1a 100644
--- a/ingestion/src/metadata/ingestion/source/bigquery.py
+++ b/ingestion/src/metadata/ingestion/source/bigquery.py
@@ -26,6 +26,7 @@ from ..ometa.auth_provider import MetadataServerConfig
 class BigQueryConfig(SQLConnectionConfig, SQLSource):
     scheme = "bigquery"
     project_id: Optional[str] = None
+    duration: int = 1

     def get_connection_url(self):
         if self.project_id:
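The new `duration` field feeds `get_start_and_end` in the usage source below, bounding how many days of audit logs are scanned. The helper's implementation is not part of this patch; a plausible sketch, assuming it returns a `(start, end)` date pair covering the last `duration` days:

    from datetime import date, timedelta

    # Hypothetical reconstruction of metadata.utils.helpers.get_start_and_end:
    # the window ends today and starts `duration` days back, matching the
    # DATE("{start_date}") / DATE("{end_date}") literals in the usage query.
    def get_start_and_end(duration: int):
        end = date.today()
        start = end - timedelta(days=duration)
        return start, end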
diff --git a/ingestion/src/metadata/ingestion/source/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
new file mode 100644
index 00000000000..96ab84212e4
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This import verifies that the dependencies are available.
+import logging
+from metadata.ingestion.models.table_queries import TableQuery
+from sqlalchemy.engine import create_engine
+from sqlalchemy.inspection import inspect
+from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
+from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
+from metadata.ingestion.api.source import Source, SourceStatus
+from typing import Iterator, Union, Dict, Any, Iterable
+from metadata.utils.helpers import get_start_and_end
+from metadata.ingestion.source.bigquery import BigQueryConfig
+
+logger = logging.getLogger(__name__)
+
+
+class BigqueryUsageSource(Source):
+    # SELECT statement from mysql information_schema to extract table and column metadata
+    SQL_STATEMENT = """
+        DECLARE start_date, end_date DATE;
+        SET start_date = DATE("{start_date}");
+        SET end_date = DATE("{end_date}");
+
+        WITH
+        auditlog AS (
+            SELECT *
+            FROM `{project_id}.{dataset}.cloudaudit_googleapis_com_data*`
+            WHERE
+                _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', start_date)
+                    AND FORMAT_DATE('%Y%m%d', end_date)
+                AND resource.type = "bigquery_resource"
+        )
+        , job_completed AS (
+            SELECT *
+            FROM auditlog
+            WHERE
+                protopayload_auditlog.methodName = "jobservice.jobcompleted"
+                AND protopayload_auditlog.status.message = "DONE"
+        )
+        , insert AS (
+            SELECT *
+            FROM auditlog
+            WHERE
+                protopayload_auditlog.methodName = "jobservice.insert"
+                AND protopayload_auditlog.status.message = "DONE"
+        )
+        , unioned AS (
+            SELECT * FROM job_completed
+            UNION ALL
+            SELECT * FROM insert
+        )
+
+        SELECT * FROM unioned
+        ORDER BY timestamp
+        LIMIT 1000
+    """
+
+    SERVICE_TYPE = 'Bigquery'
+    scheme = "bigquery"
+
+    def __init__(self, config, metadata_config, ctx):
+        super().__init__(ctx)
+        start, end = get_start_and_end(config.duration)
+        self.project_id = config.project_id
+        self.alchemy_helper = create_engine(self.get_connection_url(), **config.options)
+        inspector = inspect(self.alchemy_helper)
+        for schema in inspector.get_schema_names():
+            dataset_name = schema
+            self.sql_stmt = BigqueryUsageSource.SQL_STATEMENT.format(
+                project_id=self.project_id,
+                start_date=start,
+                end_date=end,
+                dataset=dataset_name
+            )
+        self._extract_iter: Union[None, Iterator] = None
+        self._database = 'bigquery'
+        self.status = SQLSourceStatus()
+
+    def get_connection_url(self):
+        if self.project_id:
+            print(f"{self.scheme}://{self.project_id}")
+            return f"{self.scheme}://{self.project_id}"
+        return f"{self.scheme}://"
+
+    @classmethod
+    def create(cls, config_dict, metadata_config_dict, ctx):
+        config = BigQueryConfig.parse_obj(config_dict)
+        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
+        return cls(config, metadata_config, ctx)
+
+    def prepare(self):
+        pass
+
+    def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
+        """
+        Provides iterator of result row from SQLAlchemy helper
+        :return:
+        """
+        try:
+            rows = self.alchemy_helper.connect().execute(self.sql_stmt).fetchall()
+            for row in rows:
+                yield row
+        except Exception as err:
+            logger.error(f"{repr(err)} + {err}")
+
+    def next_record(self) -> Iterable[TableQuery]:
+        for row in self._get_raw_extract_iter():
+            tq = TableQuery(row['query'], row['label'], row['userid'], row['xid'], row['pid'], str(row['starttime']),
+                            str(row['endtime']), str(row['analysis_date']), row['duration'], row['database'],
+                            row['aborted'], row['sql'])
+            yield tq
+
+    def close(self):
+        self.alchemy_helper.connect().close()
+
+    def get_status(self) -> SourceStatus:
+        return self.status
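With the source file in place, the loader change in workflow.py resolves the JSON config's "bigquery-usage" type to metadata.ingestion.source.bigquery_usage.BigqueryUsageSource by naming convention. A minimal end-to-end driver sketch, assuming Workflow exposes a create factory plus execute/stop methods (neither is shown in this patch):

    import json
    from metadata.ingestion.api.workflow import Workflow

    # Load the example pipeline definition added by this patch.
    with open("ingestion/examples/workflows/bigquery_usage.json") as f:
        workflow_config = json.load(f)

    workflow = Workflow.create(workflow_config)  # assumed factory method
    workflow.execute()                           # source -> processor -> stage -> bulk_sink
    workflow.stop()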
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 78ba132fcf7..a8f4c40c56f 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -48,13 +48,14 @@ class SQLSourceStatus(SourceStatus):
     success: List[str] = field(default_factory=list)
     failures: List[str] = field(default_factory=list)
     warnings: List[str] = field(default_factory=list)
+    filtered: List[str] = field(default_factory=list)

     def scanned(self, table_name: str) -> None:
         self.success.append(table_name)
         logger.info('Table Scanned: {}'.format(table_name))

-    def filtered(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None:
-        self.warnings.append(table_name)
+    def filter(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None:
+        self.filtered.append(table_name)
         logger.warning("Dropped Table {} due to {}".format(table_name, err))


@@ -194,7 +195,7 @@ class SQLSource(Source):
         inspector = inspect(self.engine)
         for schema in inspector.get_schema_names():
             if not self.sql_config.filter_pattern.included(schema):
-                self.status.filtered(schema, "Schema pattern not allowed")
+                self.status.filter(schema, "Schema pattern not allowed")
                 continue
             logger.debug("total tables {}".format(inspector.get_table_names(schema)))
             if self.config.include_tables:
@@ -209,7 +210,7 @@ class SQLSource(Source):
                 try:
                     schema, table_name = self.standardize_schema_table_names(schema, table_name)
                     if not self.sql_config.filter_pattern.included(table_name):
-                        self.status.filtered('{}.{}'.format(self.config.get_service_name(), table_name),
+                        self.status.filter('{}.{}'.format(self.config.get_service_name(), table_name),
                                             "Table pattern not allowed")
                         continue
                     self.status.scanned('{}.{}'.format(self.config.get_service_name(), table_name))
@@ -230,7 +231,7 @@ class SQLSource(Source):
                     yield table_and_db
                 except ValidationError as err:
                     logger.error(err)
-                    self.status.filtered('{}.{}'.format(self.config.service_name, table_name),
+                    self.status.filter('{}.{}'.format(self.config.service_name, table_name),
                                         "Validation error")
                     continue

@@ -240,7 +241,7 @@ class SQLSource(Source):
         for view_name in inspector.get_view_names(schema):
             try:
                 if not self.sql_config.filter_pattern.included(view_name):
-                    self.status.filtered('{}.{}'.format(self.config.get_service_name(), view_name),
+                    self.status.filter('{}.{}'.format(self.config.get_service_name(), view_name),
                                         "View pattern not allowed")
                     continue
                 try:
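After this rename, filter() is the method that records a dropped entity and filtered is the list it appends to, keeping drops out of warnings. A quick illustration of the new semantics (assuming SQLSourceStatus can be constructed without arguments, since its dataclass fields all carry defaults):

    status = SQLSourceStatus()
    status.filter("gcp_bigquery.tmp_table", "Table pattern not allowed")
    assert status.filtered == ["gcp_bigquery.tmp_table"]
    assert status.warnings == []  # drops no longer land in warnings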