Fixing Snowflake Query and arrayDataType issue (#3174)

This commit is contained in:
Ayush Shah 2022-03-07 12:25:01 +05:30 committed by GitHub
parent 23b97f5fe1
commit ce99d52cd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 8 deletions

View File

@ -8,12 +8,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import traceback
from typing import Optional
from snowflake.sqlalchemy.custom_types import VARIANT
from snowflake.sqlalchemy.snowdialect import ischema_names
from metadata.generated.schema.entity.data.table import TableData
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
@ -26,6 +28,8 @@ GEOGRAPHY = create_sqlalchemy_type("GEOGRAPHY")
ischema_names["VARIANT"] = VARIANT
ischema_names["GEOGRAPHY"] = GEOGRAPHY
logger: logging.Logger = logging.getLogger(__name__)
class SnowflakeConfig(SQLConnectionConfig):
scheme = "snowflake"
@ -53,6 +57,25 @@ class SnowflakeSource(SQLSource):
def __init__(self, config, metadata_config, ctx):
super().__init__(config, metadata_config, ctx)
def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]:
resp_sample_data = super().fetch_sample_data(schema, table)
if not resp_sample_data:
try:
logger.info("Using Table Name with quotes to fetch the data")
query = self.config.query.format(schema, f'"{table}"')
logger.info(query)
results = self.connection.execute(query)
cols = []
for col in results.keys():
cols.append(col.replace(".", "_DOT_"))
rows = []
for res in results:
row = list(res)
rows.append(row)
return TableData(columns=cols, rows=rows)
except Exception as err:
logger.error(err)
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
config = SnowflakeConfig.parse_obj(config_dict)

View File

@ -29,6 +29,7 @@ from metadata.generated.schema.entity.data.table import (
Constraint,
ConstraintType,
DataModel,
DataType,
ModelType,
Table,
TableConstraint,
@ -397,6 +398,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
)
model_fqdn = f"{schema}.{model_name}"
except Exception as err:
logger.debug(traceback.print_exc())
logger.error(err)
self.data_models[model_fqdn] = model
@ -562,8 +564,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
col_type, column["type"]
)
if col_type == "NULL" or col_type is None:
col_type = "VARCHAR"
data_type_display = "varchar"
col_type = DataType.VARCHAR.name
data_type_display = col_type.lower()
logger.warning(
"Unknown type {} mapped to VARCHAR: {}".format(
repr(column["type"]), column["name"]
@ -579,10 +581,10 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
col_data_length = (
1 if col_data_length is None else col_data_length
)
if col_type == "ARRAY":
if arr_data_type is None:
arr_data_type = "VARCHAR"
dataTypeDisplay = col_type + "<" + arr_data_type + ">"
if col_type == "ARRAY" and arr_data_type is None:
arr_data_type = DataType.VARCHAR.name
dataTypeDisplay = f"array<{arr_data_type}>"
om_column = Column(
name=column["name"],
description=column.get("comment", None),
@ -640,7 +642,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
]
except Exception as err:
logger.debug(traceback.print_exc())
logger.debug(err)
logger.error(err)
om_column = col_dict
except Exception as err:
logger.debug(traceback.print_exc())