Fixes Issue #11803 #12103 - Add BigQuery Struct Support (#12435)

* ref: implemented interface for profiler components + removed struct logic

* ref: ran python linting

* ref: added UML diagram to readme.md

* ref: empty commit for labeler check

* ref: remove multiple context manager for 3.7 3.8 compatibility

* ref: remove

* fix: mapper logic for BQ struct types

* feat: added BQ support for structs

* feat: clean code smell + handle null self.col.table value

* feat: ran python linting

* feat: updated test for profiler handler + disabled flaky test

* Update ingestion/tests/unit/profiler/pandas/test_sample.py

Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>

---------

Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
This commit is contained in:
Teddy 2023-07-14 09:12:46 +02:00 committed by GitHub
parent 3433e2b5dd
commit 42a426226e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 192 additions and 15 deletions

View File

@@ -13,6 +13,7 @@
Interfaces with database for all database engine
supporting sqlalchemy abstraction layer
"""
from sqlalchemy import Column, inspect
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
@ -23,6 +24,23 @@ from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_
class BigQueryProfilerInterface(SQAProfilerInterface):
"""BigQuery profiler interface"""
def _get_struct_columns(self, columns: dict, parent: str):
    """Flatten a BigQuery STRUCT field mapping into dot-qualified columns.

    Args:
        columns: mapping of field name -> sqlalchemy type, as stored in
            the STRUCT type's ``_STRUCT_byname`` attribute
        parent: dotted path of the enclosing column (e.g. ``"col"`` or
            ``"col.sub"``)

    Returns:
        list of sqlalchemy ``Column`` objects, one per leaf field, each
        named ``"<parent>.<field>"``
    """
    # pylint: disable=import-outside-toplevel
    from sqlalchemy_bigquery import STRUCT

    columns_list = []
    for key, value in columns.items():
        if isinstance(value, STRUCT):
            # Nested struct: recurse, extending the dotted path with this key
            columns_list.extend(
                self._get_struct_columns(
                    value.__dict__.get("_STRUCT_byname"), f"{parent}.{key}"
                )
            )
        else:
            # Leaf field: materialize a column named with its full dotted path
            columns_list.append(Column(f"{parent}.{key}", value))
    return columns_list
def _get_sampler(self, **kwargs):
"""get sampler object"""
session = kwargs.get("session")
@ -37,3 +55,20 @@ class BigQueryProfilerInterface(SQAProfilerInterface):
profile_sample_query=self.profile_query,
table_type=self.table_entity.tableType,
)
def get_columns(self) -> list:
    """Get columns from the table, flattening BigQuery STRUCT columns.

    Plain columns are returned as-is; STRUCT columns are expanded into one
    column per leaf field via ``_get_struct_columns`` (dot-qualified names).

    Returns:
        list of sqlalchemy ``Column`` objects
    """
    # pylint: disable=import-outside-toplevel
    from sqlalchemy_bigquery import STRUCT

    columns = []
    for column in inspect(self.table).c:
        if isinstance(column.type, STRUCT):
            # Expand struct into its (possibly nested) leaf columns
            columns.extend(
                self._get_struct_columns(
                    column.type.__dict__.get("_STRUCT_byname"), column.name
                )
            )
        else:
            columns.append(column)
    return columns

View File

@ -52,10 +52,19 @@ class FirstQuartile(StaticMetric):
def fn(self):
"""sqlalchemy function"""
if is_quantifiable(self.col.type):
return MedianFn(column(self.col.name), self.col.table.fullname, 0.25)
# col fullname is only needed for MySQL and SQLite
return MedianFn(
column(self.col.name),
self.col.table.fullname if self.col.table is not None else None,
0.25,
)
if is_concatenable(self.col.type):
return MedianFn(LenFn(column(self.col.name)), self.col.table.fullname, 0.25)
return MedianFn(
LenFn(column(self.col.name)),
self.col.table.fullname if self.col.table is not None else None,
0.25,
)
logger.debug(
f"Don't know how to process type {self.col.type} when computing First Quartile"

View File

@ -52,10 +52,19 @@ class Median(StaticMetric):
def fn(self):
"""sqlalchemy function"""
if is_quantifiable(self.col.type):
return MedianFn(column(self.col.name), self.col.table.fullname, 0.5)
# col fullname is only needed for MySQL and SQLite
return MedianFn(
column(self.col.name),
self.col.table.fullname if self.col.table is not None else None,
0.5,
)
if is_concatenable(self.col.type):
return MedianFn(LenFn(column(self.col.name)), self.col.table.fullname, 0.5)
return MedianFn(
LenFn(column(self.col.name)),
self.col.table.fullname if self.col.table is not None else None,
0.5,
)
logger.debug(
f"Don't know how to process type {self.col.type} when computing Median"

View File

@ -52,10 +52,19 @@ class ThirdQuartile(StaticMetric):
def fn(self):
"""sqlalchemy function"""
if is_quantifiable(self.col.type):
return MedianFn(column(self.col.name), self.col.table.fullname, 0.75)
# col fullname is only needed for MySQL and SQLite
return MedianFn(
column(self.col.name),
self.col.table.fullname if self.col.table is not None else None,
0.75,
)
if is_concatenable(self.col.type):
return MedianFn(LenFn(column(self.col.name)), self.col.table.fullname, 0.75)
return MedianFn(
LenFn(column(self.col.name)),
self.col.table.fullname if self.col.table is not None else None,
0.75,
)
logger.debug(
f"Don't know how to process type {self.col.type} when computing Third Quartile"

View File

@ -26,6 +26,7 @@ from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source import sqa_types
from metadata.profiler.orm.registry import CustomTypes
from metadata.profiler.source.bigquery.type_mapper import bigquery_type_mapper
Base = declarative_base()
@ -87,6 +88,12 @@ def map_types(col: Column, table_service_type):
return VARIANT
if (
table_service_type == databaseService.DatabaseServiceType.BigQuery
and col.dataType == DataType.STRUCT
):
return bigquery_type_mapper(_TYPE_MAP, col)
return _TYPE_MAP.get(col.dataType)

View File

@ -194,12 +194,12 @@ class Profiler(Generic[TMetric]):
"""
for attrs, val in profile.tableProfile:
if attrs not in {"timestamp", "profileSample", "profileSampleType"} and val:
return profile
return
for col_element in profile.columnProfile:
for attrs, val in col_element:
if attrs not in {"timestamp", "name"} and val is not None:
return profile
return
raise RuntimeError(
f"No profile data computed for {self.profiler_interface.table_entity.fullyQualifiedName.__root__}"
@ -468,7 +468,8 @@ class Profiler(Generic[TMetric]):
if process_pii_sensitive:
self.process_pii_sensitive(sample_data)
profile = self._check_profile_and_handle(self.get_profile())
profile = self.get_profile()
self._check_profile_and_handle(profile)
table_profile = ProfilerResponse(
table=self.profiler_interface.table_entity,

View File

@@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Type mapper for bigquery specific types
"""
from metadata.generated.schema.entity.data.table import Column, DataType
def bigquery_type_mapper(_type_map: dict, col: Column):
    """Map the bigquery data types to the sqlalchemy data types

    Args:
        _type_map (dict): a dict of bigquery data types to sqlalchemy data types
        col (Column): a column entity

    Returns:
        sqlalchemy data type
    """
    # pylint: disable=import-outside-toplevel
    from sqlalchemy_bigquery import STRUCT

    def _fields_for(type_map: dict, struct_col: Column):
        # Build one (name, sqlalchemy type) pair per child field;
        # nested STRUCT children recurse into their own field lists.
        pairs = []
        for child in struct_col.children:
            if child.dataType == DataType.STRUCT:
                pairs.append(
                    (child.name.__root__, STRUCT(*_fields_for(type_map, child)))
                )
            else:
                sqa_type = type_map.get(child.dataType)
                if child.arrayDataType:
                    # Arrays carry their element type separately
                    sqa_type = sqa_type(item_type=child.arrayDataType)
                pairs.append((child.name.__root__, sqa_type))
        return pairs

    return STRUCT(*_fields_for(_type_map, col))

View File

@ -226,7 +226,7 @@ class ProfilerTest(TestCase):
profiler_interface=self.datalake_profiler_interface,
)
profile = profiler._check_profile_and_handle(
profiler._check_profile_and_handle(
CreateTableProfileRequest(
tableProfile=TableProfile(
timestamp=datetime.now().timestamp(), columnCount=10
@ -234,8 +234,6 @@ class ProfilerTest(TestCase):
)
)
assert profile.tableProfile.columnCount == 10
with pytest.raises(Exception):
profiler._check_profile_and_handle(
CreateTableProfileRequest(

View File

@ -16,6 +16,7 @@ import os
from unittest import TestCase, mock
from uuid import uuid4
import pytest
from sqlalchemy import TEXT, Column, Integer, String, func
from sqlalchemy.orm import declarative_base
@ -170,6 +171,7 @@ class DatalakeSampleTest(TestCase):
res = profiler.compute_metrics()._table_results
assert res.get(Metrics.ROW_COUNT.name) == 3
@pytest.mark.skip(reason="Flaky test due to small sample size")
def test_random_sample_histogram(self):
"""
Histogram should run correctly

View File

@@ -0,0 +1,63 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test we map correctly struct columns for BQ
"""
from metadata.generated.schema.entity.data.table import Column
from metadata.profiler.orm.converter import _TYPE_MAP
from metadata.profiler.source.bigquery.type_mapper import bigquery_type_mapper
def test_map_struct():
    """Nested STRUCT (including an ARRAY leaf) maps to the expected sqlalchemy STRUCT."""
    array_leaf = {
        "name": "col5",
        "dataType": "ARRAY",
        "arrayDataType": "STRING",
        "children": [],
    }
    inner_struct = {
        "name": "col4",
        "dataType": "STRUCT",
        "children": [array_leaf],
    }
    middle_struct = {
        "name": "col2",
        "dataType": "STRUCT",
        "children": [
            {
                "name": "col3",
                "dataType": "STRING",
                "children": [],
            },
            inner_struct,
        ],
    }
    column = Column.parse_obj(
        {
            "name": "col",
            "dataType": "STRUCT",
            "children": [
                {
                    "name": "col1",
                    "dataType": "STRING",
                    "children": [],
                },
                middle_struct,
            ],
        }
    )

    mapped = bigquery_type_mapper(_TYPE_MAP, column)

    expected = (
        "STRUCT(col1=String(), col2=STRUCT(col3=String(), "
        "col4=STRUCT(col5=CustomArray(<DataType.STRING: 'STRING'>))))"
    )
    assert repr(mapped) == expected

View File

@ -214,7 +214,7 @@ class ProfilerTest(TestCase):
profiler_interface=self.sqa_profiler_interface,
)
profile = profiler._check_profile_and_handle(
profiler._check_profile_and_handle(
CreateTableProfileRequest(
tableProfile=TableProfile(
timestamp=datetime.now().timestamp(), columnCount=10
@ -222,8 +222,6 @@ class ProfilerTest(TestCase):
)
)
assert profile.tableProfile.columnCount == 10
with pytest.raises(Exception):
profiler._check_profile_and_handle(
CreateTableProfileRequest(