-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy paththrough2-concurrent.js
More file actions
163 lines (143 loc) · 4.19 KB
/
through2-concurrent.js
File metadata and controls
163 lines (143 loc) · 4.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// Like through2 except execute in parallel with a set maximum
// concurrency
"use strict";
var through2 = require('through2');
/**
 * Default stand-in for an absent `final`/`flush` handler: it does no work
 * and immediately signals completion.
 *
 * @param {Function} cb - Completion callback; invoked once with no arguments.
 */
function cbNoop (cb) {
  cb();
}
/**
 * Remove the first element of `arr` that is strictly equal (`===`) to
 * `item`. The array is modified in place; nothing happens when no element
 * matches.
 *
 * @param {Array} arr - Array to modify.
 * @param {*} item - Value to look for.
 */
function removeFirst(arr, item) {
  const position = arr.indexOf(item);
  if (position !== -1) {
    arr.splice(position, 1);
  }
}
module.exports = function concurrentThrough (options, transform, flush) {
var concurrent = 0, lastCallback = null, pendingFinish = null;
if (typeof options === 'function') {
flush = transform;
transform = options;
options = {};
}
var maxConcurrency = options.maxConcurrency || 16;
if (options.preserveOrder) {
const sendArr = [];
const doFinal = options.final || cbNoop;
options.final = function(finalCb) {
Promise.all(sendArr).then(() => {
process.nextTick(() => {
doFinal.call(this, finalCb);
})
});
};
const fnFlush = function(flushCb) {
if (flush) {
flush.call(this, flushCb);
} else {
flushCb();
}
};
const fnTransform = async function(data, encoding, callback) {
const sendP = new Promise((resolve, reject) => {
transform.call(this, data, encoding, (err, sendData) => {
if (err) {
reject(err);
} else {
resolve(sendData);
}
});
});
sendArr.push(sendP);
// throttle
if (sendArr.length >= maxConcurrency) {
await Promise.all(sendArr.slice());
const sendData = await sendP;
removeFirst(sendArr, sendP);
callback(null, sendData);
return;
}
process.nextTick(() => {
callback();
});
await Promise.all(sendArr.slice());
const sendData = await sendP;
removeFirst(sendArr, sendP);
this.push(sendData);
};
return through2(options, fnTransform, fnFlush);
}
function _transform (message, enc, callback) {
var self = this;
var callbackCalled = false;
concurrent++;
if (concurrent < maxConcurrency) {
// Ask for more right away
callback();
} else {
// We're at the concurrency limit, save the callback for
// when we're ready for more
lastCallback = callback;
}
transform.call(this, message, enc, function (err) {
// Ignore multiple calls of the callback (shouldn't ever
// happen, but just in case)
if (callbackCalled) return;
callbackCalled = true;
if (err) {
self.emit('error', err);
} else if (arguments.length > 1) {
self.push(arguments[1]);
}
concurrent--;
if (lastCallback) {
var cb = lastCallback;
lastCallback = null;
cb();
}
if (concurrent === 0 && pendingFinish) {
pendingFinish();
pendingFinish = null;
}
});
}
// We need to pass in final to through2 even if the caller has
// not given us a final option so that it will wait for all
// transform callbacks to complete before emitting a "finish"
// and "end" event.
if (typeof options.final !== 'function') {
options.final = cbNoop;
}
// We also wrap flush to make sure anyone using an ancient version
// of through2 without support for final will get the old behaviour.
// TODO: don't wrap flush after upgrading through2 to a version with guaranteed `_final`
if (typeof flush !== 'function') {
flush = cbNoop;
}
// Flush is always called only after Final has finished
// to ensure that data from Final gets processed, so we only need one pending callback at a time
function callOnFinish (original) {
return function (callback) {
if (concurrent === 0) {
original.call(this, callback);
} else {
pendingFinish = original.bind(this, callback);
}
}
}
options.final = callOnFinish(options.final);
return through2(options, _transform, callOnFinish(flush));
};
/**
 * Object-mode variant of the concurrent through stream: forces
 * `objectMode: true` and defaults `highWaterMark` to 16 when the caller
 * did not set one, then delegates to `module.exports`.
 *
 * The caller's `options` object is left untouched; the adjusted settings
 * are applied to a shallow copy.
 *
 * @param {Object} [options] - Stream options; may be omitted, in which case
 *   the first argument is treated as `transform`.
 * @param {Function} transform - Per-chunk transform function.
 * @param {Function} [flush] - Optional flush handler.
 * @returns {*} The stream returned by `module.exports`.
 */
module.exports.obj = function (options, transform, flush) {
  if (typeof options === 'function') {
    flush = transform;
    transform = options;
    options = {};
  }
  // Copy before adjusting so an options object shared with a non-object
  // stream does not silently gain objectMode / highWaterMark.
  const objOptions = Object.assign({}, options, { objectMode: true });
  if (objOptions.highWaterMark == null) {
    objOptions.highWaterMark = 16;
  }
  return module.exports(objOptions, transform, flush);
};