mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-13 17:32:53 +00:00
databricks foreign table issue: skip table metadata for foreign tables (#17343)
* skip get_columns for foreign catalog tables
* get table type before executing column metadata
* remove duplicate query, pycheckstyle fix
* skip foreign tables instead of proceeding to column metadata
* add debug log
This commit is contained in:
parent
cf6f2e6c2f
commit
264be13b66
@ -18,7 +18,7 @@ from typing import Iterable, Optional, Tuple, Union
|
|||||||
from pydantic import EmailStr
|
from pydantic import EmailStr
|
||||||
from pydantic_core import PydanticCustomError
|
from pydantic_core import PydanticCustomError
|
||||||
from pyhive.sqlalchemy_hive import _type_map
|
from pyhive.sqlalchemy_hive import _type_map
|
||||||
from sqlalchemy import types, util
|
from sqlalchemy import exc, types, util
|
||||||
from sqlalchemy.engine import reflection
|
from sqlalchemy.engine import reflection
|
||||||
from sqlalchemy.engine.reflection import Inspector
|
from sqlalchemy.engine.reflection import Inspector
|
||||||
from sqlalchemy.exc import DatabaseError
|
from sqlalchemy.exc import DatabaseError
|
||||||
@ -286,15 +286,42 @@ def get_table_names(
|
|||||||
# if it is > 1, we use spark thrift server with 3 columns in the result (schema, table, is_temporary)
|
# if it is > 1, we use spark thrift server with 3 columns in the result (schema, table, is_temporary)
|
||||||
# else it is hive with 1 column in the result
|
# else it is hive with 1 column in the result
|
||||||
if len(row) > 1:
|
if len(row) > 1:
|
||||||
tables.append(row[1])
|
table_name = row[1]
|
||||||
else:
|
else:
|
||||||
tables.append(row[0])
|
table_name = row[0]
|
||||||
|
if schema:
|
||||||
|
table_type = get_table_type(connection, schema, table_name)
|
||||||
|
if not table_type or table_type == "FOREIGN":
|
||||||
|
# skip the table if it's foreign table / error in fetching table_type
|
||||||
|
logger.debug(
|
||||||
|
f"Skipping metadata ingestion for unsupported foreign table {table_name}"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
tables.append(table_name)
|
||||||
|
|
||||||
# "SHOW TABLES" command in hive also fetches view names
|
# "SHOW TABLES" command in hive also fetches view names
|
||||||
# Below code filters out view names from table names
|
# Below code filters out view names from table names
|
||||||
views = self.get_view_names(connection, schema)
|
views = self.get_view_names(connection, schema)
|
||||||
return [table for table in tables if table not in views]
|
return [table for table in tables if table not in views]
|
||||||
|
|
||||||
|
|
||||||
|
def get_table_type(connection, schema, table) -> Optional[str]:
    """Return the table type reported for ``schema.table``.

    Formats ``DATABRICKS_GET_TABLE_COMMENTS`` with the given schema and table
    name, executes it, and scans the result for the row whose ``col_name`` is
    ``"Type"``.

    Args:
        connection: object exposing ``execute(query)`` that yields rows
            convertible with ``dict(row)``.
        schema: schema name substituted into the query.
        table: table name substituted into the query.

    Returns:
        The ``data_type`` value of the ``Type`` row, or ``None`` when no such
        row is present or the lookup fails for any reason.
    """
    try:
        # NOTE(review): schema/table are interpolated into the statement text
        # unescaped; presumably they come from the catalog's own listing
        # rather than from user input - confirm before reusing elsewhere.
        query = DATABRICKS_GET_TABLE_COMMENTS.format(
            schema_name=schema, table_name=table
        )
        for row in connection.execute(query):
            row_dict = dict(row)
            if row_dict.get("col_name") == "Type":
                # get type of table
                return row_dict.get("data_type")
    except Exception as exc:
        # Best-effort lookup: a failure must not abort the caller, but it is
        # logged so that a skipped table can be traced back to its cause.
        logger.debug(
            f"Failed to fetch table type for [{schema}.{table}]: {exc}"
        )
    return None
|
||||||
|
|
||||||
|
|
||||||
DatabricksDialect.get_table_comment = get_table_comment
|
DatabricksDialect.get_table_comment = get_table_comment
|
||||||
DatabricksDialect.get_view_names = get_view_names
|
DatabricksDialect.get_view_names = get_view_names
|
||||||
DatabricksDialect.get_columns = get_columns
|
DatabricksDialect.get_columns = get_columns
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user