TAGS :Viewed: 14 - Published at: a few seconds ago

[ Redis plugin blocking in Publish/Subscribe ]

I'm trying to connect to Redis for Publish-Subscribe using the Typesafe Redis Play plugin.

I have the following test scenario, consisting of an actor that generates messages every second:

  // Initialization happens in Application.scala,
  private lazy val fakeStreamActor = Akka.system.actorOf(Props[FakeStreamActor])

  val actorPut = Akka.system.scheduler.schedule(
    Duration(1000, MILLISECONDS),
    Duration(1000, MILLISECONDS),
    fakeStreamActor,
    Put("This is a sample message"))

The actor source:

class FakeStreamActor extends Actor {
  implicit val timeout = Timeout(1, SECONDS)

  val CHANNEL = "channel1"
  val plugin = Play.application.plugin(classOf[RedisPlugin]).get
  val listener = new MyListener()

  val pool = plugin.sedisPool

  pool.withJedisClient{ client =>
    client.subscribe(listener, CHANNEL)
  }

  def receive = {

    case Put(msg: String) => {
      //send data to Redis
      Logger.info("Push %s".format(msg))
      pool.withJedisClient { client =>
        client.publish(CHANNEL, msg)
      }

    }
  }
}

/** Messages */
case class Put(msg: String)

And a subscribe listener:

case class MyListener() extends JedisPubSub {
  def onMessage(channel: String, message: String): Unit = {
    Logger.info("onMessage[%s, %s]".format(channel, message))
  }

  def onSubscribe(channel: String, subscribedChannels: Int): Unit = {
    Logger.info("onSubscribe[%s, %d]".format(channel, subscribedChannels))
  }

  def onUnsubscribe(channel: String, subscribedChannels: Int): Unit = {
    Logger.info("onUnsubscribe[%s, %d]".format(channel, subscribedChannels))
  }

  def onPSubscribe(pattern: String, subscribedChannels: Int): Unit = {
    Logger.info("onPSubscribe[%s, %d]".format(pattern, subscribedChannels))
  }

  def onPUnsubscribe(pattern: String, subscribedChannels: Int): Unit = {
    Logger.info("onPUnsubscribe[%s, %d]".format(pattern, subscribedChannels))
  }

  def onPMessage(pattern: String, channel: String, message: String): Unit = {
    Logger.info("onPMessage[%s, %s, %s]".format(pattern, channel, message))
  }
}

Now, ideally I should be subscribing to the defined channel and seeing in the logs how the Listener is processing the received messages every second. But that doesn't happen, as the act of subscribing locks the thread.

My question is:

There is any way to take advantage of Play asynchronous nature to have non-blocking subscription?

Answer 1


Yup. This is how I do it in Global.scala:

Akka.future { 
  val j = new RedisPlugin(app).jedisPool.getResource
  j.subscribe(PubSub, "*")
}

I had trouble instantiating the plugin, but you'd essentially put the withJedisClient bit inside of the future block.

Thanks for showing me how to instantiate the plugin in scala!