mirror of
				https://github.com/AUTOMATIC1111/stable-diffusion-webui.git
				synced 2025-10-31 01:54:44 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			174 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			174 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os.path
 | |
| from concurrent.futures import ProcessPoolExecutor
 | |
| import multiprocessing
 | |
| import time
 | |
| import re
 | |
| 
 | |
# Matches the characters that get backslash-escaped in emitted tags when the
# "use_escape" option is on: a backslash or either parenthesis.
re_special = re.compile(r"([\\()])")
 | |
| 
 | |
def get_deepbooru_tags(pil_image):
    """
    Tag a single image with deepbooru and return the resulting caption.

    Convenience wrapper for one-off use (the img2img interrogate feature): it starts the worker
    process, asks it for the tags of `pil_image`, and always stops the worker again afterwards,
    even when tagging fails.
    """
    from modules import shared  # imported lazily to avoid a circular import

    try:
        score_threshold = shared.opts.interrogate_deepbooru_score_threshold
        worker_opts = create_deepbooru_opts()
        create_deepbooru_process(score_threshold, worker_opts)
        caption = get_tags_from_process(pil_image)
    finally:
        # Runs on success and on failure so the worker never outlives this call.
        release_process()

    return caption
 | |
| 
 | |
| 
 | |
# Key of the "return ranks together with the tags" flag inside the deepbooru options dict.
OPT_INCLUDE_RANKS = "include_ranks"


def create_deepbooru_opts():
    """
    Collect the deepbooru-related user settings into a dict.

    The dict is what gets handed to the worker process, so the worker does not read
    `shared.opts` itself.  Keys: "use_spaces", "use_escape", "alpha_sort" and OPT_INCLUDE_RANKS.
    """
    from modules import shared

    user_opts = shared.opts

    snapshot = {}
    snapshot["use_spaces"] = user_opts.deepbooru_use_spaces
    snapshot["use_escape"] = user_opts.deepbooru_escape
    snapshot["alpha_sort"] = user_opts.deepbooru_sort_alpha
    snapshot[OPT_INCLUDE_RANKS] = user_opts.interrogate_return_ranks
    return snapshot
 | |
| 
 | |
| 
 | |
def deepbooru_process(queue, deepbooru_process_return, threshold, deepbooru_opts):
    """
    Body of the deepbooru worker process.

    Loads the model once, then keeps taking items from `queue`.  Each item is treated as an image
    whose caption is written to `deepbooru_process_return["value"]`; the string "QUIT" ends the loop.
    """
    model, tags = get_deepbooru_tags_model()

    # Serve requests until the parent asks us to stop.
    while True:
        request = queue.get()
        if request == "QUIT":
            return

        deepbooru_process_return["value"] = get_deepbooru_tags_from_model(
            model, tags, request, threshold, deepbooru_opts
        )
 | |
| 
 | |
| 
 | |
def create_deepbooru_process(threshold, deepbooru_opts):
    """
    Start the deepbooru worker process and store its handles on `modules.shared`.

    Images are sent to the worker through a managed queue, so several images can be tagged in a
    row without reloading the model or spawning a new process.  Results come back through a
    managed dict: its "value" entry is set to -1 while no result is available, and whoever queued
    an image is expected to wait until that entry is replaced by the tags.
    """
    from modules import shared  # imported lazily to avoid a circular import

    spawn_ctx = multiprocessing.get_context("spawn")

    # Each handle is published on `shared` as soon as it exists.
    manager = shared.deepbooru_process_manager = spawn_ctx.Manager()
    image_queue = shared.deepbooru_process_queue = manager.Queue()
    result_slot = shared.deepbooru_process_return = manager.dict()
    result_slot["value"] = -1  # -1 means "no result yet"

    worker = spawn_ctx.Process(
        target=deepbooru_process,
        args=(image_queue, result_slot, threshold, deepbooru_opts),
    )
    shared.deepbooru_process = worker
    worker.start()
 | |
| 
 | |
| 
 | |
def get_tags_from_process(image):
    """
    Send one image to the running deepbooru worker and block until its caption is available.

    The shared result slot is reset to -1 before the image is queued and again after the
    caption has been read, so the next call starts from a clean state.

    Args:
        image: the image to tag; it is put on the worker queue as-is.

    Returns:
        The value the worker stored in the result slot (the caption string).

    Raises:
        RuntimeError: if the worker process exits before it has produced a result.  Without this
            check the polling loop would never terminate, e.g. when loading the model or running
            inference raised inside the worker.
    """
    from modules import shared

    result_slot = shared.deepbooru_process_return
    worker = shared.deepbooru_process

    result_slot["value"] = -1
    shared.deepbooru_process_queue.put(image)

    while True:
        caption = result_slot["value"]
        if caption != -1:
            break

        if not worker.is_alive():
            # The worker may have stored its answer right before exiting; look one last time.
            caption = result_slot["value"]
            if caption != -1:
                break
            raise RuntimeError("deepbooru process exited before returning tags")

        time.sleep(0.2)

    result_slot["value"] = -1

    return caption
 | |
| 
 | |
| 
 | |
