#Writer: Md Shamimul Islam
import cv2
import numpy as np
import os
import os.path
import glob
import time
from matplotlib import pyplot as plt
import mediapipe as mp

print(os.path.abspath("."))

mp_holistic = mp.solutions.holistic      # Holistic model
mp_drawing = mp.solutions.drawing_utils  # Drawing utilities
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                   # Image is no longer writeable
    results = model.process(image)                  # Make prediction
    image.flags.writeable = True                    # Image is now writeable
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # COLOR CONVERSION RGB 2 BGR
    return image, results
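# Note: MediaPipe expects RGB input while OpenCV delivers BGR frames, hence the two
# conversions above; marking the array as non-writeable lets MediaPipe process the
# frame by reference instead of copying it, which is slightly faster.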
def draw_landmarks(image, results):
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS)       # Draw face connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)        # Draw pose connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)   # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)  # Draw right hand connections
def draw_styled_landmarks(image, results):
    # Draw face connections
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS,
                              mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
                              mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                              )
    # Draw pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4),
                              mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                              )
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4),
                              mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                              )
    # Draw right hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                              mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                              )
""" | |
cap = cv2.VideoCapture(0) | |
# Set mediapipe model | |
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic: | |
while cap.isOpened(): | |
# Read feed | |
ret, frame = cap.read() | |
# Make detections | |
image, results = mediapipe_detection(frame, holistic) | |
print(results) | |
# Draw landmarks | |
draw_styled_landmarks(image, results) | |
# Show to screen | |
cv2.imshow('OpenCV Feed', image) | |
# Break gracefully | |
if cv2.waitKey(10) & 0xFF == ord('q'): | |
break | |
cap.release() | |
cv2.destroyAllWindows() | |
#draw_landmarks(frame, results) | |
#plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) | |
#### Extract Keypoint Values | |
#print(len(results.left_hand_landmarks.landmark)) | |
pose = [] | |
for res in results.pose_landmarks.landmark: | |
test = np.array([res.x, res.y, res.z, res.visibility]) | |
pose.append(test) | |
pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(132) | |
face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(1404) | |
lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3) | |
rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3) | |
face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(1404) | |
""" | |
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, face, lh, rh])
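# Per-frame feature vector length: pose 33*4 + face 468*3 + each hand 21*3
# = 132 + 1404 + 63 + 63 = 1662 values, which matches the LSTM input_shape=(30, 1662) used below.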
#result_test = extract_keypoints(results)
#print(result_test)
#np.save('0', result_test)

def get_video_parts(video_path):
    """Given a full path to a video, return its parts."""
    parts = video_path.split(os.path.sep)
    filename = parts[2]
    filename_no_ext = filename.split('.')[0]
    classname = parts[1]
    train_or_test = parts[0]
    return train_or_test, classname, filename_no_ext, filename
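# Example (hypothetical file name), assuming relative paths shaped like 'data/<class>/<video>.mp4':
#   get_video_parts('data/33_onno/video1.mp4') -> ('data', '33_onno', 'video1', 'video1.mp4')
# i.e. parts[0] is the data folder, parts[1] the class folder, parts[2] the video file name.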
#np.load('0.npy')

DATA_PATH = os.path.join('data')
data_file = []
classes = np.array(['33_onno', '34_sobai', '35_sabdhane'])
sequence_length = 50
sequence = 0
s = 0
i = 0
j = 0
""" | |
for folder in classes: | |
print(folder) | |
class_classes = glob.glob(os.path.join(DATA_PATH, folder, '*')) | |
print(class_classes) | |
""" | |
""" | |
# Set mediapipe model | |
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic: | |
for vid_class in classes: | |
print(vid_class) | |
class_files = glob.glob(os.path.join(DATA_PATH, vid_class, '*.mp4')) | |
print(class_files) | |
i += 1 | |
for video_path in class_files: | |
print(video_path) | |
j += 1 | |
cap = cv2.VideoCapture(video_path) | |
property_id = int(cv2.CAP_PROP_FRAME_COUNT) | |
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
print("The total frame number is {0}".format(str(length))) | |
print("The property_id number is {0}".format(str(property_id))) | |
# Get the parts of the file. | |
video_parts = get_video_parts(video_path) | |
#print(video_parts) | |
train_or_test, classname, filename_no_ext, filename = video_parts | |
#print(classname) | |
#print(filename_no_ext) | |
#print(filename) | |
sequence += 1 | |
#while cap.isOpened(): | |
for frame_num in range(length): | |
# Read feed | |
ret, frame = cap.read() | |
print(frame) | |
s += 1 | |
print(i) | |
print(j) | |
print(s) | |
print("################# Done ########################################") | |
#print(frame) | |
image, results = mediapipe_detection(frame, holistic) | |
# NEW Apply wait logic | |
if frame_num == 0: | |
cv2.putText(image, 'STARTING COLLECTION', (120,200), | |
cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA) | |
cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(filename, sequence), (15,12), | |
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA) | |
# Show to screen | |
cv2.imshow('OpenCV Feed', image) | |
cv2.waitKey(500) | |
else: | |
cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(filename, sequence), (15,12), | |
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA) | |
# Show to screen | |
cv2.imshow('OpenCV Feed', image) | |
# NEW Export keypoints | |
keypoints = extract_keypoints(results) | |
print(keypoints) | |
npy_path = os.path.join(os.path.abspath("."), DATA_PATH, classname, filename_no_ext, str(frame_num)) | |
if not(os.path.exists(os.path.join(os.path.abspath("."), DATA_PATH, classname, filename_no_ext))): | |
os.mkdir(os.path.join(os.path.abspath("."), DATA_PATH, classname, filename_no_ext)) | |
np.save(npy_path, keypoints) | |
cap.release() | |
cv2.destroyAllWindows() | |
""" | |
####################### pre-processing #############################################

from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
import glob
import os
import os.path
import numpy as np

DATA_PATH = os.path.join('data')
sequence_length = 30

#lebels = ['33_onno', '34_sobai','35_sabdhane']
lebels = ['33_onno', '34_sobai', '35_sabdhane']
label_map = {label: num for num, label in enumerate(lebels)}
print(label_map)
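# Expected output: {'33_onno': 0, '34_sobai': 1, '35_sabdhane': 2}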
def get_video_parts(video_path):
    """Given a full path to a video, return its parts."""
    parts = video_path.split(os.path.sep)
    filename = parts[2]
    filename_no_ext = filename.split('.')[0]
    classname = parts[1]
    train_or_test = parts[0]
    return train_or_test, classname, filename_no_ext, filename
step_size = 5
sequences, labels = [], []
for lebel in lebels:
    class_files = glob.glob(os.path.join(DATA_PATH, lebel, '*.mp4'))
    for class_file in class_files:
        video_parts = get_video_parts(class_file)
        train_or_test, classname, filename_no_ext, filename = video_parts

        generated_files = glob.glob(os.path.join(DATA_PATH, lebel, filename_no_ext, '*.npy'))
        #print(generated_files)
        #print(generated_files[5])
        total_npy = len(generated_files)
        print("The total_npy number is {0}".format(str(total_npy)))

        j = 0
        i = 0
        # Number of 30-frame windows that fit into this clip with a stride of 5 frames
        step_size = int(((total_npy - 30) + 5) / 5)
        print(step_size)
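        # Worked example (hypothetical clip): total_npy = 50 saved frames gives
        # step_size = int((50 - 30 + 5) / 5) = 5, i.e. five overlapping 30-frame
        # windows starting at frames 0, 5, 10, 15 and 20.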
        #window = []
        print("The Folder name is " + filename_no_ext)
        while i < step_size:
            window = []
            for frame_num in range(j, j + 30):
                #print(frame_num)
                if j + 30 <= total_npy:
                    print("The frame number is {0}".format(str(frame_num)))
                    res = np.load(os.path.join(DATA_PATH, lebel, filename_no_ext, "{}.npy".format(frame_num)))
                    window.append(res)
            i += 1
            j += 5
            print(j)
            #generated_files = generated_files[i * 5]
            sequences.append(window)
            labels.append(label_map[lebel])
""" | |
for frame_num in range(j, j+30): | |
#print(frame_num) | |
if j+30 < total_npy: | |
print("The frame number is {0}".format(str(frame_num))) | |
res = np.load(os.path.join(DATA_PATH, lebel, filename_no_ext, "{}.npy".format(frame_num))) | |
window.append(res) | |
#i += 1 | |
#j += 5 | |
print(j) | |
#generated_files = generated_files[i * 5] | |
sequences.append(window) | |
labels.append(label_map[lebel]) | |
j += 5 | |
i += 1 | |
if i < step_size: | |
for frame_num in range(j, j+30): | |
#print(frame_num) | |
if j+30 < total_npy: | |
print("The frame number is {0}".format(str(frame_num))) | |
res = np.load(os.path.join(DATA_PATH, lebel, filename_no_ext, "{}.npy".format(frame_num))) | |
window.append(res) | |
else: | |
break | |
#i += 1 | |
#j += 5 | |
print(j) | |
#generated_files = generated_files[i * 5] | |
sequences.append(window) | |
labels.append(label_map[lebel]) | |
j += 5 | |
i += 1 | |
if i < step_size: | |
for frame_num in range(j, j+30): | |
#print(frame_num) | |
if j+30 < total_npy: | |
print("The frame number is {0}".format(str(frame_num))) | |
res = np.load(os.path.join(DATA_PATH, lebel, filename_no_ext, "{}.npy".format(frame_num))) | |
window.append(res) | |
else: | |
break | |
#i += 1 | |
#j += 5 | |
print(j) | |
#generated_files = generated_files[i * 5] | |
sequences.append(window) | |
labels.append(label_map[lebel]) | |
j += 5 | |
i += 1 | |
if i < step_size: | |
for frame_num in range(j, j+30): | |
#print(frame_num) | |
if j+30 < total_npy: | |
print("The frame number is {0}".format(str(frame_num))) | |
res = np.load(os.path.join(DATA_PATH, lebel, filename_no_ext, "{}.npy".format(frame_num))) | |
window.append(res) | |
else: | |
break | |
#i += 1 | |
#j += 5 | |
print(j) | |
#generated_files = generated_files[i * 5] | |
sequences.append(window) | |
labels.append(label_map[lebel]) | |
j += 5 | |
i += 1 | |
if i < step_size: | |
for frame_num in range(j, j+30): | |
#print(frame_num) | |
if j+30 < total_npy: | |
print("The frame number is {0}".format(str(frame_num))) | |
res = np.load(os.path.join(DATA_PATH, lebel, filename_no_ext, "{}.npy".format(frame_num))) | |
window.append(res) | |
else: | |
break | |
#i += 1 | |
#j += 5 | |
print(j) | |
#generated_files = generated_files[i * 5] | |
sequences.append(window) | |
labels.append(label_map[lebel]) | |
j += 5 | |
i += 1 | |
if i < step_size: | |
for frame_num in range(j, j+30): | |
#print(frame_num) | |
if j+30 < total_npy: | |
print("The frame number is {0}".format(str(frame_num))) | |
res = np.load(os.path.join(DATA_PATH, lebel, filename_no_ext, "{}.npy".format(frame_num))) | |
window.append(res) | |
else: | |
break | |
#i += 1 | |
#j += 5 | |
print(j) | |
#generated_files = generated_files[i * 5] | |
sequences.append(window) | |
labels.append(label_map[lebel]) | |
j += 5 | |
i += 1 | |
if i < step_size: | |
for frame_num in range(j, j+30): | |
#print(frame_num) | |
if j+30 < total_npy: | |
print("The frame number is {0}".format(str(frame_num))) | |
res = np.load(os.path.join(DATA_PATH, lebel, filename_no_ext, "{}.npy".format(frame_num))) | |
window.append(res) | |
else: | |
break | |
#i += 1 | |
#j += 5 | |
print(j) | |
#generated_files = generated_files[i * 5] | |
sequences.append(window) | |
labels.append(label_map[lebel]) | |
j += 5 | |
i += 1 | |
if i < step_size: | |
for frame_num in range(j, j+30): | |
#print(frame_num) | |
if j+30 < total_npy: | |
print("The frame number is {0}".format(str(frame_num))) | |
res = np.load(os.path.join(DATA_PATH, lebel, filename_no_ext, "{}.npy".format(frame_num))) | |
window.append(res) | |
else: | |
break | |
#i += 1 | |
#j += 5 | |
print(j) | |
#generated_files = generated_files[i * 5] | |
sequences.append(window) | |
labels.append(label_map[lebel]) | |
""" | |
print(np.array(sequences).shape)
print(np.array(labels).shape)

X = np.array(sequences)
print(X)
print(X.shape)
y = to_categorical(labels).astype(int)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)
print(y_test.shape)
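# Each sample in X is a (30, 1662) array: a sliding window of 30 frames with 1662
# keypoint values per frame, matching the LSTM input_shape defined below; y holds
# the one-hot encoded class labels.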
############# Build and Train LSTM Neural Network ###################################

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)
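# Training progress can be monitored with TensorBoard, e.g.: tensorboard --logdir=Logs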
model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(30, 1662)))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(classes.shape[0], activation='softmax'))

model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
model.fit(X_train, y_train, epochs=2000, callbacks=[tb_callback])
print(model.summary())
#### Make Predictions ################

res = model.predict(X_test)
print(classes[np.argmax(res[4])])     # predicted class for the 5th test sample (requires len(X_test) > 4)
print(classes[np.argmax(y_test[4])])  # true class for the same sample

#### Save Weights ##########
model.save('action.h5')
#del model
model.load_weights('action.h5')
################### Evaluation using Confusion Matrix and Accuracy ################

from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

yhat = model.predict(X_test)
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()
print(multilabel_confusion_matrix(ytrue, yhat))
print(accuracy_score(ytrue, yhat))
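# multilabel_confusion_matrix returns one 2x2 (one-vs-rest) confusion matrix per class;
# accuracy_score gives the overall fraction of correctly classified test windows.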
""" | |
############### Test in Real Time ########################################### | |
from scipy import stats | |
colors = [(245,117,16), (117,245,16), (16,117,245)] | |
def prob_viz(res, actions, input_frame, colors): | |
output_frame = input_frame.copy() | |
for num, prob in enumerate(res): | |
cv2.rectangle(output_frame, (0,60+num*40), (int(prob*100), 90+num*40), colors[num], -1) | |
cv2.putText(output_frame, actions[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA) | |
return output_frame | |
plt.figure(figsize=(18,18)) | |
plt.imshow(prob_viz(res, actions, image, colors)) | |
# 1. New detection variables | |
sequence = [] | |
sentence = [] | |
predictions = [] | |
threshold = 0.5 | |
cap = cv2.VideoCapture(0) | |
# Set mediapipe model | |
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic: | |
while cap.isOpened(): | |
# Read feed | |
ret, frame = cap.read() | |
# Make detections | |
image, results = mediapipe_detection(frame, holistic) | |
print(results) | |
# Draw landmarks | |
draw_styled_landmarks(image, results) | |
# 2. Prediction logic | |
keypoints = extract_keypoints(results) | |
sequence.append(keypoints) | |
sequence = sequence[-30:] | |
if len(sequence) == 30: | |
res = model.predict(np.expand_dims(sequence, axis=0))[0] | |
print(actions[np.argmax(res)]) | |
predictions.append(np.argmax(res)) | |
#3. Viz logic | |
if np.unique(predictions[-10:])[0]==np.argmax(res): | |
if res[np.argmax(res)] > threshold: | |
if len(sentence) > 0: | |
if lebels[np.argmax(res)] != sentence[-1]: | |
sentence.append(lebels[np.argmax(res)]) | |
else: | |
sentence.append(lebels[np.argmax(res)]) | |
if len(sentence) > 5: | |
sentence = sentence[-5:] | |
# Viz probabilities | |
image = prob_viz(res, lebels, image, colors) | |
cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1) | |
cv2.putText(image, ' '.join(sentence), (3,30), | |
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA) | |
# Show to screen | |
cv2.imshow('OpenCV Feed', image) | |
# Break gracefully | |
if cv2.waitKey(10) & 0xFF == ord('q'): | |
break | |
cap.release() | |
cv2.destroyAllWindows() | |
""" |