Hermes: a simple message passing framework

8 minute read

Hermes is a simple message passing framework, built to enable multiplayer in my game engine. This article will go over the high level design of the crate as well as the design decisions I made along the way.

Disclaimer: hermes is under relatively active development, but at this point I feel comfortable writing about it at a high level.

Disclaimer 2: the design is inspired by One Lone Coder's video on the same topic. I highly recommend checking that out.

Early Design

As someone who has dabbled just enough in networking to be dangerous, but not enough to care all that much about how I get bytes from this machine to a different machine, I wanted to design a networking library that I could use without too much overhead when sending and receiving messages, and without sacrificing too much flexibility in what exactly I can send through it.

Another aspect of the structure was that I would use a client-server model. This doesn't entirely avoid complexity, but it does remove a lot of the burden of needing to reason about how clients talk to one another. Will this scale gracefully? Maybe not, but for my use case of wanting to support 2-4 players cooperating to city build from an RTS style camera, I think it is a good model.

Early on I knew I wanted the API of a client to roughly look like this:

let network_client = Client::new("{IP Addr}", port);
let message = Message::new("hello");
network_client.send(message);
let in_messages = network_client.read_messages();

Likewise, I wanted the server end of the API to be simple as well:

let server = Server::new(port);
loop {
    let message = server.wait_for_message();
    // do something with message
    let out_message = Message::new("world");
    // this is hand wavey but it is what I wanted
    server.send_to(client_handle, out_message);
}

Equipped with this design, I had a target in mind for the API so I just needed to sort out the implementation details of actually moving the messages through the series of tubes we call the internets.

Implementation

I already had quite a bit on my plate as far as things I wanted to learn about and implement, so I decided not to roll my own asynchronous runtime. That left me with two options for implementing the underlying networking logic: tokio and async-std. I spent some time researching the tradeoffs between the two and decided to go with tokio, as it seemed more mature at the time.

I broke up the crate into its four major components:

  • message: holds the bytes and some information about what the bytes are meant to be
  • connection: holds the entry point to your local internets tube
  • client: holds one connection to the server
  • server: holds a connection for each client

Message

This was the first part of the framework I started focusing on. I knew that my API needed to be simple and low friction in order for me to want to use it, so I had to make sure my abstraction for what a message is was right.

I decided it would be sensible to use a tried and true method of passing bytes through the internet: a header prepended to the actual bytes. So my message struct should look like this:

pub struct Message {
    pub header: MessageHeader,
    pub body: Vec<u8>,
}

Easy enough. My header will probably need to contain two pieces of information: what the bytes are, and how many bytes there are.

pub struct MessageHeader<T> {
    pub kind: T,
    pub size: u32,
}

I very quickly ran into some roadblocks with this model, mainly due to Rust protecting me from myself. Rust wanted to be sure that whatever T was could actually be passed through the internet without breaking everything.

That's fine, I just need a new plain old data (Pod) trait constraint on T for my MessageHeader.

pub trait Pod: 'static + Copy + Sized + Send + Sync + std::fmt::Debug {}

I also wanted to be sure I could use this trait on any type that fit these constraints and that I wanted to pass through my framework.

impl<T: 'static + Copy + Sized + Send + Sync + std::fmt::Debug> Pod for T {}

Lastly, I wanted to make sure I didn't mistakenly use the wrong type for the MessageHeader kind, so I added a marker trait that extends my Pod trait as an extra layer of protection:

pub trait Messageable: Pod {}
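To make the difference between the two traits concrete, here is a small sketch. The types `PlayerInput` and `ChatKind` and the helper functions `is_pod` and `is_messageable` are names I made up for illustration; they are not part of hermes. The point is that Pod comes for free through the blanket impl, while Messageable has to be opted into per type.

```rust
use std::fmt::Debug;

pub trait Pod: 'static + Copy + Sized + Send + Sync + Debug {}
impl<T: 'static + Copy + Sized + Send + Sync + Debug> Pod for T {}
pub trait Messageable: Pod {}

// Hypothetical payload type: deriving Copy and Debug is enough for the
// blanket impl above to make it Pod.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct PlayerInput {
    pub x: f32,
    pub y: f32,
}

// Hypothetical message id type: Pod is automatic, Messageable is opt in.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ChatKind {
    Hello,
    Goodbye,
}
impl Messageable for ChatKind {}

// Compile-time checks: a call only builds if the bound holds for `T`.
pub fn is_pod<T: Pod>(_value: &T) -> bool {
    true
}

pub fn is_messageable<T: Messageable>(_value: &T) -> bool {
    true
}
```

With these definitions, `is_messageable(&PlayerInput { x: 0.0, y: 0.0 })` would fail to compile because `PlayerInput` never opted in, and a `String` would not even pass `is_pod` because it is not Copy.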

Putting this together with what we already had, we now have:

pub trait Pod: 'static + Copy + Sized + Send + Sync + std::fmt::Debug {}
impl<T: 'static + Copy + Sized + Send + Sync + std::fmt::Debug> Pod for T {}
pub trait Messageable: Pod {}

pub struct MessageHeader<T: Messageable> {
    // kind is called id in my actual code because it may be used as an
    // identifier for the message
    pub id: T,
    pub size: u32,
}

pub struct Message<T: Messageable> {
    pub header: MessageHeader<T>,
    pub body: Vec<u8>,
}

With the structures laid out, we just need to add some impls for the actual API. Making a new message with no data is simple:

pub fn new(id: T) -> Self {
    let header = MessageHeader {
        id,
        // an empty message is exactly one header long
        size: std::mem::size_of::<MessageHeader<T>>() as u32,
    };

    Self {
        header,
        body: vec![],
    }
}

We just need to know the kind of message we are sending when creating a new one and the library code can figure out the rest of the setup.
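As a rough illustration of what that looks like from the caller's side, here is a self-contained sketch. The `GameMessage` enum and the `new_ping` helper are hypothetical names for this example only; the structs and `new` mirror the definitions above.

```rust
use std::fmt::Debug;
use std::mem::size_of;

pub trait Pod: 'static + Copy + Sized + Send + Sync + Debug {}
impl<T: 'static + Copy + Sized + Send + Sync + Debug> Pod for T {}
pub trait Messageable: Pod {}

pub struct MessageHeader<T: Messageable> {
    pub id: T,
    pub size: u32,
}

pub struct Message<T: Messageable> {
    pub header: MessageHeader<T>,
    pub body: Vec<u8>,
}

impl<T: Messageable> Message<T> {
    pub fn new(id: T) -> Self {
        let header = MessageHeader {
            id,
            // an empty message is exactly one header long
            size: size_of::<MessageHeader<T>>() as u32,
        };
        Self { header, body: vec![] }
    }
}

// Hypothetical id type a game might define; not part of hermes.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum GameMessage {
    Ping,
    PlaceBuilding,
}
impl Messageable for GameMessage {}

// The caller only supplies the kind; the header size is filled in for them.
pub fn new_ping() -> Message<GameMessage> {
    Message::new(GameMessage::Ping)
}
```

Note that the size here is the in-memory size of the header struct, padding included, so it can be larger than the sum of its fields.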

Now these messages need to be able to have bytes pushed into and pulled out of them, depending on whether you're sending or receiving the message. Pushing bytes is required to even have a message in the first place, so let's go over that first.

// utility function for recalculating message size
pub fn size(&self) -> u32 {
    (std::mem::size_of::<MessageHeader<T>>() + self.body.len()) as u32
}

pub fn push<V: Pod>(&mut self, data: V) {
    let data_size = std::mem::size_of::<V>();
    // remember where the existing data ends before growing the buffer,
    // otherwise the copy below would land past the end of the allocation
    let offset = self.body.len();
    // resize our byte buffer to be able to fit the
    // incoming data as well as any data already in the buffer
    self.body.resize(offset + data_size, 0);

    unsafe {
        // downcast from the type to a slice of bytes
        let data_ptr: *const V = &data;
        let byte_ptr: *const u8 = data_ptr as *const _;
        let byte_slice: &[u8] = std::slice::from_raw_parts(byte_ptr, data_size);

        // copy the new data bytes into the freshly added space at the
        // end of our byte buffer
        std::ptr::copy_nonoverlapping(
            byte_slice.as_ptr(),
            self.body.as_mut_ptr().add(offset),
            data_size,
        );
    }

    // recalculate the total size of the message
    self.header.size = self.size();
}

This push call does 3 things at a high level:

  1. Resizes the body of the message to fit incoming data with the existing data
  2. Downcasts the data type to a slice of bytes and copies those bytes to the end of the body of the message
  3. Updates the message header to correctly capture the new size of the entire message.

The pull call will need to do the inverse of this operation.

pub fn pull<V: Pod>(&mut self) -> Result<V, MessageError> {
    // number of bytes a single instance of `V` occupies
    let bytes = std::mem::size_of::<V>();
    // check to make sure there are enough bytes in the buffer to pull out `V`
    if bytes > self.body.len() {
        return Err(MessageError::NotEnoughBytes {
            type_size: bytes,
            remaining: self.body.len(),
        });
    }
    // length of the buffer after removing a single instance of `V`
    let new_length = self.body.len() - bytes;
    let out = unsafe {
        // get a pointer to the start of the bytes we will be pulling out
        let data_ptr = self.body.as_ptr().add(new_length);
        // copy the bytes out of the buffer into an instance of `V`; the
        // buffer gives no alignment guarantee, so read unaligned rather
        // than going through transmute_copy on a `&u8`, whose source type
        // is only one byte wide
        std::ptr::read_unaligned(data_ptr as *const V)
    };
    // shrink buffer to new length
    self.body.truncate(new_length);
    self.header.size = self.size();

    Ok(out)
}

I had to play with the call signature of the pull call in order to get it to feel right. I originally had the number of bytes you wanted to pull out be specified in the function call, but that was a bit cumbersome to actually implement. This signature makes it pretty easy to pull the data out, as long as you pull the data out in the inverse order you originally pushed it in.
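The inverse-order rule is easiest to see in a round trip. The sketch below is a stripped-down stand-in rather than the hermes code: `Body` and `round_trip` are hypothetical names, there is no header, `pull` returns an Option instead of a MessageError, and it assumes the pushed types have no padding bytes.

```rust
use std::mem::size_of;

// Hypothetical stand-in for the body of a Message: only the byte buffer,
// no header bookkeeping.
#[derive(Default)]
pub struct Body {
    bytes: Vec<u8>,
}

impl Body {
    // Append the raw bytes of `data` to the end of the buffer.
    // Assumes `V` has no padding bytes (true for the primitives used below).
    pub fn push<V: Copy + 'static>(&mut self, data: V) {
        let ptr = &data as *const V as *const u8;
        // Safety: `data` is alive for the whole call and spans size_of::<V>() bytes.
        let raw = unsafe { std::slice::from_raw_parts(ptr, size_of::<V>()) };
        self.bytes.extend_from_slice(raw);
    }

    // Remove the last size_of::<V>() bytes and reinterpret them as a `V`.
    // Returns None when the buffer is too short.
    pub fn pull<V: Copy + 'static>(&mut self) -> Option<V> {
        let size = size_of::<V>();
        if size > self.bytes.len() {
            return None;
        }
        let start = self.bytes.len() - size;
        // Safety: start..start + size is in bounds. The buffer carries no
        // alignment guarantee, hence the unaligned read. Nothing checks that
        // these bytes were pushed as a `V`; that is up to the caller.
        let value =
            unsafe { std::ptr::read_unaligned(self.bytes.as_ptr().add(start) as *const V) };
        self.bytes.truncate(start);
        Some(value)
    }
}

// Push three values, then pull them back in the inverse order.
pub fn round_trip() -> (u32, f32, u8) {
    let mut body = Body::default();
    body.push(7u32); // pushed first, pulled last
    body.push(1.5f32);
    body.push(3u8); // pushed last, pulled first

    let third: u8 = body.pull().unwrap();
    let second: f32 = body.pull().unwrap();
    let first: u32 = body.pull().unwrap();
    (first, second, third)
}
```

Pulling in the wrong order would not fail; it would just reinterpret the wrong bytes, which is the lack of validation discussed next.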

I will admit, this API doesn't really lend itself to being very safe, and there are barely any validations done. However, it is very easy to use with a few extra impls within the library. Any Pod type I want to send through this framework just works, as long as I add the couple of blanket impls on the type if it doesn't already have the constraints outlined by Pod.

Connection

Now that we have our Message struct to hold our bytes, we need an entry point to the network to push the messages through. This is where my decision to use tokio began to really matter.

A connection is mainly responsible for processing incoming and outgoing messages. When a caller invokes send on the client or server interfaces, it is not actually sending the data; instead, it is pushing the Message into an outgoing message queue in the connection. Similarly, we cannot expect the client or server to drop everything it is doing to process each and every message as it comes in, so we put incoming messages in a queue that the client or server can process at its convenience.

So this gives our connection several things to manage:

  1. Writing to the incoming messages queue
  2. Pulling from the outgoing messages queue
  3. Connecting to the server for a client side connection
  4. Creating a connection on the server side for any connecting client
  5. Listening to the TCP socket for incoming messages
  6. Processing the outgoing messages queue
So we could create a Connection struct that looks like this:

pub struct Connection<T: Messageable> {
    messages_in: Vec<Message<T>>,
    messages_out: Vec<Message<T>>,
    peer_addr: Option<std::net::SocketAddr>,
    tcp_stream: Option<tokio::net::TcpStream>,
}

A pretty big problem I faced in getting this API to work for my use case was that I had originally planned for the server to parse the MessageHeader for some sort of identifying information about the client in order to know which client sent the message. This was difficult because the server would only have a single queue of incoming messages, which made it hard to know which client sent what.

This ended up being pretty cumbersome to use, so I ended up using two separate models of Message queues. Outgoing messages would stay the same, but incoming messages would be stored as a tuple of the peer's address and the message it sent. And because I wanted to be able to push and pop messages on these Vecs like a queue, I also started using std::collections::VecDeque instead.

use std::collections::VecDeque;

pub type AddressedMessageQueue<T> = VecDeque<(std::net::SocketAddr, Message<T>)>;
pub struct Connection<T: Messageable> {
    messages_in: AddressedMessageQueue<T>,
    messages_out: VecDeque<Message<T>>,
    peer_addr: Option<std::net::SocketAddr>,
    tcp_stream: Option<tokio::net::TcpStream>,
}
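Here is a minimal sketch of how an addressed queue like this gets used. To keep it short, `Message` is a plain byte payload standing in for the generic `Message<T>`, and `enqueue` and `next_message` are hypothetical helper names, not functions from hermes.

```rust
use std::collections::VecDeque;
use std::net::SocketAddr;

// Stand-in for hermes' Message<T>; a plain byte payload keeps the sketch short.
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub body: Vec<u8>,
}

pub type AddressedMessageQueue = VecDeque<(SocketAddr, Message)>;

// What the read side does once a full message has arrived from `peer`:
// tag it with the sender's address and append it to the back.
pub fn enqueue(queue: &mut AddressedMessageQueue, peer: SocketAddr, message: Message) {
    queue.push_back((peer, message));
}

// What the server does when it is ready: take the oldest message, if any,
// together with the address of the client that sent it.
pub fn next_message(queue: &mut AddressedMessageQueue) -> Option<(SocketAddr, Message)> {
    queue.pop_front()
}
```

Because `push_back` and `pop_front` are both cheap on a VecDeque, messages come out in arrival order without shifting the whole buffer the way removing from the front of a Vec would.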

So far not too bad, but how exactly are we going to handle processing outgoing messages while also listening for incoming messages? I probably could wrap it in a loop and do some clever logic to handle both ends of that socket but since I am already using tokio, why not just spin up new tasks to handle each half of this?

Tokio has built-in support for this: split. I can take a single stream and get the read half and the write half of it.

use std::collections::VecDeque;
use tokio::io::{ReadHalf, WriteHalf};

pub type AddressedMessageQueue<T> = VecDeque<(std::net::SocketAddr, Message<T>)>;
pub struct Connection<T: Messageable> {
    messages_in: AddressedMessageQueue<T>,
    messages_out: VecDeque<Message<T>>,
    peer_addr: Option<std::net::SocketAddr>,
    read_stream: Option<ReadHalf<tokio::net::TcpStream>>,
    write_stream: Option<WriteHalf<tokio::net::TcpStream>>,
}

Right, so we have the basics we need to start reading from and writing to a TCP socket. But we also need to be able to mutably access these queues across threads, and we probably need to hold the queues in the client and server code... Not a problem, let's just add some Arcs and Mutexes.

use crate::AddressedMessageQueue;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::io::{ReadHalf, WriteHalf};

pub struct Connection<T: Messageable> {
    messages_in: Arc<Mutex<AddressedMessageQueue<T>>>,
    messages_out: Arc<Mutex<VecDeque<Message<T>>>>,
    peer_addr: Option<std::net::SocketAddr>,
    read_stream: Option<ReadHalf<tokio::net::TcpStream>>,
    write_stream: Option<WriteHalf<tokio::net::TcpStream>>,
}
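The sharing pattern itself can be sketched with only the standard library. This is an assumption-heavy stand-in: it swaps parking_lot's Mutex for `std::sync::Mutex`, a tokio task for a plain thread, and `Message<T>` for a `u32`, and `drain_from_worker` is a hypothetical name.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

// One side fills the queue from another thread, the other side drains it
// later. Both hold a clone of the same Arc, so they see the same queue.
pub fn drain_from_worker(count: u32) -> Vec<u32> {
    let messages_in: Arc<Mutex<VecDeque<u32>>> = Arc::new(Mutex::new(VecDeque::new()));

    // The "read task": owns its own Arc clone and pushes as data arrives.
    let producer_queue = Arc::clone(&messages_in);
    let producer = thread::spawn(move || {
        for n in 0..count {
            // The lock is held only for the duration of this statement.
            producer_queue.lock().unwrap().push_back(n);
        }
    });
    // Joined here only to make the sketch deterministic.
    producer.join().unwrap();

    // The "game loop": drains whatever has queued up, at its convenience.
    let mut queue = messages_in.lock().unwrap();
    let drained: Vec<u32> = queue.drain(..).collect();
    drained
}
```

In the real connection the producer would keep running alongside the consumer instead of being joined first; the part that carries over is that each side locks, touches the queue briefly, and releases.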

That is basically all there is to the Connection struct. The actual logic for connecting and handling the message queues isn't terribly interesting and is a bit rough around the edges. You can find that code here.

The high level overview of that process is that the write task takes Messages out of the messages_out queue and converts each one entirely to bytes before writing it to the socket. The read task listens for incoming bytes on the socket and uses the MessageHeader to build a Message out of the incoming stream of bytes; once it has read the entire Message, it pushes the new Message to the messages_in queue.
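To give a feel for that header-then-body framing, here is a simplified sketch. It is not the hermes wire format: the id is fixed to a `u32`, the byte order is little-endian by my own choice, it reads from any blocking `std::io::Read` instead of a tokio socket, and `Frame`, `encode`, and `decode` are hypothetical names.

```rust
use std::io::{self, Read};

// 4 bytes of id followed by 4 bytes of total size (header included).
const HEADER_LEN: usize = 8;

#[derive(Debug, PartialEq)]
pub struct Frame {
    pub id: u32,
    pub body: Vec<u8>,
}

// Write side: flatten header and body into one contiguous byte buffer.
pub fn encode(frame: &Frame) -> Vec<u8> {
    let total = (HEADER_LEN + frame.body.len()) as u32;
    let mut out = Vec::with_capacity(total as usize);
    out.extend_from_slice(&frame.id.to_le_bytes());
    out.extend_from_slice(&total.to_le_bytes());
    out.extend_from_slice(&frame.body);
    out
}

// Read side: read exactly one header, learn the size from it, then read
// exactly the remaining body bytes for that message.
pub fn decode<R: Read>(reader: &mut R) -> io::Result<Frame> {
    let mut header = [0u8; HEADER_LEN];
    reader.read_exact(&mut header)?;
    let id = u32::from_le_bytes([header[0], header[1], header[2], header[3]]);
    let total = u32::from_le_bytes([header[4], header[5], header[6], header[7]]) as usize;
    if total < HEADER_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "declared size is smaller than the header",
        ));
    }
    let mut body = vec![0u8; total - HEADER_LEN];
    reader.read_exact(&mut body)?;
    Ok(Frame { id, body })
}
```

Since the size travels in the header, the reader always knows where one message ends and the next begins, even when several messages arrive back to back in the same stream.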

Coming Soon

Sections about the client, server, and current usage of the framework in my game.