# Patch summary (leading text before the first "diff --git" header is ignored by patch tools).
#
# Purpose: carry usage percentile ranks into the table search document and
# trigger percentile computation on the server after usage is published.
#
# Files touched, all under ingestion/src/metadata/ingestion/:
#   * bulksink/elasticsearch.py        - passes monthly/weekly/daily
#     `percentileRank` from `table.usageSummary` as the keyword arguments
#     `monthly_percentile_rank`, `weekly_percentile_rank`, `daily_percentile_rank`.
#   * bulksink/elasticsearch_constants.py - adds the same three properties,
#     each with "type": "long", to TABLE_ELASTICSEARCH_INDEX_MAPPING.
#   * bulksink/metadata_usage.py       - imports `datetime`, stores
#     `self.today` formatted as '%Y-%m-%d' right after `self.__map_tables()`,
#     and calls `self.client.compute_percentile` for 'table' and 'database'
#     inside a `try` that logs an error on `APIError`.
#   * models/table_metadata.py         - adds three `int` fields to
#     `TableESDocument`.
#   * ometa/client.py                  - adds `REST.compute_percentile(entity_type, date)`,
#     which POSTs to '/usage/compute.percentile/{entity_type}/{date}' and
#     debug-logs the response.
#
# NOTE(review): the line breaks and indentation of this patch have been
#   collapsed, so the scope of the new try/except in metadata_usage.py cannot
#   be determined from here - confirm it runs once per publish, not per record.
# NOTE(review): no hunk adds an import for `APIError`; confirm
#   metadata_usage.py already has it in scope, otherwise the `except` clause
#   raises NameError when an exception reaches it.
# NOTE(review): the new rank fields are typed `int` (model) and "long"
#   (index mapping); confirm the server's `percentileRank` is integral -
#   TODO verify against the usage schema.
# NOTE(review): the `except APIError` branch logs a fixed message without the
#   exception details; consider including them when this code is next revised.
diff --git a/ingestion/src/metadata/ingestion/bulksink/elasticsearch.py b/ingestion/src/metadata/ingestion/bulksink/elasticsearch.py index c07ccd020c0..a58eecb7e5f 100644 --- a/ingestion/src/metadata/ingestion/bulksink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/bulksink/elasticsearch.py @@ -124,8 +124,11 @@ class ElasticsearchBulkSink(BulkSink): column_names=column_names, column_descriptions=column_descriptions, monthly_stats=table.usageSummary.monthlyStats.count, + monthly_percentile_rank=table.usageSummary.monthlyStats.percentileRank, weekly_stats=table.usageSummary.weeklyStats.count, + weekly_percentile_rank=table.usageSummary.weeklyStats.percentileRank, daily_stats=table.usageSummary.dailyStats.count, + daily_percentile_rank=table.usageSummary.dailyStats.percentileRank, tier=tier, tags=list(tags), fqdn=fqdn, diff --git a/ingestion/src/metadata/ingestion/bulksink/elasticsearch_constants.py b/ingestion/src/metadata/ingestion/bulksink/elasticsearch_constants.py index d89b1545fb8..c317b65c7c3 100644 --- a/ingestion/src/metadata/ingestion/bulksink/elasticsearch_constants.py +++ b/ingestion/src/metadata/ingestion/bulksink/elasticsearch_constants.py @@ -79,9 +79,18 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "monthly_stats":{ "type": "long" }, + "monthly_percentile_rank":{ + "type": "long" + }, "weekly_stats":{ "type": "long" }, + "weekly_percentile_rank":{ + "type": "long" + }, + "daily_percentile_rank": { + "type": "long" + }, "daily_stats": { "type": "long" } diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 6f4dc5cfa8d..cadf40c20fd 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -15,6 +15,7 @@ import json import logging +from datetime import datetime from metadata.config.common import ConfigModel from metadata.generated.schema.entity.data.table import 
ColumnJoins, TableJoins @@ -46,6 +47,7 @@ class MetadataUsageBulkSink(BulkSink): self.tables_dict = {} self.table_join_dict = {} self.__map_tables() + self.today = datetime.today().strftime('%Y-%m-%d') def __map_tables(self): tables = self.client.list_tables('columns') @@ -90,6 +92,11 @@ class MetadataUsageBulkSink(BulkSink): else: logger.warning("Table does not exist, skipping usage publish {}, {}".format(table_usage.table, table_usage.database)) + try: + self.client.compute_percentile('table', self.today) + self.client.compute_percentile('database', self.today) + except APIError: + logger.error("Failed to publish compute.percentile") def __get_table_joins(self, table_usage): table_joins: TableJoins = TableJoins(columnJoins=[], startDate=table_usage.date) diff --git a/ingestion/src/metadata/ingestion/models/table_metadata.py b/ingestion/src/metadata/ingestion/models/table_metadata.py index c3a4176e0f0..e7a7b6f62d0 100644 --- a/ingestion/src/metadata/ingestion/models/table_metadata.py +++ b/ingestion/src/metadata/ingestion/models/table_metadata.py @@ -179,8 +179,11 @@ class TableESDocument(BaseModel): column_names: List[str] column_descriptions: List[str] monthly_stats: int + monthly_percentile_rank: int weekly_stats: int + weekly_percentile_rank: int daily_stats: int + daily_percentile_rank: int tags: List[str] fqdn: str tier: Optional[str] = None diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py index a823d2951ca..4ba34b180da 100644 --- a/ingestion/src/metadata/ingestion/ometa/client.py +++ b/ingestion/src/metadata/ingestion/ometa/client.py @@ -310,6 +310,10 @@ class REST(object): resp = self.get('/tags/{}'.format(category)) return [Tag(**d) for d in resp['children']] + def compute_percentile(self, entity_type:str, date:str): + resp = self.post('/usage/compute.percentile/{}/{}'.format(entity_type, date)) + logger.debug("published compute percentile {}".format(resp)) + def __enter__(self): return self