Clojure/Ring middleware that automatically applies overload control based on a target latency.



(this space intentionally left almost blank)
(ns overload-middleware.bucket
  (:require [overload-middleware.schemas :refer [TokenBucketParameters]]
            [overload-middleware.utils :refer [get-time]]
            [schema.core :as s]))
(s/defn replenish-underlying
  "Returns `bucket` topped up for the time elapsed since its :last-time.

  :rate (tokens per second) times the elapsed seconds is added to :tokens,
  the total is capped at :max, and :last-time is moved to the current time
  (milliseconds, from get-time). Pure: the caller stores the result."
  [bucket :- TokenBucketParameters]
  (let [now (get-time)
        {:keys [tokens rate last-time], capacity :max} bucket
        ;; last-time and now are millisecond timestamps; the rate is per second.
        elapsed-secs (/ (- now last-time) 1000)
        topped-up (+ tokens (* rate elapsed-secs))]
    (assoc bucket
           :tokens (min capacity topped-up)
           :last-time now)))
(s/defn replenish
  "Refills the bucket held in `bucket-ref` inside a transaction (joining the
  caller's, if any) and returns the refilled bucket."
  [bucket-ref :- clojure.lang.Ref]
  (dosync
   (ref-set bucket-ref (replenish-underlying @bucket-ref))))
(s/defn dec-token
  "Returns `bucket` with :tokens reduced by one. No lower bound is enforced
  here; callers must check that a token is available first."
  [bucket :- TokenBucketParameters]
  (update-in bucket [:tokens] dec))
