Build a Full, Real-time, Microservices-based Application using React, WebSocket, Play for Scala, Kafka Streaming Platform and Cassandra
This post is about building an example application using React, WebSocket, Play for Scala, Akka, Kafka, Alpakka Kafka, and Cassandra, among others.
We will build the Product Reviewer, a Microservices-based application covering a tiny part of an eCommerce business where people can post reviews of a particular product. Our focus will be on the reviews, not on the products or anything else, so we will have only one product in our system, and all reviews will be against that product.
By the end, you should have at least a few more ideas or options for designing your next real-time Kafka-based application.
Let’s first see what the application looks like …
Below you can see a video of the example application in action.
The product in question is a Kobe Bryant LA Jersey. You can see the current reviews, and publish yours. Under the product image you have the review statistics.
This application is almost real-time. When you publish a new review, all connected users can see it, along with the updated statistics.
The way it works is straightforward. You fill in your name and email, select a rating, write something, and publish.
If the name is empty, ‘Tom Dollars’ will be used.
If the email is empty, ‘tomdollars@example.com’ will be used.
If the content is empty, ‘(No content)’ will be used.
If you write an invalid email address, a security check will reject your review.
That is it!
The code is hosted on GitHub in two parts: the Backend part and the Frontend part. The README from the Backend part will show you all you need to do to deploy the application. Do not hesitate to open an issue if needed.
Now that you know what we are going to build, let’s start by drawing the big picture of our solution so we can have a better shared understanding of what is going on.
The picture below shows how our tools fit together to form our architecture:
In this section we will get Kafka and Cassandra ready for our example application. We assume you have already installed both, and will proceed directly to their setup for our example application. We personally used the Confluent Community Edition for Ubuntu for the Kafka Streaming Platform, and the official Cassandra Docker image for Cassandra, but other options are available. Just pick the one you feel most comfortable with.
Kafka
Our application will use five topics. You must run all the commands below from KAFKA_HOME.
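The exact commands are in the repository; as a hedged sketch, assuming the Confluent distribution and a single local broker, the topic creation looks something like this:

```bash
# Illustrative topic creation for a local, single-broker setup; adjust
# --partitions and --replication-factor to your environment (older
# distributions use --zookeeper instead of --bootstrap-server).
bin/kafka-topics --create --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1 --topic security-eventsource
bin/kafka-topics --create --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1 --topic reviews-eventsource

# The changelog topics back tables, so log compaction is a sensible choice.
for topic in reviews-changelog tasks-changelog stats-changelog; do
  bin/kafka-topics --create --bootstrap-server localhost:9092 \
    --partitions 3 --replication-factor 1 \
    --config cleanup.policy=compact --topic "$topic"
done
```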
Cassandra
Cassandra setup is straightforward: just run the script below. It will create the proreviewer keyspace, add the tables reviews_by_product, stats, and tasks, and create the materialized view reviews.
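The full script is in the repository; below is a hedged sketch with illustrative column layouts (the real schemas may differ):

```sql
-- A hedged sketch: the column layouts here are illustrative, not the
-- repository's exact schema.
CREATE KEYSPACE IF NOT EXISTS proreviewer
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE proreviewer.reviews_by_product (
  product_id text,
  review_id  uuid,
  author     text,
  email      text,
  rating     int,
  content    text,
  created_at timestamp,
  PRIMARY KEY ((product_id), review_id)
);

CREATE TABLE proreviewer.tasks (
  task_id     uuid PRIMARY KEY,
  status      text,   -- PROCESSING or DONE
  resource_id uuid    -- ID of the review the task created
);

CREATE TABLE proreviewer.stats (
  product_id     text PRIMARY KEY,
  reviews_count  int,
  average_rating double
);

-- The materialized view exposes reviews keyed by review_id.
CREATE MATERIALIZED VIEW proreviewer.reviews AS
  SELECT * FROM proreviewer.reviews_by_product
  WHERE product_id IS NOT NULL AND review_id IS NOT NULL
  PRIMARY KEY (review_id, product_id);
```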
DataStax Sink Connector
Note that this step should be performed after setting up Cassandra, because the connector connects to Cassandra during its registration.
We used Kafka Connect in Distributed mode, but you can easily adapt the configuration for the Standalone mode.
Below is the body of our POST request:
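The exact body is in the repository; here is a hedged sketch assuming the open-source DataStax Apache Kafka Connector and the illustrative schemas above (the mappings and converter settings are assumptions to adapt):

```json
{
  "name": "cassandra-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "reviews-changelog,tasks-changelog,stats-changelog",
    "contactPoints": "localhost",
    "loadBalancing.localDc": "datacenter1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "topic.reviews-changelog.proreviewer.reviews_by_product.mapping":
      "product_id=value.product_id, review_id=value.review_id, author=value.author, email=value.email, rating=value.rating, content=value.content, created_at=value.created_at",
    "topic.tasks-changelog.proreviewer.tasks.mapping":
      "task_id=value.task_id, status=value.status, resource_id=value.resource_id",
    "topic.stats-changelog.proreviewer.stats.mapping":
      "product_id=value.product_id, reviews_count=value.reviews_count, average_rating=value.average_rating"
  }
}
```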
Adjust the values to match your environment.
Just POST it to http://localhost:8083/connectors (your Kafka Connect REST interface) and you will be all set. After this, every message published to the observed topics will trigger an insert or update in the mapped Cassandra tables; everything is handled for you.
Now that we are all set with our Infrastructure, we can start writing our code, and we will start with our Microservices.
We will start by installing all dependencies.
Installing all dependencies
All our Microservices are developed in Scala, and we use sbt to manage the projects.
product-reviewer/build.sbt
product-reviewer/project/plugins.sbt
This defines five projects:
- common (for common libraries)
- webserver
- reviews
- security
- statistics
Except for the common project, all the others map to our Microservices.
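As a rough idea of the layout, a hedged sketch of the multi-project build definition might look like this; the real product-reviewer/build.sbt carries the actual dependencies and settings:

```scala
// A hedged sketch of the multi-project build; commonSettings is a
// hypothetical placeholder for shared scalac options, versions, etc.
lazy val common = (project in file("common"))
  .settings(commonSettings)

lazy val webserver = (project in file("webserver"))
  .enablePlugins(PlayScala)
  .dependsOn(common)

lazy val reviews    = (project in file("reviews")).dependsOn(common)
lazy val security   = (project in file("security")).dependsOn(common)
lazy val statistics = (project in file("statistics")).dependsOn(common)
```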
Now that our project directories are created and all dependencies downloaded, let’s start writing the actual code.
It is not possible to cover every step here, so I will show the important pieces of code, hoping they will convince you to look at the full code on the GitHub repository. Let’s start with the common libraries.
Common libraries
There are two important things in the common libraries: the Avro data models and an abstract implementation of the org.apache.commons.daemon.Daemon interface.
Avro Data Models
I chose to create the Avro data models in the common project so they are available to all the services. By making them a common concern, every schema change should be approved, or at least noticed, by all the teams. If you look into product-reviewer/common/src/main/avro/, you will see all the Avro schema files:
./task.avsc
./reviews/entities.Review.avsc
./reviews/events/ReviewApproved.avsc
./reviews/events/ReviewRejected.avsc
./security/events/SecurityCheckRequested.avsc
./security/events/SecurityCheckProcessed.avsc
./statistics/entities/ProductStatistic.avsc
You can use the sbt-avro plugin to generate the Avro specific record Java classes.
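To give an idea of their shape, here is a hedged sketch of a review schema; the field names are illustrative, and the real entities.Review.avsc may differ:

```json
{
  "namespace": "reviews.entities",
  "type": "record",
  "name": "Review",
  "fields": [
    { "name": "review_id",  "type": "string" },
    { "name": "product_id", "type": "string" },
    { "name": "author",     "type": "string" },
    { "name": "email",      "type": "string" },
    { "name": "rating",     "type": "int" },
    { "name": "content",    "type": "string" },
    { "name": "status",     "type": { "type": "enum", "name": "ReviewStatus", "symbols": ["APPROVED", "REJECTED"] } },
    { "name": "created_at", "type": "long" }
  ]
}
```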
The Daemon Interface
In the common project you will also find an abstract implementation of the org.apache.commons.daemon.Daemon interface. All our Kafka Streams applications extend that abstract implementation so they can be run as daemons, using jsvc, if you want. To keep things simple, though, we will not make use of jsvc when Dockerizing our example application.
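Here is a hedged sketch of what such an abstract implementation can look like; the names and lifecycle details are illustrative:

```scala
import java.util.Properties
import org.apache.commons.daemon.{Daemon, DaemonContext}
import org.apache.kafka.streams.{KafkaStreams, Topology}

// A hedged sketch: each Kafka Streams service provides its topology and
// configuration, and inherits the daemon lifecycle.
abstract class StreamProcessorDaemon extends Daemon {
  def topology: Topology
  def streamsProps: Properties

  private var streams: KafkaStreams = _

  override def init(context: DaemonContext): Unit =
    streams = new KafkaStreams(topology, streamsProps)

  override def start(): Unit = streams.start()
  override def stop(): Unit = streams.close()
  override def destroy(): Unit = ()
}
```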
Let’s move to our stream processing services: the Security, Reviews, and Statistics services.
The Security, Reviews and Statistics services are Kafka Streams applications. As such, I will just show you their stream topology code. You can see their full code on the code repository.
Security service stream topology
product-reviewer/security/src/main/scala/SecurityApp.scala
The application simply consumes all the SecurityCheckRequested events from the security-eventsource topic, performs its check, then publishes a SecurityCheckProcessed event back to the same security-eventsource topic.
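As a rough picture, here is a hedged sketch of that topology, assuming the Kafka Streams Scala DSL, implicit String and Avro Serdes in scope, and a hypothetical checkEmail helper:

```scala
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder

// A hedged sketch; assumes implicit Serdes for String and Avro records.
val builder = new StreamsBuilder()

builder
  .stream[String, SpecificRecord]("security-eventsource")
  // The topic carries both requested and processed events;
  // keep only the requests and emit a processed event for each.
  .flatMapValues {
    case req: SecurityCheckRequested => List(checkEmail(req))
    case _                           => Nil
  }
  .to("security-eventsource")

// Hypothetical helper: validates the email and marks the
// check VALID or INVALID accordingly.
def checkEmail(req: SecurityCheckRequested): SecurityCheckProcessed = ???
```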
Let’s move to the Reviews service.
Reviews service stream topology
product-reviewer/reviews/src/main/scala/ReviewsApp.scala
The application first consumes all the SecurityCheckProcessed events from the security-eventsource topic and, using the branch processor, creates two KStreams: one for the VALID reviews and one for the INVALID reviews.
On the VALID branch, the SecurityCheckProcessed data model is mapped to a ReviewApproved data model and published to the reviews-eventsource topic.
On the INVALID branch, the SecurityCheckProcessed data model is mapped to a ReviewRejected data model and published to the reviews-eventsource topic.
The application also consumes all the ReviewRejected and ReviewApproved events from the reviews-eventsource topic, maps them to a Review entity, and publishes the entity to the reviews-changelog topic.
The application also consumes the tasks-changelog topic as a table and performs a KStream to KTable join between the SecurityCheckProcessed KStream and the Task KTable; then, using our taskJoiner, it sets the SecurityCheckProcessed's corresponding Task as DONE and publishes the updated Task to the tasks-changelog topic.
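Putting it together, here is a hedged sketch of that topology; the mappers and Serdes are illustrative:

```scala
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder

// A hedged sketch; assumes implicit Avro Serdes in scope and hypothetical
// toReviewApproved / toReviewRejected mappers.
val builder = new StreamsBuilder()

val processed =
  builder.stream[String, SecurityCheckProcessed]("security-eventsource")

// Two branches: VALID and INVALID security checks.
val branches = processed.branch(
  (_, p) => p.getStatus.toString == "VALID",
  (_, p) => p.getStatus.toString == "INVALID"
)

branches(0).mapValues(p => toReviewApproved(p)).to("reviews-eventsource")
branches(1).mapValues(p => toReviewRejected(p)).to("reviews-eventsource")

// Mark the corresponding Task as DONE via a KStream-KTable join.
val tasks = builder.table[String, Task]("tasks-changelog")
processed
  .join(tasks)((p, task) => taskJoiner(p, task)) // taskJoiner sets status = DONE
  .to("tasks-changelog")
```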
That is basically it. Let’s move to the Statistics service.
Statistics service stream topology
product-reviewer/statistics/src/main/scala/StatisticsApp.scala
The application simply consumes the ReviewApproved events from the reviews-eventsource topic, sets the key to the product_id so that all reviews for a given product land on the same partition, then aggregates the reviews by the new product_id key.
aggregate is a stateful processor: it uses a state store, backed by a Kafka topic, to save the previous aggregated value.
Our Aggregator simply produces a new ProductStatistic data model based on the previous statistics and the ReviewApproved data model, and publishes it as an update to the stats-changelog topic.
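Here is a hedged sketch of that topology, again assuming implicit Serdes in scope and hypothetical emptyStats and updateStats helpers:

```scala
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder

// A hedged sketch; emptyStats and updateStats are illustrative helpers.
val builder = new StreamsBuilder()

builder
  .stream[String, ReviewApproved]("reviews-eventsource")
  // Re-key by product so each product's reviews share a partition.
  .selectKey((_, review) => review.getProductId.toString)
  .groupByKey
  .aggregate(emptyStats) { (_, review, stats) =>
    updateStats(stats, review) // e.g. bump the counts, recompute the average
  }
  .toStream
  .to("stats-changelog")
```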
Let’s move to our last service, the Web Server.
The Web Server service is a Play for Scala application. It contains Models, Controllers, Repositories, Services and Sources. Let's start with the models because they are used everywhere.
Models
Our Avro data models described previously are created to be serialized and deserialized as Avro. The models in this section are created to send data to the Frontend in JSON format. We could have struggled to do it directly with the Avro data models, but working with the models in this section is more convenient, especially from a Scala perspective.
Some of those models extend the DomainModel trait, which simply defines a toDataModel method.
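The idea, as a hedged sketch with illustrative field names:

```scala
// A hedged sketch: a JSON-facing model that knows how to convert itself
// to its Avro data model counterpart.
trait DomainModel[A] {
  def toDataModel: A
}

// Hypothetical example: the JSON-facing Review converts to the Avro Review.
case class Review(id: String, productId: String, author: String,
                  rating: Int, content: String)
    extends DomainModel[reviews.entities.Review] {
  def toDataModel: reviews.entities.Review = ??? // build the Avro record here
}
```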
Task
product-reviewer/webserver/app/models/data/Task.scala
Review
product-reviewer/webserver/app/models/data/Review.scala
ProductStatistic
product-reviewer/webserver/app/models/data/ProductStatistic.scala
As part of the Models, we also defined the Messages protocol, that is, the Events and Queries received by the service.
Messages protocol
product-reviewer/webserver/app/models/protocol/Messages.scala
Now let’s move to our single repository: ReviewRepo.
Repositories
Reviews repository
That repository is our Data Access Object to fetch everything we need from Cassandra; it is an Akka Actor.
product-reviewer/webserver/app/repositories/ReviewRepo.scala
Let’s move to our single service, the ReviewManager.
Services
ReviewManager
It is also an Akka Actor.
product-reviewer/webserver/app/services/ReviewManager.scala
Basically, when it receives a SecurityCheckRequested message from our Messages protocol, it publishes an Avro SecurityCheckRequested data model to the security-eventsource topic, and an Avro Task entity with a PROCESSING status to the tasks-changelog topic; then it returns that Task to the Frontend, so it can track the task status on Cassandra.
It also forwards all queries to the ReviewRepo actor.
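Here is a hedged sketch of that behavior; the producer wiring and helpers are illustrative:

```scala
import akka.actor.{Actor, ActorRef}
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// A hedged sketch: the real code also handles serialization, configuration
// and errors. newProcessingTask and toAvro are hypothetical helpers.
class ReviewManager(producer: KafkaProducer[String, SpecificRecord],
                    reviewRepo: ActorRef) extends Actor {

  def receive: Receive = {
    case SecurityCheckRequested(review) =>
      val task = newProcessingTask() // a Task with status PROCESSING
      producer.send(new ProducerRecord("security-eventsource", review.id, toAvro(review)))
      producer.send(new ProducerRecord("tasks-changelog", task.id, task.toDataModel))
      sender() ! task // the frontend polls this task's status in Cassandra

    case query: Query => // any query from the Messages protocol
      reviewRepo forward query
  }
}
```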
Let’s move to our sources.
Sources
You may be wondering what I call a Source. If you remember our discussion on Alpakka Kafka, a Source here is that special Akka Streams Source that generates messages from a Kafka topic. I made them first-class citizens in our Play application, just like Models or Controllers. We will have two Sources, one for reviews and one for product statistics.
Reviews source
product-reviewer/webserver/app/sources/ReviewsSource.scala
Product statistics source
product-reviewer/webserver/app/sources/ProductStatsSource.scala
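To give a feel for such a Source, here is a hedged sketch using Alpakka Kafka; the deserializer and mapper are assumptions:

```scala
import java.util.UUID
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import org.apache.kafka.common.serialization.StringDeserializer

// A hedged sketch: reviewDeserializer and toReviewModel are hypothetical.
implicit val system: ActorSystem = ActorSystem("webserver")

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, reviewDeserializer)
    .withBootstrapServers("localhost:9092")
    // A fresh group id so each web server instance sees new records.
    .withGroupId(s"webserver-${UUID.randomUUID()}")

val reviewsSource =
  Consumer.plainSource(consumerSettings, Subscriptions.topics("reviews-changelog"))
    .map(record => toReviewModel(record.value))
```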
Those Sources will be used in the Socket handler to send reviews and statistics to the Frontend as they become available, making our application a real-time one.
Controllers
ReviewController
product-reviewer/webserver/app/controllers/ReviewController.scala
TaskController
product-reviewer/webserver/app/controllers/TaskController.scala
Routes
Let’s finally look at the routes file
product-reviewer/webserver/conf/routes
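As a hedged sketch, a routes file for this application can look like this; the paths and action names are illustrative, not the repository's exact routes:

```
# A hedged sketch; the repository's routes differ.
GET     /                 controllers.HomeController.index
POST    /api/reviews      controllers.ReviewController.create
GET     /api/reviews      controllers.ReviewController.list
GET     /api/tasks/:id    controllers.TaskController.get(id: String)
GET     /socket           controllers.ReviewController.socket
```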
We have covered all the main pieces of code from the webserver service, and this closes our coverage of our Microservices. Let's move to the Frontend.
Our frontend is based on React, TypeScript, Node.js and Next.js, and uses React Context for state management. The most important aspect of our frontend is data or state management. You can see the full code on the Frontend repository of our example application. Let's first see how posting a new review is handled.
Posting a new Review
product-reviewer-client/components/ReviewForm.tsx
At line 17 we perform the actual POST and receive the Task created by the ReviewManager from our webserver Microservice.
The pollUntilTaskFinished function (shown below) will try to fetch the task from Cassandra every 500 ms and check whether it is DONE or still PROCESSING, and it will do so at most 60 times. 60 * 500 ms = 30 seconds, so we are basically giving our backend infrastructure 30 seconds to complete the process, after which we will issue a Timeout Error.
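Since the listing itself lives in the Frontend repository, here is a hedged sketch of such a polling helper; the task endpoint and Task shape are assumptions:

```typescript
// A hedged sketch; the task endpoint and Task shape are assumptions.
interface Task {
  id: string;
  status: "PROCESSING" | "DONE";
  resourceId: string; // ID of the newly created review
}

async function pollUntilTaskFinished(taskId: string): Promise<Task> {
  for (let attempt = 0; attempt < 60; attempt++) {
    const res = await fetch(`/api/tasks/${taskId}`); // hypothetical endpoint
    const task = (await res.json()) as Task;
    if (task.status === "DONE") return task;
    // 60 attempts * 500 ms = a 30 second budget for the backend.
    await new Promise((resolve) => setTimeout(resolve, 500));
  }
  throw new Error("Timeout: task still PROCESSING after 30 seconds");
}
```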
The task returned by our POST contains a resourceId, which is the ID of our newly created review. We use that ID in the callback to fetch the review from Cassandra and make it available to the frontend.
Now let’s see how reviews and statistics updates are fetched.
Initial load of reviews and statistics updates
The DataProvider Component
We have defined a DataProvider component wrapping a DataContext to keep our reviews and statistics data:
product-reviewer-client/data/DataProvider.tsx
The Index Page
Below you can see how data is fetched when our index page loads:
product-reviewer-client/pages/index.tsx
The fetchReviews and fetchStats methods fetch all the data and update the context.
Still on that index page, you can see the SocketProvider, used to fetch new reviews and product statistics updates as they become available. Let's look at it.
Real-time reviews and product statistics update load
We defined a SocketProvider wrapping a SocketContext, to make the socket connection available to the components requiring it.
The SocketProvider Component
product-reviewer-client/data/SocketProvider.tsx
This component will automatically try to reconnect up to 10 times if the connection is closed, making the connection effectively permanent.
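Here is a hedged sketch of such a reconnecting provider; the URL handling and retry pacing are assumptions:

```tsx
// A hedged sketch; the repository's SocketProvider differs in details,
// and a real implementation would also close the socket on unmount.
import React, { createContext, useEffect, useRef, useState } from "react";

export const SocketContext = createContext<WebSocket | null>(null);

export function SocketProvider(props: { url: string; children: React.ReactNode }) {
  const [socket, setSocket] = useState<WebSocket | null>(null);
  const retries = useRef(0);

  useEffect(() => {
    const connect = () => {
      const ws = new WebSocket(props.url);
      ws.onopen = () => {
        retries.current = 0; // reset the retry budget on a good connection
        setSocket(ws);
      };
      ws.onclose = () => {
        // Reconnect up to 10 times, as described above.
        if (retries.current < 10) {
          retries.current += 1;
          setTimeout(connect, 1000);
        }
      };
    };
    connect();
  }, [props.url]);

  return (
    <SocketContext.Provider value={socket}>
      {props.children}
    </SocketContext.Provider>
  );
}
```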
Real-time reviews
As you can see in the IndexPage, the ReviewsList component uses the socket connection to retrieve reviews in real time and add them to the DataContext:
product-reviewer-client/pages/index.tsx
product-reviewer-client/components/ReviewsList.tsx
Real-time product statistics updates
In the Product component, you can also see that the Stats component is provided a socket connection to fetch product statistics updates in real time:
product-reviewer-client/components/Product.tsx
product-reviewer-client/components/Stats.tsx
We covered all the important aspects of data management in our frontend, and that closes our coverage of the Frontend application.
Our example application was deployed using Docker. You will find all the instructions in the code repository README.
I hope you enjoyed the post.