Reakt Reactive Java - Invokable Promise - Fluent Reactive Java

posted Apr 22, 2016, 10:19 PM by Rick Hightower

Reakt, Java reactive programming lib, supports invokable promises so service methods could return a Promise.

Invokable Promise

        employeeService.lookupEmployee("123")
               .then((employee)-> {...}).catchError(...).invoke();

NOTE: QBit Microservices Lib version 1.4 (coming soon) will support generation of client stubs (local and remote) that return Reakt Promises. Having the client proxy method return the promise instead of taking a callback. With QBit you will be able to use Callback's on the service side and Promises on the client proxy side. Callbacks are natural for the service, and Promises are natural for the client.

But you do not have to use QBit to use Reakt's invokable promises.

Let's walk through an example of using a Reakt invokable promises.

Let's say we are developing a service discovery service with this interface.

ServiceDiscovery interface (Example service that does lookup)

    interface ServiceDiscovery {
        Promise<URI> lookupService(URI uri);

        default void shutdown() {}
        default void start() {}
    }

Note that ServiceDiscovery.lookupService is just an example likeEmployeeService.lookupEmployee was just an example.

Notice the lookupService returns a Promise (Promise<URI> lookupService(URI uri)). To call this, we can use an invokable promise. The service side will return an invokable promise, and the client of the service will use that Promise to register its thenthenExpectthenMap, and/orcatchError handlers.

Here is the service side of the invokable promise example.

Service Side of invokable promise

    class ServiceDiscoveryImpl implements ServiceDiscovery {

        @Override
        public Promise<URI> lookupService(URI uri) {
            return invokablePromise(promise -> {
                if (uri == null) {
                    promise.reject("URI was null");
                } else {
                    ...
                    // DO SOME ASYNC OPERATION WHEN IT RETURNS CALL RESOLVE.
                    promise.resolve(successResult);
                }
            });
        }
    }

Notice to create the invokablePromise we use Promises.invokablePromise which takes aPromise Consumer (static <T> Promise<T> invokablePromise(Consumer<Promise<T>> promiseConsumer)).

The lookupService example returns a Promise and it does not execute its body until theinvoke (Promise.invoke) on the client side of the equation is called.

Let's look at the client side.

Client side of using lookupService.

        serviceDiscovery.lookupService(empURI)
                .then(this::handleSuccess)
                .catchError(this::handleError)
                .invoke();

The syntax this::handleSuccess is a method reference which can be used as Java 8 lambda expressions. We use method references to make the example shorter.

Here is a complete example with two versions of our example service.

Complete example from unit tests

package io.advantageous.reakt.promise;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.URI;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static io.advantageous.reakt.promise.Promises.*;
import static org.junit.Assert.*;

public class InvokablePromise {


    final URI successResult = URI.create("http://localhost:8080/employeeService/");
    ServiceDiscovery serviceDiscovery;
    ServiceDiscovery asyncServiceDiscovery;
    URI empURI;
    CountDownLatch latch;
    AtomicReference<URI> returnValue;
    AtomicReference<Throwable> errorRef;

    @Before
    public void before() {
        latch = new CountDownLatch(1);
        returnValue = new AtomicReference<>();
        errorRef = new AtomicReference<>();
        serviceDiscovery = new ServiceDiscoveryImpl();
        asyncServiceDiscovery = new ServiceDiscoveryAsyncImpl();
        asyncServiceDiscovery.start();
        empURI = URI.create("marathon://default/employeeService?env=staging");
    }

    @After
    public void after() {
        asyncServiceDiscovery.shutdown();
    }

