Assignment 3 - Add pub/sub messaging

October 16, 2024 · View on GitHub

Assignment goals

To complete this assignment, you must reach the following goals:

  1. The TrafficControlService sends SpeedingViolation messages using the Dapr pub/sub building block.

  2. The FineCollectionService receives SpeedingViolation messages using the Dapr pub/sub building block.

  3. RabbitMQ is used as pub/sub message broker that runs as part of the solution in a Docker container.

Don't worry if you have no experience with RabbitMQ. You will run it as a container in the background and don't need to interact with it directly in any way. The instructions will explain exactly how to do that.

This assignment targets number 2 in the end-state setup:

Step 1: Run RabbitMQ as message broker

In the example, you will use RabbitMQ as the message broker with the Dapr pub/sub building block. You're going to pull a standard Docker image containing RabbitMQ to your machine and start it as a container.

  1. Open the terminal window in VS Code.

  2. Start a RabbitMQ message broker by entering the following command:

    docker run -d -p 5672:5672 --name dtc-rabbitmq rabbitmq:3-management-alpine
    

This will pull the docker image rabbitmq:3-management-alpine from Docker Hub and start it. The name of the container will be dtc-rabbitmq. The server will be listening for connections on port 5672 (which is the default port for RabbitMQ).

If everything goes well, you should see some output like this:

❯ docker run -d -p 5672:5672 --name dtc-rabbitmq rabbitmq:3-management-alpine
Unable to find image 'rabbitmq:3-management-alpine' locally
3-management-alpine: Pulling from library/rabbitmq
a0d0a0d46f8b: Pull complete
31312314eeb3: Pull complete
926937e20d4d: Pull complete
f5676ddf0782: Pull complete
ff9526ce7ab4: Pull complete
6163319fe438: Pull complete
592def0a276e: Pull complete
59922d736a7b: Pull complete
76025ca84b3c: Pull complete
4965e42a5d3c: Pull complete
Digest: sha256:8885c08827289c61133d30be68658b67c6244e517931bb7f1b31752a9fcaec73
Status: Downloaded newer image for rabbitmq:3-management-alpine
85a98f00f1a87b856008fec85de98c8412eb099e3a7675b87945c777b131d876

If you see any errors, make sure you have access to the Internet and are able to download images from Docker Hub. See Docker Hub for more info.

The container will keep running in the background. If you want to stop it, enter the following command:

docker stop dtc-rabbitmq

You can then start the container later by entering the following command:

docker start dtc-rabbitmq

If you are done using the container, you can also remove it by entering the following command:

docker rm dtc-rabbitmq -f

Once you have removed it, you need to start it again with the docker run command shown at the beginning of this step.

For your convenience, the Infrastructure folder contains Bash scripts for starting the infrastructural components you'll use throughout the workshop. You can use the Infrastructure/rabbitmq/start-rabbitmq.sh script to start the RabbitMQ container.

If you don't mind starting all the infrastructural containers at once (also for assignments to come), you can also use the Infrastructure/start-all.sh script.

Step 2: Configure the pub/sub component

Until now, you have been using the Dapr components that are installed by default when you install Dapr on your machine. These are a state management component and a pub/sub component. They both use the Redis server that is also installed by default. The components are installed in the folder %USERPROFILE%\.dapr\components on Windows and $HOME/.dapr/components on Linux or Mac.

Because you need to change the message broker from Redis to RabbitMQ, you will create a separate folder with the component configuration files and use this folder when starting the services using the Dapr CLI. You can specify which folder to use on the command-line with the --resources-path flag.

  1. Create a new folder dapr/components.

  2. Copy all files from the folder %USERPROFILE%\.dapr\components\ on Windows and $HOME/.dapr/components on Linux or Mac to the dapr/components folder.

  3. Open the file dapr/components/pubsub.yaml in VS Code.

  4. Inspect this file. As you can see, it specifies the type of the message broker to use (pubsub.redis) and specifies information on how to connect to the Redis server in the metadata section.

  5. Change the content of this file to:

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: pubsub
    spec:
      type: pubsub.rabbitmq
      version: v1
      metadata:
      - name: host
        value: "amqp://localhost:5672"
      - name: durable
        value: "false"
      - name: deletedWhenUnused
        value: "false"
      - name: autoAck
        value: "false"
      - name: reconnectWait
        value: "0"
      - name: concurrency
        value: parallel
    scopes:
      - trafficcontrolservice
      - finecollectionservice
    

