Completable Future (Java 8) programs!

Vivek Singh
5 min readJul 12, 2021

--

In this article we will focus on CompletableFuture which is a Java8 feature.

I will cover the basics about this feature in my next tech article. Until then you can refer to https://www.baeldung.com/java-completablefuture This is quite cool and covers all aspects !

We will discuss two problems here :

  1. Execute a new task ‘T2’ if the previous task ‘T1’ has completed executing. We will keep on polling the task ‘T1’ to check if task T1 is completed. Once T1 is complete, we will execute T2.

To give you a real life example :

Say that you are keeping a track of the ride of vehicles. Every vehicle has a status ‘In_PROGRESS’ and ‘COMPLETED’. When the ride is complete for any vehicle, we initiate the payment to the rider.

To keep a check on the status, we needed a DB connection, hence I have used a simpler example to rule out any additional DB connection logic.

I took the current time , added 10 seconds to this time and stored it in a final variable(say timeToReach). Now my polling logic will complete in 10 seconds because every second I call the polling logic and it checks to see if currentTime is greater than timeToReach. Keep in mind, you can replace this logic with your logic, and it could be anything like to keep a check on one of the columns in the table to see if the state is ‘COMPLETED’ as explained above in the vehicle/ride example.

I have added the comments in the code which I think is pretty clear considering you know the basics of Completablefuture.

Please refer to my git repo for more clear comments, as indentation is difficult in medium :

https://github.com/Viveksingh1313/MultiThreading/blob/dev/src/main/java/com/mycompany/app/App.java

import java.util.concurrent.*;

public class App
{
static ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); // create a single thread because we are using scheduler

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getId()); // not needed, but gives an idea about the working thread
// pollForCompletion returns a CompletableFuture object. Once this is complete only then thenApply will be called
//pollForCompletion is Task T1 ,and the logic inside thenApply acts as a task T2 which will only execute once task T1 is complete
final Object jobResult = pollForCompletion("2").thenApply(jId -> {
System.out.println("Your next operation "+jId); // this block acts as Task T2, you can write your own logic here
return jId;
}).get();
// Completable Future makes asynchronous calls, but we want the async logic to commplete before executing next staement, so we are using get()
System.out.println(Thread.currentThread().getId());
executor.shutdown();
//when shutdown() method is called on an executor service, it stops accepting new tasks, waits for previously submitted tasks to execute, and then terminates the executor.
}

private static CompletableFuture<String> pollForCompletion(String jobId) {
Long time = System.currentTimeMillis(); //
Long timeToReach = time + 10000; // a variable to decide when polling should be complete

RemoteServer remoteServer = new RemoteServer(); // has the main polling logic
CompletableFuture<String> completionFuture = new CompletableFuture<>(); // we will return this object once polling is complete
final ScheduledFuture<Void> checkFuture = (ScheduledFuture<Void>) executor.scheduleAtFixedRate(() -> {
try {
// we are calling the polling logic. If it returns true, we complete the Task T1 and return a completed future
if (remoteServer.isJobDone(jobId, timeToReach)) {
completionFuture.complete(jobId);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 0, 2, TimeUnit.SECONDS);
// scheduler is scheduled to be executed every 2 seconds, till it returns true.
completionFuture.whenComplete((result, thrown) -> {
checkFuture.cancel(true);
});
return completionFuture;
}
}

class RemoteServer {
boolean isJobDone(String jobId, Long timeToReach) throws InterruptedException {
System.out.println(Thread.currentThread().getId());
if(System.currentTimeMillis() > timeToReach) // this will be true after 10 seconds according to the logic
return true;
else
return false;

}
}

//OUTPUT :
//
// 1
// 12
// 12
// 12
// 12
// 12
// 12
// Your next operation 2
// 1

2. Calling 5 different endpoints parallelly , and aggregating their data in a list. I have used dummy endpoints I found on the internet to write my logic.

Git link for better code indentation : https://github.com/Viveksingh1313/MultiThreading/blob/dev/src/main/java/com/mycompany/app/RestCallCompletable.java

package com.mycompany.app;

import org.omg.CORBA.TIMEOUT;
import org.springframework.web.client.RestTemplate;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* This program will call 5 endpoints parallelly using Completable Future, and store the responses from all endpoints in an object
*/
public class RestCallCompletable {

public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
List<Object> list = new ArrayList<>(); // list object to store and aggregate responses from all endpoints

// list of 5 different dummy URLS which are to be executed parallely
String url1 = "https://jsonplaceholder.typicode.com/todos/1";
String url2 = "https://jsonplaceholder.typicode.com/todos/2";
String url3 = "https://jsonplaceholder.typicode.com/todos/3";
String url4 = "https://jsonplaceholder.typicode.com/todos/4";
String url5 = "https://jsonplaceholder.typicode.com/todos/5";

RestTemplate restTemplate = new RestTemplate(); // this is a springframework dependency to make Rest Endpoint calls.

// allOf makes sure to call all endpoints parallelly
// supplyAsync will return a CompletableFuture object which will consist of the response from endpoints
// thenApply will only be executed once supplyAsync() is complete
// in thenAccept we just append the response from endpoints to the list object
// error handling : https://www.baeldung.com/java-completablefuture
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> restTemplate.getForObject(url1, String.class))
.thenApply(x -> {
list.add(x);
System.out.println("called url1");
return null;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!"),
CompletableFuture.supplyAsync(() -> restTemplate.getForObject(url2, String.class))
.thenApply(x -> {
list.add(x);
System.out.println("called url2");
return null;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!"),
CompletableFuture.supplyAsync(() -> restTemplate.getForObject(url3, String.class))
.thenApply(x -> {
list.add(x);
System.out.println("called url3");
return null;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!"),
CompletableFuture.supplyAsync(() -> restTemplate.getForObject(url4, String.class))
.thenApply(x -> {
list.add(x);
System.out.println("called url4");
return null;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!"),
CompletableFuture.supplyAsync(() -> restTemplate.getForObject(url5, String.class))
.thenApply(x -> {
list.add(x);
System.out.println("called url5");
return null;
}).handle((s, t) -> {
if (s != null) {
} else {
System.out.println("Hello, Stranger!");
}
return s;
})
).get();
System.out.println(list);
}

// Output :

//[{
// "userId": 1,
// "id": 4,
// "title": "et porro tempora",
// "completed": true
// }, {
// "userId": 1,
// "id": 2,
// "title": "quis ut nam facilis et officia qui",
// "completed": false
// }, {
// "userId": 1,
// "id": 5,
// "title": "laboriosam mollitia et enim quasi adipisci quia provident illum",
// "completed": false
// }, {
// "userId": 1,
// "id": 1,
// "title": "delectus aut autem",
// "completed": false
// }, {
// "userId": 1,
// "id": 3,
// "title": "fugiat veniam minus",
// "completed": false
// }]
//
}

That’s it! Thanks for reading!

--

--

Vivek Singh
Vivek Singh

Written by Vivek Singh

Software Developer. I write about Full Stack, NLP and Blockchain. Buy me a coffee - buymeacoffee.com/viveksinless

No responses yet