diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/bigQueryConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/bigQueryConnection.json index 3b03f5b4730..0fa05d9c5cc 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/bigQueryConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/bigQueryConnection.json @@ -73,6 +73,12 @@ "type": "string", "default": "us" }, + "usageLocation": { + "title": "Usage Location", + "description": "Location used to query INFORMATION_SCHEMA.JOBS_BY_PROJECT to fetch usage data. You can pass multi-regions, such as `us` or `eu`, or your specific region. Australia and Asia multi-regions are not yet in GA.", + "type": "string", + "default": "us" + }, "connectionOptions": { "title": "Connection Options", "$ref": "../connectionBasicType.json#/definitions/connectionOptions" diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/database/bigquery_usage.py index 66e8ba0fa9d..02802763a77 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery_usage.py @@ -8,16 +8,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import collections - -# This import verifies that the dependencies are available. 
-import os +""" +Handle big query usage extraction +""" from datetime import datetime -from typing import Any, Dict, Iterable, Optional from google import auth -from google.cloud import logging from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( BigQueryConnection, @@ -31,11 +27,11 @@ from metadata.generated.schema.entity.services.databaseService import ( from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.database.usage_source import UsageSource from metadata.utils.credentials import set_google_credentials from metadata.utils.logger import ingestion_logger +from metadata.utils.sql_queries import BIGQUERY_USAGE_STATEMENT logger = ingestion_logger() @@ -46,14 +42,11 @@ class BigqueryUsageSource(UsageSource): def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): super().__init__(config, metadata_config) - self.temp_credentials = None self.project_id = self.set_project_id() + self.database = self.project_id - self.logger_name = "cloudaudit.googleapis.com%2Fdata_access" - self.logging_client = logging.Client() - self.usage_logger = self.logging_client.logger(self.logger_name) - logger.debug("Listing entries for logger {}:".format(self.usage_logger.name)) + self.sql_stmt = BIGQUERY_USAGE_STATEMENT @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -70,61 +63,17 @@ class BigqueryUsageSource(UsageSource): return cls(config, metadata_config) + def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str: + """ + returns sql statement to fetch query logs + """ + return self.sql_stmt.format( + start_time=start_time, + end_time=end_time, + region=self.connection.usageLocation, + ) + @staticmethod def set_project_id(): _, project_id = 
auth.default() return project_id - - def get_table_query(self, entries: Iterable): - query_list = [] - for entry in entries: - timestamp = entry.timestamp.isoformat() - timestamp = datetime.strptime(timestamp[0:10], "%Y-%m-%d") - if timestamp >= self.start and timestamp <= self.end: - if ("query" in str(entry.payload)) and type( - entry.payload - ) == collections.OrderedDict: - payload = list(entry.payload.items())[-1][1] - if "jobChange" in payload: - logger.debug(f"\nEntries: {payload}") - if "queryConfig" in payload["jobChange"]["job"]["jobConfig"]: - queryConfig = payload["jobChange"]["job"]["jobConfig"][ - "queryConfig" - ] - else: - continue - jobStats = payload["jobChange"]["job"]["jobStats"] - statementType = "" - if hasattr(queryConfig, "statementType"): - statementType = queryConfig["statementType"] - database = self.project_id - analysis_date = str( - datetime.strptime( - jobStats["startTime"][0:19], "%Y-%m-%dT%H:%M:%S" - ).strftime("%Y-%m-%d %H:%M:%S") - ) - logger.debug(f"Query :{statementType}:{queryConfig['query']}") - tq = TableQuery( - query=queryConfig["query"], - userName=entry.resource.labels["project_id"], - startTime=str(jobStats["startTime"]), - endTime=str(jobStats["endTime"]), - analysisDate=analysis_date, - aborted=0, - databaseName=str(database), - serviceName=self.config.serviceName, - databaseSchema=None, - ) - query_list.append(tq) - return query_list - - def _get_raw_extract_iter(self) -> Optional[Iterable[Dict[str, Any]]]: - entries = self.usage_logger.list_entries() - yield TableQueries( - queries=self.get_table_query(entries), - ) - - def close(self): - super().close() - if self.temp_credentials: - os.unlink(self.temp_credentials) diff --git a/ingestion/src/metadata/utils/sql_queries.py b/ingestion/src/metadata/utils/sql_queries.py index 5c0cd6b3971..574e9ed57c2 100644 --- a/ingestion/src/metadata/utils/sql_queries.py +++ b/ingestion/src/metadata/utils/sql_queries.py @@ -340,3 +340,18 @@ SNOWFLAKE_GET_COMMENTS = """ WHERE 
TABLE_SCHEMA = '{schema_name}' AND TABLE_NAME = '{table_name}' """ + +BIGQUERY_USAGE_STATEMENT = """ + SELECT + project_id as database_name, + user_email as user_name, + statement_type as query_type, + start_time, + end_time, + query as query_text, + null as schema_name +FROM `region-{region}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT +WHERE creation_time BETWEEN "{start_time}" AND "{end_time}" + AND job_type = "QUERY" + AND state = "DONE" +"""