Created
February 28, 2017 16:57
-
-
Save ericcj/49118c877f471b3d8d53db545160e4ac to your computer and use it in GitHub Desktop.
An okhttp3.Call with a deadline timeout from the start of isExecuted until ResponseBody.source() is closed or unused.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.pushd.util; | |
import android.support.annotation.NonNull; | |
import android.support.annotation.Nullable; | |
import java.io.IOException; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.ScheduledFuture; | |
import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.logging.Logger; | |
import okhttp3.Call; | |
import okhttp3.Callback; | |
import okhttp3.MediaType; | |
import okhttp3.OkHttpClient; | |
import okhttp3.Request; | |
import okhttp3.Response; | |
import okhttp3.ResponseBody; | |
import okhttp3.internal.http2.StreamResetException; | |
import okio.Buffer; | |
import okio.BufferedSource; | |
import okio.ForwardingSource; | |
import okio.Okio; | |
/** | |
* An okhttp3.Call with a deadline timeout from the start of isExecuted until ResponseBody.source() is closed or unused. | |
*/ | |
public class DeadlineCall implements Call { | |
private final static Logger LOGGER = Logger.getLogger(DeadlineCall.class.getName()); | |
private static AtomicInteger sFutures = new AtomicInteger(); | |
private static final ScheduledExecutorService sHTTPCancelExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { | |
@Override | |
public Thread newThread(Runnable r) { | |
Thread t = new Thread(r, "DeadlineCallCancel"); | |
t.setDaemon(true); | |
t.setPriority(Thread.NORM_PRIORITY); | |
return t; | |
} | |
}); | |
private final Call mUnderlying; | |
private final int mDeadlineTimeout; | |
private volatile ScheduledFuture mDeadline; | |
private volatile boolean mDeadlineHit; | |
private volatile boolean mCancelled; | |
private volatile BufferedSource mBodySource; | |
DeadlineCall(Call underlying, int deadlineTimeout) { | |
mUnderlying = underlying; | |
mDeadlineTimeout = deadlineTimeout; | |
} | |
/** | |
* Factory wrapper for OkHttpClient.newCall(request) to create a new DeadlineCall scheduled to cancel its underlying Call after the deadline. | |
* @param client | |
* @param request | |
* @param deadlineTimeout in ms | |
* @return Call | |
*/ | |
public static DeadlineCall newDeadlineCall(@NonNull OkHttpClient client, @NonNull Request request, int deadlineTimeout) { | |
final Call underlying = client.newCall(request); | |
return new DeadlineCall(underlying, deadlineTimeout); | |
} | |
/** | |
* Shuts down thread that cancels calls when their deadline is hit. | |
*/ | |
public static void shutdownNow() { | |
sHTTPCancelExecutorService.shutdownNow(); | |
} | |
@Override | |
public Request request() { | |
return mUnderlying.request(); | |
} | |
/** | |
* Response MUST be closed to clean up deadline even if body is not read, e.g. on !isSuccessful | |
* @return | |
* @throws IOException | |
*/ | |
@Override | |
public Response execute() throws IOException { | |
startDeadline(); | |
try { | |
return wrapResponse(mUnderlying.execute()); | |
} catch (IOException e) { | |
cancelDeadline(); | |
throw wrapIfDeadline(e); | |
} | |
} | |
/** | |
* Deadline is removed when onResponse returns unless response.body().source() or a method using | |
* it is called synchronously from onResponse to indicate caller's committment to close it themselves. | |
* This includes peekBody so prefer DeadlineResponseBody.peek unless you explicitly close after peekBody. | |
* @param responseCallback | |
*/ | |
@Override | |
public void enqueue(final Callback responseCallback) { | |
startDeadline(); | |
mUnderlying.enqueue(new Callback() { | |
@Override | |
public void onFailure(Call underlying, IOException e) { | |
cancelDeadline(); // there is no body to read so no need for deadline anymore | |
responseCallback.onFailure(DeadlineCall.this, wrapIfDeadline(e)); | |
} | |
@Override | |
public void onResponse(Call underlying, Response response) throws IOException { | |
try { | |
responseCallback.onResponse(DeadlineCall.this, wrapResponse(response)); | |
if (mBodySource == null) { | |
cancelDeadline(); // remove deadline if body was never opened | |
} | |
} catch (IOException e) { | |
cancelDeadline(); | |
throw wrapIfDeadline(e); | |
} | |
} | |
}); | |
} | |
private IOException wrapIfDeadline(IOException e) { | |
if (mDeadlineHit && isCancellationException(e)) { | |
return new DeadlineException(e); | |
} | |
return e; | |
} | |
public class DeadlineException extends IOException { | |
public DeadlineException(Throwable cause) { | |
super(cause); | |
} | |
} | |
/** | |
* Wraps response to cancelDeadline when response closed and throw correct DeadlineException when deadline happens during response reading. | |
* @param response | |
* @return | |
*/ | |
private Response wrapResponse(final Response response) { | |
return response.newBuilder().body(new DeadlineResponseBody(response)).build(); | |
} | |
public class DeadlineResponseBody extends ResponseBody { | |
private final Response mResponse; | |
DeadlineResponseBody(final Response response) { | |
mResponse = response; | |
} | |
@Override | |
public MediaType contentType() { | |
return mResponse.body().contentType(); | |
} | |
@Override | |
public long contentLength() { | |
return mResponse.body().contentLength(); | |
} | |
/** | |
* @return the body source indicating it will be closed later by the caller to cancel the deadline | |
*/ | |
@Override | |
public BufferedSource source() { | |
if (mBodySource == null) { | |
mBodySource = Okio.buffer(new ForwardingSource(mResponse.body().source()) { | |
@Override | |
public long read(Buffer sink, long byteCount) throws IOException { | |
try { | |
return super.read(sink, byteCount); | |
} catch (IOException e) { | |
throw wrapIfDeadline(e); | |
} | |
} | |
@Override | |
public void close() throws IOException { | |
cancelDeadline(); | |
super.close(); | |
} | |
}); | |
} | |
return mBodySource; | |
} | |
/** | |
* @return the body source without indicating it will be closed later by caller, e.g. to peekBody on unsucessful requests | |
*/ | |
public BufferedSource peekSource() { | |
return mResponse.body().source(); | |
} | |
/** | |
* Copy of https://square.github.io/okhttp/3.x/okhttp/okhttp3/Response.html#peekBody-long- that uses peekSource() since Response class is final | |
* @param byteCount | |
* @return | |
* @throws IOException | |
*/ | |
public ResponseBody peek(long byteCount) throws IOException { | |
BufferedSource source = peekSource(); | |
source.request(byteCount); | |
Buffer copy = source.buffer().clone(); | |
// There may be more than byteCount bytes in source.buffer(). If there is, return a prefix. | |
Buffer result; | |
if (copy.size() > byteCount) { | |
result = new Buffer(); | |
result.write(copy, byteCount); | |
copy.clear(); | |
} else { | |
result = copy; | |
} | |
return ResponseBody.create(mResponse.body().contentType(), result.size(), result); | |
} | |
} | |
private void startDeadline() { | |
mDeadline = sHTTPCancelExecutorService.schedule(new Runnable() { | |
@Override | |
public void run() { | |
mDeadlineHit = true; | |
mUnderlying.cancel(); // calls onFailure or causes body read to throw | |
LOGGER.fine("Deadline hit for " + request()); // should trigger a subsequent wrapIfDeadline but if we see this log line without that it means the caller orphaned us without closing | |
} | |
}, mDeadlineTimeout, TimeUnit.MILLISECONDS); | |
LOGGER.fine("started deadline for " + request()); | |
if (sFutures.incrementAndGet() == 1000) { | |
LOGGER.warning("1000 pending DeadlineCalls, may be leaking due to not calling close()"); | |
} | |
} | |
private void cancelDeadline() { | |
if (mDeadline != null) { | |
mDeadline.cancel(false); | |
mDeadline = null; | |
sFutures.decrementAndGet(); | |
LOGGER.fine("canceled deadline for " + request()); | |
} else { | |
LOGGER.info("deadline already canceled for " + request()); | |
} | |
} | |
@Override | |
public void cancel() { | |
mCancelled = true; | |
// should trigger onFailure or raise from execute or responseCallback.onResponse which will cancelDeadline | |
mUnderlying.cancel(); | |
} | |
@Override | |
public boolean isExecuted() { | |
return mUnderlying.isExecuted(); | |
} | |
@Override | |
public boolean isCanceled() { | |
return mCancelled; | |
} | |
@Override | |
public Call clone() { | |
return new DeadlineCall(mUnderlying.clone(), mDeadlineTimeout); | |
} | |
private static boolean isCancellationException(IOException e) { | |
// okhttp cancel from HTTP/2 calls | |
if (e instanceof StreamResetException) { | |
switch (((StreamResetException) e).errorCode) { | |
case CANCEL: | |
return true; | |
} | |
} | |
// https://android.googlesource.com/platform/external/okhttp/+/master/okhttp/src/main/java/com/squareup/okhttp/Call.java#281 | |
if (e instanceof IOException && | |
e.getMessage() != null && e.getMessage().equals("Canceled")) { | |
return true; | |
} | |
return false; | |
} | |
} |
@NoHarmDan we use okhttp directly so I'm not familiar with retrofit, but it sounds like you understand the nuts and bolts enough to create a pull request on retrofit that would support integrating DeadlineCall. could you link to the retrofit feature request from square/okhttp#2840 (comment)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi, how do you use this with Retrofit2? (I mean, if you do, but I suppose you do, since a feature request thread on Retrofit lead me here) I can't exactly use this instead of retrofit2.Call and I can't replace it's inner okhttp3.Call instance, as I would have to basically hack the whole Retrofit library. Or am I completely missing something?
(Of course it could be altered to work with the retrofit2.Call... almost - I don't know of a construct similar to response.newBuilder().body(new DeadlineResponseBody(response)).build(); in retrofit2.Call. Or any other way to replace its okhttp3.ResponseBody...)
Thanks for any input - I'm desperately looking for a reliable way to put a universal timeout on a retrofit2.Call and this is the best thing I've seen so far.