mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-02 18:46:02 +00:00
parent
3f5f4ed72a
commit
dacd13f6d0
@ -6,6 +6,21 @@ source:
|
||||
type: Hive
|
||||
databaseSchema: default
|
||||
hostPort: localhost:10000
|
||||
|
||||
# metastoreConnection:
|
||||
# type: Mysql
|
||||
# username: APP
|
||||
# authType:
|
||||
# password: password
|
||||
# hostPort: localhost:3306
|
||||
# databaseSchema: demo_hive
|
||||
|
||||
# type: Postgres
|
||||
# username: APP
|
||||
# authType:
|
||||
# password: password
|
||||
# hostPort: localhost:5432
|
||||
# database: demo_hive
|
||||
sourceConfig:
|
||||
config:
|
||||
type: DatabaseMetadata
|
||||
|
||||
@ -12,7 +12,10 @@
|
||||
"""
|
||||
Source connection handler
|
||||
"""
|
||||
from typing import Optional
|
||||
from copy import deepcopy
|
||||
from enum import Enum
|
||||
from functools import singledispatch
|
||||
from typing import Any, Optional
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
from pydantic import SecretStr
|
||||
@ -25,10 +28,17 @@ from metadata.generated.schema.entity.services.connections.database.hiveConnecti
|
||||
HiveConnection,
|
||||
HiveScheme,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
|
||||
MysqlConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
|
||||
PostgresConnection,
|
||||
)
|
||||
from metadata.ingestion.connections.builders import (
|
||||
create_generic_db_connection,
|
||||
get_connection_args_common,
|
||||
get_connection_options_dict,
|
||||
get_connection_url_common,
|
||||
init_empty_connection_arguments,
|
||||
)
|
||||
from metadata.ingestion.connections.test_connections import (
|
||||
@ -36,6 +46,9 @@ from metadata.ingestion.connections.test_connections import (
|
||||
)
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
|
||||
HIVE_POSTGRES_SCHEME = "hive+postgres"
|
||||
HIVE_MYSQL_SCHEME = "hive+mysql"
|
||||
|
||||
|
||||
def get_connection_url(connection: HiveConnection) -> str:
|
||||
"""
|
||||
@ -103,6 +116,68 @@ def get_connection(connection: HiveConnection) -> Engine:
|
||||
)
|
||||
|
||||
|
||||
@singledispatch
|
||||
def get_metastore_connection(connection: Any) -> Engine:
|
||||
"""
|
||||
Create connection
|
||||
"""
|
||||
raise NotImplementedError("Metastore not implemented")
|
||||
|
||||
|
||||
@get_metastore_connection.register
|
||||
def _(connection: PostgresConnection):
|
||||
|
||||
# import required to load sqlalchemy plugin
|
||||
# pylint: disable=import-outside-toplevel,unused-import
|
||||
from metadata.ingestion.source.database.hive.metastore_dialects.postgres import (
|
||||
HivePostgresMetaStoreDialect,
|
||||
)
|
||||
|
||||
class CustomPostgresScheme(Enum):
|
||||
HIVE_POSTGRES = HIVE_POSTGRES_SCHEME
|
||||
|
||||
class CustomPostgresConnection(PostgresConnection):
|
||||
scheme: Optional[CustomPostgresScheme]
|
||||
|
||||
connection_copy = deepcopy(connection.__dict__)
|
||||
connection_copy["scheme"] = CustomPostgresScheme.HIVE_POSTGRES
|
||||
|
||||
custom_connection = CustomPostgresConnection(**connection_copy)
|
||||
|
||||
return create_generic_db_connection(
|
||||
connection=custom_connection,
|
||||
get_connection_url_fn=get_connection_url_common,
|
||||
get_connection_args_fn=get_connection_args_common,
|
||||
)
|
||||
|
||||
|
||||
@get_metastore_connection.register
|
||||
def _(connection: MysqlConnection):
|
||||
|
||||
# import required to load sqlalchemy plugin
|
||||
# pylint: disable=import-outside-toplevel,unused-import
|
||||
from metadata.ingestion.source.database.hive.metastore_dialects.mysql import (
|
||||
HiveMysqlMetaStoreDialect,
|
||||
)
|
||||
|
||||
class CustomMysqlScheme(Enum):
|
||||
HIVE_MYSQL = HIVE_MYSQL_SCHEME
|
||||
|
||||
class CustomMysqlConnection(MysqlConnection):
|
||||
scheme: Optional[CustomMysqlScheme]
|
||||
|
||||
connection_copy = deepcopy(connection.__dict__)
|
||||
connection_copy["scheme"] = CustomMysqlScheme.HIVE_MYSQL
|
||||
|
||||
custom_connection = CustomMysqlConnection(**connection_copy)
|
||||
|
||||
return create_generic_db_connection(
|
||||
connection=custom_connection,
|
||||
get_connection_url_fn=get_connection_url_common,
|
||||
get_connection_args_fn=get_connection_args_common,
|
||||
)
|
||||
|
||||
|
||||
def test_connection(
|
||||
metadata: OpenMetadata,
|
||||
engine: Engine,
|
||||
@ -113,6 +188,10 @@ def test_connection(
|
||||
Test connection. This can be executed either as part
|
||||
of a metadata workflow or during an Automation Workflow
|
||||
"""
|
||||
|
||||
if service_connection.metastoreConnection:
|
||||
engine = get_metastore_connection(service_connection.metastoreConnection)
|
||||
|
||||
test_connection_db_schema_sources(
|
||||
metadata=metadata,
|
||||
engine=engine,
|
||||
|
||||
@ -12,12 +12,10 @@
|
||||
Hive source methods.
|
||||
"""
|
||||
|
||||
import re
|
||||
from typing import Tuple
|
||||
|
||||
from pyhive.sqlalchemy_hive import HiveDialect, _type_map
|
||||
from sqlalchemy import types, util
|
||||
from sqlalchemy.engine import reflection
|
||||
from pyhive.sqlalchemy_hive import HiveDialect
|
||||
from sqlalchemy.inspection import inspect
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.database.hiveConnection import (
|
||||
HiveConnection,
|
||||
@ -30,168 +28,19 @@ from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
)
|
||||
from metadata.ingestion.api.source import InvalidSourceException
|
||||
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
|
||||
from metadata.ingestion.source.database.hive.queries import HIVE_GET_COMMENTS
|
||||
|
||||
complex_data_types = ["struct", "map", "array", "union"]
|
||||
|
||||
_type_map.update(
|
||||
{
|
||||
"binary": types.BINARY,
|
||||
"char": types.CHAR,
|
||||
"varchar": types.VARCHAR,
|
||||
}
|
||||
from metadata.ingestion.source.database.hive.connection import get_metastore_connection
|
||||
from metadata.ingestion.source.database.hive.utils import (
|
||||
get_columns,
|
||||
get_table_comment,
|
||||
get_table_names,
|
||||
get_table_names_older_versions,
|
||||
get_view_definition,
|
||||
get_view_names,
|
||||
get_view_names_older_versions,
|
||||
)
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
|
||||
|
||||
def get_columns(
|
||||
self, connection, table_name, schema=None, **kw
|
||||
): # pylint: disable=unused-argument,too-many-locals
|
||||
"""
|
||||
Method to handle table columns
|
||||
"""
|
||||
rows = self._get_table_columns( # pylint: disable=protected-access
|
||||
connection, table_name, schema
|
||||
)
|
||||
rows = [[col.strip() if col else None for col in row] for row in rows]
|
||||
rows = [row for row in rows if row[0] and row[0] != "# col_name"]
|
||||
result = []
|
||||
for col_name, col_type, comment in rows:
|
||||
if col_name == "# Partition Information":
|
||||
break
|
||||
|
||||
col_raw_type = col_type
|
||||
attype = re.sub(r"\(.*\)", "", col_type)
|
||||
col_type = re.search(r"^\w+", col_type).group(0)
|
||||
try:
|
||||
coltype = _type_map[col_type]
|
||||
|
||||
except KeyError:
|
||||
util.warn(f"Did not recognize type '{col_type}' of column '{col_name}'")
|
||||
coltype = types.NullType
|
||||
charlen = re.search(r"\(([\d,]+)\)", col_raw_type.lower())
|
||||
if charlen:
|
||||
charlen = charlen.group(1)
|
||||
if attype == "decimal":
|
||||
prec, scale = charlen.split(",")
|
||||
args = (int(prec), int(scale))
|
||||
else:
|
||||
args = (int(charlen),)
|
||||
coltype = coltype(*args)
|
||||
|
||||
result.append(
|
||||
{
|
||||
"name": col_name,
|
||||
"type": coltype,
|
||||
"comment": comment,
|
||||
"nullable": True,
|
||||
"default": None,
|
||||
"system_data_type": col_raw_type,
|
||||
"is_complex": col_type in complex_data_types,
|
||||
}
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def get_table_names_older_versions(
|
||||
self, connection, schema=None, **kw
|
||||
): # pylint: disable=unused-argument
|
||||
query = "SHOW TABLES"
|
||||
if schema:
|
||||
query += " IN " + self.identifier_preparer.quote_identifier(schema)
|
||||
tables_in_schema = connection.execute(query)
|
||||
tables = []
|
||||
for row in tables_in_schema:
|
||||
# check number of columns in result
|
||||
# if it is > 1, we use spark thrift server with 3 columns in the result (schema, table, is_temporary)
|
||||
# else it is hive with 1 column in the result
|
||||
if len(row) > 1:
|
||||
tables.append(row[1])
|
||||
else:
|
||||
tables.append(row[0])
|
||||
return tables
|
||||
|
||||
|
||||
def get_table_names(
|
||||
self, connection, schema=None, **kw
|
||||
): # pylint: disable=unused-argument
|
||||
query = "SHOW TABLES"
|
||||
if schema:
|
||||
query += " IN " + self.identifier_preparer.quote_identifier(schema)
|
||||
tables_in_schema = connection.execute(query)
|
||||
tables = []
|
||||
for row in tables_in_schema:
|
||||
# check number of columns in result
|
||||
# if it is > 1, we use spark thrift server with 3 columns in the result (schema, table, is_temporary)
|
||||
# else it is hive with 1 column in the result
|
||||
if len(row) > 1:
|
||||
tables.append(row[1])
|
||||
else:
|
||||
tables.append(row[0])
|
||||
# "SHOW TABLES" command in hive also fetches view names
|
||||
# Below code filters out view names from table names
|
||||
views = self.get_view_names(connection, schema)
|
||||
return [table for table in tables if table not in views]
|
||||
|
||||
|
||||
def get_view_names(
|
||||
self, connection, schema=None, **kw
|
||||
): # pylint: disable=unused-argument
|
||||
query = "SHOW VIEWS"
|
||||
if schema:
|
||||
query += " IN " + self.identifier_preparer.quote_identifier(schema)
|
||||
view_in_schema = connection.execute(query)
|
||||
views = []
|
||||
for row in view_in_schema:
|
||||
# check number of columns in result
|
||||
# if it is > 1, we use spark thrift server with 3 columns in the result (schema, table, is_temporary)
|
||||
# else it is hive with 1 column in the result
|
||||
if len(row) > 1:
|
||||
views.append(row[1])
|
||||
else:
|
||||
views.append(row[0])
|
||||
return views
|
||||
|
||||
|
||||
def get_view_names_older_versions(
|
||||
self, connection, schema=None, **kw
|
||||
): # pylint: disable=unused-argument
|
||||
# Hive does not provide functionality to query tableType for older version
|
||||
# This allows reflection to not crash at the cost of being inaccurate
|
||||
return []
|
||||
|
||||
|
||||
@reflection.cache
|
||||
def get_table_comment( # pylint: disable=unused-argument
|
||||
self, connection, table_name, schema_name, **kw
|
||||
):
|
||||
"""
|
||||
Returns comment of table.
|
||||
"""
|
||||
cursor = connection.execute(
|
||||
HIVE_GET_COMMENTS.format(schema_name=schema_name, table_name=table_name)
|
||||
)
|
||||
try:
|
||||
for result in list(cursor):
|
||||
data = result.values()
|
||||
if data[1] and data[1].strip() == "comment":
|
||||
return {"text": data[2] if data and data[2] else None}
|
||||
except Exception:
|
||||
return {"text": None}
|
||||
return {"text": None}
|
||||
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
@reflection.cache
|
||||
def get_view_definition(self, connection, view_name, schema=None, **kw):
|
||||
"""
|
||||
Gets the view definition
|
||||
"""
|
||||
full_view_name = f"`{view_name}`" if not schema else f"`{schema}`.`{view_name}`"
|
||||
res = connection.execute(f"SHOW CREATE TABLE {full_view_name}").fetchall()
|
||||
if res:
|
||||
return "\n".join(i[0] for i in res)
|
||||
return None
|
||||
|
||||
logger = ingestion_logger()
|
||||
|
||||
HiveDialect.get_columns = get_columns
|
||||
HiveDialect.get_table_comment = get_table_comment
|
||||
@ -206,6 +55,8 @@ class HiveSource(CommonDbSourceService):
|
||||
Database metadata from Hive Source
|
||||
"""
|
||||
|
||||
service_connection: HiveConnection
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
|
||||
config = WorkflowSource.parse_obj(config_dict)
|
||||
@ -227,15 +78,21 @@ class HiveSource(CommonDbSourceService):
|
||||
Fetching views in hive server with query "SHOW VIEWS" was possible
|
||||
only after hive 2.2.0 version
|
||||
"""
|
||||
result = dict(self.engine.execute("SELECT VERSION()").fetchone())
|
||||
if not self.service_connection.metastoreConnection:
|
||||
result = dict(self.engine.execute("SELECT VERSION()").fetchone())
|
||||
|
||||
version = result.get("_c0", "").split()
|
||||
if version and self._parse_version(version[0]) >= self._parse_version(
|
||||
HIVE_VERSION_WITH_VIEW_SUPPORT
|
||||
):
|
||||
HiveDialect.get_table_names = get_table_names
|
||||
HiveDialect.get_view_names = get_view_names
|
||||
HiveDialect.get_view_definition = get_view_definition
|
||||
version = result.get("_c0", "").split()
|
||||
if version and self._parse_version(version[0]) >= self._parse_version(
|
||||
HIVE_VERSION_WITH_VIEW_SUPPORT
|
||||
):
|
||||
HiveDialect.get_table_names = get_table_names
|
||||
HiveDialect.get_view_names = get_view_names
|
||||
HiveDialect.get_view_definition = get_view_definition
|
||||
else:
|
||||
HiveDialect.get_table_names = get_table_names_older_versions
|
||||
HiveDialect.get_view_names = get_view_names_older_versions
|
||||
else:
|
||||
HiveDialect.get_table_names = get_table_names_older_versions
|
||||
HiveDialect.get_view_names = get_view_names_older_versions
|
||||
self.engine = get_metastore_connection(
|
||||
self.service_connection.metastoreConnection
|
||||
)
|
||||
self.inspector = inspect(self.engine)
|
||||
|
||||
@ -0,0 +1,50 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Hive Metastore Dialect Mixin
|
||||
"""
|
||||
from sqlalchemy.engine import reflection
|
||||
|
||||
from metadata.ingestion.source.database.hive.utils import get_columns
|
||||
from metadata.utils.sqlalchemy_utils import (
|
||||
get_all_table_comments,
|
||||
get_all_view_definitions,
|
||||
)
|
||||
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
class HiveMetaStoreDialectMixin:
|
||||
"""
|
||||
Mixin class
|
||||
"""
|
||||
|
||||
def get_columns(self, connection, table_name, schema=None, **kw):
|
||||
return get_columns(self, connection, table_name, schema, **kw)
|
||||
|
||||
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
|
||||
# Hive has no support for foreign keys.
|
||||
return []
|
||||
|
||||
def get_unique_constraints(self, connection, table_name, schema=None, **kw):
|
||||
# Hive has no support for unique keys.
|
||||
return []
|
||||
|
||||
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
|
||||
# Hive has no support for primary keys.
|
||||
return []
|
||||
|
||||
@reflection.cache
|
||||
def get_all_view_definitions(self, connection, query):
|
||||
get_all_view_definitions(self, connection, query)
|
||||
|
||||
@reflection.cache
|
||||
def get_all_table_comments(self, connection, query):
|
||||
get_all_table_comments(self, connection, query)
|
||||
@ -0,0 +1,23 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Hive Metastore Mysql Dialect
|
||||
"""
|
||||
from sqlalchemy.dialects import registry
|
||||
|
||||
from .dialect import HiveMysqlMetaStoreDialect
|
||||
|
||||
__version__ = "0.1.0"
|
||||
registry.register(
|
||||
"hive.mysql",
|
||||
"metadata.ingestion.source.database.hive.metastore_dialects.mysql.dialect",
|
||||
"HiveMysqlMetaStoreDialect",
|
||||
)
|
||||
@ -0,0 +1,119 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Hive Metastore Mysql Dialect
|
||||
"""
|
||||
from sqlalchemy.dialects.mysql.pymysql import MySQLDialect_pymysql
|
||||
from sqlalchemy.engine import reflection
|
||||
|
||||
from metadata.ingestion.source.database.hive.metastore_dialects.mixin import (
|
||||
HiveMetaStoreDialectMixin,
|
||||
)
|
||||
from metadata.utils.sqlalchemy_utils import (
|
||||
get_table_comment_wrapper,
|
||||
get_view_definition_wrapper,
|
||||
)
|
||||
|
||||
|
||||
# pylint: disable=abstract-method
|
||||
class HiveMysqlMetaStoreDialect(HiveMetaStoreDialectMixin, MySQLDialect_pymysql):
|
||||
"""
|
||||
Mysql metastore dialect class
|
||||
"""
|
||||
|
||||
name = "hive"
|
||||
driver = "mysql"
|
||||
supports_statement_cache = False
|
||||
|
||||
def get_schema_names(self, connection, **kw):
|
||||
# Equivalent to SHOW DATABASES
|
||||
return [row[0] for row in connection.execute("select NAME from DBS;")]
|
||||
|
||||
def get_view_names(self, connection, schema=None, **kw):
|
||||
# Hive does not provide functionality to query tableType
|
||||
# This allows reflection to not crash at the cost of being inaccurate
|
||||
query = self._get_table_names_base_query(schema=schema)
|
||||
query += """ WHERE TBL_TYPE = 'VIRTUAL_VIEW'"""
|
||||
return [row[0] for row in connection.execute(query)]
|
||||
|
||||
def _get_table_columns(self, connection, table_name, schema):
|
||||
query = f"""
|
||||
SELECT
|
||||
col.COLUMN_NAME,
|
||||
col.TYPE_NAME,
|
||||
col.COMMENT
|
||||
from
|
||||
COLUMNS_V2 col
|
||||
join CDS cds ON col.CD_ID = cds.CD_ID
|
||||
join SDS sds ON sds.CD_ID = cds.CD_ID
|
||||
join TBLS tbsl on sds.SD_ID = tbsl.SD_ID
|
||||
and tbsl.TBL_NAME = '{table_name}'
|
||||
"""
|
||||
if schema:
|
||||
query += f""" join DBS db on tbsl.DB_ID = db.DB_ID
|
||||
and db.NAME = '{schema}'"""
|
||||
|
||||
return connection.execute(query).fetchall()
|
||||
|
||||
def _get_table_names_base_query(self, schema=None):
|
||||
query = "SELECT TBL_NAME from TBLS tbl"
|
||||
if schema:
|
||||
query += f""" JOIN DBS db on tbl.DB_ID = db.DB_ID
|
||||
and db.NAME = '{schema}'"""
|
||||
return query
|
||||
|
||||
def get_table_names(self, connection, schema=None, **kw):
|
||||
query = self._get_table_names_base_query(schema=schema)
|
||||
query += """ WHERE TBL_TYPE != 'VIRTUAL_VIEW'"""
|
||||
return [row[0] for row in connection.execute(query)]
|
||||
|
||||
@reflection.cache
|
||||
def get_view_definition(self, connection, view_name, schema=None, **kw):
|
||||
query = """
|
||||
SELECT
|
||||
dbs.NAME `schema`,
|
||||
tbls.TBL_NAME view_name,
|
||||
tbls.VIEW_ORIGINAL_TEXT view_def
|
||||
from
|
||||
TBLS tbls
|
||||
JOIN DBS dbs on tbls.DB_ID = dbs.DB_ID
|
||||
where
|
||||
tbls.VIEW_ORIGINAL_TEXT is not null;
|
||||
"""
|
||||
return get_view_definition_wrapper(
|
||||
self,
|
||||
connection,
|
||||
table_name=view_name,
|
||||
schema=schema,
|
||||
query=query,
|
||||
)
|
||||
|
||||
@reflection.cache
|
||||
def get_table_comment(self, connection, table_name, schema=None, **kw):
|
||||
query = """
|
||||
SELECT
|
||||
DBS.NAME AS `schema`,
|
||||
TBLS.TBL_NAME AS table_name,
|
||||
TABLE_PARAMS.PARAM_VALUE AS table_comment
|
||||
FROM
|
||||
DBS
|
||||
JOIN
|
||||
TBLS ON DBS.DB_ID = TBLS.DB_ID
|
||||
LEFT JOIN TABLE_PARAMS ON TBLS.TBL_ID = TABLE_PARAMS.TBL_ID
|
||||
and TABLE_PARAMS.PARAM_KEY = 'comment'
|
||||
"""
|
||||
return get_table_comment_wrapper(
|
||||
self,
|
||||
connection,
|
||||
table_name=table_name,
|
||||
schema=schema,
|
||||
query=query,
|
||||
)
|
||||
@ -0,0 +1,23 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Hive Metastore Postgres Dialect
|
||||
"""
|
||||
from sqlalchemy.dialects import registry
|
||||
|
||||
from .dialect import HivePostgresMetaStoreDialect
|
||||
|
||||
__version__ = "0.1.0"
|
||||
registry.register(
|
||||
"hive.postgres",
|
||||
"metadata.ingestion.source.database.hive.metastore_dialects.postgres.dialect",
|
||||
"HivePostgresMetaStoreDialect",
|
||||
)
|
||||
@ -0,0 +1,124 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Hive Metastore Postgres Dialect Mixin
|
||||
"""
|
||||
from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect_psycopg2
|
||||
from sqlalchemy.engine import reflection
|
||||
|
||||
from metadata.ingestion.source.database.hive.metastore_dialects.mixin import (
|
||||
HiveMetaStoreDialectMixin,
|
||||
)
|
||||
from metadata.utils.sqlalchemy_utils import (
|
||||
get_table_comment_wrapper,
|
||||
get_view_definition_wrapper,
|
||||
)
|
||||
|
||||
|
||||
# pylint: disable=abstract-method
|
||||
class HivePostgresMetaStoreDialect(HiveMetaStoreDialectMixin, PGDialect_psycopg2):
|
||||
"""
|
||||
Postgres metastore dialect class
|
||||
"""
|
||||
|
||||
name = "hive"
|
||||
driver = "postgres"
|
||||
supports_statement_cache = False
|
||||
|
||||
def get_schema_names(self, connection, **kw):
|
||||
# Equivalent to SHOW DATABASES
|
||||
return [row[0] for row in connection.execute('select "NAME" from "DBS";')]
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def get_view_names(self, connection, schema=None, **kw):
|
||||
# Hive does not provide functionality to query tableType
|
||||
# This allows reflection to not crash at the cost of being inaccurate
|
||||
query = self._get_table_names_base_query(schema=schema)
|
||||
query += """ WHERE "TBL_TYPE" = 'VIRTUAL_VIEW'"""
|
||||
return [row[0] for row in connection.execute(query)]
|
||||
|
||||
def _get_table_columns(self, connection, table_name, schema):
|
||||
query = f"""
|
||||
SELECT
|
||||
col."COLUMN_NAME",
|
||||
col."TYPE_NAME",
|
||||
col."COMMENT"
|
||||
from
|
||||
"COLUMNS_V2" col
|
||||
join "CDS" cds ON col."CD_ID" = cds."CD_ID"
|
||||
join "SDS" sds ON sds."CD_ID" = cds."CD_ID"
|
||||
join "TBLS" tbsl on sds."SD_ID" = tbsl."SD_ID"
|
||||
and tbsl."TBL_NAME" = '{table_name}'
|
||||
"""
|
||||
if schema:
|
||||
query += f""" join "DBS" db on tbsl."DB_ID" = db."DB_ID"
|
||||
and db."NAME" = '{schema}'"""
|
||||
|
||||
return connection.execute(query).fetchall()
|
||||
|
||||
def _get_table_names_base_query(self, schema=None):
|
||||
query = 'SELECT "TBL_NAME" from "TBLS" tbl'
|
||||
if schema:
|
||||
query += f""" JOIN "DBS" db on tbl."DB_ID" = db."DB_ID"
|
||||
and db."NAME" = '{schema}'"""
|
||||
return query
|
||||
|
||||
def get_table_names(self, connection, schema=None, **kw):
|
||||
query = self._get_table_names_base_query(schema=schema)
|
||||
query += """ WHERE "TBL_TYPE" != 'VIRTUAL_VIEW'"""
|
||||
return [row[0] for row in connection.execute(query)]
|
||||
|
||||
@reflection.cache
|
||||
def get_view_definition(self, connection, view_name, schema=None, **kw):
|
||||
query = """
|
||||
SELECT
|
||||
dbs."NAME" "schema",
|
||||
tbls."TBL_NAME" view_name,
|
||||
tbls."VIEW_ORIGINAL_TEXT" view_def
|
||||
from
|
||||
"TBLS" tbls
|
||||
JOIN "DBS" dbs on tbls."DB_ID" = dbs."DB_ID"
|
||||
where
|
||||
tbls."VIEW_ORIGINAL_TEXT" is not null;
|
||||
"""
|
||||
return get_view_definition_wrapper(
|
||||
self,
|
||||
connection,
|
||||
table_name=view_name,
|
||||
schema=schema,
|
||||
query=query,
|
||||
)
|
||||
|
||||
@reflection.cache
|
||||
def get_table_comment(self, connection, table_name, schema=None, **kw):
|
||||
query = """
|
||||
SELECT
|
||||
"DBS"."NAME" AS "schema",
|
||||
"TBLS"."TBL_NAME" AS table_name,
|
||||
"TABLE_PARAMS"."PARAM_VALUE" AS table_comment
|
||||
FROM
|
||||
"DBS"
|
||||
JOIN
|
||||
"TBLS" ON "DBS"."DB_ID" = "TBLS"."DB_ID"
|
||||
LEFT JOIN "TABLE_PARAMS" ON "TBLS"."TBL_ID" = "TABLE_PARAMS"."TBL_ID"
|
||||
and "TABLE_PARAMS"."PARAM_KEY" = 'comment'
|
||||
"""
|
||||
return get_table_comment_wrapper(
|
||||
self,
|
||||
connection,
|
||||
table_name=table_name,
|
||||
schema=schema,
|
||||
query=query,
|
||||
)
|
||||
|
||||
# pylint: disable=arguments-renamed
|
||||
def get_dialect_cls(self):
|
||||
return HivePostgresMetaStoreDialect
|
||||
180
ingestion/src/metadata/ingestion/source/database/hive/utils.py
Normal file
180
ingestion/src/metadata/ingestion/source/database/hive/utils.py
Normal file
@ -0,0 +1,180 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Hive source methods.
|
||||
"""
|
||||
import re
|
||||
|
||||
from pyhive.sqlalchemy_hive import _type_map
|
||||
from sqlalchemy import types, util
|
||||
from sqlalchemy.engine import reflection
|
||||
|
||||
from metadata.ingestion.source.database.hive.queries import HIVE_GET_COMMENTS
|
||||
|
||||
complex_data_types = ["struct", "map", "array", "union"]
|
||||
|
||||
_type_map.update(
|
||||
{
|
||||
"binary": types.BINARY,
|
||||
"char": types.CHAR,
|
||||
"varchar": types.VARCHAR,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def get_columns(
|
||||
self, connection, table_name, schema=None, **kw
|
||||
): # pylint: disable=unused-argument,too-many-locals
|
||||
"""
|
||||
Method to handle table columns
|
||||
"""
|
||||
rows = self._get_table_columns( # pylint: disable=protected-access
|
||||
connection, table_name, schema
|
||||
)
|
||||
rows = [[col.strip() if col else None for col in row] for row in rows]
|
||||
rows = [row for row in rows if row[0] and row[0] != "# col_name"]
|
||||
result = []
|
||||
for col_name, col_type, comment in rows:
|
||||
if col_name == "# Partition Information":
|
||||
break
|
||||
|
||||
col_raw_type = col_type
|
||||
attype = re.sub(r"\(.*\)", "", col_type)
|
||||
col_type = re.search(r"^\w+", col_type).group(0)
|
||||
try:
|
||||
coltype = _type_map[col_type]
|
||||
|
||||
except KeyError:
|
||||
util.warn(f"Did not recognize type '{col_type}' of column '{col_name}'")
|
||||
coltype = types.NullType
|
||||
charlen = re.search(r"\(([\d,]+)\)", col_raw_type.lower())
|
||||
if charlen:
|
||||
charlen = charlen.group(1)
|
||||
if attype == "decimal":
|
||||
prec, scale = charlen.split(",")
|
||||
args = (int(prec), int(scale))
|
||||
else:
|
||||
args = (int(charlen),)
|
||||
coltype = coltype(*args)
|
||||
|
||||
result.append(
|
||||
{
|
||||
"name": col_name,
|
||||
"type": coltype,
|
||||
"comment": comment,
|
||||
"nullable": True,
|
||||
"default": None,
|
||||
"system_data_type": col_raw_type,
|
||||
"is_complex": col_type in complex_data_types,
|
||||
}
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def get_table_names_older_versions(
|
||||
self, connection, schema=None, **kw
|
||||
): # pylint: disable=unused-argument
|
||||
query = "SHOW TABLES"
|
||||
if schema:
|
||||
query += " IN " + self.identifier_preparer.quote_identifier(schema)
|
||||
tables_in_schema = connection.execute(query)
|
||||
tables = []
|
||||
for row in tables_in_schema:
|
||||
# check number of columns in result
|
||||
# if it is > 1, we use spark thrift server with 3 columns in the result (schema, table, is_temporary)
|
||||
# else it is hive with 1 column in the result
|
||||
if len(row) > 1:
|
||||
tables.append(row[1])
|
||||
else:
|
||||
tables.append(row[0])
|
||||
return tables
|
||||
|
||||
|
||||
def get_table_names(
|
||||
self, connection, schema=None, **kw
|
||||
): # pylint: disable=unused-argument
|
||||
query = "SHOW TABLES"
|
||||
if schema:
|
||||
query += " IN " + self.identifier_preparer.quote_identifier(schema)
|
||||
tables_in_schema = connection.execute(query)
|
||||
tables = []
|
||||
for row in tables_in_schema:
|
||||
# check number of columns in result
|
||||
# if it is > 1, we use spark thrift server with 3 columns in the result (schema, table, is_temporary)
|
||||
# else it is hive with 1 column in the result
|
||||
if len(row) > 1:
|
||||
tables.append(row[1])
|
||||
else:
|
||||
tables.append(row[0])
|
||||
# "SHOW TABLES" command in hive also fetches view names
|
||||
# Below code filters out view names from table names
|
||||
views = self.get_view_names(connection, schema)
|
||||
return [table for table in tables if table not in views]
|
||||
|
||||
|
||||
def get_view_names(
|
||||
self, connection, schema=None, **kw
|
||||
): # pylint: disable=unused-argument
|
||||
query = "SHOW VIEWS"
|
||||
if schema:
|
||||
query += " IN " + self.identifier_preparer.quote_identifier(schema)
|
||||
view_in_schema = connection.execute(query)
|
||||
views = []
|
||||
for row in view_in_schema:
|
||||
# check number of columns in result
|
||||
# if it is > 1, we use spark thrift server with 3 columns in the result (schema, table, is_temporary)
|
||||
# else it is hive with 1 column in the result
|
||||
if len(row) > 1:
|
||||
views.append(row[1])
|
||||
else:
|
||||
views.append(row[0])
|
||||
return views
|
||||
|
||||
|
||||
def get_view_names_older_versions(
|
||||
self, connection, schema=None, **kw
|
||||
): # pylint: disable=unused-argument
|
||||
# Hive does not provide functionality to query tableType for older version
|
||||
# This allows reflection to not crash at the cost of being inaccurate
|
||||
return []
|
||||
|
||||
|
||||
@reflection.cache
def get_table_comment(  # pylint: disable=unused-argument
    self, connection, table_name, schema_name, **kw
):
    """
    Return the table comment as ``{"text": <comment or None>}``.
    """
    result_rows = connection.execute(
        HIVE_GET_COMMENTS.format(schema_name=schema_name, table_name=table_name)
    )
    try:
        # Scan the DESCRIBE-style output for the row whose second column
        # is the literal "comment" marker and read the value beside it.
        for row in list(result_rows):
            values = row.values()
            if values[1] and values[1].strip() == "comment":
                return {"text": values[2] if values and values[2] else None}
    except Exception:
        # Best-effort: any driver/parsing error is treated as "no comment".
        return {"text": None}
    return {"text": None}
|
||||
|
||||
|
||||
# pylint: disable=unused-argument
@reflection.cache
def get_view_definition(self, connection, view_name, schema=None, **kw):
    """
    Return the ``SHOW CREATE TABLE`` text for the view, or None if empty.
    """
    qualified_name = f"`{schema}`.`{view_name}`" if schema else f"`{view_name}`"
    rows = connection.execute(f"SHOW CREATE TABLE {qualified_name}").fetchall()
    if not rows:
        return None
    # The DDL comes back one line per row; stitch it back together.
    return "\n".join(row[0] for row in rows)
|
||||
@ -70,8 +70,35 @@ Executing the profiler workflow or data quality tests, will require the user to
|
||||
|
||||
- **Username**: Specify the User to connect to Hive. It should have enough privileges to read all the metadata.
|
||||
- **Password**: Password to connect to Hive.
|
||||
- **Host and Port**: Enter the fully qualified hostname and port number for your Hive deployment in the Host and Port field.
|
||||
- **Auth Options (Optional)**: Enter the auth options string for hive connection.
|
||||
- **Host and Port**: This parameter specifies the host and port of the Hive server instance. This should be specified as a string in the format `hostname:port`. For example, you might set the hostPort parameter to `myhivehost:10000`.
|
||||
- **Auth Options (Optional)**: The auth parameter specifies the authentication method to use when connecting to the Hive server. Possible values are `LDAP`, `NONE`, `CUSTOM`, or `KERBEROS`. If you are using Kerberos authentication, you should set auth to `KERBEROS`. If you are using custom authentication, you should set auth to `CUSTOM` and provide additional options in the `authOptions` parameter.
|
||||
- **Kerberos Service Name**: This parameter specifies the Kerberos service name to use for authentication. This should only be specified if using Kerberos authentication. The default value is `hive`.
|
||||
- **Database Schema**: Schema of the data source. This is an optional parameter; use it if you would like to restrict the metadata reading to a single schema. When left blank, OpenMetadata Ingestion attempts to scan all the schemas.
|
||||
- **Database Name**: Optional name to give to the database in OpenMetadata. If left blank, we will use default as the database name.
|
||||
|
||||
|
||||
#### For MySQL Metastore Connection
|
||||
|
||||
You can also ingest the metadata using a MySQL metastore. This step is optional: if metastore details are not provided, we will query the Hive server directly.
|
||||
|
||||
- **Username**: Specify the User to connect to MySQL Metastore. It should have enough privileges to read all the metadata.
|
||||
- **Password**: Password to connect to MySQL.
|
||||
- **Host and Port**: Enter the fully qualified hostname and port number for your MySQL Metastore deployment in the Host and Port field in the format `hostname:port`.
|
||||
- **databaseSchema**: Enter the database schema which is associated with the metastore.
|
||||
|
||||
{% partial file="/v1.1.1/connectors/database/advanced-configuration.md" /%}
|
||||
|
||||
#### For Postgres Metastore Connection
|
||||
|
||||
You can also ingest the metadata using a Postgres metastore. This step is optional: if metastore details are not provided, we will query the Hive server directly.
|
||||
|
||||
- **Username**: Specify the User to connect to Postgres Metastore. It should have enough privileges to read all the metadata.
|
||||
- **Password**: Password to connect to Postgres.
|
||||
- **Host and Port**: Enter the fully qualified hostname and port number for your Postgres deployment in the Host and Port field in the format `hostname:port`.
|
||||
- **Database**: Initial Postgres database to connect to. Specify the name of database associated with metastore instance.
|
||||
|
||||
{% partial file="/v1.1.1/connectors/database/advanced-configuration.md" /%}
|
||||
|
||||
|
||||
{% partial file="/v1.1.1/connectors/database/advanced-configuration.md" /%}
|
||||
|
||||
|
||||
@ -99,6 +99,33 @@ This is a sample config for Hive:
|
||||
|
||||
{% /codeInfo %}
|
||||
|
||||
|
||||
{% codeInfo srNumber=22 %}
|
||||
|
||||
#### For MySQL Metastore Connection:
|
||||
You can also ingest the metadata using a MySQL metastore. This step is optional: if metastore details are not provided, we will query the Hive server directly.
|
||||
|
||||
- **username**: Specify the User to connect to MySQL Metastore. It should have enough privileges to read all the metadata.
|
||||
- **password**: Password to connect to MySQL.
|
||||
- **hostPort**: Enter the fully qualified hostname and port number for your MySQL Metastore deployment in the Host and Port field in the format `hostname:port`.
|
||||
- **databaseSchema**: Enter the database schema which is associated with the metastore.
|
||||
|
||||
{% /codeInfo %}
|
||||
|
||||
{% codeInfo srNumber=3 %}
|
||||
|
||||
#### For Postgres Metastore Connection:
|
||||
|
||||
You can also ingest the metadata using a Postgres metastore. This step is optional: if metastore details are not provided, we will query the Hive server directly.
|
||||
|
||||
- **username**: Specify the User to connect to Postgres Metastore. It should have enough privileges to read all the metadata.
|
||||
- **password**: Password to connect to Postgres.
|
||||
- **hostPort**: Enter the fully qualified hostname and port number for your Postgres deployment in the Host and Port field in the format `hostname:port`.
|
||||
- **database**: Initial Postgres database to connect to. Specify the name of database associated with metastore instance.
|
||||
|
||||
{% /codeInfo %}
|
||||
|
||||
|
||||
#### Source Configuration - Source Config
|
||||
|
||||
{% codeInfo srNumber=7 %}
|
||||
@ -166,6 +193,26 @@ source:
|
||||
```yaml {% srNumber=4 %}
|
||||
hostPort: <hive connection host & port>
|
||||
```
|
||||
|
||||
```yaml {% srNumber=22 %}
|
||||
# For MySQL Metastore Connection
|
||||
# metastoreConnection:
|
||||
# type: Mysql
|
||||
# username: <username>
|
||||
# password: <password>
|
||||
# hostPort: <hostPort>
|
||||
# databaseSchema: metastore
|
||||
|
||||
```
|
||||
```yaml {% srNumber=23 %}
|
||||
# For Postgres Metastore Connection
|
||||
# metastoreConnection:
|
||||
# type: Postgres
|
||||
# username: <username>
|
||||
# password: <password>
|
||||
# hostPort: <hostPort>
|
||||
# database: metastore
|
||||
```
|
||||
```yaml {% srNumber=5 %}
|
||||
# connectionOptions:
|
||||
# key: value
|
||||
|
||||
@ -75,6 +75,22 @@
|
||||
"description": "Authentication options to pass to Hive connector. These options are based on SQLAlchemy.",
|
||||
"type": "string"
|
||||
},
|
||||
"metastoreConnection":{
|
||||
"title": "Hive Metastore Connection Details",
|
||||
"description": "Hive Metastore Connection Details",
|
||||
"oneOf": [
|
||||
{
|
||||
"title": "None",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"$ref": "./postgresConnection.json"
|
||||
},
|
||||
{
|
||||
"$ref": "./mysqlConnection.json"
|
||||
}
|
||||
]
|
||||
},
|
||||
"connectionOptions": {
|
||||
"title": "Connection Options",
|
||||
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
|
||||
|
||||
@ -9,66 +9,295 @@ Executing the profiler Workflow or data quality tests, will require the user to
|
||||
|
||||
You can find further information on the Hive connector in the [docs](https://docs.open-metadata.org/connectors/database/hive).
|
||||
|
||||
## Connection Details
|
||||
## Hive Server Connection Details
|
||||
|
||||
$$section
|
||||
### Scheme $(id="scheme")
|
||||
### Scheme
|
||||
SQLAlchemy driver scheme options. If you are unsure about this setting, you can use the default value. OpenMetadata supports both `Hive` and `Impala`.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Username $(id="username")
|
||||
### Username
|
||||
Username to connect to Hive. This user should have the necessary privileges described in the section above.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Password $(id="password")
|
||||
### Password
|
||||
Password to connect to Hive.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Host Port $(id="hostPort")
|
||||
### Host Port
|
||||
|
||||
This parameter specifies the host and port of the Hive instance. This should be specified as a string in the format `hostname:port`. For example, you might set the hostPort parameter to `myhivehost:10000`.
|
||||
|
||||
If your database service and Open Metadata are both running via docker locally, use `host.docker.internal:10000` as the value.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Auth $(id="auth")
|
||||
### Auth
|
||||
The auth parameter specifies the authentication method to use when connecting to the Hive server. Possible values are `LDAP`, `NONE`, `CUSTOM`, or `KERBEROS`. If you are using Kerberos authentication, you should set auth to `KERBEROS`. If you are using custom authentication, you should set auth to `CUSTOM` and provide additional options in the `authOptions` parameter.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Kerberos Service Name $(id="kerberosServiceName")
|
||||
### Kerberos Service Name
|
||||
This parameter specifies the Kerberos service name to use for authentication. This should only be specified if using Kerberos authentication. The default value is `hive`.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Database Schema $(id="databaseSchema")
|
||||
### Database Schema
|
||||
Schema of the data source. This is an optional parameter; use it if you would like to restrict the metadata reading to a single schema. When left blank, OpenMetadata Ingestion attempts to scan all the schemas.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Database Name $(id="databaseName")
|
||||
### Database Name
|
||||
In OpenMetadata, the Database Service hierarchy works as follows:
|
||||
```
|
||||
Database Service > Database > Schema > Table
|
||||
```
|
||||
In the case of Hive, we won't have a Database as such. If you'd like to see your data in a database named something other than `default`, you can specify the name in this field.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Auth Options $(id="authOptions")
|
||||
### Auth Options
|
||||
Authentication options to pass to Hive connector. These options are based on SQLAlchemy.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Connection Options $(id="connectionOptions")
|
||||
### Connection Options
|
||||
Additional connection options to build the URL that can be sent to service during the connection. The connectionOptions parameter is specific to the connection method being used. For example, if you are using SSL encryption, you might set the connectionOptions parameter to {'ssl': 'true', 'sslTrustStore': '/path/to/truststore'}.
|
||||
|
||||
### Connection Arguments
|
||||
Additional connection arguments such as security or protocol configs that can be sent to service during connection.
|
||||
|
||||
## Hive Postgres Metastore Connection Details
|
||||
|
||||
### Username
|
||||
|
||||
Username to connect to Postgres. This user should have privileges to read all the metadata in Postgres.
|
||||
|
||||
### Auth Config
|
||||
There are 2 types of auth configs:
|
||||
- Basic Auth.
|
||||
- IAM based Auth.
|
||||
|
||||
User can authenticate the Postgres Instance with auth type as `Basic Authentication` i.e. Password **or** by using `IAM based Authentication` to connect to AWS related services.
|
||||
|
||||
### Basic Auth
|
||||
|
||||
|
||||
|
||||
### Password
|
||||
|
||||
Password to connect to Postgres.
|
||||
|
||||
### IAM Auth Config
|
||||
|
||||
### AWS Access Key ID
|
||||
|
||||
When you interact with AWS, you specify your AWS security credentials to verify who you are and whether you have permission to access the resources that you are requesting. AWS uses the security credentials to authenticate and authorize your requests ([docs](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html)).
|
||||
|
||||
Access keys consist of two parts:
|
||||
1. An access key ID (for example, `AKIAIOSFODNN7EXAMPLE`),
|
||||
2. And a secret access key (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`).
|
||||
|
||||
You must use both the access key ID and secret access key together to authenticate your requests.
|
||||
|
||||
You can find further information on how to manage your access keys [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html)
|
||||
|
||||
### AWS Secret Access Key
|
||||
|
||||
Secret access key (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`).
|
||||
|
||||
### AWS Region
|
||||
|
||||
Each AWS Region is a separate geographic area in which AWS clusters data centers ([docs](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html)).
|
||||
|
||||
As AWS can have instances in multiple regions, we need to know the region the service you want reach belongs to.
|
||||
|
||||
Note that the AWS Region is the only required parameter when configuring a connection. When connecting to the services programmatically, there are different ways in which we can extract and use the rest of AWS configurations. You can find further information about configuring your credentials [here](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials).
|
||||
|
||||
### AWS Session Token
|
||||
|
||||
If you are using temporary credentials to access your services, you will need to inform the AWS Access Key ID and AWS Secrets Access Key. Also, these will include an AWS Session Token.
|
||||
|
||||
You can find more information on [Using temporary credentials with AWS resources](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html).
|
||||
|
||||
### Endpoint URL
|
||||
|
||||
To connect programmatically to an AWS service, you use an endpoint. An *endpoint* is the URL of the entry point for an AWS web service. The AWS SDKs and the AWS Command Line Interface (AWS CLI) automatically use the default endpoint for each service in an AWS Region. But you can specify an alternate endpoint for your API requests.
|
||||
|
||||
Find more information on [AWS service endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html).
|
||||
|
||||
### Profile Name
|
||||
|
||||
A named profile is a collection of settings and credentials that you can apply to an AWS CLI command. When you specify a profile to run a command, the settings and credentials are used to run that command. Multiple named profiles can be stored in the config and credentials files.
|
||||
|
||||
You can inform this field if you'd like to use a profile other than `default`.
|
||||
|
||||
Find here more information about [Named profiles for the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html).
|
||||
|
||||
### Assume Role ARN
|
||||
|
||||
Typically, you use `AssumeRole` within your account or for cross-account access. In this field you'll set the `ARN` (Amazon Resource Name) of the policy of the other account.
|
||||
|
||||
A user who wants to access a role in a different account must also have permissions that are delegated from the account administrator. The administrator must attach a policy that allows the user to call `AssumeRole` for the `ARN` of the role in the other account.
|
||||
|
||||
This is a required field if you'd like to `AssumeRole`.
|
||||
|
||||
Find more information on [AssumeRole](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html).
|
||||
|
||||
### Assume Role Session Name
|
||||
|
||||
An identifier for the assumed role session. Use the role session name to uniquely identify a session when the same role is assumed by different principals or for different reasons.
|
||||
|
||||
By default, we'll use the name `OpenMetadataSession`.
|
||||
|
||||
Find more information about the [Role Session Name](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html#:~:text=An%20identifier%20for%20the%20assumed%20role%20session.).
|
||||
|
||||
### Assume Role Source Identity
|
||||
|
||||
The source identity specified by the principal that is calling the `AssumeRole` operation. You can use source identity information in AWS CloudTrail logs to determine who took actions with a role.
|
||||
|
||||
Find more information about [Source Identity](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html#:~:text=Required%3A%20No-,SourceIdentity,-The%20source%20identity).
|
||||
|
||||
### Host and Port
|
||||
|
||||
This parameter specifies the host and port of the Postgres instance. This should be specified as a string in the format `hostname:port`. For example, you might set the hostPort parameter to `localhost:5432`.
|
||||
|
||||
If your database service and Open Metadata are both running via docker locally, use `host.docker.internal:5432` as the value.
|
||||
|
||||
### Database
|
||||
|
||||
Initial Postgres database to connect to. If you want to ingest all databases, set `ingestAllDatabases` to true.
|
||||
|
||||
### SSL Mode
|
||||
|
||||
SSL Mode to connect to postgres database. E.g, `prefer`, `verify-ca`, `allow` etc.
|
||||
|
||||
$$note
|
||||
if you are using `IAM auth`, select either `allow` (recommended) or other option based on your use case.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Connection Arguments $(id="connectionArguments")
|
||||
### Classification Name
|
||||
|
||||
By default, the Postgres policy tags in OpenMetadata are classified under the name `PostgresPolicyTags`. However, you can create a custom classification name of your choice for these tags. Once you have ingested Postgres data, the custom classification name will be visible in the Classifications list on the Tags page.
|
||||
|
||||
### Ingest All Databases
|
||||
|
||||
If ticked, the workflow will be able to ingest all databases in the cluster. If not ticked, the workflow will only ingest tables from the database set above.
|
||||
|
||||
### Connection Arguments
|
||||
|
||||
Additional connection arguments such as security or protocol configs that can be sent to service during connection.
|
||||
|
||||
### Connection Options
|
||||
|
||||
Additional connection options to build the URL that can be sent to service during the connection.
|
||||
|
||||
## Hive Mysql Metastore Connection Details
|
||||
|
||||
### Scheme $(id="scheme")
|
||||
SQLAlchemy driver scheme options. If you are unsure about this setting, you can use the default value.
|
||||
|
||||
### Username $(id="username")
|
||||
Username to connect to MySQL. This user should have access to the `INFORMATION_SCHEMA` to extract metadata. Other workflows may require different permissions -- refer to the section above for more information.
|
||||
|
||||
### Auth Config $(id="authType")
|
||||
There are 2 types of auth configs:
|
||||
- Basic Auth.
|
||||
- IAM based Auth.
|
||||
|
||||
User can authenticate the Mysql Instance with auth type as `Basic Authentication` i.e. Password **or** by using `IAM based Authentication` to connect to AWS related services.
|
||||
|
||||
|
||||
### Basic Auth
|
||||
|
||||
### Password $(id="password")
|
||||
Password to connect to MySQL.
|
||||
|
||||
### IAM Auth Config
|
||||
|
||||
$$note
|
||||
If you are using IAM auth, add <br />`"ssl": {"ssl-mode": "allow"}` under Connection Arguments
|
||||
$$
|
||||
|
||||
### AWS Access Key ID $(id="awsAccessKeyId")
|
||||
|
||||
When you interact with AWS, you specify your AWS security credentials to verify who you are and whether you have permission to access the resources that you are requesting. AWS uses the security credentials to authenticate and authorize your requests ([docs](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html)).
|
||||
|
||||
Access keys consist of two parts:
|
||||
1. An access key ID (for example, `AKIAIOSFODNN7EXAMPLE`),
|
||||
2. And a secret access key (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`).
|
||||
|
||||
You must use both the access key ID and secret access key together to authenticate your requests.
|
||||
|
||||
You can find further information on how to manage your access keys [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html)
|
||||
|
||||
### AWS Secret Access Key $(id="awsSecretAccessKey")
|
||||
|
||||
Secret access key (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`).
|
||||
|
||||
### AWS Region $(id="awsRegion")
|
||||
|
||||
Each AWS Region is a separate geographic area in which AWS clusters data centers ([docs](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html)).
|
||||
|
||||
As AWS can have instances in multiple regions, we need to know the region the service you want reach belongs to.
|
||||
|
||||
Note that the AWS Region is the only required parameter when configuring a connection. When connecting to the services programmatically, there are different ways in which we can extract and use the rest of AWS configurations. You can find further information about configuring your credentials [here](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials).
|
||||
|
||||
### AWS Session Token $(id="awsSessionToken")
|
||||
|
||||
If you are using temporary credentials to access your services, you will need to inform the AWS Access Key ID and AWS Secrets Access Key. Also, these will include an AWS Session Token.
|
||||
|
||||
You can find more information on [Using temporary credentials with AWS resources](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html).
|
||||
|
||||
### Endpoint URL $(id="endPointURL")
|
||||
|
||||
To connect programmatically to an AWS service, you use an endpoint. An *endpoint* is the URL of the entry point for an AWS web service. The AWS SDKs and the AWS Command Line Interface (AWS CLI) automatically use the default endpoint for each service in an AWS Region. But you can specify an alternate endpoint for your API requests.
|
||||
|
||||
Find more information on [AWS service endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html).
|
||||
|
||||
### Profile Name $(id="profileName")
|
||||
|
||||
A named profile is a collection of settings and credentials that you can apply to an AWS CLI command. When you specify a profile to run a command, the settings and credentials are used to run that command. Multiple named profiles can be stored in the config and credentials files.
|
||||
|
||||
You can inform this field if you'd like to use a profile other than `default`.
|
||||
|
||||
Find here more information about [Named profiles for the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html).
|
||||
|
||||
### Assume Role ARN $(id="assumeRoleArn")
|
||||
|
||||
Typically, you use `AssumeRole` within your account or for cross-account access. In this field you'll set the `ARN` (Amazon Resource Name) of the policy of the other account.
|
||||
|
||||
A user who wants to access a role in a different account must also have permissions that are delegated from the account administrator. The administrator must attach a policy that allows the user to call `AssumeRole` for the `ARN` of the role in the other account.
|
||||
|
||||
This is a required field if you'd like to `AssumeRole`.
|
||||
|
||||
Find more information on [AssumeRole](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html).
|
||||
|
||||
### Assume Role Session Name $(id="assumeRoleSessionName")
|
||||
|
||||
An identifier for the assumed role session. Use the role session name to uniquely identify a session when the same role is assumed by different principals or for different reasons.
|
||||
|
||||
By default, we'll use the name `OpenMetadataSession`.
|
||||
|
||||
Find more information about the [Role Session Name](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html#:~:text=An%20identifier%20for%20the%20assumed%20role%20session.).
|
||||
|
||||
### Assume Role Source Identity $(id="assumeRoleSourceIdentity")
|
||||
|
||||
The source identity specified by the principal that is calling the `AssumeRole` operation. You can use source identity information in AWS CloudTrail logs to determine who took actions with a role.
|
||||
|
||||
Find more information about [Source Identity](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html#:~:text=Required%3A%20No-,SourceIdentity,-The%20source%20identity).
|
||||
|
||||
### Host Port $(id="hostPort")
|
||||
|
||||
This parameter specifies the host and port of the MySQL instance. This should be specified as a string in the format `hostname:port`. For example, you might set the hostPort parameter to `localhost:3306`.
|
||||
|
||||
If your database service and Open Metadata are both running via docker locally, use `host.docker.internal:3306` as the value.
|
||||
|
||||
### Database Name $(id="databaseName")
|
||||
In OpenMetadata, the Database Service hierarchy works as follows:
|
||||
```
|
||||
Database Service > Database > Schema > Table
|
||||
```
|
||||
In the case of MySQL, we won't have a Database as such. If you'd like to see your data in a database named something other than `default`, you can specify the name in this field.
|
||||
|
||||
### Database Schema $(id="databaseSchema")
|
||||
This is an optional parameter. When set, the value will be used to restrict the metadata reading to a single database (corresponding to the value passed in this field). When left blank, OpenMetadata will scan all the databases.
|
||||
|
||||
### SSL CA $(id="sslCA")
|
||||
Provide the path to SSL CA file, which needs to be local in the ingestion process.
|
||||
|
||||
### SSL Certificate $(id="sslCert")
|
||||
Provide the path to SSL client certificate file (`ssl_cert`)
|
||||
|
||||
### SSL Key $(id="sslKey")
|
||||
Provide the path to SSL key file (`ssl_key`)
|
||||
|
||||
### Connection Options $(id="connectionOptions")
|
||||
Additional connection options to build the URL that can be sent to the service during the connection.
|
||||
|
||||
### Connection Arguments $(id="connectionArguments")
|
||||
Additional connection arguments such as security or protocol configs that can be sent to the service during connection.
|
||||
|
||||
@ -129,7 +129,7 @@ const SelectServiceType = ({
|
||||
}
|
||||
)}
|
||||
data-testid={type}
|
||||
flex="100px"
|
||||
flex="114px"
|
||||
key={type}
|
||||
onClick={() => handleServiceTypeClick(type)}>
|
||||
<Space
|
||||
|
||||
@ -107,8 +107,8 @@ export const ObjectFieldTemplate: FunctionComponent<ObjectFieldTemplateProps> =
|
||||
<>
|
||||
<Collapse
|
||||
className="advanced-properties-collapse"
|
||||
expandIconPosition="right">
|
||||
<Panel header={t('label.advanced')} key="1">
|
||||
expandIconPosition="end">
|
||||
<Panel header={`${title} ${t('label.advanced-config')}`} key="1">
|
||||
{advancedProperties.map((element, index) => (
|
||||
<div
|
||||
className={classNames('property-wrapper', {
|
||||
|
||||
@ -280,6 +280,9 @@ export const COMMON_UI_SCHEMA = {
|
||||
connection: {
|
||||
...DEF_UI_SCHEMA,
|
||||
},
|
||||
metastoreConnection: {
|
||||
...DEF_UI_SCHEMA,
|
||||
},
|
||||
};
|
||||
|
||||
export const OPEN_METADATA = 'OpenMetadata';
|
||||
|
||||
@ -36,6 +36,7 @@
|
||||
"admin-plural": "Admins",
|
||||
"admin-profile": "Admin profile",
|
||||
"advanced": "Advanced",
|
||||
"advanced-config": "Advanced Config",
|
||||
"advanced-configuration": "Advanced Configuration",
|
||||
"advanced-entity": "Advanced {{entity}}",
|
||||
"advanced-search": "Advanced Search",
|
||||
|
||||
@ -36,6 +36,7 @@
|
||||
"admin-plural": "Administradores",
|
||||
"admin-profile": "Perfil de administrador",
|
||||
"advanced": "Advanced",
|
||||
"advanced-config": "Advanced Config",
|
||||
"advanced-configuration": "Advanced Configuration",
|
||||
"advanced-entity": "{{entity}} avanzado",
|
||||
"advanced-search": "Advanced Search",
|
||||
|
||||
@ -36,6 +36,7 @@
|
||||
"admin-plural": "Admins",
|
||||
"admin-profile": "Profile admin",
|
||||
"advanced": "Advanced",
|
||||
"advanced-config": "Advanced Config",
|
||||
"advanced-configuration": "Advanced Configuration",
|
||||
"advanced-entity": "{{entity}} Avancée",
|
||||
"advanced-search": "Recherche Avancée",
|
||||
|
||||
@ -36,6 +36,7 @@
|
||||
"admin-plural": "管理者",
|
||||
"admin-profile": "管理者のプロファイル",
|
||||
"advanced": "Advanced",
|
||||
"advanced-config": "Advanced Config",
|
||||
"advanced-configuration": "Advanced Configuration",
|
||||
"advanced-entity": "高度な{{entity}}",
|
||||
"advanced-search": "Advanced Search",
|
||||
|
||||
@ -36,6 +36,7 @@
|
||||
"admin-plural": "Administradores",
|
||||
"admin-profile": "Perfil de administrador",
|
||||
"advanced": "Advanced",
|
||||
"advanced-config": "Advanced Config",
|
||||
"advanced-configuration": "Advanced Configuration",
|
||||
"advanced-entity": "Entidade avançada",
|
||||
"advanced-search": "Advanced Search",
|
||||
|
||||
@ -36,6 +36,7 @@
|
||||
"admin-plural": "管理员",
|
||||
"admin-profile": "管理员资料",
|
||||
"advanced": "Advanced",
|
||||
"advanced-config": "Advanced Config",
|
||||
"advanced-configuration": "Advanced Configuration",
|
||||
"advanced-entity": "高级{{entity}}",
|
||||
"advanced-search": "高级搜索",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user