import random
import string
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from unittest.mock import MagicMock, patch

import pytest
from freezegun import freeze_time
from google.cloud.bigquery.table import TableListItem

from datahub.api.entities.platformresource.platform_resource import (
    PlatformResource,
    PlatformResourceKey,
)
from datahub.ingestion.glossary.classifier import (
    ClassificationConfig,
    DynamicTypedClassifierConfig,
)
from datahub.ingestion.glossary.datahub_classifier import DataHubClassifierConfig
from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
from datahub.ingestion.source.bigquery_v2.bigquery_data_reader import (
    BigQueryDataReader,
)
from datahub.ingestion.source.bigquery_v2.bigquery_platform_resource_helper import (
    BigQueryLabelInfo,
    BigQueryPlatformResourceHelper,
)
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
    BigqueryColumn,
    BigqueryDataset,
    BigqueryProject,
    BigQuerySchemaApi,
    BigqueryTable,
    BigqueryTableSnapshot,
    BigqueryView,
)
from datahub.ingestion.source.bigquery_v2.bigquery_schema_gen import (
    BigQuerySchemaGenerator,
    BigQueryV2Config,
)
from datahub.testing import mce_helpers
from tests.test_helpers.state_helpers import run_and_get_pipeline

FROZEN_TIME = "2022-02-03 07:00:00"


def random_email() -> str:
    # Random lowercase local part so the sampled rows are not all identical.
    return (
        "".join(
            [
                random.choice(string.ascii_lowercase)
                for _ in range(random.randint(10, 15))
            ]
        )
        + "@xyz.com"
    )


def recipe(
    mcp_output_path: str, source_config_override: Optional[dict] = None
) -> dict:
    source_config_override = source_config_override or {}
    return {
        "source": {
            "type": "bigquery",
            "config": {
                "project_ids": ["project-id-1"],
                "credential": {
                    "project_id": "project-id-1",
                    "private_key_id": "private_key_id",
                    "private_key": "private_key",
                    "client_email": "client_email",
                    "client_id": "client_id",
                },
                "include_usage_statistics": False,
                "include_table_lineage": True,
                "include_data_platform_instance": True,
                "capture_table_label_as_tag": True,
                "capture_dataset_label_as_tag": True,
                "classification": ClassificationConfig(
                    enabled=True,
                    classifiers=[
                        DynamicTypedClassifierConfig(
                            type="datahub",
                            config=DataHubClassifierConfig(
                                minimum_values_threshold=1,
                            ),
                        )
                    ],
                    max_workers=1,
                ).dict(),
                **source_config_override,
            },
        },
        "sink": {"type": "file", "config": {"filename": mcp_output_path}},
    }


# Note: stacked `@patch` decorators inject their mocks bottom-up, so the first
# test argument corresponds to the bottom-most decorator.
@freeze_time(FROZEN_TIME)
@patch.object(BigQuerySchemaApi, "get_snapshots_for_dataset")
@patch.object(BigQuerySchemaApi, "get_views_for_dataset")
@patch.object(BigQuerySchemaApi, "get_tables_for_dataset")
@patch.object(BigQuerySchemaGenerator, "get_core_table_details")
@patch.object(BigQuerySchemaApi, "get_datasets_for_project_id")
@patch.object(BigQuerySchemaApi, "get_columns_for_dataset")
@patch.object(BigQueryDataReader, "get_sample_data_for_table")
@patch.object(BigQueryPlatformResourceHelper, "get_platform_resource")
@patch("google.cloud.bigquery.Client")
@patch("google.cloud.datacatalog_v1.PolicyTagManagerClient")
@patch("google.cloud.resourcemanager_v3.ProjectsClient")
def test_bigquery_v2_ingest(
    projects_client,
    policy_tag_manager_client,
    client,
    get_platform_resource,
    get_sample_data_for_table,
    get_columns_for_dataset,
    get_datasets_for_project_id,
    get_core_table_details,
    get_tables_for_dataset,
    get_views_for_dataset,
    get_snapshots_for_dataset,
    pytestconfig,
    tmp_path,
):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2"
    mcp_golden_path = f"{test_resources_dir}/bigquery_mcp_golden.json"
    mcp_output_path = f"{tmp_path}/bigquery_mcp_output.json"
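
    # Everything the connector would normally read from BigQuery is mocked
    # below, so the run is fully offline and its MCP output can be diffed
    # deterministically against the golden file.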
    dataset_name = "bigquery-dataset-1"

    def side_effect(*args: Any) -> Optional[PlatformResource]:
        if args[0].primary_key == "mixedcasetag":
            return PlatformResource.create(
                key=PlatformResourceKey(
                    primary_key="mixedcasetag",
                    resource_type="BigQueryLabelInfo",
                    platform="bigquery",
                ),
                value=BigQueryLabelInfo(
                    datahub_urn="urn:li:tag:MixedCaseTag",
                    managed_by_datahub=True,
                    key="mixedcasetag",
                    value="",
                ),
            )
        return None

    get_platform_resource.side_effect = side_effect
    get_datasets_for_project_id.return_value = [
        BigqueryDataset(
            name=dataset_name, location="US", labels={"priority": "medium:test"}
        )
    ]
    table_list_item = TableListItem(
        {"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}}
    )
    table_name = "table-1"
    snapshot_table_name = "snapshot-table-1"
    view_name = "view-1"
    get_core_table_details.return_value = {table_name: table_list_item}
    columns = [
        BigqueryColumn(
            name="age",
            ordinal_position=1,
            is_nullable=False,
            field_path="col_1",
            data_type="INT",
            comment="comment",
            is_partition_column=False,
            cluster_column_position=None,
            policy_tags=["Test Policy Tag"],
        ),
        BigqueryColumn(
            name="email",
            ordinal_position=1,
            is_nullable=False,
            field_path="col_2",
            data_type="STRING",
            comment="comment",
            is_partition_column=False,
            cluster_column_position=None,
        ),
    ]

    get_columns_for_dataset.return_value = {
        table_name: columns,
        snapshot_table_name: columns,
        view_name: columns,
    }
    get_sample_data_for_table.return_value = {
        "age": [random.randint(1, 80) for _ in range(20)],
        "email": [random_email() for _ in range(20)],
    }

    bigquery_table = BigqueryTable(
        name=table_name,
        comment=None,
        created=None,
        last_altered=None,
        size_in_bytes=None,
        rows_count=None,
        labels={
            "priority": "high",
            "purchase": "",
            "mixedcasetag": "",
        },
    )
    get_tables_for_dataset.return_value = iter([bigquery_table])
    snapshot_table = BigqueryTableSnapshot(
        name=snapshot_table_name,
        comment=None,
        created=None,
        last_altered=None,
        size_in_bytes=None,
        rows_count=None,
        base_table_identifier=BigqueryTableIdentifier(
            project_id="project-id-1",
            dataset="bigquery-dataset-1",
            table="table-1",
        ),
    )
    get_snapshots_for_dataset.return_value = iter([snapshot_table])

    bigquery_view = BigqueryView(
        name=view_name,
        comment=None,
        created=None,
        view_definition=f"create view `{dataset_name}.view-1` as select email from `{dataset_name}.table-1`",
        last_altered=None,
        size_in_bytes=None,
        rows_count=None,
        materialized=False,
    )
    get_views_for_dataset.return_value = iter([bigquery_view])

    pipeline_config_dict: Dict[str, Any] = recipe(mcp_output_path=mcp_output_path)

    run_and_get_pipeline(pipeline_config_dict)

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=mcp_output_path,
        golden_path=mcp_golden_path,
    )
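

# Variant of the ingest test above: projects are discovered via
# `project_labels` instead of being listed explicitly in `project_ids`.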
@freeze_time(FROZEN_TIME)
@patch.object(BigQuerySchemaApi, "get_projects_with_labels")
@patch.object(BigQuerySchemaApi, "get_tables_for_dataset")
@patch.object(BigQuerySchemaGenerator, "get_core_table_details")
@patch.object(BigQuerySchemaApi, "get_datasets_for_project_id")
@patch.object(BigQuerySchemaApi, "get_columns_for_dataset")
@patch.object(BigQueryDataReader, "get_sample_data_for_table")
@patch("google.cloud.bigquery.Client")
@patch("google.cloud.datacatalog_v1.PolicyTagManagerClient")
@patch("google.cloud.resourcemanager_v3.ProjectsClient")
def test_bigquery_v2_project_labels_ingest(
    projects_client,
    policy_tag_manager_client,
    client,
    get_sample_data_for_table,
    get_columns_for_dataset,
    get_datasets_for_project_id,
    get_core_table_details,
    get_tables_for_dataset,
    get_projects_with_labels,
    pytestconfig,
    tmp_path,
):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2"
    mcp_golden_path = f"{test_resources_dir}/bigquery_project_label_mcp_golden.json"
    mcp_output_path = f"{tmp_path}/bigquery_project_label_mcp_output.json"

    get_datasets_for_project_id.return_value = [
        BigqueryDataset(name="bigquery-dataset-1")
    ]
    get_projects_with_labels.return_value = [
        BigqueryProject(id="dev", name="development")
    ]

    table_list_item = TableListItem(
        {"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}}
    )
    table_name = "table-1"
    get_core_table_details.return_value = {table_name: table_list_item}
    get_columns_for_dataset.return_value = {
        table_name: [
            BigqueryColumn(
                name="age",
                ordinal_position=1,
                is_nullable=False,
                field_path="col_1",
                data_type="INT",
                comment="comment",
                is_partition_column=False,
                cluster_column_position=None,
                policy_tags=["Test Policy Tag"],
            ),
            BigqueryColumn(
                name="email",
                ordinal_position=1,
                is_nullable=False,
                field_path="col_2",
                data_type="STRING",
                comment="comment",
                is_partition_column=False,
                cluster_column_position=None,
            ),
        ]
    }
    get_sample_data_for_table.return_value = {
        "age": [random.randint(1, 80) for _ in range(20)],
        "email": [random_email() for _ in range(20)],
    }

    bigquery_table = BigqueryTable(
        name=table_name,
        comment=None,
        created=None,
        last_altered=None,
        size_in_bytes=None,
        rows_count=None,
    )
    get_tables_for_dataset.return_value = iter([bigquery_table])

    pipeline_config_dict: Dict[str, Any] = recipe(mcp_output_path=mcp_output_path)
    # Select projects by label rather than by explicit id.
    del pipeline_config_dict["source"]["config"]["project_ids"]
    pipeline_config_dict["source"]["config"]["project_labels"] = [
        "environment:development"
    ]

    run_and_get_pipeline(pipeline_config_dict)

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=mcp_output_path,
        golden_path=mcp_golden_path,
    )
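

# Same mocked metadata as the first test (minus labels and platform
# resources), ingested through the queries-v2 code path.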
policy_tags=["Test Policy Tag"], ), BigqueryColumn( name="email", ordinal_position=1, is_nullable=False, field_path="col_2", data_type="STRING", comment="comment", is_partition_column=False, cluster_column_position=None, ), ] get_columns_for_dataset.return_value = { table_name: columns, snapshot_table_name: columns, view_name: columns, } get_sample_data_for_table.return_value = { "age": [random.randint(1, 80) for i in range(20)], "email": [random_email() for i in range(20)], } bigquery_table = BigqueryTable( name=table_name, comment=None, created=None, last_altered=None, size_in_bytes=None, rows_count=None, ) get_tables_for_dataset.return_value = iter([bigquery_table]) bigquery_view = BigqueryView( name=view_name, comment=None, created=None, view_definition=f"create view `{dataset_name}.view-1` as select email from `{dataset_name}.table-1`", last_altered=None, size_in_bytes=None, rows_count=None, materialized=False, ) get_views_for_dataset.return_value = iter([bigquery_view]) snapshot_table = BigqueryTableSnapshot( name=snapshot_table_name, comment=None, created=None, last_altered=None, size_in_bytes=None, rows_count=None, base_table_identifier=BigqueryTableIdentifier( project_id="project-id-1", dataset="bigquery-dataset-1", table="table-1", ), ) get_snapshots_for_dataset.return_value = iter([snapshot_table]) # Even if `include_table_lineage` is disabled, we still ingest view and snapshot lineage # if use_queries_v2 is set. pipeline_config_dict: Dict[str, Any] = recipe( mcp_output_path=mcp_output_path, source_config_override={"use_queries_v2": True, "include_table_lineage": False}, ) run_and_get_pipeline(pipeline_config_dict) mce_helpers.check_golden_file( pytestconfig, output_path=mcp_output_path, golden_path=mcp_golden_path, ) @freeze_time(FROZEN_TIME) @patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") @patch.object(BigQueryV2Config, "get_bigquery_client") @patch("google.cloud.datacatalog_v1.PolicyTagManagerClient") @patch("google.cloud.resourcemanager_v3.ProjectsClient") def test_bigquery_queries_v2_lineage_usage_ingest( projects_client, policy_tag_manager_client, get_bigquery_client, get_datasets_for_project_id, pytestconfig, tmp_path, ): test_resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2" mcp_golden_path = f"{test_resources_dir}/bigquery_lineage_usage_golden.json" mcp_output_path = "{}/{}".format(tmp_path, "bigquery_lineage_usage_output.json") dataset_name = "bigquery-dataset-1" get_datasets_for_project_id.return_value = [BigqueryDataset(name=dataset_name)] client = MagicMock() get_bigquery_client.return_value = client client.list_tables.return_value = [ TableListItem( {"tableReference": {"projectId": "", "datasetId": "", "tableId": "table-1"}} ), TableListItem( {"tableReference": {"projectId": "", "datasetId": "", "tableId": "view-1"}} ), ] # mocking the query results for fetching audit log # note that this is called twice, once for each region client.query.return_value = [ { "job_id": "1", "project_id": "project-id-1", "creation_time": datetime.now(timezone.utc), "user_email": "foo@xyz.com", "query": "select * from `bigquery-dataset-1`.`table-1`", "session_id": None, "query_hash": None, "statement_type": "SELECT", "destination_table": None, "referenced_tables": None, }, { "job_id": "2", "project_id": "project-id-1", "creation_time": datetime.now(timezone.utc), "user_email": "foo@xyz.com", "query": "create view `bigquery-dataset-1`.`view-1` as select * from `bigquery-dataset-1`.`table-1`", "session_id": None, "query_hash": None, "statement_type": 
"CREATE", "destination_table": None, "referenced_tables": None, }, { "job_id": "3", "project_id": "project-id-1", "creation_time": datetime.now(timezone.utc), "user_email": "service_account@xyz.com", "query": """\ select * from `bigquery-dataset-1`.`view-1` LIMIT 100 -- {"user":"@bar","email":"bar@xyz.com","url":"https://modeanalytics.com/acryltest/reports/6234ff78bc7d/runs/662b21949629/queries/f0aad24d5b37","scheduled":false} """, "session_id": None, "query_hash": None, "statement_type": "SELECT", "destination_table": None, "referenced_tables": None, }, { "job_id": "4", "project_id": "project-id-1", "creation_time": datetime.now(timezone.utc), "user_email": "service_account@xyz.com", "query": """\ select * from `bigquery-dataset-1`.`view-1` LIMIT 100 -- {"user":"@foo","email":"foo@xyz.com","url":"https://modeanalytics.com/acryltest/reports/6234ff78bc7d/runs/662b21949629/queries/f0aad24d5b37","scheduled":false} """, "session_id": None, "query_hash": None, "statement_type": "SELECT", "destination_table": None, "referenced_tables": None, }, ] pipeline_config_dict: Dict[str, Any] = recipe( mcp_output_path=mcp_output_path, source_config_override={ "use_queries_v2": True, "include_schema_metadata": False, "include_table_lineage": True, "include_usage_statistics": True, "classification": {"enabled": False}, }, ) run_and_get_pipeline(pipeline_config_dict) mce_helpers.check_golden_file( pytestconfig, output_path=mcp_output_path, golden_path=mcp_golden_path, ) @freeze_time(FROZEN_TIME) @patch.object(BigQuerySchemaApi, "get_snapshots_for_dataset") @patch.object(BigQuerySchemaApi, "get_views_for_dataset") @patch.object(BigQuerySchemaApi, "get_tables_for_dataset") @patch.object(BigQuerySchemaGenerator, "get_core_table_details") @patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") @patch.object(BigQuerySchemaApi, "get_columns_for_dataset") @patch.object(BigQueryDataReader, "get_sample_data_for_table") @patch("google.cloud.bigquery.Client") @patch("google.cloud.datacatalog_v1.PolicyTagManagerClient") @patch("google.cloud.resourcemanager_v3.ProjectsClient") @pytest.mark.parametrize( "use_queries_v2, include_table_lineage, include_usage_statistics, golden_file", [ (True, False, False, "bigquery_mcp_lineage_golden_1.json"), (True, True, False, "bigquery_mcp_lineage_golden_1.json"), (False, False, True, "bigquery_mcp_lineage_golden_2.json"), (False, True, True, "bigquery_mcp_lineage_golden_2.json"), ], ) def test_bigquery_lineage_v2_ingest_view_snapshots( client, policy_tag_manager_client, projects_client, get_sample_data_for_table, get_columns_for_dataset, get_datasets_for_project_id, get_core_table_details, get_tables_for_dataset, get_views_for_dataset, get_snapshots_for_dataset, pytestconfig, tmp_path, use_queries_v2, include_table_lineage, include_usage_statistics, golden_file, ): test_resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2" mcp_golden_path = f"{test_resources_dir}/{golden_file}" mcp_output_path = "{}/{}_output.json".format(tmp_path, golden_file) dataset_name = "bigquery-dataset-1" get_datasets_for_project_id.return_value = [ BigqueryDataset(name=dataset_name, location="US") ] table_list_item = TableListItem( {"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}} ) table_name = "table-1" snapshot_table_name = "snapshot-table-1" view_name = "view-1" get_core_table_details.return_value = {table_name: table_list_item} columns = [ BigqueryColumn( name="age", ordinal_position=1, is_nullable=False, field_path="col_1", data_type="INT", 
comment="comment", is_partition_column=False, cluster_column_position=None, policy_tags=["Test Policy Tag"], ), BigqueryColumn( name="email", ordinal_position=1, is_nullable=False, field_path="col_2", data_type="STRING", comment="comment", is_partition_column=False, cluster_column_position=None, ), ] get_columns_for_dataset.return_value = { table_name: columns, snapshot_table_name: columns, view_name: columns, } get_sample_data_for_table.return_value = { "age": [random.randint(1, 80) for i in range(20)], "email": [random_email() for i in range(20)], } bigquery_table = BigqueryTable( name=table_name, comment=None, created=None, last_altered=None, size_in_bytes=None, rows_count=None, ) get_tables_for_dataset.return_value = iter([bigquery_table]) bigquery_view = BigqueryView( name=view_name, comment=None, created=None, view_definition=f"create view `{dataset_name}.view-1` as select email from `{dataset_name}.table-1`", last_altered=None, size_in_bytes=None, rows_count=None, materialized=False, ) get_views_for_dataset.return_value = iter([bigquery_view]) snapshot_table = BigqueryTableSnapshot( name=snapshot_table_name, comment=None, created=None, last_altered=None, size_in_bytes=None, rows_count=None, base_table_identifier=BigqueryTableIdentifier( project_id="project-id-1", dataset="bigquery-dataset-1", table="table-1", ), ) get_snapshots_for_dataset.return_value = iter([snapshot_table]) pipeline_config_dict: Dict[str, Any] = recipe( mcp_output_path=mcp_output_path, source_config_override={ "use_queries_v2": use_queries_v2, "include_table_lineage": include_table_lineage, "include_usage_statistics": include_usage_statistics, "classification": {"enabled": False}, }, ) run_and_get_pipeline(pipeline_config_dict) mce_helpers.check_golden_file( pytestconfig, output_path=mcp_output_path, golden_path=mcp_golden_path, )