setup integration test

softprops 2019-10-20 17:50:35 -04:00
parent 1a522d88d8
commit 845942e04a
555 changed files with 103819 additions and 1 deletion

39
node_modules/bottleneck/src/Batcher.coffee generated vendored Normal file

@@ -0,0 +1,39 @@
parser = require "./parser"
Events = require "./Events"

class Batcher
  defaults:
    maxTime: null
    maxSize: null
    Promise: Promise

  constructor: (@options={}) ->
    parser.load @options, @defaults, @
    @Events = new Events @
    @_arr = []
    @_resetPromise()
    @_lastFlush = Date.now()

  _resetPromise: ->
    @_promise = new @Promise (res, rej) => @_resolve = res

  _flush: ->
    clearTimeout @_timeout
    @_lastFlush = Date.now()
    @_resolve()
    @Events.trigger "batch", @_arr
    @_arr = []
    @_resetPromise()

  add: (data) ->
    @_arr.push data
    ret = @_promise
    if @_arr.length == @maxSize
      @_flush()
    else if @maxTime? and @_arr.length == 1
      @_timeout = setTimeout =>
        @_flush()
      , @maxTime
    ret

module.exports = Batcher
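
For review context: Batcher buffers added items and emits them as a single "batch" event once maxSize items have accumulated or maxTime ms have elapsed since the first add. A minimal usage sketch (the option values and data are illustrative, not part of the vendored file):

Bottleneck = require "bottleneck"
batcher = new Bottleneck.Batcher { maxTime: 1000, maxSize: 10 }

# Fires with the buffered items: at most 10 here, no later than 1000 ms after the first add
batcher.on "batch", (items) -> console.log "flushing #{items.length} items"

# add() returns a promise that resolves when the batch containing this item is flushed
batcher.add { id: 1 }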

298
node_modules/bottleneck/src/Bottleneck.coffee generated vendored Normal file

@@ -0,0 +1,298 @@
NUM_PRIORITIES = 10
DEFAULT_PRIORITY = 5

parser = require "./parser"
Queues = require "./Queues"
Job = require "./Job"
LocalDatastore = require "./LocalDatastore"
RedisDatastore = require "./RedisDatastore"
Events = require "./Events"
States = require "./States"
Sync = require "./Sync"

class Bottleneck
  Bottleneck.default = Bottleneck
  Bottleneck.Events = Events
  Bottleneck.version = Bottleneck::version = require("./version.json").version
  Bottleneck.strategy = Bottleneck::strategy = { LEAK:1, OVERFLOW:2, OVERFLOW_PRIORITY:4, BLOCK:3 }
  Bottleneck.BottleneckError = Bottleneck::BottleneckError = require "./BottleneckError"
  Bottleneck.Group = Bottleneck::Group = require "./Group"
  Bottleneck.RedisConnection = Bottleneck::RedisConnection = require "./RedisConnection"
  Bottleneck.IORedisConnection = Bottleneck::IORedisConnection = require "./IORedisConnection"
  Bottleneck.Batcher = Bottleneck::Batcher = require "./Batcher"

  jobDefaults:
    priority: DEFAULT_PRIORITY
    weight: 1
    expiration: null
    id: "<no-id>"

  storeDefaults:
    maxConcurrent: null
    minTime: 0
    highWater: null
    strategy: Bottleneck::strategy.LEAK
    penalty: null
    reservoir: null
    reservoirRefreshInterval: null
    reservoirRefreshAmount: null
    reservoirIncreaseInterval: null
    reservoirIncreaseAmount: null
    reservoirIncreaseMaximum: null

  localStoreDefaults:
    Promise: Promise
    timeout: null
    heartbeatInterval: 250

  redisStoreDefaults:
    Promise: Promise
    timeout: null
    heartbeatInterval: 5000
    clientTimeout: 10000
    Redis: null
    clientOptions: {}
    clusterNodes: null
    clearDatastore: false
    connection: null

  instanceDefaults:
    datastore: "local"
    connection: null
    id: "<no-id>"
    rejectOnDrop: true
    trackDoneStatus: false
    Promise: Promise

  stopDefaults:
    enqueueErrorMessage: "This limiter has been stopped and cannot accept new jobs."
    dropWaitingJobs: true
    dropErrorMessage: "This limiter has been stopped."

  constructor: (options={}, invalid...) ->
    @_validateOptions options, invalid
    parser.load options, @instanceDefaults, @
    @_queues = new Queues NUM_PRIORITIES
    @_scheduled = {}
    @_states = new States ["RECEIVED", "QUEUED", "RUNNING", "EXECUTING"].concat(if @trackDoneStatus then ["DONE"] else [])
    @_limiter = null
    @Events = new Events @
    @_submitLock = new Sync "submit", @Promise
    @_registerLock = new Sync "register", @Promise
    storeOptions = parser.load options, @storeDefaults, {}

    @_store = if @datastore == "redis" or @datastore == "ioredis" or @connection?
      storeInstanceOptions = parser.load options, @redisStoreDefaults, {}
      new RedisDatastore @, storeOptions, storeInstanceOptions
    else if @datastore == "local"
      storeInstanceOptions = parser.load options, @localStoreDefaults, {}
      new LocalDatastore @, storeOptions, storeInstanceOptions
    else
      throw new Bottleneck::BottleneckError "Invalid datastore type: #{@datastore}"

    @_queues.on "leftzero", => @_store.heartbeat?.ref?()
    @_queues.on "zero", => @_store.heartbeat?.unref?()

  _validateOptions: (options, invalid) ->
    unless options? and typeof options == "object" and invalid.length == 0
      throw new Bottleneck::BottleneckError "Bottleneck v2 takes a single object argument. Refer to https://github.com/SGrondin/bottleneck#upgrading-to-v2 if you're upgrading from Bottleneck v1."

  ready: -> @_store.ready

  clients: -> @_store.clients

  channel: -> "b_#{@id}"

  channel_client: -> "b_#{@id}_#{@_store.clientId}"

  publish: (message) -> @_store.__publish__ message

  disconnect: (flush=true) -> @_store.__disconnect__ flush

  chain: (@_limiter) -> @

  queued: (priority) -> @_queues.queued priority

  clusterQueued: -> @_store.__queued__()

  empty: -> @queued() == 0 and @_submitLock.isEmpty()

  running: -> @_store.__running__()

  done: -> @_store.__done__()

  jobStatus: (id) -> @_states.jobStatus id

  jobs: (status) -> @_states.statusJobs status

  counts: -> @_states.statusCounts()

  _randomIndex: -> Math.random().toString(36).slice(2)

  check: (weight=1) -> @_store.__check__ weight

  _clearGlobalState: (index) ->
    if @_scheduled[index]?
      clearTimeout @_scheduled[index].expiration
      delete @_scheduled[index]
      true
    else false

  _free: (index, job, options, eventInfo) ->
    try
      { running } = await @_store.__free__ index, options.weight
      @Events.trigger "debug", "Freed #{options.id}", eventInfo
      if running == 0 and @empty() then @Events.trigger "idle"
    catch e
      @Events.trigger "error", e

  _run: (index, job, wait) ->
    job.doRun()
    clearGlobalState = @_clearGlobalState.bind @, index
    run = @_run.bind @, index, job
    free = @_free.bind @, index, job
    @_scheduled[index] =
      timeout: setTimeout =>
        job.doExecute @_limiter, clearGlobalState, run, free
      , wait
      expiration: if job.options.expiration? then setTimeout ->
        job.doExpire clearGlobalState, run, free
      , wait + job.options.expiration
      job: job

  _drainOne: (capacity) ->
    @_registerLock.schedule =>
      if @queued() == 0 then return @Promise.resolve null
      queue = @_queues.getFirst()
      { options, args } = next = queue.first()
      if capacity? and options.weight > capacity then return @Promise.resolve null
      @Events.trigger "debug", "Draining #{options.id}", { args, options }
      index = @_randomIndex()
      @_store.__register__ index, options.weight, options.expiration
      .then ({ success, wait, reservoir }) =>
        @Events.trigger "debug", "Drained #{options.id}", { success, args, options }
        if success
          queue.shift()
          empty = @empty()
          if empty then @Events.trigger "empty"
          if reservoir == 0 then @Events.trigger "depleted", empty
          @_run index, next, wait
          @Promise.resolve options.weight
        else
          @Promise.resolve null

  _drainAll: (capacity, total=0) ->
    @_drainOne(capacity)
    .then (drained) =>
      if drained?
        newCapacity = if capacity? then capacity - drained else capacity
        @_drainAll(newCapacity, total + drained)
      else @Promise.resolve total
    .catch (e) => @Events.trigger "error", e

  _dropAllQueued: (message) -> @_queues.shiftAll (job) -> job.doDrop { message }

  stop: (options={}) ->
    options = parser.load options, @stopDefaults
    waitForExecuting = (at) =>
      finished = =>
        counts = @_states.counts
        (counts[0] + counts[1] + counts[2] + counts[3]) == at
      new @Promise (resolve, reject) =>
        if finished() then resolve()
        else
          @on "done", =>
            if finished()
              @removeAllListeners "done"
              resolve()
    done = if options.dropWaitingJobs
      @_run = (index, next) -> next.doDrop { message: options.dropErrorMessage }
      @_drainOne = => @Promise.resolve null
      @_registerLock.schedule => @_submitLock.schedule =>
        for k, v of @_scheduled
          if @jobStatus(v.job.options.id) == "RUNNING"
            clearTimeout v.timeout
            clearTimeout v.expiration
            v.job.doDrop { message: options.dropErrorMessage }
        @_dropAllQueued options.dropErrorMessage
        waitForExecuting(0)
    else
      @schedule { priority: NUM_PRIORITIES - 1, weight: 0 }, => waitForExecuting(1)
    @_receive = (job) -> job._reject new Bottleneck::BottleneckError options.enqueueErrorMessage
    @stop = => @Promise.reject new Bottleneck::BottleneckError "stop() has already been called"
    done

  _addToQueue: (job) =>
    { args, options } = job
    try
      { reachedHWM, blocked, strategy } = await @_store.__submit__ @queued(), options.weight
    catch error
      @Events.trigger "debug", "Could not queue #{options.id}", { args, options, error }
      job.doDrop { error }
      return false

    if blocked
      job.doDrop()
      return true
    else if reachedHWM
      shifted = if strategy == Bottleneck::strategy.LEAK then @_queues.shiftLastFrom(options.priority)
      else if strategy == Bottleneck::strategy.OVERFLOW_PRIORITY then @_queues.shiftLastFrom(options.priority + 1)
      else if strategy == Bottleneck::strategy.OVERFLOW then job
      if shifted? then shifted.doDrop()
      if not shifted? or strategy == Bottleneck::strategy.OVERFLOW
        if not shifted? then job.doDrop()
        return reachedHWM

    job.doQueue reachedHWM, blocked
    @_queues.push job
    await @_drainAll()
    reachedHWM

  _receive: (job) ->
    if @_states.jobStatus(job.options.id)?
      job._reject new Bottleneck::BottleneckError "A job with the same id already exists (id=#{job.options.id})"
      false
    else
      job.doReceive()
      @_submitLock.schedule @_addToQueue, job

  submit: (args...) ->
    if typeof args[0] == "function"
      [fn, args..., cb] = args
      options = parser.load {}, @jobDefaults
    else
      [options, fn, args..., cb] = args
      options = parser.load options, @jobDefaults

    task = (args...) =>
      new @Promise (resolve, reject) ->
        fn args..., (args...) ->
          (if args[0]? then reject else resolve) args

    job = new Job task, args, options, @jobDefaults, @rejectOnDrop, @Events, @_states, @Promise
    job.promise
    .then (args) -> cb? args...
    .catch (args) -> if Array.isArray args then cb? args... else cb? args
    @_receive job

  schedule: (args...) ->
    if typeof args[0] == "function"
      [task, args...] = args
      options = {}
    else
      [options, task, args...] = args
    job = new Job task, args, options, @jobDefaults, @rejectOnDrop, @Events, @_states, @Promise
    @_receive job
    job.promise

  wrap: (fn) ->
    schedule = @schedule.bind @
    wrapped = (args...) -> schedule fn.bind(@), args...
    wrapped.withOptions = (options, args...) -> schedule options, fn, args...
    wrapped

  updateSettings: (options={}) ->
    await @_store.__updateSettings__ parser.overwrite options, @storeDefaults
    parser.overwrite options, @instanceDefaults, @
    @

  currentReservoir: -> @_store.__currentReservoir__()

  incrementReservoir: (incr=0) -> @_store.__incrementReservoir__ incr

module.exports = Bottleneck
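
For review context, a minimal sketch of the public API this class exposes; the limiter settings and the fetchPage function are illustrative, not part of the commit:

Bottleneck = require "bottleneck"

# At most 2 jobs executing at once, and at least 250 ms between job starts
limiter = new Bottleneck { maxConcurrent: 2, minTime: 250 }

# schedule() runs a promise-returning task through the limiter and resolves with its result
limiter.schedule(-> fetchPage "https://example.com").then (page) -> console.log page.length

# wrap() returns a rate-limited version of a function with the same signature
throttledFetch = limiter.wrap fetchPage
throttledFetch "https://example.com/other"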

3
node_modules/bottleneck/src/BottleneckError.coffee generated vendored Normal file

@@ -0,0 +1,3 @@
class BottleneckError extends Error

module.exports = BottleneckError

38
node_modules/bottleneck/src/DLList.coffee generated vendored Normal file

@@ -0,0 +1,38 @@
class DLList
  constructor: (@incr, @decr) ->
    @_first = null
    @_last = null
    @length = 0

  push: (value) ->
    @length++
    @incr?()
    node = { value, prev: @_last, next: null }
    if @_last?
      @_last.next = node
      @_last = node
    else @_first = @_last = node
    undefined

  shift: () ->
    if not @_first? then return
    else
      @length--
      @decr?()
      value = @_first.value
      if (@_first = @_first.next)?
        @_first.prev = null
      else
        @_last = null
      value

  first: () -> if @_first? then @_first.value

  getArray: () ->
    node = @_first
    while node? then (ref = node; node = node.next; ref.value)

  forEachShift: (cb) ->
    node = @shift()
    while node? then (cb node; node = @shift())
    undefined

  debug: () ->
    node = @_first
    while node? then (ref = node; node = node.next; { value: ref.value, prev: ref.prev?.value, next: ref.next?.value })

module.exports = DLList

38
node_modules/bottleneck/src/Events.coffee generated vendored Normal file

@@ -0,0 +1,38 @@
class Events
  constructor: (@instance) ->
    @_events = {}
    if @instance.on? or @instance.once? or @instance.removeAllListeners?
      throw new Error "An Emitter already exists for this object"
    @instance.on = (name, cb) => @_addListener name, "many", cb
    @instance.once = (name, cb) => @_addListener name, "once", cb
    @instance.removeAllListeners = (name=null) =>
      if name? then delete @_events[name] else @_events = {}

  _addListener: (name, status, cb) ->
    @_events[name] ?= []
    @_events[name].push {cb, status}
    @instance

  listenerCount: (name) ->
    if @_events[name]? then @_events[name].length else 0

  trigger: (name, args...) ->
    try
      if name != "debug" then @trigger "debug", "Event triggered: #{name}", args
      return unless @_events[name]?
      @_events[name] = @_events[name].filter (listener) -> listener.status != "none"
      promises = @_events[name].map (listener) =>
        return if listener.status == "none"
        if listener.status == "once" then listener.status = "none"
        try
          returned = listener.cb?(args...)
          if typeof returned?.then == "function"
            await returned
          else
            returned
        catch e
          if name != "error" then @trigger "error", e
          null
      (await Promise.all promises).find (x) -> x?
    catch e
      if name != "error" then @trigger "error", e
      null

module.exports = Events
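
Events grafts on/once/removeAllListeners onto its host object, which is how limiters and groups expose their events. Assuming a limiter built as in Bottleneck.coffee above, a short sketch against events the vendored code actually triggers:

limiter.on "error", (e) -> console.error "limiter error", e
limiter.once "idle", -> console.log "no jobs running or queued"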

80
node_modules/bottleneck/src/Group.coffee generated vendored Normal file

@@ -0,0 +1,80 @@
parser = require "./parser"
Events = require "./Events"
RedisConnection = require "./RedisConnection"
IORedisConnection = require "./IORedisConnection"
Scripts = require "./Scripts"

class Group
  defaults:
    timeout: 1000 * 60 * 5
    connection: null
    Promise: Promise
    id: "group-key"

  constructor: (@limiterOptions={}) ->
    parser.load @limiterOptions, @defaults, @
    @Events = new Events @
    @instances = {}
    @Bottleneck = require "./Bottleneck"
    @_startAutoCleanup()
    @sharedConnection = @connection?

    if !@connection?
      if @limiterOptions.datastore == "redis"
        @connection = new RedisConnection Object.assign {}, @limiterOptions, { @Events }
      else if @limiterOptions.datastore == "ioredis"
        @connection = new IORedisConnection Object.assign {}, @limiterOptions, { @Events }

  key: (key="") -> @instances[key] ? do =>
    limiter = @instances[key] = new @Bottleneck Object.assign {}, @limiterOptions, {
      id: "#{@id}-#{key}",
      @timeout,
      @connection
    }
    @Events.trigger "created", limiter, key
    limiter

  deleteKey: (key="") =>
    instance = @instances[key]
    if @connection
      deleted = await @connection.__runCommand__ ['del', Scripts.allKeys("#{@id}-#{key}")...]
    if instance?
      delete @instances[key]
      await instance.disconnect()
    instance? or deleted > 0

  limiters: -> { key: k, limiter: v } for k, v of @instances

  keys: -> Object.keys @instances

  clusterKeys: ->
    if !@connection? then return @Promise.resolve @keys()
    keys = []
    cursor = null
    start = "b_#{@id}-".length
    end = "_settings".length
    until cursor == 0
      [next, found] = await @connection.__runCommand__ ["scan", (cursor ? 0), "match", "b_#{@id}-*_settings", "count", 10000]
      cursor = ~~next
      keys.push(k.slice(start, -end)) for k in found
    keys

  _startAutoCleanup: ->
    clearInterval @interval
    (@interval = setInterval =>
      time = Date.now()
      for k, v of @instances
        try if await v._store.__groupCheck__(time) then @deleteKey k
        catch e then v.Events.trigger "error", e
    , (@timeout / 2)).unref?()

  updateSettings: (options={}) ->
    parser.overwrite options, @defaults, @
    parser.overwrite options, options, @limiterOptions
    @_startAutoCleanup() if options.timeout?

  disconnect: (flush=true) ->
    if !@sharedConnection
      @connection?.disconnect flush

module.exports = Group
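
Group lazily creates one limiter per key and garbage-collects idle ones after `timeout` ms (default 5 minutes) via _startAutoCleanup. A minimal sketch; the key, settings, and doWorkFor function are illustrative:

Bottleneck = require "bottleneck"
group = new Bottleneck.Group { maxConcurrent: 1, minTime: 100 }

group.on "created", (limiter, key) -> console.log "created limiter for #{key}"

# key() returns (creating on first use) the limiter dedicated to "user-123"
group.key("user-123").schedule -> doWorkFor "user-123"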

84
node_modules/bottleneck/src/IORedisConnection.coffee generated vendored Normal file

@@ -0,0 +1,84 @@
parser = require "./parser"
Events = require "./Events"
Scripts = require "./Scripts"

class IORedisConnection
  datastore: "ioredis"

  defaults:
    Redis: null
    clientOptions: {}
    clusterNodes: null
    client: null
    Promise: Promise
    Events: null

  constructor: (options={}) ->
    parser.load options, @defaults, @
    @Redis ?= eval("require")("ioredis") # Obfuscated or else Webpack/Angular will try to inline the optional ioredis module. To override this behavior: pass the ioredis module to Bottleneck as the 'Redis' option.
    @Events ?= new Events @
    @terminated = false

    if @clusterNodes?
      @client = new @Redis.Cluster @clusterNodes, @clientOptions
      @subscriber = new @Redis.Cluster @clusterNodes, @clientOptions
    else if @client? and !@client.duplicate?
      @subscriber = new @Redis.Cluster @client.startupNodes, @client.options
    else
      @client ?= new @Redis @clientOptions
      @subscriber = @client.duplicate()
    @limiters = {}

    @ready = @Promise.all [@_setup(@client, false), @_setup(@subscriber, true)]
    .then =>
      @_loadScripts()
      { @client, @subscriber }

  _setup: (client, sub) ->
    client.setMaxListeners 0
    new @Promise (resolve, reject) =>
      client.on "error", (e) => @Events.trigger "error", e
      if sub
        client.on "message", (channel, message) =>
          @limiters[channel]?._store.onMessage channel, message
      if client.status == "ready" then resolve()
      else client.once "ready", resolve

  _loadScripts: -> Scripts.names.forEach (name) => @client.defineCommand name, { lua: Scripts.payload(name) }

  __runCommand__: (cmd) ->
    await @ready
    [[_, deleted]] = await @client.pipeline([cmd]).exec()
    deleted

  __addLimiter__: (instance) ->
    @Promise.all [instance.channel(), instance.channel_client()].map (channel) =>
      new @Promise (resolve, reject) =>
        @subscriber.subscribe channel, =>
          @limiters[channel] = instance
          resolve()

  __removeLimiter__: (instance) ->
    [instance.channel(), instance.channel_client()].forEach (channel) =>
      await @subscriber.unsubscribe channel unless @terminated
      delete @limiters[channel]

  __scriptArgs__: (name, id, args, cb) ->
    keys = Scripts.keys name, id
    [keys.length].concat keys, args, cb

  __scriptFn__: (name) ->
    @client[name].bind(@client)

  disconnect: (flush=true) ->
    clearInterval(@limiters[k]._store.heartbeat) for k in Object.keys @limiters
    @limiters = {}
    @terminated = true

    if flush
      @Promise.all [@client.quit(), @subscriber.quit()]
    else
      @client.disconnect()
      @subscriber.disconnect()
      @Promise.resolve()

module.exports = IORedisConnection

98
node_modules/bottleneck/src/Job.coffee generated vendored Normal file

@@ -0,0 +1,98 @@
NUM_PRIORITIES = 10
DEFAULT_PRIORITY = 5

parser = require "./parser"
BottleneckError = require "./BottleneckError"

class Job
  constructor: (@task, @args, options, jobDefaults, @rejectOnDrop, @Events, @_states, @Promise) ->
    @options = parser.load options, jobDefaults
    @options.priority = @_sanitizePriority @options.priority
    if @options.id == jobDefaults.id then @options.id = "#{@options.id}-#{@_randomIndex()}"
    @promise = new @Promise (@_resolve, @_reject) =>
    @retryCount = 0

  _sanitizePriority: (priority) ->
    sProperty = if ~~priority != priority then DEFAULT_PRIORITY else priority
    if sProperty < 0 then 0 else if sProperty > NUM_PRIORITIES-1 then NUM_PRIORITIES-1 else sProperty

  _randomIndex: -> Math.random().toString(36).slice(2)

  doDrop: ({ error, message="This job has been dropped by Bottleneck" } = {}) ->
    if @_states.remove @options.id
      if @rejectOnDrop then @_reject (error ? new BottleneckError message)
      @Events.trigger "dropped", { @args, @options, @task, @promise }
      true
    else
      false

  _assertStatus: (expected) ->
    status = @_states.jobStatus @options.id
    if not (status == expected or (expected == "DONE" and status == null))
      throw new BottleneckError "Invalid job status #{status}, expected #{expected}. Please open an issue at https://github.com/SGrondin/bottleneck/issues"

  doReceive: () ->
    @_states.start @options.id
    @Events.trigger "received", { @args, @options }

  doQueue: (reachedHWM, blocked) ->
    @_assertStatus "RECEIVED"
    @_states.next @options.id
    @Events.trigger "queued", { @args, @options, reachedHWM, blocked }

  doRun: () ->
    if @retryCount == 0
      @_assertStatus "QUEUED"
      @_states.next @options.id
    else @_assertStatus "EXECUTING"
    @Events.trigger "scheduled", { @args, @options }

  doExecute: (chained, clearGlobalState, run, free) ->
    if @retryCount == 0
      @_assertStatus "RUNNING"
      @_states.next @options.id
    else @_assertStatus "EXECUTING"
    eventInfo = { @args, @options, @retryCount }
    @Events.trigger "executing", eventInfo

    try
      passed = await if chained?
        chained.schedule @options, @task, @args...
      else @task @args...

      if clearGlobalState()
        @doDone eventInfo
        await free @options, eventInfo
        @_assertStatus "DONE"
        @_resolve passed
    catch error
      @_onFailure error, eventInfo, clearGlobalState, run, free

  doExpire: (clearGlobalState, run, free) ->
    if @_states.jobStatus(@options.id) == "RUNNING"
      @_states.next @options.id
    @_assertStatus "EXECUTING"
    eventInfo = { @args, @options, @retryCount }
    error = new BottleneckError "This job timed out after #{@options.expiration} ms."
    @_onFailure error, eventInfo, clearGlobalState, run, free

  _onFailure: (error, eventInfo, clearGlobalState, run, free) ->
    if clearGlobalState()
      retry = await @Events.trigger "failed", error, eventInfo
      if retry?
        retryAfter = ~~retry
        @Events.trigger "retry", "Retrying #{@options.id} after #{retryAfter} ms", eventInfo
        @retryCount++
        run retryAfter
      else
        @doDone eventInfo
        await free @options, eventInfo
        @_assertStatus "DONE"
        @_reject error

  doDone: (eventInfo) ->
    @_assertStatus "EXECUTING"
    @_states.next @options.id
    @Events.trigger "done", eventInfo

module.exports = Job

140
node_modules/bottleneck/src/LocalDatastore.coffee generated vendored Normal file

@@ -0,0 +1,140 @@
parser = require "./parser"
BottleneckError = require "./BottleneckError"

class LocalDatastore
  constructor: (@instance, @storeOptions, storeInstanceOptions) ->
    @clientId = @instance._randomIndex()
    parser.load storeInstanceOptions, storeInstanceOptions, @
    @_nextRequest = @_lastReservoirRefresh = @_lastReservoirIncrease = Date.now()
    @_running = 0
    @_done = 0
    @_unblockTime = 0
    @ready = @Promise.resolve()
    @clients = {}
    @_startHeartbeat()

  _startHeartbeat: ->
    if !@heartbeat? and ((
      @storeOptions.reservoirRefreshInterval? and @storeOptions.reservoirRefreshAmount?
    ) or (
      @storeOptions.reservoirIncreaseInterval? and @storeOptions.reservoirIncreaseAmount?
    ))
      (@heartbeat = setInterval =>
        now = Date.now()

        if @storeOptions.reservoirRefreshInterval? and now >= @_lastReservoirRefresh + @storeOptions.reservoirRefreshInterval
          @_lastReservoirRefresh = now
          @storeOptions.reservoir = @storeOptions.reservoirRefreshAmount
          @instance._drainAll @computeCapacity()

        if @storeOptions.reservoirIncreaseInterval? and now >= @_lastReservoirIncrease + @storeOptions.reservoirIncreaseInterval
          { reservoirIncreaseAmount: amount, reservoirIncreaseMaximum: maximum, reservoir } = @storeOptions
          @_lastReservoirIncrease = now
          incr = if maximum? then Math.min amount, maximum - reservoir else amount
          if incr > 0
            @storeOptions.reservoir += incr
            @instance._drainAll @computeCapacity()
      , @heartbeatInterval).unref?()
    else clearInterval @heartbeat

  __publish__: (message) ->
    await @yieldLoop()
    @instance.Events.trigger "message", message.toString()

  __disconnect__: (flush) ->
    await @yieldLoop()
    clearInterval @heartbeat
    @Promise.resolve()

  yieldLoop: (t=0) -> new @Promise (resolve, reject) -> setTimeout resolve, t

  computePenalty: -> @storeOptions.penalty ? ((15 * @storeOptions.minTime) or 5000)

  __updateSettings__: (options) ->
    await @yieldLoop()
    parser.overwrite options, options, @storeOptions
    @_startHeartbeat()
    @instance._drainAll @computeCapacity()
    true

  __running__: ->
    await @yieldLoop()
    @_running

  __queued__: ->
    await @yieldLoop()
    @instance.queued()

  __done__: ->
    await @yieldLoop()
    @_done

  __groupCheck__: (time) ->
    await @yieldLoop()
    (@_nextRequest + @timeout) < time

  computeCapacity: ->
    { maxConcurrent, reservoir } = @storeOptions
    if maxConcurrent? and reservoir? then Math.min((maxConcurrent - @_running), reservoir)
    else if maxConcurrent? then maxConcurrent - @_running
    else if reservoir? then reservoir
    else null

  conditionsCheck: (weight) ->
    capacity = @computeCapacity()
    not capacity? or weight <= capacity

  __incrementReservoir__: (incr) ->
    await @yieldLoop()
    reservoir = @storeOptions.reservoir += incr
    @instance._drainAll @computeCapacity()
    reservoir

  __currentReservoir__: ->
    await @yieldLoop()
    @storeOptions.reservoir

  isBlocked: (now) -> @_unblockTime >= now

  check: (weight, now) -> @conditionsCheck(weight) and (@_nextRequest - now) <= 0

  __check__: (weight) ->
    await @yieldLoop()
    now = Date.now()
    @check weight, now

  __register__: (index, weight, expiration) ->
    await @yieldLoop()
    now = Date.now()
    if @conditionsCheck weight
      @_running += weight
      if @storeOptions.reservoir? then @storeOptions.reservoir -= weight
      wait = Math.max @_nextRequest - now, 0
      @_nextRequest = now + wait + @storeOptions.minTime
      { success: true, wait, reservoir: @storeOptions.reservoir }
    else { success: false }

  strategyIsBlock: -> @storeOptions.strategy == 3

  __submit__: (queueLength, weight) ->
    await @yieldLoop()
    if @storeOptions.maxConcurrent? and weight > @storeOptions.maxConcurrent
      throw new BottleneckError("Impossible to add a job having a weight of #{weight} to a limiter having a maxConcurrent setting of #{@storeOptions.maxConcurrent}")
    now = Date.now()
    reachedHWM = @storeOptions.highWater? and queueLength == @storeOptions.highWater and not @check(weight, now)
    blocked = @strategyIsBlock() and (reachedHWM or @isBlocked now)
    if blocked
      @_unblockTime = now + @computePenalty()
      @_nextRequest = @_unblockTime + @storeOptions.minTime
      @instance._dropAllQueued()
    { reachedHWM, blocked, strategy: @storeOptions.strategy }

  __free__: (index, weight) ->
    await @yieldLoop()
    @_running -= weight
    @_done += weight
    @instance._drainAll @computeCapacity()
    { running: @_running }

module.exports = LocalDatastore

28
node_modules/bottleneck/src/Queues.coffee generated vendored Normal file

@@ -0,0 +1,28 @@
DLList = require "./DLList"
Events = require "./Events"

class Queues
  constructor: (num_priorities) ->
    @Events = new Events @
    @_length = 0
    @_lists = for i in [1..num_priorities] then new DLList (=> @incr()), (=> @decr())

  incr: -> if @_length++ == 0 then @Events.trigger "leftzero"

  decr: -> if --@_length == 0 then @Events.trigger "zero"

  push: (job) -> @_lists[job.options.priority].push job

  queued: (priority) -> if priority? then @_lists[priority].length else @_length

  shiftAll: (fn) -> @_lists.forEach (list) -> list.forEachShift fn

  getFirst: (arr=@_lists) ->
    for list in arr
      return list if list.length > 0
    []

  shiftLastFrom: (priority) -> @getFirst(@_lists[priority..].reverse()).shift()

module.exports = Queues

91
node_modules/bottleneck/src/RedisConnection.coffee generated vendored Normal file

@@ -0,0 +1,91 @@
parser = require "./parser"
Events = require "./Events"
Scripts = require "./Scripts"

class RedisConnection
  datastore: "redis"

  defaults:
    Redis: null
    clientOptions: {}
    client: null
    Promise: Promise
    Events: null

  constructor: (options={}) ->
    parser.load options, @defaults, @
    @Redis ?= eval("require")("redis") # Obfuscated or else Webpack/Angular will try to inline the optional redis module. To override this behavior: pass the redis module to Bottleneck as the 'Redis' option.
    @Events ?= new Events @
    @terminated = false

    @client ?= @Redis.createClient @clientOptions
    @subscriber = @client.duplicate()
    @limiters = {}
    @shas = {}

    @ready = @Promise.all [@_setup(@client, false), @_setup(@subscriber, true)]
    .then => @_loadScripts()
    .then => { @client, @subscriber }

  _setup: (client, sub) ->
    client.setMaxListeners 0
    new @Promise (resolve, reject) =>
      client.on "error", (e) => @Events.trigger "error", e
      if sub
        client.on "message", (channel, message) =>
          @limiters[channel]?._store.onMessage channel, message
      if client.ready then resolve()
      else client.once "ready", resolve

  _loadScript: (name) ->
    new @Promise (resolve, reject) =>
      payload = Scripts.payload name
      @client.multi([["script", "load", payload]]).exec (err, replies) =>
        if err? then return reject err
        @shas[name] = replies[0]
        resolve replies[0]

  _loadScripts: -> @Promise.all(Scripts.names.map (k) => @_loadScript k)

  __runCommand__: (cmd) ->
    await @ready
    new @Promise (resolve, reject) =>
      @client.multi([cmd]).exec_atomic (err, replies) ->
        if err? then reject(err) else resolve(replies[0])

  __addLimiter__: (instance) ->
    @Promise.all [instance.channel(), instance.channel_client()].map (channel) =>
      new @Promise (resolve, reject) =>
        handler = (chan) =>
          if chan == channel
            @subscriber.removeListener "subscribe", handler
            @limiters[channel] = instance
            resolve()
        @subscriber.on "subscribe", handler
        @subscriber.subscribe channel

  __removeLimiter__: (instance) ->
    @Promise.all [instance.channel(), instance.channel_client()].map (channel) =>
      unless @terminated
        await new @Promise (resolve, reject) =>
          @subscriber.unsubscribe channel, (err, chan) ->
            if err? then return reject err
            if chan == channel then return resolve()
      delete @limiters[channel]

  __scriptArgs__: (name, id, args, cb) ->
    keys = Scripts.keys name, id
    [@shas[name], keys.length].concat keys, args, cb

  __scriptFn__: (name) ->
    @client.evalsha.bind(@client)

  disconnect: (flush=true) ->
    clearInterval(@limiters[k]._store.heartbeat) for k in Object.keys @limiters
    @limiters = {}
    @terminated = true

    @client.end flush
    @subscriber.end flush
    @Promise.resolve()

module.exports = RedisConnection

158
node_modules/bottleneck/src/RedisDatastore.coffee generated vendored Normal file

@@ -0,0 +1,158 @@
parser = require "./parser"
BottleneckError = require "./BottleneckError"
RedisConnection = require "./RedisConnection"
IORedisConnection = require "./IORedisConnection"

class RedisDatastore
  constructor: (@instance, @storeOptions, storeInstanceOptions) ->
    @originalId = @instance.id
    @clientId = @instance._randomIndex()
    parser.load storeInstanceOptions, storeInstanceOptions, @
    @clients = {}
    @capacityPriorityCounters = {}
    @sharedConnection = @connection?

    @connection ?= if @instance.datastore == "redis" then new RedisConnection { @Redis, @clientOptions, @Promise, Events: @instance.Events }
    else if @instance.datastore == "ioredis" then new IORedisConnection { @Redis, @clientOptions, @clusterNodes, @Promise, Events: @instance.Events }

    @instance.connection = @connection
    @instance.datastore = @connection.datastore

    @ready = @connection.ready
    .then (@clients) => @runScript "init", @prepareInitSettings @clearDatastore
    .then => @connection.__addLimiter__ @instance
    .then => @runScript "register_client", [@instance.queued()]
    .then =>
      (@heartbeat = setInterval =>
        @runScript "heartbeat", []
        .catch (e) => @instance.Events.trigger "error", e
      , @heartbeatInterval).unref?()
      @clients

  __publish__: (message) ->
    { client } = await @ready
    client.publish(@instance.channel(), "message:#{message.toString()}")

  onMessage: (channel, message) ->
    try
      pos = message.indexOf(":")
      [type, data] = [message.slice(0, pos), message.slice(pos+1)]
      if type == "capacity"
        await @instance._drainAll(if data.length > 0 then ~~data)
      else if type == "capacity-priority"
        [rawCapacity, priorityClient, counter] = data.split(":")
        capacity = if rawCapacity.length > 0 then ~~rawCapacity
        if priorityClient == @clientId
          drained = await @instance._drainAll(capacity)
          newCapacity = if capacity? then capacity - (drained or 0) else ""
          await @clients.client.publish(@instance.channel(), "capacity-priority:#{newCapacity}::#{counter}")
        else if priorityClient == ""
          clearTimeout @capacityPriorityCounters[counter]
          delete @capacityPriorityCounters[counter]
          @instance._drainAll(capacity)
        else
          @capacityPriorityCounters[counter] = setTimeout =>
            try
              delete @capacityPriorityCounters[counter]
              await @runScript "blacklist_client", [priorityClient]
              await @instance._drainAll(capacity)
            catch e then @instance.Events.trigger "error", e
          , 1000
      else if type == "message"
        @instance.Events.trigger "message", data
      else if type == "blocked"
        await @instance._dropAllQueued()
    catch e then @instance.Events.trigger "error", e

  __disconnect__: (flush) ->
    clearInterval @heartbeat
    if @sharedConnection
      @connection.__removeLimiter__ @instance
    else
      @connection.disconnect flush

  runScript: (name, args) ->
    await @ready unless name == "init" or name == "register_client"
    new @Promise (resolve, reject) =>
      all_args = [Date.now(), @clientId].concat args
      @instance.Events.trigger "debug", "Calling Redis script: #{name}.lua", all_args
      arr = @connection.__scriptArgs__ name, @originalId, all_args, (err, replies) ->
        if err? then return reject err
        return resolve replies
      @connection.__scriptFn__(name) arr...
    .catch (e) =>
      if e.message == "SETTINGS_KEY_NOT_FOUND"
        if name == "heartbeat" then @Promise.resolve()
        else
          @runScript("init", @prepareInitSettings(false))
          .then => @runScript(name, args)
      else if e.message == "UNKNOWN_CLIENT"
        @runScript("register_client", [@instance.queued()])
        .then => @runScript(name, args)
      else @Promise.reject e

  prepareArray: (arr) -> (if x? then x.toString() else "") for x in arr

  prepareObject: (obj) ->
    arr = []
    for k, v of obj then arr.push k, (if v? then v.toString() else "")
    arr

  prepareInitSettings: (clear) ->
    args = @prepareObject Object.assign({}, @storeOptions, {
      id: @originalId
      version: @instance.version
      groupTimeout: @timeout
      @clientTimeout
    })
    args.unshift (if clear then 1 else 0), @instance.version
    args

  convertBool: (b) -> !!b

  __updateSettings__: (options) ->
    await @runScript "update_settings", @prepareObject options
    parser.overwrite options, options, @storeOptions

  __running__: -> @runScript "running", []

  __queued__: -> @runScript "queued", []

  __done__: -> @runScript "done", []

  __groupCheck__: -> @convertBool await @runScript "group_check", []

  __incrementReservoir__: (incr) -> @runScript "increment_reservoir", [incr]

  __currentReservoir__: -> @runScript "current_reservoir", []

  __check__: (weight) -> @convertBool await @runScript "check", @prepareArray [weight]

  __register__: (index, weight, expiration) ->
    [success, wait, reservoir] = await @runScript "register", @prepareArray [index, weight, expiration]
    return {
      success: @convertBool(success),
      wait,
      reservoir
    }

  __submit__: (queueLength, weight) ->
    try
      [reachedHWM, blocked, strategy] = await @runScript "submit", @prepareArray [queueLength, weight]
      return {
        reachedHWM: @convertBool(reachedHWM),
        blocked: @convertBool(blocked),
        strategy
      }
    catch e
      if e.message.indexOf("OVERWEIGHT") == 0
        [overweight, weight, maxConcurrent] = e.message.split ":"
        throw new BottleneckError("Impossible to add a job having a weight of #{weight} to a limiter having a maxConcurrent setting of #{maxConcurrent}")
      else
        throw e

  __free__: (index, weight) ->
    running = await @runScript "free", @prepareArray [index]
    return { running }

module.exports = RedisDatastore

151
node_modules/bottleneck/src/Scripts.coffee generated vendored Normal file

@@ -0,0 +1,151 @@
lua = require "./lua.json"

headers =
  refs: lua["refs.lua"]
  validate_keys: lua["validate_keys.lua"]
  validate_client: lua["validate_client.lua"]
  refresh_expiration: lua["refresh_expiration.lua"]
  process_tick: lua["process_tick.lua"]
  conditions_check: lua["conditions_check.lua"]
  get_time: lua["get_time.lua"]

exports.allKeys = (id) -> [
  ###
  HASH
  ###
  "b_#{id}_settings"

  ###
  HASH
  job index -> weight
  ###
  "b_#{id}_job_weights"

  ###
  ZSET
  job index -> expiration
  ###
  "b_#{id}_job_expirations"

  ###
  HASH
  job index -> client
  ###
  "b_#{id}_job_clients"

  ###
  ZSET
  client -> sum running
  ###
  "b_#{id}_client_running"

  ###
  HASH
  client -> num queued
  ###
  "b_#{id}_client_num_queued"

  ###
  ZSET
  client -> last job registered
  ###
  "b_#{id}_client_last_registered"

  ###
  ZSET
  client -> last seen
  ###
  "b_#{id}_client_last_seen"
]

templates =
  init:
    keys: exports.allKeys
    headers: ["process_tick"]
    refresh_expiration: true
    code: lua["init.lua"]
  group_check:
    keys: exports.allKeys
    headers: []
    refresh_expiration: false
    code: lua["group_check.lua"]
  register_client:
    keys: exports.allKeys
    headers: ["validate_keys"]
    refresh_expiration: false
    code: lua["register_client.lua"]
  blacklist_client:
    keys: exports.allKeys
    headers: ["validate_keys", "validate_client"]
    refresh_expiration: false
    code: lua["blacklist_client.lua"]
  heartbeat:
    keys: exports.allKeys
    headers: ["validate_keys", "validate_client", "process_tick"]
    refresh_expiration: false
    code: lua["heartbeat.lua"]
  update_settings:
    keys: exports.allKeys
    headers: ["validate_keys", "validate_client", "process_tick"]
    refresh_expiration: true
    code: lua["update_settings.lua"]
  running:
    keys: exports.allKeys
    headers: ["validate_keys", "validate_client", "process_tick"]
    refresh_expiration: false
    code: lua["running.lua"]
  queued:
    keys: exports.allKeys
    headers: ["validate_keys", "validate_client"]
    refresh_expiration: false
    code: lua["queued.lua"]
  done:
    keys: exports.allKeys
    headers: ["validate_keys", "validate_client", "process_tick"]
    refresh_expiration: false
    code: lua["done.lua"]
  check:
    keys: exports.allKeys
    headers: ["validate_keys", "validate_client", "process_tick", "conditions_check"]
    refresh_expiration: false
    code: lua["check.lua"]
  submit:
    keys: exports.allKeys
    headers: ["validate_keys", "validate_client", "process_tick", "conditions_check"]
    refresh_expiration: true
    code: lua["submit.lua"]
  register:
    keys: exports.allKeys
    headers: ["validate_keys", "validate_client", "process_tick", "conditions_check"]
    refresh_expiration: true
    code: lua["register.lua"]
  free:
    keys: exports.allKeys
    headers: ["validate_keys", "validate_client", "process_tick"]
    refresh_expiration: true
    code: lua["free.lua"]
  current_reservoir:
    keys: exports.allKeys
    headers: ["validate_keys", "validate_client", "process_tick"]
    refresh_expiration: false
    code: lua["current_reservoir.lua"]
  increment_reservoir:
    keys: exports.allKeys
    headers: ["validate_keys", "validate_client", "process_tick"]
    refresh_expiration: true
    code: lua["increment_reservoir.lua"]

exports.names = Object.keys templates

exports.keys = (name, id) ->
  templates[name].keys id

exports.payload = (name) ->
  template = templates[name]
  Array::concat(
    headers.refs,
    template.headers.map((h) -> headers[h]),
    (if template.refresh_expiration then headers.refresh_expiration else ""),
    template.code
  )
  .join("\n")

43
node_modules/bottleneck/src/States.coffee generated vendored Normal file

@@ -0,0 +1,43 @@
BottleneckError = require "./BottleneckError"

class States
  constructor: (@status) ->
    @_jobs = {}
    @counts = @status.map(-> 0)

  next: (id) ->
    current = @_jobs[id]
    next = current + 1
    if current? and next < @status.length
      @counts[current]--
      @counts[next]++
      @_jobs[id]++
    else if current?
      @counts[current]--
      delete @_jobs[id]

  start: (id) ->
    initial = 0
    @_jobs[id] = initial
    @counts[initial]++

  remove: (id) ->
    current = @_jobs[id]
    if current?
      @counts[current]--
      delete @_jobs[id]
    current?

  jobStatus: (id) -> @status[@_jobs[id]] ? null

  statusJobs: (status) ->
    if status?
      pos = @status.indexOf status
      if pos < 0
        throw new BottleneckError "status must be one of #{@status.join ', '}"
      k for k,v of @_jobs when v == pos
    else
      Object.keys @_jobs

  statusCounts: -> @counts.reduce(((acc, v, i) => acc[@status[i]] = v; acc), {})

module.exports = States
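
States is the bookkeeping behind the limiter's jobStatus() and counts(); jobs advance RECEIVED, QUEUED, RUNNING, EXECUTING, plus DONE when trackDoneStatus is on. A small sketch; someTask is illustrative:

Bottleneck = require "bottleneck"
limiter = new Bottleneck { trackDoneStatus: true }
limiter.schedule -> someTask()
console.log limiter.counts() # e.g. { RECEIVED: 0, QUEUED: 1, RUNNING: 0, EXECUTING: 0, DONE: 0 }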

28
node_modules/bottleneck/src/Sync.coffee generated vendored Normal file

@@ -0,0 +1,28 @@
DLList = require "./DLList"

class Sync
  constructor: (@name, @Promise) ->
    @_running = 0
    @_queue = new DLList()

  isEmpty: -> @_queue.length == 0

  _tryToRun: ->
    if (@_running < 1) and @_queue.length > 0
      @_running++
      { task, args, resolve, reject } = @_queue.shift()
      cb = try
        returned = await task args...
        () -> resolve returned
      catch error
        () -> reject error
      @_running--
      @_tryToRun()
      cb()

  schedule: (task, args...) =>
    resolve = reject = null
    promise = new @Promise (_resolve, _reject) ->
      resolve = _resolve
      reject = _reject
    @_queue.push { task, args, resolve, reject }
    @_tryToRun()
    promise

module.exports = Sync

3
node_modules/bottleneck/src/es5.coffee generated vendored Normal file

@@ -0,0 +1,3 @@
require("regenerator-runtime/runtime")
module.exports = require "./Bottleneck"

1
node_modules/bottleneck/src/index.coffee generated vendored Normal file

@@ -0,0 +1 @@
module.exports = require "./Bottleneck"

10
node_modules/bottleneck/src/parser.coffee generated vendored Normal file

@@ -0,0 +1,10 @@
exports.load = (received, defaults, onto={}) ->
  for k, v of defaults
    onto[k] = received[k] ? v
  onto

exports.overwrite = (received, defaults, onto={}) ->
  for k, v of received
    if defaults[k] != undefined
      onto[k] = v
  onto
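
These two helpers drive all options handling above: load() copies every key from defaults, preferring the received value when one is present, while overwrite() copies only those received keys that also exist in defaults. A quick illustration with made-up values:

parser = require "./parser"

parser.load { a: 10, c: 30 }, { a: 1, b: 2 }      # => { a: 10, b: 2 }  (c ignored, b falls back)
parser.overwrite { a: 10, c: 30 }, { a: 1, b: 2 } # => { a: 10 }  (only keys present in defaults)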

8
node_modules/bottleneck/src/redis/blacklist_client.lua generated vendored Normal file

@@ -0,0 +1,8 @@
local blacklist = ARGV[num_static_argv + 1]
if redis.call('zscore', client_last_seen_key, blacklist) then
  redis.call('zadd', client_last_seen_key, 0, blacklist)
end

return {}

6
node_modules/bottleneck/src/redis/check.lua generated vendored Normal file

@@ -0,0 +1,6 @@
local weight = tonumber(ARGV[num_static_argv + 1])
local capacity = process_tick(now, false)['capacity']
local nextRequest = tonumber(redis.call('hget', settings_key, 'nextRequest'))
return conditions_check(capacity, weight) and nextRequest - now <= 0

3
node_modules/bottleneck/src/redis/conditions_check.lua generated vendored Normal file

@@ -0,0 +1,3 @@
local conditions_check = function (capacity, weight)
  return capacity == nil or weight <= capacity
end

1
node_modules/bottleneck/src/redis/current_reservoir.lua generated vendored Normal file

@@ -0,0 +1 @@
return process_tick(now, false)['reservoir']

3
node_modules/bottleneck/src/redis/done.lua generated vendored Normal file

@@ -0,0 +1,3 @@
process_tick(now, false)
return tonumber(redis.call('hget', settings_key, 'done'))

5
node_modules/bottleneck/src/redis/free.lua generated vendored Normal file

@@ -0,0 +1,5 @@
local index = ARGV[num_static_argv + 1]
redis.call('zadd', job_expirations_key, 0, index)
return process_tick(now, false)['running']

7
node_modules/bottleneck/src/redis/get_time.lua generated vendored Normal file

@@ -0,0 +1,7 @@
redis.replicate_commands()

local get_time = function ()
  local time = redis.call('time')

  return tonumber(time[1]..string.sub(time[2], 1, 3))
end

1
node_modules/bottleneck/src/redis/group_check.lua generated vendored Normal file

@@ -0,0 +1 @@
return not (redis.call('exists', settings_key) == 1)

1
node_modules/bottleneck/src/redis/heartbeat.lua generated vendored Normal file

@@ -0,0 +1 @@
process_tick(now, true)

10
node_modules/bottleneck/src/redis/increment_reservoir.lua generated vendored Normal file

@@ -0,0 +1,10 @@
local incr = tonumber(ARGV[num_static_argv + 1])
redis.call('hincrby', settings_key, 'reservoir', incr)
local reservoir = process_tick(now, true)['reservoir']
local groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))
refresh_expiration(0, 0, groupTimeout)
return reservoir

105
node_modules/bottleneck/src/redis/init.lua generated vendored Normal file

@@ -0,0 +1,105 @@
local clear = tonumber(ARGV[num_static_argv + 1])
local limiter_version = ARGV[num_static_argv + 2]
local num_local_argv = num_static_argv + 2

if clear == 1 then
  redis.call('del', unpack(KEYS))
end

if redis.call('exists', settings_key) == 0 then
  -- Create
  local args = {'hmset', settings_key}

  for i = num_local_argv + 1, #ARGV do
    table.insert(args, ARGV[i])
  end

  redis.call(unpack(args))
  redis.call('hmset', settings_key,
    'nextRequest', now,
    'lastReservoirRefresh', now,
    'lastReservoirIncrease', now,
    'running', 0,
    'done', 0,
    'unblockTime', 0,
    'capacityPriorityCounter', 0
  )

else
  -- Apply migrations
  local settings = redis.call('hmget', settings_key,
    'id',
    'version'
  )
  local id = settings[1]
  local current_version = settings[2]

  if current_version ~= limiter_version then
    local version_digits = {}
    for k, v in string.gmatch(current_version, "([^.]+)") do
      table.insert(version_digits, tonumber(k))
    end

    -- 2.10.0
    if version_digits[2] < 10 then
      redis.call('hsetnx', settings_key, 'reservoirRefreshInterval', '')
      redis.call('hsetnx', settings_key, 'reservoirRefreshAmount', '')
      redis.call('hsetnx', settings_key, 'lastReservoirRefresh', '')
      redis.call('hsetnx', settings_key, 'done', 0)
      redis.call('hset', settings_key, 'version', '2.10.0')
    end

    -- 2.11.1
    if version_digits[2] < 11 or (version_digits[2] == 11 and version_digits[3] < 1) then
      if redis.call('hstrlen', settings_key, 'lastReservoirRefresh') == 0 then
        redis.call('hmset', settings_key,
          'lastReservoirRefresh', now,
          'version', '2.11.1'
        )
      end
    end

    -- 2.14.0
    if version_digits[2] < 14 then
      local old_running_key = 'b_'..id..'_running'
      local old_executing_key = 'b_'..id..'_executing'

      if redis.call('exists', old_running_key) == 1 then
        redis.call('rename', old_running_key, job_weights_key)
      end
      if redis.call('exists', old_executing_key) == 1 then
        redis.call('rename', old_executing_key, job_expirations_key)
      end
      redis.call('hset', settings_key, 'version', '2.14.0')
    end

    -- 2.15.2
    if version_digits[2] < 15 or (version_digits[2] == 15 and version_digits[3] < 2) then
      redis.call('hsetnx', settings_key, 'capacityPriorityCounter', 0)
      redis.call('hset', settings_key, 'version', '2.15.2')
    end

    -- 2.17.0
    if version_digits[2] < 17 then
      redis.call('hsetnx', settings_key, 'clientTimeout', 10000)
      redis.call('hset', settings_key, 'version', '2.17.0')
    end

    -- 2.18.0
    if version_digits[2] < 18 then
      redis.call('hsetnx', settings_key, 'reservoirIncreaseInterval', '')
      redis.call('hsetnx', settings_key, 'reservoirIncreaseAmount', '')
      redis.call('hsetnx', settings_key, 'reservoirIncreaseMaximum', '')
      redis.call('hsetnx', settings_key, 'lastReservoirIncrease', now)
      redis.call('hset', settings_key, 'version', '2.18.0')
    end
  end

  process_tick(now, false)
end

local groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))
refresh_expiration(0, 0, groupTimeout)

return {}

214
node_modules/bottleneck/src/redis/process_tick.lua generated vendored Normal file

@@ -0,0 +1,214 @@
local process_tick = function (now, always_publish)

  local compute_capacity = function (maxConcurrent, running, reservoir)
    if maxConcurrent ~= nil and reservoir ~= nil then
      return math.min((maxConcurrent - running), reservoir)
    elseif maxConcurrent ~= nil then
      return maxConcurrent - running
    elseif reservoir ~= nil then
      return reservoir
    else
      return nil
    end
  end

  local settings = redis.call('hmget', settings_key,
    'id',
    'maxConcurrent',
    'running',
    'reservoir',
    'reservoirRefreshInterval',
    'reservoirRefreshAmount',
    'lastReservoirRefresh',
    'reservoirIncreaseInterval',
    'reservoirIncreaseAmount',
    'reservoirIncreaseMaximum',
    'lastReservoirIncrease',
    'capacityPriorityCounter',
    'clientTimeout'
  )
  local id = settings[1]
  local maxConcurrent = tonumber(settings[2])
  local running = tonumber(settings[3])
  local reservoir = tonumber(settings[4])
  local reservoirRefreshInterval = tonumber(settings[5])
  local reservoirRefreshAmount = tonumber(settings[6])
  local lastReservoirRefresh = tonumber(settings[7])
  local reservoirIncreaseInterval = tonumber(settings[8])
  local reservoirIncreaseAmount = tonumber(settings[9])
  local reservoirIncreaseMaximum = tonumber(settings[10])
  local lastReservoirIncrease = tonumber(settings[11])
  local capacityPriorityCounter = tonumber(settings[12])
  local clientTimeout = tonumber(settings[13])

  local initial_capacity = compute_capacity(maxConcurrent, running, reservoir)

  --
  -- Process 'running' changes
  --
  local expired = redis.call('zrangebyscore', job_expirations_key, '-inf', '('..now)

  if #expired > 0 then
    redis.call('zremrangebyscore', job_expirations_key, '-inf', '('..now)

    local flush_batch = function (batch, acc)
      local weights = redis.call('hmget', job_weights_key, unpack(batch))
      redis.call('hdel', job_weights_key, unpack(batch))
      local clients = redis.call('hmget', job_clients_key, unpack(batch))
      redis.call('hdel', job_clients_key, unpack(batch))

      -- Calculate sum of removed weights
      for i = 1, #weights do
        acc['total'] = acc['total'] + (tonumber(weights[i]) or 0)
      end

      -- Calculate sum of removed weights by client
      local client_weights = {}
      for i = 1, #clients do
        local removed = tonumber(weights[i]) or 0
        if removed > 0 then
          acc['client_weights'][clients[i]] = (acc['client_weights'][clients[i]] or 0) + removed
        end
      end
    end

    local acc = {
      ['total'] = 0,
      ['client_weights'] = {}
    }
    local batch_size = 1000

    -- Compute changes to Zsets and apply changes to Hashes
    for i = 1, #expired, batch_size do
      local batch = {}
      for j = i, math.min(i + batch_size - 1, #expired) do
        table.insert(batch, expired[j])
      end

      flush_batch(batch, acc)
    end

    -- Apply changes to Zsets
    if acc['total'] > 0 then
      redis.call('hincrby', settings_key, 'done', acc['total'])
      running = tonumber(redis.call('hincrby', settings_key, 'running', -acc['total']))
    end

    for client, weight in pairs(acc['client_weights']) do
      redis.call('zincrby', client_running_key, -weight, client)
    end
  end

  --
  -- Process 'reservoir' changes
  --
  local reservoirRefreshActive = reservoirRefreshInterval ~= nil and reservoirRefreshAmount ~= nil
  if reservoirRefreshActive and now >= lastReservoirRefresh + reservoirRefreshInterval then
    reservoir = reservoirRefreshAmount
    redis.call('hmset', settings_key,
      'reservoir', reservoir,
      'lastReservoirRefresh', now
    )
  end

  local reservoirIncreaseActive = reservoirIncreaseInterval ~= nil and reservoirIncreaseAmount ~= nil
  if reservoirIncreaseActive and now >= lastReservoirIncrease + reservoirIncreaseInterval then
    local num_intervals = math.floor((now - lastReservoirIncrease) / reservoirIncreaseInterval)
    local incr = reservoirIncreaseAmount * num_intervals
    if reservoirIncreaseMaximum ~= nil then
      incr = math.min(incr, reservoirIncreaseMaximum - (reservoir or 0))
    end
    if incr > 0 then
      reservoir = (reservoir or 0) + incr
    end
    redis.call('hmset', settings_key,
      'reservoir', reservoir,
      'lastReservoirIncrease', lastReservoirIncrease + (num_intervals * reservoirIncreaseInterval)
    )
  end

  --
  -- Clear unresponsive clients
  --
  local unresponsive = redis.call('zrangebyscore', client_last_seen_key, '-inf', (now - clientTimeout))
  local unresponsive_lookup = {}
  local terminated_clients = {}
  for i = 1, #unresponsive do
    unresponsive_lookup[unresponsive[i]] = true
    if tonumber(redis.call('zscore', client_running_key, unresponsive[i])) == 0 then
      table.insert(terminated_clients, unresponsive[i])
    end
  end
  if #terminated_clients > 0 then
    redis.call('zrem', client_running_key, unpack(terminated_clients))
    redis.call('hdel', client_num_queued_key, unpack(terminated_clients))
    redis.call('zrem', client_last_registered_key, unpack(terminated_clients))
    redis.call('zrem', client_last_seen_key, unpack(terminated_clients))
  end

  --
  -- Broadcast capacity changes
  --
  local final_capacity = compute_capacity(maxConcurrent, running, reservoir)

  if always_publish or (initial_capacity ~= nil and final_capacity == nil) then
    -- always_publish or was not unlimited, now unlimited
    redis.call('publish', 'b_'..id, 'capacity:'..(final_capacity or ''))

  elseif initial_capacity ~= nil and final_capacity ~= nil and final_capacity > initial_capacity then
    -- capacity was increased
    -- send the capacity message to the limiter having the lowest number of running jobs
    -- the tiebreaker is the limiter having not registered a job in the longest time

    local lowest_concurrency_value = nil
    local lowest_concurrency_clients = {}
    local lowest_concurrency_last_registered = {}
    local client_concurrencies = redis.call('zrange', client_running_key, 0, -1, 'withscores')

    for i = 1, #client_concurrencies, 2 do
      local client = client_concurrencies[i]
      local concurrency = tonumber(client_concurrencies[i+1])

      if (
        lowest_concurrency_value == nil or lowest_concurrency_value == concurrency
      ) and (
        not unresponsive_lookup[client]
      ) and (
        tonumber(redis.call('hget', client_num_queued_key, client)) > 0
      ) then
        lowest_concurrency_value = concurrency
        table.insert(lowest_concurrency_clients, client)
        local last_registered = tonumber(redis.call('zscore', client_last_registered_key, client))
        table.insert(lowest_concurrency_last_registered, last_registered)
      end
    end

    if #lowest_concurrency_clients > 0 then
      local position = 1
      local earliest = lowest_concurrency_last_registered[1]

      for i,v in ipairs(lowest_concurrency_last_registered) do
        if v < earliest then
          position = i
          earliest = v
        end
      end

      local next_client = lowest_concurrency_clients[position]
      redis.call('publish', 'b_'..id,
        'capacity-priority:'..(final_capacity or '')..
        ':'..next_client..
        ':'..capacityPriorityCounter
      )
      redis.call('hincrby', settings_key, 'capacityPriorityCounter', '1')
    else
      redis.call('publish', 'b_'..id, 'capacity:'..(final_capacity or ''))
    end
  end

  return {
    ['capacity'] = final_capacity,
    ['running'] = running,
    ['reservoir'] = reservoir
  }
end

10
node_modules/bottleneck/src/redis/queued.lua generated vendored Normal file

@@ -0,0 +1,10 @@
local clientTimeout = tonumber(redis.call('hget', settings_key, 'clientTimeout'))
local valid_clients = redis.call('zrangebyscore', client_last_seen_key, (now - clientTimeout), 'inf')
local client_queued = redis.call('hmget', client_num_queued_key, unpack(valid_clients))

local sum = 0
for i = 1, #client_queued do
  sum = sum + tonumber(client_queued[i])
end

return sum

11
node_modules/bottleneck/src/redis/refresh_expiration.lua generated vendored Normal file

@@ -0,0 +1,11 @@
local refresh_expiration = function (now, nextRequest, groupTimeout)
  if groupTimeout ~= nil then
    local ttl = (nextRequest + groupTimeout) - now

    for i = 1, #KEYS do
      redis.call('pexpire', KEYS[i], ttl)
    end
  end
end

13
node_modules/bottleneck/src/redis/refs.lua generated vendored Normal file

@@ -0,0 +1,13 @@
local settings_key = KEYS[1]
local job_weights_key = KEYS[2]
local job_expirations_key = KEYS[3]
local job_clients_key = KEYS[4]
local client_running_key = KEYS[5]
local client_num_queued_key = KEYS[6]
local client_last_registered_key = KEYS[7]
local client_last_seen_key = KEYS[8]
local now = tonumber(ARGV[1])
local client = ARGV[2]
local num_static_argv = 2

51
node_modules/bottleneck/src/redis/register.lua generated vendored Normal file

@@ -0,0 +1,51 @@
local index = ARGV[num_static_argv + 1]
local weight = tonumber(ARGV[num_static_argv + 2])
local expiration = tonumber(ARGV[num_static_argv + 3])

local state = process_tick(now, false)
local capacity = state['capacity']
local reservoir = state['reservoir']

local settings = redis.call('hmget', settings_key,
  'nextRequest',
  'minTime',
  'groupTimeout'
)
local nextRequest = tonumber(settings[1])
local minTime = tonumber(settings[2])
local groupTimeout = tonumber(settings[3])

if conditions_check(capacity, weight) then
  redis.call('hincrby', settings_key, 'running', weight)
  redis.call('hset', job_weights_key, index, weight)
  if expiration ~= nil then
    redis.call('zadd', job_expirations_key, now + expiration, index)
  end
  redis.call('hset', job_clients_key, index, client)
  redis.call('zincrby', client_running_key, weight, client)
  redis.call('hincrby', client_num_queued_key, client, -1)
  redis.call('zadd', client_last_registered_key, now, client)

  local wait = math.max(nextRequest - now, 0)
  local newNextRequest = now + wait + minTime

  if reservoir == nil then
    redis.call('hset', settings_key,
      'nextRequest', newNextRequest
    )
  else
    reservoir = reservoir - weight
    redis.call('hmset', settings_key,
      'reservoir', reservoir,
      'nextRequest', newNextRequest
    )
  end

  refresh_expiration(now, newNextRequest, groupTimeout)

  return {true, wait, reservoir}
else
  return {false}
end

12
node_modules/bottleneck/src/redis/register_client.lua generated vendored Normal file

@@ -0,0 +1,12 @@
local queued = tonumber(ARGV[num_static_argv + 1])

-- Could have been re-registered concurrently
if not redis.call('zscore', client_last_seen_key, client) then
  redis.call('zadd', client_running_key, 0, client)
  redis.call('hset', client_num_queued_key, client, queued)
  redis.call('zadd', client_last_registered_key, 0, client)
end

redis.call('zadd', client_last_seen_key, now, client)

return {}

1
node_modules/bottleneck/src/redis/running.lua generated vendored Normal file

@@ -0,0 +1 @@
return process_tick(now, false)['running']

74
node_modules/bottleneck/src/redis/submit.lua generated vendored Normal file

@@ -0,0 +1,74 @@
local queueLength = tonumber(ARGV[num_static_argv + 1])
local weight = tonumber(ARGV[num_static_argv + 2])

local capacity = process_tick(now, false)['capacity']

local settings = redis.call('hmget', settings_key,
  'id',
  'maxConcurrent',
  'highWater',
  'nextRequest',
  'strategy',
  'unblockTime',
  'penalty',
  'minTime',
  'groupTimeout'
)
local id = settings[1]
local maxConcurrent = tonumber(settings[2])
local highWater = tonumber(settings[3])
local nextRequest = tonumber(settings[4])
local strategy = tonumber(settings[5])
local unblockTime = tonumber(settings[6])
local penalty = tonumber(settings[7])
local minTime = tonumber(settings[8])
local groupTimeout = tonumber(settings[9])

if maxConcurrent ~= nil and weight > maxConcurrent then
  return redis.error_reply('OVERWEIGHT:'..weight..':'..maxConcurrent)
end

local reachedHWM = (highWater ~= nil and queueLength == highWater
  and not (
    conditions_check(capacity, weight)
    and nextRequest - now <= 0
  )
)

local blocked = strategy == 3 and (reachedHWM or unblockTime >= now)

if blocked then
  local computedPenalty = penalty
  if computedPenalty == nil then
    if minTime == 0 then
      computedPenalty = 5000
    else
      computedPenalty = 15 * minTime
    end
  end

  local newNextRequest = now + computedPenalty + minTime

  redis.call('hmset', settings_key,
    'unblockTime', now + computedPenalty,
    'nextRequest', newNextRequest
  )

  local clients_queued_reset = redis.call('hkeys', client_num_queued_key)
  local queued_reset = {}
  for i = 1, #clients_queued_reset do
    table.insert(queued_reset, clients_queued_reset[i])
    table.insert(queued_reset, 0)
  end
  redis.call('hmset', client_num_queued_key, unpack(queued_reset))

  redis.call('publish', 'b_'..id, 'blocked:')

  refresh_expiration(now, newNextRequest, groupTimeout)
end

if not blocked and not reachedHWM then
  redis.call('hincrby', client_num_queued_key, client, 1)
end

return {reachedHWM, blocked, strategy}

14
node_modules/bottleneck/src/redis/update_settings.lua generated vendored Normal file

@@ -0,0 +1,14 @@
local args = {'hmset', settings_key}

for i = num_static_argv + 1, #ARGV do
  table.insert(args, ARGV[i])
end

redis.call(unpack(args))

process_tick(now, true)

local groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))
refresh_expiration(0, 0, groupTimeout)

return {}

5
node_modules/bottleneck/src/redis/validate_client.lua generated vendored Normal file

@@ -0,0 +1,5 @@
if not redis.call('zscore', client_last_seen_key, client) then
  return redis.error_reply('UNKNOWN_CLIENT')
end

redis.call('zadd', client_last_seen_key, now, client)

3
node_modules/bottleneck/src/redis/validate_keys.lua generated vendored Normal file

@@ -0,0 +1,3 @@
if not (redis.call('exists', settings_key) == 1) then
  return redis.error_reply('SETTINGS_KEY_NOT_FOUND')
end