From 4371e7a6f02f25e0470f062e1029062abfdb5f8c Mon Sep 17 00:00:00 2001
From: Suresh Srinivas
Date: Tue, 10 Aug 2021 15:00:41 -0700
Subject: [PATCH] Fix: usage connectors

---
 ingestion/pipelines/redshift_usage.json                      |  2 +-
 .../src/metadata/ingestion/bulksink/metadata_usage_rest.py   | 37 ++++++++++++-------
 .../src/metadata/ingestion/stage/table_usage_stage.py        |  3 +-
 ingestion/src/metadata/utils/helpers.py                      |  2 +-
 4 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/ingestion/pipelines/redshift_usage.json b/ingestion/pipelines/redshift_usage.json
index f75ab1f8184..56c394d3bd7 100644
--- a/ingestion/pipelines/redshift_usage.json
+++ b/ingestion/pipelines/redshift_usage.json
@@ -2,7 +2,7 @@
   "source": {
     "type": "redshift-usage",
     "config": {
-      "host_port": "cluster.name.region.redshift.amazonaws.com:5439",
+      "host_port": "cluster.name.region.redshift.amazonaws.com:5439",
       "username": "username",
       "password": "strong_password",
       "database": "warehouse",
diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage_rest.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage_rest.py
index 9ba9682fe08..8838aed8925 100644
--- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage_rest.py
+++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage_rest.py
@@ -44,6 +44,7 @@ class MetadataUsageBulkSink(BulkSink):
         self.client = REST(self.metadata_config)
         self.status = BulkSinkStatus()
         self.tables_dict = {}
+        self.table_join_dict = {}
         self.__map_tables()
 
     def __map_tables(self):
@@ -73,20 +74,18 @@
                 table_usage_request = TableUsageRequest(date=table_usage.date, count=table_usage.count)
                 try:
                     self.client.publish_usage_for_a_table(table_entity, table_usage_request)
-                    self.status.records.append(table_usage_request)
                 except APIError as err:
                     self.status.failures.append(table_usage_request)
-                    logger.error("Failed to update usage and query join {}".format(err))
+                    logger.error("Failed to update usage for {} {}".format(table_usage.table, err))
 
                 table_join_request = self.__get_table_joins(table_usage)
                 logger.debug("table join request {}".format(table_join_request))
                 try:
                     if table_join_request is not None and len(table_join_request.columnJoins) > 0:
                         self.client.publish_frequently_joined_with(table_entity, table_join_request)
-                        self.status.records.append(table_join_request)
                 except APIError as err:
                     self.status.failures.append(table_join_request)
-                    logger.error("Failed to update usage and query join {}".format(err))
+                    logger.error("Failed to update query join for {}, {}".format(table_usage.table, err))
             else:
                 logger.warning("Table does not exist, skipping usage publish {}, {}".format(table_usage.table,
@@ -94,21 +93,33 @@
     def __get_table_joins(self, table_usage):
         table_joins: TableJoins = TableJoins(columnJoins=[], startDate=table_usage.date)
+        column_joins_dict = {}
+        joined_with = {}
         for column_join in table_usage.joins:
             if column_join.table_column is None or len(column_join.joined_with) == 0:
                 continue
-            logger.debug("main column join {}".format(column_join.table_column))
+
+            if column_join.table_column.column in column_joins_dict.keys():
+                joined_with = column_joins_dict[column_join.table_column.column]
+            else:
+                column_joins_dict[column_join.table_column.column] = {}
+
             main_column_fqdn = self.__get_column_fqdn(column_join.table_column)
-            logger.debug("main column fqdn join {}".format(main_column_fqdn))
-            joined_with = []
             for column in column_join.joined_with:
-                logger.debug("joined column {}".format(column))
                 joined_column_fqdn = self.__get_column_fqdn(column)
-                logger.debug("joined column fqdn {}".format(joined_column_fqdn))
-                if joined_column_fqdn is not None:
-                    joined_with.append(ColumnJoinedWith(fullyQualifiedName=joined_column_fqdn, joinCount=1))
-            table_joins.columnJoins.append(ColumnJoins(columnName=column_join.table_column.column,
-                                                       joinedWith=joined_with))
+                if joined_column_fqdn in joined_with.keys():
+                    column_joined_with = joined_with[joined_column_fqdn]
+                    column_joined_with.joinCount += 1
+                    joined_with[joined_column_fqdn] = column_joined_with
+                else:
+                    joined_with[joined_column_fqdn] = ColumnJoinedWith(fullyQualifiedName=joined_column_fqdn,
+                                                                       joinCount=1)
+            column_joins_dict[column_join.table_column.column] = joined_with
+
+        for key, value in column_joins_dict.items():
+            table_joins.columnJoins.append(ColumnJoins(columnName=key,
+                                                       joinedWith=list(value.values())))
+        print(table_joins)
         return table_joins
 
     def __get_column_fqdn(self, table_column: TableColumn):
diff --git a/ingestion/src/metadata/ingestion/stage/table_usage_stage.py b/ingestion/src/metadata/ingestion/stage/table_usage_stage.py
index e02d2c7b771..71393b7b713 100644
--- a/ingestion/src/metadata/ingestion/stage/table_usage_stage.py
+++ b/ingestion/src/metadata/ingestion/stage/table_usage_stage.py
@@ -42,7 +42,8 @@ def get_table_column_join(table, table_aliases, joins):
     except ValueError as err:
         logger.error("Error in parsing sql query joins {}".format(err))
         pass
-
+    if table == "venue":
+        print(table_column)
     return TableColumnJoin(table_column=table_column, joined_with=joined_with)
 
diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py
index b218d56f461..d8d64e339b0 100644
--- a/ingestion/src/metadata/utils/helpers.py
+++ b/ingestion/src/metadata/utils/helpers.py
@@ -41,7 +41,7 @@ def get_service_or_create(config, metadata_config) -> DatabaseServiceEntity:
     if service is not None:
         return service
     else:
-        service = {'jdbc': {'connectionUrl': config.get_sql_alchemy_url(), 'driverClass': 'jdbc'},
+        service = {'jdbc': {'connectionUrl': config.get_connection_url(), 'driverClass': 'jdbc'},
                    'name': config.service_name, 'description': '', 'serviceType': config.get_service_type()}
        created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service))
        return created_service
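
Note: the reworked __get_table_joins in the patch above aggregates join counts per column instead of emitting a separate joinCount=1 entry for every occurrence of the same joined column. The following is a minimal, self-contained sketch of that aggregation idea only; ColumnJoinedWith, ColumnJoins, and aggregate_joins here are simplified stand-ins for illustration, not the real OpenMetadata models or bulk-sink code.

# Sketch: count how often each joined column appears per main column.
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class ColumnJoinedWith:          # stand-in for the generated model
    fullyQualifiedName: str
    joinCount: int = 1


@dataclass
class ColumnJoins:               # stand-in for the generated model
    columnName: str
    joinedWith: List[ColumnJoinedWith] = field(default_factory=list)


def aggregate_joins(pairs: List[Tuple[str, str]]) -> List[ColumnJoins]:
    # pairs: (main column name, joined column FQDN) observed per parsed query
    counts: Dict[str, Dict[str, ColumnJoinedWith]] = defaultdict(dict)
    for main_column, joined_fqdn in pairs:
        joined_with = counts[main_column]
        if joined_fqdn in joined_with:
            joined_with[joined_fqdn].joinCount += 1
        else:
            joined_with[joined_fqdn] = ColumnJoinedWith(fullyQualifiedName=joined_fqdn)
    return [ColumnJoins(columnName=name, joinedWith=list(joined.values()))
            for name, joined in counts.items()]


if __name__ == "__main__":
    # two observations of the same join collapse into one entry with joinCount=2
    print(aggregate_joins([("sale_id", "db.sales.id"), ("sale_id", "db.sales.id")]))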