issue-2572: added comments for vertica columns (#2596)

* issue-2572: added comments for vertica columns

* moved sql queries to sql_queries.py

* fixed sonar bug and sql query

Co-authored-by: Mayur SIngal <mayursingal@Mayurs-MacBook-Pro.local>
This commit is contained in:
Mayur Singal 2022-02-04 15:43:27 +05:30 committed by GitHub
parent 9a0ffc08e4
commit bc13e343a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 178 additions and 0 deletions

View File

@ -9,9 +9,167 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from textwrap import dedent
from sqlalchemy import exc, sql
from sqlalchemy.engine import reflection
from sqlalchemy_vertica.base import VerticaDialect
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
from metadata.utils.sql_queries import VERTICA_GET_COLUMNS, VERTICA_GET_PRIMARY_KEYS
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
if schema is not None:
schema_condition = "lower(table_schema) = '%(schema)s'" % {
"schema": schema.lower()
}
else:
schema_condition = "1"
s = sql.text(
dedent(
VERTICA_GET_COLUMNS.format(
table=table_name.lower(), schema_condition=schema_condition
)
)
)
spk = sql.text(
dedent(
VERTICA_GET_PRIMARY_KEYS.format(
table=table_name.lower(), schema_condition=schema_condition
)
)
)
pk_columns = [x[0] for x in connection.execute(spk)]
columns = []
for row in connection.execute(s):
name = row.column_name
dtype = row.data_type.lower()
primary_key = name in pk_columns
default = row.column_default
nullable = row.is_nullable
comment = row.comment
column_info = self._get_column_info(
name,
dtype,
default,
nullable,
schema,
comment,
)
column_info.update({"primary_key": primary_key})
columns.append(column_info)
return columns
def _get_column_info(
self,
name,
format_type,
default,
nullable,
schema,
comment,
):
# strip (*) from character varying(5), timestamp(5)
# with time zone, geometry(POLYGON), etc.
attype = re.sub(r"\(.*\)", "", format_type)
charlen = re.search(r"\(([\d,]+)\)", format_type)
if charlen:
charlen = charlen.group(1)
args = re.search(r"\((.*)\)", format_type)
if args and args.group(1):
args = tuple(re.split(r"\s*,\s*", args.group(1)))
else:
args = ()
kwargs = {}
if attype == "numeric":
if charlen:
prec, scale = charlen.split(",")
args = (int(prec), int(scale))
else:
args = ()
elif attype == "integer":
args = ()
elif attype in ("timestamptz", "timetz"):
kwargs["timezone"] = True
if charlen:
kwargs["precision"] = int(charlen)
args = ()
elif attype in (
"timestamp",
"time",
):
kwargs["timezone"] = False
if charlen:
kwargs["precision"] = int(charlen)
args = ()
elif attype.startswith("interval"):
field_match = re.match(r"interval (.+)", attype, re.I)
if charlen:
kwargs["precision"] = int(charlen)
if field_match:
kwargs["fields"] = field_match.group(1)
attype = "interval"
args = ()
elif charlen:
args = (int(charlen),)
if attype.upper() in self.ischema_names:
coltype = self.ischema_names[attype.upper()]
else:
coltype = None
if coltype:
coltype = coltype(*args, **kwargs)
else:
util.warn("Did not recognize type '%s' of column '%s'" % (attype, name))
coltype = sqltypes.NULLTYPE
# adjust the default value
autoincrement = False
if default is not None:
match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
if match is not None:
if issubclass(coltype._type_affinity, sqltypes.Integer):
autoincrement = True
# the default is related to a Sequence
sch = schema
if "." not in match.group(2) and sch is not None:
# unconditionally quote the schema name. this could
# later be enhanced to obey quoting rules /
# "quote schema"
default = (
match.group(1)
+ ('"%s"' % sch)
+ "."
+ match.group(2)
+ match.group(3)
)
column_info = dict(
name=name,
type=coltype,
nullable=nullable,
default=default,
autoincrement=autoincrement,
comment=comment,
)
return column_info
VerticaDialect.get_columns = get_columns # pylint: disable=protected-access
VerticaDialect._get_column_info = _get_column_info # pylint: disable=protected-access
class VerticaConfig(SQLConnectionConfig):

View File

@ -242,3 +242,23 @@ NEO4J_AMUNDSEN_DASHBOARD_QUERY = textwrap.dedent(
order by dbg.name
"""
)
VERTICA_GET_COLUMNS = """
SELECT column_name, data_type, column_default, is_nullable, comment
FROM v_catalog.columns col left join v_catalog.comments com on col.table_id=com.object_id and com.object_type='COLUMN' and col.column_name=com.child_object
WHERE lower(table_name) = '{table}'
AND {schema_condition}
UNION ALL
SELECT column_name, data_type, '' as column_default, true as is_nullable, '' as comment
FROM v_catalog.view_columns
WHERE lower(table_name) = '{table}'
AND {schema_condition}
"""
VERTICA_GET_PRIMARY_KEYS = """
SELECT column_name
FROM v_catalog.primary_keys
WHERE lower(table_name) = '{table}'
AND constraint_type = 'p'
AND {schema_condition}
"""