mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-14 00:57:09 +00:00
required-fields-updated-in-snowflake-and-athena (#5143)
required-fields-updated-in-snowflake-and-athena (#5143)
This commit is contained in:
parent
f8f79ebb2d
commit
12e8a1fcf6
@ -41,11 +41,6 @@
|
|||||||
"description": "Host and port of the Athena service.",
|
"description": "Host and port of the Athena service.",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
"database": {
|
|
||||||
"title": "Database",
|
|
||||||
"description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases.",
|
|
||||||
"type": "string"
|
|
||||||
},
|
|
||||||
"s3StagingDir": {
|
"s3StagingDir": {
|
||||||
"title": "S3 Staging Directory",
|
"title": "S3 Staging Directory",
|
||||||
"description": "S3 Staging Directory.",
|
"description": "S3 Staging Directory.",
|
||||||
@ -73,5 +68,6 @@
|
|||||||
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
|
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"additionalProperties": false
|
"additionalProperties": false,
|
||||||
|
"required": ["s3StagingDir", "awsConfig", "workgroup"]
|
||||||
}
|
}
|
||||||
|
|||||||
@ -96,5 +96,5 @@
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
"additionalProperties": false,
|
"additionalProperties": false,
|
||||||
"required": ["username", "account"]
|
"required": ["username", "account", "password", "warehouse"]
|
||||||
}
|
}
|
||||||
|
|||||||
@ -137,7 +137,7 @@ class SqlColumnHandler:
|
|||||||
)
|
)
|
||||||
table_columns = []
|
table_columns = []
|
||||||
columns = inspector.get_columns(
|
columns = inspector.get_columns(
|
||||||
table, schema, db_name=self.service_connection.database
|
table, schema, db_name=self._get_database_name()
|
||||||
)
|
)
|
||||||
for column in columns:
|
for column in columns:
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -127,12 +127,17 @@ class SqlAlchemySource(Source, ABC):
|
|||||||
Method to fetch tags associated with table
|
Method to fetch tags associated with table
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def get_database_entity(self, database_name: Optional[str]) -> Database:
|
def _get_database_name(self) -> str:
|
||||||
|
if hasattr(self.service_connection, "database"):
|
||||||
|
return self.service_connection.database or "default"
|
||||||
|
return "default"
|
||||||
|
|
||||||
|
def get_database_entity(self) -> Database:
|
||||||
"""
|
"""
|
||||||
Method to get database enetity from db name
|
Method to get database enetity from db name
|
||||||
"""
|
"""
|
||||||
return Database(
|
return Database(
|
||||||
name=database_name if database_name else "default",
|
name=self._get_database_name(),
|
||||||
service=EntityReference(
|
service=EntityReference(
|
||||||
id=self.service.id, type=self.service_connection.type.value
|
id=self.service.id, type=self.service_connection.type.value
|
||||||
),
|
),
|
||||||
@ -173,7 +178,7 @@ class SqlAlchemySource(Source, ABC):
|
|||||||
self.metadata,
|
self.metadata,
|
||||||
entity_type=DatabaseSchema,
|
entity_type=DatabaseSchema,
|
||||||
service_name=self.config.serviceName,
|
service_name=self.config.serviceName,
|
||||||
database_name=self.service_connection.database,
|
database_name=self._get_database_name(),
|
||||||
schema_name=schema,
|
schema_name=schema,
|
||||||
)
|
)
|
||||||
yield from self.delete_tables(schema_fqn)
|
yield from self.delete_tables(schema_fqn)
|
||||||
@ -253,7 +258,7 @@ class SqlAlchemySource(Source, ABC):
|
|||||||
schema, table_name, table_type, inspector
|
schema, table_name, table_type, inspector
|
||||||
)
|
)
|
||||||
|
|
||||||
database = self.get_database_entity(self.service_connection.database)
|
database = self.get_database_entity()
|
||||||
# check if we have any model to associate with
|
# check if we have any model to associate with
|
||||||
table_entity.dataModel = self.get_data_model(
|
table_entity.dataModel = self.get_data_model(
|
||||||
database.name.__root__, schema, table_name
|
database.name.__root__, schema, table_name
|
||||||
|
|||||||
@ -97,7 +97,8 @@ def get_connection_url_common(connection):
|
|||||||
url += "@"
|
url += "@"
|
||||||
|
|
||||||
url += connection.hostPort
|
url += connection.hostPort
|
||||||
url += f"/{connection.database}" if connection.database else ""
|
if hasattr(connection, "database"):
|
||||||
|
url += f"/{connection.database}" if connection.database else ""
|
||||||
|
|
||||||
options = (
|
options = (
|
||||||
connection.connectionOptions.dict()
|
connection.connectionOptions.dict()
|
||||||
@ -347,10 +348,10 @@ def _(connection: AthenaConnection):
|
|||||||
else:
|
else:
|
||||||
url += ":"
|
url += ":"
|
||||||
url += f"@athena.{connection.awsConfig.awsRegion}.amazonaws.com:443"
|
url += f"@athena.{connection.awsConfig.awsRegion}.amazonaws.com:443"
|
||||||
if connection.database:
|
|
||||||
url += f"/{connection.database}"
|
|
||||||
url += f"?s3_staging_dir={quote_plus(connection.s3StagingDir)}"
|
url += f"?s3_staging_dir={quote_plus(connection.s3StagingDir)}"
|
||||||
url += f"&work_group={connection.workgroup}"
|
if connection.workgroup:
|
||||||
|
url += f"&work_group={connection.workgroup}"
|
||||||
return url
|
return url
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -625,9 +625,9 @@ class SouceConnectionTest(TestCase):
|
|||||||
expected_args = {}
|
expected_args = {}
|
||||||
snowflake_conn_obj = SnowflakeConnection(
|
snowflake_conn_obj = SnowflakeConnection(
|
||||||
username="user",
|
username="user",
|
||||||
password=None,
|
password="test-pwd",
|
||||||
database="tiny",
|
database="tiny",
|
||||||
connectionArguments=None,
|
warehouse="COMPUTE_WH",
|
||||||
scheme=SnowflakeScheme.snowflake,
|
scheme=SnowflakeScheme.snowflake,
|
||||||
account="account.region_name.cloud_service",
|
account="account.region_name.cloud_service",
|
||||||
)
|
)
|
||||||
@ -637,8 +637,9 @@ class SouceConnectionTest(TestCase):
|
|||||||
expected_args = {"user": "user-to-be-impersonated"}
|
expected_args = {"user": "user-to-be-impersonated"}
|
||||||
snowflake_conn_obj = SnowflakeConnection(
|
snowflake_conn_obj = SnowflakeConnection(
|
||||||
username="user",
|
username="user",
|
||||||
password=None,
|
password="test-pwd",
|
||||||
database="tiny",
|
database="tiny",
|
||||||
|
warehouse="COMPUTE_WH",
|
||||||
connectionArguments={"user": "user-to-be-impersonated"},
|
connectionArguments={"user": "user-to-be-impersonated"},
|
||||||
scheme=SnowflakeScheme.snowflake,
|
scheme=SnowflakeScheme.snowflake,
|
||||||
account="account.region_name.cloud_service",
|
account="account.region_name.cloud_service",
|
||||||
@ -657,18 +658,16 @@ class SouceConnectionTest(TestCase):
|
|||||||
s3StagingDir="s3athena-postgres",
|
s3StagingDir="s3athena-postgres",
|
||||||
workgroup="primary",
|
workgroup="primary",
|
||||||
scheme=AthenaScheme.awsathena_rest,
|
scheme=AthenaScheme.awsathena_rest,
|
||||||
database=None,
|
|
||||||
)
|
)
|
||||||
assert expected_url == get_connection_url(athena_conn_obj)
|
assert expected_url == get_connection_url(athena_conn_obj)
|
||||||
|
|
||||||
# connection arguments witho db
|
# connection arguments witho db
|
||||||
expected_url = "awsathena+rest://key:secret_key@athena.us-east-2.amazonaws.com:443/test?s3_staging_dir=s3athena-postgres&work_group=primary"
|
expected_url = "awsathena+rest://key:secret_key@athena.us-east-2.amazonaws.com:443?s3_staging_dir=s3athena-postgres&work_group=primary"
|
||||||
athena_conn_obj = AthenaConnection(
|
athena_conn_obj = AthenaConnection(
|
||||||
awsConfig=awsCreds,
|
awsConfig=awsCreds,
|
||||||
s3StagingDir="s3athena-postgres",
|
s3StagingDir="s3athena-postgres",
|
||||||
workgroup="primary",
|
workgroup="primary",
|
||||||
scheme=AthenaScheme.awsathena_rest,
|
scheme=AthenaScheme.awsathena_rest,
|
||||||
database="test",
|
|
||||||
)
|
)
|
||||||
assert expected_url == get_connection_url(athena_conn_obj)
|
assert expected_url == get_connection_url(athena_conn_obj)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user