diff --git a/lib/queue.js b/lib/queue.js index 37a60dc583df2cf5878f3505185b5b925c6bb463..27d9aa075b203f8032e4cee2e31a4ec2d6d717ed 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -36,7 +36,7 @@ var debuglog = require('debuglog')('bull'); Delayed jobs are jobs that cannot be executed until a certain time in ms has passed since they were added to the queue. The mechanism is simple, a delayedTimestamp variable holds the next - known timestamp that is on the delayed set (or MAX_INT if none). + known timestamp that is on the delayed set (or MAX_TIMEOUT_MS if none). When the current job has finalized the variable is checked, if no delayed job has to be executed yet a setTimeout is set so that a @@ -59,6 +59,8 @@ var REDLOCK_DRIFT_FACTOR = 0.01; var REDLOCK_RETRY_COUNT = 0; var REDLOCK_RETRY_DELAY = 200; +var MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed + var Queue = function Queue(name, redisPort, redisHost, redisOptions){ if(!(this instanceof Queue)){ return new Queue(name, redisPort, redisHost, redisOptions); @@ -555,7 +557,7 @@ Queue.prototype.run = function(concurrency){ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){ var _this = this; - if(newDelayedTimestamp < _this.delayedTimestamp){ + if(newDelayedTimestamp < _this.delayedTimestamp && newDelayedTimestamp < (MAX_TIMEOUT_MS + Date.now())){ clearTimeout(this.delayTimer); this.delayedTimestamp = newDelayedTimestamp; @@ -570,7 +572,7 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){ nextTimestamp = Number.MAX_VALUE; } _this.updateDelayTimer(nextTimestamp); - }).catch(function(err){ + }).catch(function(err){ console.error('Error updating the delay timer', err); }); _this.delayedTimestamp = Number.MAX_VALUE; diff --git a/test/test_queue.js b/test/test_queue.js index 356367e5c93b56ac0b387ee3f5e633d1335c1dfe..cc28013abe9ee81037144b276dcd25ebd45992d8 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -1118,7 +1118,7 @@ describe('Queue', function () { }); }).then(function () { expect(publishHappened).to.be(true); - queue.close().then(done, done); + queue.close(true).then(done, done); }); });