Procházet zdrojové kódy

:recycle: refactoring again to push to notifications without dispatchers

tags/0.0.1^2
J před 3 roky
rodič
revize
603f90fbc4

+ 92
- 2
backend/lib/plugins/notification.js Zobrazit soubor

@@ -1,11 +1,101 @@
1 1
 const NotificationRoute = require('../routes/notification')
2
-const { onEvent } = require('../services/notification')
2
+
3
+/** Heavily lifted from: https://github.com/mtharrison/susie/blob/master/lib/index.js */
4
+
5
+const Stream = require('stream')
6
+const PassThrough = Stream.PassThrough
7
+const Transform = Stream.Transform
8
+
9
+const ENDER = { event: 'end', data: '' }
10
+
11
+const allStreams = {}
12
+
13
+/**
14
+ * Stringify a stream
15
+ * ?: I don't really get what this is doing
16
+ * @param {Stream} event
17
+ * @returns {string}
18
+ */
19
+const stringifyEvent = function (event) {
20
+    let str = ''
21
+    const endl = '\r\n'
22
+    for (const i in event) {
23
+        let val = event[i]
24
+        if (val instanceof Buffer) {
25
+            val = val.toString()
26
+        }
27
+        if (typeof val === 'object') {
28
+            val = JSON.stringify(val)
29
+        }
30
+        str += i + ': ' + val + endl
31
+    }
32
+    str += endl
33
+    return str
34
+}
35
+
36
+/**
37
+ * Transform extension
38
+ * ?: I don't really get what this is doing
39
+ * @param {object} options
40
+ * @param {object} objectMode
41
+ */
42
+class Transformer extends Transform {
43
+    constructor(options, objectMode) {
44
+        super({ objectMode })
45
+        options = options || {}
46
+        this.counter = 1
47
+        this.event = options.event || null
48
+        this.generateId = options.generateId
49
+            ? options.generateId
50
+            : () => this.counter++
51
+    }
52
+    _transform(chunk, encoding, callback) {
53
+        const event = {
54
+            id: this.generateId(chunk),
55
+            data: chunk,
56
+        }
57
+        if (this.event) {
58
+            event.event = this.event
59
+        }
60
+        this.push(stringifyEvent(event))
61
+        callback()
62
+    }
63
+    _flush(callback) {
64
+        this.push(stringifyEvent(ENDER))
65
+        callback()
66
+    }
67
+}
68
+
69
+/**
70
+ * Callback to decorate server toolkit (h)
71
+ * !: Currently we only support ObjectMode streams
72
+ * ?: I don't really get what this is doing
73
+ * @param {Stream} event stream input
74
+ * @param {Toolkit} h hapi common response toolkit
75
+ * @param {object} streamOptions
76
+ */
77
+const onEvent = (event, h, streamOptions) => {
78
+    let active
79
+    if (event instanceof Stream.Readable) {
80
+        if (event._readableState.objectMode) {
81
+            active = new PassThrough()
82
+            const through = new Transformer(streamOptions, true)
83
+            through.pipe(active)
84
+            event.pipe(through)
85
+        }
86
+        return h
87
+            .response(active)
88
+            .header('content-type', 'text/event-stream')
89
+            .header('content-encoding', 'identity')
90
+    }
91
+}
3 92
 
