diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json
index cb77a1cc4ea..6c9c68d0b92 100644
--- a/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json
@@ -24,7 +24,8 @@
         "Glue",
         "MariaDB",
         "Druid",
-        "Db2"
+        "Db2",
+        "ClickHouse"
       ],
       "javaEnums": [
         {
@@ -74,6 +75,9 @@
         },
         {
           "name": "Db2"
+        },
+        {
+          "name": "ClickHouse"
         }
       ]
     },
diff --git a/ingestion/examples/workflows/clickhouse.json b/ingestion/examples/workflows/clickhouse.json
new file mode 100644
index 00000000000..051d0400a55
--- /dev/null
+++ b/ingestion/examples/workflows/clickhouse.json
@@ -0,0 +1,26 @@
+{
+  "source": {
+    "type": "clickhouse",
+    "config": {
+      "username":"default",
+      "password":"",
+      "database": "default",
+      "service_name": "local_clickhouse",
+      "schema_filter_pattern": {
+        "excludes": ["system.*","information_schema.*","INFORMATION_SCHEMA.*"]
+      }
+    }
+  },
+  "sink": {
+    "type": "metadata-rest",
+    "config": {}
+  },
+  "metadata_server": {
+    "type": "metadata-server",
+    "config": {
+      "api_endpoint": "http://localhost:8585/api",
+      "auth_provider_type": "no-auth"
+    }
+  }
+}
+ 
\ No newline at end of file
diff --git a/ingestion/setup.py b/ingestion/setup.py
index 819551fc7d3..94bf287e6f3 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -114,6 +114,7 @@ plugins: Dict[str, Set[str]] = {
     "mlflow": {"mlflow-skinny~=1.22.0"},
     "sklearn": {"scikit-learn==1.0.2"},
     "db2": {"ibm-db-sa==0.3.7"},
+    "clickhouse": {"clickhouse-driver==0.2.3", "clickhouse-sqlalchemy==0.1.8"},
 }
 dev = {
     "boto3==1.20.14",
diff --git a/ingestion/src/metadata/ingestion/source/clickhouse.py b/ingestion/src/metadata/ingestion/source/clickhouse.py
new file mode 100644
index 00000000000..f94ec76a097
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/clickhouse.py
@@ -0,0 +1,45 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from clickhouse_sqlalchemy.drivers.base import ClickHouseDialect
+from sqlalchemy.engine import reflection
+
+from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
+from metadata.ingestion.source.sql_source import SQLSource
+from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
+
+
+@reflection.cache
+def get_pk_constraint(self, bind, table_name, schema=None, **kw):
+    return {"constrained_columns": [], "name": "undefined"}
+
+
+ClickHouseDialect.get_pk_constraint = get_pk_constraint
+
+
+class ClickhouseConfig(SQLConnectionConfig):
+    host_port = "localhost:8123"
+    scheme = "clickhouse+http"
+    service_type = "ClickHouse"
+
+    def get_connection_url(self):
+        return super().get_connection_url()
+
+
+class ClickhouseSource(SQLSource):
+    def __init__(self, config, metadata_config, ctx):
+        super().__init__(config, metadata_config, ctx)
+
+    @classmethod
+    def create(cls, config_dict, metadata_config_dict, ctx):
+        config = ClickhouseConfig.parse_obj(config_dict)
+        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
+        return cls(config, metadata_config, ctx)
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 31dfe61120e..cd3087846fd 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -505,6 +505,12 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                         arr_data_type = re.match(
                             r"(?:\w*)(?:[(]*)(\w*)(?:.*)", str(column["type"])
                         ).groups()
+                        if isinstance(arr_data_type, list) or isinstance(
+                            arr_data_type, tuple
+                        ):
+                            arr_data_type = ColumnTypeParser.get_column_type(
+                                arr_data_type[0]
+                            )
                         data_type_display = column["type"]
                         if parsed_string is None:
                             col_type = ColumnTypeParser.get_column_type(column["type"])
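Usage sketch (not part of the diff above): a minimal way to exercise the new connector locally, assuming the ingestion package is published as openmetadata-ingestion (so the "clickhouse" key added to the plugins dict in setup.py becomes a pip extra) and that the standard OpenMetadata "metadata ingest -c <config>" CLI entry point is available; the config path reuses the example workflow added in this change.

    # install the ingestion package together with the ClickHouse drivers pinned in setup.py
    pip install "openmetadata-ingestion[clickhouse]"

    # run the example workflow against a local ClickHouse (localhost:8123) and a local
    # OpenMetadata server (http://localhost:8585/api, no-auth), as configured above
    metadata ingest -c ingestion/examples/workflows/clickhouse.json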