reCAPTCHA WAF Session Token

Get started with Java’s new structured concurrency model


Structured concurrency is a new way to use multithreading in Java. It allows developers to think about work in logical groups while taking advantage of both traditional and virtual threads. Available in preview in Java 21, structured concurrency is a key aspect of Java’s future, so now is a good time to start working with it.

Why we need structured concurrency

Writing concurrent software is one of the greatest challenges for software developers. Java’s thread model makes it a strong contender among concurrent languages, but multithreading has always been inherently tricky. The name “structured concurrency” comes from structured programming. In essence, it provides a way to write concurrent software using familiar program flows and constructs. This lets developers focus on the jobs that need to be done. As the JEP for structured concurrency says, “If a task splits into concurrent subtasks then they all return to the same place, namely the task’s code block.”

Virtual threads, now an official feature of Java, creates the possibility of cheaply spawning threads to gain concurrent performance. Structured concurrency provides the simple syntax to do so. As a result, there is very little learning curve to understand how threads are organized with structured concurrency.

The new StructuredTaskScope class

The main class in structured concurrency is java.util.concurrent.StructuredTaskScope. The Java 21 documentation includes examples of how to use structured concurrency. At the time of this writing, you’ll need to use --enable-preview and --source 21 or --source 22 to enable structured concurrency in your Java programs. My $java --version is openjdk 22-ea, so our example using Maven will specify --enable-preview --source 22 for the compile step and --enable-preview for the execution step. (Note that SDKMan is a good option for managing multiple JDK installs.)

You can find the example code in my GitHub repository for this article. Note the .mvn/jvm.config file that sets --enable-preview for execution. To run the code, use $mvn clean compile exec:java.

Multithreading with structured concurrency

For our examples, we’ll make several requests to the Star Wars API (SWAPI) to get information about planets by their ID. If we were doing this in standard synchronous Java, we’d probably do something like Listing 1, using the Apache HTTPClient.

Listing 1. Conventional-style multiple API calls


package com.infoworld;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class App {
  public String getPlanet(int planetId) throws Exception {
    System.out.println("BEGIN getPlanet()");
    String url = "https://swapi.dev/api/planets/" + planetId + "https://www.infoworld.com/";
    String ret = "?";

    CloseableHttpClient httpClient = HttpClients.createDefault();
 
    HttpGet request = new HttpGet(url);
    CloseableHttpResponse response = httpClient.execute(request);

    // Check the response status code
    if (response.getStatusLine().getStatusCode() != 200) {
      System.err.println("Error fetching planet information for ID: " + planetId);
      throw new RuntimeException("Error fetching planet information for ID: " + planetId);
    } else {
      // Parse the JSON response and extract planet information
        ret = EntityUtils.toString(response.getEntity());
        System.out.println("Got a Planet: " + ret);
      }

      // Close the HTTP response and client
      response.close();
      httpClient.close();
      return ret;
    }   
    void sync() throws Exception {
      int[] planetIds = {1,2,3,4,5};
      for (int planetId : planetIds) {
        getPlanet(planetId);
      }
    }
    public static void main(String[] args) {
        var myApp = new App();
        System.out.println("\n\r-- BEGIN Sync");
        try {
          myApp.sync();
        } catch (Exception e){
          System.err.println("Error: " + e);
        }
    }
}

In Listing 1, we have a main method that calls the sync() method, which simply iterates over a set of IDs while issuing calls to the "https://swapi.dev/api/planets/" + planetId endpoint. These calls are issued via the getPlanet() method, which uses the Apache HTTP library to handle the boilerplate request, response, and error handling. Essentially, the method receives each response and prints it to the console if it’s good (200); otherwise, it throws an error. (These examples are using bare minimum errors, so we just throw RuntimeException in that case.)

The output is something like this:


-- BEGIN Sync
BEGIN getPlanet()
Got a Planet: {"name":"Tatooine"} 
BEGIN getPlanet()
Got a Planet: {"name":"Alderaan"}
BEGIN getPlanet()
Got a Planet: {"name":"Yavin”}
BEGIN getPlanet()
Got a Planet: {"name":"Hoth"}
BEGIN getPlanet()
Got a Planet: {"name":"Dagobah"}

Now let’s try the same example using structured concurrency. As shown in Listing 2, structured concurrency lets us break up the calls into concurrent requests and keep everything in the same code space. In Listing 2, we add the necessary StructuredTaskScope import, then use its core methods, fork() and join(), to break each request into its own thread and then wait on them all.

Listing 2. Multiple API calls with StructuredTaskScope


package com.infoworld;

import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.*;
//...

public class App {
    public String getPlanet(int planetId) throws Exception {
      // ... same ...
    }
    
     void sync() throws Exception {
        int[] planetIds = {1,2,3,4,5};
        for (int planetId : planetIds) {
          getPlanet(planetId);
        }
      }
    void sc() throws Exception {
      int[] planetIds = {1,2,3,4,5};
        try (var scope = new StructuredTaskScope<Object>()) {
          for (int planetId : planetIds) {
            scope.fork(() -> getPlanet(planetId));
          } 
          scope.join();
        }catch (Exception e){
          System.out.println("Error: " + e);
        }
    }
    public static void main(String[] args) {
      var myApp = new App();
      // ...
      System.out.println("\n\r-- BEGIN Structured Concurrency");
      try {
        myApp.sc();
      } catch (Exception e){
        System.err.println("Error: " + e);
      }    
    }
}

If we run Listing 2, we’ll get similar output, but it is quite a bit faster because the requests are issued simultaneously and proceed concurrently. Consider the differences between the sc() method (using multithreading) versus the sync() method, which uses synchronous code. The structured concurrency approach is not that much harder to think about but delivers much faster results. 

Working with tasks and subtasks

By default, when StructuredTaskScope is created, it uses virtual threads, so we are not actually provisioning operating system threads here; instead, we’re telling the JVM to orchestrate requests in the most efficient way. (The constructor for StructuredTaskScope also accepts a ThreadFactory.)

In Listing 2, we create the StructuredTaskScope object in a try-with-resource block, which is the way it is designed to be used. We can create as many jobs as we need using fork(). The fork() method accepts anything implementing Callable, which is to say, any method or function. Here we wrap our getPlanet() method in an anonymous function: () -> getPlanet(planetId)—a useful syntax for passing an argument into the target function.

When we call join(), we tell the scope to wait on all the jobs that were forked. Essentially, join() brings us back to synchronous mode. The forked jobs will proceed as configured by the TaskScope

Closing a task scope

Since we created the TaskScope in a try-with-resource block, when that block ends, the scope will be automatically closed. This invokes the shutdown() process for the scope, which can be customized to handle the disposal of running threads as needed. The shutdown() method can also be called manually, if you need to shut down the scope before it is closed.

StructuredTaskScope includes two classes that implement built-in shutdown policies: ShutDownOnSuccess and ShutDownOnFailure. These watch for a successful or erroring subtask, and then cancel the rest of the running threads. Using our current setup, we could use these classes as follows:

Listing 3. Built-in shutdown policies


void failFast() throws ExecutionException, InterruptedException {
   int[] planetIds = {1,2,3,-1,4};
   try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
     for (int planetId : planetIds) {
       scope.fork(() -> getPlanet(planetId));
     } 
     scope.join();
   }
 }
 void  succeedFast() throws ExecutionException, InterruptedException {
   int[] planetIds = {1,2};
   try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
     for (int planetId : planetIds) {
       scope.fork(() -> getPlanet(planetId));
     } 
     scope.join();
   } catch (Exception e){
     System.out.println("Error: " + e);
  }
} 
public static void main(String[] args) {
  var myApp = new App();
  System.out.println("\n\r-- BEGIN succeedFast");
  try {
    myApp. succeedFast();
  } catch (Exception e) {
    System.out.println(e.getMessage());
  }      
  System.out.println("\n\r-- BEGIN failFast");
        try {
            myApp.failFast();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }

    }

These policies will give output similar to below:


-- BEGIN succeedFast
BEGIN getPlanet()
BEGIN getPlanet()
Got a Planet: {"name":"Alderaan"}
org.apache.http.impl.execchain.RetryExec execute
INFO: I/O exception (java.net.SocketException) caught when processing request to {s}->https://swapi.dev:443: Closed by interrupt

-- BEGIN failFast
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
Got a Planet: {"name":"Hoth"}
Got a Planet: {"name":"Tatooine"}
Error fetching planet information for ID: -1
org.apache.http.impl.execchain.RetryExec execute
INFO: I/O exception (java.net.SocketException) caught when processing request to {s}->https://swapi.dev:443: Closed by interrupt

So what we have is a simple mechanism to initiate all the requests concurrently, and then cancel the rest when one either succeeds or fails via exception. From here, any customizations can be made. The structured concurrency documentation includes an example of collecting subtask results as they succeed or fail and then returning the results. This is fairly simply accomplished by overriding the join() method and watching the results of each task.

StructuredTaskScope.Subtask

One thing we have not seen in our example is watching the return values of subtasks. Each time StructuredTaskScope.fork() is called, a StructuredTaskScope.SubTask object is returned. We can make use of this to watch the state of the tasks. For example, in our sc() method, we could do the following:

Listing 4. Using StructuredTaskScope.Subtask to watch state


import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.ArrayList;

void sc() throws Exception {
      int[] planetIds = {1,2,3,4,5};
      ArrayList<Subtask> tasks = new ArrayList<Subtask>(planetIds.length); 
        try (var scope = new StructuredTaskScope<Object>()) {
          for (int planetId : planetIds) {
            tasks.add(scope.fork(() -> getPlanet(planetId)));
          } 
          scope.join();
        }catch (Exception e){
          System.out.println("Error: " + e);
        }
      for (Subtask t : tasks){
        System.out.println("Task: " + t.state());
      }
    }

In this example, we take each task and hold it in an ArrayList, then output the state on them after join(). Note that the available states for Subtask are defined on it as enum. This new method will output something similar to this:


-- BEGIN Structured Concurrency
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
Got a Planet: {"name":"Dagobah"}
Got a Planet: {"name":"Hoth"}
Got a Planet: {"name":"Tatooine"}
Got a Planet: {"name":"Yavin IV"}
Got a Planet: {"name":"Alderaan"}
Task: SUCCESS
Task: SUCCESS
Task: SUCCESS
Task: SUCCESS
Task: SUCCESS

Conclusion

Between virtual threads and structured concurrency, Java developers have a compelling new mechanism for breaking up almost any code into concurrent tasks without much overhead. Context and requirements are important, so don’t just use these new concurrency tools because they exist. At the same time, this combination does deliver some serious power. Any time you encounter a bottleneck where many tasks are occurring, you can easily hand them all off to the virtual thread engine, which will find the best way to orchestrate them. The new thread model with structured concurrency also makes easy to customize and fine-tune this behavior.

It will be very interesting to see how developers use these new concurrency capabilities in our applications, frameworks, and servers going forward.

Copyright © 2023 IDG Communications, Inc.

Leave a Reply

Your email address will not be published. Required fields are marked *

WP Twitter Auto Publish Powered By : XYZScripts.com