Created
April 21, 2018 12:33
-
-
Save danoneata/59e216f473d83eac061665d0dcafcb3b to your computer and use it in GitHub Desktop.
Time series classification for gesture recognition
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import pdb | |
import os | |
import sys | |
from collections import namedtuple | |
import numpy as np | |
import pandas as pd | |
from sklearn.metrics import ( | |
accuracy_score, | |
) | |
from sklearn.model_selection import ( | |
StratifiedKFold, | |
) | |
from sklearn.neighbors import ( | |
KNeighborsClassifier, | |
) | |
from sklearn.svm import ( | |
SVC, | |
) | |
from dtaidistance import dtw | |
from tsfresh import ( | |
extract_features, | |
select_features, | |
) | |
from tsfresh.utilities.dataframe_functions import impute | |
# Random seed used for the cross-validation shuffling.
SEED = 1337

# Directory that holds the recording files read by `load_data`.
DATA_DIR = 'data/alignedset'

# Gesture names; the position of a name in the tuple is its integer label.
IDX_TO_CLS = dict(enumerate((
    "spread",
    "fist",
    "wavein",
    "doubletap",
    "waveout",
    "like",
    "vshape",
    "point",
    "ok",
    "come",
)))

# Reverse lookup: gesture name -> integer label.
CLS_TO_IDX = {name: idx for idx, name in IDX_TO_CLS.items()}

# Number of channels per recording and number of time steps per channel.
N_CHANNELS = 8
N_SAMPLES = 300

# One recording: the signal array, its integer label and the user name.
Datum = namedtuple('Datum', ['signal', 'label', 'user'])
def load_data():
    """Load every recording stored in ``DATA_DIR``.

    The metadata is parsed from the file name, split on ``'_'``: the first
    field is the integer label and the third field, lower-cased, is the
    user.

    Files are read in sorted name order. ``os.listdir`` returns entries in
    arbitrary order, which would make the sample order (and therefore the
    seeded cross-validation folds computed later) differ between runs or
    machines.

    Returns:
        A list with one ``Datum`` per file.
    """
    def read_datum(name):
        # Parse the file name once and reuse the fields.
        fields = name.split('_')
        return Datum(
            signal=np.loadtxt(os.path.join(DATA_DIR, name)),
            label=int(fields[0]),
            user=fields[2].lower(),
        )

    return [read_datum(name) for name in sorted(os.listdir(DATA_DIR))]
def compute_dtw_distances(data, channels=range(N_CHANNELS)):
    """Compute pairwise DTW distances between recordings, one matrix per channel.

    Args:
        data: Sequence of items whose ``signal`` attribute can be indexed by
            channel.
        channels: Channel indices to compare. Defaults to all ``N_CHANNELS``
            channels.

    Returns:
        An array of shape ``(len(channels), n, n)`` with ``n = len(data)``.
        Entry ``[k, i, j]`` is the DTW distance between recordings ``i`` and
        ``j`` on channel ``channels[k]``. Each matrix is symmetric and its
        diagonal is set to ``np.inf``.
    """
    channels = list(channels)
    n = len(data)
    dists = np.zeros((len(channels), n, n))
    for i in range(n):
        for j in range(i + 1, n):
            # Index the output by position `k`, not by channel value `c`:
            # the two only coincide for the default `range(N_CHANNELS)`.
            for k, c in enumerate(channels):
                print(i, j, c)
                dist = dtw.distance_fast(
                    data[i].signal[c],
                    data[j].signal[c],
                )
                # DTW distance is symmetric, so fill both triangles.
                dists[k, i, j] = dist
                dists[k, j, i] = dist
    for k in range(len(channels)):
        np.fill_diagonal(dists[k], np.inf)
    return dists
def extract_features(data):
    """Extract tsfresh features from every recording.

    The recordings are stacked into one long table with an ``id`` column
    (position of the recording in ``data``), a ``time`` column
    (``0 .. N_SAMPLES - 1``) and one ``channel_<i>`` column per channel. That
    table is then handed to tsfresh.

    Args:
        data: Sequence of items whose ``signal`` attribute is an array with
            one row per channel and ``N_SAMPLES`` columns.

    Returns:
        Whatever ``tsfresh.extract_features`` returns for the stacked table.
    """
    # This function has the same name as the tsfresh routine, so the
    # module-level name refers to *this* function. Import the library
    # routine under an alias to avoid calling ourselves.
    from tsfresh import extract_features as tsfresh_extract_features

    # `np.vstack` needs a real sequence; generators are not accepted by
    # recent NumPy versions.
    stacked = np.vstack([
        np.hstack((
            np.full((N_SAMPLES, 1), i, dtype=float),
            np.arange(N_SAMPLES).reshape(N_SAMPLES, 1),
            datum.signal.T,
        ))
        for i, datum in enumerate(data)
    ])
    channel_cols = ["channel_{}".format(i) for i in range(N_CHANNELS)]
    data_frame = pd.DataFrame(stacked, columns=["id", "time"] + channel_cols)
    data_frame[channel_cols] = data_frame[channel_cols].apply(pd.to_numeric)
    return tsfresh_extract_features(
        data_frame,
        column_id="id",
        column_sort="time",
        column_kind=None,
        column_value=None,
    )
