feat(ingest/bigquery_v2): enable platform instance using project id (#8142)

Co-authored-by: Andrew Sikowitz <andrew.sikowitz@acryl.io>
This commit is contained in:
Adrián Pertíñez 2023-06-06 00:17:40 +02:00 committed by GitHub
parent 3022c2d12e
commit 743439c11d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 382 additions and 19 deletions

View File

@ -14,7 +14,7 @@ from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
make_dataset_urn,
make_tag_urn,
set_dataset_urn_to_lower,
)
@ -142,7 +142,7 @@ def cleanup(config: BigQueryV2Config) -> None:
@support_status(SupportStatus.CERTIFIED)
@capability(
SourceCapability.PLATFORM_INSTANCE,
"Not supported since BigQuery project ids are globally unique",
"Platform instance is pre-set to the BigQuery project id",
supported=False,
)
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
@ -401,21 +401,16 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
return test_report
def get_dataplatform_instance_aspect(
self, dataset_urn: str
) -> Optional[MetadataWorkUnit]:
self, dataset_urn: str, project_id: str
) -> MetadataWorkUnit:
# If we are a platform instance based source, emit the instance aspect
if self.config.platform_instance:
aspect = DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
),
)
return MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=aspect
).as_workunit()
else:
return None
aspect = DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=make_dataplatform_instance_urn(self.platform, project_id),
)
return MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=aspect
).as_workunit()
def gen_dataset_key(self, db_name: str, schema: str) -> PlatformKey:
return BigQueryDatasetKey(
@ -1008,7 +1003,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
dataset_urn=dataset_urn,
parent_container_key=self.gen_dataset_key(project_id, dataset_name),
)
dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
dpi_aspect = self.get_dataplatform_instance_aspect(
dataset_urn=dataset_urn, project_id=project_id
)
if dpi_aspect:
yield dpi_aspect
@ -1068,10 +1065,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
def gen_dataset_urn(self, project_id: str, dataset_name: str, table: str) -> str:
datahub_dataset_name = BigqueryTableIdentifier(project_id, dataset_name, table)
dataset_urn = make_dataset_urn_with_platform_instance(
dataset_urn = make_dataset_urn(
self.platform,
str(datahub_dataset_name),
self.config.platform_instance,
self.config.env,
)
return dataset_urn

View File

@ -0,0 +1,267 @@
[
{
"entityType": "container",
"entityUrn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {
"platform": "bigquery",
"env": "PROD",
"project_id": "project-id-1"
},
"name": "project-id-1"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Project"
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {
"platform": "bigquery",
"env": "PROD",
"project_id": "project-id-1",
"dataset_id": "bigquery-dataset-1"
},
"externalUrl": "https://console.cloud.google.com/bigquery?project=project-id-1&ws=!1m4!1m3!3m2!1sproject-id-1!2sbigquery-dataset-1",
"name": "bigquery-dataset-1"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Dataset"
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:068bd9323110994a40019fcf6cfc60d3"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "schemaMetadata",
"aspect": {
"json": {
"schemaName": "project-id-1.bigquery-dataset-1.table-1",
"platform": "urn:li:dataPlatform:bigquery",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"externalUrl": "https://console.cloud.google.com/bigquery?project=project-id-1&ws=!1m5!1m4!4m3!1sproject-id-1!2sbigquery-dataset-1!3stable-1",
"name": "table-1",
"qualifiedName": "project-id-1.bigquery-dataset-1.table-1",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,project-id-1)"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Table"
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
}
]

View File

@ -0,0 +1,80 @@
from typing import Any, Dict
from unittest.mock import patch
from freezegun import freeze_time
from google.cloud.bigquery.table import TableListItem
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
BigqueryDataset,
BigqueryTable,
)
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import run_and_get_pipeline
FROZEN_TIME = "2022-02-03 07:00:00"
@freeze_time(FROZEN_TIME)
@patch(
    "datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_tables_for_dataset"
)
@patch(
    "datahub.ingestion.source.bigquery_v2.bigquery.BigqueryV2Source.get_core_table_details"
)
@patch(
    "datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_datasets_for_project_id"
)
@patch("google.cloud.bigquery.Client")
def test_bigquery_v2_ingest(
    client,
    get_datasets_for_project_id,
    get_core_table_details,
    get_tables_for_dataset,
    pytestconfig,
    tmp_path,
):
    """Run the BigQuery pipeline against mocked lookups and diff it with the golden file.

    The BigQuery client and the three dataset/table lookup functions are
    patched, so the pipeline sees exactly one project ("project-id-1") holding
    one dataset ("bigquery-dataset-1") with one table ("table-1"). The emitted
    records are written to a file under ``tmp_path`` and compared with
    ``bigquery_mcp_golden.json``.
    """
    table_name = "table-1"

    # Canned answers for the patched lookup functions.
    get_datasets_for_project_id.return_value = [
        BigqueryDataset(name="bigquery-dataset-1")
    ]
    get_core_table_details.return_value = {
        table_name: TableListItem(
            {"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}}
        )
    }
    get_tables_for_dataset.return_value = iter(
        [
            BigqueryTable(
                name=table_name,
                comment=None,
                created=None,
                last_altered=None,
                size_in_bytes=None,
                rows_count=None,
            )
        ]
    )

    # Golden file lives with the test resources; output goes to the temp dir.
    resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2"
    golden_file = "{}/bigquery_mcp_golden.json".format(resources_dir)
    output_file = "{}/{}".format(tmp_path, "bigquery_mcp_output.json")

    # Usage statistics and table lineage are switched off for this run.
    recipe: Dict[str, Any] = {
        "source": {
            "type": "bigquery",
            "config": {
                "project_ids": ["project-id-1"],
                "include_usage_statistics": False,
                "include_table_lineage": False,
            },
        },
        "sink": {"type": "file", "config": {"filename": output_file}},
    }
    run_and_get_pipeline(recipe)

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=output_file,
        golden_path=golden_file,
    )

View File

@ -128,6 +128,26 @@ def test_get_projects_with_project_ids_overrides_project_id_pattern():
]
def test_get_dataplatform_instance_aspect_returns_project_id():
    """Check that the dataPlatformInstance aspect uses the project id as its instance."""
    project_id = "project_id"
    expected_instance = (
        f"urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,{project_id})"
    )

    config = BigQueryV2Config.parse_obj({})
    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))

    data_platform_instance = source.get_dataplatform_instance_aspect(
        "urn:li:test", project_id
    )
    # Guard first: dereferencing a missing work unit would raise AttributeError
    # and hide the real failure behind an unrelated error message.
    assert data_platform_instance is not None

    metadata = data_platform_instance.get_metadata()["metadata"]
    assert metadata.aspectName == "dataPlatformInstance"
    assert metadata.aspect.instance == expected_instance
@patch("google.cloud.bigquery.client.Client")
def test_get_projects_with_single_project_id(client_mock):
config = BigQueryV2Config.parse_obj({"project_id": "test-3"})