| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- /** Heavily lifted from: https://github.com/mtharrison/susie/blob/master/lib/index.js */
-
- const Stream = require('stream')
- const PassThrough = Stream.PassThrough
- const Transform = Stream.Transform
-
- const ENDER = { event: 'end', data: '' }
-
- /**
- * Stringify a stream
- * ?: I don't really get what this is doing
- * @param {Stream} event
- * @returns {string}
- */
- const stringifyEvent = function (event) {
- let str = ''
- const endl = '\r\n'
- for (const i in event) {
- let val = event[i]
- if (val instanceof Buffer) {
- val = val.toString()
- }
- if (typeof val === 'object') {
- val = JSON.stringify(val)
- }
- str += i + ': ' + val + endl
- }
- str += endl
- return str
- }
-
- /**
- * Transform extension
- * ?: I don't really get what this is doing
- * @param {object} options
- * @param {object} objectMode
- */
- class Transformer extends Transform {
- constructor(options, objectMode) {
- super({ objectMode })
- options = options || {}
- this.counter = 1
- this.event = options.event || null
- this.generateId = options.generateId
- ? options.generateId
- : () => {
- return this.counter++
- }
- }
- _transform(chunk, encoding, callback) {
- const event = {
- id: this.generateId(chunk),
- data: chunk,
- }
- if (this.event) {
- event.event = this.event
- }
- this.push(stringifyEvent(event))
- callback()
- }
- _flush(callback) {
- this.push(stringifyEvent(ENDER))
- callback()
- }
- }
-
- /**
- * Take an event stream and write content to another stream
- * ?: Save this for future extension
- * @param {Stream} event stream input
- * @param {Stream} stream to write to
- */
- const writeEvent = function (event, stream) {
- if (event) {
- stream.write(stringifyEvent(event))
- } else {
- // closing time
- stream.write(stringifyEvent(ENDER))
- stream.end()
- }
- }
-
- /**
- * Callback to decorate server toolkit (h)
- * !: Currently we only support ObjectMode streams
- * ?: I don't really get what this is doing
- * @param {Stream} event stream input
- * @param {Toolkit} h hapi common response toolkit
- * @param {object} streamOptions
- */
- const onEvent = (event, h, streamOptions) => {
- // const state = h.request.plugins.notifications = h.request.plugins.notifications || {}
- let active
- if (event instanceof Stream.Readable) {
- if (event._readableState.objectMode) {
- const through = new Transformer(streamOptions, true)
- active = new PassThrough()
- through.pipe(active)
- event.pipe(through)
- }
- // else {
- // stream = new Transformer(streamOptions, false)
- // event.pipe(stream)
- // }
- console.log('streamOptions :', streamOptions)
- return h
- .response(active)
- .header('content-type', 'text/event-stream')
- .header('content-encoding', 'identity')
- }
- // Uncomment to do stream state stuff
- // handle a first object arg
- // if (!state.stream) {
- // active = new PassThrough()
- // state.stream = active
- // state.mode = 'object'
- // const response = h.response(active)
- // .header('content-type', 'text/event-stream')
- // .header('content-encoding', 'identity')
- // writeEvent(event, active)
- // return response
- // }
- // already have an object stream flowing, just write next event
- // active = state.stream
- // internals.writeEvent(event, active)
- }
-
- module.exports = { onEvent }
|