Bigquery and Helpers fix - Ingestion (#2408)

* Bigquery Ingestion fix - error handling and keyerror

* Update helpers.py and column_type_parser

* Update helpers.py
This commit is contained in:
Ayush Shah 2022-01-25 14:30:09 +05:30 committed by GitHub
parent 47b9afa8c4
commit 917ccd7147
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 14 deletions

View File

@ -10,10 +10,9 @@
# limitations under the License.
import json
import logging
import os
import tempfile
from typing import Any, Optional, Tuple
from typing import Optional, Tuple
from sqlalchemy_bigquery import _types
from sqlalchemy_bigquery._struct import STRUCT
@ -77,20 +76,28 @@ class BigquerySource(SQLSource):
def create(cls, config_dict, metadata_config_dict, ctx):
config: SQLConnectionConfig = BigQueryConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
if config.options.get("credentials_path"):
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = config.options[
"credentials_path"
]
elif config.options.get("credentials", None):
cred_path = create_credential_temp_file(config.options.get("credentials"))
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cred_path
config.options["credentials_path"] = cred_path
del config.options["credentials"]
if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
if config.options.get("credentials_path"):
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = config.options[
"credentials_path"
]
elif config.options.get("credentials", None):
cred_path = create_credential_temp_file(
config.options.get("credentials")
)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cred_path
config.options["credentials_path"] = cred_path
del config.options["credentials"]
else:
raise Exception(
"Please refer to the BigQuery connector documentation, especially the credentials part "
"https://docs.open-metadata.org/connectors/bigquery"
)
return cls(config, metadata_config, ctx)
def close(self):
super().close()
if self.config.options["credentials_path"]:
if self.config.options.get("credentials_path"):
os.unlink(self.config.options["credentials_path"])
def standardize_schema_table_names(

View File

@ -96,7 +96,7 @@ class ColumnTypeParser:
"MONEY": "NUMBER",
"NCHAR": "CHAR",
"NTEXT": "TEXT",
"NULL": "NULL",
"NULL": "VARCHAR",
"NUMBER": "NUMBER",
"NUMERIC": "NUMERIC",
"NVARCHAR": "VARCHAR",

View File

@ -64,7 +64,9 @@ def get_database_service_or_create(
return service
else:
password = (
config.password.get_secret_value() if hasattr(config, "password") else None
config.password.get_secret_value()
if hasattr(config, "password") and config.password
else None
)
service = {
"databaseConnection": {