| 
									
										
										
										
											2024-01-02 15:29:18 +08:00
										 |  |  | import datetime | 
					
						
							|  |  |  | import time | 
					
						
							| 
									
										
										
										
											2024-01-12 12:34:01 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-02 15:29:18 +08:00
										 |  |  | import click | 
					
						
							| 
									
										
										
										
											2024-07-17 17:36:11 +08:00
										 |  |  | from sqlalchemy import func | 
					
						
							| 
									
										
										
										
											2024-01-12 12:34:01 +08:00
										 |  |  | from werkzeug.exceptions import NotFound | 
					
						
							| 
									
										
										
										
											2024-01-02 15:29:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-06 13:21:13 +08:00
										 |  |  | import app | 
					
						
							| 
									
										
										
										
											2024-07-12 16:51:43 +08:00
										 |  |  | from configs import dify_config | 
					
						
							| 
									
										
										
										
											2024-02-22 23:31:57 +08:00
										 |  |  | from core.rag.index_processor.index_processor_factory import IndexProcessorFactory | 
					
						
							| 
									
										
										
										
											2024-02-06 13:21:13 +08:00
										 |  |  | from extensions.ext_database import db | 
					
						
							| 
									
										
										
										
											2024-10-16 22:24:50 +08:00
										 |  |  | from extensions.ext_redis import redis_client | 
					
						
							| 
									
										
										
										
											2024-02-06 13:21:13 +08:00
										 |  |  | from models.dataset import Dataset, DatasetQuery, Document | 
					
						
							| 
									
										
										
										
											2024-10-16 22:24:50 +08:00
										 |  |  | from services.feature_service import FeatureService | 
					
						
							| 
									
										
										
										
											2024-02-06 13:21:13 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-02 15:29:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  | @app.celery.task(queue="dataset") | 
					
						
							| 
									
										
										
										
											2024-01-02 15:29:18 +08:00
										 |  |  | def clean_unused_datasets_task(): | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |     click.echo(click.style("Start clean unused datasets indexes.", fg="green")) | 
					
						
							| 
									
										
										
										
											2024-10-17 10:40:22 +08:00
										 |  |  |     plan_sandbox_clean_day_setting = dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING | 
					
						
							|  |  |  |     plan_pro_clean_day_setting = dify_config.PLAN_PRO_CLEAN_DAY_SETTING | 
					
						
							| 
									
										
										
										
											2024-01-02 15:29:18 +08:00
										 |  |  |     start_at = time.perf_counter() | 
					
						
							| 
									
										
										
										
											2024-10-17 10:40:22 +08:00
										 |  |  |     plan_sandbox_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_sandbox_clean_day_setting) | 
					
						
							|  |  |  |     plan_pro_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_pro_clean_day_setting) | 
					
						
							| 
									
										
										
										
											2024-01-02 15:29:18 +08:00
										 |  |  |     page = 1 | 
					
						
							|  |  |  |     while True: | 
					
						
							|  |  |  |         try: | 
					
						
							| 
									
										
										
										
											2024-07-17 17:36:11 +08:00
										 |  |  |             # Subquery for counting new documents | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |             document_subquery_new = ( | 
					
						
							|  |  |  |                 db.session.query(Document.dataset_id, func.count(Document.id).label("document_count")) | 
					
						
							|  |  |  |                 .filter( | 
					
						
							|  |  |  |                     Document.indexing_status == "completed", | 
					
						
							|  |  |  |                     Document.enabled == True, | 
					
						
							|  |  |  |                     Document.archived == False, | 
					
						
							| 
									
										
										
										
											2024-10-17 10:40:22 +08:00
										 |  |  |                     Document.updated_at > plan_sandbox_clean_day, | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |                 ) | 
					
						
							|  |  |  |                 .group_by(Document.dataset_id) | 
					
						
							|  |  |  |                 .subquery() | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2024-07-17 17:36:11 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |             # Subquery for counting old documents | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |             document_subquery_old = ( | 
					
						
							|  |  |  |                 db.session.query(Document.dataset_id, func.count(Document.id).label("document_count")) | 
					
						
							|  |  |  |                 .filter( | 
					
						
							|  |  |  |                     Document.indexing_status == "completed", | 
					
						
							|  |  |  |                     Document.enabled == True, | 
					
						
							|  |  |  |                     Document.archived == False, | 
					
						
							| 
									
										
										
										
											2024-10-17 10:40:22 +08:00
										 |  |  |                     Document.updated_at < plan_sandbox_clean_day, | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |                 ) | 
					
						
							|  |  |  |                 .group_by(Document.dataset_id) | 
					
						
							|  |  |  |                 .subquery() | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2024-07-17 17:36:11 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |             # Main query with join and filter | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |             datasets = ( | 
					
						
							|  |  |  |                 db.session.query(Dataset) | 
					
						
							|  |  |  |                 .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id) | 
					
						
							|  |  |  |                 .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id) | 
					
						
							|  |  |  |                 .filter( | 
					
						
							| 
									
										
										
										
											2024-10-17 10:40:22 +08:00
										 |  |  |                     Dataset.created_at < plan_sandbox_clean_day, | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |                     func.coalesce(document_subquery_new.c.document_count, 0) == 0, | 
					
						
							|  |  |  |                     func.coalesce(document_subquery_old.c.document_count, 0) > 0, | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |                 .order_by(Dataset.created_at.desc()) | 
					
						
							|  |  |  |                 .paginate(page=page, per_page=50) | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2024-07-17 17:36:11 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-02 15:29:18 +08:00
										 |  |  |         except NotFound: | 
					
						
							|  |  |  |             break | 
					
						
							| 
									
										
										
										
											2024-07-17 17:36:11 +08:00
										 |  |  |         if datasets.items is None or len(datasets.items) == 0: | 
					
						
							|  |  |  |             break | 
					
						
							| 
									
										
										
										
											2024-01-02 15:29:18 +08:00
										 |  |  |         page += 1 | 
					
						
							|  |  |  |         for dataset in datasets: | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |             dataset_query = ( | 
					
						
							|  |  |  |                 db.session.query(DatasetQuery) | 
					
						
							| 
									
										
										
										
											2024-10-17 10:40:22 +08:00
										 |  |  |                 .filter(DatasetQuery.created_at > plan_sandbox_clean_day, DatasetQuery.dataset_id == dataset.id) | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |                 .all() | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2024-01-02 15:29:18 +08:00
										 |  |  |             if not dataset_query or len(dataset_query) == 0: | 
					
						
							| 
									
										
										
										
											2024-07-17 17:36:11 +08:00
										 |  |  |                 try: | 
					
						
							|  |  |  |                     # remove index | 
					
						
							|  |  |  |                     index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor() | 
					
						
							|  |  |  |                     index_processor.clean(dataset, None) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     # update document | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |                     update_params = {Document.enabled: False} | 
					
						
							| 
									
										
										
										
											2024-07-17 17:36:11 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |                     Document.query.filter_by(dataset_id=dataset.id).update(update_params) | 
					
						
							|  |  |  |                     db.session.commit() | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |                     click.echo(click.style("Cleaned unused dataset {} from db success!".format(dataset.id), fg="green")) | 
					
						
							| 
									
										
										
										
											2024-07-17 17:36:11 +08:00
										 |  |  |                 except Exception as e: | 
					
						
							|  |  |  |                     click.echo( | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |                         click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red") | 
					
						
							|  |  |  |                     ) | 
					
						
							| 
									
										
										
										
											2024-10-16 22:24:50 +08:00
										 |  |  |     page = 1 | 
					
						
							|  |  |  |     while True: | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             # Subquery for counting new documents | 
					
						
							|  |  |  |             document_subquery_new = ( | 
					
						
							|  |  |  |                 db.session.query(Document.dataset_id, func.count(Document.id).label("document_count")) | 
					
						
							|  |  |  |                 .filter( | 
					
						
							|  |  |  |                     Document.indexing_status == "completed", | 
					
						
							|  |  |  |                     Document.enabled == True, | 
					
						
							|  |  |  |                     Document.archived == False, | 
					
						
							| 
									
										
										
										
											2024-10-17 10:40:22 +08:00
										 |  |  |                     Document.updated_at > plan_pro_clean_day, | 
					
						
							| 
									
										
										
										
											2024-10-16 22:24:50 +08:00
										 |  |  |                 ) | 
					
						
							|  |  |  |                 .group_by(Document.dataset_id) | 
					
						
							|  |  |  |                 .subquery() | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Subquery for counting old documents | 
					
						
							|  |  |  |             document_subquery_old = ( | 
					
						
							|  |  |  |                 db.session.query(Document.dataset_id, func.count(Document.id).label("document_count")) | 
					
						
							|  |  |  |                 .filter( | 
					
						
							|  |  |  |                     Document.indexing_status == "completed", | 
					
						
							|  |  |  |                     Document.enabled == True, | 
					
						
							|  |  |  |                     Document.archived == False, | 
					
						
							| 
									
										
										
										
											2024-10-17 10:40:22 +08:00
										 |  |  |                     Document.updated_at < plan_pro_clean_day, | 
					
						
							| 
									
										
										
										
											2024-10-16 22:24:50 +08:00
										 |  |  |                 ) | 
					
						
							|  |  |  |                 .group_by(Document.dataset_id) | 
					
						
							|  |  |  |                 .subquery() | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Main query with join and filter | 
					
						
							|  |  |  |             datasets = ( | 
					
						
							|  |  |  |                 db.session.query(Dataset) | 
					
						
							|  |  |  |                 .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id) | 
					
						
							|  |  |  |                 .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id) | 
					
						
							|  |  |  |                 .filter( | 
					
						
							| 
									
										
										
										
											2024-10-17 10:40:22 +08:00
										 |  |  |                     Dataset.created_at < plan_pro_clean_day, | 
					
						
							| 
									
										
										
										
											2024-10-16 22:24:50 +08:00
										 |  |  |                     func.coalesce(document_subquery_new.c.document_count, 0) == 0, | 
					
						
							|  |  |  |                     func.coalesce(document_subquery_old.c.document_count, 0) > 0, | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |                 .order_by(Dataset.created_at.desc()) | 
					
						
							|  |  |  |                 .paginate(page=page, per_page=50) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         except NotFound: | 
					
						
							|  |  |  |             break | 
					
						
							|  |  |  |         if datasets.items is None or len(datasets.items) == 0: | 
					
						
							|  |  |  |             break | 
					
						
							|  |  |  |         page += 1 | 
					
						
							|  |  |  |         for dataset in datasets: | 
					
						
							|  |  |  |             dataset_query = ( | 
					
						
							|  |  |  |                 db.session.query(DatasetQuery) | 
					
						
							| 
									
										
										
										
											2024-10-17 10:40:22 +08:00
										 |  |  |                 .filter(DatasetQuery.created_at > plan_pro_clean_day, DatasetQuery.dataset_id == dataset.id) | 
					
						
							| 
									
										
										
										
											2024-10-16 22:24:50 +08:00
										 |  |  |                 .all() | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |             if not dataset_query or len(dataset_query) == 0: | 
					
						
							|  |  |  |                 try: | 
					
						
							|  |  |  |                     features_cache_key = f"features:{dataset.tenant_id}" | 
					
						
							|  |  |  |                     plan = redis_client.get(features_cache_key) | 
					
						
							|  |  |  |                     if plan is None: | 
					
						
							|  |  |  |                         features = FeatureService.get_features(dataset.tenant_id) | 
					
						
							|  |  |  |                         redis_client.setex(features_cache_key, 600, features.billing.subscription.plan) | 
					
						
							|  |  |  |                         plan = features.billing.subscription.plan | 
					
						
							|  |  |  |                     if plan == "sandbox": | 
					
						
							|  |  |  |                         # remove index | 
					
						
							|  |  |  |                         index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor() | 
					
						
							|  |  |  |                         index_processor.clean(dataset, None) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                         # update document | 
					
						
							|  |  |  |                         update_params = {Document.enabled: False} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                         Document.query.filter_by(dataset_id=dataset.id).update(update_params) | 
					
						
							|  |  |  |                         db.session.commit() | 
					
						
							|  |  |  |                         click.echo( | 
					
						
							|  |  |  |                             click.style("Cleaned unused dataset {} from db success!".format(dataset.id), fg="green") | 
					
						
							|  |  |  |                         ) | 
					
						
							|  |  |  |                 except Exception as e: | 
					
						
							|  |  |  |                     click.echo( | 
					
						
							|  |  |  |                         click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red") | 
					
						
							|  |  |  |                     ) | 
					
						
							| 
									
										
										
										
											2024-01-02 15:29:18 +08:00
										 |  |  |     end_at = time.perf_counter() | 
					
						
							| 
									
										
										
										
											2024-08-15 12:54:05 +08:00
										 |  |  |     click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green")) |