diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
index eb6b620f3f..d6a5585092 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
@@ -283,23 +283,30 @@ class BigQuerySchemaApi:
         with self.report.list_datasets_timer:
             self.report.num_list_datasets_api_requests += 1
             datasets = self.bq_client.list_datasets(project_id, max_results=maxResults)
-            return [
-                BigqueryDataset(
-                    name=d.dataset_id,
-                    labels=d.labels,
-                    location=(
-                        d._properties.get("location")
-                        if hasattr(d, "_properties") and isinstance(d._properties, dict)
-                        else None
-                    ),
-                    # TODO: Fetch dataset description individually impacts overall performance if the number of datasets is high (hundreds); instead we should fetch in batch for all datasets.
-                    # TODO: Given we are calling get_dataset for each dataset, we may consume and publish other fields too, such as created, modified, etc...
-                    # https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_get_dataset
-                    # https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.dataset.Dataset
-                    comment=self.bq_client.get_dataset(d.reference).description,
+            result = []
+            for d in datasets:
+                # TODO: Fetch dataset description individually impacts overall performance if the number of datasets is high (hundreds); instead we should fetch in batch for all datasets.
+                # https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_get_dataset
+                # https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.dataset.Dataset
+                dataset = self.bq_client.get_dataset(d.reference)
+
+                location = (
+                    d._properties.get("location")
+                    if hasattr(d, "_properties") and isinstance(d._properties, dict)
+                    else None
                 )
-                for d in datasets
-            ]
+
+                result.append(
+                    BigqueryDataset(
+                        name=d.dataset_id,
+                        labels=d.labels,
+                        location=location,
+                        comment=dataset.description,
+                        created=dataset.created,
+                        last_altered=dataset.modified,
+                    )
+                )
+            return result
 
     # This is not used anywhere
     def get_datasets_for_project_id_with_information_schema(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
index 0172a93ac7..56e13e1a13 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
@@ -12,6 +12,7 @@ from datahub.emitter.mce_builder import (
     make_dataset_urn_with_platform_instance,
     make_schema_field_urn,
     make_tag_urn,
+    make_ts_millis,
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey
@@ -300,6 +301,8 @@ class BigQuerySchemaGenerator:
         description: Optional[str] = None,
         tags: Optional[Dict[str, str]] = None,
         extra_properties: Optional[Dict[str, str]] = None,
+        created: Optional[int] = None,
+        last_modified: Optional[int] = None,
     ) -> Iterable[MetadataWorkUnit]:
         schema_container_key = self.gen_dataset_key(project_id, dataset)
 
@@ -349,6 +352,8 @@ class BigQuerySchemaGenerator:
             ),
             tags=tags_joined,
             extra_properties=extra_properties,
+            created=created,
+            last_modified=last_modified,
         )
 
     def _process_project(
@@ -484,6 +489,12 @@ class BigQuerySchemaGenerator:
                 else None
             ),
             description=bigquery_dataset.comment,
+            created=make_ts_millis(bigquery_dataset.created)
+            if bigquery_dataset.created
+            else None,
+            last_modified=make_ts_millis(bigquery_dataset.last_altered)
+            if bigquery_dataset.last_altered
+            else None,
         )
 
         columns = None
diff --git a/metadata-ingestion/tests/unit/bigquery/test_bigquery_source.py b/metadata-ingestion/tests/unit/bigquery/test_bigquery_source.py
index aa11cbe6c5..2df57eeff8 100644
--- a/metadata-ingestion/tests/unit/bigquery/test_bigquery_source.py
+++ b/metadata-ingestion/tests/unit/bigquery/test_bigquery_source.py
@@ -7,6 +7,7 @@ from typing import Any, Dict, List, Optional, cast
 from unittest.mock import MagicMock, Mock, patch
 
 import pytest
+from freezegun import freeze_time
 from google.api_core.exceptions import GoogleAPICallError
 from google.cloud.bigquery.table import Row, TableListItem
 
@@ -57,6 +58,8 @@ from datahub.metadata.schema_classes import (
     TimeStampClass,
 )
 
