Cluster Node Plugin
Building your own cluster node plugin
Create a plugin that allows deepstream nodes to connect to each other and scale
You can see the code for this here
Configuring the plugin
- Via config.yml:
clusterNode:
  path: ./cluster-node/vertical-cluster-node
- Via deepstream constructor:
import { Deepstream } from '@deepstream/server'

// Reference the cluster node plugin by module path in the constructor config.
const clusterNode = { path: './cluster-node/vertical-cluster-node', options: {} }

export const deepstream = new Deepstream({ clusterNode })
deepstream.start()
- Via deepstream setter:
import { Deepstream } from '@deepstream/server'
import VerticalClusterNode from './vertical-cluster-node'

const deepstream = new Deepstream({})

// Build the plugin by hand and register it under the 'clusterNode' key
// before the server is started.
const clusterNode = new VerticalClusterNode({}, deepstream.getServices(), deepstream.getConfig())
deepstream.set('clusterNode', clusterNode)

deepstream.start()
Example Documented Plugin
import * as cluster from 'cluster'
import { EventEmitter } from 'events'
import { Message } from '@deepstream/protobuf/dist/types/messages';
import { TOPIC } from '@deepstream/protobuf/dist/types/all';
import { DeepstreamPlugin, DeepstreamClusterNode, DeepstreamServices, DeepstreamConfig } from '@deepstream/types';
if (cluster.isWorker) {
  // Worker side of the bus: every serialized envelope arriving over the IPC
  // channel is decoded and re-emitted on the shared static emitter, keyed by
  // the name of the message's topic, so that subscribers registered through
  // VerticalClusterNode.subscribe are invoked.
  process.on('message', (serializedMessage) => {
    // send() and sendDirect() store the originating server under `fromServer`,
    // so that is the key that has to be read back here.
    // NOTE(review): sendDirect() also sets `toServer`, which is not consulted
    // here, so direct messages are emitted on every worker that receives them.
    const { fromServer, message }: { fromServer: string, message: Message } = JSON.parse(serializedMessage)
    VerticalClusterNode.emitter.emit(TOPIC[message.topic!], message, fromServer)
  })
}
if (cluster.isMaster) {
  // Master side of the bus: relay every message received from one worker to
  // all of the other workers, leaving the payload untouched.
  cluster.on('message', (worker, serializedMessage: string) => {
    for (const id in cluster.workers) {
      const recipient = cluster.workers[id]
      // Skip empty slots and the worker the message came from; everyone else
      // gets a copy. The send must target the recipient, not the sender.
      if (recipient && recipient !== worker) {
        recipient.send(serializedMessage)
      }
    }
  })
}
/**
 * Cluster node plugin that lets deepstream scale vertically on top of the
 * Node.js `cluster` module. Outgoing messages are serialized to JSON and
 * handed to the parent process via `process.send`; incoming messages are
 * expected to be re-emitted on the static `emitter` under the topic's name.
 *
 * This is a proof of concept that demonstrates how the cluster node API works.
 */
export default class VerticalClusterNode extends DeepstreamPlugin implements DeepstreamClusterNode {
  /** Process-wide emitter on which incoming messages are published, keyed by topic name. */
  public static emitter = new EventEmitter()

  public description: string = 'Vertical Cluster Message Bus'

  /** Most recently registered callback per topic name; written by `subscribe` only. */
  private callbacks = new Map<string, any>()

  /**
   * @param options plugin options (unused by this implementation)
   * @param services deepstream services (unused by this implementation)
   * @param config deepstream config; `serverName` is stamped on every outgoing message
   */
  constructor (options: any, services: DeepstreamServices, private readonly config: DeepstreamConfig) {
    super()
  }

  /**
   * Broadcast a message to all nodes in the cluster.
   *
   * @param message the message to broadcast
   */
  public send (message: Message): void {
    this.sendToParent({ message, fromServer: this.config.serverName })
  }

  /**
   * Send a message addressed to a specific server. The envelope carries the
   * target in `toServer`; whether only that server acts on it depends on the
   * receiving side honouring that field.
   *
   * @param serverName name of the server the message is addressed to
   * @param message the message to send
   * @param metaData currently unused
   */
  public sendDirect (serverName: string, message: Message, metaData?: any): void {
    this.sendToParent({ toServer: serverName, fromServer: this.config.serverName, message })
  }

  /**
   * Subscribe to all messages on a certain topic.
   *
   * @param stateRegistryTopic topic whose messages should be delivered
   * @param callback invoked with each message and the name of the server it originated from
   */
  public subscribe<SpecificMessage> (stateRegistryTopic: TOPIC, callback: (message: SpecificMessage, originServerName: string) => void): void {
    const topicName = TOPIC[stateRegistryTopic]
    this.callbacks.set(topicName, callback)
    VerticalClusterNode.emitter.on(topicName, callback)
  }

  /**
   * Serialize an envelope and pass it to the parent process over IPC.
   * `process.send` is only defined when the process was spawned with an IPC
   * channel; without one there is nobody to deliver to, so this is a no-op
   * instead of a TypeError.
   */
  private sendToParent (envelope: { message: Message, fromServer: DeepstreamConfig['serverName'], toServer?: string }): void {
    if (typeof process.send !== 'function') {
      return
    }
    process.send(JSON.stringify(envelope))
  }
}