mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-30 03:46:10 +00:00
parent
b90d0e6365
commit
1492f79da1
@ -1,76 +0,0 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Source connection handler
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
from metadata.generated.schema.entity.automations.workflow import (
|
||||
Workflow as AutomationWorkflow,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.duckdbConnection import (
|
||||
DuckDbConnection,
|
||||
SslMode,
|
||||
)
|
||||
from metadata.ingestion.connections.builders import (
|
||||
create_generic_db_connection,
|
||||
get_connection_args_common,
|
||||
get_connection_url_common,
|
||||
init_empty_connection_arguments,
|
||||
)
|
||||
from metadata.ingestion.connections.test_connections import test_connection_db_common
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.database.duckdb.queries import DUCKDB_GET_DATABASE
|
||||
|
||||
|
||||
def get_connection(connection: DuckDbConnection) -> Engine:
|
||||
"""
|
||||
Create connection
|
||||
"""
|
||||
if connection.sslMode:
|
||||
if not connection.connectionArguments:
|
||||
connection.connectionArguments = init_empty_connection_arguments()
|
||||
connection.connectionArguments.__root__["sslmode"] = connection.sslMode.value
|
||||
if connection.sslMode in (SslMode.verify_ca, SslMode.verify_full):
|
||||
connection.connectionArguments.__root__[
|
||||
"sslrootcert"
|
||||
] = connection.sslConfig.__root__.certificatePath
|
||||
return create_generic_db_connection(
|
||||
connection=connection,
|
||||
get_connection_url_fn=get_connection_url_common,
|
||||
get_connection_args_fn=get_connection_args_common,
|
||||
)
|
||||
|
||||
|
||||
def test_connection(
|
||||
metadata: OpenMetadata,
|
||||
engine: Engine,
|
||||
service_connection: DuckDbConnection,
|
||||
automation_workflow: Optional[AutomationWorkflow] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Test connection. This can be executed either as part
|
||||
of a metadata workflow or during an Automation Workflow
|
||||
"""
|
||||
queries = {
|
||||
"GetDatabases": DUCKDB_GET_DATABASE,
|
||||
}
|
||||
test_connection_db_common(
|
||||
metadata=metadata,
|
||||
engine=engine,
|
||||
service_connection=service_connection,
|
||||
automation_workflow=automation_workflow,
|
||||
queries=queries,
|
||||
)
|
@ -1,204 +0,0 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
DuckDb source module
|
||||
"""
|
||||
import traceback
|
||||
from collections import namedtuple
|
||||
from typing import Iterable, Optional, Tuple
|
||||
|
||||
from sqlalchemy import sql
|
||||
from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
|
||||
from sqlalchemy.engine.reflection import Inspector
|
||||
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
IntervalType,
|
||||
TablePartition,
|
||||
TableType,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.duckdbConnection import (
|
||||
DuckDbConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
Source as WorkflowSource,
|
||||
)
|
||||
from metadata.ingestion.api.steps import InvalidSourceException
|
||||
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
|
||||
from metadata.ingestion.source.database.common_db_source import (
|
||||
CommonDbSourceService,
|
||||
TableNameAndType,
|
||||
)
|
||||
from metadata.ingestion.source.database.duckdb.queries import (
|
||||
DUCKDB_GET_DB_NAMES,
|
||||
DUCKDB_GET_TABLE_NAMES,
|
||||
DUCKDB_PARTITION_DETAILS,
|
||||
)
|
||||
from metadata.ingestion.source.database.duckdb.utils import (
|
||||
get_column_info,
|
||||
get_columns,
|
||||
get_table_comment,
|
||||
get_view_definition,
|
||||
)
|
||||
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
|
||||
from metadata.utils import fqn
|
||||
from metadata.utils.filters import filter_by_database
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
from metadata.utils.sqlalchemy_utils import (
|
||||
get_all_table_comments,
|
||||
get_all_view_definitions,
|
||||
)
|
||||
|
||||
TableKey = namedtuple("TableKey", ["schema", "table_name"])
|
||||
|
||||
logger = ingestion_logger()
|
||||
|
||||
|
||||
INTERVAL_TYPE_MAP = {
|
||||
"list": IntervalType.COLUMN_VALUE.value,
|
||||
"hash": IntervalType.COLUMN_VALUE.value,
|
||||
"range": IntervalType.TIME_UNIT.value,
|
||||
}
|
||||
|
||||
RELKIND_MAP = {
|
||||
"r": TableType.Regular,
|
||||
"p": TableType.Partitioned,
|
||||
"f": TableType.Foreign,
|
||||
}
|
||||
|
||||
GEOMETRY = create_sqlalchemy_type("GEOMETRY")
|
||||
POINT = create_sqlalchemy_type("POINT")
|
||||
POLYGON = create_sqlalchemy_type("POLYGON")
|
||||
|
||||
ischema_names.update(
|
||||
{
|
||||
"geometry": GEOMETRY,
|
||||
"point": POINT,
|
||||
"polygon": POLYGON,
|
||||
"box": create_sqlalchemy_type("BOX"),
|
||||
"circle": create_sqlalchemy_type("CIRCLE"),
|
||||
"line": create_sqlalchemy_type("LINE"),
|
||||
"lseg": create_sqlalchemy_type("LSEG"),
|
||||
"path": create_sqlalchemy_type("PATH"),
|
||||
"pg_lsn": create_sqlalchemy_type("PG_LSN"),
|
||||
"pg_snapshot": create_sqlalchemy_type("PG_SNAPSHOT"),
|
||||
"tsquery": create_sqlalchemy_type("TSQUERY"),
|
||||
"txid_snapshot": create_sqlalchemy_type("TXID_SNAPSHOT"),
|
||||
"xml": create_sqlalchemy_type("XML"),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
PGDialect.get_all_table_comments = get_all_table_comments
|
||||
PGDialect.get_table_comment = get_table_comment
|
||||
PGDialect._get_column_info = get_column_info # pylint: disable=protected-access
|
||||
PGDialect.get_view_definition = get_view_definition
|
||||
PGDialect.get_columns = get_columns
|
||||
PGDialect.get_all_view_definitions = get_all_view_definitions
|
||||
|
||||
PGDialect.ischema_names = ischema_names
|
||||
|
||||
|
||||
class DuckDbSource(CommonDbSourceService, MultiDBSource):
|
||||
"""
|
||||
Implements the necessary methods to extract
|
||||
Database metadata from DuckDb Source
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict, metadata: OpenMetadataConnection):
|
||||
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
|
||||
connection: DuckDbConnection = config.serviceConnection.__root__.config
|
||||
if not isinstance(connection, DuckDbConnection):
|
||||
raise InvalidSourceException(
|
||||
f"Expected DuckDbConnection, but got {connection}"
|
||||
)
|
||||
return cls(config, metadata)
|
||||
|
||||
def query_table_names_and_types(
|
||||
self, schema_name: str
|
||||
) -> Iterable[TableNameAndType]:
|
||||
"""
|
||||
Overwrite the inspector implementation to handle partitioned
|
||||
and foreign types
|
||||
"""
|
||||
result = self.connection.execute(
|
||||
sql.text(DUCKDB_GET_TABLE_NAMES),
|
||||
{"schema": schema_name},
|
||||
)
|
||||
|
||||
return [
|
||||
TableNameAndType(
|
||||
name=name, type_=RELKIND_MAP.get(relkind, TableType.Regular)
|
||||
)
|
||||
for name, relkind in result
|
||||
]
|
||||
|
||||
def get_configured_database(self) -> Optional[str]:
|
||||
if not self.service_connection.ingestAllDatabases:
|
||||
return self.service_connection.database
|
||||
return None
|
||||
|
||||
def get_database_names_raw(self) -> Iterable[str]:
|
||||
yield from self._execute_database_query(DUCKDB_GET_DB_NAMES)
|
||||
|
||||
def get_database_names(self) -> Iterable[str]:
|
||||
if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
|
||||
configured_db = self.config.serviceConnection.__root__.config.database
|
||||
self.set_inspector(database_name=configured_db)
|
||||
yield configured_db
|
||||
else:
|
||||
for new_database in self.get_database_names_raw():
|
||||
database_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=Database,
|
||||
service_name=self.context.database_service,
|
||||
database_name=new_database,
|
||||
)
|
||||
|
||||
if filter_by_database(
|
||||
self.source_config.databaseFilterPattern,
|
||||
database_fqn
|
||||
if self.source_config.useFqnForFiltering
|
||||
else new_database,
|
||||
):
|
||||
self.status.filter(database_fqn, "Database Filtered Out")
|
||||
continue
|
||||
|
||||
try:
|
||||
self.set_inspector(database_name=new_database)
|
||||
yield new_database
|
||||
except Exception as exc:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(
|
||||
f"Error trying to connect to database {new_database}: {exc}"
|
||||
)
|
||||
|
||||
def get_table_partition_details(
|
||||
self, table_name: str, schema_name: str, inspector: Inspector
|
||||
) -> Tuple[bool, TablePartition]:
|
||||
result = self.engine.execute(
|
||||
DUCKDB_PARTITION_DETAILS.format(
|
||||
table_name=table_name, schema_name=schema_name
|
||||
)
|
||||
).all()
|
||||
if result:
|
||||
partition_details = TablePartition(
|
||||
intervalType=INTERVAL_TYPE_MAP.get(
|
||||
result[0].partition_strategy, IntervalType.COLUMN_VALUE.value
|
||||
),
|
||||
columns=[row.column_name for row in result if row.column_name],
|
||||
)
|
||||
return True, partition_details
|
||||
return False, None
|
@ -1,141 +0,0 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
SQL Queries used during ingestion
|
||||
"""
|
||||
|
||||
import textwrap
|
||||
|
||||
# https://www.postgresql.org/docs/current/catalog-pg-class.html
|
||||
# r = ordinary table, v = view, m = materialized view, c = composite type, f = foreign table, p = partitioned table,
|
||||
DUCKDB_GET_TABLE_NAMES = """
|
||||
select c.relname, c.relkind
|
||||
from pg_catalog.pg_class c
|
||||
left outer join pg_catalog.pg_partition_rule pr on c.oid = pr.parchildrelid
|
||||
JOIN pg_namespace n ON n.oid = c.relnamespace
|
||||
where c.relkind in ('r', 'p', 'f')
|
||||
and pr.oid is null
|
||||
and n.nspname = :schema
|
||||
"""
|
||||
|
||||
DUCKDB_PARTITION_DETAILS = textwrap.dedent(
|
||||
"""
|
||||
select
|
||||
ns.nspname as schema,
|
||||
par.relname as table_name,
|
||||
partition_strategy,
|
||||
col.column_name
|
||||
from
|
||||
(select
|
||||
parrelid,
|
||||
parnatts,
|
||||
case parkind
|
||||
when 'l' then 'list'
|
||||
when 'h' then 'hash'
|
||||
when 'r' then 'range' end as partition_strategy,
|
||||
unnest(paratts) column_index
|
||||
from
|
||||
pg_catalog.pg_partition) pt
|
||||
join
|
||||
pg_class par
|
||||
on
|
||||
par.oid = pt.parrelid
|
||||
left join
|
||||
pg_catalog.pg_namespace ns on par.relnamespace = ns.oid
|
||||
left join
|
||||
information_schema.columns col
|
||||
on
|
||||
col.table_schema = ns.nspname
|
||||
and col.table_name = par.relname
|
||||
and ordinal_position = pt.column_index
|
||||
where par.relname='{table_name}' and ns.nspname='{schema_name}'
|
||||
"""
|
||||
)
|
||||
|
||||
DUCKDB_TABLE_COMMENTS = """
|
||||
SELECT n.nspname as schema,
|
||||
c.relname as table_name,
|
||||
pgd.description as table_comment
|
||||
FROM pg_catalog.pg_class c
|
||||
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
|
||||
LEFT JOIN pg_catalog.pg_description pgd ON pgd.objsubid = 0 AND pgd.objoid = c.oid
|
||||
WHERE c.relkind in ('r', 'v', 'm', 'f', 'p')
|
||||
AND pgd.description IS NOT NULL
|
||||
AND n.nspname <> 'pg_catalog'
|
||||
ORDER BY "schema", "table_name"
|
||||
"""
|
||||
|
||||
# Postgres\DuckDb views definitions only contains the select query
|
||||
# hence we are appending "create view <schema>.<table> as " to select query
|
||||
# to generate the column level lineage
|
||||
DUCKDB_VIEW_DEFINITIONS = """
|
||||
SELECT
|
||||
n.nspname "schema",
|
||||
c.relname view_name,
|
||||
'create view ' || n.nspname || '.' || c.relname || ' as ' || pg_get_viewdef(c.oid,true) view_def
|
||||
FROM pg_class c
|
||||
JOIN pg_namespace n ON n.oid = c.relnamespace
|
||||
WHERE c.relkind IN ('v', 'm')
|
||||
AND n.nspname not in ('pg_catalog','information_schema')
|
||||
"""
|
||||
|
||||
DUCKDB_GET_DATABASE = """
|
||||
select datname from pg_catalog.pg_database
|
||||
"""
|
||||
|
||||
DUCKDB_GET_DB_NAMES = """
|
||||
select datname from pg_catalog.pg_database
|
||||
"""
|
||||
|
||||
DUCKDB_COL_IDENTITY = """\
|
||||
(SELECT json_build_object(
|
||||
'always', a.attidentity = 'a',
|
||||
'start', s.seqstart,
|
||||
'increment', s.seqincrement,
|
||||
'minvalue', s.seqmin,
|
||||
'maxvalue', s.seqmax,
|
||||
'cache', s.seqcache,
|
||||
'cycle', s.seqcycle)
|
||||
FROM pg_catalog.pg_sequence s
|
||||
JOIN pg_catalog.pg_class c on s.seqrelid = c."oid"
|
||||
WHERE c.relkind = 'S'
|
||||
AND a.attidentity != ''
|
||||
AND s.seqrelid = pg_catalog.pg_get_serial_sequence(
|
||||
a.attrelid::regclass::text, a.attname
|
||||
)::regclass::oid
|
||||
) as identity_options\
|
||||
"""
|
||||
|
||||
DUCKDB_SQL_COLUMNS = """
|
||||
SELECT a.attname,
|
||||
pg_catalog.format_type(a.atttypid, a.atttypmod),
|
||||
(
|
||||
SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
|
||||
FROM pg_catalog.pg_attrdef d
|
||||
WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
|
||||
AND a.atthasdef
|
||||
) AS DEFAULT,
|
||||
a.attnotnull,
|
||||
a.attrelid as table_oid,
|
||||
pgd.description as comment,
|
||||
{generated},
|
||||
{identity}
|
||||
FROM pg_catalog.pg_attribute a
|
||||
LEFT JOIN pg_catalog.pg_description pgd ON (
|
||||
pgd.objoid = a.attrelid AND pgd.objsubid = a.attnum)
|
||||
WHERE a.attrelid = :table_oid
|
||||
AND a.attnum > 0 AND NOT a.attisdropped
|
||||
ORDER BY a.attnum
|
||||
"""
|
||||
|
||||
DUCKDB_GET_SERVER_VERSION = """
|
||||
show server_version
|
||||
"""
|
@ -1,349 +0,0 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# pylint: disable=protected-access
|
||||
|
||||
"""
|
||||
DuckDb SQLAlchemy util methods
|
||||
"""
|
||||
import re
|
||||
from typing import Dict, Tuple
|
||||
|
||||
from sqlalchemy import sql, util
|
||||
from sqlalchemy.dialects.postgresql.base import ENUM
|
||||
from sqlalchemy.engine import reflection
|
||||
from sqlalchemy.sql import sqltypes
|
||||
|
||||
from metadata.ingestion.source.database.duckdb.queries import (
|
||||
DUCKDB_COL_IDENTITY,
|
||||
DUCKDB_SQL_COLUMNS,
|
||||
DUCKDB_TABLE_COMMENTS,
|
||||
DUCKDB_VIEW_DEFINITIONS,
|
||||
)
|
||||
from metadata.utils.sqlalchemy_utils import (
|
||||
get_table_comment_wrapper,
|
||||
get_view_definition_wrapper,
|
||||
)
|
||||
|
||||
|
||||
@reflection.cache
|
||||
def get_table_comment(
|
||||
self, connection, table_name, schema=None, **kw
|
||||
): # pylint: disable=unused-argument
|
||||
return get_table_comment_wrapper(
|
||||
self,
|
||||
connection,
|
||||
table_name=table_name,
|
||||
schema=schema,
|
||||
query=DUCKDB_TABLE_COMMENTS,
|
||||
)
|
||||
|
||||
|
||||
@reflection.cache
|
||||
def get_columns( # pylint: disable=too-many-locals
|
||||
self, connection, table_name, schema=None, **kw
|
||||
):
|
||||
"""
|
||||
Overriding the dialect method to add raw_data_type in response
|
||||
"""
|
||||
|
||||
table_oid = self.get_table_oid(
|
||||
connection, table_name, schema, info_cache=kw.get("info_cache")
|
||||
)
|
||||
|
||||
generated = (
|
||||
"a.attgenerated as generated"
|
||||
if self.server_version_info >= (12,)
|
||||
else "NULL as generated"
|
||||
)
|
||||
if self.server_version_info >= (10,):
|
||||
# a.attidentity != '' is required or it will reflect also
|
||||
# serial columns as identity.
|
||||
identity = DUCKDB_COL_IDENTITY
|
||||
else:
|
||||
identity = "NULL as identity_options"
|
||||
|
||||
sql_col_query = DUCKDB_SQL_COLUMNS.format(
|
||||
generated=generated,
|
||||
identity=identity,
|
||||
)
|
||||
sql_col_query = (
|
||||
sql.text(sql_col_query)
|
||||
.bindparams(sql.bindparam("table_oid", type_=sqltypes.Integer))
|
||||
.columns(attname=sqltypes.Unicode, default=sqltypes.Unicode)
|
||||
)
|
||||
conn = connection.execute(sql_col_query, {"table_oid": table_oid})
|
||||
rows = conn.fetchall()
|
||||
|
||||
# dictionary with (name, ) if default search path or (schema, name)
|
||||
# as keys
|
||||
domains = self._load_domains(connection)
|
||||
|
||||
# dictionary with (name, ) if default search path or (schema, name)
|
||||
# as keys
|
||||
enums = dict(
|
||||
((rec["name"],), rec) if rec["visible"] else ((rec["schema"], rec["name"]), rec)
|
||||
for rec in self._load_enums(connection, schema="*")
|
||||
)
|
||||
|
||||
# format columns
|
||||
columns = []
|
||||
|
||||
for (
|
||||
name,
|
||||
format_type,
|
||||
default_,
|
||||
notnull,
|
||||
table_oid,
|
||||
comment,
|
||||
generated,
|
||||
identity,
|
||||
) in rows:
|
||||
column_info = self._get_column_info(
|
||||
name,
|
||||
format_type,
|
||||
default_,
|
||||
notnull,
|
||||
domains,
|
||||
enums,
|
||||
schema,
|
||||
comment,
|
||||
generated,
|
||||
identity,
|
||||
)
|
||||
column_info["system_data_type"] = format_type
|
||||
columns.append(column_info)
|
||||
return columns
|
||||
|
||||
|
||||
def _get_numeric_args(charlen):
|
||||
if charlen:
|
||||
prec, scale = charlen.split(",")
|
||||
return (int(prec), int(scale))
|
||||
return ()
|
||||
|
||||
|
||||
def _get_interval_args(charlen, attype, kwargs: Dict):
|
||||
field_match = re.match(r"interval (.+)", attype, re.I)
|
||||
if charlen:
|
||||
kwargs["precision"] = int(charlen)
|
||||
if field_match:
|
||||
kwargs["fields"] = field_match.group(1)
|
||||
attype = "interval"
|
||||
return (), attype, kwargs
|
||||
|
||||
|
||||
def _get_bit_var_args(charlen, kwargs):
|
||||
kwargs["varying"] = True
|
||||
if charlen:
|
||||
return (int(charlen),), kwargs
|
||||
|
||||
return (), kwargs
|
||||
|
||||
|
||||
def get_column_args(
|
||||
charlen: str, args: Tuple, kwargs: Dict, attype: str
|
||||
) -> Tuple[Tuple, Dict]:
|
||||
"""
|
||||
Method to determine the args and kwargs
|
||||
"""
|
||||
if attype == "numeric":
|
||||
args = _get_numeric_args(charlen)
|
||||
elif attype == "double precision":
|
||||
args = (53,)
|
||||
elif attype == "integer":
|
||||
args = ()
|
||||
elif attype in ("timestamp with time zone", "time with time zone"):
|
||||
kwargs["timezone"] = True
|
||||
if charlen:
|
||||
kwargs["precision"] = int(charlen)
|
||||
args = ()
|
||||
elif attype in (
|
||||
"timestamp without time zone",
|
||||
"time without time zone",
|
||||
"time",
|
||||
):
|
||||
kwargs["timezone"] = False
|
||||
if charlen:
|
||||
kwargs["precision"] = int(charlen)
|
||||
args = ()
|
||||
elif attype == "bit varying":
|
||||
args, kwargs = _get_bit_var_args(charlen, kwargs)
|
||||
elif attype == "geometry":
|
||||
args = ()
|
||||
elif attype.startswith("interval"):
|
||||
args, attype, kwargs = _get_interval_args(charlen, attype, kwargs)
|
||||
elif charlen:
|
||||
args = (int(charlen),)
|
||||
|
||||
return args, kwargs, attype
|
||||
|
||||
|
||||
def get_column_default(coltype, schema, default, generated):
|
||||
"""
|
||||
Method to determine the default of column
|
||||
"""
|
||||
autoincrement = False
|
||||
# If a zero byte or blank string depending on driver (is also absent
|
||||
# for older PG versions), then not a generated column. Otherwise, s =
|
||||
# stored. (Other values might be added in the future.)
|
||||
if generated not in (None, "", b"\x00"):
|
||||
computed = {"sqltext": default, "persisted": generated in ("s", b"s")}
|
||||
default = None
|
||||
else:
|
||||
computed = None
|
||||
if default is not None:
|
||||
match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
|
||||
if match is not None:
|
||||
if issubclass(coltype._type_affinity, sqltypes.Integer):
|
||||
autoincrement = True
|
||||
# the default is related to a Sequence
|
||||
sch = schema
|
||||
if "." not in match.group(2) and sch is not None:
|
||||
# unconditionally quote the schema name. this could
|
||||
# later be enhanced to obey quoting rules /
|
||||
# "quote schema"
|
||||
default = (
|
||||
match.group(1)
|
||||
+ (f'"{sch}"')
|
||||
+ "."
|
||||
+ match.group(2)
|
||||
+ match.group(3)
|
||||
)
|
||||
return default, autoincrement, computed
|
||||
|
||||
|
||||
def _handle_array_type(attype):
|
||||
return (
|
||||
# strip '[]' from integer[], etc.
|
||||
re.sub(r"\[\]$", "", attype),
|
||||
attype.endswith("[]"),
|
||||
)
|
||||
|
||||
|
||||
# pylint: disable=too-many-statements,too-many-branches,too-many-locals,too-many-arguments
|
||||
def get_column_info(
|
||||
self,
|
||||
name,
|
||||
format_type,
|
||||
default,
|
||||
notnull,
|
||||
domains,
|
||||
enums,
|
||||
schema,
|
||||
comment,
|
||||
generated,
|
||||
identity,
|
||||
):
|
||||
"""
|
||||
Method to return column info
|
||||
"""
|
||||
|
||||
if format_type is None:
|
||||
no_format_type = True
|
||||
attype = format_type = "no format_type()"
|
||||
is_array = False
|
||||
else:
|
||||
no_format_type = False
|
||||
|
||||
# strip (*) from character varying(5), timestamp(5)
|
||||
# with time zone, geometry(POLYGON), etc.
|
||||
attype = re.sub(r"\(.*\)", "", format_type)
|
||||
|
||||
# strip '[]' from integer[], etc. and check if an array
|
||||
attype, is_array = _handle_array_type(attype)
|
||||
|
||||
# strip quotes from case sensitive enum or domain names
|
||||
enum_or_domain_key = tuple(util.quoted_token_parser(attype))
|
||||
|
||||
nullable = not notnull
|
||||
|
||||
charlen = re.search(r"\(([\d,]+)\)", format_type)
|
||||
if charlen:
|
||||
charlen = charlen.group(1)
|
||||
args = re.search(r"\((.*)\)", format_type)
|
||||
if args and args.group(1):
|
||||
args = tuple(re.split(r"\s*,\s*", args.group(1)))
|
||||
else:
|
||||
args = ()
|
||||
kwargs = {}
|
||||
|
||||
args, kwargs, attype = get_column_args(charlen, args, kwargs, attype)
|
||||
|
||||
while True:
|
||||
# looping here to suit nested domains
|
||||
if attype in self.ischema_names:
|
||||
coltype = self.ischema_names[attype]
|
||||
break
|
||||
if enum_or_domain_key in enums:
|
||||
enum = enums[enum_or_domain_key]
|
||||
coltype = ENUM
|
||||
kwargs["name"] = enum["name"]
|
||||
if not enum["visible"]:
|
||||
kwargs["schema"] = enum["schema"]
|
||||
args = tuple(enum["labels"])
|
||||
break
|
||||
if enum_or_domain_key in domains:
|
||||
domain = domains[enum_or_domain_key]
|
||||
attype = domain["attype"]
|
||||
attype, is_array = _handle_array_type(attype)
|
||||
# strip quotes from case sensitive enum or domain names
|
||||
enum_or_domain_key = tuple(util.quoted_token_parser(attype))
|
||||
# A table can't override a not null on the domain,
|
||||
# but can override nullable
|
||||
nullable = nullable and domain["nullable"]
|
||||
if domain["default"] and not default:
|
||||
# It can, however, override the default
|
||||
# value, but can't set it to null.
|
||||
default = domain["default"]
|
||||
continue
|
||||
coltype = None
|
||||
break
|
||||
|
||||
if coltype:
|
||||
coltype = coltype(*args, **kwargs)
|
||||
if is_array:
|
||||
coltype = self.ischema_names["_array"](coltype)
|
||||
elif no_format_type:
|
||||
util.warn(f"PostgreSQL format_type() returned NULL for column '{name}'")
|
||||
coltype = sqltypes.NULLTYPE
|
||||
else:
|
||||
util.warn(f"Did not recognize type '{attype}' of column '{name}'")
|
||||
coltype = sqltypes.NULLTYPE
|
||||
|
||||
default, autoincrement, computed = get_column_default(
|
||||
coltype=coltype, schema=schema, default=default, generated=generated
|
||||
)
|
||||
column_info = {
|
||||
"name": name,
|
||||
"type": coltype,
|
||||
"nullable": nullable,
|
||||
"default": default,
|
||||
"autoincrement": autoincrement or identity is not None,
|
||||
"comment": comment,
|
||||
}
|
||||
if computed is not None:
|
||||
column_info["computed"] = computed
|
||||
if identity is not None:
|
||||
column_info["identity"] = identity
|
||||
return column_info
|
||||
|
||||
|
||||
@reflection.cache
|
||||
def get_view_definition(
|
||||
self, connection, table_name, schema=None, **kw
|
||||
): # pylint: disable=unused-argument
|
||||
return get_view_definition_wrapper(
|
||||
self,
|
||||
connection,
|
||||
table_name=table_name,
|
||||
schema=schema,
|
||||
query=DUCKDB_VIEW_DEFINITIONS,
|
||||
)
|
@ -1,39 +0,0 @@
|
||||
{
|
||||
"name": "DuckDb",
|
||||
"displayName": "DuckDb Test Connection",
|
||||
"description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.",
|
||||
"steps": [
|
||||
{
|
||||
"name": "CheckAccess",
|
||||
"description": "Validate that we can properly reach the database and authenticate with the given credentials.",
|
||||
"errorMessage": "Failed to connect to DuckDb, please validate the credentials",
|
||||
"mandatory": true
|
||||
},
|
||||
{
|
||||
"name": "GetDatabases",
|
||||
"description": "List all the databases available to the user.",
|
||||
"errorMessage": "Failed to fetch databases, please validate if the user has enough privilege to fetch databases.",
|
||||
"mandatory": true
|
||||
},
|
||||
{
|
||||
"name": "GetSchemas",
|
||||
"description": "List all the schemas available to the user.",
|
||||
"errorMessage": "Failed to fetch schemas, please validate if the user has enough privilege to fetch schemas.",
|
||||
"mandatory": true
|
||||
},
|
||||
{
|
||||
"name": "GetTables",
|
||||
"description": "From a given schema, list the tables belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
|
||||
"errorMessage": "Failed to fetch tables, please validate if the user has enough privilege to fetch tables.",
|
||||
"mandatory": true
|
||||
},
|
||||
{
|
||||
"name": "GetViews",
|
||||
"description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
|
||||
"errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.",
|
||||
"mandatory": false
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
@ -1,126 +0,0 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/entity/services/connections/database/greenplumConnection.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "DuckDbConnection",
|
||||
"description": "DuckDb Database Connection Config",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.services.connections.database.DuckDbConnection",
|
||||
"definitions": {
|
||||
"greenplumType": {
|
||||
"description": "Service type.",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"DuckDb"
|
||||
],
|
||||
"default": "DuckDb"
|
||||
},
|
||||
"greenplumScheme": {
|
||||
"description": "SQLAlchemy driver scheme options.",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"postgresql+psycopg2"
|
||||
],
|
||||
"default": "postgresql+psycopg2"
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"type": {
|
||||
"title": "Service Type",
|
||||
"description": "Service Type",
|
||||
"$ref": "#/definitions/greenplumType",
|
||||
"default": "DuckDb"
|
||||
},
|
||||
"scheme": {
|
||||
"title": "Connection Scheme",
|
||||
"description": "SQLAlchemy driver scheme options.",
|
||||
"$ref": "#/definitions/greenplumScheme",
|
||||
"default": "postgresql+psycopg2"
|
||||
},
|
||||
"username": {
|
||||
"title": "Username",
|
||||
"description": "Username to connect to DuckDb. This user should have privileges to read all the metadata in DuckDb.",
|
||||
"type": "string"
|
||||
},
|
||||
"authType": {
|
||||
"title": "Auth Configuration Type",
|
||||
"description": "Choose Auth Config Type.",
|
||||
"oneOf": [
|
||||
{
|
||||
"$ref": "./common/basicAuth.json"
|
||||
},
|
||||
{
|
||||
"$ref": "./common/iamAuthConfig.json"
|
||||
}
|
||||
]
|
||||
},
|
||||
"hostPort": {
|
||||
"title": "Host and Port",
|
||||
"description": "Host and port of the source service.",
|
||||
"type": "string"
|
||||
},
|
||||
"database": {
|
||||
"title": "Database",
|
||||
"description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases.",
|
||||
"type": "string"
|
||||
},
|
||||
"sslMode": {
|
||||
"title": "SSL Mode",
|
||||
"description": "SSL Mode to connect to DuckDb database.",
|
||||
"enum": [
|
||||
"disable",
|
||||
"allow",
|
||||
"prefer",
|
||||
"require",
|
||||
"verify-ca",
|
||||
"verify-full"
|
||||
],
|
||||
"default": "disable"
|
||||
},
|
||||
"sslConfig": {
|
||||
"$ref": "../../../../security/ssl/verifySSLConfig.json#/definitions/sslConfig"
|
||||
},
|
||||
"ingestAllDatabases": {
|
||||
"title": "Ingest All Databases",
|
||||
"description": "Ingest data from all databases in DuckDb. You can use databaseFilterPattern on top of this.",
|
||||
"type": "boolean",
|
||||
"default": false
|
||||
},
|
||||
"connectionOptions": {
|
||||
"title": "Connection Options",
|
||||
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
|
||||
},
|
||||
"connectionArguments": {
|
||||
"title": "Connection Arguments",
|
||||
"$ref": "../connectionBasicType.json#/definitions/connectionArguments"
|
||||
},
|
||||
"supportsMetadataExtraction": {
|
||||
"title": "Supports Metadata Extraction",
|
||||
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
|
||||
},
|
||||
"supportsDBTExtraction": {
|
||||
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
|
||||
},
|
||||
"supportsProfiler": {
|
||||
"title": "Supports Profiler",
|
||||
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
|
||||
},
|
||||
"supportsDatabase": {
|
||||
"title": "Supports Database",
|
||||
"$ref": "../connectionBasicType.json#/definitions/supportsDatabase"
|
||||
},
|
||||
"supportsQueryComment": {
|
||||
"title": "Supports Query Comment",
|
||||
"$ref": "../connectionBasicType.json#/definitions/supportsQueryComment"
|
||||
},
|
||||
"sampleDataStorageConfig": {
|
||||
"title": "Storage Config for Sample Data",
|
||||
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
|
||||
}
|
||||
},
|
||||
"additionalProperties": false,
|
||||
"required": [
|
||||
"hostPort",
|
||||
"username",
|
||||
"database"
|
||||
]
|
||||
}
|
@ -52,7 +52,6 @@
|
||||
"Greenplum",
|
||||
"Doris",
|
||||
"UnityCatalog",
|
||||
"DuckDb",
|
||||
"SAS",
|
||||
"Iceberg"
|
||||
],
|
||||
@ -168,9 +167,6 @@
|
||||
{
|
||||
"name": "UnityCatalog"
|
||||
},
|
||||
{
|
||||
"name": "DuckDb"
|
||||
},
|
||||
{
|
||||
"name": "SAS"
|
||||
},
|
||||
@ -294,9 +290,6 @@
|
||||
{
|
||||
"$ref": "./connections/database/unityCatalogConnection.json"
|
||||
},
|
||||
{
|
||||
"$ref": "./connections/database/duckdbConnection.json"
|
||||
},
|
||||
{
|
||||
"$ref": "./connections/database/sasConnection.json"
|
||||
},
|
||||
|
Binary file not shown.
Before Width: | Height: | Size: 2.2 KiB |
@ -32,7 +32,6 @@ import deltalake from '../assets/img/service-icon-delta-lake.png';
|
||||
import domo from '../assets/img/service-icon-domo.png';
|
||||
import doris from '../assets/img/service-icon-doris.png';
|
||||
import druid from '../assets/img/service-icon-druid.png';
|
||||
import duckdb from '../assets/img/service-icon-duckdb.png';
|
||||
import dynamodb from '../assets/img/service-icon-dynamodb.png';
|
||||
import fivetran from '../assets/img/service-icon-fivetran.png';
|
||||
import gcs from '../assets/img/service-icon-gcs.png';
|
||||
@ -195,7 +194,6 @@ export const COUCHBASE = couchbase;
|
||||
export const GREENPLUM = greenplum;
|
||||
export const ELASTIC_SEARCH = elasticSearch;
|
||||
export const OPEN_SEARCH = openSearch;
|
||||
export const DUCKDB = duckdb;
|
||||
export const PLUS = plus;
|
||||
export const NOSERVICE = noService;
|
||||
export const ICEBERGE = iceberge;
|
||||
|
@ -27,7 +27,6 @@ import deltaLakeConnection from '../jsons/connectionSchemas/connections/database
|
||||
import domoDatabaseConnection from '../jsons/connectionSchemas/connections/database/domoDatabaseConnection.json';
|
||||
import dorisConnection from '../jsons/connectionSchemas/connections/database/dorisConnection.json';
|
||||
import druidConnection from '../jsons/connectionSchemas/connections/database/druidConnection.json';
|
||||
import duckdbConnection from '../jsons/connectionSchemas/connections/database/duckdbConnection.json';
|
||||
import dynamoDBConnection from '../jsons/connectionSchemas/connections/database/dynamoDBConnection.json';
|
||||
import glueConnection from '../jsons/connectionSchemas/connections/database/glueConnection.json';
|
||||
import greenplumConnection from '../jsons/connectionSchemas/connections/database/greenplumConnection.json';
|
||||
@ -233,11 +232,6 @@ export const getDatabaseConfig = (type: DatabaseServiceType) => {
|
||||
|
||||
break;
|
||||
}
|
||||
case DatabaseServiceType.DuckDB: {
|
||||
schema = duckdbConnection;
|
||||
|
||||
break;
|
||||
}
|
||||
case DatabaseServiceType.SAS: {
|
||||
schema = sasConnection;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user