Roy van Kaathoven
Full Stack Developer

05 May

Implementing GCM server with Scala and Akka

I started working on an Android app for the Roboflow project. I have a LG Watch R and i thought it would be handy to be able to send notifications from a flow within Roboflow. So users can easily configure notifications for themselves for the things that matter.

To get started i had to implement a client. Following the GCM Client Documentation was pretty straightforward and after some reading and by following the examples i quickly had a working client. Maybe i will talk more about this in a follow up blog.

The next thing was implementing a GCM server which can be done in 2 ways:

  • HTTP server, by simply sending requests to a Google endpoint which then sends the messages to the clients
  • XMPP server, which is a bidirectional XML protocol which allows the client to talk back to the server

I started out with the HTTP server to quickly try and see if my client was working properly. The request looks as follows:

Content-Type:application/json
Authorization:key=AIzaSyB-1uEai2WiUapxCs2Q0GZYzPu7Udno5aA

{
  "registration_ids" : ["APA91bHun4MxP5egoKMwt2KZFBaFUH-1RYqx..."],
  "data" : {
    "message": "This is a message",
    "title": "Test"
  },
}

The data object may contain arbitrary data, in my case my client will send a notification to my smartwatch with the given title and message.

Doing webrequests with Play Frameworks is simple with the included library which provides a simple API to make asynchronous HTTP calls.

import play.api.Logger
import play.api.libs.json.Json
import play.api.libs.ws.WS

class GcmRestServer(val key: String) {

  def send(ids: List[String], data: Map[String, String]) = {

    import play.api.Play.current
    import scala.concurrent.ExecutionContext.Implicits.global

    val body = Json.obj(
      "registration_ids" => ids,
      "data" => data
    )

    WS.url("https://android.googleapis.com/gcm/send")
      .withHeaders(
        "Authorization" => s"key=$key",
        "Content-type" => "application/json"
      )
      .post(body)
      .map { response =>
        Logger.debug("Result: " + response.body)
      }
    }

}

I ran the code and the notification showed up instantly on my smartwatch

val server = new GcmRestServer("key")
server.send(List("clientid"), Map(
  "message" => "Test Message",
  "title" => "Test Title"
))