As you can see, you specify a different type of pub/sub component (pubsub.rabbitmq) and you specify in the metadata how to connect to the RabbitMQ container you started in step 1 (running on localhost on port 5672). The other metadata can be ignored for now. In the scopes section, you specify that only the TrafficControlService and FineCollectionService should use the pub/sub building block.

Step 3: Send messages from the TrafficControlService

With the Dapr pub/sub building block, you use a topic to send and receive messages. The producer sends messages to the topic and one or more consumers subscribe to this topic to receive those messages. First you are going to prepare the TrafficControlService so it can send messages using Dapr pub/sub.

  1. Open the file TrafficControlService/src/main/java/dapr/traffic/fines/DefaultFineCollectionClient.java in VS Code.

  2. Inside the submitForFine method, you find the code that sends a SpeedingViolation message to the collectfine endpoint of the FineCollectionService over HTTP:

    restTemplate.postForObject(fineCollectionEndpoint, speedingViolation, Void.class);
    

    The restTemplate is a utility provided by Spring to invoke the FineCollectionService. Its base address for consuming that REST web service is injected through the constructor of that class. That constructor is invoked from a Spring configuration class, which in turn reads the Spring configuration file using @Value.

  3. Open the file TrafficControlService/src/main/resources/application.yml in VS Code.

    Here we see the actual value being configured. Inspect the fine-collection.address setting. You can see that in the HTTP call, the URL of the VehicleRegistrationService (running on port 6001) is used.

  4. The URL for publishing a message using the Dapr pub/sub API is: http://localhost:<daprPort>/v1.0/publish/<pubsub-name>/<topic>. You'll use this API to send a message to the speedingviolations topic using the pub/sub component named pubsub. The Dapr sidecar for the TrafficControlService runs on HTTP port 3600. Replace the URL in the HTTP call with a call to the Dapr pub/sub API:

    fine-collection.address: http://localhost:3600/v1.0/publish/pubsub/speedingviolations
    

That's it. You now use Dapr pub/sub to publish a message to a message broker.

Step 4: Receive messages in the FineCollectionService (declaratively)

Now you are going to prepare the FineCollectionService so it can receive messages using Dapr pub/sub. Consuming messages can be done in two ways: declaratively (through configuration) or programmatic (from the code). First you'll use the declarative way. Later you'll also use the programmatic way and finally also using the Dapr SDK for Java.

  1. Add a new file in the dapr/components folder named subscription.yaml.

  2. Open this file in VS Code.

  3. You're going to define a subscription on a topic and link that to a web API operation on the FineCollectionService. Paste this snippet into the file:

    apiVersion: dapr.io/v1alpha1
    kind: Subscription
    metadata:
      name: speedingviolations-subscription
    spec:
      topic: speedingviolations
      route: /collectfine
      pubsubname: pubsub
    scopes:
    - finecollectionservice
    

    The route field tells Dapr to forward all messages send to the speedingviolations topic to the /collectfine endpoint in the app. The scopes field restricts this subscription to only the service with app-id finecollectionservice only.

