Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  /**
   * Registry of open notification streams, keyed by stream name
   * (`<profileId>.<eventType>`).
   * Created without a prototype so that a lookup by an arbitrary name can
   * never resolve to an inherited member such as `constructor` or `toString`.
   */
  const _allStreams = Object.create(null)
  2. const NotificationRoute = require('../routes/notification')
  3. /** Heavily lifted from: https://github.com/mtharrison/susie/blob/master/lib/index.js */
  4. const Stream = require('stream')
  5. const PassThrough = Stream.PassThrough
  6. const Transform = Stream.Transform
  /**
   * Terminal event emitted once when a Transformer is flushed.
   * Frozen because the same object is shared by every stream instance.
   */
  const ENDER = Object.freeze({ event: 'end', data: '' })
  /**
   * Serialize an event object into server-sent-event wire format.
   *
   * Every enumerable property becomes a `<field>: <value>` line terminated by
   * CRLF, and the event is closed with an empty line. Buffers are converted
   * with `toString()`, other objects with `JSON.stringify`.
   *
   * A value that itself contains line breaks is emitted as one
   * `<field>: <line>` entry per line, so a payload cannot terminate the event
   * early or smuggle extra fields into it.
   *
   * @param {object} event map of SSE field name to value (e.g. id, event, data)
   * @returns {string} the encoded event, including the trailing blank line
   */
  const _stringifyEvent = function (event) {
    const endl = '\r\n'
    let str = ''
    for (const field in event) {
      let val = event[field]
      if (val instanceof Buffer) {
        val = val.toString()
      }
      if (typeof val === 'object') {
        val = JSON.stringify(val)
      }
      // Split on any line terminator (CRLF, CR or LF) so that every physical
      // line on the wire carries its field name.
      for (const line of String(val).split(/\r\n|\r|\n/)) {
        str += field + ': ' + line + endl
      }
    }
    return str + endl
  }
  /**
   * Transform stream that encodes every chunk written to it as a
   * server-sent-event string and appends the terminal `end` event on flush.
   *
   * @param {object} [options]
   * @param {string} [options.event] event name attached to every message
   * @param {function(*): *} [options.generateId] called with each chunk to
   *   produce its id; defaults to a per-stream counter starting at 1
   * @param {boolean} objectMode forwarded to the underlying Transform
   */
  class Transformer extends Transform {
    constructor(options, objectMode) {
      super({ objectMode })
      const settings = options || {}
      this.counter = 1
      this.event = settings.event || null
      this.generateId = settings.generateId || (() => this.counter++)
    }

    /**
     * Wrap one chunk in an event ({ id, data[, event] }) and emit its encoding.
     * Failures while building or serializing the event are reported through
     * the callback instead of being thrown out of the stream machinery.
     */
    _transform(chunk, encoding, callback) {
      let frame
      try {
        const event = { id: this.generateId(chunk), data: chunk }
        if (this.event) {
          event.event = this.event
        }
        frame = _stringifyEvent(event)
      } catch (err) {
        callback(err)
        return
      }
      // Invoked outside the try block so the callback can never run twice.
      callback(null, frame)
    }

    /** Emit the terminal event once the writable side has ended. */
    _flush(callback) {
      this.push(_stringifyEvent(ENDER))
      callback()
    }
  }
  /**
   * Build an event-stream response from a readable stream.
   *
   * An object-mode readable is piped through a Transformer (configured with
   * `streamOptions`) into a fresh PassThrough, which becomes the response
   * payload. A readable that is not in object mode still yields a response,
   * but with no payload attached. Anything that is not a Readable yields
   * undefined.
   *
   * @param {Stream} event source stream
   * @param {Toolkit} h hapi common response toolkit
   * @param {object} streamOptions options handed to the Transformer
   * @returns {*} the response produced by `h`, or undefined
   */
  const _event = (event, h, streamOptions) => {
    if (!(event instanceof Stream.Readable)) {
      return undefined
    }

    let payload
    if (event._readableState.objectMode) {
      payload = new PassThrough()
      const encoder = new Transformer(streamOptions, true)
      encoder.pipe(payload)
      event.pipe(encoder)
    }

    const response = h.response(payload)
    const typed = response.header('content-type', 'text/event-stream')
    return typed.header('content-encoding', 'identity')
  }
  /**
   * Write a message to the named notification stream, then hand that stream
   * to `_event` to build the response.
   *
   * @param {string} name stream key, `<profileId>.<eventType>`
   * @param {object} msg message to send
   * @param {Toolkit} h hapi common response toolkit
   * @param {boolean} [shouldInitialize=false] when true, a new object-mode
   *   stream is registered under `name` before the message is written
   * @returns {*} whatever `_event` returns, or undefined when no stream is
   *   registered under `name`
   */
  const onNotify = (name, msg, h, shouldInitialize = false) => {
    if (shouldInitialize) {
      _allStreams[name] = new PassThrough({ objectMode: true })
    }
    // Own-property check: members inherited from Object.prototype (for
    // example `constructor`) must not be mistaken for registered streams.
    if (!Object.prototype.hasOwnProperty.call(_allStreams, name)) return
    const stream = _allStreams[name]
    if (!stream) return
    stream.write(msg)
    return _event(stream, h, { event: name })
  }
  102. module.exports = {
  103. name: 'notification-plugin',
  104. version: '1.0.0',
  105. register: async server => {
  106. await server.route(NotificationRoute)
  107. server.method('notify', onNotify)
  108. },
  109. }