MINOR: data sample ingestion bigquery (#21074)

* fix: data sample ingestion bigquery

* style: ran python linting

* fix: flaky test in topology

(cherry picked from commit a853561d30fcab34796a06c0e75ec0bb4f20e1f4)
This commit is contained in:
Teddy 2025-05-06 15:58:37 +02:00 committed by OpenMetadata Release Bot
parent 5ab9ea2240
commit 83d2811c07
3 changed files with 9 additions and 84 deletions

View File

@ -12,18 +12,16 @@
Helper module to handle data sampling Helper module to handle data sampling
for the profiler for the profiler
""" """
import traceback from typing import Dict, Optional, Union
from typing import Dict, List, Optional, Union
from sqlalchemy import Column from sqlalchemy import Column
from sqlalchemy import Table as SqaTable from sqlalchemy import Table as SqaTable
from sqlalchemy import inspect, text from sqlalchemy import text
from sqlalchemy.orm import Query from sqlalchemy.orm import Query
from metadata.generated.schema.entity.data.table import ( from metadata.generated.schema.entity.data.table import (
ProfileSampleType, ProfileSampleType,
Table, Table,
TableData,
TableType, TableType,
) )
from metadata.generated.schema.entity.services.connections.connectionBasicType import ( from metadata.generated.schema.entity.services.connections.connectionBasicType import (
@ -130,62 +128,3 @@ class BigQuerySampler(SQASampler):
) )
return super().get_sample_query(column=column) return super().get_sample_query(column=column)
def fetch_sample_data(self, columns: Optional[List[Column]] = None) -> TableData:
"""
Use the sampler to retrieve sample data rows as per limit given by user
Args:
columns (Optional[List]): List of columns to fetch
Returns:
TableData to be added to the Table Entity
"""
if self.sample_query:
return self._fetch_sample_data_from_user_query()
# Add new RandomNumFn column
ds = self.get_dataset()
if not columns:
sqa_columns = [col for col in inspect(ds).c if col.name != RANDOM_LABEL]
else:
# we can't directly use columns as it is bound to self.raw_dataset and not the rnd table.
# If we use it, it will result in a cross join between self.raw_dataset and rnd table
names = [col.name for col in columns]
sqa_columns = [
col
for col in inspect(ds).c
if col.name != RANDOM_LABEL and col.name in names
]
with self.get_client() as client:
try:
query = (
str(client.query(*sqa_columns)).replace(
f"`{ds.__table_args__['schema']}`.`{ds.__tablename__}`",
f"`{self.entity.database.name}`.`{ds.__table_args__['schema']}`.`{ds.__tablename__}`",
)
+ f" LIMIT {self.sample_limit}"
)
sqa_sample = client.execute(text(query))
except Exception:
logger.debug(
"Cannot fetch sample data with random sampling. Falling back to 100 rows."
)
logger.debug(traceback.format_exc())
ds_inspect = inspect(self.raw_dataset)
sqa_columns = list(ds_inspect.c)
schema = ds_inspect.class_.__table_args__["schema"]
table = ds_inspect.class_.__tablename__
query = (
str(client.query(*sqa_columns)).replace(
f"`{schema}`.`{table}`",
f"`{self.entity.database.name}`.`{schema}`.`{table}`",
)
+ f" LIMIT {self.sample_limit}"
)
sqa_sample = client.execute(text(query))
return TableData(
columns=[column.name for column in sqa_columns],
rows=[list(row) for row in sqa_sample],
)

View File

@ -13,7 +13,6 @@ Helper module to handle data sampling
for the profiler for the profiler
""" """
import hashlib import hashlib
import traceback
from typing import List, Optional, Union, cast from typing import List, Optional, Union, cast
from sqlalchemy import Column, inspect, text from sqlalchemy import Column, inspect, text
@ -199,25 +198,12 @@ class SQASampler(SamplerInterface, SQAInterfaceMixin):
if col.name != RANDOM_LABEL and col.name in names if col.name != RANDOM_LABEL and col.name in names
] ]
with self.get_client() as client: with self.get_client() as client:
try:
sqa_sample = ( sqa_sample = (
client.query(*sqa_columns) client.query(*sqa_columns)
.select_from(ds) .select_from(ds)
.limit(self.sample_limit) .limit(self.sample_limit)
.all() .all()
) )
except Exception:
logger.debug(
"Cannot fetch sample data with random sampling. Falling back to 100 rows."
)
logger.debug(traceback.format_exc())
sqa_columns = list(inspect(self.raw_dataset).c)
sqa_sample = (
client.query(*sqa_columns)
.select_from(self.raw_dataset)
.limit(100)
.all()
)
return TableData( return TableData(
columns=[column.name for column in sqa_columns], columns=[column.name for column in sqa_columns],
rows=[list(row) for row in sqa_sample], rows=[list(row) for row in sqa_sample],

View File

@ -186,7 +186,7 @@ class TopologyRunnerTest(TestCase):
# will get. # will get.
self.assertGreater(len(self.source.context.contexts.keys()), 1) self.assertGreater(len(self.source.context.contexts.keys()), 1)
self.assertEqual( self.assertCountEqual(
# check the post process being at the end # check the post process being at the end
[ [
either.right if hasattr(either, "right") else either either.right if hasattr(either, "right") else either