Now your FineCollectionService is ready to receive messages through Dapr pub/sub. But there is a catch! Dapr uses the CloudEvents message format for pub/sub. So when we send a message through pub/sub, the receiving application needs to understand this format and handle the message as a CloudEvent. Therefore we need to change the code slightly. For now, you will read the incoming JSON by hand (instead of the Jackson model binding doing that for you). You will change this later when you will use the Dapr SDK for Java.

  1. Open the file FineCollectionService/src/main/java/dapr/fines/violation/ViolationController.java in VS Code.

  2. Remove the SpeedingViolation request parameter from the registerViolation method and replace this with a event parameter of type JsonNode, and leave the @RequestBody annotation in place:

    public ResponseEntity<Void> registerViolation(@RequestBody final JsonNode event) {
    

    Add an import for the com.fasterxml.jackson.databind.JsonNode class.

    This enables you to get to the raw JSON in the request.

  3. Add the following code in the body of the method to extract the SpeedingViolation data from the event, just before the call to violationProcessor.processSpeedingViolation:

    var data = event.get("data");
    var violation = new SpeedingViolation(
            data.get("licenseNumber").asText(),
            data.get("roadId").asText(),
            data.get("excessSpeed").asInt(),
            LocalDateTime.parse(data.get("timestamp").asText())
    );
    

    Also, add an import for the java.time.LocalDateTime class.

  4. Open the terminal window in VS Code and make sure the current folder is FineCollectionService.

  5. Check all your code-changes are correct by building the code. Execute the following command in the terminal window:

    mvn package
    

    If you see any warnings or errors, review the previous steps to make sure the code is correct.

Step 5: Test the application

You're going to start all the services now. You specify the custom components folder you've created on the command-line using the --resources-path flag so Dapr will use these config files:

  1. Make sure no services from previous tests are running (close the command-shell windows).

  2. Open the terminal window in VS Code and make sure the current folder is VehicleRegistrationService.

  3. Enter the following command to run the VehicleRegistrationService with a Dapr sidecar:

    dapr run --app-id vehicleregistrationservice --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --resources-path ../dapr/components mvn spring-boot:run
    

    Notice that you specify the custom components folder you've created on the command-line using the --resources-path flag so Dapr will use RabbitMQ for pub/sub.

  4. Open a new terminal window in VS Code and change the current folder to FineCollectionService.

  5. Enter the following command to run the FineCollectionService with a Dapr sidecar:

    dapr run --app-id finecollectionservice --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --resources-path ../dapr/components mvn spring-boot:run
    
  6. Open a new terminal window in VS Code and change the current folder to TrafficControlService.

  7. Enter the following command to run the TrafficControlService with a Dapr sidecar:

    dapr run --app-id trafficcontrolservice --app-port 6000 --dapr-http-port 3600 --dapr-grpc-port 60000 --resources-path ../dapr/components mvn spring-boot:run
    
  8. Open a new terminal window in VS Code and change the current folder to Simulation.

  9. Start the simulation:

    mvn spring-boot:run
    

You should see the same logs as before. Obviously, the behavior of the application is exactly the same as before. But if you look closely at the Dapr logs of the FineCollectionService, you should see something like this in there:

INFO[0004] app is subscribed to the following topics: [speedingviolations] through pubsub=pubsub  app_id=finecollectionservice instance=maartenm03 scope=dapr.runtime type=log ver=1.2.2

So you can see that Dapr has registered a subscription for the FineCollectionService to the speedingviolations topic.

Step 6: Receive messages in the FineCollectionService (programmatic)

The other way of subscribing to pub/sub events is the programmatic way. Dapr will call your service on the well known endpoint /dapr/subscribe to retrieve the subscriptions for that service. You will implement this endpoint and return the subscription for the speedingviolations topic.

  1. Stop the FineCollectionService by navigating to its terminal window and pressing Ctrl-C. You can keep the other services running for now.

  2. Open the file FineCollectionService/src/main/java/dapr/fines/violation/ViolationController.java in VS Code.

  3. Add a new method subscribe to the controller that will listen to the route /dapr/dubscribe:

    @GetMapping("/dapr/subscribe")
    public ResponseEntity<List<Map<String, Object>>> subscribe() {
        var subscription = Map.<String, Object>of(
            "pubsubname", "pubsub",
            "topic", "speedingviolations",
            "route", "/collectfine"
        );
        return ResponseEntity.ok(Collections.singletonList(subscription));
    }
    

    Add the following imports to the class:

    import org.springframework.web.bind.annotation.GetMapping;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    
  4. Remove the file dapr/components/subscription.yaml. This file is not needed anymore because you implemented the /dapr/subscribe method.

  5. Go back to the terminal window in VS Code and make sure the current folder is FineCollectionService.

  6. Check all your code-changes are correct by building the code. Execute the following command in the terminal window:

    mvn package
    

    If you see any warnings or errors, review the previous steps to make sure the code is correct.

  7. Start the updated FineCollectionService:

    dapr run --app-id finecollectionservice --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --resources-path ../dapr/components mvn spring-boot:run
    

    If you kept the other services running before you started with this step, you may observe that there a few speeding limitations waiting to be processed. The application will immediately start processing this backlog.

  8. After you've looked at the log output and confirmed that everything works, you can stop all the services.

Step 7: Use Dapr publish / subscribe with the Dapr SDK for Java

In this step, you will change the code slightly so it uses the Dapr SDK for Java. First you'll change the TrafficControlService that sends messages.

  1. Add a dependency to the Dapr SDK for Java to the pom.xml in the TrafficControlService directory:

    <dependency>
       <groupId>io.dapr</groupId>
       <artifactId>dapr-sdk</artifactId>
    </dependency>
    

    Again, the version of the dependency is managed using Mavens "dependency management" - you can inspect the pom.xml file inside the java folder to see the exact version.

  2. Create a new file, TrafficControlService/src/main/java/dapr/traffic/fines/DaprFineCollectionClient.java and open it in VS Code. Make sure to include a package declaration: package dapr.traffic.fines;.

  3. Declare a class DaprFineCollectionClient that implements the FineCollectionClient interface. To fulfil the contract of the FineCollectionClient interface, add the following method:

@Override
public void submitForFine(SpeedingViolation speedingViolation) {
}

Also, add a few imports for the class:

import dapr.traffic.violation.SpeedingViolation;
import io.dapr.client.DaprClient;
import spring.SleuthDaprTracingInjector;
  1. Finally, add an instance variable of type DaprClient, and add a constructor to inject it:

    private final DaprClient daprClient;
    
    public DaprFineCollectionClient(final DaprClient daprClient) {
       this.daprClient = daprClient;
    }
    
  2. Open the file TrafficControlService/src/main/java/dapr/traffic/TrafficControlConfiguration.java in VS Code. The default JSON serialization is not suitable for todays goal, so you need to customize the Jackson ObjectMapper that it uses. You do so by adding a static inner class to configure the JSON serialization:

static class JsonObjectSerializer extends DefaultObjectSerializer {
    public JsonObjectSerializer() {
        OBJECT_MAPPER.registerModule(new JavaTimeModule());
        OBJECT_MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    }
}

This is a bit of a lazy approach, but it is enough for this workshop. In fact, the SDK documentation recommends to write your own serializer for production scenario's.

  1. In the same class, add a new method to declare a Spring Bean of type DaprClient:
@Bean
public DaprClient daprClient() {
    return new DaprClientBuilder()
            .withObjectSerializer(new JsonObjectSerializer())
            .build();
}

In the same class, the fineCollectionClient method declares a Spring Bean that provides an implementation of the FineCollectionClient interface. To do so, it needs a RestTemplate bean. Replace this method with the following:

@Bean
public FineCollectionClient fineCollectionClient(final DaprClient daprClient) {
    return new DaprFineCollectionClient(daprClient);
}

Finally, update the import statements in the class:

import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import dapr.traffic.fines.DaprFineCollectionClient;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.serializer.DefaultObjectSerializer;
  1. Finally, update the submitForFine() method in this class to use the DaprClient:
daprClient.publishEvent("pubsub",  "speedingviolations", speedingViolation)
       .contextWrite(new SleuthDaprTracingInjector())
       .block();
  1. Open the terminal window in VS Code and make sure the current folder is TrafficControlService.

  2. Check all your code-changes are correct by building the code. Execute the following command in the terminal window:

    mvn package
    

    If you see any warnings or errors, review the previous steps to make sure the code is correct.

Now you will change the FineCollectionService that receives messages. The Dapr SDK for Java provides an additional Spring Boot integration library, which automatically wires correctly annotated methods to a pub/sub topic. For every message sent to that topic, the corresponding Java method is invoked and the payload of the message is delivered as request body. You don't have to poll for messages on the message broker.

  1. Add a dependency to the Dapr SDK for Java to the pom.xml in the FineCollectionService directory:

    <dependency>
       <groupId>io.dapr</groupId>
       <artifactId>dapr-sdk-springboot</artifactId>
    </dependency>
    
  2. Open the file FineCollectionService/src/main/java/dapr/fines/violation/ViolationController.java.

  3. Remove the subscribe method.

  4. Change the registerViolation method by making the type of the event parameter to CloudEvent<SpeedingViolation>; keep the @RequestBody annotation in place. Add an import for io.dapr.client.domain.CloudEvent. The method signature should now look like this:

    public ResponseEntity<Void> registerViolation(@RequestBody final CloudEvent<SpeedingViolation> event) {
    
  5. Change the implementation of the method to extract the actual violation info from the Cloud Event:

    var violation = event.getData();
    violationProcessor.processSpeedingViolation(violation);
    
  6. Add an import for the io.dapr.Topic class. Add a @Topic annotation above the registerViolation method to link this method to a topic called speedingviolations:

    @Topic(name = "speedingviolations", pubsubName = "pubsub")
    

    The "pubsubName" argument passed to this attribute refers to the name of the Dapr pub/sub component to use.

  7. Open the terminal window in VS Code and make sure the current folder is FineCollectionService.

  8. Check all your code-changes are correct by building the code. Execute the following command in the terminal window:

    mvn package
    

    If you see any warnings or errors, review the previous steps to make sure the code is correct.

Now you can test the application again. Execute the activities in step 5 again.

Next assignment

Make sure you stop all running processes and close all the terminal windows in VS Code before proceeding to the next assignment.

Go to assignment 4.