overload-middleware 0.1.1
Clojure/Ring middleware handler to automatically apply overload control based on a target latency.
dependencies
| (this space intentionally left almost blank) | |||||||||
(ns overload-middleware.bucket (:require [overload-middleware.schemas :refer [TokenBucketParameters]] [overload-middleware.utils :refer [get-time]] [schema.core :as s])) | ||||||||||
(s/defn replenish-underlying
  "Returns `bucket` topped up with the tokens accrued since its `:last-time`.

  The time elapsed since `:last-time` (milliseconds from `get-time`, converted
  to seconds) is multiplied by `:rate` and added to `:tokens`; the sum is
  capped at `:max`. `:last-time` is set to the timestamp used for the
  calculation."
  [bucket :- TokenBucketParameters]
  (let [{:keys [tokens rate last-time]} bucket
        capacity (:max bucket)
        timestamp (get-time)
        elapsed-secs (/ (- timestamp last-time) 1000)
        refilled (+ tokens (* rate elapsed-secs))]
    (assoc bucket
           :tokens (min capacity refilled)
           :last-time timestamp)))
(s/defn replenish
  "Transactionally replaces the bucket held in `bucket-ref` with its
  replenished version (see `replenish-underlying`) and returns the new bucket."
  [bucket-ref :- clojure.lang.Ref]
  (dosync
    (ref-set bucket-ref (replenish-underlying @bucket-ref))))
(s/defn dec-token
  "Returns `bucket` with its `:tokens` count reduced by one."
  [bucket :- TokenBucketParameters]
  (update-in bucket [:tokens] dec))
(s/defn get-token
  "Tries to take one token from the bucket held in `bucket-ref`.

  Returns the updated bucket (truthy) when a token was available, or nil when
  the bucket's `:tokens` count is zero or negative.

  The decrement uses `alter` rather than `commute`: the decision to decrement
  depends on the value read in this transaction, and `commute` would re-run
  `dec-token` at commit time against a possibly newer value, letting
  concurrent callers all pass the check and drive the count below zero."
  [bucket-ref :- clojure.lang.Ref]
  (dosync
    (when (pos? (:tokens @bucket-ref))
      (alter bucket-ref dec-token))))
(s/defn get-token-with-replenishment
  "Replenishes the bucket in `bucket-ref` and then tries to take a token from
  it. Returns whatever `get-token` returns: the updated bucket on success, nil
  when no token is available. The two steps run as separate transactions."
  [bucket-ref :- clojure.lang.Ref]
  (-> bucket-ref
      (doto replenish)
      get-token))
(ns overload-middleware.latency (:require [overload-middleware.utils :refer [abs]] [overload-middleware.schemas :refer [LatencyAlgorithmParameters]] [schema.core :as s])) | ||||||||||
(s/defn sensible-parameters?
  "Returns true when the latency-algorithm parameters are mutually consistent:

  - `dec-factor` is greater than 1 and `inc-factor` is positive;
  - `dec-threshold` is non-negative and `inc-threshold` is negative;
  - `inc-weight` is zero or negative;
  - `min-rate` is positive and strictly below `max-rate`;
  - `alpha` is non-negative;
  - `nreq` is a multiple of 10 and `timeout` is positive.

  Returns false as soon as one of the checks fails."
  [{:keys [dec-factor dec-threshold inc-factor inc-threshold inc-weight min-rate max-rate nreq timeout alpha]} :- LatencyAlgorithmParameters]
  (and (> dec-factor 1)
       (< 0 inc-factor)
       (<= 0 dec-threshold)
       (> 0 inc-threshold)
       (>= 0 inc-weight)
       (< min-rate max-rate)
       (< 0 min-rate)
       (<= 0 alpha)
       (= 0 (rem nreq 10))
       (< 0 timeout)))
(s/defn calculate-new-rate
  "Computes the next token-bucket rate from the current one.

  `error` is the relative difference between the latency estimate and the
  target; `overloads` says whether an overload was seen recently.

  - If `overloads` is true or `error` exceeds `dec-threshold`, the rate is
    divided by `dec-factor`, but never drops below `min-rate`.
  - Otherwise, if `error` is below `inc-threshold`, the rate grows by
    `inc-factor` times |error - inc-weight|, but never exceeds `max-rate`.
  - Otherwise the rate is returned unchanged."
  [current-rate :- s/Number
   {:keys [dec-factor dec-threshold inc-factor inc-threshold inc-weight min-rate max-rate] :as parameters} :- LatencyAlgorithmParameters
   error :- s/Number
   overloads :- java.lang.Boolean]
  {:pre [(sensible-parameters? parameters)]}
  (let [slower (max (/ current-rate dec-factor) min-rate)
        step (* (abs (- error inc-weight)) inc-factor)
        faster (min (+ step current-rate) max-rate)]
    (if (or overloads (> error dec-threshold))
      slower
      (if (< error inc-threshold)
        faster
        current-rate))))