    public void await() {
        try {
            latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    @Test
    public void testServiceWithReturnPromiseSuccess() {
        serviceDiscovery.lookupService(empURI).then(this::handleSuccess)
                .catchError(this::handleError).invoke();
        await();
        assertNotNull("We have a return", returnValue.get());
        assertNull("There were no errors", errorRef.get());
        assertEquals("The result is the expected result", successResult, returnValue.get());
    }


    @Test
    public void testServiceWithReturnPromiseFail() {


        serviceDiscovery.lookupService(null).then(this::handleSuccess)
                .catchError(this::handleError).invoke();

        await();
        assertNull("We do not have a return", returnValue.get());
        assertNotNull("There were  errors", errorRef.get());
    }


    @Test
    public void testAsyncServiceWithReturnPromiseSuccess() {
        asyncServiceDiscovery.lookupService(empURI).then(this::handleSuccess)
                .catchError(this::handleError).invoke();
        await();
        assertNotNull("We have a return from async", returnValue.get());
        assertNull("There were no errors form async", errorRef.get());
        assertEquals("The result is the expected result form async", successResult, returnValue.get());
    }


    @Test
    public void testAsyncServiceWithReturnPromiseFail() {


        asyncServiceDiscovery.lookupService(null).then(this::handleSuccess)
                .catchError(this::handleError).invoke();

        await();
        assertNull("We do not have a return from async", returnValue.get());
        assertNotNull("There were  errors from async", errorRef.get());
    }

    @Test (expected = IllegalStateException.class)
    public void testServiceWithReturnPromiseSuccessInvokeTwice() {
        final Promise<URI> promise = serviceDiscovery.lookupService(empURI).then(this::handleSuccess)
                .catchError(this::handleError);
        promise.invoke();
        promise.invoke();
    }

    @Test
    public void testIsInvokable() {
        final Promise<URI> promise = serviceDiscovery.lookupService(empURI).then(this::handleSuccess)
                .catchError(this::handleError);

        assertTrue("Is this an invokable promise", promise.isInvokable());
    }


    private void handleError(Throwable error) {
        errorRef.set(error);
        latch.countDown();
    }

    private void handleSuccess(URI uri) {
        returnValue.set(uri);
        latch.countDown();
    }


    interface ServiceDiscovery {
        Promise<URI> lookupService(URI uri);

        default void shutdown() {
        }

        default void start() {
        }
    }

    class ServiceDiscoveryImpl implements ServiceDiscovery {

        @Override
        public Promise<URI> lookupService(URI uri) {
            return invokablePromise(promise -> {

                if (uri == null) {
                    promise.reject("URI was null");
                } else {
                    promise.resolve(successResult);
                }
            });
        }
    }


    class ServiceDiscoveryAsyncImpl implements ServiceDiscovery {

        final ExecutorService executorService;

        final Queue<Runnable> runnables;

        final AtomicBoolean stop;

        public ServiceDiscoveryAsyncImpl() {
            executorService = Executors.newSingleThreadExecutor();
            runnables = new LinkedTransferQueue<>();
            stop = new AtomicBoolean();
        }

        @Override
        public Promise<URI> lookupService(URI uri) {
            return invokablePromise(promise -> {
                runnables.offer(() -> {
                    if (uri == null) {
                        promise.reject("URI was null");
                    } else {
                        promise.resolve(URI.create("http://localhost:8080/employeeService/"));
                    }
                });
            });
        }

        @Override
        public void shutdown() {
            stop.set(true);
            executorService.shutdown();
        }

        @Override
        public void start() {
            executorService.submit((Runnable) () -> {

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                while (true) {
                    if (stop.get())break;
                    Runnable runnable = runnables.poll();
                    while (runnable != null) {
                        runnable.run();
                        runnable = runnables.poll();
                    }
                }

            });
        }
    }
}

Reakt website

Related Projects

Further reading

What is Microservices Architecture?

QBit Java Micorservices lib tutorials

The Java microservice lib. QBit is a reactive programming lib for building microservices - JSON, HTTP, WebSocket, and REST. QBit uses reactive programming to build elastic REST, and WebSockets based cloud friendly, web services. SOA evolved for mobile and cloud. ServiceDiscovery, Health, reactive StatService, events, Java idiomatic reactive programming for Microservices.

Find more tutorial on QBit.

Reactive ProgrammingJava MicroservicesRick Hightower

High-speed microservices consulting firm and authors of QBit with lots of experience with Vertx - Mammatus Technology

Highly recommended consulting and training firm who specializes in microservices architecture and mobile development that are already very familiar with QBit and Vertx as well as iOS and Android - About Objects

Java Microservices Architecture

Microservice Service Discovery with Consul

Microservices Service Discovery Tutorial with Consul

Reactive Microservices

High Speed Microservices

Java Microservices Consulting

Microservices Training

Reactive Microservices Tutorial, using the Reactor

QBit is mentioned in the Restlet blog


Comments