+FROZEN_TIME = "2022-02-03 07:00:00"
+
 
 def test_bigquery_uri():
     config = BigQueryV2Config.parse_obj(
@@ -569,6 +572,91 @@ def test_gen_table_dataset_workunits(
     assert len(mcps) >= 7
 
 
+@freeze_time(FROZEN_TIME)
+@patch.object(BigQueryV2Config, "get_bigquery_client")
+@patch.object(BigQueryV2Config, "get_projects_client")
+def test_get_datasets_for_project_id_with_timestamps(
+    get_projects_client, get_bq_client_mock
+):
+    project_id = "test-project"
+    frozen_time = datetime.now(tz=timezone.utc)
+
+    # Mock the BigQuery client
+    mock_bq_client = MagicMock()
+    get_bq_client_mock.return_value = mock_bq_client
+
+    # Mock dataset list items (what list_datasets returns)
+    mock_dataset_list_item1 = MagicMock()
+    mock_dataset_list_item1.dataset_id = "dataset1"
+    mock_dataset_list_item1.labels = {"env": "test"}
+    mock_dataset_list_item1.reference = "dataset1_reference"
+    mock_dataset_list_item1._properties = {"location": "US"}
+
+    mock_dataset_list_item2 = MagicMock()
+    mock_dataset_list_item2.dataset_id = "dataset2"
+    mock_dataset_list_item2.labels = {"env": "prod"}
+    mock_dataset_list_item2.reference = "dataset2_reference"
+    mock_dataset_list_item2._properties = {"location": "EU"}
+
+    # Mock full dataset objects (what get_dataset returns)
+    mock_full_dataset1 = MagicMock()
+    mock_full_dataset1.description = "Test dataset 1"
+    mock_full_dataset1.created = frozen_time
+    mock_full_dataset1.modified = frozen_time + timedelta(hours=1)
+
+    mock_full_dataset2 = MagicMock()
+    mock_full_dataset2.description = None  # Test missing description
+    mock_full_dataset2.created = None  # Test missing created timestamp
+    mock_full_dataset2.modified = None  # Test missing modified timestamp
+
+    # Configure mocks
+    mock_bq_client.list_datasets.return_value = [
+        mock_dataset_list_item1,
+        mock_dataset_list_item2,
+    ]
+    mock_bq_client.get_dataset.side_effect = lambda ref: {
+        "dataset1_reference": mock_full_dataset1,
+        "dataset2_reference": mock_full_dataset2,
+    }[ref]
+
+    # Create BigQuerySchemaApi instance
+    config = BigQueryV2Config.parse_obj({"project_id": project_id})
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
+    schema_api = source.bq_schema_extractor.schema_api
+
+    # Call the method
+    result = schema_api.get_datasets_for_project_id(project_id)
+
+    # Assert correct number of datasets returned
+    assert len(result) == 2
+
+    # Assert first dataset
+    dataset1 = result[0]
+    assert dataset1.name == "dataset1"
+    assert dataset1.labels == {"env": "test"}
+    assert dataset1.location == "US"
+    assert dataset1.comment == "Test dataset 1"
+    assert dataset1.created == frozen_time
+    assert dataset1.last_altered == frozen_time + timedelta(hours=1)
+
+    # Assert second dataset (with missing timestamps)
+    dataset2 = result[1]
+    assert dataset2.name == "dataset2"
+    assert dataset2.labels == {"env": "prod"}
+    assert dataset2.location == "EU"
+    assert dataset2.comment is None
+    assert dataset2.created is None
+    assert dataset2.last_altered is None
+
+    # Verify get_dataset was called exactly once per dataset
+    assert mock_bq_client.get_dataset.call_count == 2
+    mock_bq_client.get_dataset.assert_any_call("dataset1_reference")
+    mock_bq_client.get_dataset.assert_any_call("dataset2_reference")
+
+    # Verify list_datasets was called once
+    mock_bq_client.list_datasets.assert_called_once_with(project_id, max_results=None)
+
+
 @patch.object(BigQueryV2Config, "get_bigquery_client")
 @patch.object(BigQueryV2Config, "get_projects_client")
 def test_simple_upstream_table_generation(get_bq_client_mock, get_projects_client):