(s/defn get-token
  "Tries to take one token from the bucket held in `bucket-ref`.

  Returns the updated bucket (truthy) when at least one whole token was
  available and has been consumed, or nil when the bucket holds less than one
  token. Runs in a transaction (joining the caller's, if any)."
  [bucket-ref :- clojure.lang.Ref]
  (dosync
   ;; Replenishment adds fractional tokens, so require a whole token.
   ;; `alter` (not `commute`) so that two concurrent transactions that both saw
   ;; the last token conflict and one retries, instead of both taking it.
   (when (>= (:tokens @bucket-ref) 1)
     (alter bucket-ref dec-token))))
(s/defn get-token-with-replenishment
  "Refills the bucket for the time elapsed since its last refill, then tries
  to take one token. Returns whatever get-token returns (truthy on success,
  nil when no token is available)."
  [bucket-ref :- clojure.lang.Ref]
  (-> bucket-ref
      (doto replenish)
      get-token))
(ns overload-middleware.latency
  (:require [overload-middleware.utils :refer [abs]]
            [overload-middleware.schemas :refer [LatencyAlgorithmParameters]]
            [schema.core :as s]))
(s/defn sensible-parameters?
  "Returns true when every latency-algorithm parameter lies in the range the
  rate-adjustment algorithm relies on, false otherwise. Used as a :pre check
  by the functions that consume these parameters."
  [{:keys [dec-factor dec-threshold
           inc-factor inc-threshold inc-weight
           min-rate max-rate
           nreq timeout
           alpha]} :- LatencyAlgorithmParameters]
  (and
   (< 1 dec-factor)       ; dividing the rate by it must actually lower it
   (pos? inc-factor)
   (>= dec-threshold 0)
   (neg? inc-threshold)
   (<= inc-weight 0)
   (> max-rate min-rate)
   (pos? min-rate)
   ;; Smoothing weight: the estimate is alpha * old + (1 - alpha) * new, so
   ;; both alpha and (1 - alpha) must be non-negative.
   (<= 0 alpha 1)
   ;; NOTE(review): presumably required so that 0.9 * nreq (the 90th
   ;; percentile index) is a whole number - confirm.
   (= 0 (rem nreq 10))
   (pos? timeout)))
(s/defn calculate-new-rate
  "Returns the new token-bucket refill rate given the current rate.

  `error` is the relative latency error, (smoothed latency - target) / target,
  so a positive value means requests are slower than the target.
  `overloads` is true when the application itself reported overload.

  * overloaded, or error above :dec-threshold -> rate divided by :dec-factor,
    never below :min-rate;
  * error below :inc-threshold -> rate raised in proportion to how far below
    target the latency is (scaled by :inc-weight and :inc-factor), never
    above :max-rate;
  * otherwise -> rate unchanged."
  [current-rate :- s/Number
   {:keys [dec-factor dec-threshold
           inc-factor inc-threshold inc-weight
           min-rate max-rate] :as parameters} :- LatencyAlgorithmParameters
   error :- s/Number
   overloads :- java.lang.Boolean]
  {:pre [(sensible-parameters? parameters)]}
  (cond
    (or overloads (> error dec-threshold))
    (-> current-rate
        (/ dec-factor)
        (max min-rate))

    (< error inc-threshold)
    ;; error < inc-threshold < 0 and inc-weight <= 0 (both checked by
    ;; sensible-parameters?), so error * inc-weight >= 0. The increase can
    ;; therefore never lower the rate. The old additive form
    ;; (error - inc-weight) went negative once latency fell far below target.
    (-> error
        (* inc-weight)
        (* inc-factor)
        (+ current-rate)
        (min max-rate))

    :else
    current-rate))
(s/defn add-latency-sample
  "Adds one latency `sample` to the batch buffered under :recent-latencies in
  `latency-ref`, and decides whether a rate recalculation is due.

  A recalculation is due once :nreq samples have been buffered, or once
  :timeout ms have elapsed since :last-calculation.

  When a recalculation is due:
  * the buffer is cleared and :last-calculation is set to now;
  * [true p90] is returned, where p90 is the 90th-percentile sample of the
    batch (the new sample included).

  Otherwise the sample is buffered and [false nil] is returned.

  Runs in a transaction (joining the caller's, if any)."
  [latency-ref :- clojure.lang.Ref
   parameters :- LatencyAlgorithmParameters
   sample :- s/Number]
  {:pre [(sensible-parameters? parameters)
         (>= sample 0)]
   ;; Post-conditions must be expressions over %, not bare fns (which are
   ;; always truthy).
   :post [(vector? %) (= 2 (count %))]}
  (dosync
   (let [{:keys [recent-latencies last-calculation]} @latency-ref
         batch (conj recent-latencies sample)
         nreq (count batch)
         now (System/currentTimeMillis)
         nreq-reached (>= nreq (:nreq parameters))
         ;; Fires once at least :timeout ms have passed since the last recalculation.
         timer-popped (>= (- now last-calculation) (:timeout parameters))]
     (if (or nreq-reached timer-popped)
       (let [ninetieth-percentile-index (max 0 (dec (int (* 0.9 nreq))))]
         ;; Reset the batch and restart the timer for the next window.
         (alter latency-ref assoc
                :recent-latencies []
                :last-calculation now)
         [true (nth (sort batch) ninetieth-percentile-index)])
       (do
         (alter latency-ref assoc :recent-latencies batch)
         [false nil])))))
(s/defn update-latency-estimate
  "Folds `new-data` (a non-negative latency sample) into the exponentially
  smoothed estimate stored under :smoothed-estimate in `latency-ref`:

      estimate' = alpha * estimate + (1 - alpha) * new-data

  Runs in a transaction (joining the caller's, if any) and returns the
  updated latency map."
  [latency-ref :- clojure.lang.Ref
   parameters :- LatencyAlgorithmParameters
   new-data :- s/Number]
  {:pre [(sensible-parameters? parameters)
         (>= new-data 0)]}
  (let [alpha (:alpha parameters)]
    (dosync
     (alter latency-ref update-in [:smoothed-estimate]
            (fn [estimate]
              (+ (* estimate alpha)
                 (* (- 1 alpha) new-data)))))))
(ns overload-middleware.middleware
  (:require [overload-middleware.bucket :refer :all]
            [overload-middleware.latency :refer :all]
            [overload-middleware.utils :refer :all]
            [overload-middleware.schemas :refer :all]))
;; register-new-latency is defined after wrap-overload but is called from the
;; handler wrap-overload returns, so it must be declared first.
(declare register-new-latency)

(defn wrap-overload
  "Ring middleware that sheds load in front of `app`.

  Each request must first take a token from a token bucket. When none is
  available, the request is rejected immediately with a 503 response. Admitted
  requests are timed with with-latency, and each latency is passed to
  register-new-latency. That function adjusts the bucket's refill rate so that
  the smoothed 90th-percentile latency tracks `target-latency`.

  Options map keys:
    :target-latency                - required positive number, in milliseconds
                                     (the unit with-latency measures).
    :override-algorithm-parameters - optional map merged over
                                     default-latency-algorithm-parameters.
    :override-bucket-parameters    - optional map merged over
                                     default-token-bucket-parameters."
  [app {:keys [target-latency
               override-algorithm-parameters
               override-bucket-parameters]}]
  {:pre [(number? target-latency) (pos? target-latency)]}
  (let [algorithm-parameters (merge default-latency-algorithm-parameters
                                    override-algorithm-parameters)
        bucket-parameters (assoc
                           (merge default-token-bucket-parameters override-bucket-parameters)
                           :last-time (get-time))
        bucket (ref bucket-parameters)
        latency-info (ref {:target-latency target-latency
                           :smoothed-estimate target-latency
                           :recent-latencies []
                           :recent-overloads false
                           :last-calculation (get-time)})]
    ;; Fail when the middleware is built rather than on every request.
    (assert (sensible-parameters? algorithm-parameters)
            "wrap-overload: latency algorithm parameters out of range")
    (fn [req]
      (if (get-token-with-replenishment bucket)
        (let [[latency {:keys [status] :as result}] (with-latency app req)]
          ;; A 503 from the wrapped app is recorded as an overload signal for
          ;; the next rate recalculation.
          (when (= 503 status)
            (dosync (alter latency-info assoc :recent-overloads true)))
          (register-new-latency latency-info algorithm-parameters bucket latency)
          result)
        {:status 503 :body "Overloaded"}))))
(defn register-new-latency
  "Feeds one request latency into the latency tracker `latency-info`.

  When add-latency-sample reports that a recalculation is due, this function,
  in a single transaction:
  * updates the smoothed latency estimate;
  * computes the relative error (estimate - target) / target;
  * sets the bucket's :rate from calculate-new-rate;
  * clears :recent-overloads.

  Returns nil when no recalculation was due, otherwise the updated
  latency-info map."
  [latency-info algorithm-parameters bucket new-latency]
  (let [[do-update ninetieth-percentile]
        (add-latency-sample latency-info algorithm-parameters new-latency)]
    (when do-update
      (dosync
       (update-latency-estimate latency-info algorithm-parameters ninetieth-percentile)
       (let [{:keys [target-latency smoothed-estimate recent-overloads]} @latency-info
             err (/ (- smoothed-estimate target-latency) target-latency)]
         (alter bucket assoc :rate
                (calculate-new-rate (:rate @bucket) algorithm-parameters err recent-overloads)))
       (alter latency-info assoc :recent-overloads false)))))
(ns overload-middleware.schemas
  (:require [schema.core :as s]
            [overload-middleware.utils :refer [get-time]]))
(def LatencyAlgorithmParameters
  {:dec-factor s/Number
   ;; Multiplicative factor controlling how the rate is decreased (the rate is
   ;; divided by it).
   :dec-threshold s/Number
   ;; The required relative difference between the average latency and the
   ;; target latency (as a fraction, e.g. 0.0 = 0%) for the bucket rate to be
   ;; reduced.
   :inc-factor s/Number
   ;; Factor controlling how much the rate is increased.
   :inc-threshold s/Number
   ;; The required relative difference between the average latency and the
   ;; target latency (as a fraction, e.g. -0.005 = -0.5%) for the bucket rate
   ;; to be increased.
   :inc-weight s/Number
   ;; Weight affecting how much impact the relative difference has on the
   ;; change in the bucket rate.
   :min-rate s/Number ; Minimum bucket rate.
   :max-rate s/Number ; Maximum bucket rate.
   :nreq s/Number
   :timeout s/Number
   ;; The rate is changed after :nreq requests have been processed or :timeout
   ;; ms have passed, whichever comes first.
   :alpha s/Number
   ;; The 90th-percentile latency is tracked as a smoothed average - this is
   ;; the smoothing factor.
   })
(def default-latency-algorithm-parameters
  ;; Decrease: divide the rate by 1.2 as soon as latency exceeds the target.
  {:dec-factor    1.2
   :dec-threshold 0.0
   ;; Increase: only once latency is more than 0.5% below the target.
   :inc-factor    2
   :inc-threshold -0.005
   :inc-weight    -0.1
   ;; Bounds on the bucket refill rate.
   :min-rate      0.05
   :max-rate      5000
   ;; Recalculation trigger: 100 samples or 1000 milliseconds.
   :nreq          100
   :timeout       1000
   ;; Smoothing factor for the latency estimate.
   :alpha         0.7})
(def TokenBucketParameters
  {:tokens s/Number
   ;; The number of tokens currently available in this bucket (each of which
   ;; allows one request to be processed).
   :max s/Number ; The maximum number of tokens this bucket can hold.
   :rate s/Number
   ;; The rate at which this bucket is refilled (tokens per second). Varied by
   ;; the middleware based on latency.
   :last-time s/Number
   ;; The last time this bucket was refilled (a timestamp in milliseconds).
   })
;; Starting bucket: 100 tokens, capacity 5000, refilled at 10 tokens/second.
(def default-token-bucket-parameters
  {:tokens    100
   :max       5000
   ;; Placeholder only: wrap-overload replaces it with the current timestamp
   ;; when it creates the bucket.
   :last-time -1
   :rate      10})
(ns overload-middleware.utils)
(defn with-latency
  "Calls (apply f args) and returns [elapsed-ms result], where elapsed-ms is
  the wall-clock duration of the call measured with System/currentTimeMillis."
  [f & args]
  (let [started-at (System/currentTimeMillis)
        value (apply f args)]
    [(- (System/currentTimeMillis) started-at) value]))
(defn with-nano-latency
  "Calls the zero-argument function f and returns [elapsed-ns result], where
  elapsed-ns is measured with System/nanoTime."
  [f]
  (let [t0 (System/nanoTime)
        v (f)]
    [(- (System/nanoTime) t0) v]))
(defn abs
  "Returns the absolute value of num (negated only when it is negative).
  NOTE(review): Clojure 1.11+ also defines clojure.core/abs; this definition
  shadows it in this namespace and the compiler warns about it."
  [num]
  (cond-> num
    (neg? num) -))
(defn get-time
  "Current wall-clock time in milliseconds, from System/currentTimeMillis."
  []
  (. System currentTimeMillis))