diff --git a/ingestion/examples/workflows/bigquery_usage.json b/ingestion/examples/workflows/bigquery_usage.json index f439a23e448..008e94a067b 100644 --- a/ingestion/examples/workflows/bigquery_usage.json +++ b/ingestion/examples/workflows/bigquery_usage.json @@ -1,16 +1,29 @@ { "source": { "type": "bigquery-usage", - "config": { - "project_id": "project_id", - "host_port": "https://bigquery.googleapis.com", - "username": "gcpuser@project_id.iam.gserviceaccount.com", - "service_name": "gcp_bigquery", - "duration": 2, - "options": { - "credentials_path": "examples/creds/bigquery-cred.json" + "serviceName": "local_bigquery", + "serviceConnection": { + "config": { + "type": "BigQuery", + "projectID": "project_id", + "enablePolicyTagImport": true, + "connectionOptions": { + "credentials": { + "type": "service_account", + "project_id": "project_id", + "private_key_id": "private_key_id", + "private_key": "private_key", + "client_email": "gcpuser@project_id.iam.gserviceaccount.com", + "client_id": "client_id", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "" + } + } } - } + }, + "sourceConfig": {"config": {"queryLogDuration": "1"}} }, "processor": { "type": "query-parser", @@ -24,17 +37,16 @@ "filename": "/tmp/bigquery_usage" } }, - "bulk_sink": { + "bulkSink": { "type": "metadata-usage", "config": { "filename": "/tmp/bigquery_usage" } }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } } diff --git a/ingestion/src/metadata/ingestion/source/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/bigquery_usage.py index b2e5f6e56e9..c479ec56e0a 100644 --- 
a/ingestion/src/metadata/ingestion/source/bigquery_usage.py +++ b/ingestion/src/metadata/ingestion/source/bigquery_usage.py @@ -19,15 +19,21 @@ from typing import Iterable from google.cloud import logging +from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( + BigQueryConnection, +) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.table_queries import TableQuery -from metadata.ingestion.source.bigquery import BigQueryConfig, BigquerySource +from metadata.ingestion.source.bigquery import BigquerySource from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus from metadata.utils.helpers import get_start_and_end @@ -38,40 +44,42 @@ class BigqueryUsageSource(Source[TableQuery]): SERVICE_TYPE = DatabaseServiceType.BigQuery.value scheme = "bigquery" - def __init__(self, config, metadata_config): + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): super().__init__() self.temp_credentials = None self.metadata_config = metadata_config self.config = config - self.project_id = self.config.project_id + self.service_connection = config.serviceConnection.__root__.config + self.project_id = self.service_connection.projectID self.logger_name = "cloudaudit.googleapis.com%2Fdata_access" self.status = SQLSourceStatus() + @classmethod + def create(cls, config_dict, metadata_config: OpenMetadataConnection): + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: BigQueryConnection = config.serviceConnection.__root__.config + 
if not isinstance(connection, BigQueryConnection): +            raise InvalidSourceException( +                f"Expected BigQueryConnection, but got {connection}" +            ) +        options = connection.connectionOptions.dict()          if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"): -            if config.options.get("credentials_path"): -                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = config.options[ +            if options.get("credentials_path"): +                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = options[                      "credentials_path"                  ] -            elif config.options.get("credentials"): -                self.temp_credentials = BigquerySource.create_credential_temp_file( -                    credentials=config.options.get("credentials") +                del connection.connectionOptions.credentials_path +            elif options.get("credentials"): +                cls.temp_credentials = BigquerySource.create_credential_temp_file( +                    credentials=options["credentials"]                  ) -                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.temp_credentials -                del config.options["credentials"] +                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.temp_credentials +                del connection.connectionOptions.credentials              else:                  logger.warning(                      "Please refer to the BigQuery connector documentation, especially the credentials part " -                    "https://docs.open-metadata.org/connectors/bigquery-usage" +                    "https://docs.open-metadata.org/connectors/bigquery"                  ) - -    def get_connection_url(self): -        if self.project_id: -            return f"{self.scheme}://{self.project_id}" -        return f"{self.scheme}://" - -    @classmethod -    def create(cls, config_dict, metadata_config: OpenMetadataConnection): -        config = BigQueryConfig.parse_obj(config_dict)          return cls(config, metadata_config)      def prepare(self): @@ -81,7 +89,7 @@ class BigqueryUsageSource(Source[TableQuery]):          logging_client = logging.Client()          usage_logger = logging_client.logger(self.logger_name)          logger.debug("Listing entries for logger {}:".format(usage_logger.name)) -        start, end = get_start_and_end(self.config.duration) +        start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)          try:              entries = 
usage_logger.list_entries() for entry in entries: @@ -107,9 +115,7 @@ class BigqueryUsageSource(Source[TableQuery]): statementType = "" if hasattr(queryConfig, "statementType"): statementType = queryConfig["statementType"] - database = "" - if hasattr(queryConfig, "destinationTable"): - database = queryConfig["destinationTable"] + database = self.project_id analysis_date = str( datetime.strptime( jobStats["startTime"][0:19], "%Y-%m-%dT%H:%M:%S" @@ -127,7 +133,7 @@ class BigqueryUsageSource(Source[TableQuery]): aborted=0, database=str(database), sql=queryConfig["query"], - service_name=self.config.service_name, + service_name=self.config.serviceName, ) yield tq @@ -137,6 +143,9 @@ class BigqueryUsageSource(Source[TableQuery]): def get_status(self) -> SourceStatus: return self.status + def test_connection(self) -> SourceStatus: + pass + def close(self): super().close() if self.temp_credentials: