2025-04-03 10:39:47 +05:30
|
|
|
# Copyright 2025 Collate
|
|
|
|
# Licensed under the Collate Community License, Version 1.0 (the "License");
|
2022-09-26 19:41:40 +05:30
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
# You may obtain a copy of the License at
|
2025-04-03 10:39:47 +05:30
|
|
|
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
|
2022-09-26 19:41:40 +05:30
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
|
2022-11-10 10:54:31 +01:00
|
|
|
from typing import Optional, cast
|
2022-09-26 19:41:40 +05:30
|
|
|
from unittest import TestCase
|
|
|
|
from unittest.mock import patch
|
|
|
|
|
|
|
|
from google.cloud.bigquery import PartitionRange, RangePartitioning, TimePartitioning
|
|
|
|
from pydantic import BaseModel
|
|
|
|
|
|
|
|
from metadata.generated.schema.entity.data.database import Database
|
2022-11-10 10:54:31 +01:00
|
|
|
from metadata.generated.schema.entity.data.table import (
|
2024-02-28 07:11:00 +01:00
|
|
|
PartitionColumnDetails,
|
|
|
|
PartitionIntervalTypes,
|
2022-11-10 10:54:31 +01:00
|
|
|
PartitionIntervalUnit,
|
|
|
|
PartitionProfilerConfig,
|
|
|
|
Table,
|
|
|
|
TablePartition,
|
|
|
|
TableProfilerConfig,
|
|
|
|
)
|
2022-10-11 09:36:36 +02:00
|
|
|
from metadata.generated.schema.entity.services.databaseService import (
|
|
|
|
DatabaseServiceType,
|
|
|
|
)
|
2022-09-26 19:41:40 +05:30
|
|
|
from metadata.generated.schema.type.entityReference import EntityReference
|
2024-11-19 08:10:45 +01:00
|
|
|
from metadata.sampler.partition import get_partition_details
|
2023-09-04 11:02:57 +02:00
|
|
|
from metadata.workflow.profiler import ProfilerWorkflow
|
2022-09-26 19:41:40 +05:30
|
|
|
|
|
|
|
"""
|
|
|
|
Check Partitioned Table in Profiler Workflow
|
|
|
|
"""
|
2023-01-18 18:59:32 +05:30
|
|
|
# Placeholder project id; the test class returns it as the second element of
# the mocked ``google.auth.default`` result.
MOCK_GET_SOURCE_CONNECTION = "XXXXX-XXXX-XXXXX"

# Workflow configuration passed to ``ProfilerWorkflow.create`` by the test
# class: a BigQuery source, the ORM profiler processor, a REST sink and the
# OpenMetadata server settings.
mock_bigquery_config = {
    "source": {
        "type": "bigquery",
        "serviceName": "local_bigquery",
        "serviceConnection": {
            "config": {"type": "BigQuery", "credentials": {"gcpConfig": {}}}
        },
        "sourceConfig": {
            "config": {
                "type": "Profiler",
            }
        },
    },
    "processor": {
        "type": "orm-profiler",
        "config": {
            "profiler": {
                "name": "my_profiler",
                "timeout_seconds": 60,
                "metrics": ["row_count", "min", "max", "COUNT", "null_count"],
            },
        },
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {
                # NOTE(review): hard-coded JWT for a localhost server; presumably
                # a non-secret development token - confirm before reusing.
                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
            },
        }
    },
}

# Schema/table name pair; not referenced elsewhere in the visible part of this file.
TEST_PARTITION = {"schema_name": "test_schema", "table_name": "test_table"}
|
|
|
|
|
|
|
|
# Database entity fixture with a fixed id and a reference to its owning
# database service. Not used by the tests visible in this file.
MOCK_DATABASE = Database(
    id="2aaa012e-099a-11ed-861d-0242ac120002",
    name="118146679784",
    fullyQualifiedName="bigquery_source.bigquery.db",
    displayName="118146679784",
    description="",
    service=EntityReference(
        id="85811038-099a-11ed-861d-0242ac120002",
        type="databaseService",
    ),
)
|
|
|
|
|
|
|
|
|
|
|
|
class MockTable(BaseModel):
    """Lightweight stand-in for a BigQuery ``Table`` entity.

    The tests ``cast`` instances of this model to ``Table`` before handing
    them to ``get_partition_details``. It carries only the partition
    metadata, the optional profiler configuration and the service type
    (presumably the only attributes that function reads - confirm against
    ``metadata.sampler.partition``).
    """

    # Partition layout as ingested from the source system.
    tablePartition: Optional[TablePartition]
    # User-supplied profiler settings; ``None`` when nothing is configured.
    tableProfilerConfig: Optional[TableProfilerConfig]
    # Fixed to BigQuery for this mock.
    serviceType: DatabaseServiceType = DatabaseServiceType.BigQuery

    class Config:
        # Pydantic setting: accept field types pydantic cannot validate itself.
        arbitrary_types_allowed = True