def evaluate1_knn_precomputed(D, y, train_index, test_index, k=9, channel=None):
    """Score a k-NN classifier on one split of a precomputed distance matrix.

    Args:
        D: Square matrix of pairwise distances between all samples.
        y: Array of labels, aligned with the rows of ``D``.
        train_index: Indices of the training samples.
        test_index: Indices of the test samples.
        k: Number of neighbours.
        channel: Unused.

    Returns:
        The accuracy on the test samples, as a percentage.
    """
    # Columns always refer to the training samples; rows are the samples
    # being fitted (train) or queried (test).
    train_to_train = D[np.ix_(train_index, train_index)]
    test_to_train = D[np.ix_(test_index, train_index)]
    y_train = y[train_index]
    y_test = y[test_index]

    classifier = KNeighborsClassifier(n_neighbors=k, metric='precomputed')
    classifier.fit(train_to_train, y_train)
    predictions = classifier.predict(test_to_train)
    return 100 * accuracy_score(y_test, predictions)
def evaluate1_svm(X, y, train_index, test_index):
    """Score a linear SVM on one split, with tsfresh feature selection.

    Args:
        X: Feature matrix with one row per sample.
        y: Array of labels, aligned with the rows of ``X``.
        train_index: Positions of the training samples.
        test_index: Positions of the test samples.

    Returns:
        The accuracy on the test samples, as a percentage.
    """
    # NOTE(review): missing values are imputed using the entire data set,
    # test samples included. Is that acceptable?
    X = impute(pd.DataFrame(X))
    # `DataFrame.ix` no longer exists in pandas. The split indices are
    # positions, so `.iloc` is the matching accessor (and is equivalent to
    # the old `.ix` for the default RangeIndex built above).
    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y[train_index], y[test_index]
    # Feature selection is fitted on the training samples only; the same
    # columns are then picked from the test samples.
    features = select_features(X_train, y_train, fdr_level=0.001)
    cols = features.columns
    X_train = X_train[cols].values
    X_test = X_test[cols].values
    svm = SVC(kernel='linear', C=0.01)
    svm.fit(X_train, y_train)
    y_pred = svm.predict(X_test)
    return 100 * accuracy_score(y_test, y_pred)
def evaluate(evaluate1, X, y):
    """Run 5-fold stratified cross-validation and print the accuracies.

    Args:
        evaluate1: Callable ``(X, y, train_index, test_index)`` returning the
            score of a single split.
        X: Data passed through to ``evaluate1``.
        y: Labels, used for stratification and passed to ``evaluate1``.

    Prints the mean and standard deviation of the per-fold scores, followed
    by the individual scores.
    """
    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
    accs = []
    for train_index, test_index in folds.split(X, y):
        accs.append(evaluate1(X, y, train_index, test_index))

    summary = '{:.3f} ± {:.2f} |'.format(np.mean(accs), np.std(accs))
    per_fold = ' '.join('{:.1f}'.format(acc) for acc in accs)
    print(summary, per_fold)
# Registry of the supported methods. For each method name:
#   'filename'     - stem of the .npz file holding the preprocessed data,
#   'process-data' - function turning the loaded recordings into that data,
#   'evaluate1'    - function scoring a single cross-validation split.
METHODS = {
    name: {
        'filename': filename,
        'process-data': process_data,
        'evaluate1': evaluate1,
    }
    for name, filename, process_data, evaluate1 in (
        ('dtw', 'distances', compute_dtw_distances, evaluate1_knn_precomputed),
        ('feats', 'features', extract_features, evaluate1_svm),
    )
}
def main():
    """Command-line entry point.

    ``--method`` selects an entry of ``METHODS``; ``--todo`` selects the step:

    * ``preprocess``: load the recordings, run the method's processing
      function and save the result together with the labels to
      ``<filename>.npz``.
    * ``evaluate``: load that file and cross-validate the method on it.
    """
    parser = argparse.ArgumentParser(
        description='Gesture recognition based on EMG data.',
    )
    parser.add_argument(
        '-m', '--method',
        choices=list(METHODS),
        required=True,
        help='which method to use.',
    )
    parser.add_argument(
        '-t', '--todo',
        # A list rather than a set, so the help text has a stable order.
        choices=['preprocess', 'evaluate'],
        required=True,
        help='what to do.',
    )
    args = parser.parse_args()

    method = METHODS[args.method]
    archive = method['filename'] + '.npz'

    if args.todo == 'preprocess':
        data = load_data()
        processed = method['process-data'](data)
        labels = np.array([datum.label for datum in data])
        np.savez(archive, data=processed, labels=labels)
    elif args.todo == 'evaluate':
        # Read both arrays while the archive is open, then close it.
        with np.load(archive) as stored:
            X = stored['data']
            y = stored['labels']
        if args.method == 'dtw':
            # Note: I keep the channels separated in case I want to discard
            # some or do a weighted sum.
            X = np.sum(X, axis=0)
            # sklearn doesn't like nan's and inf's so replace them with zeros.
            np.fill_diagonal(X, 0)
        evaluate(evaluate1=method['evaluate1'], X=X, y=y)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment