Merge pull request #168 from open-metadata/fix_ingestion_usage

Ingestion: add compute.percentile after publishing usage
This commit is contained in:
Suresh Srinivas 2021-08-14 14:43:59 -07:00 committed by GitHub
commit e5c9f9ae09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 26 additions and 0 deletions

View File

@ -124,8 +124,11 @@ class ElasticsearchBulkSink(BulkSink):
column_names=column_names, column_names=column_names,
column_descriptions=column_descriptions, column_descriptions=column_descriptions,
monthly_stats=table.usageSummary.monthlyStats.count, monthly_stats=table.usageSummary.monthlyStats.count,
monthly_percentile_rank=table.usageSummary.monthlyStats.percentileRank,
weekly_stats=table.usageSummary.weeklyStats.count, weekly_stats=table.usageSummary.weeklyStats.count,
weekly_percentile_rank=table.usageSummary.weeklyStats.percentileRank,
daily_stats=table.usageSummary.dailyStats.count, daily_stats=table.usageSummary.dailyStats.count,
daily_percentile_rank=table.usageSummary.dailyStats.percentileRank,
tier=tier, tier=tier,
tags=list(tags), tags=list(tags),
fqdn=fqdn, fqdn=fqdn,

View File

@ -79,9 +79,18 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"monthly_stats":{ "monthly_stats":{
"type": "long" "type": "long"
}, },
"monthly_percentile_rank":{
"type": "long"
},
"weekly_stats":{ "weekly_stats":{
"type": "long" "type": "long"
}, },
"weekly_percentile_rank":{
"type": "long"
},
"daily_percentile_rank": {
"type": "long"
},
"daily_stats": { "daily_stats": {
"type": "long" "type": "long"
} }

View File

@ -15,6 +15,7 @@
import json import json
import logging import logging
from datetime import datetime
from metadata.config.common import ConfigModel from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.table import ColumnJoins, TableJoins from metadata.generated.schema.entity.data.table import ColumnJoins, TableJoins
@ -46,6 +47,7 @@ class MetadataUsageBulkSink(BulkSink):
self.tables_dict = {} self.tables_dict = {}
self.table_join_dict = {} self.table_join_dict = {}
self.__map_tables() self.__map_tables()
self.today = datetime.today().strftime('%Y-%m-%d')
def __map_tables(self): def __map_tables(self):
tables = self.client.list_tables('columns') tables = self.client.list_tables('columns')
@ -90,6 +92,11 @@ class MetadataUsageBulkSink(BulkSink):
else: else:
logger.warning("Table does not exist, skipping usage publish {}, {}".format(table_usage.table, logger.warning("Table does not exist, skipping usage publish {}, {}".format(table_usage.table,
table_usage.database)) table_usage.database))
try:
self.client.compute_percentile('table', self.today)
self.client.compute_percentile('database', self.today)
except APIError:
logger.error("Failed to publish compute.percentile")
def __get_table_joins(self, table_usage): def __get_table_joins(self, table_usage):
table_joins: TableJoins = TableJoins(columnJoins=[], startDate=table_usage.date) table_joins: TableJoins = TableJoins(columnJoins=[], startDate=table_usage.date)

View File

@ -179,8 +179,11 @@ class TableESDocument(BaseModel):
column_names: List[str] column_names: List[str]
column_descriptions: List[str] column_descriptions: List[str]
monthly_stats: int monthly_stats: int
monthly_percentile_rank: int
weekly_stats: int weekly_stats: int
weekly_percentile_rank: int
daily_stats: int daily_stats: int
daily_percentile_rank: int
tags: List[str] tags: List[str]
fqdn: str fqdn: str
tier: Optional[str] = None tier: Optional[str] = None

View File

@ -310,6 +310,10 @@ class REST(object):
resp = self.get('/tags/{}'.format(category)) resp = self.get('/tags/{}'.format(category))
return [Tag(**d) for d in resp['children']] return [Tag(**d) for d in resp['children']]
def compute_percentile(self, entity_type:str, date:str):
resp = self.post('/usage/compute.percentile/{}/{}'.format(entity_type, date))
logger.debug("published compute percentile {}".format(resp))
def __enter__(self): def __enter__(self):
return self return self