2022-03-31 14:27:51 +05:30
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Query parser utils tests
"""
import json
from unittest import TestCase
from unittest . mock import patch
from sqlalchemy . types import ARRAY , DATE , Float , Integer , Numeric , String
from sqlalchemy_bigquery . _types import STRUCT
from metadata . generated . schema . entity . data . table import Column , Table , TableType
from metadata . ingestion . api . workflow import Workflow
from metadata . ingestion . models . ometa_table_db import OMetaDatabaseAndTable
CONFIG = """
{
" source " : {
" type " : " bigquery " ,
2022-04-13 08:33:33 -07:00
" serviceName " : " local_bigquery " ,
" serviceConnection " : {
" config " : {
" type " : " BigQuery " ,
2022-04-19 12:31:34 +02:00
" credentials " : {
" gcsConfig " : {
" type " : " service_account " ,
" projectId " : " projectID " ,
" privateKeyId " : " privateKeyId " ,
" privateKey " : " privateKey " ,
" clientEmail " : " clientEmail " ,
" clientId " : " clientId " ,
" authUri " : " https://accounts.google.com/o/oauth2/auth " ,
" tokenUri " : " https://oauth2.googleapis.com/token " ,
" authProviderX509CertUrl " : " https://www.googleapis.com/oauth2/v1/certs " ,
" clientX509CertUrl " : " https://cert.url "
}
2022-04-13 08:33:33 -07:00
}
2022-03-31 14:27:51 +05:30
}
2022-04-13 08:33:33 -07:00
} ,
" sourceConfig " : { " config " : { " enableDataProfiler " : false } }
2022-03-31 14:27:51 +05:30
} ,
" sink " : {
" type " : " file " ,
" config " : {
" filename " : " /var/tmp/datasets.json "
}
2022-04-13 08:33:33 -07:00
} ,
" workflowConfig " : {
" openMetadataServerConfig " : {
" hostPort " : " http://localhost:8585/api " ,
" authProvider " : " no-auth "
2022-03-31 14:27:51 +05:30
}
}
}
"""
2022-06-06 10:48:23 +05:30
MOCK_GET_SOURCE_CONNECTION = " XXXXX-XXXX-XXXXX "
2022-04-13 08:33:33 -07:00
MOCK_GET_TABLE_NAMES = [ " test_schema_1.test_table_1 " , " test_schema_1.test_table_2 " ]
2022-03-31 14:27:51 +05:30
GET_TABLE_DESCRIPTIONS = { " text " : " Test " }
2022-04-13 08:33:33 -07:00
MOCK_GET_SCHEMA_NAMES = [ " test_schema_1 " ]
2022-03-31 14:27:51 +05:30
MOCK_UNIQUE_CONSTRAINTS = [ ]
MOCK_PK_CONSTRAINT = { " constrained_columns " : [ ] }
MOCK_GET_COLUMN = [
{
" name " : " region_name " ,
" type " : String ( ) ,
" nullable " : True ,
" comment " : " Name of the region within the country " ,
" default " : None ,
" precision " : None ,
" scale " : None ,
" max_length " : None ,
" raw_data_type " : " VARCHAR " ,
" policy_tags " : None ,
} ,
{
" name " : " date " ,
" type " : DATE ( ) ,
" nullable " : True ,
" comment " : " Date of the measured policy action status " ,
" default " : None ,
" precision " : None ,
" scale " : None ,
" max_length " : None ,
" raw_data_type " : " DATE " ,
" policy_tags " : None ,
} ,
{
" name " : " emergency_healthcare_investment " ,
" type " : Float ( ) ,
" nullable " : True ,
" comment " : " H4 - Short-term spending on, e.g, hospitals, masks, etc in USD; Record monetary value in USD of new short-term spending on health. If none, enter 0. No data - blank Please use the exchange rate of the date you are coding, not the current date. " ,
" default " : None ,
" precision " : None ,
" scale " : None ,
" max_length " : None ,
" raw_data_type " : " FLOAT " ,
" policy_tags " : None ,
} ,
{
" name " : " bigquery_test_datatype_2 " ,
" type " : Integer ( ) ,
" nullable " : True ,
" comment " : None ,
" default " : None ,
" precision " : None ,
" scale " : None ,
" max_length " : None ,
" raw_data_type " : " INTEGER " ,
" policy_tags " : None ,
} ,
{
" name " : " bigquery_test_datatype_3 " ,
" type " : ARRAY (
STRUCT ( bigquery_test_datatype_31 = ARRAY ( STRUCT ( record_test = String ( ) ) ) )
) ,
" nullable " : True ,
" comment " : None ,
" default " : None ,
" precision " : None ,
" scale " : None ,
" max_length " : None ,
" raw_data_type " : " ARRAY " ,
" policy_tags " : None ,
} ,
{
" name " : " bigquery_test_datatype_3.bigquery_test_datatype_31 " ,
" type " : ARRAY ( STRUCT ( record_test = String ( ) ) ) ,
" nullable " : True ,
" comment " : None ,
" default " : None ,
" precision " : None ,
" scale " : None ,
" max_length " : None ,
" raw_data_type " : " ARRAY " ,
" policy_tags " : None ,
} ,
{
" name " : " bigquery_test_datatype_4 " ,
" type " : ARRAY ( Numeric ( ) ) ,
" nullable " : True ,
" comment " : None ,
" default " : None ,
" precision " : None ,
" scale " : None ,
" max_length " : None ,
" raw_data_type " : " ARRAY " ,
" policy_tags " : None ,
} ,
{
" name " : " bigquery_test_datatype_5 " ,
" type " : STRUCT (
bigquery_test_datatype_51 = ARRAY (
STRUCT ( bigquery_test_datatype_511 = ARRAY ( String ( ) ) )
)
) ,
" nullable " : True ,
" comment " : None ,
" default " : None ,
" precision " : None ,
" scale " : None ,
" max_length " : None ,
" raw_data_type " : " STRUCT<bigquery_test_datatype_51 ARRAY<STRUCT<bigquery_test_datatype_511 ARRAY<STRING>>>> " ,
" policy_tags " : None ,
} ,
{
" name " : " bigquery_test_datatype_5.bigquery_test_datatype_51 " ,
" type " : ARRAY ( STRUCT ( bigquery_test_datatype_511 = ARRAY ( String ( ) ) ) ) ,
" nullable " : True ,
" comment " : None ,
" default " : None ,
" precision " : None ,
" scale " : None ,
" max_length " : None ,
" raw_data_type " : " ARRAY " ,
" policy_tags " : None ,
} ,
{
" name " : " bigquery_test_datatype_5.bigquery_test_datatype_51.bigquery_test_datatype_511 " ,
" type " : ARRAY ( String ( ) ) ,
" nullable " : True ,
" comment " : None ,
" default " : None ,
" precision " : None ,
" scale " : None ,
" max_length " : None ,
" raw_data_type " : " ARRAY " ,
" policy_tags " : None ,
} ,
]
MOCK_GET_VIEW_NAMES = [ ]
MOCK_GET_VIEW_DEFINITION = " "
MOCK_GET_INDEXES = [
{ " name " : " partition " , " column_names " : [ " region_name " ] , " unique " : False } ,
{
" name " : " clustering " ,
" column_names " : [ " bigquery_test_datatype_5 " ] ,
" unique " : False ,
} ,
]
def execute_workflow ( config_dict ) :
workflow = Workflow . create ( config_dict )
workflow . execute ( )
workflow . print_status ( )
workflow . stop ( )
class BigQueryIngestionTest ( TestCase ) :
@patch ( " sqlalchemy.engine.reflection.Inspector.get_indexes " )
2022-05-26 21:00:18 +02:00
@patch (
" metadata.ingestion.source.database.bigquery.BigquerySource.fetch_sample_data "
)
2022-03-31 14:27:51 +05:30
@patch ( " sqlalchemy.engine.reflection.Inspector.get_view_definition " )
@patch ( " sqlalchemy.engine.reflection.Inspector.get_view_names " )
@patch ( " sqlalchemy.engine.reflection.Inspector.get_table_comment " )
@patch ( " sqlalchemy.engine.reflection.Inspector.get_table_names " )
@patch ( " sqlalchemy.engine.reflection.Inspector.get_schema_names " )
@patch ( " sqlalchemy.engine.reflection.Inspector.get_unique_constraints " )
@patch ( " sqlalchemy.engine.reflection.Inspector.get_pk_constraint " )
@patch ( " sqlalchemy.engine.reflection.Inspector.get_columns " )
2022-06-06 10:48:23 +05:30
@patch ( " google.auth.default " )
2022-03-31 14:27:51 +05:30
@patch ( " sqlalchemy.engine.base.Engine.connect " )
@patch ( " sqlalchemy_bigquery._helpers.create_bigquery_client " )
def test_bigquery_ingestion (
self ,
mock_connect ,
mock_create_bigquery_client ,
2022-06-06 10:48:23 +05:30
auth_default ,
2022-03-31 14:27:51 +05:30
get_columns ,
get_pk_constraint ,
get_unique_constraints ,
get_schema_names ,
get_table_names ,
get_table_comment ,
get_view_names ,
get_view_definition ,
fetch_sample_data ,
get_indexes ,
) :
get_schema_names . return_value = MOCK_GET_SCHEMA_NAMES
get_table_names . return_value = MOCK_GET_TABLE_NAMES
get_table_comment . return_value = GET_TABLE_DESCRIPTIONS
get_unique_constraints . return_value = MOCK_UNIQUE_CONSTRAINTS
get_pk_constraint . return_value = MOCK_PK_CONSTRAINT
get_columns . return_value = MOCK_GET_COLUMN
get_view_names . return_value = MOCK_GET_VIEW_NAMES
get_view_definition . return_value = MOCK_GET_VIEW_DEFINITION
fetch_sample_data . return_value = [ ]
2022-06-06 10:48:23 +05:30
auth_default . return_value = ( None , MOCK_GET_SOURCE_CONNECTION )
2022-03-31 14:27:51 +05:30
get_indexes . return_value = MOCK_GET_INDEXES
execute_workflow ( json . loads ( CONFIG ) )
config = json . loads ( CONFIG )
file_data = open ( config [ " sink " ] [ " config " ] [ " filename " ] )
2022-04-13 08:33:33 -07:00
file_sink = json . load ( file_data )
for ometa_data in file_sink :
table = ometa_data . get ( " table " )
2022-04-14 11:22:39 +02:00
_ : OMetaDatabaseAndTable = OMetaDatabaseAndTable . parse_obj ( ometa_data )
_ : Table = Table . parse_obj ( table )
2022-03-31 14:27:51 +05:30
assert table . get ( " description " ) == GET_TABLE_DESCRIPTIONS . get ( " text " )
2022-04-13 08:33:33 -07:00
table_name = (
f " { ometa_data . get ( ' database_schema ' ) . get ( ' name ' ) } . { table . get ( ' name ' ) } "
)
2022-03-31 14:27:51 +05:30
if table . get ( " tableType " ) == TableType . Regular . value :
assert table_name in MOCK_GET_TABLE_NAMES
for column in table . get ( " columns " ) :
2022-04-14 11:22:39 +02:00
_ : Column = Column . parse_obj ( column )
2022-03-31 14:27:51 +05:30
if column in MOCK_UNIQUE_CONSTRAINTS :
assert Column . constraint . UNIQUE == column . get ( " constraint " )
if column in MOCK_PK_CONSTRAINT . get ( " constrained_columns " ) :
assert Column . constraint . PRIMARY_KEY == column . get ( " constraint " )
if table . get ( " name " ) in MOCK_GET_VIEW_NAMES :
assert table . get ( " tableType " ) == TableType . View . value
assert table . get ( " viewDefinition " ) == MOCK_GET_VIEW_DEFINITION