# # Copyright 2024 The InfiniFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import base64 import datetime import hashlib import os import socket import time import uuid import requests import importlib from .common import string_to_bytes def current_timestamp(): return int(time.time() * 1000) def timestamp_to_date(timestamp, format_string="%Y-%m-%d %H:%M:%S"): if not timestamp: timestamp = time.time() timestamp = int(timestamp) / 1000 time_array = time.localtime(timestamp) str_date = time.strftime(format_string, time_array) return str_date def date_string_to_timestamp(time_str, format_string="%Y-%m-%d %H:%M:%S"): time_array = time.strptime(time_str, format_string) time_stamp = int(time.mktime(time_array) * 1000) return time_stamp def get_lan_ip(): if os.name != "nt": import fcntl import struct def get_interface_ip(ifname): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return socket.inet_ntoa( fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', string_to_bytes(ifname[:15])))[20:24]) ip = socket.gethostbyname(socket.getfqdn()) if ip.startswith("127.") and os.name != "nt": interfaces = [ "bond1", "eth0", "eth1", "eth2", "wlan0", "wlan1", "wifi0", "ath0", "ath1", "ppp0", ] for ifname in interfaces: try: ip = get_interface_ip(ifname) break except IOError: pass return ip or '' def from_dict_hook(in_dict: dict): if "type" in in_dict and "data" in in_dict: if in_dict["module"] is None: return in_dict["data"] else: return getattr(importlib.import_module( in_dict["module"]), in_dict["type"])(**in_dict["data"]) else: return in_dict def get_uuid(): return uuid.uuid1().hex def datetime_format(date_time: datetime.datetime) -> datetime.datetime: return datetime.datetime(date_time.year, date_time.month, date_time.day, date_time.hour, date_time.minute, date_time.second) def get_format_time() -> datetime.datetime: return datetime_format(datetime.datetime.now()) def str2date(date_time: str): return datetime.datetime.strptime(date_time, '%Y-%m-%d') def elapsed2time(elapsed): seconds = elapsed / 1000 minuter, second = divmod(seconds, 60) hour, minuter = divmod(minuter, 60) return '%02d:%02d:%02d' % (hour, minuter, second) def download_img(url): if not url: return "" response = requests.get(url) return "data:" + \ response.headers.get('Content-Type', 'image/jpg') + ";" + \ "base64," + base64.b64encode(response.content).decode("utf-8") def delta_seconds(date_string: str): dt = datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S") return (datetime.datetime.now() - dt).total_seconds() def hash_str2int(line: str, mod: int = 10 ** 8) -> int: return int(hashlib.sha1(line.encode("utf-8")).hexdigest(), 16) % mod