datahub/smoke-test/tests/test_result_msg.py
2023-10-10 16:08:34 +05:30

47 lines
1.3 KiB
Python

import os
from slack_sdk import WebClient
# Statistics collected during the test run, keyed by stat name. Entries are
# written by add_datahub_stats() and read by send_to_slack(), which reports
# the ones whose key starts with "num-".
datahub_stats = {}
def add_datahub_stats(stat_name, stat_val):
    """Store ``stat_val`` under ``stat_name`` in the module-level ``datahub_stats``.

    A value previously recorded under the same name is overwritten.
    """
    datahub_stats.update({stat_name: stat_val})
def send_to_slack(passed: str):
    """Post the test status and the collected ``num-*`` stats to Slack.

    Configuration is read from environment variables:

    * ``SLACK_API_TOKEN`` and ``SLACK_CHANNEL`` -- both required. If either is
      unset or empty the function returns without doing anything.
    * ``SLACK_THREAD_TS`` -- optional. When set (and non-empty) the message is
      posted as a reply in that thread, otherwise as a new channel message.
    * ``TEST_IDENTIFIER`` -- label prefixed to the message; defaults to
      ``"LOCAL_TEST"``.

    Args:
        passed: Status label inserted into the message text (callers in this
            file pass ``"PASSED"`` or ``"FAILED"``).

    Returns:
        None.

    Any exception raised by the Slack client propagates to the caller.
    """
    slack_api_token = os.getenv("SLACK_API_TOKEN")
    slack_channel = os.getenv("SLACK_CHANNEL")
    slack_thread_ts = os.getenv("SLACK_THREAD_TS")
    test_identifier = os.getenv("TEST_IDENTIFIER", "LOCAL_TEST")

    # Treat empty values like unset ones: CI systems commonly export a missing
    # secret as an empty string, and calling Slack with it can only fail.
    if not slack_api_token or not slack_channel:
        return

    client = WebClient(token=slack_api_token)

    # Report only the "num-<entity>" stats. Slice off just the leading prefix
    # so that a later "num-" inside the entity name is kept intact.
    prefix = "num-"
    message = "".join(
        f"Num {key[len(prefix):]} is {val}\n"
        for key, val in datahub_stats.items()
        if key.startswith(prefix)
    )

    post_kwargs = {
        "channel": slack_channel,
        "text": f"{test_identifier} Status - {passed}\n{message}",
    }
    # Only thread the message when a thread timestamp was actually provided.
    if slack_thread_ts:
        post_kwargs["thread_ts"] = slack_thread_ts
    client.chat_postMessage(**post_kwargs)
def send_message(exitstatus):
    """Report the test outcome to Slack without ever raising.

    An ``exitstatus`` equal to 0 is reported as "PASSED"; anything else is
    reported as "FAILED". Any exception raised along the way is printed and
    swallowed.
    """
    try:
        if exitstatus == 0:
            outcome = "PASSED"
        else:
            outcome = "FAILED"
        send_to_slack(outcome)
    except Exception as err:
        # Reporting is best-effort: a Slack problem must never fail pytest.
        print(f"Exception happened for sending msg to slack {err}")