diff --git a/README.md b/README.md index bb9c1df5d3aaa006d3e3a5dbb2168cecf9faac06..814cc7884551162ea38a8852ef49b0b9a1533222 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ Are you developing bull sponsored by a company? Please, let us now! - [x] Robust design based on Redis. - [x] Delayed jobs. - [x] Schedule and repeat jobs according to a cron specification. +- [x] Rate limiter for jobs. - [x] Retries. - [x] Priority. - [x] Concurrency. @@ -83,7 +84,6 @@ Are you developing bull sponsored by a company? Please, let us now! And coming up on the roadmap... - [ ] Job completion acknowledgement. -- [ ] Rate limiter for jobs. - [ ] Parent-child jobs relationships. --- diff --git a/REFERENCE.md b/REFERENCE.md index 74de1080fee25aa5b0b14c49f223194b3b84d4cb..1f3e39a80aa6387f99396b9210383fea79941a2a 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -49,12 +49,20 @@ The optional ```url``` argument, allows to specify a redis connection string suc ```typescript interface QueueOpts{ + limiter?: RateLimiter; redis?: RedisOpts; prefix?: string = 'bull'; // prefix for all queue keys. settings?: AdvancedSettings; } ``` +```typescript +interface RateLimiter { + max: number, // Max number of jobs processed + duration: number, // per duration in milliseconds +} +``` + ```RedisOpts``` are passed directly to ioredis constructor, check [ioredis](https://github.com/luin/ioredis/blob/master/API.md) for details. We document here just the most important ones. diff --git a/lib/commands/addJob-6.lua b/lib/commands/addJob-6.lua index cbb33aa842ff9fb74a5e42abadf5c343a8776f37..3f10fc2c83b35dcf1a347b7986253e1c5eeb4a57 100644 --- a/lib/commands/addJob-6.lua +++ b/lib/commands/addJob-6.lua @@ -53,10 +53,11 @@ end redis.call("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", ARGV[5], "timestamp", ARGV[6], "delay", ARGV[7]) -- Check if job is delayed -if(tonumber(ARGV[8]) ~= 0) then - local timestamp = tonumber(ARGV[8]) * 0x1000 + bit.band(jobCounter, 0xfff) +local delayedTimestamp = tonumber(ARGV[8]) +if(delayedTimestamp ~= 0) then + local timestamp = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff) redis.call("ZADD", KEYS[5], timestamp, jobId) - redis.call("PUBLISH", KEYS[5], (timestamp / 0x1000)) + redis.call("PUBLISH", KEYS[5], delayedTimestamp) else local target diff --git a/lib/commands/moveToActive-5.lua b/lib/commands/moveToActive-7.lua similarity index 57% rename from lib/commands/moveToActive-5.lua rename to lib/commands/moveToActive-7.lua index d60cc29641ce4f045ef2577e63ce96fadda0bd15..ebfdc2911d95047ace227efb06aae7e860dc3466 100644 --- a/lib/commands/moveToActive-5.lua +++ b/lib/commands/moveToActive-7.lua @@ -13,11 +13,18 @@ KEYS[4] active event key KEYS[5] stalled key + -- Rate limiting + KEYS[6] rate limiter key + KEYS[7] delayed key + ARGV[1] key prefix ARGV[2] lock token ARGV[3] lock duration in milliseconds ARGV[4] timestamp ARGV[5] optional jobid + + ARGV[6] optional jobs per time unit (rate limiter) + ARGV[7] optional time unit ]] local jobId @@ -32,10 +39,32 @@ else end if jobId then + -- Check if we need to perform rate limiting. + if(ARGV[6]) then + local jobCounter + local maxJobs = tonumber(ARGV[6]) + jobCounter = tonumber(redis.call("GET", KEYS[6])) + if jobCounter ~= nil and jobCounter >= maxJobs then + local delay = tonumber(redis.call("PTTL", KEYS[6])) + local timestamp = delay + tonumber(ARGV[4]) + + redis.call("ZADD", KEYS[7], timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId) + redis.call("PUBLISH", KEYS[7], timestamp) + redis.call("LREM", KEYS[2], 1, jobId) + return + else + jobCounter = redis.call("INCR", KEYS[6]) + if tonumber(jobCounter) == 1 then + redis.call("PEXPIRE", KEYS[6], ARGV[7]) + end + end + + end + local jobKey = ARGV[1] .. jobId local lockKey = jobKey .. ':lock' - -- get a the lock + -- get a lock redis.call("SET", lockKey, ARGV[2], "PX", ARGV[3]) redis.call("ZREM", KEYS[3], jobId) -- remove from priority diff --git a/lib/queue.js b/lib/queue.js index 1b1e83bf19b888a1e0c168a2894453b773f3f316..c38aa4f269a43be87cf07fd1ab212e95625da430 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -51,13 +51,13 @@ var commands = require('./commands/'); delayed job is processed after timing out. */ var MINIMUM_REDIS_VERSION = '2.8.18'; - var MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed /* interface QueueOptions { prefix?: string = 'bull', - redis : RedisOpts, // ioredis defaults + limiter?: RateLimiter, + redis : RedisOpts, // ioredis defaults, createClient?: (type: enum('client', 'subscriber'), redisOpts?: RedisOpts) => redisClient, // Advanced settings @@ -70,6 +70,11 @@ var MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed retryProcessDelay?: number = 5000 } } + + interface RateLimiter { + max: number, // Number of jobs + duration: number, // per duration milliseconds + } */ // Queue(name: string, url?, opts?) @@ -93,6 +98,10 @@ var Queue = function Queue(name, url, opts){ throw Error('Options must be a valid object'); } + if(opts.limiter){ + this.limiter = opts.limiter; + } + this.name = name; this.token = uuid(); @@ -203,7 +212,8 @@ var Queue = function Queue(name, url, opts){ 'completed', 'failed', 'stalled', - 'repeat'], function(key){ + 'repeat', + 'limiter'], function(key){ keys[key] = _this.toKey(key); }); this.keys = keys; diff --git a/lib/scripts.js b/lib/scripts.js index 97a7bd0e66776ed3889930907b01a84d1d12aa04..fe4951a0ac44c9be2b93848e6ef049a97d164314 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -76,6 +76,8 @@ var scripts = { keys[3] = keys[1] + '@' + queue.token; keys[4] = queueKeys.stalled; + keys[5] = queueKeys.limiter; + keys[6] = queueKeys.delayed; var args = [ queueKeys[''], @@ -85,6 +87,9 @@ var scripts = { jobId ]; + if(queue.limiter){ + args.push(queue.limiter.max, queue.limiter.duration); + } return queue.client.moveToActive(keys.concat(args)).then(function(result){ if(result){ var jobData = result[0]; diff --git a/test/test_rate_limiter.js b/test/test_rate_limiter.js new file mode 100644 index 0000000000000000000000000000000000000000..3d85e2600f9ff3692e8ecf884235859031647a98 --- /dev/null +++ b/test/test_rate_limiter.js @@ -0,0 +1,53 @@ +/*eslint-env node */ +'use strict'; + +var expect = require('chai').expect; +var utils = require('./utils'); +var redis = require('ioredis'); +var _ = require('lodash'); + +describe('Rate limiter', function () { + var queue; + + beforeEach(function(){ + var client = new redis(); + return client.flushdb().then(function(){ + queue = utils.buildQueue('test rate limiter', { + limiter: { + max: 1, + duration: 1000 + } + }); + }); + }); + + afterEach(function(){ + return queue.close(); + }); + + it('should obey the rate limit', function(done) { + var startTime = new Date().getTime(); + var nbProcessed = 0; + + queue.process(function() { + return Promise.resolve(); + }); + + queue.add({}); + queue.add({}); + queue.add({}); + queue.add({}); + + queue.on('completed', _.after(4, function() { + try { + expect(new Date().getTime() - startTime).to.be.above(3000); + done(); + } catch (e) { + done(e); + } + })); + + queue.on('failed', done); + }); + +}); diff --git a/test/test_repeat.js b/test/test_repeat.js index 7da3c92f3260589b11f1a54590ea4a691e105999..3b002b332ac19955acbd897d898a6a503e90afc2 100644 --- a/test/test_repeat.js +++ b/test/test_repeat.js @@ -13,7 +13,7 @@ var ONE_HOUR = 60 * ONE_MINUTE; var ONE_DAY = 24 * ONE_HOUR; var ONE_MONTH = 31 * ONE_DAY; -describe.only('repeat', function () { +describe('repeat', function () { var queue; beforeEach(function(){