(s/defn add-latency-sample
  "Records one latency `sample` in the window held by `latency-ref`.

  Returns a two-element vector:

  - `[true p90]` when the window is complete, i.e. it now holds at least
    `(:nreq parameters)` samples or at least `(:timeout parameters)`
    milliseconds have passed since `:last-calculation`. `p90` is the
    90th-percentile latency of the window. The window is emptied and
    `:last-calculation` is moved to the current time so the timeout starts
    counting again.
  - `[false nil]` otherwise; the sample is simply appended to
    `:recent-latencies`.

  Preconditions: `parameters` must satisfy `sensible-parameters?` and `sample`
  must be non-negative."
  [latency-ref :- clojure.lang.Ref
   parameters :- LatencyAlgorithmParameters
   sample :- s/Number]
  {:pre [(sensible-parameters? parameters) (>= sample 0)]
   ;; Conditions must be expressions over `%`; a bare fn is always truthy and
   ;; would check nothing.
   :post [(vector? %) (= 2 (count %))]}
  (dosync
    (let [{:keys [recent-latencies last-calculation]} @latency-ref
          samples (conj recent-latencies sample)
          sample-count (count samples)
          now (System/currentTimeMillis)
          nreq-reached (>= sample-count (:nreq parameters))
          timer-popped (>= (- now last-calculation) (:timeout parameters))]
      (if (or nreq-reached timer-popped)
        ;; Only sort when a percentile is actually needed.
        (let [ninetieth-percentile-index (max 0 (dec (int (* 0.9 sample-count))))
              ninetieth-percentile (nth (sort samples) ninetieth-percentile-index)]
          (alter latency-ref assoc
                 :recent-latencies []
                 :last-calculation now)
          [true ninetieth-percentile])
        (do
          (alter latency-ref assoc :recent-latencies samples)
          [false nil])))))
(s/defn update-latency-estimate
  "Folds `new-data` into the smoothed latency estimate held by `latency-ref`.

  The new estimate is `alpha * old-estimate + (1 - alpha) * new-data`, where
  `alpha` comes from `parameters`. Returns the updated contents of
  `latency-ref`.

  Preconditions: `parameters` must satisfy `sensible-parameters?` and
  `new-data` must be non-negative."
  [latency-ref :- clojure.lang.Ref
   parameters :- LatencyAlgorithmParameters
   new-data :- s/Number]
  {:pre [(sensible-parameters? parameters) (>= new-data 0)]}
  (dosync
    (let [{:keys [alpha]} parameters
          smoothed-estimate (-> (:smoothed-estimate @latency-ref)
                                (* alpha)
                                (+ (* (- 1 alpha) new-data)))]
      (alter latency-ref assoc :smoothed-estimate smoothed-estimate))))
(ns overload-middleware.middleware (:require [overload-middleware.bucket :refer :all] [overload-middleware.latency :refer :all] [overload-middleware.utils :refer :all] [overload-middleware.schemas :refer :all])) | ||||||||||
(declare register-new-latency) | ||||||||||
(defn wrap-overload
  "Ring middleware that sheds load to keep latency near `target-latency`.

  Options map:
  - `:target-latency` (required, positive number);
  - `:override-algorithm-parameters`, merged over
    `default-latency-algorithm-parameters`;
  - `:override-bucket-parameters`, merged over
    `default-token-bucket-parameters` (`:last-time` is always set to the
    current time).

  The returned handler takes a token from the bucket for each request. With a
  token, it calls `app`, timing it with `with-latency`; a 503 response from
  `app` is recorded as a recent overload, the measured latency is passed to
  `register-new-latency`, and the response from `app` is returned. Without a
  token, `app` is not called and a 503 response with body `Overloaded` is
  returned."
  [app {:keys [target-latency override-algorithm-parameters override-bucket-parameters]}]
  {:pre [(number? target-latency) (pos? target-latency)]}
  (let [algorithm-parameters (merge default-latency-algorithm-parameters
                                    override-algorithm-parameters)
        bucket (ref (-> default-token-bucket-parameters
                        (merge override-bucket-parameters)
                        (assoc :last-time (get-time))))
        latency-info (ref {:target-latency target-latency
                           :smoothed-estimate target-latency
                           :recent-latencies []
                           :recent-overloads false
                           :last-calculation (get-time)})
        rejection {:status 503 :body "Overloaded"}]
    (fn [req]
      (if-not (get-token-with-replenishment bucket)
        rejection
        (let [[elapsed {:keys [status] :as response}] (with-latency app req)]
          (when (= 503 status)
            (dosync (alter latency-info assoc :recent-overloads true)))
          (register-new-latency latency-info algorithm-parameters bucket elapsed)
          response)))))
