| 
									
										
										
										
											2023-11-13 22:05:46 +08:00
										 |  |  | import base64 | 
					
						
							|  |  |  | import hashlib | 
					
						
							|  |  |  | import hmac | 
					
						
							|  |  |  | import logging | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import time | 
					
						
							|  |  |  | from typing import Optional | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-12 12:34:01 +08:00
										 |  |  | from flask import current_app | 
					
						
							| 
									
										
										
										
											2023-11-13 22:05:46 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-06 13:21:13 +08:00
										 |  |  | from extensions.ext_storage import storage | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-21 16:58:06 +08:00
										 |  |  | IMAGE_EXTENSIONS = ['jpg', 'jpeg', 'png', 'webp', 'gif', 'svg'] | 
					
						
							|  |  |  | IMAGE_EXTENSIONS.extend([ext.upper() for ext in IMAGE_EXTENSIONS]) | 
					
						
							| 
									
										
										
										
											2023-11-13 22:05:46 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-04-08 18:51:46 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-11-13 22:05:46 +08:00
										 |  |  | class UploadFileParser: | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def get_image_data(cls, upload_file, force_url: bool = False) -> Optional[str]: | 
					
						
							|  |  |  |         if not upload_file: | 
					
						
							|  |  |  |             return None | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-21 16:58:06 +08:00
										 |  |  |         if upload_file.extension not in IMAGE_EXTENSIONS: | 
					
						
							| 
									
										
										
										
											2023-11-13 22:05:46 +08:00
										 |  |  |             return None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if current_app.config['MULTIMODAL_SEND_IMAGE_FORMAT'] == 'url' or force_url: | 
					
						
							| 
									
										
										
										
											2024-04-08 18:51:46 +08:00
										 |  |  |             return cls.get_signed_temp_image_url(upload_file.id) | 
					
						
							| 
									
										
										
										
											2023-11-13 22:05:46 +08:00
										 |  |  |         else: | 
					
						
							|  |  |  |             # get image file base64 | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 data = storage.load(upload_file.key) | 
					
						
							|  |  |  |             except FileNotFoundError: | 
					
						
							|  |  |  |                 logging.error(f'File not found: {upload_file.key}') | 
					
						
							|  |  |  |                 return None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             encoded_string = base64.b64encode(data).decode('utf-8') | 
					
						
							|  |  |  |             return f'data:{upload_file.mime_type};base64,{encoded_string}' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							| 
									
										
										
										
											2024-04-08 18:51:46 +08:00
										 |  |  |     def get_signed_temp_image_url(cls, upload_file_id) -> str: | 
					
						
							| 
									
										
										
										
											2023-11-13 22:05:46 +08:00
										 |  |  |         """
 | 
					
						
							|  |  |  |         get signed url from upload file | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         :param upload_file: UploadFile object | 
					
						
							|  |  |  |         :return: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         base_url = current_app.config.get('FILES_URL') | 
					
						
							| 
									
										
										
										
											2024-04-08 18:51:46 +08:00
										 |  |  |         image_preview_url = f'{base_url}/files/{upload_file_id}/image-preview' | 
					
						
							| 
									
										
										
										
											2023-11-13 22:05:46 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |         timestamp = str(int(time.time())) | 
					
						
							|  |  |  |         nonce = os.urandom(16).hex() | 
					
						
							| 
									
										
										
										
											2024-04-08 18:51:46 +08:00
										 |  |  |         data_to_sign = f"image-preview|{upload_file_id}|{timestamp}|{nonce}" | 
					
						
							| 
									
										
										
										
											2023-11-13 22:05:46 +08:00
										 |  |  |         secret_key = current_app.config['SECRET_KEY'].encode() | 
					
						
							|  |  |  |         sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest() | 
					
						
							|  |  |  |         encoded_sign = base64.urlsafe_b64encode(sign).decode() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return f"{image_preview_url}?timestamp={timestamp}&nonce={nonce}&sign={encoded_sign}" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def verify_image_file_signature(cls, upload_file_id: str, timestamp: str, nonce: str, sign: str) -> bool: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         verify signature | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         :param upload_file_id: file id | 
					
						
							|  |  |  |         :param timestamp: timestamp | 
					
						
							|  |  |  |         :param nonce: nonce | 
					
						
							|  |  |  |         :param sign: signature | 
					
						
							|  |  |  |         :return: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         data_to_sign = f"image-preview|{upload_file_id}|{timestamp}|{nonce}" | 
					
						
							|  |  |  |         secret_key = current_app.config['SECRET_KEY'].encode() | 
					
						
							|  |  |  |         recalculated_sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest() | 
					
						
							|  |  |  |         recalculated_encoded_sign = base64.urlsafe_b64encode(recalculated_sign).decode() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # verify signature | 
					
						
							|  |  |  |         if sign != recalculated_encoded_sign: | 
					
						
							|  |  |  |             return False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         current_time = int(time.time()) | 
					
						
							| 
									
										
										
										
											2024-05-27 16:20:49 +02:00
										 |  |  |         return current_time - int(timestamp) <= current_app.config.get('FILES_ACCESS_TIMEOUT') |