I’ve been struggling quite a bit to get CometD (see this for more information on the Comet Pattern) working with Grails and the corresponding Grails CometD plugin, the documentation being sparse and the new version of the CometD API being somewhat confusing with lots of new interfaces and abstractions.
Anyway, I’ve got it working so I thought, I might share the experience (see code sections at the end of the article for copying source code):
Assuming that you would like a Grails service to communicate via CometD, this is how a basic implementation of a simple service that finds Account
entities for search parameters would look like:
There are a few things to note here. The service implements the interface InitializingBean provided by Spring. This interface supplies an afterPropertiesSet()
method that allows you to initialize additional properties after Spring has wrought its magic. This way you can initialize and handshake with the Bayeux server using the bayeux
bean that’s injected by the CometD plugin.
The runAsync
closure in findAccounts(def query, def params = [:])
is provided by the Grails Executor plugin, which allows you to run background threads in your application without losing the Hibernate session. This is exactly why runAsync
is needed here. As findAccounts(def query, def params = [:])
will be called asynchronously we have to make sure everything that occurs inside this method is thread-safe.
Via the subscribe()
method you can have a so-called SubscriberListener
subscribe to a messaging channel:
Such a SubscriberListener
has to extend the abstract class SessionChannel.SubscriberListener
as provided by org.cometd.bayeux.client.SessionChannel
. This class demands that its sub-classes provide an onMessage(SessionChannel channel, Message message)
method, which will act as a callback method upon new messages on the channels the listener is subscribed to.
Apart from that, there’s some nice Groovy magic happening here that allows you to instantiate a MySubscriberListener
which calls a service method without actually hard-coding that service method in the MySubscriberListener
class. In this case, the listener will call the service’s findAccounts(def query, def params = [:])
method.
The client side of this little example would look something like this (assuming you’re using the jQuery version of the CometD client), with cometd-subscriptions.js
being loaded first and init.js
afterwards:
Once this is all set up, you can do something like this to nudge AccountService
from your JavaScript client:
$.cometd.publish('/requests/search', { 'payload': { q: 'some query', params: { offset: 10 } } });
I hope this primer helps you to overcome initial problems with CometD and Grails. If you’ve got any questions or additional contributions, please feel free to leave a comment.
Source code for copy & paste:
AccountService.groovy:
package myapp
import grails.converters.JSON
import org.springframework.beans.factory.InitializingBean
class AccountService implements InitializingBean {
def bayeux
def bayeuxSession
static transactional = true
void afterPropertiesSet() {
bayeuxSession = bayeux.newLocalSession()
bayeuxSession.handshake()
bayeuxSession.getChannel('/requests/search').subscribe(new MySubscriberListener
(this, 'findAccountsToFollow', ['q', 'params']))
}
def findAccounts(def query, def params = [:]) {
runAsync {
def accounts = Account.findAllByName(query, params)
// publish to search results channel
bayeuxSession.getChannel('/results/search').publish(['payload':['accounts':accounts]] as JSON)
}
}
}
MySubscriberListener.groovy:
package myapp
import org.cometd.bayeux.Message
import org.cometd.bayeux.client.SessionChannel
class MySubscriberListener extends SessionChannel.SubscriberListener {
def callbackService
def callbackMethod
def callbackParams
public MySubscriberListener(def service, def methodName, def params = []) {
callbackService = service
callbackMethod = methodName
callbackParams = params
}
public void onMessage(SessionChannel channel, Message message) {
// callback
def callbackParamValues = []
callbackParams.each { param ->
callbackParamValues.add(message.data.payload."${param}")
}
callbackService."${callbackMethod}"(*callbackParamValues)
}
}
cometd-subscriptions.js:
var subscriptions = {};
function refreshCometSubscriptions(channels, callbackFunctions) {
for (var i in channels) {
if (typeof channels[i] == 'string') {
unsubscribeFromCometChannel(channels[i]);
subscribeToCometChannel(channels[i], callbackFunctions[channels[i]]);
}
}
}
function unsubscribeFromCometChannel(channel) {
if (subscriptions[channel]) {
$.cometd.unsubscribe(subscriptions[channel]);
}
subscriptions[channel] = null;
}
function subscribeToCometChannel(channel, callbackFunction) {
subscriptions[channel] = $.cometd.subscribe(channel, callbackFunction);
}
init.js:
// initialize cometd
var channels = ['/test', '/results', '/results/search'];
var testCallback = function() { };
var resultsCallback = function() { };
var searchResultsCallback = function(message) {
renderFollowerSearchResults(JSON.parse(message.data).payload.accounts, '#resultsContainer', 'resultList', '#box2');
};
var callbackFunctions = { '/test' : testCallback, '/results' : resultsCallback, '/results/search' : searchResultsCallback };
$.cometd.init('../../cometd');
$.cometd.addListener('/meta/connect', function(message) {
if ($.cometd.isDisconnected()){
return;
}
if (message.successful) {
$.cometd.publish('/test', { 'data': { 'message':'Connection with CometD server has been established.' } });
}
});
refreshCometSubscriptions(channels, callbackFunctions);