| import argparse |
|
|
| import huggingface_hub |
| import k_diffusion as K |
| import torch |
|
|
| from diffusers import UNet2DConditionModel |
|
|
|
|
| UPSCALER_REPO = "pcuenq/k-upscaler" |
|
|
|
|
| def resnet_to_diffusers_checkpoint(resnet, checkpoint, *, diffusers_resnet_prefix, resnet_prefix): |
| rv = { |
| |
| f"{diffusers_resnet_prefix}.norm1.linear.weight": checkpoint[f"{resnet_prefix}.main.0.mapper.weight"], |
| f"{diffusers_resnet_prefix}.norm1.linear.bias": checkpoint[f"{resnet_prefix}.main.0.mapper.bias"], |
| |
| f"{diffusers_resnet_prefix}.conv1.weight": checkpoint[f"{resnet_prefix}.main.2.weight"], |
| f"{diffusers_resnet_prefix}.conv1.bias": checkpoint[f"{resnet_prefix}.main.2.bias"], |
| |
| f"{diffusers_resnet_prefix}.norm2.linear.weight": checkpoint[f"{resnet_prefix}.main.4.mapper.weight"], |
| f"{diffusers_resnet_prefix}.norm2.linear.bias": checkpoint[f"{resnet_prefix}.main.4.mapper.bias"], |
| |
| f"{diffusers_resnet_prefix}.conv2.weight": checkpoint[f"{resnet_prefix}.main.6.weight"], |
| f"{diffusers_resnet_prefix}.conv2.bias": checkpoint[f"{resnet_prefix}.main.6.bias"], |
| } |
|
|
| if resnet.conv_shortcut is not None: |
| rv.update( |
| { |
| f"{diffusers_resnet_prefix}.conv_shortcut.weight": checkpoint[f"{resnet_prefix}.skip.weight"], |
| } |
| ) |
|
|
| return rv |
|
|
|
|
| def self_attn_to_diffusers_checkpoint(checkpoint, *, diffusers_attention_prefix, attention_prefix): |
| weight_q, weight_k, weight_v = checkpoint[f"{attention_prefix}.qkv_proj.weight"].chunk(3, dim=0) |
| bias_q, bias_k, bias_v = checkpoint[f"{attention_prefix}.qkv_proj.bias"].chunk(3, dim=0) |
| rv = { |
| |
| f"{diffusers_attention_prefix}.norm1.linear.weight": checkpoint[f"{attention_prefix}.norm_in.mapper.weight"], |
| f"{diffusers_attention_prefix}.norm1.linear.bias": checkpoint[f"{attention_prefix}.norm_in.mapper.bias"], |
| |
| f"{diffusers_attention_prefix}.attn1.to_q.weight": weight_q.squeeze(-1).squeeze(-1), |
| f"{diffusers_attention_prefix}.attn1.to_q.bias": bias_q, |
| |
| f"{diffusers_attention_prefix}.attn1.to_k.weight": weight_k.squeeze(-1).squeeze(-1), |
| f"{diffusers_attention_prefix}.attn1.to_k.bias": bias_k, |
| |
| f"{diffusers_attention_prefix}.attn1.to_v.weight": weight_v.squeeze(-1).squeeze(-1), |
| f"{diffusers_attention_prefix}.attn1.to_v.bias": bias_v, |
| |
| f"{diffusers_attention_prefix}.attn1.to_out.0.weight": checkpoint[f"{attention_prefix}.out_proj.weight"] |
| .squeeze(-1) |
| .squeeze(-1), |
| f"{diffusers_attention_prefix}.attn1.to_out.0.bias": checkpoint[f"{attention_prefix}.out_proj.bias"], |
| } |
|
|
| return rv |
|
|
|
|
| def cross_attn_to_diffusers_checkpoint( |
| checkpoint, *, diffusers_attention_prefix, diffusers_attention_index, attention_prefix |
| ): |
| weight_k, weight_v = checkpoint[f"{attention_prefix}.kv_proj.weight"].chunk(2, dim=0) |
| bias_k, bias_v = checkpoint[f"{attention_prefix}.kv_proj.bias"].chunk(2, dim=0) |
|
|
| rv = { |
| |
| f"{diffusers_attention_prefix}.norm{diffusers_attention_index}.linear.weight": checkpoint[ |
| f"{attention_prefix}.norm_dec.mapper.weight" |
| ], |
| f"{diffusers_attention_prefix}.norm{diffusers_attention_index}.linear.bias": checkpoint[ |
| f"{attention_prefix}.norm_dec.mapper.bias" |
| ], |
| |
| f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.norm_cross.weight": checkpoint[ |
| f"{attention_prefix}.norm_enc.weight" |
| ], |
| f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.norm_cross.bias": checkpoint[ |
| f"{attention_prefix}.norm_enc.bias" |
| ], |
| |
| f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_q.weight": checkpoint[ |
| f"{attention_prefix}.q_proj.weight" |
| ] |
| .squeeze(-1) |
| .squeeze(-1), |
| f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_q.bias": checkpoint[ |
| f"{attention_prefix}.q_proj.bias" |
| ], |
| |
| f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_k.weight": weight_k.squeeze(-1).squeeze(-1), |
| f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_k.bias": bias_k, |
| |
| f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_v.weight": weight_v.squeeze(-1).squeeze(-1), |
| f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_v.bias": bias_v, |
| |
| f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_out.0.weight": checkpoint[ |
| f"{attention_prefix}.out_proj.weight" |
| ] |
| .squeeze(-1) |
| .squeeze(-1), |
| f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_out.0.bias": checkpoint[ |
| f"{attention_prefix}.out_proj.bias" |
| ], |
| } |
|
|
| return rv |
|
|
|
|
| def block_to_diffusers_checkpoint(block, checkpoint, block_idx, block_type): |
| block_prefix = "inner_model.u_net.u_blocks" if block_type == "up" else "inner_model.u_net.d_blocks" |
| block_prefix = f"{block_prefix}.{block_idx}" |
|
|
| diffusers_checkpoint = {} |
|
|
| if not hasattr(block, "attentions"): |
| n = 1 |
| elif not block.attentions[0].add_self_attention: |
| n = 2 |
| else: |
| n = 3 |
|
|
| for resnet_idx, resnet in enumerate(block.resnets): |
| |
| diffusers_resnet_prefix = f"{block_type}_blocks.{block_idx}.resnets.{resnet_idx}" |
| idx = n * resnet_idx if block_type == "up" else n * resnet_idx + 1 |
| resnet_prefix = f"{block_prefix}.{idx}" if block_type == "up" else f"{block_prefix}.{idx}" |
|
|
| diffusers_checkpoint.update( |
| resnet_to_diffusers_checkpoint( |
| resnet, checkpoint, diffusers_resnet_prefix=diffusers_resnet_prefix, resnet_prefix=resnet_prefix |
| ) |
| ) |
|
|
| if hasattr(block, "attentions"): |
| for attention_idx, attention in enumerate(block.attentions): |
| diffusers_attention_prefix = f"{block_type}_blocks.{block_idx}.attentions.{attention_idx}" |
| idx = n * attention_idx + 1 if block_type == "up" else n * attention_idx + 2 |
| self_attention_prefix = f"{block_prefix}.{idx}" |
| cross_attention_prefix = f"{block_prefix}.{idx }" |
| cross_attention_index = 1 if not attention.add_self_attention else 2 |
| idx = ( |
| n * attention_idx + cross_attention_index |
| if block_type == "up" |
| else n * attention_idx + cross_attention_index + 1 |
| ) |
| cross_attention_prefix = f"{block_prefix}.{idx }" |
|
|
| diffusers_checkpoint.update( |
| cross_attn_to_diffusers_checkpoint( |
| checkpoint, |
| diffusers_attention_prefix=diffusers_attention_prefix, |
| diffusers_attention_index=2, |
| attention_prefix=cross_attention_prefix, |
| ) |
| ) |
|
|
| if attention.add_self_attention is True: |
| diffusers_checkpoint.update( |
| self_attn_to_diffusers_checkpoint( |
| checkpoint, |
| diffusers_attention_prefix=diffusers_attention_prefix, |
| attention_prefix=self_attention_prefix, |
| ) |
| ) |
|
|
| return diffusers_checkpoint |
|
|
|
|
| def unet_to_diffusers_checkpoint(model, checkpoint): |
| diffusers_checkpoint = {} |
|
|
| |
| diffusers_checkpoint.update( |
| { |
| "conv_in.weight": checkpoint["inner_model.proj_in.weight"], |
| "conv_in.bias": checkpoint["inner_model.proj_in.bias"], |
| } |
| ) |
|
|
| |
| diffusers_checkpoint.update( |
| { |
| "time_proj.weight": checkpoint["inner_model.timestep_embed.weight"].squeeze(-1), |
| "time_embedding.linear_1.weight": checkpoint["inner_model.mapping.0.weight"], |
| "time_embedding.linear_1.bias": checkpoint["inner_model.mapping.0.bias"], |
| "time_embedding.linear_2.weight": checkpoint["inner_model.mapping.2.weight"], |
| "time_embedding.linear_2.bias": checkpoint["inner_model.mapping.2.bias"], |
| "time_embedding.cond_proj.weight": checkpoint["inner_model.mapping_cond.weight"], |
| } |
| ) |
|
|
| |
| for down_block_idx, down_block in enumerate(model.down_blocks): |
| diffusers_checkpoint.update(block_to_diffusers_checkpoint(down_block, checkpoint, down_block_idx, "down")) |
|
|
| |
| for up_block_idx, up_block in enumerate(model.up_blocks): |
| diffusers_checkpoint.update(block_to_diffusers_checkpoint(up_block, checkpoint, up_block_idx, "up")) |
|
|
| |
| diffusers_checkpoint.update( |
| { |
| "conv_out.weight": checkpoint["inner_model.proj_out.weight"], |
| "conv_out.bias": checkpoint["inner_model.proj_out.bias"], |
| } |
| ) |
|
|
| return diffusers_checkpoint |
|
|
|
|
| def unet_model_from_original_config(original_config): |
| in_channels = original_config["input_channels"] + original_config["unet_cond_dim"] |
| out_channels = original_config["input_channels"] + (1 if original_config["has_variance"] else 0) |
|
|
| block_out_channels = original_config["channels"] |
|
|
| assert ( |
| len(set(original_config["depths"])) == 1 |
| ), "UNet2DConditionModel currently do not support blocks with different number of layers" |
| layers_per_block = original_config["depths"][0] |
|
|
| class_labels_dim = original_config["mapping_cond_dim"] |
| cross_attention_dim = original_config["cross_cond_dim"] |
|
|
| attn1_types = [] |
| attn2_types = [] |
| for s, c in zip(original_config["self_attn_depths"], original_config["cross_attn_depths"]): |
| if s: |
| a1 = "self" |
| a2 = "cross" if c else None |
| elif c: |
| a1 = "cross" |
| a2 = None |
| else: |
| a1 = None |
| a2 = None |
| attn1_types.append(a1) |
| attn2_types.append(a2) |
|
|
| unet = UNet2DConditionModel( |
| in_channels=in_channels, |
| out_channels=out_channels, |
| down_block_types=("KDownBlock2D", "KCrossAttnDownBlock2D", "KCrossAttnDownBlock2D", "KCrossAttnDownBlock2D"), |
| mid_block_type=None, |
| up_block_types=("KCrossAttnUpBlock2D", "KCrossAttnUpBlock2D", "KCrossAttnUpBlock2D", "KUpBlock2D"), |
| block_out_channels=block_out_channels, |
| layers_per_block=layers_per_block, |
| act_fn="gelu", |
| norm_num_groups=None, |
| cross_attention_dim=cross_attention_dim, |
| attention_head_dim=64, |
| time_cond_proj_dim=class_labels_dim, |
| resnet_time_scale_shift="scale_shift", |
| time_embedding_type="fourier", |
| timestep_post_act="gelu", |
| conv_in_kernel=1, |
| conv_out_kernel=1, |
| ) |
|
|
| return unet |
|
|
|
|
| def main(args): |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
| orig_config_path = huggingface_hub.hf_hub_download(UPSCALER_REPO, "config_laion_text_cond_latent_upscaler_2.json") |
| orig_weights_path = huggingface_hub.hf_hub_download( |
| UPSCALER_REPO, "laion_text_cond_latent_upscaler_2_1_00470000_slim.pth" |
| ) |
| print(f"loading original model configuration from {orig_config_path}") |
| print(f"loading original model checkpoint from {orig_weights_path}") |
|
|
| print("converting to diffusers unet") |
| orig_config = K.config.load_config(open(orig_config_path))["model"] |
| model = unet_model_from_original_config(orig_config) |
|
|
| orig_checkpoint = torch.load(orig_weights_path, map_location=device)["model_ema"] |
| converted_checkpoint = unet_to_diffusers_checkpoint(model, orig_checkpoint) |
|
|
| model.load_state_dict(converted_checkpoint, strict=True) |
| model.save_pretrained(args.dump_path) |
| print(f"saving converted unet model in {args.dump_path}") |
|
|
|
|
| if __name__ == "__main__": |
| parser = argparse.ArgumentParser() |
|
|
| parser.add_argument("--dump_path", default=None, type=str, required=True, help="Path to the output model.") |
| args = parser.parse_args() |
|
|
| main(args) |
|
|