Created
April 24, 2020 21:27
-
-
Save TakaoNarikawa/aef13571eec97d78603688eef05b5389 to your computer and use it in GitHub Desktop.
Convert darknet pre-trained model to CoreML model
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
""" | |
Reads Darknet config and weights, builds a Keras model with TF backend, and converts it to a Core ML model.
""" | |
import argparse | |
import configparser | |
import io | |
import os | |
from collections import defaultdict | |
from PIL import Image | |
from yolo3.utils import letterbox_image | |
import numpy as np | |
from keras import backend as K | |
from keras.engine.base_layer import Layer | |
from keras.layers import (Conv2D, Input, ZeroPadding2D, Add, | |
UpSampling2D, MaxPooling2D, Concatenate, Lambda) | |
from keras.layers.advanced_activations import LeakyReLU | |
from keras.layers.normalization import BatchNormalization | |
from keras.models import Model, load_model | |
from keras.regularizers import l2 | |
from keras.utils.vis_utils import plot_model as plot | |
import coremltools | |
from coremltools.proto import NeuralNetwork_pb2 | |
# Command-line interface of the converter.
# The output is a Core ML model: _main() insists on a ``.mlmodel`` output
# path, so the description and help texts say so instead of "Keras".
parser = argparse.ArgumentParser(
    description='Darknet To Core ML Converter (via Keras).')
parser.add_argument('config_path', help='Path to Darknet cfg file.')
parser.add_argument('weights_path', help='Path to Darknet weights file.')
parser.add_argument(
    'output_path', help='Path to output Core ML .mlmodel file.')
# The two flags below are still accepted so existing command lines keep
# working, but _main() never reads them.
parser.add_argument(
    '-p',
    '--plot_model',
    help='Plot generated Keras model and save as image '
         '(accepted for compatibility; currently ignored).',
    action='store_true')
parser.add_argument(
    '-w',
    '--weights_only',
    help='Save as Keras weights file instead of model file '
         '(accepted for compatibility; currently ignored).',
    action='store_true')
class Mish(Layer):
    """Keras layer that applies the Mish activation element-wise.

    .. math::
        mish(x) = x * tanh(softplus(x)) = x * tanh(ln(1 + e^{x}))

    Shape:
        - Input: Arbitrary. Use the keyword argument `input_shape`
          (tuple of integers, does not include the samples axis)
          when using this layer as the first layer in a model.
        - Output: Same shape as the input.

    Examples:
        >>> X_input = Input(input_shape)
        >>> X = Mish()(X_input)
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments to ``Layer`` and enable masking."""
        super().__init__(**kwargs)
        self.supports_masking = True

    def call(self, inputs):
        """Return ``inputs * tanh(softplus(inputs))``."""
        gate = K.tanh(K.softplus(inputs))
        return inputs * gate

    def get_config(self):
        """Return the base layer config; this layer adds no settings."""
        return super().get_config()

    def compute_output_shape(self, input_shape):
        """Return ``input_shape`` unchanged."""
        return input_shape
def convert_mish(layer):
    """Build the Core ML custom-layer parameters for a ``Mish`` layer.

    Args:
        layer: The Keras layer being converted. It is not inspected; the
            returned parameters are the same for every call.

    Returns:
        A ``NeuralNetwork_pb2.CustomLayerParams`` message whose class name
        is ``"Mish"``.
    """
    custom = NeuralNetwork_pb2.CustomLayerParams()
    custom.description = "Mish Activation Layer"
    custom.className = "Mish"
    return custom
def unique_config_sections(config_file):
    """Convert all config sections to have unique names.

    Adds a ``_<n>`` suffix to every section header for compatibility with
    configparser, where ``n`` counts earlier sections of the same name
    (``[convolutional]`` becomes ``[convolutional_0]``,
    ``[convolutional_1]``, ...).

    Args:
        config_file: Path of the Darknet cfg file to read.

    Returns:
        An ``io.StringIO`` holding the rewritten config text, positioned at
        the start so it can be passed straight to
        ``ConfigParser.read_file``.
    """
    section_counters = defaultdict(int)
    output_stream = io.StringIO()
    with open(config_file) as fin:
        for line in fin:
            if line.startswith('['):
                # Rewrite only the text between '[' and the first ']'.
                # Anything after the bracket (for example an inline
                # comment) is kept verbatim and is never mistaken for part
                # of the section name.
                section, closing, rest = line[1:].partition(']')
                if closing:
                    unique = '{}_{}'.format(
                        section, section_counters[section])
                    section_counters[section] += 1
                    line = '[' + unique + closing + rest
                # A header without a closing bracket is written unchanged.
            output_stream.write(line)
    output_stream.seek(0)
    return output_stream
