const { allStreams } = require('../utils') const NotificationRoute = require('../routes/notification') /** Heavily lifted from: https://github.com/mtharrison/susie/blob/master/lib/index.js */ const Stream = require('stream') const PassThrough = Stream.PassThrough const Transform = Stream.Transform const ENDER = { event: 'end', data: '' } /** * Stringify a stream * ?: I don't really get what this is doing * @param {Stream} event * @returns {string} */ const stringifyEvent = function (event) { let str = '' const endl = '\r\n' for (const i in event) { let val = event[i] if (val instanceof Buffer) { val = val.toString() } if (typeof val === 'object') { val = JSON.stringify(val) } str += i + ': ' + val + endl } str += endl return str } /** * Transform extension * ?: I don't really get what this is doing * @param {object} options * @param {object} objectMode */ class Transformer extends Transform { constructor(options, objectMode) { super({ objectMode }) options = options || {} this.counter = 1 this.event = options.event || null this.generateId = options.generateId ? options.generateId : () => this.counter++ } _transform(chunk, encoding, callback) { const event = { id: this.generateId(chunk), data: chunk, } if (this.event) { event.event = this.event } this.push(stringifyEvent(event)) callback() } _flush(callback) { this.push(stringifyEvent(ENDER)) callback() } } /** * Callback to decorate server toolkit (h) * !: Currently we only support ObjectMode streams * ?: I don't really get what this is doing * @param {Stream} event stream input * @param {Toolkit} h hapi common response toolkit * @param {object} streamOptions */ const onEvent = (event, h, streamOptions) => { let active if (event instanceof Stream.Readable) { if (event._readableState.objectMode) { active = new PassThrough() const through = new Transformer(streamOptions, true) through.pipe(active) event.pipe(through) } return h .response(active) .header('content-type', 'text/event-stream') .header('content-encoding', 'identity') } } module.exports = { name: 'notification-plugin', version: '1.0.0', register: async server => { await server.route(NotificationRoute) server.decorate('toolkit', 'event', onEvent) server.expose('streams', allStreams) }, }