mirror of
				https://github.com/langgenius/dify.git
				synced 2025-10-30 18:33:30 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			91 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			91 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import datetime
 | |
| import logging
 | |
| import time
 | |
| 
 | |
| import click
 | |
| from werkzeug.exceptions import NotFound
 | |
| 
 | |
| import app
 | |
| from configs import dify_config
 | |
| from extensions.ext_database import db
 | |
| from extensions.ext_redis import redis_client
 | |
| from models.model import (
 | |
|     App,
 | |
|     Message,
 | |
|     MessageAgentThought,
 | |
|     MessageAnnotation,
 | |
|     MessageChain,
 | |
|     MessageFeedback,
 | |
|     MessageFile,
 | |
| )
 | |
| from models.web import SavedMessage
 | |
| from services.feature_service import FeatureService
 | |
| 
 | |
| _logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
# Models whose rows reference a message through ``message_id``. Their rows are
# deleted (in this order) before the ``Message`` row itself.
_MESSAGE_CHILD_MODELS = (
    MessageFeedback,
    MessageAnnotation,
    MessageChain,
    MessageAgentThought,
    MessageFile,
    SavedMessage,
)

# Number of messages fetched per query while walking backwards in time.
_CLEAN_BATCH_SIZE = 100

# Lifetime, in seconds, of the per-tenant plan entry stored in Redis.
_PLAN_CACHE_TTL_SECONDS = 600


def _get_tenant_plan(tenant_id):
    """Return the subscription plan name of *tenant_id*.

    The plan is read from the Redis key ``features:<tenant_id>`` when present;
    otherwise it is fetched through ``FeatureService.get_features`` and written
    back to Redis with a ``_PLAN_CACHE_TTL_SECONDS`` expiry.
    """
    cache_key = f"features:{tenant_id}"
    cached_plan = redis_client.get(cache_key)
    if cached_plan is not None:
        # NOTE(review): assumes the Redis client returns bytes — confirm.
        return cached_plan.decode()

    plan = FeatureService.get_features(tenant_id).billing.subscription.plan
    redis_client.setex(cache_key, _PLAN_CACHE_TTL_SECONDS, plan)
    return plan


def _delete_message_with_children(message_id):
    """Delete the message *message_id* and every row that references it, then commit."""
    for child_model in _MESSAGE_CHILD_MODELS:
        db.session.query(child_model).filter(child_model.message_id == message_id).delete(
            synchronize_session=False
        )
    db.session.query(Message).filter(Message.id == message_id).delete()
    db.session.commit()


@app.celery.task(queue="dataset")
def clean_messages():
    """Delete expired messages that belong to tenants on the ``sandbox`` plan.

    Messages older than ``dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING``
    days are walked from newest to oldest in batches of ``_CLEAN_BATCH_SIZE``.
    For each message the owning ``App`` is looked up to find its tenant; when
    the tenant's plan is ``"sandbox"`` the message and its related rows are
    deleted and committed. Messages whose ``App`` no longer exists are logged
    and left untouched.

    Paging uses a ``(created_at, id)`` cursor rather than ``created_at`` alone,
    so messages that share a timestamp with the last row of a batch are not
    skipped by the next batch.
    """
    click.echo(click.style("Start clean messages.", fg="green"))
    start_at = time.perf_counter()
    cutoff = datetime.datetime.now() - datetime.timedelta(
        days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
    )

    # (created_at, id) of the last message examined; None until the first batch.
    cursor = None

    while True:
        query = db.session.query(Message).filter(Message.created_at < cutoff)
        if cursor is not None:
            last_created_at, last_id = cursor
            # Keyset pagination: strictly "before" the cursor in the
            # (created_at DESC, id DESC) ordering used below.
            query = query.filter(
                (Message.created_at < last_created_at)
                | ((Message.created_at == last_created_at) & (Message.id < last_id))
            )

        try:
            messages = (
                query.order_by(Message.created_at.desc(), Message.id.desc())
                .limit(_CLEAN_BATCH_SIZE)
                .all()
            )
        except NotFound:
            # NOTE(review): ``.all()`` returns an empty list rather than raising,
            # so this handler looks unreachable; kept to preserve the existing
            # exit path.
            break

        if not messages:
            break

        for message in messages:
            # Read everything needed from the ORM object up front: once the row
            # is deleted and the session committed it must not be touched again.
            message_id = message.id
            message_app_id = message.app_id
            cursor = (message.created_at, message_id)

            # Named ``owning_app`` so the imported ``app`` module is not shadowed.
            owning_app = db.session.query(App).filter_by(id=message_app_id).first()
            if not owning_app:
                _logger.warning(
                    "Expected App record to exist, but none was found, app_id=%s, message_id=%s",
                    message_app_id,
                    message_id,
                )
                continue

            if _get_tenant_plan(owning_app.tenant_id) == "sandbox":
                _delete_message_with_children(message_id)

    end_at = time.perf_counter()
    click.echo(click.style(f"Cleaned messages from db success latency: {end_at - start_at}", fg="green"))
 | 
