From c5cc53b99a15d2bda762497a009c99a2e265c088 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 14 Jun 2023 12:50:21 -0400 Subject: [PATCH] feat(ingest/bigquery_v2): enable platform instance using project id (#8216) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Adrián Pertíñez Co-authored-by: Harshal Sheth --- .../ingestion/source/bigquery_v2/bigquery.py | 20 +- .../bigquery_v2/bigquery_mcp_golden.json | 267 ++++++++++++++++++ .../integration/bigquery_v2/test_bigquery.py | 80 ++++++ .../tests/unit/test_bigquery_source.py | 20 ++ 4 files changed, 385 insertions(+), 2 deletions(-) create mode 100644 metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json create mode 100644 metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index f6f7912397..4daee3a59e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -13,6 +13,7 @@ from google.cloud.bigquery.table import TableListItem from datahub.configuration.pattern_utils import is_schema_allowed from datahub.emitter.mce_builder import ( make_data_platform_urn, + make_dataplatform_instance_urn, make_dataset_urn, make_tag_urn, set_dataset_urn_to_lower, @@ -105,6 +106,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import ( TimeType, ) from datahub.metadata.schema_classes import ( + DataPlatformInstanceClass, DatasetLineageTypeClass, GlobalTagsClass, TagAssociationClass, @@ -138,9 +140,9 @@ def cleanup(config: BigQueryV2Config) -> None: @platform_name("BigQuery", doc_order=1) @config_class(BigQueryV2Config) @support_status(SupportStatus.CERTIFIED) -@capability( +@capability( # DataPlatformAspect is set to project id, but not added to urns as project id is in the container path SourceCapability.PLATFORM_INSTANCE, - "Not supported since BigQuery project ids are globally unique", + "Platform instance is pre-set to the BigQuery project id", supported=False, ) @capability(SourceCapability.DOMAINS, "Supported via the `domain` config field") @@ -398,6 +400,17 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): ) return test_report + def get_dataplatform_instance_aspect( + self, dataset_urn: str, project_id: str + ) -> MetadataWorkUnit: + aspect = DataPlatformInstanceClass( + platform=make_data_platform_urn(self.platform), + instance=make_dataplatform_instance_urn(self.platform, project_id), + ) + return MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=aspect + ).as_workunit() + def gen_dataset_key(self, db_name: str, schema: str) -> PlatformKey: return BigQueryDatasetKey( project_id=db_name, @@ -987,6 +1000,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): dataset_urn=dataset_urn, parent_container_key=self.gen_dataset_key(project_id, dataset_name), ) + yield self.get_dataplatform_instance_aspect( + dataset_urn=dataset_urn, project_id=project_id + ) subTypes = SubTypes(typeNames=sub_types) yield MetadataChangeProposalWrapper( diff --git a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json new file mode 100644 index 0000000000..38d36871fb --- /dev/null +++ b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json @@ -0,0 +1,267 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "bigquery", + "env": "PROD", + "project_id": "project-id-1" + }, + "name": "project-id-1" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Project" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "bigquery", + "env": "PROD", + "project_id": "project-id-1", + "dataset_id": "bigquery-dataset-1" + }, + "externalUrl": "https://console.cloud.google.com/bigquery?project=project-id-1&ws=!1m4!1m3!3m2!1sproject-id-1!2sbigquery-dataset-1", + "name": "bigquery-dataset-1" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Dataset" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:068bd9323110994a40019fcf6cfc60d3" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "project-id-1.bigquery-dataset-1.table-1", + "platform": "urn:li:dataPlatform:bigquery", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "externalUrl": "https://console.cloud.google.com/bigquery?project=project-id-1&ws=!1m5!1m4!4m3!1sproject-id-1!2sbigquery-dataset-1!3stable-1", + "name": "table-1", + "qualifiedName": "project-id-1.bigquery-dataset-1.table-1", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,project-id-1)" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py new file mode 100644 index 0000000000..3bda6c5cce --- /dev/null +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -0,0 +1,80 @@ +from typing import Any, Dict +from unittest.mock import patch + +from freezegun import freeze_time +from google.cloud.bigquery.table import TableListItem + +from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( + BigqueryDataset, + BigqueryTable, +) +from tests.test_helpers import mce_helpers +from tests.test_helpers.state_helpers import run_and_get_pipeline + +FROZEN_TIME = "2022-02-03 07:00:00" + + +@freeze_time(FROZEN_TIME) +@patch( + "datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_tables_for_dataset" +) +@patch( + "datahub.ingestion.source.bigquery_v2.bigquery.BigqueryV2Source.get_core_table_details" +) +@patch( + "datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_datasets_for_project_id" +) +@patch("google.cloud.bigquery.Client") +def test_bigquery_v2_ingest( + client, + get_datasets_for_project_id, + get_core_table_details, + get_tables_for_dataset, + pytestconfig, + tmp_path, +): + test_resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2" + mcp_golden_path = "{}/bigquery_mcp_golden.json".format(test_resources_dir) + mcp_output_path = "{}/{}".format(tmp_path, "bigquery_mcp_output.json") + + get_datasets_for_project_id.return_value = [ + BigqueryDataset(name="bigquery-dataset-1") + ] + + table_list_item = TableListItem( + {"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}} + ) + table_name = "table-1" + get_core_table_details.return_value = {table_name: table_list_item} + + bigquery_table = BigqueryTable( + name=table_name, + comment=None, + created=None, + last_altered=None, + size_in_bytes=None, + rows_count=None, + ) + get_tables_for_dataset.return_value = iter([bigquery_table]) + + source_config_dict: Dict[str, Any] = { + "project_ids": ["project-id-1"], + "include_usage_statistics": False, + "include_table_lineage": False, + } + + pipeline_config_dict: Dict[str, Any] = { + "source": { + "type": "bigquery", + "config": source_config_dict, + }, + "sink": {"type": "file", "config": {"filename": mcp_output_path}}, + } + + run_and_get_pipeline(pipeline_config_dict) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=mcp_output_path, + golden_path=mcp_golden_path, + ) diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index e753284add..49dc66b232 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -128,6 +128,26 @@ def test_get_projects_with_project_ids_overrides_project_id_pattern(): ] +def test_get_dataplatform_instance_aspect_returns_project_id(): + project_id = "project_id" + expected_instance = ( + f"urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,{project_id})" + ) + + config = BigQueryV2Config.parse_obj({}) + source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) + + data_platform_instance = source.get_dataplatform_instance_aspect( + "urn:li:test", project_id + ) + + metadata = data_platform_instance.get_metadata()["metadata"] + + assert data_platform_instance is not None + assert metadata.aspectName == "dataPlatformInstance" + assert metadata.aspect.instance == expected_instance + + @patch("google.cloud.bigquery.client.Client") def test_get_projects_with_single_project_id(client_mock): config = BigQueryV2Config.parse_obj({"project_id": "test-3"})