From 83d2811c0740c15d78bca59f355b57f6fa546ee3 Mon Sep 17 00:00:00 2001 From: Teddy Date: Tue, 6 May 2025 15:58:37 +0200 Subject: [PATCH] MINOR: data sample ingestion bigquery (#21074) * fix: data sample ingestion bigquery * style: ran python linting * fix: flaky test in topology (cherry picked from commit a853561d30fcab34796a06c0e75ec0bb4f20e1f4) --- .../sampler/sqlalchemy/bigquery/sampler.py | 65 +------------------ .../metadata/sampler/sqlalchemy/sampler.py | 26 ++------ ingestion/tests/unit/topology/test_runner.py | 2 +- 3 files changed, 9 insertions(+), 84 deletions(-) diff --git a/ingestion/src/metadata/sampler/sqlalchemy/bigquery/sampler.py b/ingestion/src/metadata/sampler/sqlalchemy/bigquery/sampler.py index 3f4da8a849f..b74c375169f 100644 --- a/ingestion/src/metadata/sampler/sqlalchemy/bigquery/sampler.py +++ b/ingestion/src/metadata/sampler/sqlalchemy/bigquery/sampler.py @@ -12,18 +12,16 @@ Helper module to handle data sampling for the profiler """ -import traceback -from typing import Dict, List, Optional, Union +from typing import Dict, Optional, Union from sqlalchemy import Column from sqlalchemy import Table as SqaTable -from sqlalchemy import inspect, text +from sqlalchemy import text from sqlalchemy.orm import Query from metadata.generated.schema.entity.data.table import ( ProfileSampleType, Table, - TableData, TableType, ) from metadata.generated.schema.entity.services.connections.connectionBasicType import ( @@ -130,62 +128,3 @@ class BigQuerySampler(SQASampler): ) return super().get_sample_query(column=column) - - def fetch_sample_data(self, columns: Optional[List[Column]] = None) -> TableData: - """ - Use the sampler to retrieve sample data rows as per limit given by user - - Args: - columns (Optional[List]): List of columns to fetch - Returns: - TableData to be added to the Table Entity - """ - if self.sample_query: - return self._fetch_sample_data_from_user_query() - - # Add new RandomNumFn column - ds = self.get_dataset() - if not columns: - sqa_columns = [col for col in inspect(ds).c if col.name != RANDOM_LABEL] - else: - # we can't directly use columns as it is bound to self.raw_dataset and not the rnd table. - # If we use it, it will result in a cross join between self.raw_dataset and rnd table - names = [col.name for col in columns] - sqa_columns = [ - col - for col in inspect(ds).c - if col.name != RANDOM_LABEL and col.name in names - ] - with self.get_client() as client: - try: - query = ( - str(client.query(*sqa_columns)).replace( - f"`{ds.__table_args__['schema']}`.`{ds.__tablename__}`", - f"`{self.entity.database.name}`.`{ds.__table_args__['schema']}`.`{ds.__tablename__}`", - ) - + f" LIMIT {self.sample_limit}" - ) - sqa_sample = client.execute(text(query)) - except Exception: - logger.debug( - "Cannot fetch sample data with random sampling. Falling back to 100 rows." - ) - logger.debug(traceback.format_exc()) - ds_inspect = inspect(self.raw_dataset) - sqa_columns = list(ds_inspect.c) - - schema = ds_inspect.class_.__table_args__["schema"] - table = ds_inspect.class_.__tablename__ - - query = ( - str(client.query(*sqa_columns)).replace( - f"`{schema}`.`{table}`", - f"`{self.entity.database.name}`.`{schema}`.`{table}`", - ) - + f" LIMIT {self.sample_limit}" - ) - sqa_sample = client.execute(text(query)) - return TableData( - columns=[column.name for column in sqa_columns], - rows=[list(row) for row in sqa_sample], - ) diff --git a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py index 083f7214edf..e030e9c51f6 100644 --- a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py +++ b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py @@ -13,7 +13,6 @@ Helper module to handle data sampling for the profiler """ import hashlib -import traceback from typing import List, Optional, Union, cast from sqlalchemy import Column, inspect, text @@ -199,25 +198,12 @@ class SQASampler(SamplerInterface, SQAInterfaceMixin): if col.name != RANDOM_LABEL and col.name in names ] with self.get_client() as client: - try: - sqa_sample = ( - client.query(*sqa_columns) - .select_from(ds) - .limit(self.sample_limit) - .all() - ) - except Exception: - logger.debug( - "Cannot fetch sample data with random sampling. Falling back to 100 rows." - ) - logger.debug(traceback.format_exc()) - sqa_columns = list(inspect(self.raw_dataset).c) - sqa_sample = ( - client.query(*sqa_columns) - .select_from(self.raw_dataset) - .limit(100) - .all() - ) + sqa_sample = ( + client.query(*sqa_columns) + .select_from(ds) + .limit(self.sample_limit) + .all() + ) return TableData( columns=[column.name for column in sqa_columns], rows=[list(row) for row in sqa_sample], diff --git a/ingestion/tests/unit/topology/test_runner.py b/ingestion/tests/unit/topology/test_runner.py index a480a7b1d49..1f83dff8071 100644 --- a/ingestion/tests/unit/topology/test_runner.py +++ b/ingestion/tests/unit/topology/test_runner.py @@ -186,7 +186,7 @@ class TopologyRunnerTest(TestCase): # will get. self.assertGreater(len(self.source.context.contexts.keys()), 1) - self.assertEqual( + self.assertCountEqual( # check the post process being at the end [ either.right if hasattr(either, "right") else either