| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- const _allStreams = {}
-
- const NotificationRoute = require('../routes/notification')
-
- /** Heavily lifted from: https://github.com/mtharrison/susie/blob/master/lib/index.js */
-
- const Stream = require('stream')
- const PassThrough = Stream.PassThrough
- const Transform = Stream.Transform
-
- const ENDER = { event: 'end', data: '' }
-
- /**
- * Stringify a stream
- * ?: I don't really get what this is doing
- * @param {Stream} event
- * @returns {string}
- */
- const _stringifyEvent = function (event) {
- let str = ''
- const endl = '\r\n'
- for (const i in event) {
- let val = event[i]
- if (val instanceof Buffer) {
- val = val.toString()
- }
- if (typeof val === 'object') {
- val = JSON.stringify(val)
- }
- str += i + ': ' + val + endl
- }
- str += endl
- return str
- }
-
- /**
- * Transform extension
- * ?: I don't really get what this is doing
- * @param {object} options
- * @param {object} objectMode
- */
- class Transformer extends Transform {
- constructor(options, objectMode) {
- super({ objectMode })
- options = options || {}
- this.counter = 1
- this.event = options.event || null
- this.generateId = options.generateId
- ? options.generateId
- : () => this.counter++
- }
- _transform(chunk, encoding, callback) {
- const event = {
- id: this.generateId(chunk),
- data: chunk,
- }
- if (this.event) {
- event.event = this.event
- }
- this.push(_stringifyEvent(event))
- callback()
- }
- _flush(callback) {
- this.push(_stringifyEvent(ENDER))
- callback()
- }
- }
-
- /**
- * Callback to decorate server toolkit (h)
- * !: Currently we only support ObjectMode streams
- * ?: I don't really get what this is doing
- * @param {Stream} event stream input
- * @param {Toolkit} h hapi common response toolkit
- * @param {object} streamOptions
- */
- const _event = (event, h, streamOptions) => {
- let active
- if (event instanceof Stream.Readable) {
- if (event._readableState.objectMode) {
- active = new PassThrough()
- const through = new Transformer(streamOptions, true)
- through.pipe(active)
- event.pipe(through)
- }
- return h
- .response(active)
- .header('content-type', 'text/event-stream')
- .header('content-encoding', 'identity')
- }
- }
-
- /**
- * Takes an open HTTP stream and writes
- * a msg to it, then fires notification plugin's
- * _event callback
- * @param {object} msg you want to send
- * @param {string} name <profileId>.<eventType>
- * @param {boolean} shouldInitialize
- * @param {Toolkit} h hapi common response toolkit
- */
- const onNotify = (name, msg, h, shouldInitialize = false) => {
- if (shouldInitialize) {
- _allStreams[name] = new PassThrough({ objectMode: true })
- }
- if (!_allStreams[name]) return
- _allStreams[name].write(msg)
-
- return _event(_allStreams[name], h, { event: name })
- }
-
- module.exports = {
- name: 'notification-plugin',
- version: '1.0.0',
- register: async server => {
- await server.route(NotificationRoute)
- server.method('notify', onNotify)
- },
- }
|