overload-middleware 0.1.1 — Clojure/Ring middleware handler to automatically apply overload control based on a target latency. Dependencies:
| (this space intentionally left almost blank) | |||||||||
(ns overload-middleware.bucket
(:require [overload-middleware.schemas :refer [TokenBucketParameters]]
[overload-middleware.utils :refer [get-time]]
[schema.core :as s])) | ||||||||||
(s/defn replenish-underlying
  "Returns `bucket` topped up with the tokens accrued since its `:last-time`.

  The elapsed time (milliseconds, from `get-time`) is converted to seconds,
  multiplied by the bucket's `:rate` and added to `:tokens`, capped at `:max`.
  `:last-time` is set to the time of this call."
  [bucket :- TokenBucketParameters]
  (let [{:keys [tokens rate last-time]} bucket
        current-ms      (get-time)
        elapsed-seconds (/ (- current-ms last-time) 1000)
        refill          (* rate elapsed-seconds)
        topped-up       (min (:max bucket) (+ tokens refill))]
    (assoc bucket
           :tokens topped-up
           :last-time current-ms)))
(s/defn replenish
  "Transactionally replaces the bucket held in `bucket-ref` with the result of
  applying `replenish-underlying` to it. Returns the new bucket value."
  [bucket-ref :- clojure.lang.Ref]
  (dosync
    (alter bucket-ref replenish-underlying)))
(s/defn dec-token
  "Returns `bucket` with its `:tokens` count reduced by one."
  [bucket :- TokenBucketParameters]
  (update-in bucket [:tokens] dec))
(s/defn get-token
  "Tries to take one token from the bucket held in `bucket-ref`.

  Returns nil when the bucket has no tokens left (`:tokens` is zero or
  negative). Otherwise removes one token and returns the updated bucket map,
  which callers can treat as a truthy \"token granted\" result."
  [bucket-ref :- clojure.lang.Ref]
  (dosync
    (when-not (>= 0 (:tokens @bucket-ref))
      ;; `alter`, not `commute`: the decrement is only valid while the check
      ;; above still holds. `commute` re-applies its function to the latest
      ;; committed value at commit time without re-running the check, so
      ;; concurrent callers could each pass the check and push :tokens below
      ;; zero. `alter` forces a retry (and a re-check) on conflict instead.
      (alter bucket-ref dec-token))))
(s/defn get-token-with-replenishment
  "Refills the bucket in `bucket-ref` for the time elapsed since its last
  refill, then tries to take a token from it. Returns whatever `get-token`
  returns: the updated bucket when a token was granted, nil otherwise."
  [bucket-ref :- clojure.lang.Ref]
  ;; `doto` runs the replenishment for its side effect and hands the same ref
  ;; on to `get-token`.
  (get-token (doto bucket-ref replenish)))
(ns overload-middleware.latency
(:require [overload-middleware.utils :refer [abs]]
[overload-middleware.schemas :refer [LatencyAlgorithmParameters]]
[schema.core :as s])) | ||||||||||
(s/defn sensible-parameters?
  "Returns true when the latency-algorithm parameters are mutually consistent:

  - `dec-factor` is greater than 1 and `inc-factor` is positive
  - `dec-threshold` is non-negative and `inc-threshold` is negative
  - `inc-weight` is zero or negative
  - `min-rate` is positive and strictly below `max-rate`
  - `alpha` is non-negative
  - `nreq` is a multiple of 10
  - `timeout` is positive

  Returns false as soon as one of these checks fails."
  [{:keys [dec-factor dec-threshold
           inc-factor inc-threshold inc-weight
           min-rate max-rate
           nreq timeout
           alpha]} :- LatencyAlgorithmParameters]
  (and
    ;; rate-decrease / rate-increase tuning
    (> dec-factor 1)
    (pos? inc-factor)
    (<= 0 dec-threshold)
    (neg? inc-threshold)
    (>= 0 inc-weight)
    ;; rate bounds
    (< min-rate max-rate)
    (pos? min-rate)
    ;; smoothing and recalculation cadence
    (<= 0 alpha)
    (= 0 (rem nreq 10))
    (pos? timeout)))
(s/defn calculate-new-rate
  "Computes the next bucket rate from `current-rate` and the latency `error`.

  - When `overloads` is true or `error` exceeds `dec-threshold`, the rate is
    divided by `dec-factor`, but never drops below `min-rate`.
  - Otherwise, when `error` is below `inc-threshold`, the rate grows by
    `inc-factor * |error - inc-weight|`, but never rises above `max-rate`.
  - Otherwise the rate is returned unchanged.

  `parameters` must satisfy `sensible-parameters?`."
  [current-rate :- s/Number
   {:keys [dec-factor dec-threshold
           inc-factor inc-threshold inc-weight
           min-rate max-rate] :as parameters} :- LatencyAlgorithmParameters
   error :- s/Number
   overloads :- java.lang.Boolean]
  {:pre [(sensible-parameters? parameters)]}
  (let [slower (max (/ current-rate dec-factor) min-rate)
        faster (min (+ (* (abs (- error inc-weight)) inc-factor)
                       current-rate)
                    max-rate)]
    (if (or overloads (> error dec-threshold))
      slower
      (if (< error inc-threshold)
        faster
        current-rate))))