The fun started when i started to try and implement the XMPP server by following [the documentation(https://developer.android.com/google/gcm/ccs.html).

The examples were out of date and used old libraries without mentioning any version numbers. After some tinkering and trying to get rid of errors i decided to throw everything away and write a server with the latest version of smack

I started from scratch by using the latest smack 4.1 and loosely followed the examples.

Add the dependencies to build.sbt

libraryDependencies ++= Seq(
  "org.igniterealtime.smack" % "smack-java7" % "4.1.0",
  "org.igniterealtime.smack" % "smack-tcp" % "4.1.0",
  "org.igniterealtime.smack" % "smack-im" % "4.1.0",
  "org.igniterealtime.smack" % "smack-extensions" % "4.1.0"
)

Then define some settings

object GcmSettings {
  val server = "gcm.googleapis.com"
  val port = 5235
  val elementName = "gcm"
  val namespace = "google:mobile:data"
}

server and port point to the Google servers. elementName and namespace refer to the XML tags within the XMPP messages. XMPP is a XML based protocol and these elements refer to the tags within the XML messages that are being send and received.

The connection needs some configuration.

val configBuilder = XMPPTCPConnectionConfiguration.builder
  .setSecurityMode(SecurityMode.disabled)
  .setSocketFactory(SSLSocketFactory.getDefault)
  .setServiceName(GcmSettings.server)
  .setHost(GcmSettings.server)
  .setDebuggerEnabled(true) // Only enable during development
  .setPort(GcmSettings.port)
  .build

Time to make a connection, make sure you have your project id and api key handy

connection = new XMPPTCPConnection(configBuilder)

// The users contact list is being called a roster in XMPP, GCM does not support a contact list and will throw errors when its not disabled
Roster.getInstanceFor(connection).setRosterLoadedAtLogin(false)

// Enable the automatic reconnection
ReconnectionManager.getInstanceFor(connection).enableAutomaticReconnection()

// Make connection
connection.connect()

val projectId = 798234798234987L
val apiKey = "1234567890qwerty"

// Username and password are based on the projectId and apiKey
connection.login(s"$projectId@gcm.googleapis.com", apiKey)

Now the server should connect and when the debugger is enabled you should see some messages arrive. Try sending some messages from your Android device.

To start handling messages there must be some extensions to be added, first create the GcmPacketExtension

import org.jivesoftware.smack.packet._
import org.jivesoftware.smack.util._

class GcmPacketExtension(val json: String) extends DefaultExtensionElement(GcmSettings.elementName, GcmSettings.namespace) with PlainStreamElement {

  import GcmSettings._

  override def toXML = {
    String.format("%s%s>", elementName, namespace, StringUtils.escapeForXML(json), elementName)
  }

  def toJson = Json.parse(json)

  def toPacket = {
    val message = new Message()
    message.addExtension(this)
    message
  }
}

Now add a the extension parser to the ProviderManager, this makes sure that Google Cloud Messages are serialized and deserialized properly

ProviderManager.addExtensionProvider(elementName, namespace, new ExtensionElementProvider[GcmPacketExtension]() {
  override def parse(xmlPullParser: XmlPullParser, i: Int) = {
    val json = xmlPullParser.nextText()
    new GcmPacketExtension(json)
  }
})

Add a listener to the connection which handles the messages, the getExtension method returns the GcmPacketExtension as configured by the Extension Provider

case class GcmMessage(
  message_id: String, 
  message_type: Option[String] = None,
  from: String,
  data: Option[JsObject] = None,
  to: Option[String] = None)

connection.addSyncStanzaListener(new StanzaListener() {
  override def processPacket(packet: Stanza): Unit = {
    val ext = packet.getExtension(elementName, namespace).asInstanceOf[GcmPacketExtension]
    handleMessage(ext.toJson.as[GcmMessage])
  }
}, MessageTypeFilter.NORMAL)

And make a receive and send handler to handle the messages

def handleMessage(message: GcmMessage) = {
  message.message_type match {
    case Some("ack") =>
      // Handle ack
      val from = message.from
      val messageId = message.message_id
    case Some("nack") =>
      val from = message.from
      val messageId = message.message_id
    case x =>
      sendMessage(GcmMessage(
        from = "server",
        to = Some(message.from),
        message_id = nextId,
        data = Some(Json.obj(
          "message" => "echo",
          "title" => "Server Response"
        ))))

      sendMessage(GcmMessage(
        from = "server",
        to = message.to,
        message_type = message.message_type,
        message_id = message.message_id))
  }
}

def sendMessage(message: GcmMessage) = {
  val request = new GcmPacketExtension(Json.toJson(message).toString())
  connection.sendStanza(request.toPacket)
}

Full code is as follows:

package roboflow.module.google.service

import javax.net.ssl.SSLSocketFactory

import akka.actor.{Actor, Cancellable}
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode
import org.jivesoftware.smack.filter.MessageTypeFilter
import org.jivesoftware.smack.packet._
import org.jivesoftware.smack.provider.{ExtensionElementProvider, ProviderManager}
import org.jivesoftware.smack.roster.Roster
import org.jivesoftware.smack.tcp.{XMPPTCPConnection, XMPPTCPConnectionConfiguration}
import org.jivesoftware.smack.util.StringUtils
import org.jivesoftware.smack.{ReconnectionManager, StanzaListener}
import org.xmlpull.v1.XmlPullParser
import play.api.libs.json.{JsObject, Json}

import scala.concurrent.duration._

object GcmSettings {
  val server = "gcm.googleapis.com"
  val port = 5235
  val elementName = "gcm"
  val namespace = "google:mobile:data"
}

object GcmMessage {
  implicit val format = Json.format[GcmMessage]
}

case class GcmMessage(message_id: String, message_type: Option[String] = None, from: String, data: Option[JsObject] = None, to: Option[String] = None)

class GcmPacketExtension(val json: String) extends DefaultExtensionElement(GcmSettings.elementName, GcmSettings.namespace) with PlainStreamElement {

  import GcmSettings._

  override def toXML = {
    String.format("%s%s>", elementName, namespace, StringUtils.escapeForXML(json), elementName)
  }

  def toJson = Json.parse(json)

  def toPacket = {
    val message = new Message()
    message.addExtension(this)
    message
  }
}

class GcmServer(val projectId: Long, val apiKey: String) extends Actor {

  import GcmSettings._

  var id = 0
  var cancellable: Cancellable = null
  var connection: XMPPTCPConnection = null

  def nextId = {
    id += 1
    s"m-$id"
  }

  /**
   * Connect with the XAMMP server
   */
  def connect() = {

    ProviderManager.addExtensionProvider(elementName, namespace, new ExtensionElementProvider[GcmPacketExtension]() {
      override def parse(xmlPullParser: XmlPullParser, i: Int) = {
        val json = xmlPullParser.nextText()
        new GcmPacketExtension(json)
      }
    })

    val configBuilder = XMPPTCPConnectionConfiguration.builder
      .setSecurityMode(SecurityMode.disabled)
      .setUsernameAndPassword(s"$projectId@gcm.googleapis.com", apiKey)
      .setSocketFactory(SSLSocketFactory.getDefault)
      .setServiceName(server)
      .setHost(server)
      .setDebuggerEnabled(true)
      .setPort(port)
      .build

    connection = new XMPPTCPConnection(configBuilder)

    connection.addSyncStanzaListener(new StanzaListener() {
      override def processPacket(packet: Stanza): Unit = {
        val ext = packet.getExtension(elementName, namespace).asInstanceOf[GcmPacketExtension]
        handleMessage(ext.toJson.as[GcmMessage])
      }
    }, MessageTypeFilter.NORMAL)

    Roster.getInstanceFor(connection).setRosterLoadedAtLogin(false)
    ReconnectionManager.getInstanceFor(connection).enableAutomaticReconnection()

    connection.connect()
    connection.login(s"$projectId@gcm.googleapis.com", apiKey)

    sendMessage(GcmMessage(
      from = "server",
      message_id = nextId.toString,
      to = Some("clientId"),
      data = Some(Json.obj(
        "message" => "Server connected",
        "title" => "Server connected"))
    ))
  }

  def handleMessage(message: GcmMessage) = {
    message.message_type match {
      case Some("ack") =>
        // Handle ack
        val from = message.from
        val messageId = message.message_id
      case Some("nack") =>
        val from = message.from
        val messageId = message.message_id
      case x =>
        sendMessage(GcmMessage(
          from = "server",
          to = Some(message.from),
          message_id = nextId,
          data = Some(Json.obj(
            "message" => "echo",
            "title" => "Server Response"
          ))))

        // Acknowledge received messages
        sendMessage(GcmMessage(
          from = "server",
          to = message.to,
          message_type = message.message_type,
          message_id = message.message_id))
    }
  }

  def sendMessage(message: GcmMessage) = {
    val request = new GcmPacketExtension(Json.toJson(message).toString())
    connection.sendStanza(request.toPacket)
  }

  override def preStart() = {
    import context.dispatcher
    cancellable = context.system.scheduler.schedule(1.minute, 1.minute, self, "ping")
    connect()
  }

  override def postStop() = {
    if (cancellable != null && !cancellable.isCancelled) {
      cancellable.cancel()
    }
    if (connection != null && connection.isConnected) {
      connection.disconnect()
    }
  }

  def receive = {
    case "ping" =>
      sendMessage(GcmMessage(
        from = "server",
        message_id = nextId.toString,
        data = Some(Json.obj(
          "message" => "ping"))
      ))
  }
}