feat(ingest/bigquery_v2): enable platform instance using project id (#8216)

Co-authored-by: Adrián Pertíñez <khurzak92@gmail.com>
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
Andrew Sikowitz 2023-06-14 12:50:21 -04:00 committed by GitHub
parent 0c22bda209
commit c5cc53b99a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 385 additions and 2 deletions

View File

@ -13,6 +13,7 @@ from google.cloud.bigquery.table import TableListItem
from datahub.configuration.pattern_utils import is_schema_allowed from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import ( from datahub.emitter.mce_builder import (
make_data_platform_urn, make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn, make_dataset_urn,
make_tag_urn, make_tag_urn,
set_dataset_urn_to_lower, set_dataset_urn_to_lower,
@ -105,6 +106,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import (
TimeType, TimeType,
) )
from datahub.metadata.schema_classes import ( from datahub.metadata.schema_classes import (
DataPlatformInstanceClass,
DatasetLineageTypeClass, DatasetLineageTypeClass,
GlobalTagsClass, GlobalTagsClass,
TagAssociationClass, TagAssociationClass,
@ -138,9 +140,9 @@ def cleanup(config: BigQueryV2Config) -> None:
@platform_name("BigQuery", doc_order=1) @platform_name("BigQuery", doc_order=1)
@config_class(BigQueryV2Config) @config_class(BigQueryV2Config)
@support_status(SupportStatus.CERTIFIED) @support_status(SupportStatus.CERTIFIED)
@capability( @capability( # DataPlatformAspect is set to project id, but not added to urns as project id is in the container path
SourceCapability.PLATFORM_INSTANCE, SourceCapability.PLATFORM_INSTANCE,
"Not supported since BigQuery project ids are globally unique", "Platform instance is pre-set to the BigQuery project id",
supported=False, supported=False,
) )
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field") @capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
@ -398,6 +400,17 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
) )
return test_report return test_report
def get_dataplatform_instance_aspect(
self, dataset_urn: str, project_id: str
) -> MetadataWorkUnit:
aspect = DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=make_dataplatform_instance_urn(self.platform, project_id),
)
return MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=aspect
).as_workunit()
def gen_dataset_key(self, db_name: str, schema: str) -> PlatformKey: def gen_dataset_key(self, db_name: str, schema: str) -> PlatformKey:
return BigQueryDatasetKey( return BigQueryDatasetKey(
project_id=db_name, project_id=db_name,
@ -987,6 +1000,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
dataset_urn=dataset_urn, dataset_urn=dataset_urn,
parent_container_key=self.gen_dataset_key(project_id, dataset_name), parent_container_key=self.gen_dataset_key(project_id, dataset_name),
) )
yield self.get_dataplatform_instance_aspect(
dataset_urn=dataset_urn, project_id=project_id
)
subTypes = SubTypes(typeNames=sub_types) subTypes = SubTypes(typeNames=sub_types)
yield MetadataChangeProposalWrapper( yield MetadataChangeProposalWrapper(

View File

@ -0,0 +1,267 @@
[
{
"entityType": "container",
"entityUrn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {
"platform": "bigquery",
"env": "PROD",
"project_id": "project-id-1"
},
"name": "project-id-1"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Project"
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {
"platform": "bigquery",
"env": "PROD",
"project_id": "project-id-1",
"dataset_id": "bigquery-dataset-1"
},
"externalUrl": "https://console.cloud.google.com/bigquery?project=project-id-1&ws=!1m4!1m3!3m2!1sproject-id-1!2sbigquery-dataset-1",
"name": "bigquery-dataset-1"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Dataset"
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:068bd9323110994a40019fcf6cfc60d3"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "schemaMetadata",
"aspect": {
"json": {
"schemaName": "project-id-1.bigquery-dataset-1.table-1",
"platform": "urn:li:dataPlatform:bigquery",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"externalUrl": "https://console.cloud.google.com/bigquery?project=project-id-1&ws=!1m5!1m4!4m3!1sproject-id-1!2sbigquery-dataset-1!3stable-1",
"name": "table-1",
"qualifiedName": "project-id-1.bigquery-dataset-1.table-1",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,project-id-1)"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Table"
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00"
}
}
]

View File

@ -0,0 +1,80 @@
from typing import Any, Dict
from unittest.mock import patch
from freezegun import freeze_time
from google.cloud.bigquery.table import TableListItem
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
BigqueryDataset,
BigqueryTable,
)
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import run_and_get_pipeline
FROZEN_TIME = "2022-02-03 07:00:00"
@freeze_time(FROZEN_TIME)
@patch(
"datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_tables_for_dataset"
)
@patch(
"datahub.ingestion.source.bigquery_v2.bigquery.BigqueryV2Source.get_core_table_details"
)
@patch(
"datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_datasets_for_project_id"
)
@patch("google.cloud.bigquery.Client")
def test_bigquery_v2_ingest(
client,
get_datasets_for_project_id,
get_core_table_details,
get_tables_for_dataset,
pytestconfig,
tmp_path,
):
test_resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2"
mcp_golden_path = "{}/bigquery_mcp_golden.json".format(test_resources_dir)
mcp_output_path = "{}/{}".format(tmp_path, "bigquery_mcp_output.json")
get_datasets_for_project_id.return_value = [
BigqueryDataset(name="bigquery-dataset-1")
]
table_list_item = TableListItem(
{"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}}
)
table_name = "table-1"
get_core_table_details.return_value = {table_name: table_list_item}
bigquery_table = BigqueryTable(
name=table_name,
comment=None,
created=None,
last_altered=None,
size_in_bytes=None,
rows_count=None,
)
get_tables_for_dataset.return_value = iter([bigquery_table])
source_config_dict: Dict[str, Any] = {
"project_ids": ["project-id-1"],
"include_usage_statistics": False,
"include_table_lineage": False,
}
pipeline_config_dict: Dict[str, Any] = {
"source": {
"type": "bigquery",
"config": source_config_dict,
},
"sink": {"type": "file", "config": {"filename": mcp_output_path}},
}
run_and_get_pipeline(pipeline_config_dict)
mce_helpers.check_golden_file(
pytestconfig,
output_path=mcp_output_path,
golden_path=mcp_golden_path,
)

View File

@ -128,6 +128,26 @@ def test_get_projects_with_project_ids_overrides_project_id_pattern():
] ]
def test_get_dataplatform_instance_aspect_returns_project_id():
project_id = "project_id"
expected_instance = (
f"urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,{project_id})"
)
config = BigQueryV2Config.parse_obj({})
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
data_platform_instance = source.get_dataplatform_instance_aspect(
"urn:li:test", project_id
)
metadata = data_platform_instance.get_metadata()["metadata"]
assert data_platform_instance is not None
assert metadata.aspectName == "dataPlatformInstance"
assert metadata.aspect.instance == expected_instance
@patch("google.cloud.bigquery.client.Client") @patch("google.cloud.bigquery.client.Client")
def test_get_projects_with_single_project_id(client_mock): def test_get_projects_with_single_project_id(client_mock):
config = BigQueryV2Config.parse_obj({"project_id": "test-3"}) config = BigQueryV2Config.parse_obj({"project_id": "test-3"})