Assignment 3 - Add pub/sub messaging
October 16, 2024
Assignment goals
To complete this assignment, you must reach the following goals:
- The TrafficControlService sends SpeedingViolation messages using the Dapr pub/sub building block.
- The FineCollectionService receives SpeedingViolation messages using the Dapr pub/sub building block.
- RabbitMQ is used as the pub/sub message broker and runs as part of the solution in a Docker container.
Don't worry if you have no experience with RabbitMQ. You will run it as a container in the background and don't need to interact with it directly in any way. The instructions will explain exactly how to do that.
This assignment targets number 2 in the end-state setup:
Step 1: Run RabbitMQ as message broker
In the example, you will use RabbitMQ as the message broker with the Dapr pub/sub building block. You're going to pull a standard Docker image containing RabbitMQ to your machine and start it as a container.
- Open the terminal window in VS Code.
- Start a RabbitMQ message broker by entering the following command:
docker run -d -p 5672:5672 --name dtc-rabbitmq rabbitmq:3-management-alpine
This will pull the docker image rabbitmq:3-management-alpine from Docker Hub and start it. The name of the container will be dtc-rabbitmq. The server will be listening for connections on port 5672 (which is the default port for RabbitMQ).
If everything goes well, you should see some output like this:
❯ docker run -d -p 5672:5672 --name dtc-rabbitmq rabbitmq:3-management-alpine
Unable to find image 'rabbitmq:3-management-alpine' locally
3-management-alpine: Pulling from library/rabbitmq
a0d0a0d46f8b: Pull complete
31312314eeb3: Pull complete
926937e20d4d: Pull complete
f5676ddf0782: Pull complete
ff9526ce7ab4: Pull complete
6163319fe438: Pull complete
592def0a276e: Pull complete
59922d736a7b: Pull complete
76025ca84b3c: Pull complete
4965e42a5d3c: Pull complete
Digest: sha256:8885c08827289c61133d30be68658b67c6244e517931bb7f1b31752a9fcaec73
Status: Downloaded newer image for rabbitmq:3-management-alpine
85a98f00f1a87b856008fec85de98c8412eb099e3a7675b87945c777b131d876
If you see any errors, make sure you have access to the Internet and are able to download images from Docker Hub. See Docker Hub for more info.
The container will keep running in the background. If you want to stop it, enter the following command:
docker stop dtc-rabbitmq
You can then start the container later by entering the following command:
docker start dtc-rabbitmq
If you are done using the container, you can also remove it by entering the following command:
docker rm dtc-rabbitmq -f
Once you have removed it, you need to start it again with the docker run command shown at the beginning of this step.
For your convenience, the Infrastructure folder contains Bash scripts for starting the infrastructural components you'll use throughout the workshop. You can use the Infrastructure/rabbitmq/start-rabbitmq.sh script to start the RabbitMQ container.
If you don't mind starting all the infrastructural containers at once (also for assignments to come), you can also use the Infrastructure/start-all.sh script.
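If you want to confirm that something is listening on port 5672 before moving on, a quick TCP check can help. The sketch below is not part of the workshop code; the class name BrokerPortCheck and the one-second timeout are assumptions chosen for illustration. It only checks that the port accepts connections, not that the listener is actually RabbitMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host: treat all as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // 5672 is the port published by the docker run command in this step.
        boolean open = isPortOpen("localhost", 5672, 1000);
        System.out.println(open
                ? "Something is listening on localhost:5672"
                : "Nothing is listening on localhost:5672");
    }
}
```

If the check reports that nothing is listening, inspecting the container state with docker ps is a reasonable next step.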
Step 2: Configure the pub/sub component
Until now, you have been using the Dapr components that are installed by default when you install Dapr on your machine. These are a state management component and a pub/sub component. They both use the Redis server that is also installed by default. The components are installed in the folder %USERPROFILE%\.dapr\components on Windows and $HOME/.dapr/components on Linux or Mac.
Because you need to change the message broker from Redis to RabbitMQ, you will create a separate folder with the component configuration files and use this folder when starting the services using the Dapr CLI. You can specify which folder to use on the command-line with the --resources-path flag.
- Create a new folder dapr/components.
- Copy all files from the folder %USERPROFILE%\.dapr\components\ on Windows and $HOME/.dapr/components on Linux or Mac to the dapr/components folder.
- Open the file dapr/components/pubsub.yaml in VS Code.
- Inspect this file. As you can see, it specifies the type of the message broker to use (pubsub.redis) and specifies information on how to connect to the Redis server in the metadata section.
- Change the content of this file to:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://localhost:5672"
  - name: durable
    value: "false"
  - name: deletedWhenUnused
    value: "false"
  - name: autoAck
    value: "false"
  - name: reconnectWait
    value: "0"
  - name: concurrency
    value: parallel
scopes:
  - trafficcontrolservice
  - finecollectionservice
As you can see, you specify a different type of pub/sub component (pubsub.rabbitmq) and you specify in the metadata how to connect to the RabbitMQ container you started in step 1 (running on localhost on port 5672). The other metadata can be ignored for now. In the scopes section, you specify that only the TrafficControlService and FineCollectionService should use the pub/sub building block.
Step 3: Send messages from the TrafficControlService
With the Dapr pub/sub building block, you use a topic to send and receive messages. The producer sends messages to the topic and one or more consumers subscribe to this topic to receive those messages. First you are going to prepare the TrafficControlService so it can send messages using Dapr pub/sub.
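To make the topic idea concrete, here is a toy in-memory sketch of one producer and several consumers sharing a topic. It is purely illustrative: the class ToyTopicDemo and the second "audit" subscriber are made up, and this is not how Dapr or RabbitMQ work internally. It only shows the fan-out behavior that a topic gives you.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ToyTopicDemo {

    // Topic name -> handlers subscribed to that topic.
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    // Delivers the message to every subscriber of the topic and returns the delivery count.
    int publish(String topic, String message) {
        List<Consumer<String>> handlers = subscribers.getOrDefault(topic, List.of());
        handlers.forEach(handler -> handler.accept(message));
        return handlers.size();
    }

    public static void main(String[] args) {
        ToyTopicDemo broker = new ToyTopicDemo();
        // Two independent consumers of the same topic (the "audit" one is hypothetical).
        broker.subscribe("speedingviolations", m -> System.out.println("fines: " + m));
        broker.subscribe("speedingviolations", m -> System.out.println("audit: " + m));
        // The producer only knows the topic name, not who is listening.
        broker.publish("speedingviolations", "XT-346-Y exceeded the limit by 12 km/h");
    }
}
```

The point to take away is that the producer never references a consumer directly, which is what lets you swap the direct HTTP call for pub/sub in this assignment.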
- Open the file TrafficControlService/src/main/java/dapr/traffic/fines/DefaultFineCollectionClient.java in VS Code.
- Inside the submitForFine method, you find the code that sends a SpeedingViolation message to the collectfine endpoint of the FineCollectionService over HTTP:
restTemplate.postForObject(fineCollectionEndpoint, speedingViolation, Void.class);
The restTemplate is a utility provided by Spring to invoke the FineCollectionService. Its base address for consuming that REST web service is injected through the constructor of that class. That constructor is invoked from a Spring configuration class, which in turn reads the Spring configuration file using @Value.
- Open the file TrafficControlService/src/main/resources/application.yml in VS Code.
Here we see the actual value being configured. Inspect the fine-collection.address setting. You can see that the HTTP call uses the URL of the FineCollectionService (running on port 6001).
- The URL for publishing a message using the Dapr pub/sub API is: http://localhost:<daprPort>/v1.0/publish/<pubsub-name>/<topic>. You'll use this API to send a message to the speedingviolations topic using the pub/sub component named pubsub. The Dapr sidecar for the TrafficControlService runs on HTTP port 3600. Replace the URL in the HTTP call with a call to the Dapr pub/sub API:
fine-collection.address: http://localhost:3600/v1.0/publish/pubsub/speedingviolations
That's it. You now use Dapr pub/sub to publish a message to a message broker.
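If you'd like to see the publish API in isolation, the following sketch builds the same URL and posts a JSON payload with the JDK's built-in HTTP client instead of Spring's RestTemplate. The class name PublishUrlSketch and the sample payload values are assumptions; the field names are the ones the ViolationController reads later in this assignment. It expects a Dapr sidecar on port 3600 and simply reports when none is reachable.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PublishUrlSketch {

    // Builds http://localhost:<daprPort>/v1.0/publish/<pubsub-name>/<topic>.
    static String publishUrl(int daprPort, String pubsubName, String topic) {
        return "http://localhost:" + daprPort + "/v1.0/publish/" + pubsubName + "/" + topic;
    }

    // Wraps a JSON payload in a POST request to the given publish URL.
    static HttpRequest publishRequest(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        String url = publishUrl(3600, "pubsub", "speedingviolations");
        // Sample values are made up for illustration.
        String json = "{\"licenseNumber\":\"XT-346-Y\",\"roadId\":\"A12\","
                + "\"excessSpeed\":12,\"timestamp\":\"2024-10-16T12:30:00\"}";
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(publishRequest(url, json), HttpResponse.BodyHandlers.ofString());
            System.out.println("Sidecar answered with HTTP " + response.statusCode());
        } catch (IOException e) {
            System.out.println("No sidecar reachable at " + url);
        }
    }
}
```

Because the application code only ever sees a URL, changing the value of fine-collection.address is enough to reroute the existing call through the sidecar.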
Step 4: Receive messages in the FineCollectionService (declaratively)
Now you are going to prepare the FineCollectionService so it can receive messages using Dapr pub/sub. Consuming messages can be done in two ways: declaratively (through configuration) or programmatically (from the code). First you'll use the declarative way. Later you'll use the programmatic way, and finally the Dapr SDK for Java.
- Add a new file in the dapr/components folder named subscription.yaml.
- Open this file in VS Code.
- You're going to define a subscription on a topic and link that to a web API operation on the FineCollectionService. Paste this snippet into the file:
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: speedingviolations-subscription
spec:
  topic: speedingviolations
  route: /collectfine
  pubsubname: pubsub
scopes:
- finecollectionservice
The route field tells Dapr to forward all messages sent to the speedingviolations topic to the /collectfine endpoint in the app. The scopes field restricts this subscription to the service with app-id finecollectionservice.
Now your FineCollectionService is ready to receive messages through Dapr pub/sub. But there is a catch! Dapr uses the CloudEvents message format for pub/sub. So when we send a message through pub/sub, the receiving application needs to understand this format and handle the message as a CloudEvent. Therefore we need to change the code slightly. For now, you will read the incoming JSON by hand (instead of the Jackson model binding doing that for you). You will change this later when you use the Dapr SDK for Java.
- Open the file FineCollectionService/src/main/java/dapr/fines/violation/ViolationController.java in VS Code.
- Remove the SpeedingViolation request parameter from the registerViolation method and replace it with an event parameter of type JsonNode, leaving the @RequestBody annotation in place:
public ResponseEntity<Void> registerViolation(@RequestBody final JsonNode event) {
Add an import for the com.fasterxml.jackson.databind.JsonNode class.
This enables you to get to the raw JSON in the request.
- Add the following code in the body of the method to extract the SpeedingViolation data from the event, just before the call to violationProcessor.processSpeedingViolation:
var data = event.get("data");
var violation = new SpeedingViolation(
    data.get("licenseNumber").asText(),
    data.get("roadId").asText(),
    data.get("excessSpeed").asInt(),
    LocalDateTime.parse(data.get("timestamp").asText())
);
Also, add an import for the java.time.LocalDateTime class.
- Open the terminal window in VS Code and make sure the current folder is FineCollectionService.
- Check that all your code changes are correct by building the code. Execute the following command in the terminal window:
mvn package
If you see any warnings or errors, review the previous steps to make sure the code is correct.
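To picture what the controller receives, it can help to look at the shape of the CloudEvent envelope. The sketch below uses nested Maps as a stand-in for Jackson's JsonNode so it runs with the JDK alone. The class CloudEventEnvelopeSketch, its local SpeedingViolation stand-in, and every attribute value are assumptions made for illustration; only the "data" attribute and its four fields come from the steps above.

```java
import java.time.LocalDateTime;
import java.util.Map;

public class CloudEventEnvelopeSketch {

    // Stand-in for the workshop's SpeedingViolation type.
    static final class SpeedingViolation {
        final String licenseNumber;
        final String roadId;
        final int excessSpeed;
        final LocalDateTime timestamp;

        SpeedingViolation(String licenseNumber, String roadId, int excessSpeed, LocalDateTime timestamp) {
            this.licenseNumber = licenseNumber;
            this.roadId = roadId;
            this.excessSpeed = excessSpeed;
            this.timestamp = timestamp;
        }
    }

    // An envelope with illustrative attribute values; the payload sits under "data".
    static Map<String, Object> sampleEvent() {
        return Map.<String, Object>of(
                "specversion", "1.0",
                "type", "com.dapr.event.sent",
                "source", "trafficcontrolservice",
                "id", "00000000-0000-0000-0000-000000000000",
                "datacontenttype", "application/json",
                "topic", "speedingviolations",
                "pubsubname", "pubsub",
                "data", Map.<String, Object>of(
                        "licenseNumber", "XT-346-Y",
                        "roadId", "A12",
                        "excessSpeed", 12,
                        "timestamp", "2024-10-16T12:30:00"));
    }

    // Mirrors the extraction in registerViolation: read "data", then each field.
    static SpeedingViolation extract(Map<String, Object> event) {
        @SuppressWarnings("unchecked")
        Map<String, Object> data = (Map<String, Object>) event.get("data");
        return new SpeedingViolation(
                (String) data.get("licenseNumber"),
                (String) data.get("roadId"),
                (Integer) data.get("excessSpeed"),
                LocalDateTime.parse((String) data.get("timestamp")));
    }

    public static void main(String[] args) {
        SpeedingViolation violation = extract(sampleEvent());
        System.out.println(violation.licenseNumber + " at " + violation.timestamp);
    }
}
```

The envelope attributes around "data" are metadata added for transport, which is why the controller has to unwrap the payload before it can build a SpeedingViolation.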
Step 5: Test the application
You're going to start all the services now. You specify the custom components folder you've created on the command-line using the --resources-path flag so Dapr will use these config files:
- Make sure no services from previous tests are running (close the command-shell windows).
- Open the terminal window in VS Code and make sure the current folder is VehicleRegistrationService.
- Enter the following command to run the VehicleRegistrationService with a Dapr sidecar:
dapr run --app-id vehicleregistrationservice --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --resources-path ../dapr/components mvn spring-boot:run
Notice that you specify the custom components folder you've created on the command-line using the --resources-path flag so Dapr will use RabbitMQ for pub/sub.
- Open a new terminal window in VS Code and change the current folder to FineCollectionService.
- Enter the following command to run the FineCollectionService with a Dapr sidecar:
dapr run --app-id finecollectionservice --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --resources-path ../dapr/components mvn spring-boot:run
- Open a new terminal window in VS Code and change the current folder to TrafficControlService.
- Enter the following command to run the TrafficControlService with a Dapr sidecar:
dapr run --app-id trafficcontrolservice --app-port 6000 --dapr-http-port 3600 --dapr-grpc-port 60000 --resources-path ../dapr/components mvn spring-boot:run
- Open a new terminal window in VS Code and change the current folder to Simulation.
- Start the simulation:
mvn spring-boot:run
You should see the same logs as before. Obviously, the behavior of the application is exactly the same as before. But if you look closely at the Dapr logs of the FineCollectionService, you should see something like this in there:
INFO[0004] app is subscribed to the following topics: [speedingviolations] through pubsub=pubsub app_id=finecollectionservice instance=maartenm03 scope=dapr.runtime type=log ver=1.2.2
So you can see that Dapr has registered a subscription for the FineCollectionService to the speedingviolations topic.
Step 6: Receive messages in the FineCollectionService (programmatically)
The other way of subscribing to pub/sub events is the programmatic way. Dapr will call your service on the well-known endpoint /dapr/subscribe to retrieve the subscriptions for that service. You will implement this endpoint and return the subscription for the speedingviolations topic.
- Stop the FineCollectionService by navigating to its terminal window and pressing Ctrl-C. You can keep the other services running for now.
- Open the file FineCollectionService/src/main/java/dapr/fines/violation/ViolationController.java in VS Code.
- Add a new method subscribe to the controller that will listen to the route /dapr/subscribe:
@GetMapping("/dapr/subscribe")
public ResponseEntity<List<Map<String, Object>>> subscribe() {
    var subscription = Map.<String, Object>of(
        "pubsubname", "pubsub",
        "topic", "speedingviolations",
        "route", "/collectfine"
    );
    return ResponseEntity.ok(Collections.singletonList(subscription));
}
Add the following imports to the class:
import org.springframework.web.bind.annotation.GetMapping;
import java.util.Collections;
import java.util.List;
import java.util.Map;
- Remove the file dapr/components/subscription.yaml. This file is not needed anymore because you implemented the /dapr/subscribe method.
- Go back to the terminal window in VS Code and make sure the current folder is FineCollectionService.
- Check that all your code changes are correct by building the code. Execute the following command in the terminal window:
mvn package
If you see any warnings or errors, review the previous steps to make sure the code is correct.
- Start the updated FineCollectionService:
dapr run --app-id finecollectionservice --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --resources-path ../dapr/components mvn spring-boot:run
If you kept the other services running before you started with this step, you may observe that there are a few speeding violations waiting to be processed. The application will immediately start processing this backlog.
- After you've looked at the log output and confirmed that everything works, you can stop all the services.
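The programmatic subscription boils down to a plain HTTP exchange: a GET on /dapr/subscribe answered with a JSON array of subscriptions. The sketch below reproduces that exchange without Spring, using the HTTP server and client that ship with the JDK. The class name SubscribeEndpointSketch and the ephemeral port are assumptions; in the workshop, the Spring controller serves this endpoint on the app port and the Dapr sidecar plays the role of the client.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class SubscribeEndpointSketch {

    // One JSON object per subscription, same keys as the Map in the controller method.
    static final String SUBSCRIPTIONS =
            "[{\"pubsubname\":\"pubsub\",\"topic\":\"speedingviolations\",\"route\":\"/collectfine\"}]";

    // Starts a tiny server that answers /dapr/subscribe; port 0 picks a free port.
    static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/dapr/subscribe", exchange -> {
            byte[] body = SUBSCRIPTIONS.getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    // Plays the role of the sidecar: asks the app which topics it subscribes to.
    static String fetch(int port) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://localhost:" + port + "/dapr/subscribe")).GET().build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }

    public static void main(String[] args) throws Exception {
        HttpServer server = start(0);
        try {
            System.out.println(fetch(server.getAddress().getPort()));
        } finally {
            server.stop(0);
        }
    }
}
```

Seen this way, the subscription.yaml file and the /dapr/subscribe endpoint carry the same three pieces of information: the pub/sub component name, the topic, and the route.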
Step 7: Use Dapr publish / subscribe with the Dapr SDK for Java
In this step, you will change the code slightly so it uses the Dapr SDK for Java. First you'll change the TrafficControlService that sends messages.
- Add a dependency to the Dapr SDK for Java to the pom.xml in the TrafficControlService directory:
<dependency>
    <groupId>io.dapr</groupId>
    <artifactId>dapr-sdk</artifactId>
</dependency>
Again, the version of the dependency is managed using Maven's "dependency management"; you can inspect the pom.xml file inside the java folder to see the exact version.
- Create a new file, TrafficControlService/src/main/java/dapr/traffic/fines/DaprFineCollectionClient.java and open it in VS Code. Make sure to include a package declaration: package dapr.traffic.fines;.
- Declare a class DaprFineCollectionClient that implements the FineCollectionClient interface. To fulfil the contract of the FineCollectionClient interface, add the following method:
@Override
public void submitForFine(SpeedingViolation speedingViolation) {
}
Also, add a few imports for the class:
import dapr.traffic.violation.SpeedingViolation;
import io.dapr.client.DaprClient;
import spring.SleuthDaprTracingInjector;
- Finally, add an instance variable of type DaprClient, and add a constructor to inject it:
private final DaprClient daprClient;

public DaprFineCollectionClient(final DaprClient daprClient) {
    this.daprClient = daprClient;
}
- Open the file TrafficControlService/src/main/java/dapr/traffic/TrafficControlConfiguration.java in VS Code. The default JSON serialization is not suitable for today's goal, so you need to customize the Jackson ObjectMapper that it uses. You do so by adding a static inner class to configure the JSON serialization:
static class JsonObjectSerializer extends DefaultObjectSerializer {
public JsonObjectSerializer() {
OBJECT_MAPPER.registerModule(new JavaTimeModule());
OBJECT_MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
}
}
This is a bit of a lazy approach, but it is enough for this workshop. In fact, the SDK documentation recommends writing your own serializer for production scenarios.
- In the same class, add a new method to declare a Spring Bean of type DaprClient:
@Bean
public DaprClient daprClient() {
return new DaprClientBuilder()
.withObjectSerializer(new JsonObjectSerializer())
.build();
}
In the same class, the fineCollectionClient method declares a Spring Bean that provides an implementation of the FineCollectionClient interface. To do so, it needs a RestTemplate bean. Replace this method with the following:
@Bean
public FineCollectionClient fineCollectionClient(final DaprClient daprClient) {
return new DaprFineCollectionClient(daprClient);
}
Finally, update the import statements in the class:
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import dapr.traffic.fines.DaprFineCollectionClient;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.serializer.DefaultObjectSerializer;
- Finally, update the submitForFine() method in the DaprFineCollectionClient class to use the DaprClient:
daprClient.publishEvent("pubsub", "speedingviolations", speedingViolation)
.contextWrite(new SleuthDaprTracingInjector())
.block();
- Open the terminal window in VS Code and make sure the current folder is TrafficControlService.
- Check that all your code changes are correct by building the code. Execute the following command in the terminal window:
mvn package
If you see any warnings or errors, review the previous steps to make sure the code is correct.
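The serializer settings used for the DaprClient matter because of how the timestamp travels over the wire. With WRITE_DATES_AS_TIMESTAMPS disabled, Jackson's JavaTimeModule writes a LocalDateTime as an ISO-8601 string rather than in a numeric form, and an ISO-8601 string is exactly what LocalDateTime.parse accepts on the receiving side. The sketch below shows that round trip with java.time only; the class name IsoTimestampSketch is made up, and it approximates the serializer's output rather than calling Jackson.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class IsoTimestampSketch {

    // Formats a timestamp as ISO-8601 text, the textual form a date-as-string serializer emits.
    static String toIso(LocalDateTime timestamp) {
        return timestamp.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
    }

    public static void main(String[] args) {
        LocalDateTime sent = LocalDateTime.of(2024, 10, 16, 12, 30, 15);
        String wire = toIso(sent);
        // The receiver can turn the text back into the same value.
        LocalDateTime received = LocalDateTime.parse(wire);
        System.out.println(wire);                  // prints 2024-10-16T12:30:15
        System.out.println(sent.equals(received)); // prints true
    }
}
```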
Now you will change the FineCollectionService that receives messages. The Dapr SDK for Java provides an additional Spring Boot integration library, which automatically wires correctly annotated methods to a pub/sub topic. For every message sent to that topic, the corresponding Java method is invoked and the payload of the message is delivered as request body. You don't have to poll for messages on the message broker.
- Add a dependency to the Dapr SDK for Java to the pom.xml in the FineCollectionService directory:
<dependency>
    <groupId>io.dapr</groupId>
    <artifactId>dapr-sdk-springboot</artifactId>
</dependency>
- Open the file FineCollectionService/src/main/java/dapr/fines/violation/ViolationController.java.
- Remove the subscribe method.
- Update the registerViolation method by changing the type of the event parameter to CloudEvent<SpeedingViolation>; keep the @RequestBody annotation in place. Add an import for io.dapr.client.domain.CloudEvent. The method signature should now look like this:
public ResponseEntity<Void> registerViolation(@RequestBody final CloudEvent<SpeedingViolation> event) {
- Change the implementation of the method to extract the actual violation info from the Cloud Event:
var violation = event.getData();
violationProcessor.processSpeedingViolation(violation);
- Add an import for the io.dapr.Topic class. Add a @Topic annotation above the registerViolation method to link this method to a topic called speedingviolations:
@Topic(name = "speedingviolations", pubsubName = "pubsub")
The pubsubName argument passed to this annotation refers to the name of the Dapr pub/sub component to use.
- Open the terminal window in VS Code and make sure the current folder is FineCollectionService.
- Check that all your code changes are correct by building the code. Execute the following command in the terminal window:
mvn package
If you see any warnings or errors, review the previous steps to make sure the code is correct.
Now you can test the application again. Execute the activities in step 5 again.
Next assignment
Make sure you stop all running processes and close all the terminal windows in VS Code before proceeding to the next assignment.
Go to assignment 4.