def release_process():
    """
    Shut the deepbooru worker process down so the memory it uses is returned.
    """
    from modules import shared  # imported lazily to avoid a circular import

    # Tell the worker loop to exit, then wait for the process to finish.
    shared.deepbooru_process_queue.put("QUIT")
    shared.deepbooru_process.join()

    # Clear every handle that create_deepbooru_process stored on `shared`.
    for handle_name in (
        "deepbooru_process_queue",
        "deepbooru_process",
        "deepbooru_process_return",
        "deepbooru_process_manager",
    ):
        setattr(shared, handle_name, None)
 | |
| 
 | |
def get_deepbooru_tags_model():
    """
    Load the DeepDanbooru model and its tag list, downloading the project on first use.

    The project lives in `<this file's folder>/../models/deepbooru`.  If that folder has no
    `project.json`, the release archive is downloaded into it, extracted there, and the archive
    is deleted again.

    Returns:
        A `(model, tags)` tuple as returned by deepdanbooru's project loaders; the model is
        loaded with `compile_model=False`.
    """
    import deepdanbooru as dd

    model_url = r"https://github.com/KichangKim/DeepDanbooru/releases/download/v3-20211112-sgd-e28/deepdanbooru-v3-20211112-sgd-e28.zip"

    this_folder = os.path.dirname(__file__)
    model_path = os.path.abspath(os.path.join(this_folder, '..', 'models', 'deepbooru'))

    if not os.path.exists(os.path.join(model_path, 'project.json')):
        # Only needed for the one-time download, so there is no point importing these every time.
        import zipfile
        from basicsr.utils.download_util import load_file_from_url

        # The archive is stored under the last path component of the URL.
        archive_path = os.path.join(model_path, os.path.basename(model_url))

        load_file_from_url(model_url, model_path)
        with zipfile.ZipFile(archive_path, "r") as zip_ref:
            zip_ref.extractall(model_path)
        os.remove(archive_path)

    tags = dd.project.load_tags_from_project(model_path)
    model = dd.project.load_model_from_project(
        model_path, compile_model=False
    )
    return model, tags
 | |
| 
 | |
| 
 | |
def get_deepbooru_tags_from_model(model, tags, pil_image, threshold, deepbooru_opts):
    """
    Run the deepbooru model on one image and format the tags that pass the threshold.

    Args:
        model: loaded model; `input_shape[1]` / `input_shape[2]` are used as target height / width.
        tags: tag names, in the same order as the model's output scores.
        pil_image: image to tag.  PIL images whose mode is not "RGB" are converted to RGB first.
        threshold: minimum score a tag needs to be kept.
        deepbooru_opts: dict with the keys "alpha_sort", "use_spaces", "use_escape" and
            "include_ranks" (the same keys create_deepbooru_opts produces).

    Returns:
        The kept tags joined with ", ".  Tags starting with "rating:" are never included.  Tags are
        ordered alphabetically when "alpha_sort" is set, otherwise by descending score.

    Side effects:
        Prints every kept tag with its score to stdout.
    """
    import deepdanbooru as dd
    import tensorflow as tf
    import numpy as np

    alpha_sort = deepbooru_opts['alpha_sort']
    use_spaces = deepbooru_opts['use_spaces']
    use_escape = deepbooru_opts['use_escape']
    include_ranks = deepbooru_opts['include_ranks']

    # Grayscale, palette or RGBA PIL images turn into arrays without exactly three channels, which
    # makes the resize / predict steps below fail (presumably the model expects RGB input).
    # Objects without a `mode` attribute (e.g. plain arrays) are passed through untouched.
    if getattr(pil_image, "mode", "RGB") != "RGB":
        pil_image = pil_image.convert("RGB")

    width = model.input_shape[2]
    height = model.input_shape[1]
    image = np.array(pil_image)
    image = tf.image.resize(
        image,
        size=(height, width),
        method=tf.image.ResizeMethod.AREA,
        preserve_aspect_ratio=True,
    )
    image = image.numpy()  # EagerTensor to np.array
    image = dd.image.transform_and_pad_image(image, width, height)
    image = image / 255.0
    image_shape = image.shape
    # add the leading batch dimension expected by model.predict
    image = image.reshape((1, image_shape[0], image_shape[1], image_shape[2]))

    scores = model.predict(image)[0]

    score_by_tag = {tag: scores[i] for i, tag in enumerate(tags)}

    # collect (score, tag) pairs that reach the threshold, leaving out the rating pseudo-tags
    tags_in_threshold = []
    result_tags_print = []
    for tag in tags:
        score = score_by_tag[tag]
        if score >= threshold and not tag.startswith("rating:"):
            tags_in_threshold.append((score, tag))
            result_tags_print.append(f'{score} {tag}')

    # sort ascending by tag name for alpha sort, otherwise descending by score
    sort_ndx = 1 if alpha_sort else 0
    tags_in_threshold.sort(key=lambda pair: pair[sort_ndx], reverse=(not alpha_sort))

    # format tag text as requested
    result_tags_out = []
    for weight, tag in tags_in_threshold:
        tag_outformat = tag
        if use_spaces:
            tag_outformat = tag_outformat.replace('_', ' ')
        if use_escape:
            tag_outformat = re_special.sub(r'\\\1', tag_outformat)
        if include_ranks:
            tag_outformat = f"({tag_outformat}:{weight:.3f})"

        result_tags_out.append(tag_outformat)

    print('\n'.join(sorted(result_tags_print, reverse=True)))

    return ', '.join(result_tags_out)
 | 
