Fix(ingestion/clickhouse) move to two tier sqlalchemy (#8300)

Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
This commit is contained in:
alplatonov 2023-08-12 00:11:40 +04:00 committed by GitHub
parent 7bb4e7b90d
commit 11fdfcf956
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 272 additions and 207 deletions

View File

@ -12,6 +12,9 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #8525: In LDAP ingestor, the `manager_pagination_enabled` changed to general `pagination_enabled`
### Other Notable Changes
- #8300: Clickhouse source now inherited from TwoTierSQLAlchemy. In old way we have platform_instance -> container -> co
container db (None) -> container schema and now we have platform_instance -> container database.
- #8300: Added `uri_opts` argument; now we can add any options for clickhouse client.
## 0.10.5

View File

@ -24,7 +24,8 @@ source:
type: clickhouse
config:
host_port: localhost:8443
protocol: https
uri_opts:
protocol: https
#---------------------------------------------------------------------------
# For the Native interface:
@ -35,4 +36,27 @@ source:
config:
host_port: localhost:9440
scheme: clickhouse+native
secure: True
uri_opts:
secure: True
#------------------------------------------------------------------------
# Example: using ingestion with configured SSL-TLS and uri_opts
# See https://clickhouse.com/docs/en/guides/sre/configuring-ssl
# ------------------------------------------------------------------------
source:
type: clickhouse
config:
# Url form, prefered
sqlalchemy_uri: 'clickhouse+native://user:pass@localhost:9000/db?&ca_certs=ca.crt'
# Non url form
username: user
password: pass
host_port: localhost:9000
uri_opts:
secure: True
ca_certs: "ca.crt"
certfile: "clickhouse.crt"
keyfile: "clickhouse.key"

View File

@ -10,13 +10,16 @@ import clickhouse_sqlalchemy.types as custom_types
import pydantic
from clickhouse_sqlalchemy.drivers import base
from clickhouse_sqlalchemy.drivers.base import ClickHouseDialect
from pydantic.class_validators import root_validator
from pydantic.fields import Field
from sqlalchemy import create_engine, text
from sqlalchemy.engine import reflection
from sqlalchemy.engine.url import make_url
from sqlalchemy.sql import sqltypes
from sqlalchemy.types import BOOLEAN, DATE, DATETIME, INTEGER
import datahub.emitter.mce_builder as builder
from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated
from datahub.configuration.source_common import DatasetLineageProviderConfigBase
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter import mce_builder
@ -31,12 +34,15 @@ from datahub.ingestion.api.decorators import (
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemySource,
SqlWorkUnit,
logger,
register_custom_type,
)
from datahub.ingestion.source.sql.sql_config import BasicSQLAlchemyConfig
from datahub.ingestion.source.sql.sql_config import make_sqlalchemy_uri
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
)
from datahub.metadata.com.linkedin.pegasus2avro.dataset import UpstreamLineage
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@ -118,7 +124,7 @@ class LineageItem:
class ClickHouseConfig(
BasicSQLAlchemyConfig, BaseTimeWindowConfig, DatasetLineageProviderConfigBase
TwoTierSQLAlchemyConfig, BaseTimeWindowConfig, DatasetLineageProviderConfigBase
):
# defaults
host_port = Field(default="localhost:8123", description="ClickHouse host URL.")
@ -126,22 +132,98 @@ class ClickHouseConfig(
password: pydantic.SecretStr = Field(
default=pydantic.SecretStr(""), description="password"
)
secure: Optional[bool] = Field(default=None, description="")
protocol: Optional[str] = Field(default=None, description="")
_deprecate_secure = pydantic_field_deprecated("secure")
_deprecate_protocol = pydantic_field_deprecated("protocol")
uri_opts: Dict[str, str] = Field(
default={},
description="The part of the URI and it's used to provide additional configuration options or parameters for the database connection.",
)
include_table_lineage: Optional[bool] = Field(
default=True, description="Whether table lineage should be ingested."
)
include_materialized_views: Optional[bool] = Field(default=True, description="")
def get_sql_alchemy_url(self, database=None):
uri_opts = None
if self.scheme == "clickhouse+native" and self.secure:
uri_opts = {"secure": "true"}
elif self.scheme != "clickhouse+native" and self.protocol:
uri_opts = {"protocol": self.protocol}
return super().get_sql_alchemy_url(uri_opts=uri_opts)
def get_sql_alchemy_url(self, current_db=None):
url = make_url(
super().get_sql_alchemy_url(uri_opts=self.uri_opts, current_db=current_db)
)
if url.drivername == "clickhouse+native" and url.query.get("protocol"):
logger.debug(f"driver = {url.drivername}, query = {url.query}")
raise Exception(
"You cannot use a schema clickhouse+native and clickhouse+http at the same time"
)
# We can setup clickhouse ingestion in sqlalchemy_uri form and config form.
# If we use sqlalchemu_uri form then super().get_sql_alchemy_url doesn't
# update current_db because it return self.sqlalchemy_uri without any update.
# This code bellow needed for rewriting sqlalchemi_uri and replace database with current_db.from
# For the future without python3.7 and sqlalchemy 1.3 support we can use code
# url=url.set(db=current_db), but not now.
# Why we need to update database in uri at all?
# Because we get database from sqlalchemy inspector and inspector we form from url inherited from
# TwoTierSQLAlchemySource and SQLAlchemySource
if self.sqlalchemy_uri and current_db:
self.scheme = url.drivername
self.username = url.username
self.password = (
pydantic.SecretStr(str(url.password))
if url.password
else pydantic.SecretStr("")
)
if url.host and url.port:
self.host_port = url.host + ":" + str(url.port)
elif url.host:
self.host_port = url.host
# untill released https://github.com/python/mypy/pull/15174
self.uri_opts = {str(k): str(v) for (k, v) in url.query.items()}
url = make_url(
make_sqlalchemy_uri(
self.scheme,
self.username,
self.password.get_secret_value() if self.password else None,
self.host_port,
current_db if current_db else self.database,
uri_opts=self.uri_opts,
)
)
return str(url)
# pre = True because we want to take some decision before pydantic initialize the configuration to default values
@root_validator(pre=True)
def projects_backward_compatibility(cls, values: Dict) -> Dict:
secure = values.get("secure")
protocol = values.get("protocol")
uri_opts = values.get("uri_opts")
if (secure or protocol) and not uri_opts:
logger.warning(
"uri_opts is not set but protocol or secure option is set."
" secure and protocol options is deprecated, please use "
"uri_opts instead."
)
logger.info(
"Initializing uri_opts from deprecated secure or protocol options"
)
values["uri_opts"] = {}
if secure:
values["uri_opts"]["secure"] = secure
if protocol:
values["uri_opts"]["protocol"] = protocol
logger.debug(f"uri_opts: {uri_opts}")
elif (secure or protocol) and uri_opts:
raise ValueError(
"secure and protocol options is deprecated. Please use uri_opts only."
)
return values
PROPERTIES_COLUMNS = (
@ -330,7 +412,7 @@ clickhouse_datetime_format = "%Y-%m-%d %H:%M:%S"
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
class ClickHouseSource(SQLAlchemySource):
class ClickHouseSource(TwoTierSQLAlchemySource):
"""
This plugin extracts the following:

View File

@ -1,7 +1,7 @@
[
{
"entityType": "container",
"entityUrn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"entityUrn": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
@ -10,9 +10,9 @@
"platform": "clickhouse",
"instance": "clickhousetestserver",
"env": "PROD",
"database": "clickhousedb"
"database": "db1"
},
"name": "clickhousedb"
"name": "db1"
}
},
"systemMetadata": {
@ -22,7 +22,7 @@
},
{
"entityType": "container",
"entityUrn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"entityUrn": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
@ -37,7 +37,7 @@
},
{
"entityType": "container",
"entityUrn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"entityUrn": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
@ -53,7 +53,7 @@
},
{
"entityType": "container",
"entityUrn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"entityUrn": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
@ -70,7 +70,7 @@
},
{
"entityType": "container",
"entityUrn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"entityUrn": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
@ -88,115 +88,6 @@
"runId": "clickhouse-test"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {
"platform": "clickhouse",
"instance": "clickhousetestserver",
"env": "PROD",
"database": "clickhousedb",
"schema": "db1"
},
"name": "db1"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "clickhouse-test"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "clickhouse-test"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:clickhouse",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:clickhouse,clickhousetestserver)"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "clickhouse-test"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Schema"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "clickhouse-test"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "clickhouse-test"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:clickhouse,clickhousetestserver)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:clickhouse,clickhousetestserver)"
},
{
"id": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"urn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "clickhouse-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:clickhouse,clickhousetestserver.db1.mv_target_table,PROD)",
@ -204,7 +95,7 @@
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"container": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
},
"systemMetadata": {
@ -255,11 +146,11 @@
"primary_key": "col_Int64",
"sampling_key": "",
"storage_policy": "default",
"metadata_modification_time": "2023-07-24 21:34:01",
"metadata_modification_time": "2023-07-25 07:22:35",
"total_rows": "10",
"total_bytes": "671",
"data_paths": "['/var/lib/clickhouse/store/6cb/6cbac4d1-c700-4f8a-9cc9-542cc349e497/']",
"metadata_path": "/var/lib/clickhouse/store/0a2/0a2bd3dd-893f-4f9a-b310-92e4c830091a/mv_target_table.sql"
"data_paths": "['/var/lib/clickhouse/store/b18/b18ad53b-7256-4a06-8af0-252f2dfcd99d/']",
"metadata_path": "/var/lib/clickhouse/store/1f3/1f37b9b4-f5c5-40f6-8a1b-54c6e2ad5d59/mv_target_table.sql"
},
"name": "mv_target_table",
"description": "This is target table for materialized view",
@ -408,12 +299,8 @@
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:clickhouse,clickhousetestserver)"
},
{
"id": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"urn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c"
},
{
"id": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"urn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"id": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2",
"urn": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
]
}
@ -430,7 +317,7 @@
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"container": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
},
"systemMetadata": {
@ -457,11 +344,11 @@
"primary_key": "",
"sampling_key": "",
"storage_policy": "default",
"metadata_modification_time": "2023-07-24 21:34:01",
"metadata_modification_time": "2023-07-25 07:22:35",
"total_rows": "0",
"total_bytes": "0",
"data_paths": "['/var/lib/clickhouse/store/339/339ddf61-6dc4-47ae-9ae5-a358864e6457/']",
"metadata_path": "/var/lib/clickhouse/store/0a2/0a2bd3dd-893f-4f9a-b310-92e4c830091a/test_data_types.sql"
"data_paths": "['/var/lib/clickhouse/store/54d/54dc8931-17de-4afd-bc11-636bbcadcdeb/']",
"metadata_path": "/var/lib/clickhouse/store/1f3/1f37b9b4-f5c5-40f6-8a1b-54c6e2ad5d59/test_data_types.sql"
},
"name": "test_data_types",
"description": "This table has basic types",
@ -1039,12 +926,8 @@
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:clickhouse,clickhousetestserver)"
},
{
"id": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"urn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c"
},
{
"id": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"urn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"id": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2",
"urn": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
]
}
@ -1061,7 +944,7 @@
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"container": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
},
"systemMetadata": {
@ -1112,11 +995,11 @@
"primary_key": "",
"sampling_key": "",
"storage_policy": "",
"metadata_modification_time": "2023-07-24 21:34:01",
"metadata_modification_time": "2023-07-25 07:22:35",
"total_rows": "None",
"total_bytes": "None",
"data_paths": "[]",
"metadata_path": "/var/lib/clickhouse/store/0a2/0a2bd3dd-893f-4f9a-b310-92e4c830091a/test_dict.sql"
"metadata_path": "/var/lib/clickhouse/store/1f3/1f37b9b4-f5c5-40f6-8a1b-54c6e2ad5d59/test_dict.sql"
},
"name": "test_dict",
"description": "",
@ -1226,12 +1109,8 @@
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:clickhouse,clickhousetestserver)"
},
{
"id": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"urn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c"
},
{
"id": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"urn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"id": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2",
"urn": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
]
}
@ -1248,7 +1127,7 @@
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"container": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
},
"systemMetadata": {
@ -1275,11 +1154,11 @@
"primary_key": "",
"sampling_key": "",
"storage_policy": "default",
"metadata_modification_time": "2023-07-24 21:34:01",
"metadata_modification_time": "2023-07-25 07:22:35",
"total_rows": "0",
"total_bytes": "0",
"data_paths": "['/var/lib/clickhouse/store/22c/22c46b00-4f2a-444a-8bee-73e60b9deba6/']",
"metadata_path": "/var/lib/clickhouse/store/0a2/0a2bd3dd-893f-4f9a-b310-92e4c830091a/test_nested_data_types.sql"
"data_paths": "['/var/lib/clickhouse/store/320/320b9e3e-757c-4b98-8576-43be9678c69a/']",
"metadata_path": "/var/lib/clickhouse/store/1f3/1f37b9b4-f5c5-40f6-8a1b-54c6e2ad5d59/test_nested_data_types.sql"
},
"name": "test_nested_data_types",
"description": "This table has nested types",
@ -1493,12 +1372,8 @@
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:clickhouse,clickhousetestserver)"
},
{
"id": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"urn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c"
},
{
"id": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"urn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"id": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2",
"urn": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
]
}
@ -1515,7 +1390,7 @@
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"container": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
},
"systemMetadata": {
@ -1566,11 +1441,11 @@
"primary_key": "",
"sampling_key": "",
"storage_policy": "",
"metadata_modification_time": "2023-07-24 21:34:01",
"metadata_modification_time": "2023-07-25 07:22:35",
"total_rows": "None",
"total_bytes": "None",
"data_paths": "['/var/lib/clickhouse/store/6cb/6cbac4d1-c700-4f8a-9cc9-542cc349e497/']",
"metadata_path": "/var/lib/clickhouse/store/0a2/0a2bd3dd-893f-4f9a-b310-92e4c830091a/mv_with_target_table.sql",
"data_paths": "['/var/lib/clickhouse/store/b18/b18ad53b-7256-4a06-8af0-252f2dfcd99d/']",
"metadata_path": "/var/lib/clickhouse/store/1f3/1f37b9b4-f5c5-40f6-8a1b-54c6e2ad5d59/mv_with_target_table.sql",
"view_definition": "",
"is_view": "True"
},
@ -1738,12 +1613,8 @@
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:clickhouse,clickhousetestserver)"
},
{
"id": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"urn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c"
},
{
"id": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"urn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"id": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2",
"urn": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
]
}
@ -1760,7 +1631,7 @@
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"container": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
},
"systemMetadata": {
@ -1811,11 +1682,11 @@
"primary_key": "",
"sampling_key": "",
"storage_policy": "",
"metadata_modification_time": "2023-07-24 21:34:01",
"metadata_modification_time": "2023-07-25 07:22:35",
"total_rows": "0",
"total_bytes": "0",
"data_paths": "['/var/lib/clickhouse/store/0f1/0f172bf3-80e5-4ba7-9ae3-938da0a9799d/']",
"metadata_path": "/var/lib/clickhouse/store/0a2/0a2bd3dd-893f-4f9a-b310-92e4c830091a/mv_without_target_table.sql",
"data_paths": "['/var/lib/clickhouse/store/d51/d5151daf-778d-4171-85b6-d9c409ed6ec1/']",
"metadata_path": "/var/lib/clickhouse/store/1f3/1f37b9b4-f5c5-40f6-8a1b-54c6e2ad5d59/mv_without_target_table.sql",
"view_definition": "",
"is_view": "True"
},
@ -1983,12 +1854,8 @@
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:clickhouse,clickhousetestserver)"
},
{
"id": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"urn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c"
},
{
"id": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"urn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"id": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2",
"urn": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
]
}
@ -2005,7 +1872,7 @@
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"container": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
},
"systemMetadata": {
@ -2056,11 +1923,11 @@
"primary_key": "",
"sampling_key": "",
"storage_policy": "",
"metadata_modification_time": "2023-07-24 21:34:01",
"metadata_modification_time": "2023-07-25 07:22:35",
"total_rows": "None",
"total_bytes": "None",
"data_paths": "[]",
"metadata_path": "/var/lib/clickhouse/store/0a2/0a2bd3dd-893f-4f9a-b310-92e4c830091a/test_view.sql",
"metadata_path": "/var/lib/clickhouse/store/1f3/1f37b9b4-f5c5-40f6-8a1b-54c6e2ad5d59/test_view.sql",
"view_definition": "",
"is_view": "True"
},
@ -2176,12 +2043,8 @@
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:clickhouse,clickhousetestserver)"
},
{
"id": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c",
"urn": "urn:li:container:ab016b94aa0d75c5b9205c33260e989c"
},
{
"id": "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
"urn": "urn:li:container:1154d1da73a95376c9f33f47694cf1de"
"id": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2",
"urn": "urn:li:container:ccb08ea309d6caaba2b45ebb4c9327a2"
}
]
}

View File

@ -5,10 +5,9 @@ source:
config:
username: clickhouseuser
password: clickhousepass
database: clickhousedb
host_port: localhost:28123
platform_instance: clickhousetestserver
schema_pattern:
database_pattern:
allow:
- "^db1"
profile_pattern:

View File

@ -0,0 +1,29 @@
run_id: clickhouse-test
source:
type: clickhouse
config:
platform_instance: clickhousetestserver
sqlalchemy_uri: "clickhouse://clickhouseuser:clickhousepass@localhost:28123/?compression=lz4&secure=false"
database_pattern:
allow:
- "^db1"
profile_pattern:
allow:
- "clickhousetestserver.db1.mv_target_table"
profiling:
enabled: true
include_field_null_count: true
include_field_min_value: true
include_field_max_value: true
include_field_mean_value: true
include_field_median_value: true
include_field_stddev_value: true
include_field_quantiles: true
include_field_distinct_value_frequencies: true
include_field_histogram: true
include_field_sample_values: true
sink:
type: file
config:
filename: "./clickhouse_mces_uri_form.json"

View File

@ -14,12 +14,10 @@ FROZEN_TIME = "2020-04-14 07:00:00"
@pytest.mark.integration
def test_clickhouse_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/clickhouse"
with docker_compose_runner(
test_resources_dir / "docker-compose.yml", "clickhouse"
) as docker_services:
wait_for_port(docker_services, "testclickhouse", 8123, timeout=120)
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "clickhouse_to_file.yml").resolve()
run_datahub_cmd(
@ -39,3 +37,35 @@ def test_clickhouse_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_t
output_path=tmp_path / "clickhouse_mces.json",
golden_path=test_resources_dir / "clickhouse_mces_golden.json",
)
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_clickhouse_ingest_uri_form(
docker_compose_runner, pytestconfig, tmp_path, mock_time
):
test_resources_dir = pytestconfig.rootpath / "tests/integration/clickhouse"
with docker_compose_runner(
test_resources_dir / "docker-compose.yml", "clickhouse"
) as docker_services:
wait_for_port(docker_services, "testclickhouse", 8123, timeout=120)
# Run the metadata ingestion pipeline with uri form.
config_file = (test_resources_dir / "clickhouse_to_file_uri_form.yml").resolve()
run_datahub_cmd(
["ingest", "-c", f"{config_file}"],
tmp_path=tmp_path,
)
# These paths change from one instance run of the clickhouse docker to the other, and the FROZEN_TIME does not apply to these.
ignore_paths: List[str] = [
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['metadata_modification_time'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['data_paths'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['metadata_path'\]",
]
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
ignore_paths=ignore_paths,
output_path=tmp_path / "clickhouse_mces_uri_form.json",
golden_path=test_resources_dir / "clickhouse_mces_golden.json",
)

View File

@ -8,7 +8,7 @@ def test_clickhouse_uri_https():
"password": "password",
"host_port": "host:1111",
"database": "db",
"protocol": "https",
"uri_opts": {"protocol": "https"},
}
)
assert (
@ -26,7 +26,9 @@ def test_clickhouse_uri_native():
"scheme": "clickhouse+native",
}
)
assert config.get_sql_alchemy_url() == "clickhouse+native://user:password@host:1111"
assert (
config.get_sql_alchemy_url() == "clickhouse+native://user:password@host:1111/"
)
def test_clickhouse_uri_native_secure():
@ -37,12 +39,12 @@ def test_clickhouse_uri_native_secure():
"host_port": "host:1111",
"database": "db",
"scheme": "clickhouse+native",
"secure": True,
"uri_opts": {"secure": True},
}
)
assert (
config.get_sql_alchemy_url()
== "clickhouse+native://user:password@host:1111/db?secure=true"
== "clickhouse+native://user:password@host:1111/db?secure=True"
)
@ -55,4 +57,37 @@ def test_clickhouse_uri_default_password():
"scheme": "clickhouse+native",
}
)
assert config.get_sql_alchemy_url() == "clickhouse+native://user:@host:1111/db"
assert config.get_sql_alchemy_url() == "clickhouse+native://user@host:1111/db"
def test_clickhouse_uri_native_secure_backward_compatibility():
config = ClickHouseConfig.parse_obj(
{
"username": "user",
"password": "password",
"host_port": "host:1111",
"database": "db",
"scheme": "clickhouse+native",
"secure": True,
}
)
assert (
config.get_sql_alchemy_url()
== "clickhouse+native://user:password@host:1111/db?secure=True"
)
def test_clickhouse_uri_https_backward_compatibility():
config = ClickHouseConfig.parse_obj(
{
"username": "user",
"password": "password",
"host_port": "host:1111",
"database": "db",
"protocol": "https",
}
)
assert (
config.get_sql_alchemy_url()
== "clickhouse://user:password@host:1111/db?protocol=https"
)