Mirror of https://github.com/datahub-project/datahub.git (synced 2025-10-05 22:16:32 +00:00)
feat(bigquery): add created and modified timestamps to dataset containers (#14716)
Co-authored-by: Claude <noreply@anthropic.com>
parent: 36cf767d2d
commit: d82ae8014e
@@ -283,23 +283,30 @@ class BigQuerySchemaApi:
         with self.report.list_datasets_timer:
             self.report.num_list_datasets_api_requests += 1
             datasets = self.bq_client.list_datasets(project_id, max_results=maxResults)
-            return [
-                BigqueryDataset(
-                    name=d.dataset_id,
-                    labels=d.labels,
-                    location=(
-                        d._properties.get("location")
-                        if hasattr(d, "_properties") and isinstance(d._properties, dict)
-                        else None
-                    ),
-                    # TODO: Fetching the dataset description individually impacts overall performance if the number of datasets is high (hundreds); instead we should fetch in batch for all datasets.
-                    # TODO: Given we are calling get_dataset for each dataset, we may consume and publish other fields too, such as created, modified, etc.
-                    # https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_get_dataset
-                    # https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.dataset.Dataset
-                    comment=self.bq_client.get_dataset(d.reference).description,
-                )
-                for d in datasets
-            ]
+            result = []
+            for d in datasets:
+                # TODO: Fetching the dataset description individually impacts overall performance if the number of datasets is high (hundreds); instead we should fetch in batch for all datasets.
+                # https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_get_dataset
+                # https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.dataset.Dataset
+                dataset = self.bq_client.get_dataset(d.reference)
+
+                location = (
+                    d._properties.get("location")
+                    if hasattr(d, "_properties") and isinstance(d._properties, dict)
+                    else None
+                )
+
+                result.append(
+                    BigqueryDataset(
+                        name=d.dataset_id,
+                        labels=d.labels,
+                        location=location,
+                        comment=dataset.description,
+                        created=dataset.created,
+                        last_altered=dataset.modified,
+                    )
+                )
+            return result

     # This is not used anywhere
     def get_datasets_for_project_id_with_information_schema(
@@ -12,6 +12,7 @@ from datahub.emitter.mce_builder import (
     make_dataset_urn_with_platform_instance,
     make_schema_field_urn,
     make_tag_urn,
+    make_ts_millis,
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey
@@ -300,6 +301,8 @@ class BigQuerySchemaGenerator:
         description: Optional[str] = None,
         tags: Optional[Dict[str, str]] = None,
         extra_properties: Optional[Dict[str, str]] = None,
+        created: Optional[int] = None,
+        last_modified: Optional[int] = None,
     ) -> Iterable[MetadataWorkUnit]:
         schema_container_key = self.gen_dataset_key(project_id, dataset)

@@ -349,6 +352,8 @@ class BigQuerySchemaGenerator:
             ),
             tags=tags_joined,
             extra_properties=extra_properties,
+            created=created,
+            last_modified=last_modified,
         )

     def _process_project(
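gen_dataset_containers now accepts the two timestamps as epoch milliseconds (Optional[int]) and forwards them unchanged into the container emission. A hedged sketch of a call site, given an existing BigQuerySchemaGenerator instance named schema_generator and assuming dataset/project_id keyword parameters as implied by gen_dataset_key(project_id, dataset); all values are illustrative:

    from datetime import datetime, timezone

    # Illustrative epoch-millisecond inputs; the generator forwards them as-is.
    created_ms = int(datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp() * 1000)
    modified_ms = int(datetime(2024, 6, 1, tzinfo=timezone.utc).timestamp() * 1000)

    workunits = list(
        schema_generator.gen_dataset_containers(
            dataset="analytics",
            project_id="my-project",
            created=created_ms,
            last_modified=modified_ms,
        )
    )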
@@ -484,6 +489,12 @@ class BigQuerySchemaGenerator:
                 else None
             ),
             description=bigquery_dataset.comment,
+            created=make_ts_millis(bigquery_dataset.created)
+            if bigquery_dataset.created
+            else None,
+            last_modified=make_ts_millis(bigquery_dataset.last_altered)
+            if bigquery_dataset.last_altered
+            else None,
         )

         columns = None
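make_ts_millis (the import added above from datahub.emitter.mce_builder) converts a datetime into milliseconds since the epoch, which is why both call sites guard against None. A minimal sketch of the conversion it performs, written as a standalone function for illustration:

    from datetime import datetime, timezone
    from typing import Optional

    def ts_millis(ts: Optional[datetime]) -> Optional[int]:
        # Roughly what make_ts_millis does: datetime -> epoch milliseconds.
        if ts is None:
            return None
        return int(ts.timestamp() * 1000)

    # The frozen time used by the test below, as milliseconds.
    assert ts_millis(datetime(2022, 2, 3, 7, 0, 0, tzinfo=timezone.utc)) == 1643871600000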
@@ -7,6 +7,7 @@ from typing import Any, Dict, List, Optional, cast
 from unittest.mock import MagicMock, Mock, patch

 import pytest
+from freezegun import freeze_time
 from google.api_core.exceptions import GoogleAPICallError
 from google.cloud.bigquery.table import Row, TableListItem

@@ -57,6 +58,8 @@ from datahub.metadata.schema_classes import (
     TimeStampClass,
 )

+FROZEN_TIME = "2022-02-03 07:00:00"
+

 def test_bigquery_uri():
     config = BigQueryV2Config.parse_obj(
@@ -569,6 +572,91 @@ def test_gen_table_dataset_workunits(
     assert len(mcps) >= 7


+@freeze_time(FROZEN_TIME)
+@patch.object(BigQueryV2Config, "get_bigquery_client")
+@patch.object(BigQueryV2Config, "get_projects_client")
+def test_get_datasets_for_project_id_with_timestamps(
+    get_projects_client, get_bq_client_mock
+):
+    project_id = "test-project"
+    frozen_time = datetime.now(tz=timezone.utc)
+
+    # Mock the BigQuery client
+    mock_bq_client = MagicMock()
+    get_bq_client_mock.return_value = mock_bq_client
+
+    # Mock dataset list items (what list_datasets returns)
+    mock_dataset_list_item1 = MagicMock()
+    mock_dataset_list_item1.dataset_id = "dataset1"
+    mock_dataset_list_item1.labels = {"env": "test"}
+    mock_dataset_list_item1.reference = "dataset1_reference"
+    mock_dataset_list_item1._properties = {"location": "US"}
+
+    mock_dataset_list_item2 = MagicMock()
+    mock_dataset_list_item2.dataset_id = "dataset2"
+    mock_dataset_list_item2.labels = {"env": "prod"}
+    mock_dataset_list_item2.reference = "dataset2_reference"
+    mock_dataset_list_item2._properties = {"location": "EU"}
+
+    # Mock full dataset objects (what get_dataset returns)
+    mock_full_dataset1 = MagicMock()
+    mock_full_dataset1.description = "Test dataset 1"
+    mock_full_dataset1.created = frozen_time
+    mock_full_dataset1.modified = frozen_time + timedelta(hours=1)
+
+    mock_full_dataset2 = MagicMock()
+    mock_full_dataset2.description = None  # Test missing description
+    mock_full_dataset2.created = None  # Test missing created timestamp
+    mock_full_dataset2.modified = None  # Test missing modified timestamp
+
+    # Configure mocks
+    mock_bq_client.list_datasets.return_value = [
+        mock_dataset_list_item1,
+        mock_dataset_list_item2,
+    ]
+    mock_bq_client.get_dataset.side_effect = lambda ref: {
+        "dataset1_reference": mock_full_dataset1,
+        "dataset2_reference": mock_full_dataset2,
+    }[ref]
+
+    # Create BigQuerySchemaApi instance
+    config = BigQueryV2Config.parse_obj({"project_id": project_id})
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
+    schema_api = source.bq_schema_extractor.schema_api
+
+    # Call the method
+    result = schema_api.get_datasets_for_project_id(project_id)
+
+    # Assert correct number of datasets returned
+    assert len(result) == 2
+
+    # Assert first dataset
+    dataset1 = result[0]
+    assert dataset1.name == "dataset1"
+    assert dataset1.labels == {"env": "test"}
+    assert dataset1.location == "US"
+    assert dataset1.comment == "Test dataset 1"
+    assert dataset1.created == frozen_time
+    assert dataset1.last_altered == frozen_time + timedelta(hours=1)
+
+    # Assert second dataset (with missing timestamps)
+    dataset2 = result[1]
+    assert dataset2.name == "dataset2"
+    assert dataset2.labels == {"env": "prod"}
+    assert dataset2.location == "EU"
+    assert dataset2.comment is None
+    assert dataset2.created is None
+    assert dataset2.last_altered is None
+
+    # Verify get_dataset was called exactly once per dataset
+    assert mock_bq_client.get_dataset.call_count == 2
+    mock_bq_client.get_dataset.assert_any_call("dataset1_reference")
+    mock_bq_client.get_dataset.assert_any_call("dataset2_reference")
+
+    # Verify list_datasets was called once
+    mock_bq_client.list_datasets.assert_called_once_with(project_id, max_results=None)
+
+
 @patch.object(BigQueryV2Config, "get_bigquery_client")
 @patch.object(BigQueryV2Config, "get_projects_client")
 def test_simple_upstream_table_generation(get_bq_client_mock, get_projects_client):
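The get_dataset.side_effect lambda in the new test is a dict-dispatch pattern: each known reference maps to its canned dataset object, and any unexpected reference raises KeyError instead of silently returning a fresh auto-created MagicMock. The same pattern in isolation, with illustrative names:

    import pytest
    from unittest.mock import MagicMock

    client = MagicMock()
    client.get_dataset.side_effect = lambda ref: {
        "ref_a": "dataset_a",
        "ref_b": "dataset_b",
    }[ref]

    assert client.get_dataset("ref_a") == "dataset_a"

    # An unknown reference fails loudly rather than yielding another mock.
    with pytest.raises(KeyError):
        client.get_dataset("unknown")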