|
|
|
|
|
|
|
|
|
2022-11-10 10:54:31 +01:00
|
|
|
class MockRedshiftTable(BaseModel):
    """Lightweight stand-in for a Redshift ``Table`` entity.

    Same shape as ``MockTable`` but with ``serviceType`` defaulting to
    Redshift, so the tests can exercise ``get_partition_details`` on a
    non-BigQuery service.
    """

    # Partition layout as ingested from the source system.
    tablePartition: Optional[TablePartition]
    # User-supplied profiler settings; ``None`` when nothing is configured.
    tableProfilerConfig: Optional[TableProfilerConfig]
    # Fixed to Redshift for this mock.
    serviceType: DatabaseServiceType = DatabaseServiceType.Redshift

    class Config:
        # Pydantic setting: accept field types pydantic cannot validate itself.
        arbitrary_types_allowed = True
|
|
|
|
|
|
|
|
|
2022-09-26 19:41:40 +05:30
|
|
|
# BigQuery client partitioning fixtures. None of them is referenced by the
# tests visible in this file.

# Daily time partitioning on an explicit column.
MOCK_TIME_UNIT_PARTITIONING = TimePartitioning(
    expiration_ms=None, field="test_column", type_="DAY"
)

# Hourly time partitioning with no column given.
MOCK_INGESTION_TIME_PARTITIONING = TimePartitioning(expiration_ms=None, type_="HOUR")