(defn register-new-latency
  "Feeds `new-latency` into the latency window and, when `add-latency-sample`
  reports that the window is complete, recalculates the bucket rate.

  The recalculation runs in one transaction: the smoothed estimate is updated
  with the window's 90th percentile, the relative error against
  `:target-latency` is passed to `calculate-new-rate` together with the
  `:recent-overloads` flag, the result is stored as the bucket's `:rate`, and
  `:recent-overloads` is reset to false.

  Returns nil when no recalculation happened."
  [latency-info algorithm-parameters bucket new-latency]
  (let [[recalculate? p90] (add-latency-sample latency-info algorithm-parameters new-latency)]
    (when recalculate?
      (dosync
        (update-latency-estimate latency-info algorithm-parameters p90)
        (let [{:keys [target-latency smoothed-estimate recent-overloads]} @latency-info
              relative-error (/ (- smoothed-estimate target-latency) target-latency)
              new-rate (calculate-new-rate (:rate @bucket)
                                           algorithm-parameters
                                           relative-error
                                           recent-overloads)]
          (alter bucket assoc :rate new-rate))
        (alter latency-info assoc :recent-overloads false)))))
(ns overload-middleware.schemas (:require [schema.core :as s] [overload-middleware.utils :refer [get-time]])) | ||||||||||
(def LatencyAlgorithmParameters
  "Schema for the parameters of the latency-driven rate algorithm."
  {;; Multiplicative factor: the rate is divided by this when it is decreased.
   :dec-factor    s/Number
   ;; Relative difference between the latency estimate and the target latency
   ;; above which the bucket rate is reduced.
   :dec-threshold s/Number
   ;; Additive factor controlling how much the rate is increased.
   :inc-factor    s/Number
   ;; Relative difference between the latency estimate and the target latency
   ;; below which the bucket rate is increased.
   :inc-threshold s/Number
   ;; Weight affecting how much impact the relative difference has on the
   ;; size of a rate increase.
   :inc-weight    s/Number
   ;; Minimum bucket rate.
   :min-rate      s/Number
   ;; Maximum bucket rate.
   :max-rate      s/Number
   ;; The rate is changed after :nreq requests have been processed or
   ;; :timeout ms have passed, whichever comes first.
   :nreq          s/Number
   :timeout       s/Number
   ;; The 90th-percentile latency is tracked as a smoothed average; this is
   ;; the smoothing factor.
   :alpha         s/Number})
(def default-latency-algorithm-parameters
  "Default values for every key of the latency-algorithm parameter map."
  {:dec-factor    1.2
   :dec-threshold 0.0
   :inc-factor    2
   :inc-threshold -0.005 ; i.e. -0.5%
   :inc-weight    -0.1
   :min-rate      0.05
   :max-rate      5000
   :nreq          100
   :timeout       1000   ; milliseconds
   :alpha         0.7})
(def TokenBucketParameters
  "Schema for the state of a token bucket."
  {;; Number of tokens currently available; each one allows one request to be
   ;; processed.
   :tokens    s/Number
   ;; Maximum number of tokens the bucket can hold.
   :max       s/Number
   ;; Refill rate in tokens per second. Varied by the middleware based on
   ;; latency.
   :rate      s/Number
   ;; Timestamp (milliseconds) of the last refill.
   :last-time s/Number})
(def default-token-bucket-parameters
  "Initial token-bucket state used when the caller supplies no overrides."
  {:tokens    100
   :max       5000
   :last-time -1 ; placeholder: the wrapper sets it to the current timestamp when created
   :rate      10})
(ns overload-middleware.utils) | ||||||||||
(defn with-latency
  "Calls `f` with `args` and returns `[elapsed-ms result]`, where `elapsed-ms`
  is the whole number of milliseconds the call took and `result` is what `f`
  returned.

  The duration is measured with `System/nanoTime`, which is monotonic, rather
  than with the wall clock: a backwards wall-clock adjustment during the call
  would otherwise yield a negative latency, which downstream consumers that
  require non-negative samples would reject."
  [f & args]
  (let [start (System/nanoTime)
        result (apply f args)
        elapsed-nanos (- (System/nanoTime) start)]
    [(quot elapsed-nanos 1000000) result]))
(defn with-nano-latency
  "Calls the zero-argument function `f` and returns `[elapsed-ns result]`,
  where `elapsed-ns` is the difference between two `System/nanoTime` readings
  taken around the call and `result` is what `f` returned."
  [f]
  (let [started-at (System/nanoTime)
        outcome (f)]
    [(- (System/nanoTime) started-at) outcome]))
(defn abs
  "Returns the absolute value of `num`: `num` itself when it is not negative,
  otherwise zero minus `num`."
  [num]
  (if-not (neg? num)
    num
    (- 0 num)))
(defn get-time
  "Returns the current time as reported by `System/currentTimeMillis`
  (milliseconds)."
  []
  (System/currentTimeMillis))