How to use Apache AsyncHttpClient with Kotlin Coroutines

3 minutes read in Kotlin Highlights

upd. Code updated to work with Kotlin 1.3 and stable coroutines.

If you try to use Apache Async HttpClient with Kotlin coroutines you will found that their API (execute method) returns java.util.concurrent.Future that can’t be simply used with coroutines. But, you also allowed to pass future callback to execute method:

// interface HttpAsyncClient
Future<HttpResponse> execute(
    HttpUriRequest request,
    FutureCallback<HttpResponse> callback
);

Let’s take a look at FutureCallback interface:

public interface FutureCallback<T> {
    void completed(T result);
    void failed(Exception ex);
    void cancelled();
}

Step 1: Init.

There are many in common between CompletableFuture and FutureCallback API, so let’s create CompletableFuture and wrap current method call to return it:

fun HttpAsyncClient.execute(request: HttpUriRequest): CompletableFuture<HttpResponse> {
    val future = CompletableFuture<HttpResponse>()

    this.execute(request, object : FutureCallback<HttpResponse> {
        override fun completed(result: HttpResponse) {
            future.complete(result)
        }

        override fun cancelled() {
            future.cancel(false)
        }

        override fun failed(ex: Exception) {
            future.completeExceptionally(ex)
        }
    })

    return future
}

So what happens here:

We create extension function for HttpAsyncClient, and we can use this function in more natural way:

override suspend fun getCount(url: String): Int {
    // ...
    val response = httpClient./* calling extension */execute(request)./* kotlinx.coroutines extension */await()
    // ...
}

On first method’s line we create a new incomplete instance of CompletableFuture and return it on last line. We use this instance inside FutureCallback. So when some method of callback will be called, our future will change state.

Another important part – suspend doc modifier on function, that mean that function can be suspended.

Step 2: Refactor.

Imagine that we have few methods execute and we want to reuse code. Then again: extension functions and class delegation to the rescue!

class CompletableFutureCallback<T>(
    val completableFuture: CompletableFuture<T>
) : FutureCallback<T>, Future<T> by completableFuture, CompletionStage<T> by completableFuture {
    override fun failed(ex: Exception) {
        completableFuture.completeExceptionally(ex)
    }

    override fun cancelled() {
        completableFuture.cancel(false)
    }

    override fun completed(result: T) {
        completableFuture.complete(result)
    }
}

suspend fun <T> CompletableFutureCallback<T>.await(): T = this.completableFuture.await()

And now we can use this class like this:

fun HttpAsyncClient.execute(request: HttpUriRequest): CompletableFutureCallback<HttpResponse> {
    val future = CompletableFutureCallback(CompletableFuture<HttpResponse>())

    this.execute(request, future)

    return future
}

// Somewhere

client.execute(request)./* our extension */await()

So that’s it, most simple approach to use HttpAsyncClient with coroutines.

Step 3: Performance.

You can see that we create at least one instance of CompletableFuture, so we do one allocation of CompletableFuture for every call. Is it necessary?

What we do: we create instance of CompletableFuture and then suspend on it with help of CompletableFuture.await() extension. You can see that await uses suspendCancellableCoroutine function under the hood. Can we use this function directly? Sure, why not:

import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.cancelFutureOnCancellation
import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.http.HttpResponse
import org.apache.http.client.methods.HttpUriRequest
import org.apache.http.concurrent.FutureCallback
import org.apache.http.nio.client.HttpAsyncClient

suspend fun HttpAsyncClient.execute(request: HttpUriRequest): HttpResponse {
    return suspendCancellableCoroutine { cont: CancellableContinuation<HttpResponse> ->
        val future = this.execute(request, object : FutureCallback<HttpResponse> {
            override fun completed(result: HttpResponse) {
                cont.resumeWith(Result.success(result))
            }

            override fun cancelled() {
                if (cont.isCancelled) return
                cont.resumeWith(Result.failure(CancellationException("Cancelled")))
            }

            override fun failed(ex: Exception) {
                cont.resumeWith(Result.failure(ex))
            }
        })

        cont.cancelFutureOnCancellation(future);
        Unit
    }
}

Looks good, we use low-level suspendCancellableCoroutine function from standard library, and also we support cancellation of Future! (but cancellation support is not perfect).

This is my final approach for today.


← JFuture 2018 Small scopes and SRP rocks, isn’t it? →