diff --git a/ingestion/pipelines/sample_usage.json b/ingestion/pipelines/sample_usage.json index d6b1d815c7f..53c5192b671 100644 --- a/ingestion/pipelines/sample_usage.json +++ b/ingestion/pipelines/sample_usage.json @@ -22,7 +22,8 @@ "bulk_sink": { "type": "metadata-usage", "config": { - "filename": "/tmp/sample_usage" + "filename": "/tmp/sample_usage", + "service_name": "bigquery_gcp" } }, "metadata_server": { diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 19bd0c4a4c5..637c3d6cc24 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -33,6 +33,7 @@ logger = logging.getLogger(__name__) class MetadataUsageSinkConfig(ConfigModel): filename: str + service_name: str class MetadataUsageBulkSink(BulkSink): @@ -51,17 +52,9 @@ class MetadataUsageBulkSink(BulkSink): self.file_handler = open(self.config.filename, "r") self.metadata = OpenMetadata(self.metadata_config) self.status = BulkSinkStatus() - self.tables_dict = {} self.table_join_dict = {} - self.__map_tables() self.today = datetime.today().strftime("%Y-%m-%d") - def __map_tables(self): - table_entities = self.metadata.list_entities(entity=Table) - for table in table_entities.entities: - if table.name.__root__ not in self.tables_dict.keys(): - self.tables_dict[table.name.__root__] = table - @classmethod def create( cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext @@ -82,8 +75,10 @@ class MetadataUsageBulkSink(BulkSink): table_usage = TableUsageCount(**json.loads(record)) if "." in table_usage.table: table_usage.table = table_usage.table.split(".")[1] - if table_usage.table in self.tables_dict: - table_entity = self.tables_dict[table_usage.table] + table_entity = self.__get_table_entity( + table_usage.database, table_usage.table + ) + if table_entity is not None: table_usage_request = TableUsageRequest( date=table_usage.date, count=table_usage.count ) @@ -96,7 +91,6 @@ class MetadataUsageBulkSink(BulkSink): table_usage.table, err ) ) - table_join_request = self.__get_table_joins(table_usage) logger.debug("table join request {}".format(table_join_request)) try: @@ -130,20 +124,21 @@ class MetadataUsageBulkSink(BulkSink): def __get_table_joins(self, table_usage): table_joins: TableJoins = TableJoins(columnJoins=[], startDate=table_usage.date) column_joins_dict = {} - joined_with = {} for column_join in table_usage.joins: + joined_with = {} if column_join.table_column is None or len(column_join.joined_with) == 0: continue - if column_join.table_column.column in column_joins_dict.keys(): joined_with = column_joins_dict[column_join.table_column.column] else: column_joins_dict[column_join.table_column.column] = {} - - main_column_fqdn = self.__get_column_fqdn(column_join.table_column) + main_column_fqdn = self.__get_column_fqdn( + table_usage.database, column_join.table_column + ) for column in column_join.joined_with: - joined_column_fqdn = self.__get_column_fqdn(column) - + joined_column_fqdn = self.__get_column_fqdn( + table_usage.database, column + ) if joined_column_fqdn in joined_with.keys(): column_joined_with = joined_with[joined_column_fqdn] column_joined_with.joinCount += 1 @@ -153,7 +148,9 @@ class MetadataUsageBulkSink(BulkSink): fullyQualifiedName=joined_column_fqdn, joinCount=1 ) else: - logger.info("Skipping join columns for {}".format(column)) + logger.info( + f"Skipping join columns for {column} {joined_column_fqdn}" + ) column_joins_dict[column_join.table_column.column] = joined_with for key, value in column_joins_dict.items(): @@ -162,14 +159,19 @@ class MetadataUsageBulkSink(BulkSink): ) return table_joins - def __get_column_fqdn(self, table_column: TableColumn): - if table_column.table not in self.tables_dict: + def __get_column_fqdn(self, database: str, table_column: TableColumn): + table_entity = self.__get_table_entity(database, table_column.table) + if table_entity is None: return None - table_entity = self.tables_dict[table_column.table] for tbl_column in table_entity.columns: if table_column.column.lower() == tbl_column.name.__root__.lower(): return tbl_column.fullyQualifiedName.__root__ + def __get_table_entity(self, database_name: str, table_name: str) -> Table: + table_fqn = f"{self.config.service_name}.{database_name}.{table_name}" + table_entity = self.metadata.get_by_name(Table, fqdn=table_fqn) + return table_entity + def get_status(self): return self.status