Fix #10999: Add support for materialized views in oracle (#11005)

This commit is contained in:
Mayur Singal 2023-04-12 16:21:27 +05:30 committed by GitHub
parent 288358952b
commit ae6683862f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 382 additions and 193 deletions

View File

@ -11,21 +11,13 @@
# pylint: disable=protected-access
"""Oracle source module"""
import re
import traceback
from typing import Iterable, Optional
from sqlalchemy import sql, util
from sqlalchemy.dialects.oracle.base import (
FLOAT,
INTEGER,
INTERVAL,
NUMBER,
TIMESTAMP,
OracleDialect,
ischema_names,
)
from sqlalchemy.engine import reflection
from sqlalchemy.sql import sqltypes
from sqlalchemy.dialects.oracle.base import INTERVAL, OracleDialect, ischema_names
from sqlalchemy.engine import Inspector
from metadata.generated.schema.entity.data.table import TableType
from metadata.generated.schema.entity.services.connections.database.oracleConnection import (
OracleConnection,
)
@ -37,20 +29,30 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.oracle.queries import (
ORACLE_ALL_TABLE_COMMENTS,
ORACLE_ALL_VIEW_DEFINITIONS,
ORACLE_GET_COLUMNS,
ORACLE_IDENTITY_TYPE,
from metadata.ingestion.source.database.common_db_source import (
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.oracle.utils import (
_get_col_type,
get_columns,
get_mview_definition,
get_mview_definition_dialect,
get_mview_names,
get_mview_names_dialect,
get_table_comment,
get_table_names,
get_view_definition,
)
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_table_comments,
get_all_view_definitions,
get_table_comment_wrapper,
get_view_definition_wrapper,
)
logger = ingestion_logger()
ischema_names.update(
{
"ROWID": create_sqlalchemy_type("ROWID"),
@ -59,178 +61,17 @@ ischema_names.update(
}
)
@reflection.cache
def get_table_comment(
    self,
    connection,
    table_name: str,
    schema: str = None,
    resolve_synonyms=False,
    dblink="",
    **kw,
):  # pylint: disable=unused-argument
    """
    Dialect method overridden to read the table comment through the
    ORACLE_ALL_TABLE_COMMENTS query

    The table and schema names are lower-cased before they are handed to
    get_table_comment_wrapper; a falsy schema is forwarded as None.
    resolve_synonyms, dblink and kw are accepted but not used here.
    """
    lowered_table = table_name.lower()
    lowered_schema = None
    if schema:
        lowered_schema = schema.lower()
    return get_table_comment_wrapper(
        self,
        connection,
        table_name=lowered_table,
        schema=lowered_schema,
        query=ORACLE_ALL_TABLE_COMMENTS,
    )
@reflection.cache
def get_view_definition(
    self,
    connection,
    view_name: str,
    schema: str = None,
    resolve_synonyms=False,
    dblink="",
    **kw,
):  # pylint: disable=unused-argument
    """
    Dialect method overridden to read the view definition through the
    ORACLE_ALL_VIEW_DEFINITIONS query

    The view and schema names are lower-cased before they are handed to
    get_view_definition_wrapper; a falsy schema is forwarded as None.
    resolve_synonyms, dblink and kw are accepted but not used here.
    """
    lowered_view = view_name.lower()
    lowered_schema = None
    if schema:
        lowered_schema = schema.lower()
    return get_view_definition_wrapper(
        self,
        connection,
        table_name=lowered_view,
        schema=lowered_schema,
        query=ORACLE_ALL_VIEW_DEFINITIONS,
    )
def _get_col_type(
self, coltype, precision, scale, length, colname
): # pylint: disable=too-many-branches
raw_type = coltype
if coltype == "NUMBER":
if precision is None and scale == 0:
coltype = INTEGER()
else:
coltype = NUMBER(precision, scale)
if precision is not None:
if scale is not None:
raw_type += f"({precision},{scale})"
else:
raw_type += f"({precision})"
elif coltype == "FLOAT":
# TODO: support "precision" here as "binary_precision"
coltype = FLOAT()
elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"):
coltype = self.ischema_names.get(coltype)(length)
if length:
raw_type += f"({length})"
elif "WITH TIME ZONE" in coltype or "TIMESTAMP" in coltype:
coltype = TIMESTAMP(timezone=True)
elif "INTERVAL" in coltype:
coltype = INTERVAL()
else:
coltype = re.sub(r"\(\d+\)", "", coltype)
try:
coltype = self.ischema_names[coltype]
except KeyError:
util.warn(f"Did not recognize type '{coltype}' of column '{colname}'")
coltype = sqltypes.NULLTYPE
return coltype, raw_type
# pylint: disable=too-many-locals
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
    """
    Dialect method overridden to add raw data type

    Each returned column dict carries the raw Oracle type under
    "system_data_type" next to the SQLAlchemy type.

    kw arguments can be:
        oracle_resolve_synonyms
        dblink
        info_cache
    """
    (table_name, schema, dblink, _) = self._prepare_reflection_args(
        connection,
        table_name,
        schema,
        kw.get("oracle_resolve_synonyms", False),
        kw.get("dblink", ""),
        info_cache=kw.get("info_cache"),
    )

    length_column = "char_length" if self._supports_char_length else "data_length"
    if self.server_version_info >= (12,):
        identity_columns = ORACLE_IDENTITY_TYPE.format(dblink=dblink)
    else:
        identity_columns = "NULL as default_on_null, NULL as identity_options"

    statement = ORACLE_GET_COLUMNS.format(
        dblink=dblink, char_length_col=length_column, identity_cols=identity_columns
    )
    bind_params = {"table_name": table_name}
    if schema is not None:
        bind_params["owner"] = schema
        statement += " AND col.owner = :owner "
    statement += " ORDER BY col.column_id"

    reflected = []
    for row in connection.execute(sql.text(statement), bind_params):
        name = self.normalize_name(row[0])
        data_length = row[2]
        default_expr = row[6]
        is_generated = row[8] == "YES"
        default_on_null = row[9]
        identity_options = row[10]

        sa_type, system_type = self._get_col_type(
            row.data_type, row.data_precision, row.data_scale, data_length, name
        )

        column = {
            "name": name,
            "type": sa_type,
            "nullable": row[5] == "Y",
            "default": default_expr,
            "autoincrement": "auto",
            "comment": row.comments,
            "system_data_type": system_type,
        }
        if row.column_name.lower() == row.column_name:
            column["quote"] = True
        if is_generated:
            # For generated columns the default holds the computed expression
            column["computed"] = {"sqltext": default_expr}
            column["default"] = None
        if identity_options is not None:
            column["identity"] = self._parse_identity_options(
                identity_options, default_on_null
            )
            column["default"] = None
        reflected.append(column)
    return reflected
# Patch the SQLAlchemy Oracle dialect with the overridden reflection methods
OracleDialect.get_table_comment = get_table_comment
OracleDialect.get_columns = get_columns
OracleDialect._get_col_type = _get_col_type
OracleDialect.get_view_definition = get_view_definition
OracleDialect.get_mview_definition = get_mview_definition_dialect
OracleDialect.get_all_view_definitions = get_all_view_definitions
OracleDialect.get_all_table_comments = get_all_table_comments
OracleDialect.get_table_names = get_table_names
# Materialized view support: the Inspector gets new methods, and the dialect
# gets the get_mview_names method that Inspector.get_mview_names looks up
Inspector.get_mview_names = get_mview_names
Inspector.get_mview_definition = get_mview_definition
OracleDialect.get_mview_names = get_mview_names_dialect
class OracleSource(CommonDbSourceService):
@ -248,3 +89,49 @@ class OracleSource(CommonDbSourceService):
f"Expected OracleConnection, but got {connection}"
)
return cls(config, metadata_config)
def query_table_names_and_types(
    self, schema_name: str
) -> Iterable[TableNameAndType]:
    """
    Connect to the source database to get the table
    name and type of everything in the given schema.

    The names returned by the inspector's get_table_names come first,
    created without an explicit type; the names returned by
    get_mview_names follow, tagged as TableType.MaterializedView.
    """
    tables_and_types = []
    for regular_name in self.inspector.get_table_names(schema_name) or []:
        tables_and_types.append(TableNameAndType(name=regular_name))
    for mview_name in self.inspector.get_mview_names(schema_name) or []:
        tables_and_types.append(
            TableNameAndType(name=mview_name, type_=TableType.MaterializedView)
        )
    return tables_and_types
def get_view_definition(
    self, table_type: str, table_name: str, schema_name: str, inspector: Inspector
) -> Optional[str]:
    """
    Fetch the definition of a view or a materialized view.

    Returns None for any other table type and when the lookup fails
    (the failure is logged). A None definition coming back from the
    inspector is returned as an empty string.
    """
    view_like_types = {TableType.View, TableType.MaterializedView}
    if table_type not in view_like_types:
        return None

    # Materialized views have their own inspector method
    if table_type == TableType.MaterializedView:
        fetch_definition = inspector.get_mview_definition
    else:
        fetch_definition = inspector.get_view_definition

    try:
        raw_definition = fetch_definition(table_name, schema_name)
        if raw_definition is None:
            return ""
        return str(raw_definition)
    except NotImplementedError:
        logger.warning("View definition not implemented")
    except Exception as exc:
        logger.debug(traceback.format_exc())
        logger.warning(f"Failed to fetch view definition for {table_name}: {exc}")
    return None

View File

@ -23,12 +23,35 @@ where comments is not null and owner not in ('SYSTEM', 'SYS')
ORACLE_ALL_VIEW_DEFINITIONS = """
SELECT
LOWER(view_name) "view_name",
LOWER(owner) "schema",
text view_def
FROM all_views
where text is not null and owner not in ('SYSTEM', 'SYS')
SELECT
LOWER(view_name) AS "view_name",
LOWER(owner) AS "schema",
DBMS_METADATA.GET_DDL('VIEW', view_name, owner) AS view_def
FROM all_views
WHERE owner NOT IN ('SYSTEM', 'SYS')
"""
# DDL (via DBMS_METADATA.GET_DDL) of every materialized view listed in
# all_mviews that is not owned by SYSTEM or SYS, returned together with the
# lower-cased view name and owner
ORACLE_ALL_MATERIALIZED_VIEW_DEFINITIONS = """
SELECT
LOWER(mview_name) AS "view_name",
LOWER(owner) AS "schema",
DBMS_METADATA.GET_DDL('MATERIALIZED_VIEW', mview_name, owner) AS view_def
FROM all_mviews
WHERE owner NOT IN ('SYSTEM', 'SYS')
"""
# Names of the materialized views owned by the :owner bind parameter
GET_MATERIALIZED_VIEW_NAMES = """
SELECT mview_name FROM all_mviews WHERE owner = :owner
"""
# Table names owned by :owner, skipping rows with a non-NULL IOT_NAME or
# DURATION and any name that is also listed in all_mviews for that owner.
# {tablespace} is a str.format placeholder for an optional tablespace filter
# (an empty string, or a condition ending in "AND ")
ORACLE_GET_TABLE_NAMES = """
SELECT table_name FROM all_tables WHERE
{tablespace}
OWNER = :owner
AND IOT_NAME IS NULL
AND DURATION IS NULL
AND TABLE_NAME NOT IN
(SELECT mview_name FROM all_mviews WHERE owner = :owner)
"""
ORACLE_IDENTITY_TYPE = """\

View File

@ -0,0 +1,279 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utils module defining the overridden SQLAlchemy dialect and inspector methods
"""
# pylint: disable=protected-access,unused-argument
import re
from sqlalchemy import sql, util
from sqlalchemy.dialects.oracle.base import FLOAT, INTEGER, INTERVAL, NUMBER, TIMESTAMP
from sqlalchemy.engine import reflection
from sqlalchemy.sql import sqltypes
from metadata.ingestion.source.database.oracle.queries import (
GET_MATERIALIZED_VIEW_NAMES,
ORACLE_ALL_MATERIALIZED_VIEW_DEFINITIONS,
ORACLE_ALL_TABLE_COMMENTS,
ORACLE_ALL_VIEW_DEFINITIONS,
ORACLE_GET_COLUMNS,
ORACLE_GET_TABLE_NAMES,
ORACLE_IDENTITY_TYPE,
)
from metadata.utils.sqlalchemy_utils import (
get_table_comment_wrapper,
get_view_definition_wrapper,
)
@reflection.cache
def get_table_comment(
    self,
    connection,
    table_name: str,
    schema: str = None,
    resolve_synonyms=False,
    dblink="",
    **kw,
):
    """
    Dialect method overridden to read the table comment through the
    ORACLE_ALL_TABLE_COMMENTS query

    The table and schema names are lower-cased before they are handed to
    get_table_comment_wrapper; a falsy schema is forwarded as None.
    resolve_synonyms, dblink and kw are accepted but not used here.
    """
    lowered_table = table_name.lower()
    lowered_schema = None
    if schema:
        lowered_schema = schema.lower()
    return get_table_comment_wrapper(
        self,
        connection,
        table_name=lowered_table,
        schema=lowered_schema,
        query=ORACLE_ALL_TABLE_COMMENTS,
    )
@reflection.cache
def get_view_definition(
    self,
    connection,
    view_name: str,
    schema: str = None,
    resolve_synonyms=False,
    dblink="",
    **kw,
):
    """
    Dialect method overridden to read the view definition through the
    ORACLE_ALL_VIEW_DEFINITIONS query

    The view and schema names are lower-cased before they are handed to
    get_view_definition_wrapper; a falsy schema is forwarded as None.
    resolve_synonyms, dblink and kw are accepted but not used here.
    """
    lowered_view = view_name.lower()
    lowered_schema = None
    if schema:
        lowered_schema = schema.lower()
    return get_view_definition_wrapper(
        self,
        connection,
        table_name=lowered_view,
        schema=lowered_schema,
        query=ORACLE_ALL_VIEW_DEFINITIONS,
    )
@reflection.cache
def get_mview_definition_dialect(
    self,
    connection,
    view_name: str,
    schema: str = None,
    resolve_synonyms=False,
    dblink="",
    **kw,
):
    """
    Dialect method to read a materialized view definition through the
    ORACLE_ALL_MATERIALIZED_VIEW_DEFINITIONS query

    The view and schema names are lower-cased before they are handed to
    get_view_definition_wrapper; a falsy schema is forwarded as None.
    resolve_synonyms, dblink and kw are accepted but not used here.
    """
    lowered_view = view_name.lower()
    lowered_schema = None
    if schema:
        lowered_schema = schema.lower()
    return get_view_definition_wrapper(
        self,
        connection,
        table_name=lowered_view,
        schema=lowered_schema,
        query=ORACLE_ALL_MATERIALIZED_VIEW_DEFINITIONS,
    )
def _get_col_type(
self, coltype, precision, scale, length, colname
): # pylint: disable=too-many-branches
raw_type = coltype
if coltype == "NUMBER":
if precision is None and scale == 0:
coltype = INTEGER()
else:
coltype = NUMBER(precision, scale)
if precision is not None:
if scale is not None:
raw_type += f"({precision},{scale})"
else:
raw_type += f"({precision})"
elif coltype == "FLOAT":
# TODO: support "precision" here as "binary_precision"
coltype = FLOAT()
elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"):
coltype = self.ischema_names.get(coltype)(length)
if length:
raw_type += f"({length})"
elif "WITH TIME ZONE" in coltype or "TIMESTAMP" in coltype:
coltype = TIMESTAMP(timezone=True)
elif "INTERVAL" in coltype:
coltype = INTERVAL()
else:
coltype = re.sub(r"\(\d+\)", "", coltype)
try:
coltype = self.ischema_names[coltype]
except KeyError:
util.warn(f"Did not recognize type '{coltype}' of column '{colname}'")
coltype = sqltypes.NULLTYPE
return coltype, raw_type
# pylint: disable=too-many-locals
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
    """
    Dialect method overridden to add raw data type

    Each returned column dict carries the raw Oracle type under
    "system_data_type" next to the SQLAlchemy type.

    kw arguments can be:
        oracle_resolve_synonyms
        dblink
        info_cache
    """
    (table_name, schema, dblink, _) = self._prepare_reflection_args(
        connection,
        table_name,
        schema,
        kw.get("oracle_resolve_synonyms", False),
        kw.get("dblink", ""),
        info_cache=kw.get("info_cache"),
    )

    length_column = "char_length" if self._supports_char_length else "data_length"
    if self.server_version_info >= (12,):
        identity_columns = ORACLE_IDENTITY_TYPE.format(dblink=dblink)
    else:
        identity_columns = "NULL as default_on_null, NULL as identity_options"

    statement = ORACLE_GET_COLUMNS.format(
        dblink=dblink, char_length_col=length_column, identity_cols=identity_columns
    )
    bind_params = {"table_name": table_name}
    if schema is not None:
        bind_params["owner"] = schema
        statement += " AND col.owner = :owner "
    statement += " ORDER BY col.column_id"

    reflected = []
    for row in connection.execute(sql.text(statement), bind_params):
        name = self.normalize_name(row[0])
        data_length = row[2]
        default_expr = row[6]
        is_generated = row[8] == "YES"
        default_on_null = row[9]
        identity_options = row[10]

        sa_type, system_type = self._get_col_type(
            row.data_type, row.data_precision, row.data_scale, data_length, name
        )

        column = {
            "name": name,
            "type": sa_type,
            "nullable": row[5] == "Y",
            "default": default_expr,
            "autoincrement": "auto",
            "comment": row.comments,
            "system_data_type": system_type,
        }
        if row.column_name.lower() == row.column_name:
            column["quote"] = True
        if is_generated:
            # For generated columns the default holds the computed expression
            column["computed"] = {"sqltext": default_expr}
            column["default"] = None
        if identity_options is not None:
            column["identity"] = self._parse_identity_options(
                identity_options, default_on_null
            )
            column["default"] = None
        reflected.append(column)
    return reflected
@reflection.cache
def get_table_names(self, connection, schema=None, **kw):
    """
    Exclude the materialized views from regular table names

    Runs ORACLE_GET_TABLE_NAMES for the denormalized schema (or the
    dialect's default schema) and returns the normalized table names.
    """
    owner = self.denormalize_name(schema or self.default_schema_name)
    # note that table_names() isn't loading DBLINKed or synonym'ed tables
    if owner is None:
        owner = self.default_schema_name

    tablespace_filter = ""
    if self.exclude_tablespaces:
        quoted_names = ", ".join(f"'{ts}'" for ts in self.exclude_tablespaces)
        tablespace_filter = (
            f"nvl(tablespace_name, 'no tablespace') NOT IN ({quoted_names}) AND "
        )

    statement = sql.text(ORACLE_GET_TABLE_NAMES.format(tablespace=tablespace_filter))
    rows = connection.execute(statement, {"owner": owner})
    return [self.normalize_name(row[0]) for row in rows]
def get_mview_names(self, schema=None):
    """Return all materialized view names in `schema`.

    Delegates to the dialect's ``get_mview_names`` inside an operation
    context, sharing this inspector's ``info_cache``.

    :param schema: Optional, retrieve names from a non-default schema.
     For special quoting, use :class:`.quoted_name`.
    """
    dialect = self.dialect
    with self._operation_context() as connection:
        mview_names = dialect.get_mview_names(
            connection, schema, info_cache=self.info_cache
        )
    return mview_names
@reflection.cache
def get_mview_names_dialect(self, connection, schema=None, **kw):
    """
    Dialect method returning the materialized view names of a schema

    Runs GET_MATERIALIZED_VIEW_NAMES for the denormalized schema (or the
    dialect's default schema when none is given) and returns the
    normalized names. kw is accepted but not used here.
    """
    # Denormalize once, as get_table_names does; the original denormalized
    # the already denormalized name a second time when binding it.
    owner = self.denormalize_name(schema or self.default_schema_name)
    cursor = connection.execute(
        sql.text(GET_MATERIALIZED_VIEW_NAMES), {"owner": owner}
    )
    return [self.normalize_name(row[0]) for row in cursor]
def get_mview_definition(self, mview_name, schema=None):
    """Return definition for `mview_name`.

    Delegates to the dialect's ``get_mview_definition`` inside an
    operation context, sharing this inspector's ``info_cache``.

    :param schema: Optional, retrieve names from a non-default schema.
     For special quoting, use :class:`.quoted_name`.
    """
    dialect = self.dialect
    with self._operation_context() as connection:
        definition = dialect.get_mview_definition(
            connection, mview_name, schema, info_cache=self.info_cache
        )
    return definition