From bc13e343a16da06dc8985dede7154bb5f116e541 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Fri, 4 Feb 2022 15:43:27 +0530 Subject: [PATCH] issue-2572: added comments for vertica columns (#2596) * issue-2572: added comments for vertica columns * moved sql queries to sql_queries.py * fixed sonar bug and sql query Co-authored-by: Mayur SIngal --- .../src/metadata/ingestion/source/vertica.py | 158 ++++++++++++++++++ ingestion/src/metadata/utils/sql_queries.py | 20 +++ 2 files changed, 178 insertions(+) diff --git a/ingestion/src/metadata/ingestion/source/vertica.py b/ingestion/src/metadata/ingestion/source/vertica.py index 2fc91fd27d9..6e0778f10fb 100644 --- a/ingestion/src/metadata/ingestion/source/vertica.py +++ b/ingestion/src/metadata/ingestion/source/vertica.py @@ -9,9 +9,167 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re +from textwrap import dedent + +from sqlalchemy import exc, sql +from sqlalchemy.engine import reflection +from sqlalchemy_vertica.base import VerticaDialect + from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.ingestion.source.sql_source import SQLSource from metadata.ingestion.source.sql_source_common import SQLConnectionConfig +from metadata.utils.sql_queries import VERTICA_GET_COLUMNS, VERTICA_GET_PRIMARY_KEYS + + +@reflection.cache +def get_columns(self, connection, table_name, schema=None, **kw): + if schema is not None: + schema_condition = "lower(table_schema) = '%(schema)s'" % { + "schema": schema.lower() + } + else: + schema_condition = "1" + + s = sql.text( + dedent( + VERTICA_GET_COLUMNS.format( + table=table_name.lower(), schema_condition=schema_condition + ) + ) + ) + + spk = sql.text( + dedent( + VERTICA_GET_PRIMARY_KEYS.format( + table=table_name.lower(), schema_condition=schema_condition + ) + ) + ) + + pk_columns = [x[0] for x in connection.execute(spk)] + columns = [] + for row in connection.execute(s): + name = row.column_name + dtype = row.data_type.lower() + primary_key = name in pk_columns + default = row.column_default + nullable = row.is_nullable + comment = row.comment + + column_info = self._get_column_info( + name, + dtype, + default, + nullable, + schema, + comment, + ) + column_info.update({"primary_key": primary_key}) + columns.append(column_info) + return columns + + +def _get_column_info( + self, + name, + format_type, + default, + nullable, + schema, + comment, +): + + # strip (*) from character varying(5), timestamp(5) + # with time zone, geometry(POLYGON), etc. + attype = re.sub(r"\(.*\)", "", format_type) + + charlen = re.search(r"\(([\d,]+)\)", format_type) + if charlen: + charlen = charlen.group(1) + args = re.search(r"\((.*)\)", format_type) + if args and args.group(1): + args = tuple(re.split(r"\s*,\s*", args.group(1))) + else: + args = () + kwargs = {} + + if attype == "numeric": + if charlen: + prec, scale = charlen.split(",") + args = (int(prec), int(scale)) + else: + args = () + elif attype == "integer": + args = () + elif attype in ("timestamptz", "timetz"): + kwargs["timezone"] = True + if charlen: + kwargs["precision"] = int(charlen) + args = () + elif attype in ( + "timestamp", + "time", + ): + kwargs["timezone"] = False + if charlen: + kwargs["precision"] = int(charlen) + args = () + elif attype.startswith("interval"): + field_match = re.match(r"interval (.+)", attype, re.I) + if charlen: + kwargs["precision"] = int(charlen) + if field_match: + kwargs["fields"] = field_match.group(1) + attype = "interval" + args = () + elif charlen: + args = (int(charlen),) + + if attype.upper() in self.ischema_names: + coltype = self.ischema_names[attype.upper()] + else: + coltype = None + + if coltype: + coltype = coltype(*args, **kwargs) + else: + util.warn("Did not recognize type '%s' of column '%s'" % (attype, name)) + coltype = sqltypes.NULLTYPE + # adjust the default value + autoincrement = False + if default is not None: + match = re.search(r"""(nextval\(')([^']+)('.*$)""", default) + if match is not None: + if issubclass(coltype._type_affinity, sqltypes.Integer): + autoincrement = True + # the default is related to a Sequence + sch = schema + if "." not in match.group(2) and sch is not None: + # unconditionally quote the schema name. this could + # later be enhanced to obey quoting rules / + # "quote schema" + default = ( + match.group(1) + + ('"%s"' % sch) + + "." + + match.group(2) + + match.group(3) + ) + + column_info = dict( + name=name, + type=coltype, + nullable=nullable, + default=default, + autoincrement=autoincrement, + comment=comment, + ) + return column_info + + +VerticaDialect.get_columns = get_columns # pylint: disable=protected-access +VerticaDialect._get_column_info = _get_column_info # pylint: disable=protected-access class VerticaConfig(SQLConnectionConfig): diff --git a/ingestion/src/metadata/utils/sql_queries.py b/ingestion/src/metadata/utils/sql_queries.py index 115707331ca..2130038f425 100644 --- a/ingestion/src/metadata/utils/sql_queries.py +++ b/ingestion/src/metadata/utils/sql_queries.py @@ -242,3 +242,23 @@ NEO4J_AMUNDSEN_DASHBOARD_QUERY = textwrap.dedent( order by dbg.name """ ) + +VERTICA_GET_COLUMNS = """ + SELECT column_name, data_type, column_default, is_nullable, comment + FROM v_catalog.columns col left join v_catalog.comments com on col.table_id=com.object_id and com.object_type='COLUMN' and col.column_name=com.child_object + WHERE lower(table_name) = '{table}' + AND {schema_condition} + UNION ALL + SELECT column_name, data_type, '' as column_default, true as is_nullable, '' as comment + FROM v_catalog.view_columns + WHERE lower(table_name) = '{table}' + AND {schema_condition} + """ + +VERTICA_GET_PRIMARY_KEYS = """ + SELECT column_name + FROM v_catalog.primary_keys + WHERE lower(table_name) = '{table}' + AND constraint_type = 'p' + AND {schema_condition} + """