/** Heavily lifted from: https://github.com/mtharrison/susie/blob/master/lib/index.js */ const Stream = require('stream') const PassThrough = Stream.PassThrough const Transform = Stream.Transform const ENDER = { event: 'end', data: '' } /** * Stringify a stream * ?: I don't really get what this is doing * @param {Stream} event * @returns {string} */ const stringifyEvent = function (event) { let str = '' const endl = '\r\n' for (const i in event) { let val = event[i] if (val instanceof Buffer) { val = val.toString() } if (typeof val === 'object') { val = JSON.stringify(val) } str += i + ': ' + val + endl } str += endl return str } /** * Transform extension * ?: I don't really get what this is doing * @param {object} options * @param {object} objectMode */ class Transformer extends Transform { constructor(options, objectMode) { super({ objectMode }) options = options || {} this.counter = 1 this.event = options.event || null this.generateId = options.generateId ? options.generateId : () => { return this.counter++ } } _transform (chunk, encoding, callback) { const event = { id: this.generateId(chunk), data: chunk } if (this.event) { event.event = this.event } this.push(stringifyEvent(event)) callback() } _flush(callback) { this.push(stringifyEvent(ENDER)) callback() } } /** * Take an event stream and write content to another stream * ?: Save this for future extension * @param {Stream} event stream input * @param {Stream} stream to write to */ const writeEvent = function (event, stream) { if (event) { stream.write(stringifyEvent(event)) } else { // closing time stream.write(stringifyEvent(ENDER)) stream.end() } } /** * Callback to decorate server toolkit (h) * !: Currently we only support ObjectMode streams * ?: I don't really get what this is doing * @param {Stream} event stream input * @param {Toolkit} h hapi common response toolkit * @param {object} streamOptions */ const onEvent = (event, h, streamOptions) => { // const state = h.request.plugins.notifications = h.request.plugins.notifications || {} let active if (event instanceof Stream.Readable) { if (event._readableState.objectMode) { const through = new Transformer(streamOptions, true) active = new PassThrough() through.pipe(active) event.pipe(through) } // else { // stream = new Transformer(streamOptions, false) // event.pipe(stream) // } console.log('streamOptions :', streamOptions) return h.response(active) .header('content-type', 'text/event-stream') .header('content-encoding', 'identity') } // Uncomment to do stream state stuff // handle a first object arg // if (!state.stream) { // active = new PassThrough() // state.stream = active // state.mode = 'object' // const response = h.response(active) // .header('content-type', 'text/event-stream') // .header('content-encoding', 'identity') // writeEvent(event, active) // return response // } // already have an object stream flowing, just write next event // active = state.stream // internals.writeEvent(event, active) } module.exports = { onEvent }