diff --git a/lib/job.js b/lib/job.js index 7b85c819fb3414a14ea81ed27adcba7aa0473b75..c487ce3c5692717ef4673b5e66267d3c7ae3ef71 100644 --- a/lib/job.js +++ b/lib/job.js @@ -419,6 +419,10 @@ Job.prototype.isStuck = function() { }); }; +Job.prototype.isDiscarded = function() { + return this._discarded; +}; + Job.prototype.getState = function() { const fns = [ { fn: 'isCompleted', state: 'completed' }, diff --git a/lib/process/master.js b/lib/process/master.js index 67eb871cfe9420bc473f70dc4f16ac1aff208bb3..fac07fbbd0e8e4ba58eb2b404e159506c7ceccb2 100644 --- a/lib/process/master.js +++ b/lib/process/master.js @@ -183,5 +183,13 @@ function wrapJob(job) { }); job.data = data; }; + /* + * Emulate the real job `discard` function. + */ + job.discard = function() { + process.send({ + cmd: 'discard' + }); + }; return job; } diff --git a/lib/process/sandbox.js b/lib/process/sandbox.js index cfa67c842ccee9b74f24e682e9fc1c2b1532784e..a9f32946cc3c13c9cd6f1b307a02090646ddd46b 100644 --- a/lib/process/sandbox.js +++ b/lib/process/sandbox.js @@ -33,6 +33,9 @@ module.exports = function(processFile, childPool) { case 'update': job.update(msg.value); break; + case 'discard': + job.discard(); + break; case 'log': job.log(msg.value); break; diff --git a/test/fixtures/fixture_processor_discard.js b/test/fixtures/fixture_processor_discard.js new file mode 100644 index 0000000000000000000000000000000000000000..0d38884499f44ee3e6c6e4c76999944e11ed4bb7 --- /dev/null +++ b/test/fixtures/fixture_processor_discard.js @@ -0,0 +1,14 @@ +/** + * A processor file to be used in tests. + * + */ +'use strict'; + +const delay = require('delay'); + +module.exports = function(job) { + return delay(500).then(() => { + job.discard(); + throw new Error('Manually discarded processor'); + }); +}; diff --git a/test/test_sandboxed_process.js b/test/test_sandboxed_process.js index c90c32d211ed9c03b44bcd7bb9e683d2ad170538..537c0b3310407083f0928f8b816f4e4fa71f44b4 100644 --- a/test/test_sandboxed_process.js +++ b/test/test_sandboxed_process.js @@ -282,6 +282,27 @@ describe('sandboxed process', () => { queue.add({ foo: 'bar' }); }); + it('should process, discard and fail without retry', done => { + queue.process(__dirname + '/fixtures/fixture_processor_discard.js'); + + queue.on('failed', (job, err) => { + try { + expect(job.data).eql({ foo: 'bar' }); + expect(job.isDiscarded()).to.be.true; + expect(job.failedReason).eql('Manually discarded processor'); + expect(err.message).eql('Manually discarded processor'); + expect(err.stack).include('fixture_processor_discard.js'); + expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0); + expect(queue.childPool.getAllFree()).to.have.lengthOf(1); + done(); + } catch (err) { + done(err); + } + }); + + queue.add({ foo: 'bar' }); + }); + it('should process and fail', done => { queue.process(__dirname + '/fixtures/fixture_processor_fail.js');