TFOS code to train my model on Spark
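A typical launch, assuming a YARN cluster and purely illustrative executor counts, memory settings, and script name (adjust all of these to your setup):

${SPARK_HOME}/bin/spark-submit \
  --master yarn \
  --deploy-mode client \
  --num-executors 2 \
  --executor-memory 8G \
  train_unet_tfos.py --cluster_size 2 --epochs 10 --batch_size 32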
# Adapted from: https://www.tensorflow.org/beta/tutorials/distribute/multi_worker_with_keras
from __future__ import absolute_import, division, print_function, unicode_literals


def main_fun(args, ctx):
    import tensorflow as tf
    import numpy as np
    import imagecodecs

    # My params
    work_images = 2

    import time
    start_time = time.time()

    def build_unet_model(shape=(None, None, 4)):
        from tensorflow.keras import Input
        from tensorflow.keras.models import Model
        from tensorflow.keras.layers import Conv2D, MaxPooling2D, UpSampling2D, BatchNormalization, Dropout, concatenate
        from tensorflow.keras.optimizers import Adam
        # import tensorflow as tf
        # from keras.callbacks import ModelCheckpoint, LearningRateScheduler
        # from keras.preprocessing.image import ImageDataGenerator
        # from keras import backend as keras

        # Left side of the U-Net (contracting path)
        inputs = Input(shape)
        conv1 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='random_normal')(inputs)
        conv1 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv1)
        conv1 = BatchNormalization()(conv1)
        pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

        conv2 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='random_normal')(pool1)
        conv2 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv2)
        conv2 = BatchNormalization()(conv2)
        pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

        conv3 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='random_normal')(pool2)
        conv3 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv3)
        conv3 = BatchNormalization()(conv3)
        pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

        conv4 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='random_normal')(pool3)
        conv4 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv4)
        conv4 = BatchNormalization()(conv4)
        drop4 = Dropout(0.5)(conv4)
        pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

        # Bottom of the U-Net
        conv5 = Conv2D(1024, 3, activation='relu', padding='same', kernel_initializer='random_normal')(pool4)
        conv5 = Conv2D(1024, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv5)
        conv5 = BatchNormalization()(conv5)
        drop5 = Dropout(0.5)(conv5)

        # Upsampling starts, right side of the U-Net (expanding path)
        up6 = Conv2D(512, 2, activation='relu', padding='same', kernel_initializer='random_normal')(UpSampling2D(size=(2, 2))(drop5))
        merge6 = concatenate([drop4, up6], axis=3)
        conv6 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='random_normal')(merge6)
        conv6 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv6)
        conv6 = BatchNormalization()(conv6)

        up7 = Conv2D(256, 2, activation='relu', padding='same', kernel_initializer='random_normal')(UpSampling2D(size=(2, 2))(conv6))
        merge7 = concatenate([conv3, up7], axis=3)
        conv7 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='random_normal')(merge7)
        conv7 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv7)
        conv7 = BatchNormalization()(conv7)

        up8 = Conv2D(128, 2, activation='relu', padding='same', kernel_initializer='random_normal')(UpSampling2D(size=(2, 2))(conv7))
        merge8 = concatenate([conv2, up8], axis=3)
        conv8 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='random_normal')(merge8)
        conv8 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv8)
        conv8 = BatchNormalization()(conv8)

        up9 = Conv2D(64, 2, activation='relu', padding='same', kernel_initializer='random_normal')(UpSampling2D(size=(2, 2))(conv8))
        merge9 = concatenate([conv1, up9], axis=3)
        conv9 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='random_normal')(merge9)
        conv9 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv9)
        conv9 = Conv2D(16, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv9)
        conv9 = BatchNormalization()(conv9)

        # Output layer of the U-Net with a softmax activation (9 classes)
        conv10 = Conv2D(9, 1, activation='softmax')(conv9)

        model = Model(inputs=inputs, outputs=conv10)
        model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
        return model
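    # Hypothetical shape sanity check (illustrative only, not part of the training flow):
    #   m = build_unet_model((128, 128, 4))
    #   m.predict(np.zeros((1, 128, 128, 4))).shape  -> (1, 128, 128, 9)
    # Four 2x2 poolings mean input height/width must be multiples of 16, otherwise
    # the skip-connection concatenations fail; the 128x128 crops below satisfy this.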
    # Resizing the image to the nearest dimensions that are multiples of 'stride'
    def resize(img, stride, n_h, n_w):
        # h, l, _ = img.shape
        ne_h = (n_h * stride) + stride
        ne_w = (n_w * stride) + stride
        # img_resized = imresize(img, (ne_h, ne_w))
        img_resized = img.reshape(ne_h, ne_w)
        return img_resized

    # Padding at the bottom and the right of images so they can be cropped into 128*128 tiles for training
    def padding(img, w, h, c, crop_size, stride, n_h, n_w):
        w_extra = w - ((n_w - 1) * stride)
        w_toadd = crop_size - w_extra
        h_extra = h - ((n_h - 1) * stride)
        h_toadd = crop_size - h_extra
        # img_pad[:h, :w, :] = img
        # img_pad = img_pad + img
        img_pad = np.pad(img, [(0, h_toadd), (0, w_toadd), (0, 0)], mode='constant')
        return img_pad

    # Adding pixels to make the image shape a multiple of stride
    def add_pixals(img, h, w, c, n_h, n_w, crop_size, stride):
        w_extra = w - ((n_w - 1) * stride)
        w_toadd = crop_size - w_extra
        h_extra = h - ((n_h - 1) * stride)
        h_toadd = crop_size - h_extra
        img_add = np.zeros(((h + h_toadd), (w + w_toadd), c))
        img_add[:h, :w, :] = img
        img_add[h:, :w, :] = img[:h_toadd, :, :]
        img_add[:h, w:, :] = img[:, :w_toadd, :]
        img_add[h:, w:, :] = img[h - h_toadd:h, w - w_toadd:w, :]
        return img_add

    # Slicing the image into crop_size*crop_size crops with a fixed stride and making a list of them
    def crops(a, crop_size=128):
        # stride = int(crop_size / 2)
        stride = 48
        croped_images = []
        h, w, c = a.shape
        n_h = h // stride
        n_w = w // stride
        # Padding using the padding function we wrote
        a = padding(a, w, h, c, crop_size, stride, n_h, n_w)
        # Resizing as required
        ##a = resize(a, stride, n_h, n_w)
        # Adding pixels as required
        # a = add_pixals(a, h, w, c, n_h, n_w, crop_size, stride)
        # Slicing the image into 128*128 crops with a stride of 48
        for i in range(n_h - 1):
            for j in range(n_w - 1):
                crop_x = a[(i * stride):((i * stride) + crop_size), (j * stride):((j * stride) + crop_size), :]
                croped_images.append(crop_x)
        return croped_images
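    # For illustration (hypothetical size): a 512x512x4 image with stride=48 gives
    # n_h = n_w = 512 // 48 = 10, so the loops above emit 9 * 9 = 81 overlapping
    # 128x128 crops; the padding guarantees every crop stays inside the image.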
    def get_images_paths(parent_path, is_label):
        sub_path = 'gt' if is_label else 'sat'
        paths = []
        for i in range(work_images):
            paths.append(parent_path + '/' + sub_path + '/' + str(i + 1) + '.tif')
        return paths

    def read_tif_image_from_hdfs(path, imagecodecs):
        from pyarrow import hdfs
        connect = hdfs.connect(host='master', port=9000)
        img_file = connect.open(path, mode='rb')
        img_bytes = img_file.read()
        numpy_img = imagecodecs.tiff_decode(img_bytes)
        return numpy_img
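    # For example, with work_images = 2:
    #   get_images_paths(hdfs_path, False) -> [hdfs_path + '/sat/1.tif', hdfs_path + '/sat/2.tif']
    #   get_images_paths(hdfs_path, True)  -> [hdfs_path + '/gt/1.tif',  hdfs_path + '/gt/2.tif']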
    # RGB colour for each of the 9 ground-truth classes
    color_dict = {0: (0, 0, 0),
                  1: (0, 125, 0),
                  2: (150, 80, 0),
                  3: (255, 255, 0),
                  4: (100, 100, 100),
                  5: (0, 255, 0),
                  6: (0, 0, 150),
                  7: (150, 150, 255),
                  8: (255, 255, 255)}

    def rgb_to_onehot(rgb_arr, color_dict):
        num_classes = len(color_dict)
        shape = rgb_arr.shape[:2] + (num_classes,)
        arr = np.zeros(shape, dtype=np.int8)
        for i, cls in enumerate(color_dict):
            arr[:, :, i] = np.all(rgb_arr.reshape((-1, 3)) == color_dict[cls], axis=1).reshape(shape[:2])
        return arr

    def onehot_to_rgb(onehot, color_dict):
        single_layer = np.argmax(onehot, axis=-1)
        output = np.zeros(onehot.shape[:2] + (3,))
        for k in color_dict.keys():
            output[single_layer == k] = color_dict[k]
        return np.uint8(output)
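    # Hypothetical roundtrip check: for a mask whose pixels all appear in color_dict,
    #   onehot_to_rgb(rgb_to_onehot(mask, color_dict), color_dict)
    # reproduces the original mask, since each colour maps to exactly one channel.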
    hdfs_path = '/Inter-IIT-CSRE/The-Eye-in-the-Sky-dataset'
    filelist_trainx = get_images_paths(hdfs_path, False)
    filelist_trainy = get_images_paths(hdfs_path, True)

    # Reading, padding and cropping all the training sat images, then stacking the crops into one array
    trainx_list = []
    for fname in filelist_trainx[:work_images]:
        # Reading the image
        image = read_tif_image_from_hdfs(fname, imagecodecs)
        # Padding as required and cropping
        crops_list = crops(image)
        trainx_list = trainx_list + crops_list
    # Array of all the cropped training sat images
    trainx = np.asarray(trainx_list)

    # Reading, padding and cropping all the training gt images, then stacking the crops into one array
    trainy_list = []
    for fname in filelist_trainy[:work_images]:
        # Reading the image
        image = read_tif_image_from_hdfs(fname, imagecodecs)
        # Padding as required and cropping
        crops_list = crops(image)
        trainy_list = trainy_list + crops_list
    # Array of all the cropped training gt images
    trainy = np.asarray(trainy_list)

    # Convert trainy into one-hot encoding
    trainy_hot = []
    for i in range(trainy.shape[0]):
        hot_img = rgb_to_onehot(trainy[i], color_dict)
        trainy_hot.append(hot_img)
    trainy_hot = np.asarray(trainy_hot)

    # Use only the required data: drop the trailing examples that do not fill a whole batch
    import math
    int_num_batches = math.floor(trainx.shape[0] / args.batch_size)
    trainx = trainx[:int_num_batches * args.batch_size]
    trainy_hot = trainy_hot[:int_num_batches * args.batch_size]
    num_training_examples = trainx.shape[0]
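    # For illustration (hypothetical count): with 162 crops and batch_size=32,
    # floor(162 / 32) = 5 full batches, so the first 160 examples are kept.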
    # Scale input data to [0, 1] (float16 to save memory)
    trainx = trainx.astype(np.float16) / np.max(trainx)
    import os
    import json
    # Hard-coded single-worker TF_CONFIG; this overrides any TF_CONFIG already
    # set for this executor, pinning the cluster to one worker.
    os.environ["TF_CONFIG"] = json.dumps({"cluster": {"worker": ["192.168.198.131:33035"]}, "task": {"type": "worker", "index": 0}})
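    # For reference, a two-worker TF_CONFIG would look like this (hypothetical
    # hosts/ports); each worker gets the same "cluster" map but its own "index":
    #   {"cluster": {"worker": ["host1:33035", "host2:33035"]},
    #    "task": {"type": "worker", "index": 0}}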
#print("\ntf_param="+os.environ["TF_CONFIG"]+"\n") | |
#Create distribute strategy | |
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() | |
#convert numpy data to tensorflow dataset | |
ds = tf.data.Dataset.from_tensor_slices((trainx, trainy_hot)) | |
#set options for sharding policy | |
options = tf.data.Options() | |
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA | |
ds = ds.with_options(options).shuffle(num_training_examples) | |
ds=ds.batch(args.batch_size) | |
#Delete unused data | |
del trainx | |
del trainy | |
del trainy_hot | |
del trainx_list | |
del trainy_list | |
import gc | |
gc.collect() | |
steps_per_epoch = num_training_examples/args.batch_size | |
print("training "+str(num_training_examples)+" examples....\n batch_size="+str(args.batch_size)+"\n steps_per_epoch="+str(steps_per_epoch)) | |
with strategy.scope(): | |
multi_worker_model = build_unet_model() | |
history = multi_worker_model.fit(ds, epochs=args.epochs, verbose=1, steps_per_epoch=steps_per_epoch) | |
multi_worker_model.save("my_model.h5") | |
end_time=time.time() | |
print("time elapsed_inside_worker="+str(end_time-start_time)+"\n") | |
if __name__ == '__main__':
    import argparse
    from pyspark.context import SparkContext
    from pyspark.conf import SparkConf
    from tensorflowonspark import TFCluster
    import time

    start_time = time.time()
    sc = SparkContext(conf=SparkConf().setAppName("unet_train_keras"))
    executors = sc._conf.get("spark.executor.instances")
    num_executors = int(executors) if executors is not None else 1

    parser = argparse.ArgumentParser()
    parser.add_argument("--batch_size", help="number of records per batch", type=int, default=32)
    parser.add_argument("--buffer_size", help="size of shuffle buffer", type=int, default=10000)
    parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
    parser.add_argument("--epochs", help="number of epochs", type=int, default=10)
    parser.add_argument("--model_dir", help="path to save model/checkpoint", default="mnist_model")
    parser.add_argument("--export_dir", help="path to export saved_model", default="mnist_export")
    parser.add_argument("--steps_per_epoch", help="number of steps per epoch", type=int, default=-1)
    parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
    args = parser.parse_args()
    print("args:", args)

    cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, num_ps=0, tensorboard=args.tensorboard, input_mode=TFCluster.InputMode.TENSORFLOW, master_node='chief', log_dir=args.model_dir)
    end_time = time.time()
    print("time elapsed=" + str(end_time - start_time) + "\n")
    cluster.shutdown()