As your architecture becomes more complex, you often need to perform time-consuming tasks when certain business events occur. Examples include sending emails, processing images, and interacting with external APIs. If these events are triggered by actions within your web application, you don’t want to perform these tasks synchronously within the HTTP request/response cycle: your end users would hate you and your throughput would suffer. Instead, you want to perform these tasks asynchronously, outside of the HTTP request/response cycle.
Event-driven architectures help to solve these (and other) types of problems by allowing an arbitrary number of tasks to be performed as a side effect of an event being published. They can be built on top of message-oriented middleware, which allows the publishing and consumption of events to be truly distributed, and each component to be scaled independently.
After evaluating many message brokers on which to build our event-driven architecture, we ultimately chose RabbitMQ, as it offered the best blend of features, performance, and ease of use. In addition, RabbitMQ implements the AMQP protocol, which is emerging as the de facto standard protocol for message-oriented middleware and (in theory) allows us to move to a different broker in the future.
Our Ruby on Rails applications needed to publish messages to the broker, and there are two main ways to do that in Ruby: synchronously via the Bunny gem, or asynchronously via the AMQP gem. While Bunny has an easy-to-use interface, its interaction with the broker is synchronous. In addition, it’s currently in flux and pending an overhaul, which makes depending on it somewhat risky at the moment. The AMQP gem, on the other hand, interacts with the broker asynchronously and is actively maintained. However, it is built on top of EventMachine to achieve its asynchronous behavior, so using it, and more importantly testing with it, is inherently more difficult.
We ultimately decided to publish messages asynchronously using the AMQP gem. While there’s a lot of documentation out there about the AMQP gem, I couldn’t find a simple “how to publish messages with the AMQP gem within your Rails model classes” type of tutorial anywhere, so here’s how we ended up doing it.
Messages
With a polyglot architecture, we needed a message format that was easily produced/consumed by various programming languages, so JSON was the obvious choice. All messages have a metadata property which includes basic information such as the application that produced the event, the host the application was running on, the time the event was produced, and the routing key.
{
  "metadata": {
    "host": "web3.mycompany.com",
    "app": "web",
    "key": "some.event",
    "created": "2012-11-28T07:11:56Z"
  },
  ...message specific properties...
}
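Because the envelope is plain JSON, a consumer in any language only needs a JSON parser to read it. As a minimal Ruby sketch, here is a hypothetical consumer-side helper (parse_envelope and REQUIRED_METADATA_KEYS are illustrative names, not part of the original code) that parses a body and checks that the four metadata fields described above are present, using only the standard library:

```ruby
require 'json'
require 'time'

# Hypothetical consumer-side helper: parses a message body and checks that
# the metadata envelope (host, app, key, created) is present and complete.
REQUIRED_METADATA_KEYS = %w[host app key created].freeze

def parse_envelope(body)
  message = JSON.parse(body)
  raise ArgumentError, "message is not a JSON object" unless message.is_a?(Hash)

  metadata = message["metadata"]
  raise ArgumentError, "missing metadata" unless metadata.is_a?(Hash)

  missing = REQUIRED_METADATA_KEYS - metadata.keys
  raise ArgumentError, "missing metadata keys: #{missing.join(', ')}" unless missing.empty?

  # Time.iso8601 raises ArgumentError if the timestamp is not valid ISO 8601
  metadata["created"] = Time.iso8601(metadata["created"])
  message
end
```

Message-specific properties are left untouched at the top level of the returned hash, next to "metadata".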
A base Message::Base class takes care of serializing this information via ActiveModel::Serializers::JSON.
require 'socket'
require 'time'

module Message
  class Base
    include ActiveModel::Serializers::JSON

    attr_reader :metadata

    def initialize(routing_key = nil)
      raise ArgumentError, "routing_key cannot be nil" if routing_key.nil?
      @metadata = {:host => Socket.gethostbyname(Socket.gethostname).first, # fully qualified host name
                   :app => "web", # configure this...
                   :key => routing_key,
                   :created => Time.now.utc.iso8601} # e.g. "2012-11-28T07:11:56Z"
    end

    # The routing key used when publishing to the topic exchange
    def routing_key
      @metadata[:key]
    end

    # Serializes as {"metadata": {...}}; subclasses add their own properties
    def as_json(options = {})
      {:metadata => @metadata.dup}
    end
  end
end
Message::Base subclasses are created for each message type, with properties specific to that message type. Here is an example for a hypothetical “new user account created” message:
module Message
  module User
    # Message indicating that a new user account has been created
    class AccountCreated < Base
      ROUTING_KEY = "user.account.created"

      attr_reader :account_id, :email_address

      def initialize(account_id, email_address)
        raise ArgumentError, "account_id cannot be nil" if account_id.nil?
        raise ArgumentError, "email_address cannot be nil" if email_address.nil?
        super(ROUTING_KEY)
        # attr_reader defines no setters, so assign the instance variables directly
        @account_id = account_id
        @email_address = email_address
      end

      def as_json(options = {})
        hash = super
        hash[:account_id] = account_id
        hash[:email_address] = email_address
        hash
      end
    end
  end
end
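If you want to exercise the same envelope outside Rails (for example, in a plain Ruby script or test), it can be produced with only the standard library. The following is a minimal sketch under that assumption. The PlainMessage module and its classes are hypothetical stand-ins for Message::Base and Message::User::AccountCreated, and they use Socket.gethostname, which may return a short host name rather than a fully qualified one.

```ruby
require 'json'
require 'socket'
require 'time'

# Hypothetical stdlib-only counterparts to Message::Base and
# Message::User::AccountCreated (no ActiveModel dependency).
module PlainMessage
  class Base
    attr_reader :metadata

    def initialize(routing_key, app: "web")
      raise ArgumentError, "routing_key cannot be nil" if routing_key.nil?
      @metadata = {
        host: Socket.gethostname,
        app: app,
        key: routing_key,
        created: Time.now.utc.iso8601 # UTC timestamp with a trailing "Z"
      }
    end

    def routing_key
      @metadata[:key]
    end

    # Subclasses call super and merge in their own properties
    def as_json
      { metadata: @metadata.dup }
    end

    def to_json(*args)
      as_json.to_json(*args)
    end
  end

  class AccountCreated < Base
    ROUTING_KEY = "user.account.created"

    attr_reader :account_id, :email_address

    def initialize(account_id, email_address)
      raise ArgumentError, "account_id cannot be nil" if account_id.nil?
      raise ArgumentError, "email_address cannot be nil" if email_address.nil?
      super(ROUTING_KEY)
      @account_id = account_id
      @email_address = email_address
    end

    def as_json
      super.merge(account_id: account_id, email_address: email_address)
    end
  end
end
```

For example, PlainMessage::AccountCreated.new(42, "jane@example.com").to_json produces the metadata envelope followed by "account_id" and "email_address".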
Connecting To The Broker
Broker configuration details for each Rails environment are stored in a config/amqp.yml file in the Rails app.
defaults: &defaults
  logging: false
  connection_timeout: 3
  host: localhost
  port: 5672
  vhost: "/mycompany"
  exchange: "mycompany.topic"
  user: username
  pass: password

test:
  <<: *defaults

development:
  <<: *defaults

production:
  <<: *defaults
  host: queue.mycompany.com
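This file relies on two layers: ERB, which is evaluated first so that values can come from the environment, and YAML anchors with merge keys (&defaults and <<: *defaults), which let each environment inherit the defaults. Newer Rubies (Psych 4, shipped with Ruby 3.1+) refuse YAML aliases by default, so aliases must be enabled explicitly. Here is a minimal sketch of that loading step. load_amqp_config is a hypothetical helper, and the example password expression in the usage below is illustrative only.

```ruby
require 'erb'
require 'yaml'

# Hypothetical helper: evaluates ERB, parses the YAML with aliases enabled
# (needed for the &defaults / <<: *defaults merge keys), and returns the
# settings hash for one environment.
def load_amqp_config(yaml_text, env)
  parsed = YAML.safe_load(ERB.new(yaml_text).result, aliases: true)
  parsed.fetch(env) { raise KeyError, "Env #{env} not found in config" }
end
```

Given the file above, load_amqp_config(text, "production") would return the defaults with "host" overridden to queue.mycompany.com.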
The Configurable module can be mixed in (via extend) to read the configuration from config/amqp.yml, evaluating any embedded ERB first. Callers then pick out the section for a given Rails environment.
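require 'erb'
require 'yaml'

module Amqp
  module Configurable
    private

    # Returns +config+ if one is given; otherwise reads config/amqp.yml
    # (evaluating any ERB first) from the Rails root. Returns nil when
    # Rails isn't loaded or the file doesn't exist.
    def load_config(config = nil)
      return config unless config.nil?
      return nil unless defined?(::Rails)
      config_file = ::Rails.root.join('config/amqp.yml')
      config_file.file? ? YAML.load(ERB.new(config_file.read).result) : nil
    end
  end
end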
Connection classes wrap a client library connection (an AMQP::Channel and AMQP::Exchange in the asynchronous case) and use the Configurable module above to look up the name of the exchange to produce messages to. Our use is hard-coded to a topic exchange, but you could easily make this configurable as well.
The Amqp::AsynchronousConnection class uses the AMQP gem under the hood to communicate with the broker asynchronously. It is passed an active AMQP::Session object, which is established externally upon server startup (more on this below).
require 'amqp'

module Amqp
  class AsynchronousConnection
    extend Configurable

    @@setup = false

    def self.setup(amqp_session, config = nil, env = ::Rails.env)
      return if self.setup?
      raise StandardError, "Must pass an AMQP::Session to setup" unless amqp_session.is_a?(AMQP::Session)
      @@amqp_session = amqp_session
      @@config = config || load_config
      raise StandardError, "AMQP config not found" if @@config.nil?
      @@env_config = @@config[env]
      raise StandardError, "Env #{env} not found in config" if @@env_config.nil?
      raise StandardError, "'exchange' key not found in config" unless @@env_config.has_key?("exchange")
      @@channel = AMQP::Channel.new(@@amqp_session)
      @@exchange = @@channel.topic(@@env_config["exchange"], :durable => true)
      @@setup = true
    end

    # Whether the underlying connection has been set up
    def self.setup?
      @@setup
    end

    # Forgets the session, channel and exchange (e.g. after the connection is lost)
    def self.teardown
      return unless self.setup?
      [:@@exchange, :@@channel, :@@env_config, :@@config, :@@amqp_session].each do |var|
        remove_class_variable(var)
      end
      @@setup = false
    end

    # Produces a message to the underlying exchange. EventMachine is not
    # thread-safe, so the publish is handed to the reactor thread via
    # next_tick; errors are logged rather than raised.
    def self.produce(message)
      unless self.setup?
        ::Rails.logger.error "ERROR: Could not produce message, not connected to AMQP broker!"
        return
      end
      payload = message.to_json
      routing_key = message.routing_key
      EventMachine.next_tick do
        begin
          @@exchange.publish(payload, :routing_key => routing_key, :mandatory => false,
                             :immediate => false, :persistent => true, :content_type => "application/json")
        rescue => err
          ::Rails.logger.error "Unexpected error producing AMQP message: (#{payload})"
          ::Rails.logger.error err.message
          ::Rails.logger.error err.backtrace.join("\n")
        end
      end
    end
  end
end
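The exchange above is a topic exchange. Each consumer binds its queue with a pattern, and the broker delivers a copy of the message to every queue whose pattern matches the routing key. In a pattern, * matches exactly one dot-separated word and # matches zero or more words. RabbitMQ performs this matching itself. The sketch below is a simplified, hypothetical re-implementation of the rule (topic_match? is not part of any gem) that can help when reasoning about which consumers will receive a key such as user.account.created:

```ruby
# Returns true if +routing_key+ matches the topic +pattern+ under AMQP
# topic-exchange rules: "*" matches exactly one word, "#" zero or more.
def topic_match?(pattern, routing_key)
  match_words?(pattern.split("."), routing_key.split("."))
end

def match_words?(pattern, words)
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when "#"
    # "#" either matches zero words, or consumes one word and tries again
    match_words?(rest, words) || (!words.empty? && match_words?(pattern, words.drop(1)))
  when "*"
    !words.empty? && match_words?(rest, words.drop(1))
  else
    !words.empty? && words.first == head && match_words?(rest, words.drop(1))
  end
end
```

For instance, a queue bound with user.# would receive user.account.created, while one bound with user.* would not, because * stands for exactly one word.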
The Amqp::Rails module is a helper that starts the EventMachine event loop and establishes the AMQP::Session. It also contains error handling code that detects when the connection to the broker cannot be established or is lost, and attempts to reconnect every 5 seconds.
module Amqp
  # Helper methods for connecting to AMQP brokers in Rails apps.
  module Rails
    extend Configurable

    # Called by the AMQP gem when the TCP connection cannot be established
    @connection_failure_handler = Proc.new do |settings|
      ::Rails.logger.error "Could not connect to AMQP broker! (settings: #{settings.inspect})"
      ::Rails.logger.error "Waiting 5 seconds to attempt to connect..."
      EventMachine.add_timer(5) do
        ::Rails.logger.error "Trying to connect to AMQP broker..."
        connect(settings)
      end
    end

    # Starts the EventMachine reactor (if necessary) and connects to the AMQP broker.
    # This module handles connection failures and automatically attempts to reconnect
    # when the connection is lost after first being established.
    #
    # Assumes the file config/amqp.yml exists under Rails.root.
    #
    # Note that this module does NOT handle authentication failures with the broker.
    # Authentication failures will cause an uncaught exception and the app will not start up.
    def self.async_start(on_open_callback = nil)
      AMQP::Utilities::EventLoopHelper.run do
        settings = (load_config || {}).fetch(::Rails.env, {}).symbolize_keys
        settings[:on_tcp_connection_failure] = @connection_failure_handler
        settings[:on_open_callback] = on_open_callback if on_open_callback.is_a?(Proc)
        EventMachine.run do
          connect(settings)
        end
      end
    end

    # Connects on the next reactor tick. On success, sets up the
    # AsynchronousConnection; on connection loss, schedules a reconnect.
    def self.connect(settings)
      EventMachine.next_tick do
        AMQP.connect(settings) do |connection|
          # Reconnect with our own settings hash so the custom callbacks are kept
          connection.on_tcp_connection_loss do |_conn, _conn_settings|
            ::Rails.logger.error "Connection to AMQP broker lost!"
            Amqp::AsynchronousConnection.teardown
            ::Rails.logger.error "Waiting 5 seconds to attempt to connect..."
            EventMachine.add_timer(5) do
              ::Rails.logger.error "Trying to connect to AMQP broker..."
              connect(settings)
            end
          end
          ::Rails.logger.info "Connected to AMQP broker. (settings: #{settings.inspect})"
          Amqp::AsynchronousConnection.setup(connection)
          settings[:on_open_callback].call if settings.has_key?(:on_open_callback)
        end
      end
    end
    # `private` has no effect on `def self.` methods, so hide connect explicitly
    private_class_method :connect
  end
end
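The reconnect policy itself (log, wait a fixed five seconds, try again) does not depend on EventMachine. Stripped of the reactor, it looks roughly like the following blocking sketch. connect_with_retry, max_attempts, and the injectable sleeper are hypothetical. The Amqp::Rails module above retries indefinitely with EventMachine.add_timer, so it never blocks a thread with sleep.

```ruby
require 'logger'

# Hypothetical blocking version of the fixed-delay reconnect policy.
# Calls the block (passing the attempt number) until it succeeds or
# +max_attempts+ is reached, waiting +delay+ seconds between attempts.
# +sleeper+ is injectable so tests don't actually wait.
def connect_with_retry(delay: 5, max_attempts: Float::INFINITY,
                       logger: Logger.new($stderr), sleeper: method(:sleep))
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError => err
    logger.error("Could not connect to AMQP broker: #{err.message}")
    raise if attempts >= max_attempts
    logger.error("Waiting #{delay} seconds to attempt to connect...")
    sleeper.call(delay)
    retry
  end
end
```

The block's return value (for example, a connection object) is returned once an attempt succeeds. After max_attempts failures, the last error is re-raised.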
Next, we add an initializer in config/initializers/amqp.rb that calls the async_start method in the helper.
Amqp::Rails.async_start
MessageProducer
The MessageProducer module lets us mix a produce method into our model classes so that they can produce messages to the broker through a connection class. The produce method is an alias for produce_asynchronously, but we can easily change it to be an alias for produce_synchronously should we decide to switch to the Bunny gem (more on this below).
module Amqp
  module MessageProducer
    # Produces a message to the underlying exchange using an asynchronous connection
    def produce_asynchronously(message)
      AsynchronousConnection.produce(message)
    end

    alias :produce :produce_asynchronously
  end
end
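Because including classes look up produce in the module at call time, re-pointing the alias inside the module changes the transport for every model at once. The sketch below shows this without a broker, using hypothetical in-memory stand-ins (RecordingConnection, ASYNC, SYNC) for the two connection classes and a top-level MessageProducer that mirrors the Amqp version:

```ruby
# Hypothetical in-memory stand-in for a connection class, so the mixin
# can be exercised without a broker.
class RecordingConnection
  attr_reader :messages

  def initialize
    @messages = []
  end

  def produce(message)
    @messages << message
  end
end

ASYNC = RecordingConnection.new
SYNC = RecordingConnection.new

module MessageProducer
  def produce_asynchronously(message)
    ASYNC.produce(message)
  end

  def produce_synchronously(message)
    SYNC.produce(message)
  end

  # The one line that selects the transport for every including class
  alias_method :produce, :produce_asynchronously
end

class Account
  include MessageProducer

  def create
    produce("user.account.created")
  end
end
```

Calling Account.new.create records the message on ASYNC. After MessageProducer.module_eval { alias_method :produce, :produce_synchronously }, the same call goes to SYNC without touching Account.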
Producing messages from Model classes
Finally, we include the Amqp::MessageProducer module in our model classes to produce messages to the broker when certain business events happen.
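class Account < ActiveRecord::Base
  include Amqp::MessageProducer

  # after_commit (rather than after_create) publishes only once the INSERT has
  # been committed, so consumers can find the row and no message is sent for a
  # rolled-back transaction.
  after_commit :publish_account_created_message, :on => :create

  # Publishes a message to the exchange when a new user account is created
  def publish_account_created_message
    produce(Message::User::AccountCreated.new(self.account_id, self.email_address))
  end
end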
Easily Switch Between Synchronous and Asynchronous
We structured the code such that we can easily switch to using Bunny simply by changing one alias. The Amqp::SynchronousConnection class wraps a Bunny object to communicate with the server.
require 'bunny'

module Amqp
  class SynchronousConnection
    extend Configurable

    @@setup = false
    @@connected = false

    # Inside module Amqp a bare `Rails` resolves to Amqp::Rails,
    # so the top-level ::Rails is referenced explicitly.
    def self.setup(config = nil, env = ::Rails.env)
      return if self.setup?
      @@config = config || load_config
      raise StandardError, "AMQP config not found" if @@config.nil?
      raise StandardError, "Env #{env} not found in config" if @@config[env].nil?
      # symbolize the keys, which Bunny expects
      @@env_config = @@config[env].symbolize_keys
      raise StandardError, "'exchange' key not found in config" unless @@env_config.has_key?(:exchange)
      @@bunny = Bunny.new(@@env_config)
      @@setup = true
      handle_passenger_forking
    end

    # Hypothetical implementation (the body of this helper is not given):
    # after Phusion Passenger forks a worker, drop the socket inherited from
    # the parent and open a fresh connection in the child.
    def self.handle_passenger_forking
      return unless defined?(PhusionPassenger)
      PhusionPassenger.on_event(:starting_worker_process) do |forked|
        if forked && connected?
          @@bunny = Bunny.new(@@env_config)
          connect
        end
      end
    end

    # Whether the underlying connection has been set up
    def self.setup?
      @@setup
    end

    # Establish a connection to the underlying exchange
    def self.connect
      raise StandardError, "AMQP not setup. Call setup before calling connect" unless self.setup?
      @@bunny.start
      @@exchange = @@bunny.exchange(@@env_config[:exchange], :type => :topic, :durable => true)
      @@connected = true
    end

    # Disconnect from the underlying exchange
    def self.disconnect
      @@bunny.stop
    rescue StandardError
      # if this is being called because the underlying connection went bad,
      # calling stop will raise an error. that's ok....
    ensure
      @@connected = false
    end

    # Re-connects to the underlying exchange with a fresh Bunny client
    def self.reconnect
      disconnect
      @@bunny = Bunny.new(@@env_config)
      connect
    end

    # Whether the underlying connection has been established
    def self.connected?
      @@connected
    end

    # Produces a message to the underlying exchange. If the server has gone
    # away, reconnects and retries exactly once; other errors are logged.
    def self.produce(message)
      unless self.setup? && self.connected?
        ::Rails.logger.error "AMQP not setup or connected. Call setup and connect before calling produce"
        return
      end
      begin
        publish(message)
      rescue Bunny::ServerDownError
        # the connection went south, try to reconnect and try one more time
        begin
          reconnect
          publish(message)
        rescue => err
          log_error(message, err)
        end
      rescue => err
        log_error(message, err)
      end
    end

    def self.publish(message)
      @@exchange.publish(message.to_json, :key => message.routing_key, :mandatory => false,
                         :immediate => false, :persistent => true, :content_type => "application/json")
    end

    def self.log_error(message, err)
      ::Rails.logger.error "Unexpected error producing AMQP message: (#{message.to_json})"
      ::Rails.logger.error err.message
      ::Rails.logger.error err.backtrace.join("\n")
    end
    private_class_method :publish, :log_error
  end
end
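The error handling in produce boils down to this: publish, and if the server is down, reconnect and publish exactly once more, logging anything else. Isolated from Bunny, with a hypothetical ServerDownError standing in for Bunny::ServerDownError and a fake exchange that fails a set number of times, the pattern can be sketched and tested as follows. The names publish_with_one_retry and FlakyExchange are illustrative only.

```ruby
# Hypothetical stand-in for Bunny::ServerDownError
class ServerDownError < StandardError; end

# Publishes via +exchange+. On ServerDownError, calls +reconnect+ (which must
# return a usable exchange) and retries exactly once. Other errors are logged
# via +logger+ (anything responding to #error) and swallowed.
# Returns true on success, false otherwise.
def publish_with_one_retry(exchange, payload, logger:, reconnect:)
  exchange.publish(payload)
  true
rescue ServerDownError
  begin
    reconnect.call.publish(payload)
    true
  rescue StandardError => err
    logger.error("Unexpected error producing AMQP message: #{err.message}")
    false
  end
rescue StandardError => err
  logger.error("Unexpected error producing AMQP message: #{err.message}")
  false
end

# Fake exchange that raises ServerDownError for its first +failures+ publishes
class FlakyExchange
  attr_reader :published

  def initialize(failures)
    @failures = failures
    @published = []
  end

  def publish(payload)
    if @failures > 0
      @failures -= 1
      raise ServerDownError, "connection reset"
    end
    @published << payload
  end
end
```

The nested begin/rescue matters: an exception raised inside one rescue clause is not caught by a sibling rescue clause, so a failure of the retry must be handled explicitly.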
And we add support for this to the Amqp::MessageProducer module by adding a produce_synchronously method.
module Amqp
  module MessageProducer
    # Produces a message to the underlying exchange using a synchronous connection
    def produce_synchronously(message)
      SynchronousConnection.produce(message)
    end

    # Produces a message to the underlying exchange using an asynchronous connection
    def produce_asynchronously(message)
      AsynchronousConnection.produce(message)
    end

    alias :produce :produce_asynchronously
  end
end
Pros and Cons
On the plus side, this approach completely abstracts away which gem we’re using to produce messages to the broker. If we decide to use another gem, we’d only have to change the internals of the connection classes. Also, clients that need a particular behavior can call produce_synchronously or produce_asynchronously directly instead of produce.
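On the down side, the use of class variables feels sort of dirty: @@ variables are effectively global state shared by a class and all of its subclasses, which makes the connection classes harder to test in isolation. There’s some room for improvement there.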
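One common alternative is to keep the state in instance variables of the class object itself, exposed through class << self accessors. Unlike @@ variables, these belong to exactly one class object and are not shared with subclasses. Here is a minimal sketch of that refactoring, with hypothetical class names and a symbol standing in for a real exchange, next to a @@ version for contrast:

```ruby
# Hypothetical refactoring: connection state kept in instance variables of the
# class object (exposed via class << self) instead of @@class variables.
class ConnectionState
  class << self
    attr_reader :exchange

    def setup(exchange)
      return if setup?
      @exchange = exchange
      @setup = true
    end

    def setup?
      @setup == true
    end

    def teardown
      @exchange = nil
      @setup = false
    end
  end
end

# Each subclass gets its own, independent state
class OtherConnectionState < ConnectionState; end

# For contrast: a @@class variable is shared between a class and its subclasses
class ClassVarState
  @@setup = false

  def self.mark_setup!
    @@setup = true
  end

  def self.setup?
    @@setup
  end
end

class ClassVarChild < ClassVarState; end
```

With this layout, teardown can simply reset the ivars instead of calling remove_class_variable, and a test can set up one connection class without affecting another.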
Overall, this approach has worked out great for us, but as always, suggestions welcome!