import datetime
from typing import Optional

from flask_login import current_user  # type: ignore

from core.rag.index_processor.constant.built_in_field import BuiltInField
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding
from services.dataset_service import DocumentService
from services.entities.knowledge_entities.knowledge_entities import (
    MetadataArgs,
    MetadataOperationData,
)
from tasks.update_documents_metadata_task import update_documents_metadata_task


class MetadataService:
    """Create, rename, delete and apply metadata fields on dataset documents.

    Mutating operations take a Redis lock (per dataset or per document) via
    ``knowledge_base_metadata_lock_check`` and hand the lock key to
    ``update_documents_metadata_task``.  The task is presumably responsible
    for releasing the lock once it finishes (its body is not visible from
    this module -- confirm).  Whenever no task ends up being dispatched
    (nothing to update, or the operation failed) the lock is released here,
    so the resource is not left locked until the TTL expires.
    """

    # Lifetime of a metadata lock; a safety net in case a release is missed.
    _LOCK_TTL_SECONDS = 3600
    # Format used when storing built-in date fields in ``doc_metadata``.
    _DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

    @staticmethod
    def create_metadata(dataset_id: str, metadata_args: MetadataArgs) -> DatasetMetadata:
        """Persist a new metadata field definition for the dataset and return it."""
        metadata = DatasetMetadata(
            dataset_id=dataset_id,
            type=metadata_args.type,
            name=metadata_args.name,
            created_by=current_user.id,
        )
        db.session.add(metadata)
        db.session.commit()
        return metadata

    @staticmethod
    def update_metadata_name(dataset_id: str, metadata_id: str, name: str) -> DatasetMetadata:
        """Rename a metadata field and the matching key on every bound document.

        Raises:
            ValueError: if another dataset metadata operation holds the lock,
                or if no metadata row with ``metadata_id`` exists.
        """
        lock_key = f"dataset_metadata_lock_{dataset_id}"
        MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
        try:
            metadata = DatasetMetadata.query.filter_by(id=metadata_id).first()
            if metadata is None:
                raise ValueError("Metadata not found.")
            old_name = metadata.name
            metadata.name = name
            metadata.updated_by = current_user.id
            metadata.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)

            # Rename the key on every document bound to this metadata field.
            # ``document_ids`` starts empty so it is defined even when the
            # field has no bindings.
            document_ids: list = []
            bindings = DatasetMetadataBinding.query.filter_by(metadata_id=metadata_id).all()
            if bindings:
                document_ids = [binding.document_id for binding in bindings]
                for document in DocumentService.get_document_by_ids(document_ids):
                    doc_metadata = MetadataService._copy_doc_metadata(document)
                    # A document missing the old key gets the new key with a
                    # null value instead of aborting the rename with KeyError.
                    doc_metadata[name] = doc_metadata.pop(old_name, None)
                    document.doc_metadata = doc_metadata
                    db.session.add(document)
            db.session.commit()
            MetadataService._dispatch_or_release(dataset_id, document_ids, lock_key)
        except Exception:
            # Nothing will run the follow-up task, so free the lock ourselves.
            redis_client.delete(lock_key)
            raise
        return metadata

    @staticmethod
    def delete_metadata(dataset_id: str, metadata_id: str):
        """Delete a metadata field and strip its key from every bound document.

        Raises:
            ValueError: if another dataset metadata operation holds the lock,
                or if no metadata row with ``metadata_id`` exists.
        """
        lock_key = f"dataset_metadata_lock_{dataset_id}"
        MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
        try:
            metadata = DatasetMetadata.query.filter_by(id=metadata_id).first()
            if metadata is None:
                raise ValueError("Metadata not found.")
            db.session.delete(metadata)

            # Remove the field from every document bound to it.
            document_ids: list = []
            bindings = DatasetMetadataBinding.query.filter_by(metadata_id=metadata_id).all()
            if bindings:
                document_ids = [binding.document_id for binding in bindings]
                for document in DocumentService.get_document_by_ids(document_ids):
                    doc_metadata = MetadataService._copy_doc_metadata(document)
                    doc_metadata.pop(metadata.name, None)
                    document.doc_metadata = doc_metadata
                    db.session.add(document)
            db.session.commit()
            MetadataService._dispatch_or_release(dataset_id, document_ids, lock_key)
        except Exception:
            redis_client.delete(lock_key)
            raise

    @staticmethod
    def get_built_in_fields():
        """Return the name/type descriptors of the built-in metadata fields."""
        return [
            {"name": BuiltInField.document_name, "type": "string"},
            {"name": BuiltInField.uploader, "type": "string"},
            {"name": BuiltInField.upload_date, "type": "date"},
            {"name": BuiltInField.last_update_date, "type": "date"},
            {"name": BuiltInField.source, "type": "string"},
        ]

    @staticmethod
    def enable_built_in_field(dataset: Dataset):
        """Turn on built-in fields for the dataset and stamp them on its working documents.

        Does nothing when built-in fields are already enabled.

        Raises:
            ValueError: if another dataset metadata operation holds the lock.
        """
        if dataset.built_in_fields:
            return
        lock_key = f"dataset_metadata_lock_{dataset.id}"
        MetadataService.knowledge_base_metadata_lock_check(dataset.id, None)
        try:
            dataset.built_in_fields = True
            db.session.add(dataset)

            document_ids = []
            documents = DocumentService.get_working_documents_by_dataset_id(dataset.id)
            for document in documents or []:
                doc_metadata = MetadataService._copy_doc_metadata(document)
                doc_metadata.update(MetadataService._built_in_field_values(document))
                document.doc_metadata = doc_metadata
                db.session.add(document)
                document_ids.append(document.id)
            db.session.commit()
            MetadataService._dispatch_or_release(dataset.id, document_ids, lock_key)
        except Exception:
            redis_client.delete(lock_key)
            raise

    @staticmethod
    def disable_built_in_field(dataset: Dataset):
        """Turn off built-in fields for the dataset and strip them from its working documents.

        Does nothing when built-in fields are already disabled.

        Raises:
            ValueError: if another dataset metadata operation holds the lock.
        """
        if not dataset.built_in_fields:
            return
        lock_key = f"dataset_metadata_lock_{dataset.id}"
        MetadataService.knowledge_base_metadata_lock_check(dataset.id, None)
        try:
            dataset.built_in_fields = False
            db.session.add(dataset)

            built_in_keys = (
                BuiltInField.document_name,
                BuiltInField.uploader,
                BuiltInField.upload_date,
                BuiltInField.last_update_date,
                BuiltInField.source,
            )
            document_ids = []
            documents = DocumentService.get_working_documents_by_dataset_id(dataset.id)
            for document in documents or []:
                doc_metadata = MetadataService._copy_doc_metadata(document)
                for key in built_in_keys:
                    # Tolerate documents that never received a given field.
                    doc_metadata.pop(key, None)
                document.doc_metadata = doc_metadata
                db.session.add(document)
                document_ids.append(document.id)
            db.session.commit()
            MetadataService._dispatch_or_release(dataset.id, document_ids, lock_key)
        except Exception:
            redis_client.delete(lock_key)
            raise

    @staticmethod
    def update_documents_metadata(dataset: Dataset, metadata_args: MetadataOperationData):
        """Replace the metadata and metadata bindings of each document in the request.

        Every document is processed, committed and dispatched to
        ``update_documents_metadata_task`` independently, under its own
        per-document lock.

        Raises:
            ValueError: if a document is locked by another metadata operation
                or cannot be found.  Documents handled before the failing one
                have already been committed.
        """
        for operation in metadata_args.operation_data:
            lock_key = f"document_metadata_lock_{operation.document_id}"
            MetadataService.knowledge_base_metadata_lock_check(None, operation.document_id)
            try:
                # NOTE(review): called with the document id only -- confirm this
                # matches DocumentService.get_document's signature.
                document = DocumentService.get_document(operation.document_id)
                if document is None:
                    raise ValueError("Document not found.")

                doc_metadata = {}
                # NOTE(review): values are read from ``metadata_args.fields`` while
                # the bindings below use ``operation.metadata_list`` -- confirm the
                # values should not come from the per-document list as well.
                for metadata_value in metadata_args.fields:
                    doc_metadata[metadata_value.name] = metadata_value.value
                if dataset.built_in_fields:
                    doc_metadata.update(MetadataService._built_in_field_values(document))
                document.doc_metadata = doc_metadata

                # Rebuild the metadata bindings of this document from scratch.
                DatasetMetadataBinding.query.filter_by(document_id=operation.document_id).delete()
                for metadata_value in operation.metadata_list:
                    dataset_metadata_binding = DatasetMetadataBinding(
                        tenant_id=current_user.tenant_id,
                        dataset_id=dataset.id,
                        document_id=operation.document_id,
                        metadata_id=metadata_value.id,
                        created_by=current_user.id,
                    )
                    db.session.add(dataset_metadata_binding)
                db.session.add(document)
                db.session.commit()
                update_documents_metadata_task.delay(dataset.id, [document.id], lock_key)
            except Exception:
                redis_client.delete(lock_key)
                raise

    @staticmethod
    def knowledge_base_metadata_lock_check(dataset_id: Optional[str], document_id: Optional[str]):
        """Acquire the metadata lock for the given dataset and/or document.

        Each lock is taken with a single ``SET ... NX EX`` so that checking and
        acquiring happen atomically; two concurrent callers cannot both succeed.

        Raises:
            ValueError: if a requested lock is already held.
        """
        if dataset_id:
            lock_key = f"dataset_metadata_lock_{dataset_id}"
            if not redis_client.set(lock_key, 1, ex=MetadataService._LOCK_TTL_SECONDS, nx=True):
                raise ValueError("Another knowledge base metadata operation is running, please wait a moment.")
        if document_id:
            lock_key = f"document_metadata_lock_{document_id}"
            if not redis_client.set(lock_key, 1, ex=MetadataService._LOCK_TTL_SECONDS, nx=True):
                raise ValueError("Another document metadata operation is running, please wait a moment.")

    @staticmethod
    def _copy_doc_metadata(document) -> dict:
        """Return a shallow copy of the document's metadata ({} when unset).

        Callers edit the copy and assign it back to ``document.doc_metadata``.
        Reassigning a new object avoids a ``TypeError`` on a ``None`` column and
        makes the change visible to the ORM whether or not in-place mutation
        tracking is configured for the column (not visible from this module).
        """
        return dict(document.doc_metadata or {})

    @staticmethod
    def _built_in_field_values(document) -> dict:
        """Build the built-in field values for a document, keyed by ``BuiltInField``."""
        return {
            BuiltInField.document_name: document.name,
            BuiltInField.uploader: document.uploader,
            BuiltInField.upload_date: document.upload_date.strftime(MetadataService._DATE_FORMAT),
            BuiltInField.last_update_date: document.last_update_date.strftime(MetadataService._DATE_FORMAT),
            BuiltInField.source: document.data_source_type,
        }

    @staticmethod
    def _dispatch_or_release(dataset_id: str, document_ids: list, lock_key: str) -> None:
        """Queue the document update task, or free the lock when there is nothing to update."""
        if document_ids:
            update_documents_metadata_task.delay(dataset_id, document_ids, lock_key)
        else:
            redis_client.delete(lock_key)