Spring WebFlux Reactive Repository using R2DBC – PART-4

In this section we will add a POST endpoint to the user controller that creates a new User in our database. We will also switch to a different way of creating the ConnectionFactory instance, as shown below.

public ConnectionFactory connectionFactory() {
  return MySqlConnectionFactory.from(MySqlConnectionConfiguration.builder()

In the above code snippet we have created an instance of ConnectionFactory using the MySqlConnectionFactory class.

Creating Request Object

We will create a UserSaveRequest class that matches the JSON body of our POST request, so that the incoming JSON data arrives as an object in our controller.

public class UserSaveRequest {
  private String firstName;
  private String lastName;
}


Controller Changes for new POST Method

Add a @PostMapping method to the UserController class as below.

@PostMapping
public Mono<UserSaveRequest> add(@RequestBody UserSaveRequest userSaveRequest) {
  return userService.saveUser(userSaveRequest);
}

Using Repository Default Method

We will use the built-in save method of ReactiveCrudRepository to save our User object. The save method has the following signature:

<S extends T> Mono<S> save(S var1);

We don't have to do anything other than pass a User object, created from the incoming request, to this method. It will save that object into the database.

Modifying UserService

Add the save method to the UserService interface and its implementation to the UserServiceImpl class. The method is simple: it takes the UserSaveRequest object, creates a new User object from it and then calls the save method of the repository.

public interface UserService {

  Flux<UserResponse> getAllUsers();

  Flux<User> findUserByFirstName(String firstName);

  Mono<UserSaveRequest> saveUser(UserSaveRequest userSaveRequest);
}

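public Mono<UserSaveRequest> saveUser(UserSaveRequest userSaveRequest) {
  // build the entity from the request; assumes User and UserSaveRequest expose the usual getters/setters
  User user = new User();
  user.setFirstName(userSaveRequest.getFirstName());
  user.setLastName(userSaveRequest.getLastName());
  return userRepository.save(user)
      .doOnError(e -> {
        throw new RuntimeException("Exception occurred during data base operation");
      })
      .thenReturn(userSaveRequest);
}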

userRepository.save() saves the object to the database. The .thenReturn() method takes effect when the Mono completes successfully and returns the same UserSaveRequest object, as we are not exposing any database primary keys to the client of the API. The .doOnError() callback throws a RuntimeException if there is an error while saving to the database.

Response From API Call

User create API call

You can check the code commit for these changes here

Spring WebFlux Reactive Repository using R2DBC – PART-3

Continuing the Spring WebFlux series, we will now cover the Reactive Repository. Here is the link for Part-2, where we covered the controller part.

Why we need Reactive Repository

All database calls made through JDBC drivers are blocking in nature. That means a thread that calls the database waits until the database returns a result or the call times out. This is a huge waste of resources: a thread that is just sitting idle could be used for something better. So what are the different solutions to this problem? There can be a number of solutions, but we will discuss the two below and implement only one.

  1. Using CompletableFuture and ThreadPool
  2. Using Reactive Relational Database Connectivity (R2DBC).

CompletableFuture and ThreadPool

This way of executing a database query is not truly non-blocking, but it moves the blocking query into a separate execution context. Here we can create a thread pool and use it with CompletableFuture to execute the database query, something like this.

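Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<User> future = CompletableFuture.supplyAsync(() -> {
    String sql = "SELECT * FROM users";
    // stmt is an existing java.sql.Statement; the query still blocks, but on a pool thread
    try (ResultSet rs = stmt.executeQuery(sql)) {
        // iterate over the result set and build the result
        User user = new User();
        return user;
    } catch (SQLException e) {
        // checked exceptions cannot escape the lambda, so wrap them
        throw new CompletionException(e);
    }
}, executor);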
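
The same pattern can be tried without a database. The following is only a sketch: blockingQuery() is a hypothetical stand-in that sleeps instead of running JDBC, and the class and method names are made up for illustration. It shows that the caller gets a future back immediately while the blocking work occupies a pool thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingOffloadSketch {

    // Hypothetical stand-in for a blocking JDBC call; it sleeps instead of querying.
    static String blockingQuery() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "sherlock";
    }

    // Runs the blocking call on the given pool and returns to the caller immediately.
    static CompletableFuture<String> findFirstNameAsync(ExecutorService pool) {
        return CompletableFuture.supplyAsync(BlockingOffloadSketch::blockingQuery, pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            CompletableFuture<String> future = findFirstNameAsync(pool);
            System.out.println("caller is free while the query runs");
            // join() waits for the result only so that the demo can print it
            System.out.println("result: " + future.join());
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that a pool thread is still blocked for the whole duration of the call, which is why this approach isolates the blocking rather than removing it.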

Using Reactive Relational Database Connectivity (R2DBC)

R2DBC is a project that enables us to use a reactive programming API to connect to relational databases. JDBC cannot be used for this, because its APIs are blocking. We can use the R2DBC project along with Spring Data to talk to a relational database. We are continuing the example from the last post and will now add R2DBC to it, but for that we have to add the following dependencies.

Spring Boot Data R2DBC


R2DBC MySql Driver


By adding the above dependencies we have all that we need to use the reactive API to access the relational database. In this case we are using R2DBC and Spring Data R2DBC.

Data Base Configuration

For R2DBC to work we have to add some configuration. For this we will add a DataBaseConfiguration class that creates a Spring bean of ConnectionFactory; this bean holds the R2DBC connection URL, database name etc. We will create this configuration class by extending AbstractR2dbcConfiguration. Below is the code for the same.

public class DataBaseConfiguration extends AbstractR2dbcConfiguration {

  public ConnectionFactory connectionFactory() {
    ConnectionFactory connectionFactory = ConnectionFactories.get(

    return connectionFactory;

The @EnableR2dbcRepositories annotation is used to activate reactive relational repositories using R2DBC.

Reactive Repository

Reactive repositories are similar to JPA repositories. One can be created by declaring an interface that extends ReactiveCrudRepository with the entity class and the primary key type.

public interface UserRepository extends ReactiveCrudRepository<User, Long> {

  @Query("SELECT * FROM user WHERE firstname = :firstname")
  Flux<User> findByFirstName(String firstname);
}

With the above repository, the only difference compared to a JpaRepository is that the methods of the interface return reactive types, in this case Flux or Mono.

We can call the above repository from our UserService and use it the same way we would use a JpaRepository.

private UserRepository userRepository;

public Flux<User> findUserByFirstName(String firstName) {
  return userRepository.findByFirstName(firstName);
}

So now we have completed the implementation of a non-blocking database connection using R2DBC, with service and repository layers that use it.

You can find the latest code at GitHub. The link opens the commits for the above code so that you can see exactly what changes were made.

Vaccine Locator Service Using Spring WebFlux

Humble request 🙏🏻
If you are configuring this service on your machine, please do not overwhelm the cowin server. Keep the ping interval reasonable; by default it is hard-coded to 20 minutes.

Vaccine Locator is a service that calls the cowin (co-vin) API and fetches the available vaccine slots (18+ only) for the current day only. If slots are available, this service sends a message to the configured public Telegram channel. As of now the application fetches vaccine slots for only one configured district.

Pull requests are open. If you are running the application for your district, please update the README file here as well as application.yml, so that anyone else from your district does not have to run it and can just join the Telegram channel.

Configuration Required in application.yml file

  • District ID along with name in districts: all:
    • ex: pune district id is 363 so it is placed as pune: 363 under all:
  • Name of the district for which ID has been configured under cowin:
    • the same district you have to provide like selectedDistrict: pune
  • Telegram API Key in telegram: tApiKey:
  • Telegram channel name under telegram: tChannelName:

Telegram Channel Configuration

Please follow link 1 and link 2 to create a Telegram channel and get the API token from BotFather. Use the API key in the application.yml file as mentioned above, and take the channel name from the channel description (not the actual name).

Running Application

Once the above configuration is done, run the commands below.

mvn clean install
mvn spring-boot:run

GitHub Repository

Here is the link for the GitHub repository; pull requests for enhancements and bug fixes are welcome.

I will cover this application in our series of Spring WebFlux tutorials in upcoming posts.

Spring WebFlux REST Controller – Part-2

In order to understand Spring WebFlux more deeply we will implement a simple REST endpoint for User. As of now this User endpoint will have the following REST action.

HTTP Method    URI          Response
GET            /v1/users    List of users

Creating Spring Boot WebFlux Application

So first we have to create a Spring WebFlux application. It is simple to start with: just go here, fill in the project metadata and select Spring Reactive Web under dependencies. This will generate a folder with the required pom.xml and other Spring WebFlux dependencies. Once you download it, extract it and import it into your favourite IDE. Make sure the pom.xml file has the following dependency.


Creating REST Controller

Everything is set up for us to create the controller. But before writing the controller code, let's take a moment to understand what information we need to implement it. We already know a few things:

  • We need a REST endpoint for Users
  • We have to use the HTTP GET method
  • We know the path/URI for this HTTP GET method
  • We know what to send in the response

Now we need to implement the above functionality in a Spring WebFlux application. Spring provides an annotation for each of the above requirements; we just have to use them correctly. Here are the annotations that Spring Reactive Web provides:

  • @RestController for REST End Point for Users
  • @GetMapping for HTTP GET Method
  • @RequestMapping for Path/URI for this HTTP GET Method
  • @ResponseBody to send in Response.

Let's understand what each of these annotations does.


@RestController is a composed annotation. It is composed of the @Controller and @ResponseBody annotations. @Controller defines the bean as a controller, whereas @ResponseBody indicates that the return value of every method in this bean is written directly to the response body. The @RestController annotation is applied at the top of the class.


The @GetMapping annotation is a type of request mapping annotation that routes HTTP GET requests to the annotated method. We can also use the @RequestMapping annotation here, like this:

@RequestMapping(method = RequestMethod.GET)


The @RequestMapping annotation maps incoming requests for the given URI to a particular endpoint. In our case we need to point all requests (irrespective of HTTP method) to our controller, so we will declare this annotation at the top of the controller class, mentioning the URI.

@RequestMapping("/v1/users")

where the value in parentheses is the URI; in our case it is “/v1/users”, indicating version 1 of the API with the endpoint “users”. The same @RequestMapping annotation can also be used together with an HTTP method on individual methods to define the routing, as mentioned above.


The @ResponseBody annotation writes the return value of the method on which it is declared to the response body. It is declared at the top of controller methods. In simple words, if the method returns a String, then the response of that endpoint will be a String.

With the above knowledge of annotations, let's create a controller that returns a String as @ResponseBody.

@RestController
@RequestMapping("/v1/users")
public class UserController {
  @GetMapping
  public String getValue() { return "Hello World"; }
}

If you run the Spring application with the above controller, a GET call to http://localhost:8080/v1/users in the browser returns “Hello World”.

Until now this is exactly how we would do it in Spring MVC/Boot, and we have not yet used any WebFlux feature. To use one, let's enhance the application by adding a service layer for User that returns UserResponse objects.

public class UserServiceImpl implements UserService {
public Flux<UserResponse> getAllUsers() {
return Flux.fromIterable(generateUsers());

private List<UserResponse> generateUsers() {
return Arrays.asList(UserResponse.builder()

Flux and Mono in Reactive Spring

As you can see, we have a UserServiceImpl class with a getAllUsers() method that returns Flux<UserResponse>. Since we are returning more than one entity in the response, we have to use a Flux of that entity.

There are two types of Publisher in Reactor, the reactive library that Spring WebFlux builds on: Mono and Flux. Mono is a Publisher that produces 0 to 1 values. Flux is a Publisher that produces 0 to N values. For now just keep in mind that we are using Flux because we want to return more than one UserResponse entity. When we add an endpoint that returns a single UserResponse entity, we will use Mono. More on Flux and Mono in an upcoming post.

Final Controller Code

@RestController
@RequestMapping("/v1/users")
public class UserController {
  private UserService userService;

  @GetMapping
  public Flux<UserResponse> getValue() {
    return userService.getAllUsers();
  }
}

When you call this application from Postman with the URL http://localhost:8080/v1/users, you should see the following response.

GET Request Response

Now we are done with the first part of the Spring WebFlux tutorial. There is a lot more to learn and things are getting super exciting; we have just started.

You can find the code for the above here at GitHub.

Thank You 🙏

Why do you need Reactive Web Stack

Spring Boot is a great framework for building microservices. But what about building microservices on a reactive stack? For this exact reason a reactive-stack web framework was introduced in Spring 5.

So why do we need reactive, and what are the advantages of using a reactive stack?

Consider a simple web app that receives a request from a web browser, performs some operation against the database (for a GET or POST call) and returns a response to the browser. Whenever a request comes in, the server allocates a thread to process it. This same thread then processes the received data, be it query parameters or POST form data. If there is a database operation, the thread issues a query to the remote database server and waits for the response; once the response is received from the DB, the thread returns it to the web browser. This is the commonly followed model and the simplest explanation of a request-response based web application.
So do you see any problem with the above application? Let's say that in an ideal scenario there are no errors and our simple application returns data for an HTTP GET request by querying the database, and we know that the server allocates a thread for every request.

IMG-1 Blocking Request

As shown in the diagram, the GET request waits for the result from the DB and then the result is sent back. If this DB call takes a lot of time, the thread allocated to the request is blocked until the result is obtained or a request timeout error is thrown.

Consider hundreds of such requests hitting the server at the same time. What will happen? The server will allocate a thread to each request, but it will soon run out of threads if there are too many requests, that is, more than the configured maximum thread pool size of the server. We can define it in Spring Boot with the following property.


If this size is not enough for the application, we can increase it up to a certain number, or consider adding one more instance of the application behind a load balancer.
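
The thread exhaustion described above can be reproduced in miniature with a plain java.util.concurrent pool. This is only a sketch under stated assumptions: the pool stands in for the server's request threads, Thread.sleep stands in for a slow database call, and the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolExhaustionSketch {

    // Submits `requests` blocking tasks to a pool of `poolSize` threads and returns
    // the highest number of tasks that were ever being processed at the same time.
    static int maxConcurrent(int poolSize, int requests, long blockMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < requests; i++) {
                futures.add(pool.submit(() -> {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(blockMillis); // simulated blocking database call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait until every simulated request has been served
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Six "requests" arrive at once, but only two threads exist to serve them.
        System.out.println("peak concurrent requests: " + maxConcurrent(2, 6, 50));
    }
}
```

However many requests are submitted, the peak never exceeds the pool size; the remaining requests simply queue up behind threads that are doing nothing but waiting.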

But what if we could consume the resources more efficiently? This is exactly what we will see now. There are many ways to use those threads effectively; the one we are covering here is the reactive approach.

So in the above example, instead of waiting for the database to respond, what if the thread got notified when the result is received? Maybe like a callback? This kind of behaviour is very common in Node.js applications; in fact Node.js applications are typically written in a callback style, because Node.js runs application code on a single thread and relies on non-blocking asynchronous operations. In Java, however, the basic JDBC drivers are blocking. To solve this we have the R2DBC project, which is developing drivers for multiple databases, of which we will cover MySQL in upcoming posts. So the drivers are sorted; what about the controller stack? For the controller we will use Spring WebFlux. Both together will form our complete non-blocking web stack. Note that WebFlux uses the Reactor Netty server by default. I urge you to go and check the official documentation of Spring WebFlux; it is very well written.
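
To make the callback idea concrete, here is a small sketch that uses only the JDK. It is not how R2DBC or WebFlux are implemented; queryUserName is a hypothetical stand-in for a non-blocking driver call, and the 50 ms delay merely simulates the database taking some time to answer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CallbackSketch {

    // Hypothetical non-blocking "driver": the future is completed later by a
    // scheduler, so the calling thread does not wait for the result.
    static CompletableFuture<String> queryUserName(long userId) {
        return CompletableFuture.supplyAsync(
                () -> "user-" + userId,
                CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS));
    }

    // Registers a callback that turns the database result into a response.
    // Nothing here blocks; the callback runs once the result arrives.
    static CompletableFuture<String> handleRequest(long userId) {
        return queryUserName(userId).thenApply(name -> "Hello " + name);
    }

    public static void main(String[] args) {
        CompletableFuture<String> response = handleRequest(1L);
        System.out.println("request thread is free to serve other requests");
        // join() is used only so that the demo prints before the JVM exits
        System.out.println(response.join());
    }
}
```

The request-handling code only describes what should happen with the result; no thread sits idle while the result is on its way.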

In the next blog post we will cover what Spring WebFlux is and implement a controller layer to get our hands dirty with this exciting framework.

JPA 2 accessing data from multiple schemas

In a microservice architecture we generally have a dedicated database schema for each service (though there are exceptions). Each service is then responsible for managing the data in its database and then talking to or responding to other microservices.

Since we have different database schemas, the data access layer of each microservice is different. But that doesn't mean the data is not related. We can have entities that are related across schemas.

Consider the diagram below: a few microservices interacting with each other (service discovery is not shown for simplicity). The Data Aggregation Service interacts with three different database schemas, processes the data and sends it over the network to a cloud service like SNS or SQS, or simply to any third-party consumer.

Microservice interaction

This Data Aggregation Service should be able to access data from multiple database schemas as required, but with minimal database trips. A SQL query over multiple database schemas is possible with joins, but what about the data access layer? We know that with an ORM we can easily access related data from different tables when the relations are defined in entity classes or in configuration, depending on which JPA implementation you are using. For example, let's say that for an online shopping site we have a Users table and an Addresses table with a one-to-many relation. Using the Hibernate implementation of JPA 2 it is quite simple to define this relation at the entity level and then fetch the Addresses through the User; we all know that very well.

Here is the scenario. Let's say we have micro-service-1, which handles user and address operations, while another service, micro-service-2, handles the reviews given by users and uses the Reviews table.
A separate micro-service-3 handles products, which can include the type of product, its manufacturer and a lot more, but for simplicity we will assume this service handles only one table for now, called products.

There is a requirement for another microservice, say a data aggregation service, which should fetch data from the Users, Products and Reviews tables and push it to a cloud service for analytics.
Let's start by creating the schemas. We are just creating the micro-service-3 data access layer to understand how JPA can be used to access related data across the schemas. For a simpler explanation, just assume micro-service-1 and micro-service-2 are populating their respective tables and our service has read-only access to those tables. We are populating the tables with insert statements.
Create 3 schemas, product_service, user_service and review_service, and run the SQL queries below.

-- products service data base tables
CREATE TABLE `product_service`.`products` (
	id bigint(20) NOT NULL AUTO_INCREMENT,
	name varchar(30) NOT NULL,
	manufacturer varchar(30) NOT NULL,
	type varchar(30) NOT NULL,
	PRIMARY KEY (id)
);

-- user service data base tables
CREATE TABLE `user_service`.`users` (
	id bigint(20) NOT NULL AUTO_INCREMENT,
	first_name VARCHAR(30) NOT NULL,
	last_name VARCHAR(30) NOT NULL,
	PRIMARY KEY (id)
);

CREATE TABLE `user_service`.`addresses` (
	id bigint(20) NOT NULL AUTO_INCREMENT,
	user_id bigint(20),
	street varchar(30) NOT NULL,
	city varchar(30) NOT NULL,
	state varchar(30) NOT NULL,
	PRIMARY KEY (id)
);

-- reviews service data base tables
CREATE TABLE `review_service`.`reviews` (
	id bigint(20) NOT NULL AUTO_INCREMENT,
	user_id bigint(20),
	product_id bigint(20),
	content varchar(255),
	published_date timestamp DEFAULT CURRENT_TIMESTAMP,
	PRIMARY KEY (id)
);

-- Insert Data 
INSERT INTO `product_service`.`products` (name,manufacturer, type) 
values ('iPhone-11', 'Apple','mobile');
INSERT INTO `user_service`.`users` (first_name,last_name) 
values ('sherlock', 'holmes');
INSERT INTO `user_service`.`addresses` (user_id,street,city,state) 
values (1,'South Lake Avenue', 'Pasadena','CA');
INSERT INTO `review_service`.`reviews` (user_id,product_id,content) 
values (1,1,'best iPhone ever'); 

-- Check all tables
select * from `product_service`.`products`;
select * from `user_service`.`users`;
select * from `user_service`.`addresses`;
select * from `review_service`.`reviews`;

-- Query to fetch all data for a user, including reviews
select product.name , product.manufacturer, user.first_name , user.last_name,
address.street, address.city, review.content, review.published_date
from `product_service`.`products` `product`
INNER JOIN `review_service`.`reviews` `review` ON `review`.`product_id`=`product`.`id`
INNER JOIN  `user_service`.`users`  user ON `user`.`id` = `review`.`user_id`
INNER JOIN  `user_service`.`addresses` `address` ON `address`.`user_id`=`user`.`id`
where user.id = 1; 

The product_service schema has the products table with the details of each product. The user_service schema has the users and addresses tables, and review_service has the reviews table containing the reviews given by users.

The last query in the above section is our requirement. We need to fetch data from multiple schemas for a given userId and get the result from all the tables mentioned above.
Since these schemas are different, there is no direct JPA relation between them for the result we want; instead we need to define it ourselves. But first let's do some basic stuff.
We need POJOs for the tables; we also need an EntityManager and a TransactionManager for each schema, and finally we need repository classes.
All of this code is present at https://github.com/tejasgrd/jpa-multi-schema-access .
We will look at one example of each here; the rest of the code is along similar lines.

Users class with use of Lombok and javax.persistence annotations

@Table(name = "users",schema = "users")
public class Users {
@Column(name = "id", nullable = false)
private long id;
@Column(name = "first_name")
private String firstName;
@Column(name = "last_name")
private String lastName;
@OneToMany(mappedBy = "user")
private Set<Addresses> addresses;

User repository interface

public interface UsersRepository extends JpaRepository<Users, Long> {

Users Database config

basePackages = {"dev.tejasgarde.jpa.respository.users"},
entityManagerFactoryRef = "usersEntityManager",
transactionManagerRef = "usersTransactionManager")
public class UsersDBConfig extends BaseConfig{
public JpaProperties usersJpaProperties() {
return getJpaProperties();

public DataSourceProperties usersDataSourceProperties() {
return new DataSourceProperties();

public LocalContainerEntityManagerFactoryBean usersEntityManager(
final JpaProperties usersJpaProperties) {
EntityManagerFactoryBuilder builder = createEntityManagerFactory(usersJpaProperties);
return builder.dataSource(usersDataSource()).packages("dev.tejasgarde.jpa.domain.users")

@ConfigurationProperties(prefix = "datasources.users.configurations")
public DataSource usersDataSource() {
DataSource dataSource = usersDataSourceProperties()
return dataSource;

public JpaTransactionManager usersTransactionManager(final JpaProperties usersJpaProperties) {
return new JpaTransactionManager(usersEntityManager(usersJpaProperties).getObject());

Now, to run the required query we have to run it as a native query. We also have to map the result of the query to an object whose fields correspond to the columns of the select query. So let's first create a class that represents what we have in the select query, ReviewsByUser.java.

public class ReviewsByUser {
  String name;
  String manufacturer;
  String first_name;
  String last_name;
  String street;
  String city;
  String content;
  Date published_date;

These are all the fields that are part of the select statement. The field names do not have to match the column names of the select query, but we need to be careful that the types of the fields match the types we use in the Hibernate domain objects.
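
Because the result is mapped through @ConstructorResult further down, the JPA provider creates each ReviewsByUser by calling a constructor and passing the column values in the order in which the columns are listed. The original class may well generate this constructor with Lombok; the hand-written version below is only a sketch of what it has to look like, with the field names taken from the class above.

```java
import java.util.Date;

public class ReviewsByUser {
    final String name;
    final String manufacturer;
    final String first_name;
    final String last_name;
    final String street;
    final String city;
    final String content;
    final Date published_date;

    // Parameter order and types follow the column list of the result set mapping.
    public ReviewsByUser(String name, String manufacturer, String first_name, String last_name,
                         String street, String city, String content, Date published_date) {
        this.name = name;
        this.manufacturer = manufacturer;
        this.first_name = first_name;
        this.last_name = last_name;
        this.street = street;
        this.city = city;
        this.content = content;
        this.published_date = published_date;
    }

    public static void main(String[] args) {
        // Roughly what the JPA provider does for each row of the result set.
        ReviewsByUser row = new ReviewsByUser("iPhone-11", "Apple", "sherlock", "holmes",
                "South Lake Avenue", "Pasadena", "best iPhone ever", new Date());
        System.out.println(row.first_name + " reviewed " + row.name + ": " + row.content);
    }
}
```

If the constructor's parameter order or types do not line up with the mapping, the provider cannot find a matching constructor and the query fails at runtime, so it is worth keeping the two side by side when changing either.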

Let's move ahead. Now we need to define the native query. How and where do we define it? We usually define it on a domain class, and in this case we are going to define it on the Products class. What else? We also have to map the result to the class we already defined; for that we use an annotation and give it the class whose objects represent the result of the query. So let's first get to know the annotations that we are going to declare on the Products class.

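Named native queries are defined with the @NamedNativeQuery annotation as below (several of them can be grouped inside @NamedNativeQueries); here we have only one.

@NamedNativeQuery(
         name = "<name of query>",
         query = "<query goes here>",
         resultSetMapping = "<result set mapping>"
)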

To map the result of a named native query we have to provide another annotation, @SqlResultSetMapping, and below is the format for it.

@SqlResultSetMapping(
             name = "<result set mapping name>",
             classes = @ConstructorResult(
                      targetClass = <TargetClass>.class,
                      columns = {
                       @ColumnResult(name = "name", type = <Type>.class)
                      }))

So the result of the native query is mapped through resultSetMapping and name: the resultSetMapping in @NamedNativeQuery must be the same as the name in @SqlResultSetMapping.
classes is used to define the class that represents the result of the native query. Make sure that this class has a constructor that takes the columns defined in the columns of @ConstructorResult.
Here is the complete Products class, which has the above two annotations.

@Table(name = "products",schema = "products")
@NamedNativeQuery(
name = "Products.findReviewsByUser",
query ="select product.name , product.manufacturer, user.first_name , user.last_name,\n" +
"address.street,address.city, review.content, review.published_date\n" +
"from `product_service`.`products` `product`\n" +
"INNER JOIN `review_service`.`reviews` `review` ON `review`.`product_id`=`product`.`id`\n" +
"INNER JOIN `user_service`.`users` user ON `user`.`id` = `review`.`user_id`\n" +
"INNER JOIN `user_service`.`addresses` `address` ON `address`.`user_id`=`user`.`id`\n" +
"where user.id = ?1",
resultSetMapping = "reviewsByUser")

@SqlResultSetMapping(
name = "reviewsByUser",
classes = @ConstructorResult(
targetClass = ReviewsByUser.class,
columns = {
@ColumnResult(name="name",type=String.class ),
@ColumnResult(name="manufacturer",type=String.class ),
@ColumnResult(name="first_name",type=String.class ),
@ColumnResult(name="last_name",type=String.class ),
@ColumnResult(name="street",type=String.class ),
@ColumnResult(name="city",type=String.class ),
@ColumnResult(name="content",type=String.class ),
@ColumnResult(name="published_date",type= Date.class )}))
public class Products {
@Column(name = "id", nullable = false)
private long id;
@Column(name = "name")
private String name;
@Column(name = "manufacturer")
private String manufacturer;
@Column(name = "type")
private String type;

Note how the results are mapped using the same name, and that @ConstructorResult has its columns defined with types.
There is still one step left. As we are using a repository, we just have to define a method in the repository interface and call it; JPA will invoke the native query and return the result.

public interface ProductsRepository extends JpaRepository<Products, Long> {
@Transactional(timeout = 300)
@Query(nativeQuery = true)
List<ReviewsByUser> findReviewsByUser(Long userId) throws SQLException;
}

Notice that the name in @NamedNativeQuery is the entity name followed by the name of the repository method (Products.findReviewsByUser). It has to be like this; only then can JPA figure out which @NamedNativeQuery to invoke. Below is the result from Postman. We have just added a @Controller that calls this method on a GET request.

I hope this improves your understanding of JPA. Thank you for reading.
Code is available at: https://github.com/tejasgrd/jpa-multi-schema-access