Message Queues with Grails and Micronaut Kafka
Learn how to use message queues with Grails and Micronaut Kafka
Authors: Sergio del Amo
Grails Version: 4.1.0.M5
1 Training
Grails Training - Developed and delivered by the folks who created and actively maintain the Grails framework!
2 Getting Started
In this guide we will show you how to set up and use Micronaut Kafka with a Grails application.
2.1 What you will need
To complete this guide, you will need the following:
- Some time on your hands
- A decent text editor or IDE
- JDK 1.8 or greater installed with JAVA_HOME configured appropriately
2.2 How to complete the guide
To get started, do one of the following:
- Download and unzip the source
or
- Clone the Git repository:
git clone https://github.com/grails-guides/grails-micronaut-kakfa.git
The Grails guides repositories contain three folders:
- docker
- complete
- complete-analytics
In this guide you are going to create two Grails applications. The complete and complete-analytics folders contain the finished versions of those apps. They are the result of working through the steps presented by the guide and applying those changes.
The docker folder contains a Docker Compose file for running Kafka in Docker. To use it, you need Docker and Docker Compose installed.
To complete the guide, follow the instructions in the next sections.
You can go right to the completed examples if you cd into grails-guides/grails-micronaut-kakfa/complete and grails-guides/grails-micronaut-kakfa/complete-analytics.
3 Application Overview
In this guide, we set up a message queue that works across two different applications. The first app lists books and shows the details of each book. We want to keep track of the number of times each book is viewed, so we add a separate analytics app that receives a message for every book view and maintains a count per book.
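The flow described above can be sketched without Kafka at all. The script below is only an illustration: an in-memory queue stands in for the Kafka topic, and the names topic, publishView and consumePending are hypothetical, not part of either app.

```groovy
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for the Kafka topic: a plain in-memory queue.
BlockingQueue<Map> topic = new LinkedBlockingQueue<Map>()

// Books app side (assumed shape): publish one event per book view.
def publishView = { String isbn -> topic.put([isbn: isbn]) }

// Analytics app side: drain the pending events and tally views per ISBN.
Map<String, Long> viewCounts = [:]
def consumePending = {
    List<Map> events = []
    topic.drainTo(events)
    events.each { Map event ->
        String isbn = event.isbn as String
        viewCounts[isbn] = (viewCounts[isbn] ?: 0L) + 1L
    }
}

publishView('1491950358')
publishView('1491950358')
publishView('1680502395')
consumePending()

println viewCounts
```

The point of the queue is that the books app never calls the analytics app directly. It only publishes events, and the consumer processes them whenever it is running.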
4 Running Kafka
A fast way to start using Kafka is via Docker. Create this docker-compose.yml file:
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - 2181:2181 # (1)
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092 # (2)
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
1 | Zookeeper uses port 2181 by default, but you can change the value if necessary. |
2 | Kafka uses port 9092 by default, but you can change the value if necessary. |
Start Zookeeper and Kafka (use CTRL-C to stop both):
$ docker-compose up
Alternatively you can install and run a local Kafka instance.
5 Books Application
Create a Grails application with the rest-api profile:
grails create-app example.grails.complete --profile=rest-api
First, add a Book domain class:
package example.grails

class Book {
    String isbn
    String name

    static constraints = {
        isbn unique: true, blank: false, nullable: false
        name blank: false, nullable: false
    }
}
Create default CRUD actions for Book leveraging GORM data services:
package example.grails

import grails.gorm.services.Service

@Service(Book)
interface BookGormService {
    Book saveBook(Book book)
    List<Book> findAll()
    Book findByIsbn(String isbn)
}
Then create the book data at startup in BootStrap.groovy:
package example.grails

import groovy.transform.CompileStatic

@CompileStatic
class BootStrap {

    BookGormService bookGormService

    def init = { servletContext ->
        [
            new Book(isbn: '1491950358', name: 'Building Microservices'),
            new Book(isbn: '1680502395', name: 'Release It!'),
            new Book(isbn: '0321601912', name: 'Continuous Delivery')
        ].each { book ->
            bookGormService.saveBook(book)
        }
    }

    def destroy = {
    }
}
Add the Micronaut Kafka dependency:
implementation "io.micronaut:micronaut-inject-groovy"
implementation("io.micronaut.kafka:micronaut-kafka:3.1.0")
The app connects to a Kafka broker running on localhost:9092. Add the following configuration:
kafka:
  bootstrap:
    servers: localhost:9092
Create an interface to send messages to Kafka. The Micronaut framework will implement the interface at compilation time:
package example.grails

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic

@KafkaClient
interface AnalyticsClient {
    @Topic('analytics') // (1)
    Map updateAnalytics(Map book) // (2)
}
1 | Set the topic name |
2 | Send the book information. The Micronaut Framework will automatically convert it to JSON before sending it. |
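To make the JSON conversion concrete, here is a minimal sketch of what such a payload could look like on the wire, assuming the message is a small map keyed by isbn. It uses Groovy's own JsonOutput and JsonSlurper purely for illustration. Micronaut Kafka uses its own serializers, so treat the exact bytes as an approximation.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Assumed example payload: a map carrying only the ISBN of the viewed book.
Map payload = [isbn: '1491950358']

// Approximate wire format (illustration only, not Micronaut's serializer).
String json = JsonOutput.toJson(payload)
println json

// After deserialization, the consumer side sees a map with the same key.
Map received = new JsonSlurper().parseText(json) as Map
println received.isbn
```

Because the payload is a plain map, producer and consumer only need to agree on the key names, not on a shared class.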
Create a controller which fetches books and notifies Kafka with the AnalyticsClient:
package example.grails

import groovy.transform.CompileStatic
import org.springframework.beans.factory.annotation.Autowired

@CompileStatic
class BooksController {

    BookGormService bookGormService

    @Autowired
    AnalyticsClient analyticsClient

    static allowedMethods = [
        index: 'GET',
        show: 'GET'
    ]

    def index() {
        [books: bookGormService.findAll()]
    }

    def show(String isbn) {
        Book book = bookGormService.findByIsbn(isbn)
        if (!book) {
            response.status = 404
            return
        }
        analyticsClient.updateAnalytics([isbn: book.isbn])
        render(template: 'book', model: [book: book])
    }
}
Add the following mapping to UrlMappings:
"/books/$isbn" {
    controller = 'books'
    action = 'show'
}
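Create two JSON Views for the controller’s actions. Following Grails view conventions, the first is the _book.gson template that the show action renders, and the second is the index.gson view for the index action:
import example.grails.Book

model {
    Book book
}

json {
    isbn book.isbn
    name book.name
}

import example.grails.Book

model {
    List<Book> books = []
}

json tmpl.book(books)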
6 Building Analytics app
Create a new Grails application for the analytics app, for example by using Grails Application Forge or the command line:
$ grails create-app example.grails.complete-analytics --profile=rest-api
For the multi-app part of this guide, both apps must run at the same time. To avoid a port conflict, update the analytics app’s application.yml to include the following:
server:
  port: 8081
Create a domain class BookAnalytics which will keep track of how many times a book has been viewed:
package example.grails

class BookAnalytics {
    String isbn
    Long count

    static constraints = {
        isbn unique: true, blank: false, nullable: false
        count blank: false, nullable: false
    }
}
Create a GORM Data Service for this domain class:
package example.grails

import grails.gorm.services.Query
import grails.gorm.services.Service
import javax.inject.Singleton

@Singleton
@Service(BookAnalytics)
interface BookAnalyticsGormService {
    List<BookAnalytics> findAll()
    BookAnalytics findByIsbn(String isbn)
    BookAnalytics saveBookAnalytics(BookAnalytics bookAnalytics)

    // (1)
    @Query("update ${BookAnalytics bookAnalytics} set ${bookAnalytics.count} = $newCount where bookAnalytics.isbn = $isbn")
    void updateCount(String isbn, Long newCount)
}
1 | Implement update operations using JPA-QL |
Create a controller which uses the previous service:
package example.grails

import groovy.transform.CompileStatic

@CompileStatic
class AnalyticsController {

    BookAnalyticsGormService bookAnalyticsGormService

    def index() {
        [analytics: bookAnalyticsGormService.findAll()]
    }
}
Create two JSON Views. The first is the template for a single BookAnalytics entry, and the second renders the list returned by the index action:
import example.grails.BookAnalytics

model {
    BookAnalytics bookAnalytics
}

json {
    bookIsbn bookAnalytics.isbn
    count bookAnalytics.count
}

import example.grails.BookAnalytics

model {
    List<BookAnalytics> analytics = []
}

json tmpl.bookAnalytics(analytics)
Create a new class to act as a consumer of the messages sent to Kafka by the books microservice. The Micronaut framework will implement logic to invoke the consumer at compile time. Create the AnalyticsListener class:
package example.grails

import groovy.transform.CompileStatic
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.context.env.Environment
import org.springframework.beans.factory.annotation.Autowired

@CompileStatic
@Requires(notEnv = Environment.TEST) // (1)
@KafkaListener // (2)
class AnalyticsListener {

    private final BookAnalyticsGormService bookAnalyticsGormService // (3)

    AnalyticsListener(BookAnalyticsGormService bookAnalyticsGormService) { // (3)
        this.bookAnalyticsGormService = bookAnalyticsGormService
    }

    @Topic('analytics') // (4)
    void updateAnalytics(Map payload) {
        if (payload.containsKey('isbn')) {
            BookAnalytics bookAnalytics = bookAnalyticsGormService.findByIsbn(payload.isbn as String)
            if (bookAnalytics) {
                bookAnalyticsGormService.updateCount(payload.isbn as String, bookAnalytics.count + 1)
            } else {
                bookAnalytics = new BookAnalytics(isbn: payload.isbn as String, count: 1L)
                bookAnalyticsGormService.saveBookAnalytics(bookAnalytics)
            }
        }
    }
}
1 | Do not load this bean for the test environment - this lets us run the tests without having Kafka running |
2 | Annotate the class with @KafkaListener to indicate that this bean will consume messages from Kafka |
3 | Constructor injection for BookAnalyticsGormService |
4 | Annotate the method with @Topic and specify the topic name to use |
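The branch logic of the listener can be exercised without Kafka or a database. The sketch below mirrors it under one loud assumption: a plain map (store, a hypothetical name) replaces BookAnalyticsGormService, and the handle closure is likewise illustrative rather than part of the app.

```groovy
// Hypothetical in-memory stand-in for BookAnalyticsGormService: ISBN -> view count.
Map<String, Long> store = [:]

// Mirrors the decisions made in AnalyticsListener.updateAnalytics.
def handle = { Map payload ->
    if (!payload.containsKey('isbn')) {
        return // messages without an ISBN are ignored, as in the listener
    }
    String isbn = payload.isbn as String
    Long current = store[isbn]
    if (current != null) {
        store[isbn] = current + 1L // existing record: increment the count
    } else {
        store[isbn] = 1L           // first view: create the record with count 1
    }
}

handle([isbn: '1491950358'])
handle([isbn: '1491950358'])
handle([title: 'no isbn here'])

println store
```

The first message for an ISBN creates the record and later messages increment it. Anything without an isbn key is dropped silently.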
7 Running the apps
Start Kafka:
$ cd docker
docker$ docker-compose up
Start the books microservice:
$ cd complete
complete$ ./gradlew bootRun
Start the analytics microservice:
$ cd complete-analytics
complete-analytics$ ./gradlew bootRun
Execute a curl request to get one book:
$ curl http://localhost:8080/books/1491950358
{"isbn":"1491950358","name":"Building Microservices"}
Now, use curl to see the analytics:
$ curl http://localhost:8081/analytics
[{"bookIsbn":"1491950358","count":1}]
Change the ISBN in the curl command to the books microservice to retrieve other books, and repeat the requests. Then re-run the curl command to the analytics microservice to see the counts increase.
8 Next Steps
To further your understanding, read through the Micronaut Kafka plugin documentation.