Messaging with quarkus and vert.x

Header image

Vert.x was designed from the outset to have polyglot support. You can write verticles in Java, JavaScript, Groovy, Ruby, Ceylon, Scala and Kotlin — or even mix and match in the same application — and still access the same set of features. However, the intention to keep things consistent comes at the expense of certain language-specific niceties. Java annotations are one such example. In Jakarta EE and Spring, annotations are used extensively to provide a declarative approach to programming that developers have grown accustomed to, but in Vert.x the programming model is rather different — more low-level and programmatic.

A good example of where annotations really come into their own is when setting up REST API entry points. In Vert.x it’s necessary to create an HTTP server and program the routing. It can be done within a verticle like this:

class Rest extends AbstractVerticle {
@Override
public void start(Future<Void> fut) {

    // 1: Create a router object.
    Router router = Router.router(vertx);

    // 2: Create a route and the associated response
    router.route("/").handler(routingContext -> {
      HttpServerResponse response = routingContext.response();
      response
          .putHeader("content-type", "text/html")
          .end("Hello World!");
    });

    // 3: Create and configure the HTTP server.
    vertx
        .createHttpServer()
        .requestHandler(router::accept)
        .listen(8080,
            result -> {
              if (result.succeeded()) {
                fut.complete();
              } else {
                fut.fail(result.cause());
              }
           }
        );

}

}

This is more work than Jakarta EE or Spring developers are used to and it’s only a basic example. In reality, a single application is likely to have dozens of endpoints that must deal with request parameters, POST bodies, and so on. In Vert.x there is no automatic mapping of HTTP requests to objects so there is some work for the developer there. Also, while there is support for request validation and API documentation, it's arguably less elegant and takes more work than annotation-based approaches.

What can we do about it?

Annotation support for REST with Quarkus

Unlike Vert.x, Quarkus takes a more opinionated approach to things. Java is the supported language and other technologies popular in the Java world come pre-selected and well-integrated, including: Hibernate, RESTEasy and Apache Camel. The programming model brings with it the familiarity of Jakarta EE and Spring.

Quarkus comes with an HTTP server and RESTEasy out of the box, so creating an endpoint is simple:

@Path("/")
class Rest {

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "Hello World!";
    }

}

That's already a lot more readable. There's no need to explicitly create an HTTP server as one starts by default on port 8080.

A simple example is one thing, but we’ll want to see something more realistic. Let’s consider a rudimentary, web-based messaging application. IO bound applications like these lend themselves well to a asynchronous/reactive style of programming so we’ll write it in that style. The structure of the REST endpoints might look like this:

@Path("/")
public class ChatResource {

    @POST
    @Path("join")
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    public CompletionStage<User> join(@Valid User user) {
        ...
    }

    @PUT
    @Path("leave")
    @Consumes(MediaType.TEXT_HTML)
    public CompletionStage<String> leave(@Email String username) {
        ...
    }

    @POST
    @Produces(MediaType.TEXT_HTML)
    @Consumes(MediaType.APPLICATION_JSON)
    @Path("chat")
    public CompletionStage<String> chat(@Valid ChatMessage message) {
        ...
    }

}

A few highlights:

@Path to map incoming requests to methods. Declarative approach to media type capabilities with @Consumes and @Produces. Request validation with @Valid (delegates validation to the POJO) and others - @Email in this case. Returning CompletionStage<T> indicates to the underlying JAX-RS (2.1) implementation that asynchronous processing is enabled.

Testing REST endpoints

Testing endpoints in Quarkus is made very simple thanks to the fact that Quarkus starts up so rapidly, meaning it can be brought up and shut down between functional/unit tests. The @QuarkusTest annotation enables this.

@QuarkusTest
public class GroupChatResourceTest {

@Test
public void testCreateUserEndpoint() {
User user = new User("christopher@quad.team", "Chris");

    User result =
        given()
          .contentType("application/json")
          .body(user)
        .when().post("/join")
          .as(User.class);

    assertEquals("christopher@quad.team", result.getUsername());
    assertEquals("Chris", result.getAlias());

}

}

The join method confirms that the user has joined by returning the User object and we test this is as we expect it to be.

Dispatching requests to the Vert.x event bus

Typically we want to separate our entry points (adapters) - in this case HTTP entry points - from the business logic part of the application. Injecting dependencies is one option with Quarkus but we can also decouple even further by bringing in the Vert.x event bus into the picture in the following way:

@Path("/")
public class ChatResource {

    @Inject
    EventBus eventBus;

    @POST
    @Path("join")
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    public CompletionStage<User> join(@Valid User user) {
        return eventBus.<JsonObject>send("join", JsonObject.mapFrom(user))
                .thenApply(Message::body)
                .thenApply(jsonObject -> jsonObject.mapTo(User.class));
    }

    @PUT
    @Path("leave")
    @Consumes(MediaType.TEXT_HTML)
    public CompletionStage<String> leave(@Email String username) {
        return eventBus.<String>send("leave", username)
                .thenApply(Message::body)
                .exceptionally(Throwable::getMessage);
    }