# %% | |
def _main(args):
    """Convert a Darknet model (cfg + weights) into a Core ML model file.

    The Darknet config is walked section by section, an equivalent Keras
    graph is built with the Darknet weights loaded into it, and the result
    is converted with coremltools and saved to ``args.output_path``.

    Args:
        args: Parsed command-line namespace. Only ``config_path``,
            ``weights_path`` and ``output_path`` are read; the
            ``plot_model`` and ``weights_only`` flags are not used.

    Raises:
        AssertionError: If a path lacks the expected ``.cfg``,
            ``.weights`` or ``.mlmodel`` extension, if a ``shortcut``
            section is not linear, or if an ``upsample`` stride is not 2.
        ValueError: If the config contains an unsupported section type or
            an unknown convolutional activation.
    """
    config_path = os.path.expanduser(args.config_path)
    weights_path = os.path.expanduser(args.weights_path)
    output_path = os.path.expanduser(args.output_path)
    assert config_path.endswith('.cfg'), '{} is not a .cfg file'.format(
        config_path)
    assert weights_path.endswith(
        '.weights'), '{} is not a .weights file'.format(weights_path)
    assert output_path.endswith(
        '.mlmodel'), 'output path {} is not a .mlmodel file'.format(output_path)

    # Load weights and config.
    print('Loading weights.')
    # The context manager guarantees the weights file is closed even when a
    # section below raises.
    with open(weights_path, 'rb') as weights_file:
        major, minor, revision = np.ndarray(
            shape=(3, ), dtype='int32', buffer=weights_file.read(12))
        # The "seen" counter is stored as int64 or int32 depending on the
        # version numbers in the header.
        if (major * 10 + minor) >= 2 and major < 1000 and minor < 1000:
            seen = np.ndarray(
                shape=(1, ), dtype='int64', buffer=weights_file.read(8))
        else:
            seen = np.ndarray(
                shape=(1, ), dtype='int32', buffer=weights_file.read(4))
        print('Weights Header: ', major, minor, revision, seen)

        print('Parsing Darknet config.')
        unique_config_file = unique_config_sections(config_path)
        cfg_parser = configparser.ConfigParser()
        cfg_parser.read_file(unique_config_file)

        print('Creating Keras model.')
        input_layer = Input(shape=(416, 416, 3))
        prev_layer = input_layer
        all_layers = []

        weight_decay = float(cfg_parser['net_0']['decay']
                             ) if 'net_0' in cfg_parser.sections() else 5e-4
        count = 0  # number of 4-byte weight values consumed so far
        out_index = []  # positions in all_layers that feed a yolo head
        for section in cfg_parser.sections():
            print('Parsing section {}'.format(section))
            if section.startswith('convolutional'):
                filters = int(cfg_parser[section]['filters'])
                size = int(cfg_parser[section]['size'])
                stride = int(cfg_parser[section]['stride'])
                pad = int(cfg_parser[section]['pad'])
                activation = cfg_parser[section]['activation']
                batch_normalize = 'batch_normalize' in cfg_parser[section]

                padding = 'same' if pad == 1 and stride == 1 else 'valid'

                # Setting weights.
                # Darknet serializes convolutional weights as:
                # [bias/beta, [gamma, mean, variance], conv_weights]
                prev_layer_shape = K.int_shape(prev_layer)

                weights_shape = (size, size, prev_layer_shape[-1], filters)
                darknet_w_shape = (filters, weights_shape[2], size, size)
                # np.prod replaces np.product, which was removed in
                # NumPy 2.0.
                weights_size = int(np.prod(weights_shape))

                print('conv2d', 'bn'
                      if batch_normalize else ' ', activation, weights_shape)

                conv_bias = np.ndarray(
                    shape=(filters, ),
                    dtype='float32',
                    buffer=weights_file.read(filters * 4))
                count += filters

                if batch_normalize:
                    bn_weights = np.ndarray(
                        shape=(3, filters),
                        dtype='float32',
                        buffer=weights_file.read(filters * 12))
                    count += 3 * filters

                    bn_weight_list = [
                        bn_weights[0],  # scale gamma
                        conv_bias,  # shift beta
                        bn_weights[1],  # running mean
                        bn_weights[2]  # running var
                    ]

                conv_weights = np.ndarray(
                    shape=darknet_w_shape,
                    dtype='float32',
                    buffer=weights_file.read(weights_size * 4))
                count += weights_size

                # DarkNet conv_weights are serialized Caffe-style:
                # (out_dim, in_dim, height, width)
                # We would like to set these to Tensorflow order:
                # (height, width, in_dim, out_dim)
                conv_weights = np.transpose(conv_weights, [2, 3, 1, 0])
                # With batch norm the bias is carried as beta in
                # bn_weight_list, so the conv layer gets only the kernel.
                conv_weights = [conv_weights] if batch_normalize else [
                    conv_weights, conv_bias
                ]

                # Handle activation. 'leaky' and 'mish' are added as
                # separate layers after the convolution.
                if activation not in ('leaky', 'mish', 'linear'):
                    raise ValueError(
                        'Unknown activation function `{}` in section {}'.format(
                            activation, section))

                # Create Conv2D layer
                if stride > 1:
                    # Darknet uses left and top padding instead of 'same' mode
                    prev_layer = ZeroPadding2D(((1, 0), (1, 0)))(prev_layer)
                conv_layer = (Conv2D(
                    filters, (size, size),
                    strides=(stride, stride),
                    kernel_regularizer=l2(weight_decay),
                    use_bias=not batch_normalize,
                    weights=conv_weights,
                    activation=None,
                    padding=padding))(prev_layer)

                if batch_normalize:
                    conv_layer = (BatchNormalization(
                        weights=bn_weight_list))(conv_layer)
                prev_layer = conv_layer

                if activation == 'mish':
                    prev_layer = Mish()(prev_layer)
                elif activation == 'leaky':
                    prev_layer = LeakyReLU(alpha=0.1)(prev_layer)
                all_layers.append(prev_layer)

            elif section.startswith('route'):
                ids = [int(i) for i in cfg_parser[section]['layers'].split(',')]
                layers = [all_layers[i] for i in ids]
                if len(layers) > 1:
                    print('Concatenating route layers:', layers)
                    concatenate_layer = Concatenate()(layers)
                    all_layers.append(concatenate_layer)
                    prev_layer = concatenate_layer
                else:
                    skip_layer = layers[0]  # only one layer to route
                    all_layers.append(skip_layer)
                    prev_layer = skip_layer

            elif section.startswith('maxpool'):
                size = int(cfg_parser[section]['size'])
                stride = int(cfg_parser[section]['stride'])
                all_layers.append(
                    MaxPooling2D(
                        pool_size=(size, size),
                        strides=(stride, stride),
                        padding='same')(prev_layer))
                prev_layer = all_layers[-1]

            elif section.startswith('shortcut'):
                index = int(cfg_parser[section]['from'])
                activation = cfg_parser[section]['activation']
                assert activation == 'linear', 'Only linear activation supported.'
                all_layers.append(Add()([all_layers[index], prev_layer]))
                prev_layer = all_layers[-1]

            elif section.startswith('upsample'):
                stride = int(cfg_parser[section]['stride'])
                assert stride == 2, 'Only stride=2 supported.'
                all_layers.append(UpSampling2D(stride)(prev_layer))
                prev_layer = all_layers[-1]

            elif section.startswith('yolo'):
                # The layer just before a yolo section is a model output.
                # A None placeholder keeps all_layers aligned with the cfg
                # layer numbering.
                out_index.append(len(all_layers) - 1)
                all_layers.append(None)
                prev_layer = all_layers[-1]

            elif section.startswith('net'):
                pass

            else:
                raise ValueError(
                    'Unsupported section header type: {}'.format(section))

        # Whatever is left in the file was not consumed by any section.
        remaining_weights = len(weights_file.read()) // 4

    print('Read {} of {} from Darknet weights.'.format(
        count, count + remaining_weights))
    if remaining_weights > 0:
        print('Warning: {} unused weights'.format(remaining_weights))

    # Create and save model.
    if not out_index:
        out_index.append(len(all_layers) - 1)
    outputs = [all_layers[i] for i in out_index]
    model = Model(inputs=input_layer, outputs=outputs)
    model.summary()

    # Name the outputs 'outputN' ... 'output1' in model order. For three
    # heads this is exactly ['output3', 'output2', 'output1'].
    output_names = [
        'output{}'.format(n) for n in range(len(outputs), 0, -1)
    ]
    coreml_model = coremltools.converters.keras.convert(
        model,
        input_names='input1',
        image_input_names='input1',
        output_names=output_names,
        image_scale=1 / 255.,
        add_custom_layers=True,
        custom_conversion_functions={"Mish": convert_mish})

    coreml_model.input_description['input1'] = 'Input image'
    # Describe each output with its actual grid size instead of assuming
    # three fixed grids.
    for name, tensor in zip(output_names, outputs):
        grid_h, grid_w = K.int_shape(tensor)[1:3]
        coreml_model.output_description[name] = (
            'The {}x{} grid (Scale{})'.format(
                grid_h, grid_w, name[len('output'):]))

    coreml_model.save(output_path)
if __name__ == '__main__':
    # Script entry point: parse the command line, then run the conversion.
    cli_args = parser.parse_args()
    _main(cli_args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment