| | import os |
| | import gc |
| | import pandas as pd |
| | import numpy as np |
| |
|
| | from typing import Tuple, List, Dict |
| | from io import BytesIO |
| | from PIL import Image |
| |
|
| | from pathlib import Path |
| | from huggingface_hub import hf_hub_download |
| |
|
| | from modules import shared |
| | from modules.deepbooru import re_special as tag_escape_pattern |
| |
|
| | |
| | from . import dbimutils |
| |
|
| | |
| | use_cpu = ('all' in shared.cmd_opts.use_cpu) or ( |
| | 'interrogate' in shared.cmd_opts.use_cpu) |
| |
|
| | if use_cpu: |
| | tf_device_name = '/cpu:0' |
| | else: |
| | tf_device_name = '/gpu:0' |
| |
|
| | if shared.cmd_opts.device_id is not None: |
| | try: |
| | tf_device_name = f'/gpu:{int(shared.cmd_opts.device_id)}' |
| | except ValueError: |
| | print('--device-id is not a integer') |
| |
|
| |
|
| | class Interrogator: |
| | @staticmethod |
| | def postprocess_tags( |
| | tags: Dict[str, float], |
| | |
| | threshold=0.35, |
| | additional_tags: List[str] = [], |
| | exclude_tags: List[str] = [], |
| | sort_by_alphabetical_order=False, |
| | add_confident_as_weight=False, |
| | replace_underscore=False, |
| | replace_underscore_excludes: List[str] = [], |
| | escape_tag=False |
| | ) -> Dict[str, float]: |
| |
|
| | tags = { |
| | **{t: 1.0 for t in additional_tags}, |
| | **tags |
| | } |
| |
|
| | |
| | tags = { |
| | t: c |
| |
|
| | |
| | for t, c in sorted( |
| | tags.items(), |
| | key=lambda i: i[0 if sort_by_alphabetical_order else 1], |
| | reverse=not sort_by_alphabetical_order |
| | ) |
| |
|
| | |
| | if ( |
| | c >= threshold |
| | and t not in exclude_tags |
| | ) |
| | } |
| |
|
| | new_tags = [] |
| | for tag in list(tags): |
| | new_tag = tag |
| |
|
| | if replace_underscore and tag not in replace_underscore_excludes: |
| | new_tag = new_tag.replace('_', ' ') |
| |
|
| | if escape_tag: |
| | new_tag = tag_escape_pattern.sub(r'\\\1', new_tag) |
| |
|
| | if add_confident_as_weight: |
| | new_tag = f'({new_tag}:{tags[tag]})' |
| |
|
| | new_tags.append((new_tag, tags[tag])) |
| | tags = dict(new_tags) |
| |
|
| | return tags |
| |
|
| | def __init__(self, name: str) -> None: |
| | self.name = name |
| |
|
| | def load(self): |
| | raise NotImplementedError() |
| |
|
| | def unload(self) -> bool: |
| | unloaded = False |
| |
|
| | if hasattr(self, 'model') and self.model is not None: |
| | del self.model |
| | unloaded = True |
| | print(f'Unloaded {self.name}') |
| |
|
| | if hasattr(self, 'tags'): |
| | del self.tags |
| |
|
| | return unloaded |
| |
|
| | def interrogate( |
| | self, |
| | image: Image |
| | ) -> Tuple[ |
| | Dict[str, float], |
| | Dict[str, float] |
| | ]: |
| | raise NotImplementedError() |
| |
|
| |
|
| | class DeepDanbooruInterrogator(Interrogator): |
| | def __init__(self, name: str, project_path: os.PathLike) -> None: |
| | super().__init__(name) |
| | self.project_path = project_path |
| |
|
| | def load(self) -> None: |
| | print(f'Loading {self.name} from {str(self.project_path)}') |
| |
|
| | |
| | |
| | from launch import is_installed, run_pip |
| | if not is_installed('deepdanbooru'): |
| | package = os.environ.get( |
| | 'DEEPDANBOORU_PACKAGE', |
| | 'git+https://github.com/KichangKim/DeepDanbooru.git@d91a2963bf87c6a770d74894667e9ffa9f6de7ff' |
| | ) |
| |
|
| | run_pip( |
| | f'install {package} tensorflow tensorflow-io', 'deepdanbooru') |
| |
|
| | import tensorflow as tf |
| |
|
| | |
| | |
| | |
| | for device in tf.config.experimental.list_physical_devices('GPU'): |
| | tf.config.experimental.set_memory_growth(device, True) |
| |
|
| | with tf.device(tf_device_name): |
| | import deepdanbooru.project as ddp |
| |
|
| | self.model = ddp.load_model_from_project( |
| | project_path=self.project_path, |
| | compile_model=False |
| | ) |
| |
|
| | print(f'Loaded {self.name} model from {str(self.project_path)}') |
| |
|
| | self.tags = ddp.load_tags_from_project( |
| | project_path=self.project_path |
| | ) |
| |
|
| | def unload(self) -> bool: |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | return False |
| |
|
| | def interrogate( |
| | self, |
| | image: Image |
| | ) -> Tuple[ |
| | Dict[str, float], |
| | Dict[str, float] |
| | ]: |
| | |
| | if not hasattr(self, 'model') or self.model is None: |
| | self.load() |
| |
|
| | import deepdanbooru.data as ddd |
| |
|
| | |
| | image_bufs = BytesIO() |
| | image.save(image_bufs, format='PNG') |
| | image = ddd.load_image_for_evaluate( |
| | image_bufs, |
| | self.model.input_shape[2], |
| | self.model.input_shape[1] |
| | ) |
| |
|
| | image = image.reshape((1, *image.shape[0:3])) |
| |
|
| | |
| | result = self.model.predict(image) |
| |
|
| | confidents = result[0].tolist() |
| | ratings = {} |
| | tags = {} |
| |
|
| | for i, tag in enumerate(self.tags): |
| | tags[tag] = confidents[i] |
| |
|
| | return ratings, tags |
| |
|
| |
|
| | class WaifuDiffusionInterrogator(Interrogator): |
| | def __init__( |
| | self, |
| | name: str, |
| | model_path='model.onnx', |
| | tags_path='selected_tags.csv', |
| | **kwargs |
| | ) -> None: |
| | super().__init__(name) |
| | self.model_path = model_path |
| | self.tags_path = tags_path |
| | self.kwargs = kwargs |
| |
|
| | def download(self) -> Tuple[os.PathLike, os.PathLike]: |
| | |
| | print(self.model_path, self.tags_path) |
| | if os.path.exists(self.model_path) and os.path.exists(self.tags_path): |
| | return self.model_path, self.tags_path |
| | print(f"Loading {self.name} model file from {self.kwargs['repo_id']}") |
| |
|
| | model_path = Path(hf_hub_download( |
| | **self.kwargs, filename=self.model_path)) |
| | tags_path = Path(hf_hub_download( |
| | **self.kwargs, filename=self.tags_path)) |
| | return model_path, tags_path |
| |
|
| | def load(self) -> None: |
| | model_path, tags_path = self.download() |
| |
|
| | |
| | |
| | |
| | from launch import is_installed, run_pip |
| | if not is_installed('onnxruntime'): |
| | package = os.environ.get( |
| | 'ONNXRUNTIME_PACKAGE', |
| | 'onnxruntime-gpu' |
| | ) |
| |
|
| | run_pip(f'install {package}', 'onnxruntime') |
| |
|
| | from onnxruntime import InferenceSession |
| |
|
| | |
| | |
| | providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] |
| | if use_cpu: |
| | providers.pop(0) |
| |
|
| | self.model = InferenceSession(str(model_path), providers=providers) |
| |
|
| | print(f'Loaded {self.name} model from {model_path}') |
| |
|
| | self.tags = pd.read_csv(tags_path) |
| |
|
| | def interrogate( |
| | self, |
| | image: Image |
| | ) -> Tuple[ |
| | Dict[str, float], |
| | Dict[str, float] |
| | ]: |
| | |
| | if not hasattr(self, 'model') or self.model is None: |
| | self.load() |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | _, height, _, _ = self.model.get_inputs()[0].shape |
| |
|
| | |
| | image = image.convert('RGBA') |
| | new_image = Image.new('RGBA', image.size, 'WHITE') |
| | new_image.paste(image, mask=image) |
| | image = new_image.convert('RGB') |
| | image = np.asarray(image) |
| |
|
| | |
| | image = image[:, :, ::-1] |
| |
|
| | image = dbimutils.make_square(image, height) |
| | image = dbimutils.smart_resize(image, height) |
| | image = image.astype(np.float32) |
| | image = np.expand_dims(image, 0) |
| |
|
| | |
| | input_name = self.model.get_inputs()[0].name |
| | label_name = self.model.get_outputs()[0].name |
| | confidents = self.model.run([label_name], {input_name: image})[0] |
| |
|
| | tags = self.tags[:][['name']] |
| | tags['confidents'] = confidents[0] |
| |
|
| | |
| | ratings = dict(tags[:4].values) |
| |
|
| | |
| | tags = dict(tags[4:].values) |
| |
|
| | return ratings, tags |
| |
|