    @POST
    @Produces(MediaType.TEXT_HTML)
    @Consumes(MediaType.APPLICATION_JSON)
    @Path("chat")
    public CompletionStage<String> chat(@Valid ChatMessage message) {
        return eventBus.<String>send("chat", JsonObject.mapFrom(message))
                .thenApply(Message::body)
                .exceptionally(Throwable::getMessage);
    }

}

The entire request-response processing is asynchronous but the callback-free programming model gives the opposite impression. Here we use a version of the event bus compatible with CompletionStage, i.e. io.vertx.axle.core.eventbus.EventBus.

Consuming Vert.x event bus messages

On the "consuming" side we can choose between the Vert.x style of programming or take advantage of what Quarkus offers by way of integration with Vert.x.

Vert.x style

Here's a verticle that handles requests to the leave event bus address:

public class GroupChatService extends AbstractVerticle {

    private Map<String, User> members;

    @Override
    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        this.members = new HashMap<>();
    }

    @Override
    public void start(Future<Void> startFuture) throws Exception {

        ...

        // Leave group
        vertx.eventBus().<String>consumer("leave", message -> {
            members.remove(message.body());
            message.reply("OK");
        });

        ...

        super.start(startFuture);
    }

}

Verticles and other Vert.x components are not instantiatiated by Quarkus but there's an easy way to do just that when the application starts. Quarkus supports CDI's @Observes annotation and emits StartupEvent when the application starts. This is a good place to do things like deploying verticles or any other activities that need to be done at startup time:

@ApplicationScoped
public class ApplicationLifecyle {

    @Inject
    Vertx vertx;

    void onStart(@Observes StartupEvent event) {
        vertx.deployVerticle(new GroupChatService());
    }

}

Quarkus style

Quarkus provides a number of alternatives to the Vert.x style of consuming messages. For example, consuming on the "leave" address could be implemented like so:

@ApplicationScoped
public class GroupChatServiceQuarkus {

    private Map<String, User> members;

    void GroupChatServiceQuarkus() {
        this.members = new HashMap<>();
    }

    ...

    @ConsumeEvent("leave")
    public String leave(String username) {
        members.remove(username);
        return "OK";
    }

    ...

}

Vert.x still handles the messaging under the hood but there is no need to be aware of that. Should lower-level access be required, it is still possible to access the io.vertx.core.eventbus.Message:

@ConsumeEvent("leave")
public void leave(Message<String> message) {
String username = message.body();
members.remove(username);
message.reply("OK");
}

A void return type is mandatory when using Message<T> as the method parameter.

Vert.x supports by default various basic types like String, Integer and also JsonObject for the value of T.

Streaming server-sent events

Server-sent events provide an easy way to stream messages to browsers that support them (most do). Remember that we already have an endpoint to handle incoming messages. Here's a reminder:

@POST
@Produces(MediaType.TEXT_HTML)
@Consumes(MediaType.APPLICATION_JSON)
@Path("chat")
public CompletionStage<String> chat(@Valid ChatMessage message) {
return eventBus.<String>send("chat", JsonObject.mapFrom(message))
.thenApply(Message::body)
.exceptionally(Throwable::getMessage);
}

The verticle handles the business logic:

vertx.eventBus().<JsonObject>consumer("chat", message -> {
ChatMessage chatMessage = message.body().mapTo(ChatMessage.class);
User member = members.get(chatMessage.getUsername());
if(Objects.isNull(member)) {
message.fail(1, "Not a member");
} else {
vertx.eventBus().publish("stream", member.getAlias() + ":" + chatMessage.getMessage());
message.reply("OK");
}
});

The business logic determines whether the incoming message is from a member of the group. If it is, it's broadcast via the "stream" event bus address.

The final part is to to stream the messages as server sent events. To the ChatResource endpoint we can add the following:

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("stream")
public Publisher<String> stream() {
return eventBus.<String>consumer("stream").toPublisherBuilder()
.map(Message::body)
.buildRs();
}

Here then, messages ariving on the "stream" event bus address are consumed and published to connected browsers thanks to org.reactivestreams.Publisher<T>.

Connecting the UI

All that remains now is to display the chat messages in the browser. For that we need just a small amount of HTML and Javascript:

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8" />
    <script type="application/javascript" src="streaming.js"></script>
  </head>
  <body>
    <div id="container"></div>
  </body>
</html>

streaming.js:

var eventSource = new EventSource('/stream');
eventSource.onmessage = function (event) {
  var container = document.getElementById('container');
  var paragraph = document.createElement('p');
  paragraph.innerHTML = event.data;
  container.appendChild(paragraph);
};

API documentation with Swagger

A fully documented API with Swagger is achieved by adding a Quarkus extension to the project:

./mvnw quarkus:add-extension -Dextensions="smallrye-openapi"

Open up the page on /swagger-ui.

Conclusion

Quarkus works very well as a "gateway" for REST requests with a declarative programming model that makes for very readable code. And with seamless Vert.x integration we needn't miss out on the power of the Vert.x event bus either.

Join a team

Heb je een teamlid, collega of vriend met wie je het liefst blijft ontwikkelen, meld je dan samen aan!

orange-arrow-leftorange-arrow-centerorange-arrow-rightorange-arrow-leftorange-arrow-right

Contact Mark

Geen recruiter maar een developer, net zoals jij.