const _allStreams = {} const NotificationRoute = require('../routes/notification') /** Heavily lifted from: https://github.com/mtharrison/susie/blob/master/lib/index.js */ const Stream = require('stream') const PassThrough = Stream.PassThrough const Transform = Stream.Transform const ENDER = { event: 'end', data: '' } /** * Stringify a stream * ?: I don't really get what this is doing * @param {Stream} event * @returns {string} */ const _stringifyEvent = function (event) { let str = '' const endl = '\r\n' for (const i in event) { let val = event[i] if (val instanceof Buffer) { val = val.toString() } if (typeof val === 'object') { val = JSON.stringify(val) } str += i + ': ' + val + endl } str += endl return str } /** * Transform extension * ?: I don't really get what this is doing * @param {object} options * @param {object} objectMode */ class Transformer extends Transform { constructor(options, objectMode) { super({ objectMode }) options = options || {} this.counter = 1 this.event = options.event || null this.generateId = options.generateId ? options.generateId : () => this.counter++ } _transform(chunk, encoding, callback) { const event = { id: this.generateId(chunk), data: chunk, } if (this.event) { event.event = this.event } this.push(_stringifyEvent(event)) callback() } _flush(callback) { this.push(_stringifyEvent(ENDER)) callback() } } /** * Callback to decorate server toolkit (h) * !: Currently we only support ObjectMode streams * ?: I don't really get what this is doing * @param {Stream} event stream input * @param {Toolkit} h hapi common response toolkit * @param {object} streamOptions */ const _event = (event, h, streamOptions) => { let active if (event instanceof Stream.Readable) { if (event._readableState.objectMode) { active = new PassThrough() const through = new Transformer(streamOptions, true) through.pipe(active) event.pipe(through) } return h .response(active) .header('content-type', 'text/event-stream') .header('content-encoding', 'identity') } } /** * Takes an open HTTP stream and writes * a msg to it, then fires notification plugin's * _event callback * @param {object} msg you want to send * @param {string} name . * @param {boolean} shouldInitialize * @param {Toolkit} h hapi common response toolkit */ const onNotify = (name, msg, h, shouldInitialize = false) => { if (shouldInitialize) { _allStreams[name] = new PassThrough({ objectMode: true }) } if (!_allStreams[name]) return _allStreams[name].write(msg) return _event(_allStreams[name], h, { event: name }) } module.exports = { name: 'notification-plugin', version: '1.0.0', register: async server => { await server.route(NotificationRoute) server.method('notify', onNotify) }, }