Building a Real-time Order Book from the Coinbase Websocket Feed with Clojure

Coinbase Pro is a cryptocurrency marketplace that provides an API for algorithmic trading. The first step in developing an algorithmic trading model is collecting training data. This blog post will focus on creating and maintaining a real-time order book based on the Coinbase Pro websocket feed. This real-time order book can be snapshotted at regular intervals to collect historical data for training our model. Additionally, once we have a trained model that can make trading decisions for us, we can feed the real-time order book to our model for real-time algorithmic trading.

The full code can be found here: https://github.com/btamadio/cbpro

What’s an order book?

At any given time, the order book represents all open orders on the market. For our purposes, we only care if an order is a bid or an ask. An order consists of a price and the quantity of the product to be bought or sold. For example, on the BTC-USD order book, a bid order of 2.0 BTC at $18,000 would be an offer to buy 2 BTC at the price of $18,000. If there are corresponding ask orders at $18,000 for 2 or more BTC, the bid order will be filled and disappear from the order book. This is a simplified explanation that glosses over some of the details of how orders are filled, but it should be enough for us to get started.

Websocket Feed

The Coinbase Pro websocket feed provides real-time order book updates, and luckily for us is reasonably well-documented. When connecting to the websocket feed, you specify the products and channels you want to subscribe to.

A product is a currency pair, such as BTC-USD or ETH-BTC. When subscribing to a product, the first currency will be the product being bought or sold and the second currency will be the purchasing currency.

There are several different channels we can subscribe to, but for maintaining a real-time order book, we care about the “level2” channel. When first connecting to the feed, we’ll get a “subscription” event, telling us which channels and products we’re subscribed to. Then, because we connected to the “level2” channel, the first set of messages will be a “snapshot” message for each of the products we asked for. The snapshot is the entire state of the order book (all bid and ask orders) at the time of subscription. All events after this should be “l2update” events, which describe changes to the order book for a product.

For connecting to the websocket feed, I’m going to use the coinbase-pro-clj library, which provides a nice wrapper and semantics for supplying a callback function for handling events.

I’m using Leiningen, so the first step is adding the following to project.clj:

:dependencies [[org.clojure/clojure "1.10.1"]
               [coinbase-pro-clj "1.0.0"]]

Then, in the namespace declaration of my application code, I’ll add:

(:require [coinbase-pro-clj.core :as cp])

To create a websocket connection and immediately start processing events, I can then do:

(cp/create-websocket-connection {:url cp/websocket-url
    :product_ids product-ids
    :channels [{:name "level2"}]
    :on-receive on-receive})

Where product-ids is the vector of products I want to subscribe to, and on-receive is a single-argument function that will be called every time an event is received, with the event passed as the argument.
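As a quick check that events are arriving, a minimal callback might just print each event's type. This is only a sketch: log-event-type is a hypothetical name, and it assumes (as the handlers later in this post do) that each event is delivered as a map with keyword keys such as :type.

```clojure
;; Hypothetical debugging callback: prints the type of each received event.
;; Assumes events arrive as maps with keyword keys, e.g. {:type "snapshot" ...}.
(defn log-event-type [event]
  (println "received event of type:" (:type event)))

;; It can be exercised without a network connection by calling it directly:
(log-event-type {:type "l2update" :product_id "BTC-USD"})
```

A function like this could be supplied as the :on-receive value while exploring the feed, before any order book logic exists.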

The order book atom

Since we’ll be mutating order books, we’ll store them in an atom. An atom is a thread-safe reference type that can be updated atomically with the swap! function. For simplicity, we’re going to keep all the order books in a single atom. We can make a simple constructor:

(defn order-book [product-ids]
    (atom (into {} (map (fn [x]
        {x {:bids (sorted-map) :asks (sorted-map)}})
        product-ids))))

The shape of the order-book will be a nested map. There will be a top-level key for each product-id, and then keys for the bids and asks of each product-id:

> @(order-book ["BTC-USD" "ETH-BTC"])

{"BTC-USD" {:bids {}, :asks {}},
"ETH-BTC" {:bids {}, :asks {}}}

We’ll use a sorted-map to represent each side of the order book, which will make accessing the highest bid price and lowest ask price easy.
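To see why a sorted map is convenient here, the following sketch builds one side of a book in each ordering. The prices and sizes are made up for illustration; the descending comparator for bids is the one used later in this post.

```clojure
;; Asks: default ascending order, so the lowest price comes first.
(def asks (sorted-map 10102.55 0.57 10103.0 1.2 10102.9 0.3))

;; Bids: descending comparator, so the highest price comes first.
(def bids (sorted-map-by > 10101.1 0.45 10100.0 2.0 10101.05 0.1))

(key (first asks)) ; lowest ask price: 10102.55
(key (first bids)) ; highest bid price: 10101.1
```

In both cases the best price is simply the first entry, regardless of the order in which the levels were inserted.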

Maintaining statelessness

We’ve started in a dangerous place, by already introducing state into our application. If we’re not careful, this state can quickly spread throughout the application and infect a lot of our code. One of the main reasons to use a functional language like Clojure is to minimize mutable state and keep it away from the core application logic. To avoid spreading mutable state everywhere, we’ll only mutate the order book in a single place: the anonymous on-receive function passed to create-websocket-connection:

(defn start-order-book [order-book]
    (cp/create-websocket-connection {:url cp/websocket-url
        :product_ids (vec (keys @order-book))
        :channels [{:name "level2"}]
        :on-receive #(swap! order-book update-book %)}))

Every time an event is received, the value of the order-book will be replaced by the result of calling update-book on the value of order-book and the event:

(update-book @order-book event)

This code won’t compile because we haven’t defined update-book yet. The goal for the update-book function is to take an (immutable) order-book value and an event, and return the updated value of the order book. This new value will atomically replace the old value of the order book for every event, via the call to swap!. The function update-book will be free of side effects. Most of the rest of the code will be functions used by update-book, and so will also be free of side effects. So the only place in the entire application where state is mutated will be inside this start-order-book function!
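The pattern of "pure function plus swap!" can be tried out in isolation. In this sketch, count-event is a hypothetical stand-in for update-book that merely counts events by type; the point is only to show how swap! passes the current value and the extra argument to a pure function.

```clojure
;; A pure function: takes the current value and an event, returns a new value.
;; count-event is a hypothetical stand-in for update-book.
(defn count-event [counts event]
  (update counts (:type event) (fnil inc 0)))

(def state (atom {}))

;; swap! calls (count-event current-value event) and atomically stores the result.
(swap! state count-event {:type "snapshot"})
(swap! state count-event {:type "l2update"})
(swap! state count-event {:type "l2update"})

@state ; {"snapshot" 1, "l2update" 2}
```

Because count-event never touches the atom itself, it can be tested with plain maps, which is the same property we want for update-book.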

Updating the order book

The next thing we need to do is define the update-book function, which takes an order book value and an event, and returns the updated order book, based on the event. There are three types of events we have to handle, and we’ll define a separate function for each.

First, we need to handle snapshot events, which look like (in EDN):

{
 :type "snapshot"
 :product_id "BTC-USD"
 :bids [["10101.10", "0.45054140"]]
 :asks [["10102.55", "0.57753524"]]
}

Where the bids and asks are vectors of [price, size] vectors, and represent the entire order book. Unfortunately they come as vectors of strings, so the first order of business is to convert them to doubles and store them in a sorted-map:

(defn to-doubles [v] (vec (map #(Double/parseDouble %) v)))

(defn snapshot-bids [snapshot-event]
    (into (sorted-map-by >) 
          (map to-doubles (:bids snapshot-event))))

(defn snapshot-asks [snapshot-event]
    (into (sorted-map) 
        (map to-doubles (:asks snapshot-event))))

Bids are sorted in reverse order so that highest bids will appear first in the map. Asks are sorted in default order. This is because we are most interested in the highest bids and the lowest asks. If we’re going to truncate the order book later, say when displaying or saving to persistent storage, it will be convenient to have them stored in this order.
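As an illustration of that convenience, truncating a side to its best few levels could look something like the sketch below. The helper name top-levels is hypothetical and not part of the original code.

```clojure
;; Hypothetical helper: keep only the first n price levels of one side.
;; (empty side) returns an empty sorted map with the same comparator,
;; so the result keeps the original ordering.
(defn top-levels [n side]
  (into (empty side) (take n side)))

(def bids (sorted-map-by > 1.0 10.0 2.0 20.0 3.0 30.0))
(def asks (sorted-map 4.0 40.0 5.0 50.0 6.0 60.0))

(keys (top-levels 2 bids)) ; (3.0 2.0), the two highest bids
(keys (top-levels 2 asks)) ; (4.0 5.0), the two lowest asks
```

The same function works for both sides precisely because each side is already stored best-price-first.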

Then we can define the function that handles snapshot events:

(defn handle-snapshot [order-book event]
    (assoc order-book (:product_id event)
        {:bids (snapshot-bids event)
         :asks (snapshot-asks event)}))

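Next, we need to handle l2update events, which have the following form (shown here as raw JSON; by the time our handler sees them, the keys are keywords, as in the snapshot example above):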

{
  "type": "l2update",
  "product_id": "BTC-USD",
  "time": "2019-08-14T20:42:27.265Z",
  "changes": [
    [
      "buy",
      "10101.80000000",
      "0.162567"
    ]
  ]
}

This will be trickier than handling snapshot events because there can be any number of changes delivered in a single event, and we have to apply them all in order. A change is defined as the tuple [side, price, size], where side will be “buy” if the bids should be updated or “sell” if the asks should be updated. The price is the price level whose size should be changed, and the size is the total size of the offers at that price. Note that the value of size is not a delta; it is what the size should be after applying the change. If the size is zero, that price should be removed from the order book.
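Before dealing with the full nested order book, the two cases can be illustrated on a single bare side of the book. The starting sizes below are made up for the example.

```clojure
;; One side of a book: price -> size, highest price first.
(def bids (sorted-map-by > 10101.8 0.5 10101.1 0.45))

;; Change ["buy" "10101.80000000" "0.162567"]: the size at 10101.8
;; becomes 0.162567. It replaces 0.5; it is not added to it.
(def after-update
  (assoc bids (Double/parseDouble "10101.80000000") 0.162567))

;; Change ["buy" "10101.10000000" "0"]: a size of zero removes the level.
(def after-removal
  (dissoc bids (Double/parseDouble "10101.10000000")))

(get after-update 10101.8)        ; 0.162567
(contains? after-removal 10101.1) ; false
```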

First let’s write the function to update the order book for a single change:

(defn apply-change [product-id order-book change]
    (let [side ({"buy" :bids "sell" :asks} (change 0))
          price (Double/parseDouble (change 1))
          size (Double/parseDouble (change 2))]
        (if (zero? size)
            (update-in order-book [product-id side] 
                dissoc price)
            (assoc-in order-book [product-id side price] 
                size))))

In the let block, we convert the side to the keyword :bids or :asks for the side of the order book to be updated. Then we convert the price and the size to doubles. If the size is zero, we remove that price from the order book for this product. Otherwise, we update the size for the corresponding price to the size specified in the change.

Then, to apply all the changes in an l2update event, we use the following recursive function:

(defn apply-changes [order-book product-id changes]
  (if (empty? changes)
    order-book
    (recur
      (apply-change product-id order-book (first changes))
      product-id
      (rest changes))))

This returns the result of iteratively calling apply-change on the order book for each change in the l2update event. We use recur to tell the compiler that the call is in tail position, so the recursion runs without growing the stack.

It turns out that there’s a much simpler way to express this procedure, using reduce:

(defn apply-changes [order-book product-id changes]
    (reduce (partial apply-change product-id) order-book changes))
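If the combination of reduce and partial is unfamiliar, this toy sketch shows the same shape with a simpler step function. The name add-under is made up for the example; its argument order (fixed argument first, then accumulator, then item) mirrors that of apply-change.

```clojure
;; A toy three-argument step function with the "fixed" argument first,
;; mirroring the argument order of apply-change.
(defn add-under [k acc x]
  (update acc k (fnil + 0) x))

;; (partial add-under :total) is a two-argument function of [acc x],
;; which is exactly the shape reduce expects.
(def with-reduce
  (reduce (partial add-under :total) {} [1 2 3]))

;; The same fold written with explicit loop/recur:
(def with-loop
  (loop [acc {} xs [1 2 3]]
    (if (empty? xs)
      acc
      (recur (add-under :total acc (first xs)) (rest xs)))))

with-reduce ; {:total 6}
with-loop   ; {:total 6}
```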

We also want to keep the timestamp of the last update event in the order book for each product, so we define the update-timestamp function as:

(defn update-timestamp [order-book product-id timestamp]
  (assoc-in order-book [product-id :timestamp] timestamp))

For each l2update event, we want to apply the changes and update the timestamp. We can use the thread-first macro to first apply the changes and then update the timestamp in our definition of handle-l2update:

(defn handle-l2update [order-book event]
  (-> order-book
      (apply-changes (:product_id event) (:changes event))
      (update-timestamp (:product_id event) (:time event))))
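For readers new to the thread-first macro, the sketch below shows that -> is just a more readable way of writing nested calls. The map contents are placeholders.

```clojure
;; With ->, each step receives the previous result as its first argument.
(def threaded
  (-> {}
      (assoc :bids 1)
      (assoc :timestamp "2019-08-14T20:42:27.265Z")))

;; The equivalent nested form, read inside-out:
(def nested
  (assoc (assoc {} :bids 1) :timestamp "2019-08-14T20:42:27.265Z"))

(= threaded nested) ; true
```

This only works cleanly because both apply-changes and update-timestamp take the order book as their first argument.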

Next we provide a function to route event types to their handler functions:

(defn event-handler [event-type]
  (cond
    (= event-type "snapshot") handle-snapshot
    (= event-type "l2update") handle-l2update
    :else (fn [order-book event] order-book)))

The :else clause provides a no-op function in case the event-type is anything other than “snapshot” or “l2update”. Ideally we would also provide a function to handle error events, but I won’t cover that in this blog post. The other type of event we’ll get is a subscription event, which does not change the order book.
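As an aside, the same routing could also be expressed as a map lookup with a default. This is an alternative sketch rather than what the code in this post uses; the stub handlers and the name lookup-handler are hypothetical and exist only so the example runs on its own.

```clojure
;; Stub handlers for illustration only; they tag the book instead of
;; applying real updates.
(defn stub-snapshot [order-book _event] (assoc order-book :last "snapshot"))
(defn stub-l2update [order-book _event] (assoc order-book :last "l2update"))

(def handlers
  {"snapshot" stub-snapshot
   "l2update" stub-l2update})

;; get with a default returns a no-op handler for unknown event types.
(defn lookup-handler [event-type]
  (get handlers event-type (fn [order-book _event] order-book)))

((lookup-handler "snapshot") {} {:type "snapshot"})   ; {:last "snapshot"}
((lookup-handler "heartbeat") {} {:type "heartbeat"}) ; {} (unchanged)
```

Adding a handler for a new event type then becomes a matter of adding one entry to the map.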

Finally, we can define the update-book function, which will be used as an argument to swap! to update the order book for every event:

(defn update-book [order-book event]
    (let [handler (event-handler (:type event))]
        (handler order-book event)))

Watching the live feed

The last thing we’ll implement is a function to print the highest bid and lowest ask for each product, as well as the timestamp of the most recent update. Then we can start asynchronously updating the order book and displaying the status at regular intervals:

(defn display-book [order-book]
  (doseq [product-id (keys order-book)]
    (println product-id 
        (:timestamp (order-book product-id)))
    (println "\tbid:" 
        (first (:bids (order-book product-id))))
    (println "\task:" 
        (first (:asks (order-book product-id))))))

You could modify this to show the top 5 bids or asks using something like:

(println "\tasks:" (take 5 (:asks (order-book product-id)))) 
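Because the best price on each side is simply the first entry, derived quantities are easy to compute as well. As a sketch, a hypothetical spread helper (not part of the original code) for a single product's book might look like this:

```clojure
;; Hypothetical helper: difference between the lowest ask and highest bid
;; for one product. Assumes bids are sorted descending and asks ascending,
;; so (first ...) is the best price on each side. Returns nil if either
;; side is empty.
(defn spread [book]
  (let [best-bid (first (:bids book))
        best-ask (first (:asks book))]
    (when (and best-bid best-ask)
      (- (key best-ask) (key best-bid)))))

(spread {:bids (sorted-map-by > 100.0 1.0 99.5 2.0)
         :asks (sorted-map 100.5 3.0 101.0 1.0)})
;; 0.5
```

The nil case matters in practice, since both sides are empty until the first snapshot arrives.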

To get our order book updating asynchronously, we’ll use the go macro from Clojure’s core.async library. First we add the dependency to our project.clj:

:dependencies [[org.clojure/clojure "1.10.1"]
               [coinbase-pro-clj "1.0.0"]
               [org.clojure/core.async "1.3.610"]]

Then in our namespace declaration we refer to the macro we need:

(:require [coinbase-pro-clj.core :as cp]
         [clojure.core.async :refer [go]])

Finally we can put the pieces together and define our main function:

(defn -main [& args]
  ;; Bind the atom locally with let rather than def-ing a global inside a function.
  (let [ob (order-book (vec args))]
    (go (start-order-book ob))
    (loop []
      (Thread/sleep 1000)
      (display-book @ob)
      (recur))))

We can now run the program from the command line, supplying any number of products to track in real-time:

lein run BTC-USD ETH-USD XRP-USD

Conclusion

We showed how to use the Coinbase Pro websocket feed and Clojure to create a tool for observing real-time order books. This code can easily be extended to save order book snapshots to a database for training trading models. Once a model is trained, this real-time order book data can also be fed to a trading model to make real-time decisions.