feat(ingest): clickhouse - add initial support (#4057)

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
Author: Alexander Chashnikov, 2022-02-21 17:36:08 +02:00 (committed by GitHub)
parent 3a6d1bde64
commit c2065bd7fe
13 changed files with 3458 additions and 0 deletions

(binary image added: ClickHouse logo, 27 KiB)

@@ -111,6 +111,8 @@ plugins: Dict[str, Set[str]] = {
"azure-ad": set(),
"bigquery": sql_common | bigquery_common | {"pybigquery >= 0.6.0"},
"bigquery-usage": bigquery_common | {"cachetools"},
"clickhouse": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
"clickhouse-usage": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
"datahub-business-glossary": set(),
"data-lake": {*aws_common, "pydeequ==1.0.1", "pyspark==3.0.3", "parse==1.19.0"},
"dbt": {"requests"},
@@ -216,6 +218,8 @@ base_dev_requirements = {
for plugin in [
"bigquery",
"bigquery-usage",
"clickhouse",
"clickhouse-usage",
"elasticsearch",
"looker",
"glue",
@@ -268,6 +272,7 @@ full_test_dev_requirements = {
for plugin in [
# Only include Athena for Python 3.7 or newer.
*(["athena"] if is_py37_or_newer else []),
"clickhouse",
"druid",
"feast",
"hive",
@@ -293,6 +298,8 @@ entry_points = {
"azure-ad = datahub.ingestion.source.identity.azure_ad:AzureADSource",
"bigquery = datahub.ingestion.source.sql.bigquery:BigQuerySource",
"bigquery-usage = datahub.ingestion.source.usage.bigquery_usage:BigQueryUsageSource",
"clickhouse = datahub.ingestion.source.sql.clickhouse:ClickHouseSource",
"clickhouse-usage = datahub.ingestion.source.usage.clickhouse_usage:ClickHouseUsageSource",
"data-lake = datahub.ingestion.source.data_lake:DataLakeSource",
"dbt = datahub.ingestion.source.dbt:DBTSource",
"druid = datahub.ingestion.source.sql.druid:DruidSource",


@@ -0,0 +1,177 @@
# ClickHouse
For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
## Setup
To install this plugin, run `pip install 'acryl-datahub[clickhouse]'`.
## Capabilities
This plugin extracts the following:
- Metadata for tables, views, materialized views and dictionaries
- Column types associated with each table (except *AggregateFunction and DateTime with timezone)
- Table, row, and column statistics via optional [SQL profiling](./sql_profiles.md)
- Table, view, materialized view and dictionary (with CLICKHOUSE source_type) lineage
:::tip
You can also get fine-grained usage statistics for ClickHouse using the `clickhouse-usage` source described below.
:::
## Quickstart recipe
Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes).
```yml
source:
type: clickhouse
config:
# Coordinates
host_port: localhost:9000
# Credentials
username: user
password: pass
# Options
platform_instance: DatabaseNameToBeIngested
include_views: True # whether to include views, defaults to True
    include_tables: True # whether to include tables, defaults to True
sink:
# sink configs
```
<details>
<summary>Extra options for an encrypted connection or a different interface</summary>
For the HTTP interface:
```yml
source:
type: clickhouse
config:
host_port: localhost:8443
protocol: https
```
For the Native interface:
```yml
source:
type: clickhouse
config:
host_port: localhost:9440
scheme: clickhouse+native
secure: True
```
</details>
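For reference, these options map directly onto the SQLAlchemy connection URL that the source builds internally. A minimal sketch (hypothetical host, credentials and database; assumes the `clickhouse` plugin is installed):

```python
from datahub.ingestion.source.sql.clickhouse import ClickHouseConfig

# HTTPS over the HTTP interface.
https_config = ClickHouseConfig.parse_obj(
    {"username": "user", "password": "pass", "host_port": "localhost:8443",
     "database": "db", "protocol": "https"}
)
print(https_config.get_sql_alchemy_url())
# clickhouse://user:pass@localhost:8443/db?protocol=https

# Secure native interface.
native_config = ClickHouseConfig.parse_obj(
    {"username": "user", "password": "pass", "host_port": "localhost:9440",
     "database": "db", "scheme": "clickhouse+native", "secure": True}
)
print(native_config.get_sql_alchemy_url())
# clickhouse+native://user:pass@localhost:9440/db?secure=true
```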
## Config details
Like all SQL-based sources, the ClickHouse integration supports:
- Stale Metadata Deletion: See [here](./stateful_ingestion.md) for more details on configuration.
- SQL Profiling: See [here](./sql_profiles.md) for more details on configuration.
Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
|-----------------------------|----------|----------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `username` | | | ClickHouse username. |
| `password` | | | ClickHouse password. |
| `host_port` | ✅ | | ClickHouse host URL. |
| `database` | | | ClickHouse database to connect. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `platform_instance` | | None | The Platform instance to use while constructing URNs. |
| `options.<option>` | | | Any options specified here will be passed to SQLAlchemy's `create_engine` as kwargs.<br />See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine for details. |
| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
| `table_pattern.deny` | | | List of regex patterns for tables to exclude from ingestion. |
| `table_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `schema_pattern.allow` | | | List of regex patterns for schemas to include in ingestion. |
| `schema_pattern.deny` | | | List of regex patterns for schemas to exclude from ingestion. |
| `schema_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `view_pattern.allow` | | | List of regex patterns for views to include in ingestion. |
| `view_pattern.deny` | | | List of regex patterns for views to exclude from ingestion. |
| `view_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `include_tables` | | `True` | Whether tables should be ingested. |
| `include_views` | | `True` | Whether views should be ingested. |
| `include_table_lineage` | | `True` | Whether table lineage should be ingested. |
| `profiling` | | See the defaults for [profiling config](./sql_profiles.md#Config-details). | See [profiling config](./sql_profiles.md#Config-details). |
# ClickHouse Usage Stats
This plugin extracts usage statistics for datasets in ClickHouse. For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
Note: Usage information is computed by querying the `system.query_log` table. If you have a cluster, or need to apply additional transformations or filters, you can create a view over the query log and reference it in the `query_log_table` setting.
## Setup
To install this plugin, run `pip install 'acryl-datahub[clickhouse-usage]'`.
## Capabilities
This plugin provides the following functionality:
1. For a specific dataset, it ingests the following statistics:
   1. top n queries.
   2. top users.
   3. usage of each column in the dataset.
2. Aggregation of these statistics into buckets, by day or hour granularity.
:::note
This source only ingests usage statistics. To get the tables, views, and schemas in your ClickHouse warehouse, ingest using the `clickhouse` source described above.
:::
## Quickstart recipe
Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes).
```yml
source:
type: clickhouse-usage
config:
# Coordinates
host_port: db_host:port
platform_instance: dev_cluster
email_domain: acryl.io
# Credentials
username: username
password: "password"
sink:
# sink configs
```
## Config details
Note that a `.` is used to denote nested fields in the YAML recipe.
By default, we extract usage stats for the last day, with the recommendation that this source is executed every day.
| Field | Required | Default | Description |
|-----------------------------|----------|----------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `username` | | | ClickHouse username. |
| `password` | | | ClickHouse password. |
| `host_port` | ✅ | | ClickHouse host URL. |
| `database` | | | ClickHouse database to connect. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `platform_instance` | | None | The Platform instance to use while constructing URNs. |
| `options.<option>` | | | Any options specified here will be passed to SQLAlchemy's `create_engine` as kwargs.<br />See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine for details. |
| `email_domain` | ✅ | | Email domain of your organisation so users can be displayed on UI appropriately. |
| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage to consider. |
| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage to consider. |
| `top_n_queries` | | `10` | Number of top queries to save to each table. |
| `include_operational_stats` | | `true` | Whether to display operational stats. |
| `bucket_duration` | | `"DAY"` | Size of the time window to aggregate usage stats. |
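As noted above, usage events are aggregated into day- or hour-sized buckets before being emitted. A small sketch of the bucketing helper the source relies on (hypothetical timestamp; uses the same `acryl-datahub` package as the source itself):

```python
from datetime import datetime

from datahub.configuration.time_window_config import BucketDuration, get_time_bucket

event_time = datetime(2022, 2, 21, 17, 36)  # hypothetical query event time
print(get_time_bucket(event_time, BucketDuration.DAY))   # 2022-02-21 00:00:00
print(get_time_bucket(event_time, BucketDuration.HOUR))  # 2022-02-21 17:00:00
```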
## Questions
If you've got any questions on configuring this source, feel free to ping us on [our Slack](https://slack.datahubproject.io/)!


@@ -0,0 +1,624 @@
import json
import textwrap
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
import clickhouse_driver # noqa: F401
import clickhouse_sqlalchemy.types as custom_types
from clickhouse_sqlalchemy.drivers import base
from clickhouse_sqlalchemy.drivers.base import ClickHouseDialect
from sqlalchemy import create_engine, text
from sqlalchemy.engine import reflection
from sqlalchemy.sql import sqltypes
from sqlalchemy.types import BOOLEAN, DATE, DATETIME, INTEGER
import datahub.emitter.mce_builder as builder
from datahub.configuration.source_common import DatasetLineageProviderConfigBase
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter import mce_builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.sql_common import (
BasicSQLAlchemyConfig,
SQLAlchemySource,
SqlWorkUnit,
logger,
register_custom_type,
)
from datahub.metadata.com.linkedin.pegasus2avro.dataset import UpstreamLineage
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayTypeClass,
MapTypeClass,
NumberTypeClass,
StringTypeClass,
UnionTypeClass,
)
from datahub.metadata.schema_classes import (
ChangeTypeClass,
DatasetLineageTypeClass,
DatasetPropertiesClass,
DatasetSnapshotClass,
UpstreamClass,
)
# adding extra types not handled by clickhouse-sqlalchemy 0.1.8
base.ischema_names["DateTime64(0)"] = DATETIME
base.ischema_names["DateTime64(1)"] = DATETIME
base.ischema_names["DateTime64(2)"] = DATETIME
base.ischema_names["DateTime64(3)"] = DATETIME
base.ischema_names["DateTime64(4)"] = DATETIME
base.ischema_names["DateTime64(5)"] = DATETIME
base.ischema_names["DateTime64(6)"] = DATETIME
base.ischema_names["DateTime64(7)"] = DATETIME
base.ischema_names["DateTime64(8)"] = DATETIME
base.ischema_names["DateTime64(9)"] = DATETIME
base.ischema_names["Date32"] = DATE
base.ischema_names["Bool"] = BOOLEAN
base.ischema_names["Nothing"] = sqltypes.NullType
base.ischema_names["Int128"] = INTEGER
base.ischema_names["Int256"] = INTEGER
base.ischema_names["UInt128"] = INTEGER
base.ischema_names["UInt256"] = INTEGER
register_custom_type(custom_types.common.Array, ArrayTypeClass)
register_custom_type(custom_types.ip.IPv4, NumberTypeClass)
register_custom_type(custom_types.ip.IPv6, StringTypeClass)
register_custom_type(custom_types.common.Map, MapTypeClass)
register_custom_type(custom_types.common.Tuple, UnionTypeClass)
class LineageCollectorType(Enum):
TABLE = "table"
VIEW = "view"
MATERIALIZED_VIEW = "materialized_view"
class LineageDatasetPlatform(Enum):
CLICKHOUSE = "clickhouse"
@dataclass(frozen=True, eq=True)
class LineageDataset:
platform: LineageDatasetPlatform
path: str
@dataclass
class LineageItem:
dataset: LineageDataset
upstreams: Set[LineageDataset]
collector_type: LineageCollectorType
dataset_lineage_type: str = field(init=False)
def __post_init__(self):
if self.collector_type == LineageCollectorType.TABLE:
self.dataset_lineage_type = DatasetLineageTypeClass.COPY
elif self.collector_type in [
LineageCollectorType.VIEW,
]:
self.dataset_lineage_type = DatasetLineageTypeClass.VIEW
else:
self.dataset_lineage_type = DatasetLineageTypeClass.TRANSFORMED
class ClickHouseConfig(
BasicSQLAlchemyConfig, BaseTimeWindowConfig, DatasetLineageProviderConfigBase
):
# defaults
host_port = "localhost:8123"
scheme = "clickhouse"
secure: Optional[bool]
protocol: Optional[str]
include_table_lineage: Optional[bool] = True
include_materialized_views: Optional[bool] = True
def get_sql_alchemy_url(self, database=None):
uri_opts = None
if self.scheme == "clickhouse+native" and self.secure:
uri_opts = {"secure": "true"}
elif self.scheme != "clickhouse+native" and self.protocol:
uri_opts = {"protocol": self.protocol}
return super().get_sql_alchemy_url(uri_opts=uri_opts)
PROPERTIES_COLUMNS = (
"engine, partition_key, sorting_key, primary_key, sampling_key, storage_policy"
)
# reflection.cache uses eval and other magic to partially rewrite the function.
# mypy can't handle it, so we ignore it for now.
@reflection.cache # type: ignore
def _get_all_table_comments_and_properties(self, connection, **kw):
properties_clause = (
"formatRow('JSONEachRow', {properties_columns})".format(
properties_columns=PROPERTIES_COLUMNS
)
if PROPERTIES_COLUMNS
else "null"
)
comment_sql = textwrap.dedent(
"""\
SELECT database
, name AS table_name
, comment
, {properties_clause} AS properties
FROM system.tables
WHERE name NOT LIKE '.inner%'""".format(
properties_clause=properties_clause
)
)
all_table_comments: Dict[Tuple[str, str], Dict[str, Any]] = {}
result = connection.execute(text(comment_sql))
for table in result:
all_table_comments[(table.database, table.table_name)] = {
"text": table.comment,
"properties": {k: str(v) for k, v in json.loads(table.properties).items()}
if table.properties
else {},
}
return all_table_comments
@reflection.cache # type: ignore
def get_table_comment(self, connection, table_name, schema=None, **kw):
all_table_comments = self._get_all_table_comments_and_properties(connection, **kw)
return all_table_comments.get((schema, table_name), {"text": None})
@reflection.cache # type: ignore
def _get_all_relation_info(self, connection, **kw):
result = connection.execute(
text(
textwrap.dedent(
"""\
SELECT database
, if(engine LIKE '%View', 'v', 'r') AS relkind
, name AS relname
FROM system.tables
WHERE name NOT LIKE '.inner%'"""
)
)
)
relations = {}
for rel in result:
relations[(rel.database, rel.relname)] = rel
return relations
def _get_table_or_view_names(self, relkind, connection, schema=None, **kw):
info_cache = kw.get("info_cache")
all_relations = self._get_all_relation_info(connection, info_cache=info_cache)
relation_names = []
for key, relation in all_relations.items():
if relation.database == schema and relation.relkind == relkind:
relation_names.append(relation.relname)
return relation_names
@reflection.cache # type: ignore
def get_table_names(self, connection, schema=None, **kw):
return self._get_table_or_view_names("r", connection, schema, **kw)
@reflection.cache # type: ignore
def get_view_names(self, connection, schema=None, **kw):
return self._get_table_or_view_names("v", connection, schema, **kw)
# We fetch column info for an entire schema at a time to improve performance
# when reflecting schema for multiple tables at once.
@reflection.cache # type: ignore
def _get_schema_column_info(self, connection, schema=None, **kw):
schema_clause = "database = '{schema}'".format(schema=schema) if schema else "1"
all_columns = defaultdict(list)
result = connection.execute(
text(
textwrap.dedent(
"""\
SELECT database
, table AS table_name
, name
, type
, comment
FROM system.columns
WHERE {schema_clause}
ORDER BY database, table, position""".format(
schema_clause=schema_clause
)
)
)
)
for col in result:
key = (col.database, col.table_name)
all_columns[key].append(col)
return dict(all_columns)
def _get_clickhouse_columns(self, connection, table_name, schema=None, **kw):
info_cache = kw.get("info_cache")
all_schema_columns = self._get_schema_column_info(
connection, schema, info_cache=info_cache
)
key = (schema, table_name)
return all_schema_columns[key]
def _get_column_info(self, name, format_type, comment):
col_type = self._get_column_type(name, format_type)
nullable = False
# extract nested_type from LowCardinality type
if isinstance(col_type, custom_types.common.LowCardinality):
col_type = col_type.nested_type
# extract nested_type from Nullable type
if isinstance(col_type, custom_types.common.Nullable):
col_type = col_type.nested_type
nullable = True
result = {
"name": name,
"type": col_type,
"nullable": nullable,
"comment": comment,
"full_type": format_type,
}
return result
@reflection.cache # type: ignore
def get_columns(self, connection, table_name, schema=None, **kw):
if not schema:
query = "DESCRIBE TABLE {}".format(self._quote_table_name(table_name))
cols = self._execute(connection, query)
else:
cols = self._get_clickhouse_columns(connection, table_name, schema, **kw)
return [
self._get_column_info(name=col.name, format_type=col.type, comment=col.comment)
for col in cols
]
# This monkey-patching enables us to batch fetch the table descriptions, rather than
# fetching them one at a time.
ClickHouseDialect._get_all_table_comments_and_properties = (
_get_all_table_comments_and_properties
)
ClickHouseDialect.get_table_comment = get_table_comment
ClickHouseDialect._get_all_relation_info = _get_all_relation_info
ClickHouseDialect._get_table_or_view_names = _get_table_or_view_names
ClickHouseDialect.get_table_names = get_table_names
ClickHouseDialect.get_view_names = get_view_names
ClickHouseDialect._get_schema_column_info = _get_schema_column_info
ClickHouseDialect._get_clickhouse_columns = _get_clickhouse_columns
ClickHouseDialect._get_column_info = _get_column_info
ClickHouseDialect.get_columns = get_columns
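# With the patches above in place, ordinary SQLAlchemy reflection returns
# ClickHouse comments and table properties in bulk. A rough sketch of what this
# enables (not executed here; assumes a reachable ClickHouse instance and the
# db1 schema used in the integration tests):
#
#   from sqlalchemy import create_engine, inspect
#   engine = create_engine("clickhouse://user:pass@localhost:8123")
#   inspector = inspect(engine)
#   inspector.get_table_names(schema="db1")                       # via _get_all_relation_info
#   inspector.get_table_comment("test_data_types", schema="db1")  # batched comments + properties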
clickhouse_datetime_format = "%Y-%m-%d %H:%M:%S"
class ClickHouseSource(SQLAlchemySource):
config: ClickHouseConfig
def __init__(self, config, ctx):
super().__init__(config, ctx, "clickhouse")
self._lineage_map: Optional[Dict[str, LineageItem]] = None
self._all_tables_set: Optional[Set[str]] = None
@classmethod
def create(cls, config_dict, ctx):
config = ClickHouseConfig.parse_obj(config_dict)
return cls(config, ctx)
def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
for wu in super().get_workunits():
if (
self.config.include_table_lineage
and isinstance(wu, SqlWorkUnit)
and isinstance(wu.metadata, MetadataChangeEvent)
and isinstance(wu.metadata.proposedSnapshot, DatasetSnapshot)
):
dataset_snapshot: DatasetSnapshotClass = wu.metadata.proposedSnapshot
assert dataset_snapshot
lineage_mcp, lineage_properties_aspect = self.get_lineage_mcp(
wu.metadata.proposedSnapshot.urn
)
if lineage_mcp is not None:
lineage_wu = MetadataWorkUnit(
id=f"{self.platform}-{lineage_mcp.entityUrn}-{lineage_mcp.aspectName}",
mcp=lineage_mcp,
)
self.report.report_workunit(lineage_wu)
yield lineage_wu
if lineage_properties_aspect:
aspects = dataset_snapshot.aspects
if aspects is None:
aspects = []
dataset_properties_aspect: Optional[DatasetPropertiesClass] = None
for aspect in aspects:
if isinstance(aspect, DatasetPropertiesClass):
dataset_properties_aspect = aspect
if dataset_properties_aspect is None:
dataset_properties_aspect = DatasetPropertiesClass()
aspects.append(dataset_properties_aspect)
custom_properties = (
{
**dataset_properties_aspect.customProperties,
**lineage_properties_aspect.customProperties,
}
if dataset_properties_aspect.customProperties
else lineage_properties_aspect.customProperties
)
dataset_properties_aspect.customProperties = custom_properties
dataset_snapshot.aspects = aspects
dataset_snapshot.aspects.append(dataset_properties_aspect)
# Emit the work unit from super.
yield wu
def _get_all_tables(self) -> Set[str]:
all_tables_query: str = textwrap.dedent(
"""\
SELECT database, name AS table_name
FROM system.tables
WHERE name NOT LIKE '.inner%'"""
)
all_tables_set = set()
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
for db_row in engine.execute(text(all_tables_query)):
all_tables_set.add(f'{db_row["database"]}.{db_row["table_name"]}')
return all_tables_set
def _populate_lineage_map(
self, query: str, lineage_type: LineageCollectorType
) -> None:
"""
        This method generates table-level lineage from the given query.
        The query should return the following columns: target_schema, target_table, source_table, source_schema.
        :param query: The query to run to extract lineage.
        :type query: str
        :param lineage_type: The way the lineage should be processed
        :type lineage_type: LineageCollectorType
        :return: None; the method directly modifies the self._lineage_map property.
        :rtype: None
"""
assert self._lineage_map is not None
if not self._all_tables_set:
self._all_tables_set = self._get_all_tables()
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
try:
for db_row in engine.execute(text(query)):
if not self.config.schema_pattern.allowed(
db_row["target_schema"]
) or not self.config.table_pattern.allowed(db_row["target_table"]):
continue
# Target
target_path = (
f'{self.config.platform_instance+"." if self.config.platform_instance else ""}'
f'{db_row["target_schema"]}.{db_row["target_table"]}'
)
target = LineageItem(
dataset=LineageDataset(
platform=LineageDatasetPlatform.CLICKHOUSE, path=target_path
),
upstreams=set(),
collector_type=lineage_type,
)
# Source
platform = LineageDatasetPlatform.CLICKHOUSE
path = f'{db_row["source_schema"]}.{db_row["source_table"]}'
sources = [
LineageDataset(
platform=platform,
path=path,
)
]
for source in sources:
                    # Filter out tables that do not exist in ClickHouse;
                    # they may have been deleted in the meantime.
if (
source.platform == LineageDatasetPlatform.CLICKHOUSE
and source.path not in self._all_tables_set
):
logger.warning(f"{source.path} missing table")
continue
target.upstreams.add(source)
                # Merge upstreams if the dataset already exists in the lineage map.
if target.dataset.path in self._lineage_map:
self._lineage_map[
target.dataset.path
].upstreams = self._lineage_map[
target.dataset.path
].upstreams.union(
target.upstreams
)
else:
self._lineage_map[target.dataset.path] = target
logger.info(
f"Lineage[{target}]:{self._lineage_map[target.dataset.path]}"
)
except Exception as e:
logger.warning(
f"Extracting {lineage_type.name} lineage from ClickHouse failed."
f"Continuing...\nError was {e}."
)
def _populate_lineage(self) -> None:
# only dictionaries with clickhouse as a source are supported
table_lineage_query = textwrap.dedent(
"""\
SELECT extractAll(engine_full, '''(.*?)''')[2] AS source_schema
, extractAll(engine_full, '''(.*?)''')[3] AS source_table
, database AS target_schema
, name AS target_table
FROM system.tables
WHERE engine IN ('Distributed')
UNION ALL
SELECT extract(create_table_query, 'DB ''(.*?)''') AS source_schema
, extract(create_table_query, 'TABLE ''(.*?)''') AS source_table
, database AS target_schema
, name AS target_table
FROM system.tables
WHERE engine IN ('Dictionary')
AND create_table_query LIKE '%SOURCE(CLICKHOUSE(%'
ORDER BY target_schema, target_table, source_schema, source_table"""
)
view_lineage_query = textwrap.dedent(
"""\
WITH
(SELECT groupUniqArray(concat(database, '.', name))
FROM system.tables
) AS tables
SELECT substring(source, 1, position(source, '.') - 1) AS source_schema
, substring(source, position(source, '.') + 1) AS source_table
, database AS target_schema
, name AS target_table
FROM system.tables
ARRAY JOIN arrayIntersect(splitByRegexp('[\\s()'']+', create_table_query), tables) AS source
WHERE engine IN ('View')
AND NOT (source_schema = target_schema AND source_table = target_table)
ORDER BY target_schema, target_table, source_schema, source_table"""
)
# get materialized view downstream and upstream
materialized_view_lineage_query = textwrap.dedent(
"""\
SELECT source_schema, source_table, target_schema, target_table
FROM (
WITH
(SELECT groupUniqArray(concat(database, '.', name))
FROM system.tables
) AS tables
SELECT substring(source, 1, position(source, '.') - 1) AS source_schema
, substring(source, position(source, '.') + 1) AS source_table
, database AS target_schema
, name AS target_table
, extract(create_table_query, 'TO (.*?) \\(') AS extract_to
FROM system.tables
ARRAY JOIN arrayIntersect(splitByRegexp('[\\s()'']+', create_table_query), tables) AS source
WHERE engine IN ('MaterializedView')
AND NOT (source_schema = target_schema AND source_table = target_table)
AND source <> extract_to
UNION ALL
SELECT database AS source_schema
, name AS source_table
, substring(extract_to, 1, position(extract_to, '.') - 1) AS target_schema
, substring(extract_to, position(extract_to, '.') + 1) AS target_table
, extract(create_table_query, 'TO (.*?) \\(') AS extract_to
FROM system.tables
WHERE engine IN ('MaterializedView')
AND extract_to <> '')
ORDER BY target_schema, target_table, source_schema, source_table"""
)
if not self._lineage_map:
self._lineage_map = defaultdict()
if self.config.include_tables:
# Populate table level lineage for dictionaries and distributed tables
self._populate_lineage_map(
query=table_lineage_query, lineage_type=LineageCollectorType.TABLE
)
if self.config.include_views:
# Populate table level lineage for views
self._populate_lineage_map(
query=view_lineage_query, lineage_type=LineageCollectorType.VIEW
)
if self.config.include_materialized_views:
# Populate table level lineage for materialized_views
self._populate_lineage_map(
query=materialized_view_lineage_query,
lineage_type=LineageCollectorType.MATERIALIZED_VIEW,
)
def get_lineage_mcp(
self, dataset_urn: str
) -> Tuple[
Optional[MetadataChangeProposalWrapper], Optional[DatasetPropertiesClass]
]:
dataset_key = mce_builder.dataset_urn_to_key(dataset_urn)
if dataset_key is None:
return None, None
if not self._lineage_map:
self._populate_lineage()
assert self._lineage_map is not None
upstream_lineage: List[UpstreamClass] = []
custom_properties: Dict[str, str] = {}
if dataset_key.name in self._lineage_map:
item = self._lineage_map[dataset_key.name]
for upstream in item.upstreams:
upstream_table = UpstreamClass(
dataset=builder.make_dataset_urn_with_platform_instance(
upstream.platform.value,
upstream.path,
self.config.platform_instance,
self.config.env,
),
type=item.dataset_lineage_type,
)
upstream_lineage.append(upstream_table)
properties = None
if custom_properties:
properties = DatasetPropertiesClass(customProperties=custom_properties)
if not upstream_lineage:
return None, properties
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="upstreamLineage",
aspect=UpstreamLineage(upstreams=upstream_lineage),
)
return mcp, properties
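Taken together, the source above plugs into the standard ingestion framework. A minimal sketch (hypothetical connection details and output path) of driving it programmatically instead of through a recipe file:

```python
from datahub.ingestion.run.pipeline import Pipeline

# Roughly equivalent to `datahub ingest -c recipe.yml` with the quickstart recipe.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "clickhouse",
            "config": {
                "host_port": "localhost:8123",
                "username": "user",
                "password": "pass",
            },
        },
        "sink": {"type": "file", "config": {"filename": "./clickhouse_mces.json"}},
    }
)
pipeline.run()
pipeline.raise_from_status()
```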


@@ -95,6 +95,8 @@ logger: logging.Logger = logging.getLogger(__name__)
def get_platform_from_sqlalchemy_uri(sqlalchemy_uri: str) -> str:
if sqlalchemy_uri.startswith("bigquery"):
return "bigquery"
if sqlalchemy_uri.startswith("clickhouse"):
return "clickhouse"
if sqlalchemy_uri.startswith("druid"):
return "druid"
if sqlalchemy_uri.startswith("mssql"):


@@ -0,0 +1,226 @@
import collections
import dataclasses
import logging
from datetime import datetime
from typing import Dict, Iterable, List
from dateutil import parser
from pydantic.main import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
import datahub.emitter.mce_builder as builder
from datahub.configuration.time_window_config import get_time_bucket
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.clickhouse import ClickHouseConfig
from datahub.ingestion.source.usage.usage_common import (
BaseUsageConfig,
GenericAggregatedDataset,
)
logger = logging.getLogger(__name__)
clickhouse_datetime_format = "%Y-%m-%d %H:%M:%S"
clickhouse_usage_sql_comment = """\
SELECT user AS usename
, query
, substring(full_table_name, 1, position(full_table_name, '.') - 1) AS schema_
, substring(full_table_name, position(full_table_name, '.') + 1) AS table
, arrayMap(x -> substr(x, length(full_table_name) + 2),
arrayFilter(x -> startsWith(x, full_table_name || '.'), columns)) AS columns
, query_start_time AS starttime
, event_time AS endtime
FROM {query_log_table}
ARRAY JOIN tables AS full_table_name
WHERE is_initial_query
AND type = 'QueryFinish'
AND query_kind = 'Select'
AND full_table_name NOT LIKE 'system.%%'
AND full_table_name NOT LIKE '_table_function.%%'
AND table NOT LIKE '`.inner%%'
AND event_time >= '{start_time}'
AND event_time < '{end_time}'
ORDER BY event_time DESC"""
ClickHouseTableRef = str
AggregatedDataset = GenericAggregatedDataset[ClickHouseTableRef]
class ClickHouseJoinedAccessEvent(BaseModel):
usename: str = None # type:ignore
query: str = None # type: ignore
schema_: str = None # type:ignore
table: str = None # type:ignore
columns: List[str]
starttime: datetime
endtime: datetime
class ClickHouseUsageConfig(ClickHouseConfig, BaseUsageConfig):
env: str = builder.DEFAULT_ENV
email_domain: str
options: dict = {}
query_log_table: str = "system.query_log"
def get_sql_alchemy_url(self):
return super().get_sql_alchemy_url()
@dataclasses.dataclass
class ClickHouseUsageSource(Source):
config: ClickHouseUsageConfig
report: SourceReport = dataclasses.field(default_factory=SourceReport)
@classmethod
def create(cls, config_dict, ctx):
config = ClickHouseUsageConfig.parse_obj(config_dict)
return cls(ctx, config)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
"""Gets ClickHouse usage stats as work units"""
access_events = self._get_clickhouse_history()
        # If the query results are empty, we don't want to proceed
if not access_events:
return []
joined_access_event = self._get_joined_access_event(access_events)
aggregated_info = self._aggregate_access_events(joined_access_event)
for time_bucket in aggregated_info.values():
for aggregate in time_bucket.values():
wu = self._make_usage_stat(aggregate)
self.report.report_workunit(wu)
yield wu
def _make_usage_query(self) -> str:
return clickhouse_usage_sql_comment.format(
query_log_table=self.config.query_log_table,
start_time=self.config.start_time.strftime(clickhouse_datetime_format),
end_time=self.config.end_time.strftime(clickhouse_datetime_format),
)
def _make_sql_engine(self) -> Engine:
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url = {url}")
engine = create_engine(url, **self.config.options)
return engine
def _get_clickhouse_history(self):
query = self._make_usage_query()
engine = self._make_sql_engine()
results = engine.execute(query)
events = []
for row in results:
# minor type conversion
if hasattr(row, "_asdict"):
event_dict = row._asdict()
else:
event_dict = dict(row)
# stripping extra spaces caused by above _asdict() conversion
for k, v in event_dict.items():
if isinstance(v, str):
event_dict[k] = v.strip()
if not self.config.schema_pattern.allowed(
event_dict.get("schema_")
) or not self.config.table_pattern.allowed(event_dict.get("table")):
continue
if event_dict.get("starttime", None):
event_dict["starttime"] = event_dict.get("starttime").__str__()
if event_dict.get("endtime", None):
event_dict["endtime"] = event_dict.get("endtime").__str__()
# when the http protocol is used, the columns field is returned as a string
if isinstance(event_dict.get("columns"), str):
event_dict["columns"] = (
event_dict.get("columns").replace("'", "").strip("][").split(",")
)
logger.debug(f"event_dict: {event_dict}")
events.append(event_dict)
if events:
return events
        # SQL results can be empty; if so, the SQL connection is closed
        # and we do not proceed with ingestion.
logging.info("SQL Result is empty")
return None
def _convert_str_to_datetime(self, v):
if isinstance(v, str):
isodate = parser.parse(v) # compatible with Python 3.6+
return isodate.strftime(clickhouse_datetime_format)
def _get_joined_access_event(self, events):
joined_access_events = []
for event_dict in events:
event_dict["starttime"] = self._convert_str_to_datetime(
event_dict.get("starttime")
)
event_dict["endtime"] = self._convert_str_to_datetime(
event_dict.get("endtime")
)
if not (event_dict.get("schema_", None) and event_dict.get("table", None)):
logging.info("An access event parameter(s) is missing. Skipping ....")
continue
if not event_dict.get("usename") or event_dict["usename"] == "":
logging.info("The username parameter is missing. Skipping ....")
continue
joined_access_events.append(ClickHouseJoinedAccessEvent(**event_dict))
return joined_access_events
def _aggregate_access_events(
self, events: List[ClickHouseJoinedAccessEvent]
) -> Dict[datetime, Dict[ClickHouseTableRef, AggregatedDataset]]:
datasets: Dict[
datetime, Dict[ClickHouseTableRef, AggregatedDataset]
] = collections.defaultdict(dict)
for event in events:
floored_ts = get_time_bucket(event.starttime, self.config.bucket_duration)
resource = (
f'{self.config.platform_instance+"." if self.config.platform_instance else ""}'
f"{event.schema_}.{event.table}"
)
agg_bucket = datasets[floored_ts].setdefault(
resource,
AggregatedDataset(bucket_start_time=floored_ts, resource=resource),
)
            # Current limitation in the usage stats UI: we need to provide an email to display users.
user_email = f"{event.usename if event.usename else 'unknown'}"
if "@" not in user_email:
user_email += f"@{self.config.email_domain}"
logger.info(f"user_email: {user_email}")
agg_bucket.add_read_entry(
user_email,
event.query,
event.columns,
)
return datasets
def _make_usage_stat(self, agg: AggregatedDataset) -> MetadataWorkUnit:
return agg.make_usage_workunit(
self.config.bucket_duration,
lambda resource: builder.make_dataset_urn(
"clickhouse", resource, self.config.env
),
self.config.top_n_queries,
)
def get_report(self) -> SourceReport:
return self.report
def close(self) -> None:
pass
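For reference, a small sketch of how the configuration turns into the usage query sent to ClickHouse (hypothetical connection values; nothing is executed against a server):

```python
from datahub.ingestion.source.usage.clickhouse_usage import (
    ClickHouseUsageConfig,
    clickhouse_datetime_format,
    clickhouse_usage_sql_comment,
)

# start_time/end_time default to the last full day (UTC) via BaseUsageConfig.
config = ClickHouseUsageConfig.parse_obj(
    {
        "host_port": "localhost:8123",
        "username": "user",
        "password": "pass",
        "email_domain": "example.com",
    }
)
print(config.start_time, config.end_time, config.bucket_duration)

# The query is the template above with the window and query_log_table substituted
# in, mirroring _make_usage_query().
print(
    clickhouse_usage_sql_comment.format(
        query_log_table=config.query_log_table,
        start_time=config.start_time.strftime(clickhouse_datetime_format),
        end_time=config.end_time.strftime(clickhouse_datetime_format),
    )
)
```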

File diff suppressed because it is too large.


@@ -0,0 +1,32 @@
run_id: clickhouse-test
source:
type: clickhouse
config:
username: clickhouseuser
password: clickhousepass
database: clickhousedb
host_port: localhost:58123
platform_instance: clickhousetestserver
schema_pattern:
allow:
- "^db1"
profile_pattern:
allow:
- "clickhousetestserver.db1.mv_target_table"
profiling:
enabled: true
include_field_null_count: true
include_field_min_value: true
include_field_max_value: true
include_field_mean_value: true
include_field_median_value: true
include_field_stddev_value: true
include_field_quantiles: true
include_field_distinct_value_frequencies: true
include_field_histogram: true
include_field_sample_values: true
sink:
type: file
config:
filename: "./clickhouse_mces.json"


@@ -0,0 +1,14 @@
version: "2"
services:
clickhouse:
image: yandex/clickhouse-server:22.1
container_name: "testclickhouse"
environment:
CLICKHOUSE_USER: clickhouseuser
CLICKHOUSE_PASSWORD: clickhousepass
CLICKHOUSE_DB: clickhousedb
ports:
- "58123:8123"
- "59000:9000"
volumes:
- ./setup:/docker-entrypoint-initdb.d


@@ -0,0 +1,144 @@
CREATE DATABASE db1;
CREATE TABLE db1.test_data_types
(
`col_Array` Array(String),
`col_Bool` Bool COMMENT 'https://github.com/ClickHouse/ClickHouse/pull/31072',
`col_Date` Date,
`col_Date32` Date32 COMMENT 'this type was added in ClickHouse v21.9',
`col_DateTime` DateTime,
`col_DatetimeTZ` DateTime('Europe/Berlin'),
`col_DateTime32` DateTime,
`col_DateTime64` DateTime64(3),
`col_DateTime64TZ` DateTime64(2, 'Europe/Berlin'),
`col_Decimal` Decimal(2, 1),
`col_Decimal128` Decimal(38, 2),
`col_Decimal256` Decimal(76, 3),
`col_Decimal32` Decimal(9, 4),
`col_Decimal64` Decimal(18, 5),
`col_Enum` Enum8('hello' = 1, 'world' = 2),
`col_Enum16` Enum16('hello' = 1, 'world' = 2),
`col_Enum8` Enum8('hello' = 1, 'world' = 2),
`col_FixedString` FixedString(128),
`col_Float32` Float32,
`col_Float64` Float64,
`col_IPv4` IPv4,
`col_IPv6` IPv6,
`col_Int128` Int128,
`col_Int16` Int16,
`col_Int256` Int256,
`col_Int32` Int32,
`col_Int64` Int64,
`col_Int8` Int8,
`col_Map` Map(String, Nullable(UInt64)),
`col_String` String,
`col_Tuple` Tuple(UInt8, Array(String)),
`col_UInt128` UInt128,
`col_UInt16` UInt16,
`col_UInt256` UInt256,
`col_UInt32` UInt32,
`col_UInt64` UInt64,
`col_UInt8` UInt8,
`col_UUID` UUID
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS index_granularity = 8192
COMMENT 'This table has basic types';
CREATE TABLE db1.test_nested_data_types
(
`col_ArrayArrayInt` Array(Array(Int8)) COMMENT 'this is a comment',
`col_LowCardinality` LowCardinality(String),
`col_AggregateFunction` AggregateFunction(avg, Float64),
`col_SimpleAggregateFunction` SimpleAggregateFunction(max, Decimal(38, 7)),
`col_Nested.c1` Array(UInt32),
`col_Nested.c2` Array(UInt64),
`col_Nested.c3.c4` Array(UInt128),
`col_Nullable` Nullable(Int8),
`col_Array_Nullable_String` Array(Nullable(String)),
`col_LowCardinality_Nullable_String` LowCardinality(Nullable(String))
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS index_granularity = 8192
COMMENT 'This table has nested types';
CREATE DICTIONARY db1.test_dict
(
`col_Int64` Int64,
`col_String` String
)
PRIMARY KEY col_Int64
SOURCE(CLICKHOUSE(DB 'db1' TABLE 'test_data_types'))
LAYOUT(DIRECT());
CREATE VIEW db1.test_view
(
`col_String` String
) AS
SELECT dictGetOrDefault('db1.test_dict', 'col_String', toUInt64(123), 'na') AS col_String;
CREATE TABLE db1.mv_target_table
(
`col_DateTime` DateTime,
`col_Int64` Int64,
`col_Float64` Float64,
`col_Decimal64` Decimal(18, 5),
`col_String` Nullable(String)
)
ENGINE = MergeTree
ORDER BY col_Int64
SETTINGS index_granularity = 8192
COMMENT 'This is target table for materialized view';
-- https://clickhouse.com/docs/en/sql-reference/table-functions/generate/#generaterandom
INSERT INTO db1.mv_target_table
SELECT *
FROM generateRandom('col_DateTime DateTime, col_Int64 Int64, col_Float64 Float64, col_Decimal64 Decimal(18, 5), col_String Nullable(String)',
5 -- random_seed
)
LIMIT 10;
CREATE MATERIALIZED VIEW db1.mv_with_target_table TO db1.mv_target_table
(
`col_DateTime` DateTime,
`col_Int64` Int64,
`col_Float64` Float64,
`col_Decimal64` Decimal(18, 5),
`col_String` String
) AS
SELECT
col_DateTime,
col_Int64,
col_Float64,
col_Decimal64,
col_String
FROM db1.test_data_types;
CREATE MATERIALIZED VIEW db1.mv_without_target_table
(
`col_ArrayArrayInt` Array(Array(Int8)),
`col_LowCardinality` LowCardinality(String),
`col_Nullable` Nullable(Int8),
`col_Array_Nullable_String` Array(Nullable(String)),
`col_LowCardinality_Nullable_String` LowCardinality(Nullable(String))
)
ENGINE = MergeTree
PRIMARY KEY tuple()
ORDER BY tuple()
SETTINGS index_granularity = 8192 AS
SELECT
col_ArrayArrayInt,
col_LowCardinality,
col_Nullable,
col_Array_Nullable_String,
col_LowCardinality_Nullable_String
FROM db1.test_nested_data_types;


@@ -0,0 +1,32 @@
import pytest
from freezegun import freeze_time
from tests.test_helpers import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import wait_for_port
FROZEN_TIME = "2020-04-14 07:00:00"
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_clickhouse_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/clickhouse"
with docker_compose_runner(
test_resources_dir / "docker-compose.yml", "clickhouse"
) as docker_services:
wait_for_port(docker_services, "testclickhouse", 8123, timeout=120)
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "clickhouse_to_file.yml").resolve()
run_datahub_cmd(
["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
)
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "clickhouse_mces.json",
golden_path=test_resources_dir / "clickhouse_mces_golden.json",
)


@@ -0,0 +1,55 @@
import pytest
@pytest.mark.integration
def test_clickhouse_uri_https():
from datahub.ingestion.source.sql.clickhouse import ClickHouseConfig
config = ClickHouseConfig.parse_obj(
{
"username": "user",
"password": "password",
"host_port": "host:1111",
"database": "db",
"protocol": "https",
}
)
assert (
config.get_sql_alchemy_url()
== "clickhouse://user:password@host:1111/db?protocol=https"
)
@pytest.mark.integration
def test_clickhouse_uri_native():
from datahub.ingestion.source.sql.clickhouse import ClickHouseConfig
config = ClickHouseConfig.parse_obj(
{
"username": "user",
"password": "password",
"host_port": "host:1111",
"scheme": "clickhouse+native",
}
)
assert config.get_sql_alchemy_url() == "clickhouse+native://user:password@host:1111"
@pytest.mark.integration
def test_clickhouse_uri_native_secure():
from datahub.ingestion.source.sql.clickhouse import ClickHouseConfig
config = ClickHouseConfig.parse_obj(
{
"username": "user",
"password": "password",
"host_port": "host:1111",
"database": "db",
"scheme": "clickhouse+native",
"secure": True,
}
)
assert (
config.get_sql_alchemy_url()
== "clickhouse+native://user:password@host:1111/db?secure=true"
)
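One more case that could be added to cover the defaults declared on `ClickHouseConfig` (a sketch; the expected values come from the config defaults introduced in this commit):

```python
@pytest.mark.integration
def test_clickhouse_uri_default_scheme_and_port():
    from datahub.ingestion.source.sql.clickhouse import ClickHouseConfig

    config = ClickHouseConfig.parse_obj({"username": "user", "password": "password"})
    # Defaults declared on ClickHouseConfig: HTTP interface on localhost:8123.
    assert config.scheme == "clickhouse"
    assert config.host_port == "localhost:8123"
    assert config.get_sql_alchemy_url() == "clickhouse://user:password@localhost:8123"
```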


@@ -38,6 +38,16 @@
"type": "OBJECT_STORE"
}
},
{
"urn": "urn:li:dataPlatform:clickhouse",
"aspect": {
"datasetNameDelimiter": ".",
"name": "clickhouse",
"displayName": "ClickHouse",
"type": "RELATIONAL_DB",
"logoUrl": "https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web-react/src/images/clickhouselogo.png"
}
},
{
"urn": "urn:li:dataPlatform:couchbase",
"aspect": {