Fix: usage connectors

This commit is contained in:
Suresh Srinivas 2021-08-10 15:00:41 -07:00
parent a674f5bc2e
commit 4371e7a6f0
4 changed files with 28 additions and 16 deletions

View File

@ -2,7 +2,7 @@
"source": {
"type": "redshift-usage",
"config": {
"host_port": "cluster.name.region.redshift.amazonaws.com:5439",
"host_port": "cluster.name.region.redshift.amazonaws.com:5439",
"username": "username",
"password": "strong_password",
"database": "warehouse",

View File

@ -44,6 +44,7 @@ class MetadataUsageBulkSink(BulkSink):
self.client = REST(self.metadata_config)
self.status = BulkSinkStatus()
self.tables_dict = {}
self.table_join_dict = {}
self.__map_tables()
def __map_tables(self):
@ -73,20 +74,18 @@ class MetadataUsageBulkSink(BulkSink):
table_usage_request = TableUsageRequest(date=table_usage.date, count=table_usage.count)
try:
self.client.publish_usage_for_a_table(table_entity, table_usage_request)
self.status.records.append(table_usage_request)
except APIError as err:
self.status.failures.append(table_usage_request)
logger.error("Failed to update usage and query join {}".format(err))
logger.error("Failed to update usage for {} {}".format(table_usage.table, err))
table_join_request = self.__get_table_joins(table_usage)
logger.debug("table join request {}".format(table_join_request))
try:
if table_join_request is not None and len(table_join_request.columnJoins) > 0:
self.client.publish_frequently_joined_with(table_entity, table_join_request)
self.status.records.append(table_join_request)
except APIError as err:
self.status.failures.append(table_join_request)
logger.error("Failed to update usage and query join {}".format(err))
logger.error("Failed to update query join for {}, {}".format(table_usage.table, err))
else:
logger.warning("Table does not exist, skipping usage publish {}, {}".format(table_usage.table,
@ -94,21 +93,33 @@ class MetadataUsageBulkSink(BulkSink):
def __get_table_joins(self, table_usage):
table_joins: TableJoins = TableJoins(columnJoins=[], startDate=table_usage.date)
column_joins_dict = {}
joined_with = {}
for column_join in table_usage.joins:
if column_join.table_column is None or len(column_join.joined_with) == 0:
continue
logger.debug("main column join {}".format(column_join.table_column))
if column_join.table_column.column in column_joins_dict.keys():
joined_with = column_joins_dict[column_join.table_column.column]
else:
column_joins_dict[column_join.table_column.column] = {}
main_column_fqdn = self.__get_column_fqdn(column_join.table_column)
logger.debug("main column fqdn join {}".format(main_column_fqdn))
joined_with = []
for column in column_join.joined_with:
logger.debug("joined column {}".format(column))
joined_column_fqdn = self.__get_column_fqdn(column)
logger.debug("joined column fqdn {}".format(joined_column_fqdn))
if joined_column_fqdn is not None:
joined_with.append(ColumnJoinedWith(fullyQualifiedName=joined_column_fqdn, joinCount=1))
table_joins.columnJoins.append(ColumnJoins(columnName=column_join.table_column.column,
joinedWith=joined_with))
if joined_column_fqdn in joined_with.keys():
column_joined_with = joined_with[joined_column_fqdn]
column_joined_with.joinCount += 1
joined_with[joined_column_fqdn] = column_joined_with
else:
joined_with[joined_column_fqdn] = ColumnJoinedWith(fullyQualifiedName=joined_column_fqdn,
joinCount=1)
column_joins_dict[column_join.table_column.column] = joined_with
for key, value in column_joins_dict.items():
table_joins.columnJoins.append(ColumnJoins(columnName=key,
joinedWith=list(value.values())))
print(table_joins)
return table_joins
def __get_column_fqdn(self, table_column: TableColumn):

View File

@ -42,7 +42,8 @@ def get_table_column_join(table, table_aliases, joins):
except ValueError as err:
logger.error("Error in parsing sql query joins {}".format(err))
pass
if table == "venue":
print(table_column)
return TableColumnJoin(table_column=table_column, joined_with=joined_with)

View File

@ -41,7 +41,7 @@ def get_service_or_create(config, metadata_config) -> DatabaseServiceEntity:
if service is not None:
return service
else:
service = {'jdbc': {'connectionUrl': config.get_sql_alchemy_url(), 'driverClass': 'jdbc'},
service = {'jdbc': {'connectionUrl': config.get_connection_url(), 'driverClass': 'jdbc'},
'name': config.service_name, 'description': '', 'serviceType': config.get_service_type()}
created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service))
return created_service