(s/defn add-latency-sample
  "Records a latency `sample` in the map held by `latency-ref` and decides
  whether the rate should be recalculated now.

  A recalculation is due once `:nreq` samples have been collected, or once
  `:timeout` milliseconds have passed since `:last-calculation`, whichever
  comes first.

  Returns a two-element vector:
  - `[true p90]` when a recalculation is due; `p90` is the 90th-percentile
    latency of the collected samples (including this one). The sample list is
    cleared and `:last-calculation` is set to the current time.
  - `[false nil]` otherwise; the sample is simply appended to
    `:recent-latencies`.

  `parameters` must satisfy `sensible-parameters?` and `sample` must be
  non-negative."
  [latency-ref :- clojure.lang.Ref
   parameters :- LatencyAlgorithmParameters
   sample :- s/Number]
  {:pre [(sensible-parameters? parameters)
         (>= sample 0)]
   ;; Postconditions must be expressions over `%`; a bare function object is
   ;; always truthy and would check nothing.
   :post [(vector? %) (= 2 (count %))]}
  (dosync
    (let [{:keys [recent-latencies last-calculation]} @latency-ref
          samples      (conj recent-latencies sample)
          sample-count (count samples)
          now          (System/currentTimeMillis)
          ;; >= rather than = so a count that overshoots still triggers.
          nreq-reached (>= sample-count (:nreq parameters))
          ;; The timer pops when at least :timeout ms have elapsed since the
          ;; previous recalculation.
          timer-popped (>= (- now last-calculation) (:timeout parameters))]
      (if (or nreq-reached timer-popped)
        ;; Only sort when a percentile is actually needed.
        (let [p90-index (max 0 (dec (int (* 0.9 sample-count))))
              p90       (nth (sort samples) p90-index)]
          (alter latency-ref assoc
                 :recent-latencies []
                 :last-calculation now)
          [true p90])
        (do
          (alter latency-ref assoc :recent-latencies samples)
          [false nil])))))
(s/defn update-latency-estimate
  "Folds `new-data` into the exponentially smoothed latency estimate stored
  under `:smoothed-estimate` in `latency-ref`:

      new-estimate = alpha * old-estimate + (1 - alpha) * new-data

  where `alpha` comes from `parameters`. Returns the updated map held by
  `latency-ref`.

  `parameters` must satisfy `sensible-parameters?` and `new-data` must be
  non-negative."
  [latency-ref :- clojure.lang.Ref
   parameters :- LatencyAlgorithmParameters
   new-data :- s/Number]
  {:pre [(sensible-parameters? parameters)
         (>= new-data 0)]}
  (dosync
    (let [alpha    (:alpha parameters)
          previous (:smoothed-estimate @latency-ref)
          smoothed (+ (* previous alpha)
                      (* (- 1 alpha) new-data))]
      (alter latency-ref assoc :smoothed-estimate smoothed))))
(ns overload-middleware.middleware
(:require [overload-middleware.bucket :refer :all]
[overload-middleware.latency :refer :all]
[overload-middleware.utils :refer :all]
[overload-middleware.schemas :refer :all])) | ||||||||||
(declare register-new-latency) | ||||||||||
(defn wrap-overload
  "Wraps the handler `app` with token-bucket overload control.

  Options:
  - `:target-latency` (required, positive number): the latency the rate
    algorithm steers towards.
  - `:override-algorithm-parameters`: merged over
    `default-latency-algorithm-parameters`.
  - `:override-bucket-parameters`: merged over
    `default-token-bucket-parameters`.

  The returned handler takes a token from the bucket for each request. When
  no token is available it answers `{:status 503 :body \"Overloaded\"}`
  without calling `app`. Otherwise it calls `app`, records how long the call
  took (and whether `app` itself answered 503) so the bucket rate can be
  adjusted, and returns `app`'s response."
  [app {:keys [target-latency
               override-algorithm-parameters
               override-bucket-parameters]}]
  {:pre [(number? target-latency) (pos? target-latency)]}
  (let [algorithm-parameters (merge default-latency-algorithm-parameters
                                    override-algorithm-parameters)
        ;; The bucket starts its refill clock at wrapper-creation time.
        bucket (ref (-> default-token-bucket-parameters
                        (merge override-bucket-parameters)
                        (assoc :last-time (get-time))))
        latency-info (ref {:target-latency target-latency
                           :smoothed-estimate target-latency
                           :recent-latencies []
                           :recent-overloads false
                           :last-calculation (get-time)})]
    (fn [req]
      (if-not (get-token-with-replenishment bucket)
        {:status 503 :body "Overloaded"}
        (let [[latency {:keys [status] :as response}] (with-latency app req)]
          ;; A 503 from the wrapped app counts as an overload signal.
          (when (= 503 status)
            (dosync (alter latency-info assoc :recent-overloads true)))
          (register-new-latency latency-info algorithm-parameters bucket latency)
          response)))))
