const Stream = require('stream') const PassThrough = Stream.PassThrough const Transform = Stream.Transform const NotificationRoute = require('../routes/notification') const stringifyEvent = function (event) { let str = '' const endl = '\r\n' for (const i in event) { let val = event[i] if (val instanceof Buffer) { val = val.toString() } if (typeof val === 'object') { val = JSON.stringify(val) } str += i + ': ' + val + endl } str += endl return str } class Transformer extends Transform { constructor(options, objectMode) { super({ objectMode }) options = options || {} this.counter = 1 this.event = options.event || null this.generateId = options.generateId ? options.generateId : () => { return this.counter++ } } _transform (chunk, encoding, callback) { const event = { id: this.generateId(chunk), data: chunk } if (this.event) { event.event = this.event } this.push(stringifyEvent(event)) callback() } _flush(callback) { this.push(stringifyEvent({ event: 'end', data: '' })) callback() } } const writeEvent = function (event, stream) { if (event) { stream.write(stringifyEvent(event)) } else { // closing time stream.write(stringifyEvent({ event: 'end', data: '' })) stream.end() } } const onEvent = (event, h, streamOptions) => { // We only support ObjectMode streams if (!event._readableState.objectMode) return const through = new Transformer(streamOptions, true) const active = new PassThrough() through.pipe(active) event.pipe(through) console.log(event) return h.response(active) .header('content-type', 'text/event-stream') .header('content-encoding', 'identity') } module.exports = { name: 'notification-plugin', version: '1.0.0', register: async (server, options) => { await server.route(NotificationRoute) server.decorate('toolkit', 'event', onEvent) } }