Deepstream is in maintenance mode, contributors needed

Cluster Node Plugin

Building your own cluster node plugin

Create a plugin that allows deepstream nodes to connect to each other and scale

You can see the code for this here

Configuring the plugin

  1. Via config.yml:
    clusterNode:
      path: ./cluster-node/vertical-cluster-node
  2. Via deepstream constructor:
    import { Deepstream } from '@deepstream/server'
    
    export const deepstream = new Deepstream({
        clusterNode: {
            path: './cluster-node/vertical-cluster-node',
            options: {}
        }
    })
    
    deepstream.start()
  3. Via deepstream setter:
    import { Deepstream } from '@deepstream/server'
    import VerticalClusterNode from './vertical-cluster-node'
    
    const deepstream = new Deepstream({})
    
    deepstream.set(
        'clusterNode', 
        new VerticalClusterNode({}, deepstream.getServices(), deepstream.getConfig())
    )
    
    deepstream.start()

Example Documented Plugin

import * as cluster from 'cluster'
import { EventEmitter } from 'events'
import { Message } from '@deepstream/protobuf/dist/types/messages';
import { TOPIC } from '@deepstream/protobuf/dist/types/all';
import { DeepstreamPlugin, DeepstreamClusterNode, DeepstreamServices, DeepstreamConfig } from '@deepstream/types';

if (cluster.isWorker) {
    process.on('message', (serializedMessage) => {
        const { serverName, message }: { serverName: string, message: Message } = JSON.parse(serializedMessage)
        VerticalClusterNode.emitter.emit(TOPIC[message.topic!], message, serverName)
    })
}

if (cluster.isMaster) {
    cluster.on('message', (worker, serializedMessage: string, handle) => {
        for (const id in cluster.workers) {
            const fromWorker = cluster.workers[id]!
            if (fromWorker !== worker) {
                worker.send(serializedMessage)
            }
        }
    })
}

/**
 * This class will allow deepstream to scale vertically using the cluster nodeJS approach. This is a POC,
 * but demonstrates how the API works!
 */
export default class VerticalClusterNode extends DeepstreamPlugin implements DeepstreamClusterNode {
    public static emitter = new EventEmitter()
    public description: string = 'Vertical Cluster Message Bus'
    private callbacks = new Map<string, any>()

    constructor (options: any, services: DeepstreamServices, private config: DeepstreamConfig) {
        super()
    }

    /**
     * Broadcast a message to all nodes in the server
     */ 
    public send (message: Message): void {
        process.send!(JSON.stringify({ message, fromServer: this.config.serverName }))
    }

    /**
     * Send a message to a specific version
     */
    public sendDirect (serverName: string, message: Message, metaData?: any): void {
        process.send!(JSON.stringify({ toServer: serverName, fromServer: this.config.serverName, message }))
    }

    /**
     * Subscribe to all messages on a certain topic on the server
     */
    public subscribe<SpecificMessage> (stateRegistryTopic: TOPIC, callback: (message: SpecificMessage, originServerName: string) => void): void {
        this.callbacks.set(TOPIC[stateRegistryTopic], callback)
        VerticalClusterNode.emitter.on(TOPIC[stateRegistryTopic], callback)
    }
}