Dask Parameter Server - Initial WIP
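A minimal proof-of-concept of an asynchronous parameter server on Dask: one worker holds the model coefficients and applies AdaGrad updates, while the other workers compute logistic-regression gradients on their data chunks and stream them back over the experimental `distributed` Channel API. Note that `Client.channel` and `LocalCluster.start_worker` were experimental at the time, so this targets the `distributed` release current when the gist was written.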
# ==== dask-ps
import dask
import dask.array as da
from dask import delayed
from dask_glm import families
from dask_glm.algorithms import lbfgs
from distributed import LocalCluster, Client, worker_client
import numpy as np
import time
from sklearn import datasets
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
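
# Start a local cluster with no default workers, then add three named
# workers: one to act as the parameter server and two to compute gradients.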
cluster = LocalCluster(n_workers=0)
cluster.start_worker(1, name="ps")
cluster.start_worker(1, name="w1")
cluster.start_worker(1, name="w2")
client = Client(cluster)

STEP_SIZE = 1.0
N = 10000
D = 10
ITER = 10
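
# Generate a synthetic binary classification problem, then wrap it in dask
# arrays chunked into blocks of 1000 rows (one delayed chunk per block).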
X_local, y_local = datasets.make_classification(n_classes=2, n_samples=N, n_features=D)
X = da.from_array(X_local, 1000)
y = da.from_array(y_local, 1000)
XD = X.to_delayed().flatten().tolist()  # a list of delayed chunks, each resolving to a numpy array
yD = y.to_delayed().flatten().tolist()
STEP_SIZE /= len(XD)  # need to adjust based on parallelism for convergence?

family = families.Logistic()
pointwise_gradient = family.pointwise_gradient
pointwise_loss = family.pointwise_loss
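
# Gradient of the logistic loss for one chunk of data at the current beta.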
def local_update(X, y, beta):
    return pointwise_gradient(beta, X, y)
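
# The parameter server owns the global beta. It publishes beta on the 'betas'
# channel (maxlen=1, so only the most recent value is kept), then listens on
# the 'updates' channel for gradients and applies an AdaGrad-style update:
# accumulate squared gradients in gti and scale each step by 1 / sqrt(gti).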
def parameter_server():
    beta = np.zeros(D)
    gti = np.zeros(D)
    with worker_client() as c:
        betas = c.channel('betas', maxlen=1)
        [future_beta] = c.scatter([beta])
        betas.append(future_beta)
        betas.flush()

        updates = c.channel('updates')
        for update in updates:
            print("received update: %s" % update)
            update = update.result()
            gti += update ** 2
            adj_grad = update / (1e-6 + np.sqrt(gti))
            beta = beta - STEP_SIZE * adj_grad
            [future_beta] = c.scatter([beta])
            betas.append(future_beta)
            betas.flush()
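
# Each worker loops ITER times: read the latest beta from the 'betas'
# channel, compute the local gradient on its own chunk, and push the
# result onto the 'updates' channel for the parameter server to consume.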
def worker(X, y):
    with worker_client(separate_thread=False) as c:
        for i in range(ITER):
            betas = c.channel('betas', maxlen=1)
            time.sleep(0.01)  # crude wait for the channel to receive the latest beta
            last_beta = betas.data[-1]
            # subset_beta = c.submit(operator.getitem, last_beta, idx).result()
            # params = subset_beta.result()
            beta = last_beta.result()
            print("Computing update with latest beta: %s" % beta)
            update = local_update(X, y, beta)  # .compute()
            [update_future] = c.scatter([update])
            updates = c.channel('updates')
            updates.append(update_future)
            updates.flush()
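
# Build one delayed worker task per data chunk, launch the parameter server
# on the 'ps' worker, then run the worker tasks on 'w1' and 'w2'.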
res = [delayed(worker)(xx, yy) for xx, yy in zip(XD, yD)]

# start PS
res_ps = client.submit(parameter_server, workers=['ps'], pure=False)

# start workers computing (each compute blocks until that chunk's loop finishes)
res2 = [d.compute(workers=['w1', 'w2']) for d in res]

# collect beta from PS
beta_chan = client.channel('betas', maxlen=1)
time.sleep(0.1)  # crude wait for the channel to receive the final beta
beta_sgd = beta_chan.data[-1].result()

# compare to L-BFGS solution from dask_glm
beta_lbfgs = lbfgs(X, y, lamduh=1e-7)

# also compare to sklearn LR (fit on the in-memory copies; passing the dask
# arrays would force an implicit compute on every sklearn call)
lr = LogisticRegression(fit_intercept=False, C=1000000, solver='lbfgs')
lr.fit(X_local, y_local)
print("sklearn LR (lbfgs):\nBeta: %s" % lr.coef_)
print("Score: %s" % lr.score(X_local, y_local))
print("AUC: %s" % roc_auc_score(y_local, lr.decision_function(X_local)))
print()

print("Beta DaskGLM lbfgs:\nBeta: %s" % beta_lbfgs)
lr.coef_ = beta_lbfgs.reshape(1, D)
print("Score: %s" % lr.score(X_local, y_local))
print("AUC: %s" % roc_auc_score(y_local, lr.decision_function(X_local)))
print()

print("Beta DaskPS Adagrad:\n%s" % beta_sgd)
lr.coef_ = beta_sgd.reshape(1, D)
print("Score: %s" % lr.score(X_local, y_local))
print("AUC: %s" % roc_auc_score(y_local, lr.decision_function(X_local)))