(defn register-new-latency
  "Feeds `new-latency` into the latency tracker and, when
  `add-latency-sample` reports that a recalculation is due, adjusts the
  bucket's `:rate`.

  On recalculation, in a single transaction:
  1. the smoothed latency estimate is updated with the reported percentile;
  2. the relative error `(estimate - target) / target` is computed;
  3. the bucket's `:rate` is replaced by the result of `calculate-new-rate`;
  4. the `:recent-overloads` flag is reset to false.

  Returns nil when no recalculation happens."
  [latency-info algorithm-parameters bucket new-latency]
  (let [[recalculate? percentile] (add-latency-sample latency-info
                                                      algorithm-parameters
                                                      new-latency)]
    (when recalculate?
      (dosync
        (update-latency-estimate latency-info algorithm-parameters percentile)
        (let [{:keys [target-latency smoothed-estimate recent-overloads]} @latency-info
              relative-error (/ (- smoothed-estimate target-latency)
                                target-latency)
              new-rate (calculate-new-rate (:rate @bucket)
                                           algorithm-parameters
                                           relative-error
                                           recent-overloads)]
          (alter bucket assoc :rate new-rate))
        (alter latency-info assoc :recent-overloads false)))))
(ns overload-middleware.schemas
(:require [schema.core :as s]
[overload-middleware.utils :refer [get-time]])) | ||||||||||
;; Schema for the tunable parameters of the latency-driven rate algorithm.
;; Each comment describes the key that follows it.
(def LatencyAlgorithmParameters
  {;; Multiplicative factor controlling how the rate is decreased.
   :dec-factor s/Number

   ;; Relative difference between the smoothed latency and the target latency
   ;; above which the bucket rate is reduced.
   :dec-threshold s/Number

   ;; Additive factor controlling how much the rate is increased.
   :inc-factor s/Number

   ;; Relative difference between the smoothed latency and the target latency
   ;; below which the bucket rate is increased.
   :inc-threshold s/Number

   ;; Weight affecting how much impact the relative difference has on the
   ;; change in the bucket rate.
   :inc-weight s/Number

   ;; Minimum bucket rate.
   :min-rate s/Number

   ;; Maximum bucket rate.
   :max-rate s/Number

   ;; The rate is changed after :nreq requests have been processed or
   ;; :timeout ms have passed, whichever comes first.
   :nreq s/Number
   :timeout s/Number

   ;; The 90th-percentile latency is tracked as a smoothed average; this is
   ;; the smoothing factor.
   :alpha s/Number})
;; Default values for every key of the latency-algorithm parameter map.
(def default-latency-algorithm-parameters
  {:dec-factor    1.2
   :dec-threshold 0.0
   :inc-factor    2
   :inc-threshold -0.005   ; -0.5%
   :inc-weight    -0.1
   :min-rate      0.05
   :max-rate      5000
   :nreq          100
   :timeout       1000     ; milliseconds
   :alpha         0.7})
;; Schema for the state of a token bucket.
;; Each comment describes the key that follows it.
(def TokenBucketParameters
  {;; The number of tokens currently available in this bucket (each of which
   ;; allows one request to be processed).
   :tokens s/Number

   ;; The maximum number of tokens this bucket can hold.
   :max s/Number

   ;; The rate at which this bucket is refilled (tokens per second). Varied
   ;; by the middleware based on latency.
   :rate s/Number

   ;; The last time this bucket was refilled (a timestamp in milliseconds).
   :last-time s/Number})
;; Initial token-bucket state used when the caller supplies no overrides.
(def default-token-bucket-parameters
  {:tokens    100
   :max       5000
   ;; Placeholder: the wrapper sets it to the current timestamp when created.
   :last-time -1
   :rate      10})
(ns overload-middleware.utils) | ||||||||||
(defn with-latency
  "Applies `f` to `args` and returns `[elapsed-ms result]`, where `elapsed-ms`
  is the wall-clock duration of the call in milliseconds and `result` is
  whatever `f` returned."
  [f & args]
  (let [started-at (System/currentTimeMillis)
        value      (apply f args)
        elapsed    (- (System/currentTimeMillis) started-at)]
    [elapsed value]))
(defn with-nano-latency
  "Calls the zero-argument function `f` and returns `[elapsed-ns result]`,
  where `elapsed-ns` is the duration of the call in nanoseconds as measured
  by `System/nanoTime` and `result` is whatever `f` returned."
  [f]
  (let [started-at (System/nanoTime)
        value      (f)
        elapsed    (- (System/nanoTime) started-at)]
    [elapsed value]))
(defn abs
  "Returns the absolute value of `num`: `num` itself when it is not negative,
  its negation otherwise."
  [num]
  (if-not (neg? num)
    num
    (- num)))
(defn get-time
  "Returns the current time in milliseconds, as reported by
  `System/currentTimeMillis`."
  []
  (System/currentTimeMillis))