4 93
 module.exports = {
5 94
     name: 'notification-plugin',
6 95
     version: '1.0.0',
7
-    register: async (server, options) => {
96
+    register: async server => {
8 97
         await server.route(NotificationRoute)
9 98
         server.decorate('toolkit', 'event', onEvent)
99
+        server.expose('streams', allStreams)
10 100
     },
11 101
 }

+ 19
- 0
backend/lib/routes/membership/active.js Zobrazit soubor

@@ -6,6 +6,8 @@ const errorSchema = require('../../schemas/errors')
6 6
 const groupingSchema = require('../../schemas/groupings')
7 7
 const params = require('../../schemas/params')
8 8
 
9
+const PassThrough = require('stream').PassThrough
10
+
9 11
 const pluginConfig = {
10 12
     handlerType: 'grouping',
11 13
     docs: {
@@ -34,6 +36,19 @@ const responseSchemas = {
34 36
     error: errorSchema.single,
35 37
 }
36 38
 
39
+const dispatch = (streamName, streams, h) => {
40
+    const stream = streams[streamName]
41
+    if (!stream || !streams) return
42
+    const msg = {
43
+        name: 'MSHRM',
44
+        price: (500 + Math.floor(Math.random() * 100)).toString(),
45
+        order: null,
46
+        type: 'info',
47
+    }
48
+    stream.write(msg)
49
+    h.event(stream, h, { event: streamName })
50
+}
51
+
37 52
 module.exports = {
38 53
     method: 'GET',
39 54
     path: '/{profile_id}',
@@ -81,6 +96,10 @@ module.exports = {
81 96
                 return g
82 97
             })
83 98
 
99
+            const allStreams =
100
+                request.server.plugins['notification-plugin']['streams']
101
+            dispatch(`${profileId}.stonk`, allStreams, h)
102
+
84 103
             try {
85 104
                 return {
86 105
                     ok: true,

+ 8
- 9
backend/lib/routes/notification/index.js Zobrazit soubor

@@ -3,7 +3,6 @@ const apiSchema = require('../../schemas/api')
3 3
 const errorSchema = require('../../schemas/errors')
4 4
 const params = require('../../schemas/params')
5 5
 
6
-const Stream = require('stream')
7 6
 const PassThrough = require('stream').PassThrough
8 7
 
9 8
 const pluginConfig = {
@@ -28,8 +27,11 @@ module.exports = {
28 27
         cors: true,
29 28
         handler: async (request, h) => {
30 29
             const { profile_id } = request.params
31
-            const input = new PassThrough({ objectMode: true })
32
-            const eventType = 'stonk'
30
+
31
+            const streamName = `${profile_id}.stonk`
32
+            const allStreams =
33
+                request.server.plugins['notification-plugin']['streams']
34
+            allStreams[streamName] = new PassThrough({ objectMode: true })
33 35
 
34 36
             const msg = {
35 37
                 profile_id,
@@ -40,15 +42,12 @@ module.exports = {
40 42
             }
41 43
 
42 44
             // Write to the input stream
43
-            setInterval(() => {
44
-                msg.order = Math.floor(Math.random() * 2) === 1 ? 'BUY' : 'SELL'
45
-                input.write(msg)
46
-            }, 5000)
45
+            msg.order = Math.floor(Math.random() * 2) === 1 ? 'BUY' : 'SELL'
46
+            allStreams[streamName].write(msg)
47 47
 
48 48
             // h.event() Added at plugin registration
49 49
             // h is the toolkit
50
-            const streamOptions = { event: `${profile_id}.${eventType}` }
51
-            return h.event(input, h, streamOptions)
50
+            return h.event(allStreams[streamName], h, { event: streamName })
52 51
         },
53 52
 
54 53
         /** Validate based on validators object */

+ 0
- 128
backend/lib/services/notification.js Zobrazit soubor

@@ -1,128 +0,0 @@
1
-/** Heavily lifted from: https://github.com/mtharrison/susie/blob/master/lib/index.js */
2
-
3
-const Stream = require('stream')
4
-const PassThrough = Stream.PassThrough
5
-const Transform = Stream.Transform
6
-
7
-const ENDER = { event: 'end', data: '' }
8
-
9
-/**
10
- * Stringify a stream
11
- * ?: I don't really get what this is doing
12
- * @param {Stream} event
13
- * @returns {string}
14
- */
15
-const stringifyEvent = function (event) {
16
-    let str = ''
17
-    const endl = '\r\n'
18
-    for (const i in event) {
19
-        let val = event[i]
20
-        if (val instanceof Buffer) {
21
-            val = val.toString()
22
-        }
23
-        if (typeof val === 'object') {
24
-            val = JSON.stringify(val)
25
-        }
26
-        str += i + ': ' + val + endl
27
-    }
28
-    str += endl
29
-    return str
30
-}
31
-
32
-/**
33
- * Transform extension
34
- * ?: I don't really get what this is doing
35
- * @param {object} options
36
- * @param {object} objectMode
37
- */
38
-class Transformer extends Transform {
39
-    constructor(options, objectMode) {
40
-        super({ objectMode })
41
-        options = options || {}
42
-        this.counter = 1
43
-        this.event = options.event || null
44
-        this.generateId = options.generateId
45
-            ? options.generateId
46
-            : () => {
47
-                  return this.counter++
48
-              }
49
-    }
50
-    _transform(chunk, encoding, callback) {
51
-        const event = {
52
-            id: this.generateId(chunk),
53
-            data: chunk,
54
-        }
55
-        if (this.event) {
56
-            event.event = this.event
57
-        }
58
-        this.push(stringifyEvent(event))
59
-        callback()
60
-    }
61
-    _flush(callback) {
62
-        this.push(stringifyEvent(ENDER))
63
-        callback()
64
-    }
65
-}
66
-
67
-/**
68
- * Take an event stream and write content to another stream
69
- * ?: Save this for future extension
70
- * @param {Stream} event stream input
71
- * @param {Stream} stream to write to
72
- */
73
-const writeEvent = function (event, stream) {
74
-    if (event) {
75
-        stream.write(stringifyEvent(event))
76
-    } else {
77
-        // closing time
78
-        stream.write(stringifyEvent(ENDER))
79
-        stream.end()
80
-    }
81
-}
82
-
83
-/**
84
- * Callback to decorate server toolkit (h)
85
- * !: Currently we only support ObjectMode streams
86
- * ?: I don't really get what this is doing
87
- * @param {Stream} event stream input
88
- * @param {Toolkit} h hapi common response toolkit
89
- * @param {object} streamOptions
90
- */
91
-const onEvent = (event, h, streamOptions) => {
92
-    // const state = h.request.plugins.notifications = h.request.plugins.notifications || {}
93
-    let active
94
-    if (event instanceof Stream.Readable) {
95
-        if (event._readableState.objectMode) {
96
-            const through = new Transformer(streamOptions, true)
97
-            active = new PassThrough()
98
-            through.pipe(active)
99
-            event.pipe(through)
100
-        }
101
-        // else {
102
-        //     stream = new Transformer(streamOptions, false)
103
-        //     event.pipe(stream)
104
-        // }
105
-        console.log('streamOptions :', streamOptions)
106
-        return h
107
-            .response(active)
108
-            .header('content-type', 'text/event-stream')
109
-            .header('content-encoding', 'identity')
110
-    }
111
-    // Uncomment to do stream state stuff
112
-    // handle a first object arg
113
-    // if (!state.stream) {
114
-    //     active = new PassThrough()
115
-    //     state.stream = active
116
-    //     state.mode = 'object'
117
-    //     const response = h.response(active)
118
-    //         .header('content-type', 'text/event-stream')
119
-    //         .header('content-encoding', 'identity')
120
-    //     writeEvent(event, active)
121
-    //     return response
122
-    // }
123
-    // already have an object stream flowing, just write next event
124
-    // active = state.stream
125
-    // internals.writeEvent(event, active)
126
-}
127
-
128
-module.exports = { onEvent }

Načítá se…
Zrušit
Uložit