From ae6683862f556be3920f31e0001e8b8e0cbb2ade Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Wed, 12 Apr 2023 16:21:27 +0530 Subject: [PATCH] Fix #10999: Add support for materialized views in oracle (#11005) --- .../source/database/oracle/metadata.py | 261 +++++----------- .../source/database/oracle/queries.py | 35 ++- .../ingestion/source/database/oracle/utils.py | 279 ++++++++++++++++++ 3 files changed, 382 insertions(+), 193 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/source/database/oracle/utils.py diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py b/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py index 30310ea35ef..70aa0ef47db 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py @@ -11,21 +11,13 @@ # pylint: disable=protected-access """Oracle source module""" -import re +import traceback +from typing import Iterable, Optional -from sqlalchemy import sql, util -from sqlalchemy.dialects.oracle.base import ( - FLOAT, - INTEGER, - INTERVAL, - NUMBER, - TIMESTAMP, - OracleDialect, - ischema_names, -) -from sqlalchemy.engine import reflection -from sqlalchemy.sql import sqltypes +from sqlalchemy.dialects.oracle.base import INTERVAL, OracleDialect, ischema_names +from sqlalchemy.engine import Inspector +from metadata.generated.schema.entity.data.table import TableType from metadata.generated.schema.entity.services.connections.database.oracleConnection import ( OracleConnection, ) @@ -37,20 +29,30 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type -from metadata.ingestion.source.database.common_db_source import CommonDbSourceService -from metadata.ingestion.source.database.oracle.queries import ( - ORACLE_ALL_TABLE_COMMENTS, - ORACLE_ALL_VIEW_DEFINITIONS, - ORACLE_GET_COLUMNS, - ORACLE_IDENTITY_TYPE, +from metadata.ingestion.source.database.common_db_source import ( + CommonDbSourceService, + TableNameAndType, ) +from metadata.ingestion.source.database.oracle.utils import ( + _get_col_type, + get_columns, + get_mview_definition, + get_mview_definition_dialect, + get_mview_names, + get_mview_names_dialect, + get_table_comment, + get_table_names, + get_view_definition, +) +from metadata.utils.logger import ingestion_logger from metadata.utils.sqlalchemy_utils import ( get_all_table_comments, get_all_view_definitions, - get_table_comment_wrapper, - get_view_definition_wrapper, ) +logger = ingestion_logger() + + ischema_names.update( { "ROWID": create_sqlalchemy_type("ROWID"), @@ -59,178 +61,17 @@ ischema_names.update( } ) - -@reflection.cache -def get_table_comment( - self, - connection, - table_name: str, - schema: str = None, - resolve_synonyms=False, - dblink="", - **kw, -): # pylint: disable=unused-argument - return get_table_comment_wrapper( - self, - connection, - table_name=table_name.lower(), - schema=schema.lower() if schema else None, - query=ORACLE_ALL_TABLE_COMMENTS, - ) - - -@reflection.cache -def get_view_definition( - self, - connection, - view_name: str, - schema: str = None, - resolve_synonyms=False, - dblink="", - **kw, -): # pylint: disable=unused-argument - return get_view_definition_wrapper( - self, - connection, - table_name=view_name.lower(), - schema=schema.lower() if schema else None, - query=ORACLE_ALL_VIEW_DEFINITIONS, - ) - - -def _get_col_type( - self, coltype, precision, scale, length, colname -): # pylint: disable=too-many-branches - raw_type = coltype - if coltype == "NUMBER": - if precision is None and scale == 0: - coltype = INTEGER() - else: - coltype = NUMBER(precision, scale) - if precision is not None: - if scale is not None: - raw_type += f"({precision},{scale})" - else: - raw_type += f"({precision})" - - elif coltype == "FLOAT": - # TODO: support "precision" here as "binary_precision" - coltype = FLOAT() - elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"): - coltype = self.ischema_names.get(coltype)(length) - if length: - raw_type += f"({length})" - elif "WITH TIME ZONE" in coltype or "TIMESTAMP" in coltype: - coltype = TIMESTAMP(timezone=True) - elif "INTERVAL" in coltype: - coltype = INTERVAL() - else: - coltype = re.sub(r"\(\d+\)", "", coltype) - try: - coltype = self.ischema_names[coltype] - except KeyError: - util.warn(f"Did not recognize type '{coltype}' of column '{colname}'") - coltype = sqltypes.NULLTYPE - return coltype, raw_type - - -# pylint: disable=too-many-locals -@reflection.cache -def get_columns(self, connection, table_name, schema=None, **kw): - """ - - Dialect method overridden to add raw data type - - kw arguments can be: - - oracle_resolve_synonyms - - dblink - - """ - resolve_synonyms = kw.get("oracle_resolve_synonyms", False) - dblink = kw.get("dblink", "") - info_cache = kw.get("info_cache") - - (table_name, schema, dblink, _) = self._prepare_reflection_args( - connection, - table_name, - schema, - resolve_synonyms, - dblink, - info_cache=info_cache, - ) - columns = [] - - char_length_col = "data_length" - if self._supports_char_length: - char_length_col = "char_length" - - identity_cols = "NULL as default_on_null, NULL as identity_options" - if self.server_version_info >= (12,): - identity_cols = ORACLE_IDENTITY_TYPE.format(dblink=dblink) - - params = {"table_name": table_name} - - text = ORACLE_GET_COLUMNS.format( - dblink=dblink, char_length_col=char_length_col, identity_cols=identity_cols - ) - if schema is not None: - params["owner"] = schema - text += " AND col.owner = :owner " - text += " ORDER BY col.column_id" - - cols = connection.execute(sql.text(text), params) - - for row in cols: - colname = self.normalize_name(row[0]) - length = row[2] - nullable = row[5] == "Y" - default = row[6] - generated = row[8] - default_on_nul = row[9] - identity_options = row[10] - - coltype, raw_coltype = self._get_col_type( - row.data_type, row.data_precision, row.data_scale, length, colname - ) - - computed = None - if generated == "YES": - computed = {"sqltext": default} - default = None - - identity = None - if identity_options is not None: - identity = self._parse_identity_options(identity_options, default_on_nul) - default = None - - cdict = { - "name": colname, - "type": coltype, - "nullable": nullable, - "default": default, - "autoincrement": "auto", - "comment": row.comments, - "system_data_type": raw_coltype, - } - if row.column_name.lower() == row.column_name: - cdict["quote"] = True - if computed is not None: - cdict["computed"] = computed - if identity is not None: - cdict["identity"] = identity - - columns.append(cdict) - return columns - - OracleDialect.get_table_comment = get_table_comment OracleDialect.get_columns = get_columns OracleDialect._get_col_type = _get_col_type OracleDialect.get_view_definition = get_view_definition +OracleDialect.get_mview_definition = get_mview_definition_dialect OracleDialect.get_all_view_definitions = get_all_view_definitions OracleDialect.get_all_table_comments = get_all_table_comments +OracleDialect.get_table_names = get_table_names +Inspector.get_mview_names = get_mview_names +Inspector.get_mview_definition = get_mview_definition +OracleDialect.get_mview_names = get_mview_names_dialect class OracleSource(CommonDbSourceService): @@ -248,3 +89,49 @@ class OracleSource(CommonDbSourceService): f"Expected OracleConnection, but got {connection}" ) return cls(config, metadata_config) + + def query_table_names_and_types( + self, schema_name: str + ) -> Iterable[TableNameAndType]: + """ + Connect to the source database to get the table + name and type. By default, use the inspector method + to get the names and pass the Regular type. + + This is useful for sources where we need fine-grained + logic on how to handle table types, e.g., external, foreign,... + """ + + regular_tables = [ + TableNameAndType(name=table_name) + for table_name in self.inspector.get_table_names(schema_name) or [] + ] + material_tables = [ + TableNameAndType(name=table_name, type_=TableType.MaterializedView) + for table_name in self.inspector.get_mview_names(schema_name) or [] + ] + + return regular_tables + material_tables + + def get_view_definition( + self, table_type: str, table_name: str, schema_name: str, inspector: Inspector + ) -> Optional[str]: + if table_type not in {TableType.View, TableType.MaterializedView}: + return None + + definition_fn = inspector.get_view_definition + if table_type == TableType.MaterializedView: + definition_fn = inspector.get_mview_definition + + try: + view_definition = definition_fn(table_name, schema_name) + view_definition = "" if view_definition is None else str(view_definition) + return view_definition + + except NotImplementedError: + logger.warning("View definition not implemented") + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Failed to fetch view definition for {table_name}: {exc}") + return None diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/queries.py b/ingestion/src/metadata/ingestion/source/database/oracle/queries.py index 4364c07a25c..6923455aa7d 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle/queries.py @@ -23,12 +23,35 @@ where comments is not null and owner not in ('SYSTEM', 'SYS') ORACLE_ALL_VIEW_DEFINITIONS = """ -SELECT - LOWER(view_name) "view_name", - LOWER(owner) "schema", - text view_def -FROM all_views -where text is not null and owner not in ('SYSTEM', 'SYS') +SELECT +LOWER(view_name) AS "view_name", +LOWER(owner) AS "schema", +DBMS_METADATA.GET_DDL('VIEW', view_name, owner) AS view_def +FROM all_views +WHERE owner NOT IN ('SYSTEM', 'SYS') +""" + +ORACLE_ALL_MATERIALIZED_VIEW_DEFINITIONS = """ +SELECT +LOWER(mview_name) AS "view_name", +LOWER(owner) AS "schema", +DBMS_METADATA.GET_DDL('MATERIALIZED_VIEW', mview_name, owner) AS view_def +FROM all_mviews +WHERE owner NOT IN ('SYSTEM', 'SYS') +""" + +GET_MATERIALIZED_VIEW_NAMES = """ +SELECT mview_name FROM all_mviews WHERE owner = :owner +""" + +ORACLE_GET_TABLE_NAMES = """ +SELECT table_name FROM all_tables WHERE +{tablespace} +OWNER = :owner +AND IOT_NAME IS NULL +AND DURATION IS NULL +AND TABLE_NAME NOT IN +(SELECT mview_name FROM all_mviews WHERE owner = :owner) """ ORACLE_IDENTITY_TYPE = """\ diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/utils.py b/ingestion/src/metadata/ingestion/source/database/oracle/utils.py new file mode 100644 index 00000000000..2b6b5d2cced --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/oracle/utils.py @@ -0,0 +1,279 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Utils module to define overrided sqlalchamy methods +""" +# pylint: disable=protected-access,unused-argument +import re + +from sqlalchemy import sql, util +from sqlalchemy.dialects.oracle.base import FLOAT, INTEGER, INTERVAL, NUMBER, TIMESTAMP +from sqlalchemy.engine import reflection +from sqlalchemy.sql import sqltypes + +from metadata.ingestion.source.database.oracle.queries import ( + GET_MATERIALIZED_VIEW_NAMES, + ORACLE_ALL_MATERIALIZED_VIEW_DEFINITIONS, + ORACLE_ALL_TABLE_COMMENTS, + ORACLE_ALL_VIEW_DEFINITIONS, + ORACLE_GET_COLUMNS, + ORACLE_GET_TABLE_NAMES, + ORACLE_IDENTITY_TYPE, +) +from metadata.utils.sqlalchemy_utils import ( + get_table_comment_wrapper, + get_view_definition_wrapper, +) + + +@reflection.cache +def get_table_comment( + self, + connection, + table_name: str, + schema: str = None, + resolve_synonyms=False, + dblink="", + **kw, +): + return get_table_comment_wrapper( + self, + connection, + table_name=table_name.lower(), + schema=schema.lower() if schema else None, + query=ORACLE_ALL_TABLE_COMMENTS, + ) + + +@reflection.cache +def get_view_definition( + self, + connection, + view_name: str, + schema: str = None, + resolve_synonyms=False, + dblink="", + **kw, +): + + return get_view_definition_wrapper( + self, + connection, + table_name=view_name.lower(), + schema=schema.lower() if schema else None, + query=ORACLE_ALL_VIEW_DEFINITIONS, + ) + + +@reflection.cache +def get_mview_definition_dialect( + self, + connection, + view_name: str, + schema: str = None, + resolve_synonyms=False, + dblink="", + **kw, +): + + return get_view_definition_wrapper( + self, + connection, + table_name=view_name.lower(), + schema=schema.lower() if schema else None, + query=ORACLE_ALL_MATERIALIZED_VIEW_DEFINITIONS, + ) + + +def _get_col_type( + self, coltype, precision, scale, length, colname +): # pylint: disable=too-many-branches + raw_type = coltype + if coltype == "NUMBER": + if precision is None and scale == 0: + coltype = INTEGER() + else: + coltype = NUMBER(precision, scale) + if precision is not None: + if scale is not None: + raw_type += f"({precision},{scale})" + else: + raw_type += f"({precision})" + + elif coltype == "FLOAT": + # TODO: support "precision" here as "binary_precision" + coltype = FLOAT() + elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"): + coltype = self.ischema_names.get(coltype)(length) + if length: + raw_type += f"({length})" + elif "WITH TIME ZONE" in coltype or "TIMESTAMP" in coltype: + coltype = TIMESTAMP(timezone=True) + elif "INTERVAL" in coltype: + coltype = INTERVAL() + else: + coltype = re.sub(r"\(\d+\)", "", coltype) + try: + coltype = self.ischema_names[coltype] + except KeyError: + util.warn(f"Did not recognize type '{coltype}' of column '{colname}'") + coltype = sqltypes.NULLTYPE + return coltype, raw_type + + +# pylint: disable=too-many-locals +@reflection.cache +def get_columns(self, connection, table_name, schema=None, **kw): + """ + + Dialect method overridden to add raw data type + + kw arguments can be: + + oracle_resolve_synonyms + + dblink + + """ + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) + dblink = kw.get("dblink", "") + info_cache = kw.get("info_cache") + + (table_name, schema, dblink, _) = self._prepare_reflection_args( + connection, + table_name, + schema, + resolve_synonyms, + dblink, + info_cache=info_cache, + ) + columns = [] + + char_length_col = "data_length" + if self._supports_char_length: + char_length_col = "char_length" + + identity_cols = "NULL as default_on_null, NULL as identity_options" + if self.server_version_info >= (12,): + identity_cols = ORACLE_IDENTITY_TYPE.format(dblink=dblink) + + params = {"table_name": table_name} + + text = ORACLE_GET_COLUMNS.format( + dblink=dblink, char_length_col=char_length_col, identity_cols=identity_cols + ) + if schema is not None: + params["owner"] = schema + text += " AND col.owner = :owner " + text += " ORDER BY col.column_id" + + cols = connection.execute(sql.text(text), params) + + for row in cols: + colname = self.normalize_name(row[0]) + length = row[2] + nullable = row[5] == "Y" + default = row[6] + generated = row[8] + default_on_nul = row[9] + identity_options = row[10] + + coltype, raw_coltype = self._get_col_type( + row.data_type, row.data_precision, row.data_scale, length, colname + ) + + computed = None + if generated == "YES": + computed = {"sqltext": default} + default = None + + identity = None + if identity_options is not None: + identity = self._parse_identity_options(identity_options, default_on_nul) + default = None + + cdict = { + "name": colname, + "type": coltype, + "nullable": nullable, + "default": default, + "autoincrement": "auto", + "comment": row.comments, + "system_data_type": raw_coltype, + } + if row.column_name.lower() == row.column_name: + cdict["quote"] = True + if computed is not None: + cdict["computed"] = computed + if identity is not None: + cdict["identity"] = identity + + columns.append(cdict) + return columns + + +@reflection.cache +def get_table_names(self, connection, schema=None, **kw): + """ + Exclude the materialized views from regular table names + """ + schema = self.denormalize_name(schema or self.default_schema_name) + + # note that table_names() isn't loading DBLINKed or synonym'ed tables + if schema is None: + schema = self.default_schema_name + + tablespace = "" + + if self.exclude_tablespaces: + exclude_tablespace = ", ".join([f"'{ts}'" for ts in self.exclude_tablespaces]) + tablespace = ( + "nvl(tablespace_name, 'no tablespace') " + f"NOT IN ({exclude_tablespace}) AND " + ) + sql_str = ORACLE_GET_TABLE_NAMES.format(tablespace=tablespace) + + cursor = connection.execute(sql.text(sql_str), {"owner": schema}) + return [self.normalize_name(row[0]) for row in cursor] + + +def get_mview_names(self, schema=None): + """Return all materialized view names in `schema`. + + :param schema: Optional, retrieve names from a non-default schema. + For special quoting, use :class:`.quoted_name`. + + """ + + with self._operation_context() as conn: + return self.dialect.get_mview_names(conn, schema, info_cache=self.info_cache) + + +@reflection.cache +def get_mview_names_dialect(self, connection, schema=None, **kw): + schema = self.denormalize_name(schema or self.default_schema_name) + sql_query = sql.text(GET_MATERIALIZED_VIEW_NAMES) + cursor = connection.execute(sql_query, {"owner": self.denormalize_name(schema)}) + return [self.normalize_name(row[0]) for row in cursor] + + +def get_mview_definition(self, mview_name, schema=None): + """Return definition for `mview_name`. + + :param schema: Optional, retrieve names from a non-default schema. + For special quoting, use :class:`.quoted_name`. + + """ + + with self._operation_context() as conn: + return self.dialect.get_mview_definition( + conn, mview_name, schema, info_cache=self.info_cache + )