# Integer range partitioning on ``test_column``: start 0, end 100, interval 10.
MOCK_RANGE_PARTITIONING = RangePartitioning(
    field="test_column", range_=PartitionRange(end=100, interval=10, start=0)
)
|
|
|
|
|
|
|
|
|
|
|
|
class ProfilerPartitionUnitTest(TestCase):
    """Tests for ``get_partition_details`` on BigQuery and Redshift mock tables.

    Each test builds a mock table entity, casts it to ``Table`` and checks the
    partition profiler configuration returned by ``get_partition_details``,
    both without and with an explicit ``tableProfilerConfig``.
    """

    # Stacked ``@patch`` decorators inject their mocks bottom-up: the decorator
    # closest to ``def`` supplies the first mock argument. The mock parameters
    # below are therefore listed in the reverse order of the decorators so that
    # every ``return_value`` is configured on the mock its name refers to.
    @patch(
        "metadata.profiler.source.metadata.OpenMetadataSource._validate_service_name"
    )
    @patch("google.auth.default")
    @patch("sqlalchemy.engine.base.Engine.connect")
    @patch("sqlalchemy_bigquery._helpers.create_bigquery_client")
    @patch("metadata.ingestion.source.database.bigquery.connection.test_connection")
    def __init__(
        self,
        methodName,
        mock_test_connection,
        mock_create_bigquery_client,
        mock_connect,
        auth_default,
        validate_service_name,
    ):
        """Create the profiler workflow while external services are patched.

        Args:
            methodName: name of the test method, forwarded to ``TestCase``.
            mock_test_connection: mock for the BigQuery ``test_connection``.
            mock_create_bigquery_client: mock for the BigQuery client factory.
            mock_connect: mock for ``Engine.connect``.
            auth_default: mock for ``google.auth.default``.
            validate_service_name: mock for
                ``OpenMetadataSource._validate_service_name``.
        """
        super().__init__(methodName)
        mock_test_connection.return_value = True
        validate_service_name.return_value = True
        auth_default.return_value = (None, MOCK_GET_SOURCE_CONNECTION)
        # The patches are only active for the duration of this constructor.
        self.profiler_workflow = ProfilerWorkflow.create(mock_bigquery_config)

    def test_partition_details_time_unit(self):
        """TIME_UNIT partition on a BigQuery table, then an explicit config."""
        table_entity = MockTable(
            tablePartition=TablePartition(
                columns=[
                    PartitionColumnDetails(
                        columnName="e",
                        intervalType=PartitionIntervalTypes.TIME_UNIT,
                        interval="DAY",
                    )
                ]
            ),
            tableProfilerConfig=None,
        )

        table_entity = cast(Table, table_entity)
        resp = get_partition_details(table_entity)

        # Without a profiler config the details come from the table partition.
        assert resp, "expected partition details derived from the table partition"
        assert resp.partitionColumnName == "e"
        assert resp.partitionInterval == 1
        assert not resp.partitionValues

        table_entity.tableProfilerConfig = TableProfilerConfig(
            partitioning=PartitionProfilerConfig(
                partitionColumnName="e",
                partitionInterval=3,
                partitionIntervalUnit="MONTH",
            )  # type: ignore
        )

        resp = get_partition_details(table_entity)

        # With a profiler config the configured values are returned.
        assert resp, "expected partition details taken from the profiler config"
        assert resp.partitionColumnName == "e"
        assert resp.partitionInterval == 3
        assert resp.partitionIntervalUnit == PartitionIntervalUnit.MONTH

    def test_partition_details_ingestion_time_date(self):
        """Daily INGESTION_TIME partition resolves to ``_PARTITIONDATE``."""
        table_entity = MockTable(
            tablePartition=TablePartition(
                columns=[
                    PartitionColumnDetails(
                        columnName="e",
                        intervalType=PartitionIntervalTypes.INGESTION_TIME.value,
                        interval="DAY",
                    )
                ]
            ),
            tableProfilerConfig=None,
        )

        table_entity = cast(Table, table_entity)
        resp = get_partition_details(table_entity)

        # The column name "e" is replaced by the ``_PARTITIONDATE`` column.
        assert resp, "expected partition details derived from the table partition"
        assert resp.partitionColumnName == "_PARTITIONDATE"
        assert resp.partitionInterval == 1
        assert not resp.partitionValues

        table_entity.tableProfilerConfig = TableProfilerConfig(
            partitioning=PartitionProfilerConfig(
                partitionColumnName="_PARTITIONDATE",
                partitionInterval=10,
                partitionIntervalUnit="DAY",
            )  # type: ignore
        )

        resp = get_partition_details(table_entity)

        assert resp, "expected partition details taken from the profiler config"
        assert resp.partitionInterval == 10
        assert resp.partitionColumnName == "_PARTITIONDATE"
        assert resp.partitionIntervalUnit == PartitionIntervalUnit.DAY

    def test_partition_details_ingestion_time_hour(self):
        """Hourly INGESTION_TIME partition resolves to ``_PARTITIONTIME``."""
        table_entity = MockTable(
            tablePartition=TablePartition(
                columns=[
                    PartitionColumnDetails(
                        columnName="e",
                        intervalType=PartitionIntervalTypes.INGESTION_TIME.value,
                        interval="HOUR",
                    )
                ]
            ),
            tableProfilerConfig=None,
        )

        table_entity = cast(Table, table_entity)
        resp = get_partition_details(table_entity)

        # The column name "e" is replaced by the ``_PARTITIONTIME`` column.
        assert resp, "expected partition details derived from the table partition"
        assert resp.partitionColumnName == "_PARTITIONTIME"
        assert resp.partitionInterval == 1
        assert not resp.partitionValues

        table_entity.tableProfilerConfig = TableProfilerConfig(
            partitioning=PartitionProfilerConfig(
                partitionColumnName="_PARTITIONTIME",
                partitionInterval=1,
                partitionIntervalUnit="HOUR",
            )  # type: ignore
        )

        resp = get_partition_details(table_entity)

        assert resp, "expected partition details taken from the profiler config"
        assert resp.partitionInterval == 1
        assert resp.partitionColumnName == "_PARTITIONTIME"
        assert resp.partitionIntervalUnit == PartitionIntervalUnit.HOUR

    def test_partition_non_bq_table_profiler_partition_config(self):
        """Redshift table with a profiler config returns that config's values."""
        table_entity = MockRedshiftTable(
            tablePartition=TablePartition(
                columns=[
                    PartitionColumnDetails(
                        columnName="datetime",
                        intervalType=PartitionIntervalTypes.TIME_UNIT.value,
                        interval="DAY",
                    )
                ]
            ),
            tableProfilerConfig=TableProfilerConfig(
                partitioning=PartitionProfilerConfig(
                    enablePartitioning=True,
                    partitionColumnName="foo",
                    partitionIntervalType=PartitionIntervalTypes.TIME_UNIT,
                    partitionIntervalUnit="DAY",
                    partitionInterval=1,
                )  # type: ignore
            ),
        )

        table_entity = cast(Table, table_entity)
        resp = get_partition_details(table_entity)

        # The configured column ("foo") is returned, not the table partition
        # column ("datetime").
        assert resp, "expected partition details taken from the profiler config"
        assert resp.enablePartitioning
        assert resp.partitionColumnName == "foo"
        assert resp.partitionIntervalType == PartitionIntervalTypes.TIME_UNIT
        assert resp.partitionIntervalUnit == PartitionIntervalUnit.DAY
        assert resp.partitionInterval == 1

    def test_partition_non_bq_table_no_profiler_partition_config(self):
        """Redshift table without a profiler config yields no partition details."""
        table_entity = MockRedshiftTable(
            tablePartition=TablePartition(
                columns=[
                    PartitionColumnDetails(
                        columnName="datetime",
                        intervalType=PartitionIntervalTypes.TIME_UNIT.value,
                        interval="DAY",
                    )
                ]
            ),
            tableProfilerConfig=None,
        )

        table_entity = cast(Table, table_entity)
        resp = get_partition_details(table_entity)

        assert resp is None
|