39c8b14f |
1 | var Stream = require('stream').Stream; |
2 | var util = require('util'); |
3 | |
4 | module.exports = DelayedStream; |
5 | function DelayedStream() { |
6 | this.source = null; |
7 | this.dataSize = 0; |
8 | this.maxDataSize = 1024 * 1024; |
9 | this.pauseStream = true; |
10 | |
11 | this._maxDataSizeExceeded = false; |
12 | this._released = false; |
13 | this._bufferedEvents = []; |
14 | } |
15 | util.inherits(DelayedStream, Stream); |
16 | |
17 | DelayedStream.create = function(source, options) { |
18 | var delayedStream = new this(); |
19 | |
20 | options = options || {}; |
21 | for (var option in options) { |
22 | delayedStream[option] = options[option]; |
23 | } |
24 | |
25 | delayedStream.source = source; |
26 | |
27 | var realEmit = source.emit; |
28 | source.emit = function() { |
29 | delayedStream._handleEmit(arguments); |
30 | return realEmit.apply(source, arguments); |
31 | }; |
32 | |
33 | source.on('error', function() {}); |
34 | if (delayedStream.pauseStream) { |
35 | source.pause(); |
36 | } |
37 | |
38 | return delayedStream; |
39 | }; |
40 | |
41 | DelayedStream.prototype.__defineGetter__('readable', function() { |
42 | return this.source.readable; |
43 | }); |
44 | |
45 | DelayedStream.prototype.resume = function() { |
46 | if (!this._released) { |
47 | this.release(); |
48 | } |
49 | |
50 | this.source.resume(); |
51 | }; |
52 | |
53 | DelayedStream.prototype.pause = function() { |
54 | this.source.pause(); |
55 | }; |
56 | |
57 | DelayedStream.prototype.release = function() { |
58 | this._released = true; |
59 | |
60 | this._bufferedEvents.forEach(function(args) { |
61 | this.emit.apply(this, args); |
62 | }.bind(this)); |
63 | this._bufferedEvents = []; |
64 | }; |
65 | |
66 | DelayedStream.prototype.pipe = function() { |
67 | var r = Stream.prototype.pipe.apply(this, arguments); |
68 | this.resume(); |
69 | return r; |
70 | }; |
71 | |
72 | DelayedStream.prototype._handleEmit = function(args) { |
73 | if (this._released) { |
74 | this.emit.apply(this, args); |
75 | return; |
76 | } |
77 | |
78 | if (args[0] === 'data') { |
79 | this.dataSize += args[1].length; |
80 | this._checkIfMaxDataSizeExceeded(); |
81 | } |
82 | |
83 | this._bufferedEvents.push(args); |
84 | }; |
85 | |
86 | DelayedStream.prototype._checkIfMaxDataSizeExceeded = function() { |
87 | if (this._maxDataSizeExceeded) { |
88 | return; |
89 | } |
90 | |
91 | if (this.dataSize <= this.maxDataSize) { |
92 | return; |
93 | } |
94 | |
95 | this._maxDataSizeExceeded = true; |
96 | var message = |
97 | 'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.' |
98 | this.emit('error', new Error(message)); |
99 | }; |