Ruby Messaging Part 1: Know The Players, Know The Game

Hey, all. Here's a new series of blog posts I've been thinking about for a few weeks now. It's a series where I'm hoping to introduce you to a bit of work I've been doing lately on the Ruby messaging front with Qpid Proton. Specifically, I've been working on a low-level set of APIs dubbed the "engine APIs". They are a close analog to elements in the AMQP protocol specification, so a knowledge of one will help in understanding the other.

In this first post I'm going to introduce you to the main players in the game and how they each represent aspects of the AMQP messaging specification. Then I'll share a very small example application that accepts a connection from a remote container, but does nothing else for now. We can grow this application over the course of these posts until we have a fully functional application.

The Logical Components

Working from the outside in, we have the following pieces:

A container is either a producer, a consumer, both a producer and a consumer, or a queue. As their names imply, producers and consumers either produce (send) messages or else consume (receive) messages, or both, as is the case with most messaging containers.

A container that primarily produces/consumes messages is going to be an application that uses messaging as a means of coordinating its efforts with other applications. An example, which I'll develop over the course of these blog posts, is a traffic light system which is designed to work with the local municipality's emergency needs.

A container that acts as a queue would be a broker, communications bridge or similar application. It's not the endpoint for the bulk of the messages it receives, but is really a conduit through which messages flow on their way to their true endpoint; i.e., it stores and then forwards messages. Examples would include the Qpid C++ broker and the Qpid Dispatch router.

Containers create connections to, or accept connections from, other containers. The connection is how data actually flows between the two containers and is broken down into constituent pieces, like the layers of an onion. But the details of that flow are outside the scope of this series of posts.

Within connections are sessions. A session will have a pair of channels for sending and receiving data.

Connections and sessions are thought of as endpoints which hold incoming messages and which hold the last known state information for outgoing messages.

A channel is a unidirectional (one way) means of sending messages. The reason why a session contains a pair of channels is to allow for bidirectional (two way) communication.

The terminus maintains the state information for incoming and outgoing messages that flow over the link. A channel will have a source and a target terminus.

The link is where the actual protocol work is done, transmitting the message from source to target. It ties together the termini of a channel.

So, to summarize:
  • A container is an application that produces, consumes or queues messages.
  • A connection is how two containers communicate with each other.
  • A connection has one or more sessions, which pair up unidirectional channels to allow bidirectional communication.
  • A channel sends data from one endpoint to the other over a link unidirectionally.
  • A channel has a terminus which maintains the state information on each end of its link.
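
To make the hierarchy summarized above a little more concrete, here is a rough sketch in plain Ruby. Everything in it is an assumption made for illustration: the ToyAMQP module, its Struct names, the link names and the addresses are all hypothetical and are not part of Qpid Proton. It only models "who contains what", not the protocol itself.

```ruby
# Toy model of the logical components. These Structs are hypothetical
# illustrations only; they are NOT classes from the Qpid Proton library.
module ToyAMQP
  Terminus   = Struct.new(:address)
  Channel    = Struct.new(:link_name, :source, :target)
  Session    = Struct.new(:outgoing, :incoming)
  Connection = Struct.new(:sessions)
  Container  = Struct.new(:id, :connections)

  # Walk a container from the outside in and describe each channel's link.
  def self.routes(container)
    container.connections.flat_map do |connection|
      connection.sessions.flat_map do |session|
        [session.outgoing, session.incoming].map do |channel|
          "#{channel.link_name}: #{channel.source.address} -> #{channel.target.address}"
        end
      end
    end
  end
end

# One channel per direction, each with a source and a target terminus.
outgoing = ToyAMQP::Channel.new("status-updates",
                                ToyAMQP::Terminus.new("lights/north-junction"),
                                ToyAMQP::Terminus.new("queues/emergency"))
incoming = ToyAMQP::Channel.new("commands",
                                ToyAMQP::Terminus.new("queues/commands"),
                                ToyAMQP::Terminus.new("lights/north-junction"))

# A session pairs the two channels; a connection holds the session.
session    = ToyAMQP::Session.new(outgoing, incoming)
connection = ToyAMQP::Connection.new([session])
lights     = ToyAMQP::Container.new("traffic-lights", [connection])

puts ToyAMQP.routes(lights)
```

Running it prints one line per channel, showing the link name and the source and target addresses it ties together.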

Translating To Ruby Classes

So now that we know the big picture pieces from the specification, I'll translate that knowledge to the Proton Ruby engine work I've been doing.

There is no analog (at this time) in the library for a container.

The Qpid::Proton::Connection class represents the connection between containers. With it you can start and stop working with a remote container, create new sessions, retrieve the next session which has pending work and also the next link with pending work to process. You can also access the transport engine, which is itself the topic for another post.

The Qpid::Proton::Session class is the session analog. With it you can start and stop sessions with the remote container, create endpoints to send and receive messages, peek at how many incoming and outgoing bytes there are, and access the parent connection.

The Qpid::Proton::Sender and Qpid::Proton::Receiver classes are used for sending and receiving instances of Qpid::Proton::Message, respectively.

Additional Classes

In the Ruby library there are additional classes that work with the specification analogs to tie things together: the Transport and the Collector.

The Qpid::Proton::Transport class is the protocol engine. It processes incoming and outgoing bytes for a connection, and publishes events as that connection's state evolves over time.

The Qpid::Proton::Collector class is a FIFO queue which holds the events fired by the transport. It can be queried by the container to find the oldest event.
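
The FIFO behaviour described above can be sketched with a plain Ruby array. This is only a stand-in to show the idea of looking at the oldest event, handling it, and then discarding it. ToyCollector and its put method are hypothetical, and the event names are made-up labels rather than real Proton event types.

```ruby
# Hypothetical stand-in for a collector: a FIFO queue of events.
# This is NOT Qpid::Proton::Collector, just a sketch of the idea.
class ToyCollector
  def initialize
    @events = []
  end

  # Add an event at the tail of the queue.
  def put(event)
    @events.push(event)
    self
  end

  # Return the oldest event without removing it (nil when empty).
  def peek
    @events.first
  end

  # Discard the oldest event.
  def pop
    @events.shift
    nil
  end
end

collector = ToyCollector.new
collector.put(:connection_opened).put(:session_opened).put(:link_opened)

handled = []
until (event = collector.peek).nil?
  handled << event   # "process" the oldest event
  collector.pop      # then remove it so peek returns the next one
end

p handled
```

Events come back out in the order they went in, and the loop stops once peek returns nil.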

An Example Application: The Traffic Light Manager

Following is a simple Ruby application that accepts incoming connections and starts an AMQP dialog. It accepts a TCP connection, creates a connection and related objects and then begins looking at events to process.

Bear in mind that this application is by no means the best example of how to write such an application, but is only used as a means of demonstrating how the pieces we've shown here fit together. A more detailed example will be developed in follow-up posts.

require 'qpid_proton'
require 'socket'

server = TCPServer.new(5672) # accept connections on the default AMQP port

# start an infinite loop
loop do
  # spawn a new thread for each incoming client
  Thread.new(server.accept) do |client_socket|
    # Create a connection object, a collector for any
    # events, and a transport to process the data
    # flowing over the wire.
    conn = Qpid::Proton::Connection.new
    collector = Qpid::Proton::Collector.new
    conn.collect(collector)
    transport = Qpid::Proton::Transport.new

    # details of moving data between the client socket
    # and the transport are excluded here
    #
    # instead, we'll just jump right to processing events

    # get the next event
    event = collector.peek

    # keep going until the collector has no more events
    until event.nil?
      # do something here, a detail for a later post

      # remove the event we've processed and get the next
      collector.pop
      event = collector.peek
    end
  end
end
