Add database name to DESCRIBE statement to support Databricks struct fields. (#3720)

This commit is contained in:
Andrew Grossnickle 2022-04-19 01:27:42 -06:00 committed by GitHub
parent 924e211ba1
commit ceb01f0a3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 6 deletions

View File

@ -70,6 +70,12 @@ def _get_column_rows(self, connection, table_name, schema):
def get_columns(self, connection, table_name, schema=None, **kw):
# This function overrides the sqlalchemy_databricks._dialect.DatabricksDialect.get_columns
# to add support for struct, array & map datatype
# Extract the Database Name from the keyword arguments parameter if it is present. This
# value should match what is provided in the 'source.config.database' field in the
# Databricks ingest config file.
db_name = kw["db_name"] if "db_name" in kw else None
rows = _get_column_rows(self, connection, table_name, schema)
result = []
for (col_name, col_type, _comment) in rows:
@ -96,11 +102,17 @@ def get_columns(self, connection, table_name, schema=None, **kw):
"comment": _comment,
}
if col_type in {"array", "struct", "map"}:
rows = dict(
connection.execute(
"DESCRIBE {} {}".format(table_name, col_name)
).fetchall()
)
if db_name is not None:
rows = dict(
connection.execute(
f"DESCRIBE {db_name}.{table_name} {col_name}"
).fetchall()
)
else:
rows = dict(
connection.execute(f"DESCRIBE {table_name} {col_name}").fetchall()
)
col_info["raw_data_type"] = rows["data_type"]
result.append(col_info)
return result

View File

@ -621,7 +621,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
unique_columns.extend(constraint.get("column_names"))
table_columns = []
columns = inspector.get_columns(table, schema)
columns = inspector.get_columns(table, schema, db_name=self.service_connection.database)
try:
for column in columns:
try: