mirror of
				https://github.com/langgenius/dify.git
				synced 2025-11-04 04:43:09 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			82 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			82 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import datetime
 | 
						|
import time
 | 
						|
 | 
						|
import click
 | 
						|
from werkzeug.exceptions import NotFound
 | 
						|
 | 
						|
import app
 | 
						|
from configs import dify_config
 | 
						|
from extensions.ext_database import db
 | 
						|
from extensions.ext_redis import redis_client
 | 
						|
from models.model import (
 | 
						|
    App,
 | 
						|
    Message,
 | 
						|
    MessageAgentThought,
 | 
						|
    MessageAnnotation,
 | 
						|
    MessageChain,
 | 
						|
    MessageFeedback,
 | 
						|
    MessageFile,
 | 
						|
)
 | 
						|
from models.web import SavedMessage
 | 
						|
from services.feature_service import FeatureService
 | 
						|
 | 
						|
 | 
						|
@app.celery.task(queue="dataset")
def clean_messages():
    """Delete old messages, and their dependent rows, for sandbox-plan tenants.

    Messages created before ``now - PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING`` days
    are scanned newest-first in batches of 100. For each message the owning
    app's tenant plan is looked up (cached in Redis for 600 seconds under
    ``features:<tenant_id>``); when the plan is ``"sandbox"`` the message and
    its rows in the feedback, annotation, chain, agent-thought, file and
    saved-message tables are deleted and the transaction is committed.

    Messages whose app row no longer exists are skipped with a warning instead
    of aborting the whole run.
    """
    click.echo(click.style("Start clean messages.", fg="green"))
    start_at = time.perf_counter()

    # Exclusive upper bound on ``Message.created_at``; it doubles as the paging
    # cursor and is moved back to each processed message's timestamp.
    created_before = datetime.datetime.now() - datetime.timedelta(
        days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
    )

    # Tables holding rows keyed by ``message_id``; cleared in this order before
    # the message row itself is removed.
    dependent_models = (
        MessageFeedback,
        MessageAnnotation,
        MessageChain,
        MessageAgentThought,
        MessageFile,
        SavedMessage,
    )

    while True:
        try:
            # Main query with join and filter
            # FIXME:for mypy no paginate method error
            messages = (
                db.session.query(Message)  # type: ignore
                .filter(Message.created_at < created_before)
                .order_by(Message.created_at.desc())
                .limit(100)
                .all()
            )
        except NotFound:
            # NOTE(review): presumably a leftover from a paginate()-based query;
            # kept so the existing exit path is unchanged.
            break
        if not messages:
            break

        for message in messages:
            # Advance the cursor first so that skipped messages are never
            # fetched again and the outer loop always makes progress.
            created_before = message.created_at

            message_app = App.query.filter_by(id=message.app_id).first()
            if message_app is None:
                # The owning app is gone, so the tenant (and thus the plan)
                # cannot be determined; skip rather than crash the task.
                click.echo(
                    click.style(
                        f"Skip message {message.id}: app {message.app_id} not found.",
                        fg="yellow",
                    )
                )
                continue

            features_cache_key = f"features:{message_app.tenant_id}"
            plan_cache = redis_client.get(features_cache_key)
            if plan_cache is None:
                features = FeatureService.get_features(message_app.tenant_id)
                plan = features.billing.subscription.plan
                # Cache the plan for 10 minutes to avoid one lookup per message.
                redis_client.setex(features_cache_key, 600, plan)
            else:
                plan = plan_cache.decode()

            if plan != "sandbox":
                continue

            # clean related message
            for model in dependent_models:
                db.session.query(model).filter(model.message_id == message.id).delete(
                    synchronize_session=False
                )
            db.session.query(Message).filter(Message.id == message.id).delete()
            db.session.commit()

    end_at = time.perf_counter()
    click.echo(click.style("Cleaned messages from db success latency: {}".format(end_at - start_at), fg="green"))
 |