feat: add support for doris datasource (#14087)

* feat: add support for doris datasource

* fix: fix python style check

* fix: add pydoris dependency

* fix: add pydoris dependency

* fix: py_format_check

* fix: parse error when Doris view column is VARCHAR(*); check the data length and return 1 if it is not a digit

---------

Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
chyueyi 2023-11-28 16:27:52 +08:00 committed by GitHub
parent b56b67bf67
commit b6b337e09a
14 changed files with 668 additions and 2 deletions

View File

@@ -174,6 +174,7 @@ plugins: Dict[str, Set[str]] = {
"deltalake": {"delta-spark<=2.3.0"},
"docker": {"python_on_whales==0.55.0"},
"domo": {VERSIONS["pydomo"]},
"doris": {"pydoris==1.0.2"},
"druid": {"pydruid>=0.6.5"},
"dynamodb": {VERSIONS["boto3"]},
"elasticsearch": {

View File

@@ -0,0 +1,72 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Source connection handler
"""
from typing import Optional
from sqlalchemy.engine import Engine
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.database.dorisConnection import (
DorisConnection,
)
from metadata.ingestion.connections.builders import (
create_generic_db_connection,
get_connection_args_common,
get_connection_url_common,
init_empty_connection_options,
)
from metadata.ingestion.connections.test_connections import (
test_connection_db_schema_sources,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
def get_connection(connection: DorisConnection) -> Engine:
"""
Create connection
"""
if connection.sslCA or connection.sslCert or connection.sslKey:
if not connection.connectionOptions:
connection.connectionOptions = init_empty_connection_options()
if connection.sslCA:
connection.connectionOptions.__root__["ssl_ca"] = connection.sslCA
if connection.sslCert:
connection.connectionOptions.__root__["ssl_cert"] = connection.sslCert
if connection.sslKey:
connection.connectionOptions.__root__["ssl_key"] = connection.sslKey
return create_generic_db_connection(
connection=connection,
get_connection_url_fn=get_connection_url_common,
get_connection_args_fn=get_connection_args_common,
)
def test_connection(
metadata: OpenMetadata,
engine: Engine,
service_connection: DorisConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
test_connection_db_schema_sources(
metadata=metadata,
engine=engine,
service_connection=service_connection,
automation_workflow=automation_workflow,
)
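
A minimal usage sketch for the handler above; the module path and every value are assumptions for illustration only:

from metadata.generated.schema.entity.services.connections.database.dorisConnection import (
    DorisConnection,
)
from metadata.ingestion.source.database.doris.connection import get_connection

# placeholder host, user, and certificate path
connection = DorisConnection(
    username="openmetadata",
    hostPort="localhost:9030",
    sslCA="/certs/doris-ca.pem",
)
# ssl_ca is copied into connectionOptions before the generic engine is built
engine = get_connection(connection)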

View File

@@ -0,0 +1,315 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Mysql source module"""
import re
import traceback
from typing import Dict, Iterable, List, Optional, Tuple, cast
from pydoris.sqlalchemy import datatype
from pydoris.sqlalchemy.dialect import DorisDialect
from sqlalchemy import sql
from sqlalchemy.dialects.mysql.reflection import MySQLTableDefinitionParser
from sqlalchemy.engine.reflection import Inspector
from metadata.generated.schema.entity.data.table import (
Column,
IntervalType,
TableConstraint,
TablePartition,
TableType,
)
from metadata.generated.schema.entity.services.connections.database.dorisConnection import (
DorisConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import (
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.doris.queries import (
DORIS_GET_TABLE_NAMES,
DORIS_PARTITION_DETAILS,
DORIS_SHOW_FULL_COLUMNS,
)
from metadata.ingestion.source.database.doris.utils import (
get_table_comment,
get_table_names_and_type,
)
from metadata.ingestion.source.database.mysql.utils import parse_column
from metadata.utils.logger import ingestion_logger
MySQLTableDefinitionParser._parse_column = ( # pylint: disable=protected-access
parse_column
)
RELKIND_MAP = {
"Doris": TableType.Regular,
"View": TableType.View,
"MEMORY": TableType.View,
}
DorisDialect.get_table_names_and_type = get_table_names_and_type
DorisDialect.get_table_comment = get_table_comment
logger = ingestion_logger()
def extract_number(data):
"""
extract the data type length for CHAR, VARCHAR and DECIMAL: CHAR(1) returns ['1'],
DECIMAL(9,0) returns ['9', '0']
"""
result = re.findall(r"\((.*?)\)", data)
# a Doris view column may be VARCHAR(*); if the data length is not a digit, return 1
if result:
result = [i.strip() if i.strip().isdigit() else 1 for i in result[0].split(",")]
return result
return []
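# Example behaviour, derived from the regex above:
#   extract_number("char(1)")        -> ["1"]
#   extract_number("decimal(9, 0)")  -> ["9", "0"]
#   extract_number("varchar(*)")     -> [1]   (non-digit lengths fall back to 1)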
def extract_child(data):
"""
extract the child type for ARRAY and STRUCT: e.g. for ARRAY<INT(11)>, return INT(11)
"""
result = re.findall(r"(?<=<).+(?=>)", data)
return result[0]
def _parse_type(_type):
"""
parse a raw type into its system data type: CHAR(1) -> CHAR,
STRUCT<s_id:int(11),s_name:text> -> STRUCT, DECIMALV3(9, 0) -> DECIMAL, DATEV2 -> DATE
"""
parse_type = _type.split("(")[0].split("<")[0]
if parse_type[-2] == "v":
system_data_type = parse_type.split("v")[0].upper()
else:
system_data_type = parse_type.upper()
return system_data_type
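# Example behaviour, per the docstring:
#   _parse_type("char(1)")          -> "CHAR"
#   _parse_type("decimalv3(9, 0)")  -> "DECIMAL"
#   _parse_type("datev2")           -> "DATE"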
def _get_column(ordinal, field, _type, null, default, comment):
_type = _type.lower()
system_data_type = _parse_type(_type)
data_type = datatype.parse_sqltype(_type)
if system_data_type in ["VARCHAR", "CHAR"]:
data_type.length = extract_number(_type)[0]
if system_data_type == "DECIMAL":
number = extract_number(_type)
if len(number) == 2:
data_type.precision = number[0]
data_type.scale = number[1]
arr_data_type = None
children = None
if system_data_type == "ARRAY":
arr_data_type = _parse_type(extract_child(_type))
if system_data_type == "STRUCT":
children = []
for k, child in enumerate(extract_child(_type).split(",")):
name_type = child.split(":")
children.append(
_get_column(k, name_type[0], name_type[1], "YES", None, None)
)
return {
"name": field,
"default": default,
"nullable": True if null == "YES" else None,
"type": _type,
"data_type": data_type,
"display_type": _type.lower(),
"system_data_type": system_data_type,
"comment": comment,
"ordinalPosition": ordinal,
"arr_data_type": arr_data_type,
"children": children,
}
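# For illustration, a made-up struct column such as
#   _get_column(0, "profile", "STRUCT<s_id:int(11),s_name:text>", "YES", None, None)
# recurses into its fields, yielding children for s_id (INT) and s_name (TEXT)
# under the "children" key.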
class DorisSource(CommonDbSourceService):
"""
Implements the necessary methods to extract
Database metadata from Doris Source
"""
@classmethod
def create(cls, config_dict, metadata: OpenMetadata):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
if config.serviceConnection is None:
raise InvalidSourceException("Missing service connection")
connection = cast(DorisConnection, config.serviceConnection.__root__.config)
if not isinstance(connection, DorisConnection):
raise InvalidSourceException(
f"Expected DorisConnection, but got {connection}"
)
return cls(config, metadata)
def query_table_names_and_types(
self, schema_name: str
) -> Iterable[TableNameAndType]:
"""
Connect to the source database to get the table
name and type. By default, use the inspector method
to get the names and pass the Regular type.
This is useful for sources where we need fine-grained
logic on how to handle table types, e.g., external, foreign,...
"""
tables = [
TableNameAndType(name=name, type_=RELKIND_MAP.get(engine, TableType.Regular))
for name, engine in self.connection.execute(
sql.text(DORIS_GET_TABLE_NAMES), {"schema": schema_name}
)
or []
]
return tables
@staticmethod
def get_table_description(
schema_name: str, table_name: str, inspector: Inspector
) -> str:
description = None
try:
table_info: dict = inspector.get_table_comment(table_name, schema_name)
# Catch any exception without breaking the ingestion
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Table description error for table [{schema_name}.{table_name}]: {exc}"
)
else:
description = table_info.get("text")
return description[0] if description else None
def _get_columns(self, table_name, schema=None):
"""
Overriding the dialect method to add raw_data_type in response
"""
table_columns = []
primary_columns = []
# row schema: Field, Type, Collation, Null, Key, Default, Extra, Privileges, Comment
for i, row in enumerate(
self.connection.execute(
sql.text(DORIS_SHOW_FULL_COLUMNS.format(schema, table_name))
)
):
table_columns.append(_get_column(i, row[0], row[1], row[3], row[5], row[8]))
if row[4] == "YES":
primary_columns.append(row[0])
return table_columns, primary_columns
def get_columns_and_constraints(
self, schema_name: str, table_name: str, db_name: str, inspector: Inspector
) -> Tuple[
Optional[List[Column]], Optional[List[TableConstraint]], Optional[List[Dict]]
]:
"""
:param schema_name:
:param table_name:
:param db_name:
:param inspector:
:return:
"""
table_constraints = []
table_columns = []
columns, primary_columns = self._get_columns(table_name, schema_name)
for column in columns:
try:
children = None
if column["children"]:
children = [
Column(
name=child["name"] if child["name"] else " ",
description=child.get("comment"),
dataType=child["system_data_type"],
dataTypeDisplay=child["display_type"],
dataLength=self._check_col_length(
child["system_data_type"], child["data_type"]
),
constraint=None,
children=child["children"],
arrayDataType=child["arr_data_type"],
ordinalPosition=child.get("ordinalPosition"),
)
for child in column["children"]
]
self.process_additional_table_constraints(
column=column, table_constraints=table_constraints
)
col_constraint = self._get_column_constraints(
column, primary_columns, []
)
col_data_length = self._check_col_length(
column["system_data_type"], column["data_type"]
)
if col_data_length is None:
col_data_length = 1
if column["system_data_type"] is None:
logger.warning(
f"Unknown type {repr(column['type'])}: {column['name']}"
)
om_column = Column(
name=column["name"] if column["name"] else " ",
description=column.get("comment"),
dataType=column["system_data_type"],
dataTypeDisplay=column.get("type"),
dataLength=col_data_length,
constraint=col_constraint,
children=children,
arrayDataType=column["arr_data_type"],
ordinalPosition=column.get("ordinalPosition"),
)
if column["system_data_type"] == "DECIMAL":
om_column.precision = column["data_type"].precision
om_column.scale = column["data_type"].scale
om_column.tags = self.get_column_tag_labels(
table_name=table_name, column=column
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unexpected exception processing column [{column}]: {exc}"
)
continue
table_columns.append(om_column)
return table_columns, table_constraints, []
def get_table_partition_details(
self,
table_name: str,
schema_name: str,
inspector: Inspector,
) -> Tuple[bool, Optional[TablePartition]]:
"""
check whether the table is partitioned and return the partition details
"""
try:
result = self.engine.execute(
sql.text(DORIS_PARTITION_DETAILS.format(schema_name, table_name))
).all()
if result and result[0].PartitionKey != "":
partition_details = TablePartition(
intervalType=IntervalType.TIME_UNIT.value,
columns=result[0].PartitionKey.split(", "),
)
return True, partition_details
return False, None
except Exception:
return False, None
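# For illustration, with made-up names:
#   SHOW PARTITIONS FROM demo.orders -> PartitionKey == "order_date, region"
#   get_table_partition_details(...) -> (True, TablePartition(
#       intervalType=IntervalType.TIME_UNIT.value,
#       columns=["order_date", "region"],
#   ))
# An empty PartitionKey yields (False, None).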

View File

@@ -0,0 +1,54 @@
"""
SQL Queries used during ingestion
"""
import textwrap
DORIS_GET_SCHEMA_COLUMN_INFO = textwrap.dedent(
"""
SELECT COLUMN_NAME,COLUMN_DEFAULT,IS_NULLABLE,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH,
NUMERIC_PRECISION,NUMERIC_SCALE,COLUMN_TYPE,COLUMN_KEY,COLUMN_COMMENT,ORDINAL_POSITION
from information_schema.`columns` t
where TABLE_SCHEMA = :schema
AND TABLE_NAME = :table_name
"""
)
DORIS_SHOW_FULL_COLUMNS = textwrap.dedent(
"""
SHOW FULL COLUMNS FROM {}.{}
"""
)
DORIS_GET_TABLE_NAMES = textwrap.dedent(
"""
select TABLE_NAME as name, `ENGINE` as engine
from INFORMATION_SCHEMA.tables
where TABLE_SCHEMA = :schema
"""
)
DORIS_TABLE_COMMENTS = textwrap.dedent(
"""
SELECT TABLE_COMMENT
FROM information_schema.tables
WHERE TABLE_SCHEMA = :schema
AND TABLE_NAME = :table_name
"""
)
DORIS_VIEW_DEFINITIONS = textwrap.dedent(
"""
select
TABLE_NAME as view_name,
TABLE_SCHEMA as schema,
'' as view_def
from information_schema.tables where engine in ('MaterializedView', 'View')
"""
)
DORIS_PARTITION_DETAILS = textwrap.dedent(
"""
SHOW PARTITIONS FROM {}.{}
"""
)
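
The SHOW statements receive their schema and table through str.format, since identifiers cannot be bound as SQL parameters, while the INFORMATION_SCHEMA queries take bound parameters. A hedged usage sketch with placeholder names, using sqlalchemy's sql.text as the source module does:

engine.execute(sql.text(DORIS_SHOW_FULL_COLUMNS.format("demo", "orders")))
engine.execute(sql.text(DORIS_GET_TABLE_NAMES), {"schema": "demo"})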

View File

@@ -0,0 +1,64 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Doris SQLAlchemy Helper Methods
"""
import textwrap
from sqlalchemy import sql
from sqlalchemy.engine import reflection
from metadata.ingestion.source.database.doris.queries import (
DORIS_TABLE_COMMENTS,
DORIS_VIEW_DEFINITIONS,
)
from metadata.utils.sqlalchemy_utils import get_view_definition_wrapper
query = textwrap.dedent(
"""
select TABLE_NAME as name, `ENGINE` as engine
from INFORMATION_SCHEMA.tables
"""
)
@reflection.cache
def get_view_definition(self, connection, table_name, schema=None):
return get_view_definition_wrapper(
self,
connection,
table_name=table_name,
schema=schema,
query=DORIS_VIEW_DEFINITIONS,
)
def get_table_names_and_type(_, connection, schema=None, **kw):
    query_sql = query
    if schema:
        query_sql = query + f" WHERE TABLE_SCHEMA = '{schema}'"
database = schema or connection.engine.url.database
rows = connection.execute(query_sql, database=database, **kw)
return list(rows)
@reflection.cache
def get_table_comment(_, connection, table_name, schema=None, **kw):
comment = None
rows = connection.execute(
sql.text(DORIS_TABLE_COMMENTS),
{"table_name": table_name, "schema": schema},
**kw,
)
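    # keep only the first row: it is a single-value row holding TABLE_COMMENT,
    # which is why get_table_description reads the returned value with [0]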
for table_comment in rows:
comment = table_comment
break
return {"text": comment}

View File

@@ -0,0 +1,33 @@
{
"name": "Doris",
"displayName": "Doris Test Connection",
"description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.",
"steps": [
{
"name": "CheckAccess",
"description": "Validate that we can properly reach the database and authenticate with the given credentials.",
"errorMessage": "Failed to connect to doris, please validate the credentials",
"shortCircuit": true,
"mandatory": true
},
{
"name": "GetSchemas",
"description": "List all the schemas available to the user.",
"errorMessage": "Failed to fetch schemas, please validate if the user has enough privilege to fetch schemas.",
"mandatory": true
},
{
"name": "GetTables",
"description": "From a given schema, list the tables belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
"errorMessage": "Failed to fetch tables, please validate if the user has enough privilege to fetch tables.",
"mandatory": true
},
{
"name": "GetViews",
"description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
"errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.",
"mandatory": false
}
]
}

View File

@@ -151,7 +151,12 @@
"IMAGE",
"IPV4",
"IPV6",
"DATETIMERANGE"
"DATETIMERANGE",
"HLL",
"LARGEINT",
"QUANTILE_STATE",
"AGG_STATE",
"BITMAP"
]
},
"constraint": {

View File

@@ -0,0 +1,102 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/database/dorisConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DorisConnection",
"description": "Doris Database Connection Config",
"type": "object",
"javaType": "org.openmetadata.schema.services.connections.database.DorisConnection",
"definitions": {
"dorisType": {
"description": "Service type.",
"type": "string",
"enum": ["Doris"],
"default": "Doris"
},
"dorisScheme": {
"description": "SQLAlchemy driver scheme options.",
"type": "string",
"enum": ["doris"],
"default": "doris"
}
},
"properties": {
"type": {
"title": "Service Type",
"description": "Service Type",
"$ref": "#/definitions/dorisType",
"default": "Doris"
},
"scheme": {
"title": "Connection Scheme",
"description": "SQLAlchemy driver scheme options.",
"$ref": "#/definitions/dorisScheme",
"default": "doris"
},
"username": {
"title": "Username",
"description": "Username to connect to Doris. This user should have privileges to read all the metadata in Doris.",
"type": "string"
},
"password": {
"title": "Password",
"description": "Password to connect to Doris.",
"type": "string",
"format": "password"
},
"hostPort": {
"title": "Host and Port",
"description": "Host and port of the Doris service.",
"type": "string"
},
"databaseName": {
"title": "Database Name",
"description": "Optional name to give to the database in OpenMetadata. If left blank, we will use default as the database name.",
"type": "string"
},
"databaseSchema": {
"title": "Database Schema",
"description": "Database Schema of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single schema. When left blank, OpenMetadata Ingestion attempts to scan all the schemas.",
"type": "string"
},
"sslCA": {
"title": "SSL CA",
"description": "Provide the path to ssl ca file",
"type": "string"
},
"sslCert": {
"title": "SSL Client Certificate File",
"description": "Provide the path to ssl client certificate file (ssl_cert)",
"type": "string"
},
"sslKey": {
"title": "SSL Client Key File",
"description": "Provide the path to ssl client certificate file (ssl_key)",
"type": "string"
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
},
"connectionArguments": {
"title": "Connection Arguments",
"$ref": "../connectionBasicType.json#/definitions/connectionArguments"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
},
"supportsQueryComment": {
"title": "Supports Query Comment",
"$ref": "../connectionBasicType.json#/definitions/supportsQueryComment"
}
},
"additionalProperties": false,
"required": ["hostPort", "username"]
}
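
An illustrative connection payload that validates against this schema; every value below is a placeholder:

{
  "type": "Doris",
  "scheme": "doris",
  "username": "openmetadata",
  "password": "secret",
  "hostPort": "doris-fe.example.com:9030",
  "databaseSchema": "demo",
  "sslCA": "/certs/doris-ca.pem"
}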

View File

@@ -49,7 +49,8 @@
"SapHana",
"MongoDB",
"Couchbase",
"Greenplum"
"Greenplum",
"Doris"
],
"javaEnums": [
{
@@ -156,6 +157,9 @@
},
{
"name": "Greenplum"
},
{
"name": "Doris"
}
]
},
@@ -267,6 +271,9 @@
},
{
"$ref": "./connections/database/greenplumConnection.json"
},
{
"$ref": "./connections/database/dorisConnection.json"
}
]
}

Binary file not shown (new service icon image, 18 KiB).

View File

@@ -30,6 +30,7 @@ import databrick from '../assets/img/service-icon-databrick.png';
import datalake from '../assets/img/service-icon-datalake.png';
import deltalake from '../assets/img/service-icon-delta-lake.png';
import domo from '../assets/img/service-icon-domo.png';
import doris from '../assets/img/service-icon-doris.png';
import druid from '../assets/img/service-icon-druid.png';
import dynamodb from '../assets/img/service-icon-dynamodb.png';
import fivetran from '../assets/img/service-icon-fivetran.png';
@@ -142,6 +143,7 @@ export const AZURESQL = azuresql;
export const CLICKHOUSE = clickhouse;
export const DATABRICK = databrick;
export const IBMDB2 = ibmdb2;
export const DORIS = doris;
export const DRUID = druid;
export const DYNAMODB = dynamodb;
export const SINGLESTORE = singlestore;

View File

@@ -25,6 +25,7 @@ import DatalakeConnection from '../jsons/connectionSchemas/connections/database/
import db2Connection from '../jsons/connectionSchemas/connections/database/db2Connection.json';
import deltaLakeConnection from '../jsons/connectionSchemas/connections/database/deltaLakeConnection.json';
import domoDatabaseConnection from '../jsons/connectionSchemas/connections/database/domoDatabaseConnection.json';
import dorisConnection from '../jsons/connectionSchemas/connections/database/dorisConnection.json';
import druidConnection from '../jsons/connectionSchemas/connections/database/druidConnection.json';
import dynamoDBConnection from '../jsons/connectionSchemas/connections/database/dynamoDBConnection.json';
import glueConnection from '../jsons/connectionSchemas/connections/database/glueConnection.json';
@@ -92,11 +93,17 @@ export const getDatabaseConfig = (type: DatabaseServiceType) => {
break;
}
case DatabaseServiceType.Doris: {
schema = dorisConnection;
break;
}
case DatabaseServiceType.Druid: {
schema = druidConnection;
break;
}
case DatabaseServiceType.DynamoDB: {
schema = dynamoDBConnection;

View File

@@ -33,6 +33,7 @@ import {
DEFAULT_SERVICE,
DELTALAKE,
DOMO,
DORIS,
DRUID,
DYNAMODB,
ELASTIC_SEARCH,
@@ -201,6 +202,9 @@ class ServiceUtilClassBase {
case DatabaseServiceType.Db2:
return IBMDB2;
case DatabaseServiceType.Doris:
return DORIS;
case DatabaseServiceType.Druid:
return DRUID;