Release: Deepstream 4.0

The massively upgraded deepstream server! Introducing too many changes to be contained within a description!

Features:

  • New protobuf protocol support (under the hood)
  • Bulk actions instead of individual subscribes (under the hood)
  • Official Plugin Support
  • Monitoring Support
  • Clustering Support (with small caveats)
  • Listening Discovery Simplification
  • V2 storage API
  • V2 cache API
  • Notify API

Improvements

  • Lazy data parsing
  • Improved deepstream lifecycle
  • Upgraded development tools
  • New deepstream.io website

Backwards compatibility

  • All V3 SDKs are no longer compatible due to the new protobuf binary protocol

Upgrade guide

You can see the upgrade guide for backwards compatibility here

TLDR:

Unsupported SDKs

I wanted to leave this part till the end, but it’s the biggest loss with upgrading to V4 and will be an instant blocker for some users.

We are sad to say that we haven’t yet migrated the V3 non-browser and Node SDKs to V4. The reason is that the underlying protocol has changed, and the V3 SDKs constructed and parsed string messages all over the code base. This design unfortunately meant that while we could write a binary-to-text parser in the Java SDK, it would just be maintenance hell.

Our Swift SDK has been ambitious from the start, using J2ObjC to convert the Java code to Objective-C with thick polyfills for Java methods. While this approach has generally worked, it is really hard to maintain and build.

Our goal going forward is to write a single Kotlin SDK that can run on both iOS and Java. We would also have it expose a much more minimal set of functionality, allowing the SDK to consume only strings rather than objects. This would allow us to integrate easily with the many different flavours of JSON libraries out there.

The good news, however, is that because we now use protobuf we can easily generate protocol objects, which means we can focus immediately on the SDKs themselves and less on low-level binary parsing!

This website

There has been a lot of feedback on the differences between our deepstreamHub and deepstream documentation and offerings, where some users were not certain where the line was drawn between open source and enterprise. We also have over a hundred pages of documentation in a world where some of yesterday’s hot trends (for example Knockout, AngularJS) have been replaced by others (React, Vue). And even within one library, approaches have been deprecated, replaced or advised against (React mixins, stateful components, and hooks). While we love keeping up to date with all the latest chatter in DevOps and developer land, it’s pretty much impossible to do so while also focusing on integrating important features into deepstream’s core.

As such, I’m happy to say we have migrated all of our open source documentation and website back to open source using the amazing Gatsby framework. Every page can now be edited by the community, and adding pages is as easy as writing a markdown document, adding some images and letting the build take care of the rest. If you would like to do anything fancy, you are more than welcome to add a React component!

It’s worth noting that while all content has been migrated across, the CSS can and will still need an insane amount of ❤️ since it was ported rather than rewritten from the original website.

Binary Protocol

The driver behind pretty much all of the V4 refactor was our move from our old text based protocol to binary. It makes building SDKs and new features so much easier. Seriously. LIKE SO MUCH EASIER.

Okay so first things first, the structure of text vs binary messages:

V3 - Text:

TOPIC | ACTION | meta1 | meta2 | ...metaN | payload +

This string had the initial TOPIC and ACTION read by the parser to find out where to route it, and the rest of the data was figured out within the code module that dealt with it. This gave some benefits, like only parsing a full message once it's actually required, but it also meant that the message parsing code was distributed, and adding, for example, a meta field would require lots of refactoring. Tests also had to create text based messages even when testing internal code paths. Payload serialization also didn't use JSON, but instead used a custom form of serialization to minimize bandwidth: U for undefined, T for true, F for false, an O prefix for object, an S prefix for string and an N prefix for number.
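
To make that concrete, here's a rough sketch of that typed payload encoding (illustrative only, not the actual V3 parser, and null handling is omitted):

// Illustrative sketch of the V3 style typed payload encoding (not the actual V3 code)
const serialize = (value: any): string => {
  if (value === undefined) return 'U'
  if (value === true) return 'T'
  if (value === false) return 'F'
  if (typeof value === 'number') return `N${value}`
  if (typeof value === 'string') return `S${value}`
  return `O${JSON.stringify(value)}`
}

const deserialize = (payload: string): any => {
  const body = payload.slice(1)
  switch (payload.charAt(0)) {
    case 'U': return undefined
    case 'T': return true
    case 'F': return false
    case 'N': return parseFloat(body)
    case 'S': return body
    case 'O': return JSON.parse(body)
    default: throw new Error(`Unknown type prefix in payload: ${payload}`)
  }
}

serialize('hello')   // 'Shello'
serialize(42)        // 'N42'
serialize({ a: 1 })  // 'O{"a":1}'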

So the message object in the V3 SDKs and server looked like:

{
    "topic": "R",
    "action": "S",
    "data": ["A", "recordName"]
}

V4 - Binary:

The binary protocol is implemented using protobuf. The decision to use protobuf was due to its wide support across languages, its ease of use and how quickly we managed to get it implemented.

The main message is simply this:

message Message {
  TOPIC topic = 2;
  bytes message = 3;
}

While individual messages use a combination of an action enum and fields.

For example, the event message looks something like this:

message EventMessage {
    required EVENT_ACTION action = 1;
    string data = 2;
    string correlationId = 3;
    bool isError = 4;
    bool isAck = 5;
    string name = 6;

    repeated string names = 7;
    string subscription = 8;

    TOPIC originalTOPIC = 10;
    EVENT_ACTION originalAction = 11;
}
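
To give a feel for how the envelope and the topic specific messages fit together, here is a hedged sketch of the decoding flow. Message, EventMessage and TOPIC stand in for the classes a protobuf generator would emit; this is not the actual parser code.

// Hedged sketch: decode the outer envelope first, then parse the inner bytes
// with the topic specific message type
const decode = (binary: Uint8Array) => {
  const envelope = Message.decode(binary)   // gives us { topic, message: bytes }
  switch (envelope.topic) {
    case TOPIC.EVENT:
      return { topic: envelope.topic, ...EventMessage.decode(envelope.message) }
    // ...one case per topic, each with its own generated message type
    default:
      throw new Error(`Unknown topic: ${envelope.topic}`)
  }
}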

An example of the representation this gets translated into within the JS SDKs looks like this:

{
    "topic": 3,
    "action": 2,
    "isAck": true,
    "name": "event"
}

This makes writing code a lot easier. At the time of writing, the full message API that can be consumed is as follows:

export interface Message {
    topic: TOPIC
    action: ALL_ACTIONS
    name?: string

    isError?: boolean
    isAck?: boolean

    data?: string | Buffer
    parsedData?: RecordData | RPCResult | EventData | AuthData

    parseError?: false

    // listen
    subscription?: string

    originalTopic?: TOPIC | STATE_REGISTRY_TOPIC
    originalAction?: ALL_ACTIONS
    names?: Array<string>
    reason?: string

    // connection
    url?: string
    protocolVersion?: string

    // record
    isWriteAck?: boolean
    correlationId?: string
    path?: string
    version?: number
    versions?: { [index: string]: number }

    // state
    checksum?: number
    fullState?: Array<string>
    serverName?: string
    registryTopic?: TOPIC

    // cluster
    leaderScore?: number
    externalUrl?: string,
    role?: string

    // lock
    locked?: boolean
}

Using this approach has made adding new features and maintaining current ones significantly easier. And given the combination of TOPICs and ACTIONs, we can pretty much ensure we'll be able to extend it without running out of space any time soon.

Cons

It wouldn't be fair to say that this overhaul has no downsides. There have been some sacrifices that we had to make along the way.

1) If you count messages in the billions, those extra bytes add up. Data bandwidth is quite expensive on cloud systems, so the lack of compression isn't just a latency issue anymore. Protobuf has a very compact encoding which beats JSON objects in most cases.

Why yet another proprietary standard?

Because deepstream offers some very specific features, and has a lot more on the way. For example, we currently have unique concepts such as listening. Trying to use a realtime standard (of which there aren't many) would seriously hinder development. That being said, deepstream allows swapping out protocols quite easily as long as there's an interop layer, so feel free to create compatibility protocols to work with your favourite SDKs!

Typescript

We converted the majority of the codebase to TypeScript, for the benefit of future code maintenance as well as making it easier for people to contribute.

This also means that we now have declarations for all possible plugin interfaces which should make it much easier for people to write their own, once they fork the V4 plugin template.

Current custom external plugins are:

  • Authentication
  • Permissioning
  • Storage and Cache
  • Logger
  • Connection Endpoints
  • State Registry Factory
  • Subscription Registry Factory
  • Monitoring
  • Generic Plugins

All these plugins need to extend or implement the same plugin interface (via the @deepstream/types package):

abstract class DeepstreamPlugin<PluginConfig> {
  public abstract description: string
  constructor (pluginConfig: PluginConfig, services: DeepstreamServices, config: DeepstreamConfig)
  public init? (): void
  public async whenReady (): Promise<void> {}
  public async close (): Promise<void> {}
  public setConnectionListener (connectionListener: ConnectionListener): void
}
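
As a rough illustration of what implementing this looks like, here is a minimal (and entirely hypothetical) generic plugin. The constructor mirrors the abstract class above; double check the actual @deepstream/types signatures before copying this.

import { DeepstreamPlugin, DeepstreamServices, DeepstreamConfig } from '@deepstream/types'

// Hypothetical options for this illustrative plugin
interface HeartbeatLoggerOptions {
  intervalMs: number
}

// A minimal generic plugin sketch that logs a heartbeat on an interval
class HeartbeatLogger extends DeepstreamPlugin<HeartbeatLoggerOptions> {
  public description = 'Heartbeat Logger'
  private interval: any = null

  constructor (
    private pluginOptions: HeartbeatLoggerOptions,
    private services: DeepstreamServices,
    config: DeepstreamConfig
  ) {
    super(pluginOptions, services, config)
  }

  public init (): void {
    this.interval = setInterval(() => {
      // assuming the logger exposes an info (event, message) style API
      this.services.logger.info('HEARTBEAT', 'plugin is alive')
    }, this.pluginOptions.intervalMs)
  }

  public async close (): Promise<void> {
    if (this.interval !== null) {
      clearInterval(this.interval)
    }
  }
}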

Improvements to the startup lifecycle also mean that deepstream now launches everything in the following order:

  1. Logger
  2. Services
  3. Handlers
  4. Plugins
  5. Connection Endpoints

This means that by the time your custom plugins are initialized, deepstream has all the services started. The reason this isn’t the last lifecycle stage before running is that once the server is stopped, you would usually want to drain all the connections before stopping your own custom logic. If you need to access the connection endpoint directly, please raise an issue: it’s easy to add a hook, but simpler APIs are always better.

Monitoring

A simple monitoring interface was added to monitor statistics from deepstream:

interface DeepstreamMonitoring  {
  onErrorLog (loglevel: LOG_LEVEL, event: EVENT, logMessage: string): void
  onLogin (allowed: boolean, endpointType: string): void
  onMessageRecieved (message: Message): void
  onMessageSend (message: Message): void
  onBroadcast (message: Message, count: number): void
}

Remember that these hooks are callbacks used to receive live updates from deepstream internals. If you want to query deepstream for more verbose stats, you can easily access those from the deepstream services.

For example, getting all the events subscribed to via an HTTP endpoint would be:

class HTTPMonitoringEndpoint extends DeepstreamPlugin implements Monitoring {
    // setup HTTP server and implement interface

    public async whenReady () {
        if (!this.isReady) {
            await new Promise(resolve => this.server.on('ready', resolve))
        }
    }

    private addHTTPEndpoints () {
        this.server.get('/events', (req, res) => {
            const eventsSubscribedTo = this.services.getSubscriptionsRegistry(TOPIC.EVENT).getNames()
            res.json(eventsSubscribedTo)
        })
    }
}

Further endpoints can be exposed if a use case is proposed via a feature request.

Clustering

Clustering is a touchy topic. Up to version three we had it in open source, but given that we needed to understand our market fit and generate an actual income, we took it out of V3 and added HTTP support instead. I’m very happy to announce that although (here’s the caveat) you need to write your own message bus, we have included all the logic for actual clustering in the open source version and have over 100 complex end-to-end tests running with it.

So how does it work?

Using the following plugins:

  • Cluster node

A cluster node is the core of clustering and is responsible for serializing the messages to send, as well as subscribing to any messages in the cluster it may be interested in. The default version supplied with deepstream is a vertical message bus for use with node clustering. It’s very easy to write your own though (see the sketch after the interface below)!

interface DeepstreamClusterNode  {
  // Broadcast a message to all nodes
  send (message: Message): void
  // Send a message directly to one node. When using PUB/SUB this is simply adding a `toServer` property on the payload
  sendDirect (serverName: string, message: Message): void
  // Subscribe to messages on the event bus
  subscribe<SpecificMessage> (stateRegistryTopic: TOPIC | STATE_REGISTRY_TOPIC, callback: (message: SpecificMessage, originServerName: string) => void): void
}
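
As a concrete (and heavily simplified) example of the kind of message bus you could write yourself, here is a hedged sketch built on Redis pub/sub using the ioredis client. The channel naming and JSON encoding are arbitrary choices for illustration, and the plugin wiring is omitted.

import { Redis } from 'ioredis'

// Hedged sketch of a cluster node on top of Redis pub/sub. Broadcasts publish to a
// per-topic channel; direct messages are the same but tagged with a target server.
class RedisClusterNode {
  private pub = new Redis()
  private sub = new Redis()
  private callbacks = new Map<string, Array<(message: any, originServerName: string) => void>>()

  constructor (private serverName: string) {
    this.sub.on('message', (channel: string, raw: string) => {
      const { fromServer, toServer, ...message } = JSON.parse(raw)
      const isOwn = fromServer === this.serverName
      const isForAnotherServer = toServer !== undefined && toServer !== this.serverName
      if (isOwn || isForAnotherServer) {
        return
      }
      const subscribers = this.callbacks.get(channel) || []
      subscribers.forEach((callback) => callback(message, fromServer))
    })
  }

  public send (message: any): void {
    this.pub.publish(`ds/${message.topic}`, JSON.stringify({ ...message, fromServer: this.serverName }))
  }

  public sendDirect (toServer: string, message: any): void {
    this.pub.publish(`ds/${message.topic}`, JSON.stringify({ ...message, fromServer: this.serverName, toServer }))
  }

  public subscribe (topic: string | number, callback: (message: any, originServerName: string) => void): void {
    const channel = `ds/${topic}`
    if (!this.callbacks.has(channel)) {
      this.callbacks.set(channel, [])
      this.sub.subscribe(channel)
    }
    this.callbacks.get(channel)!.push(callback)
  }
}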
  • Locks

A lock registry allows a single node (the cluster leader) to get or release a lock. This is currently implemented via a distributed, cluster-nominated leader holding the locks. But if you wanted to, you could just as easily use a Redis cache and get rid of the extra step of having a leader hold onto locks (a sketch follows the interface below)!

type LockCallback = (locked: boolean) => void
interface DeepstreamLockRegistry  {
  // Request a lock that is across the entire cluster
  get (lock: string, callback: LockCallback): void
  // Release the lock
  release (lock: string): void
}
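
And here is a hedged sketch of the Redis based alternative mentioned above, using SET with NX and a TTL so a crashed node can't hold a lock forever. The key prefix and timeout are illustrative.

import { Redis } from 'ioredis'

type LockCallback = (locked: boolean) => void

// Hedged sketch of a cache backed lock registry
class RedisLockRegistry {
  private redis = new Redis()

  constructor (private serverName: string, private lockTimeoutMs: number = 5000) {}

  public get (lock: string, callback: LockCallback): void {
    // NX means the key is only set if it doesn't exist yet, PX sets the expiry
    this.redis.set(`ds:lock:${lock}`, this.serverName, 'PX', this.lockTimeoutMs, 'NX')
      .then((result) => callback(result === 'OK'))
      .catch(() => callback(false))
  }

  public release (lock: string): void {
    // a production version should verify it still owns the lock before deleting
    this.redis.del(`ds:lock:${lock}`)
  }
}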
  • State Registries Factory

The state registry is responsible for holding the state of subscriptions across the cluster. The default implementation is distributed, using add/remove and reconciliation checks. However, this is one of the more expensive operations in deepstream due to the consistency checks. Since it is a plugin, we could also use a Redis based approach, as long as we figure out how to clear down the state if a server dies ungracefully.

type StateRegistryCallback = (name: string) => void
interface StateRegistry {
  // The name is registered somewhere on the cluster
  has (name: string): boolean
  // Add the name, called multiple times (so you can figure out how many subscriptions exist on one node)
  add (name: string): void
  // Remove the name, called multiple times
  remove (name: string): void

  // Callback to whenever a name is added (only on first add)
  onAdd (callback: StateRegistryCallback): void
  // Callback to whenever a name is removed (only on last remove)
  onRemove (callback: StateRegistryCallback): void

  // Return all the names (in total or scoped to a server)
  getAll (serverName?: string): string[]
  // Return all the servers who have the name
  getAllServers (subscriptionName: string): string[]
  // Called when a server is removed from cluster for general cleanup
  removeAll (serverName: string): void
}

interface StateRegistryFactory extends DeepstreamPlugin {
  // Factory function
  getStateRegistry (topic: TOPIC | STATE_REGISTRY_TOPIC): StateRegistry
}
  • Cluster Registry

The cluster registry is simply a registry to maintain the current state of the cluster and figure out who the leader is.

interface ClusterRegistry {
  // Is this node the leader?
  isLeader (): boolean
  // What is the server name of the leader?
  getLeader (): string
  // Get the names of all servers in the cluster
  getAll (): string[]
}
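
A small usage sketch of the registry above: letting only the current leader run periodic housekeeping. The interval is arbitrary and this is not code from the server.

// Hedged usage sketch: only the current cluster leader runs a periodic task
const startLeaderOnlyTask = (clusterRegistry: ClusterRegistry) => {
  setInterval(() => {
    if (clusterRegistry.isLeader()) {
      // cluster wide housekeeping goes here; followers simply skip this tick
    }
  }, 30000)
}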

Performance Improvements

Things have changed quite a bit in the NodeJS world. Node 10 came out with the inclusion of a new garbage collector, async/await has changed the coding landscape, and V8 has been optimized for all the ES6 improvements. However, there’s unfortunately a bit of a dark side to all of this. In order to improve performance for the ES6 features most developers now use, the actual performance of ES5 has taken a hit. While there were talks about potentially switching to a totally different language, a total rewrite would have been absolutely impossible. So instead we targeted what I like to call optimistic optimizations, which means in the worst-case scenario it won’t make any difference at all, but if you’re lucky you could get boosts of multiple factors.

So what falls under these optimizations?

In this current release there are three parts:

Lazy data parsing

So the downside of using JSON as a data payload is that it’s not exactly fast. Without knowing your schema up front, and given that each record, event or request/response can literally contain anything, there’s little we can do currently to improve that. So what we do instead is ignore the whole parsing aspect altogether on the server unless it's needed. What this means is that, as far as deepstream is concerned, as long as you don’t need to access the data you’ll never actually parse it. There are three places where the data payload is actually required (a sketch of the idea follows the list below).

  1. Permissions, only if you access the data value.
  2. Record patches. A record patch (setting a value with a path) has to apply the patch onto the current value, requiring both the previous and the new value to be parsed (a bandwidth vs CPU usage tradeoff).
  3. Storage adaptors. This is unfortunately unavoidable currently, as some storage adaptors don’t accept buffers or strings directly. This means even though we pass the data all the way to the storage SDK optimally, we have to parse it just for the SDK to serialize it again =(. On that topic, node hasn’t made it too easy either, with most libraries using the Buffer wrapper while ignoring the more optimal (and not so nice to use) ArrayBuffer. We are looking at extending our storage APIs going forward to allow deepstream to pick between a buffer and a string argument to allow optimal paths when possible.
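
Here is the sketch mentioned above: the raw payload stays a string or buffer on the message, and is only run through JSON.parse the first time something actually asks for it (illustrative, not the actual server code).

// Hedged sketch of lazy payload parsing: keep the raw data around and only
// parse it on first access, caching the result on the message itself
const getParsedData = (message: { data?: string | Buffer, parsedData?: any }): any => {
  if (message.parsedData === undefined && message.data !== undefined) {
    const raw = typeof message.data === 'string' ? message.data : message.data.toString('utf8')
    message.parsedData = JSON.parse(raw)
  }
  return message.parsedData
}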

Separation of data storage concerns

This one has been a bit of an interesting decision from day one. In V1 we initially had data stored in records with the following nesting:

{
    _v: 1,
    _d: { "status": "DONE" }
}

That just made searching an absolute pain, so what we did instead was transform the data to be stored as follows:

{
    __ds: {
        _v: 1
    },
    "status": "DONE"
}

The reason it’s an object is in case we ever decide to add more metadata going forward. The issue with this, however, is that we needed to load the entire record into memory and transform it whenever we wanted to do anything. When you start thinking in bulk (hundreds or thousands of subscriptions), the objects, CPU cycles and immediate gc this uses are just, well, wasteful.

So how did we decide to optimize this? By no longer doing any of the transform logic in the core server. This means that rather than deepstream calling into storage using this:

public set (
    name: string,
    data: { __ds: { _v: number } } & RecordData,
    callback: (error: string) => void
)

We do this:

public set (
    name: string,
    version: number,
    data: RecordData,
    callback: (error: string) => void
)

It looks like a tiny change, and for all our current adaptors it’s fully backwards compatible. However, the goal is for us to start using things like custom Redis commands to store these entries separately in the cache:

Name                 Example value            Description
recordName_version   5                        The record version
recordName_data      { "name": "Purist" }     The data untouched by deepstream

This allows us to then do awesome things going forward like:

  • Validating that the version number doesn’t conflict with the one in the cache rather than in the server, which is critical when clustering
  • Only requesting the version number of records instead of the entire data-set when using offline storage or doing a head/has
  • Potentially storing deepstream data in a meta collection for clear separation
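
As a hedged sketch of what a cache adaptor could do with those split keys (the key naming mirrors the table above; the Redis client and the simplified method signatures are assumptions):

import { Redis } from 'ioredis'

// Hedged sketch: version and data live under separate keys, so the version can
// be read or validated without ever touching the (potentially large) data blob
class SplitKeyCache {
  private redis = new Redis()

  public set (name: string, version: number, data: any, callback: (error: string | null) => void): void {
    this.redis.mset(`${name}_version`, version.toString(), `${name}_data`, JSON.stringify(data))
      .then(() => callback(null))
      .catch((error) => callback(error.message))
  }

  public head (name: string, callback: (error: string | null, version?: number) => void): void {
    // only the version key is fetched, the data never leaves the cache
    this.redis.get(`${name}_version`)
      .then((version) => callback(null, version === null ? undefined : parseInt(version, 10)))
      .catch((error) => callback(error.message))
  }
}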

Bulk Subscription APIs

This was probably one of the biggest under-the-hood improvements, and although it can still be seriously optimized going forward, it has already shown a huge performance boost.

So what’s the difference?

In V3, if you subscribed to a few thousand records, the only optimization that would occur was at the websocket frame level: each subscription was still sent as an individual message. So something like this (excuse the repetitiveness):

Sends:

Topic     Action       Name
RECORD    SUBSCRIBE    record1
RECORD    SUBSCRIBE    record2
RECORD    SUBSCRIBE    record3

And would have received the following responses:

Receives:

Topic     Action           Name
RECORD    SUBSCRIBE_ACK    record1
RECORD    SUBSCRIBE_ACK    record2
RECORD    SUBSCRIBE_ACK    record3

Where now instead what would happen is:

Receives:

Topic     Action       Names                          isAck
RECORD    SUBSCRIBE    [record1, record2, record3]    false

Sends:

Topic     Action       isAck
RECORD    SUBSCRIBE    true

This gives deepstream a massive boost in performance as it doesn’t have to care about individual records. However, in terms of permissions it still calls into the permission handler on a per-name basis to ensure the same level of granularity.

Bulk Head APIs

The new cache API now implements headBulk, which is an insanely quick way to do bulk hydration when a client reconnects or loads from offline storage.

The idea being that rather than doing multiple calls to the cache, getting the responses, parsing out the versions and sending those one by one to the client, we now just do it in one go.

export type StorageHeadBulkCallback = (error: string | null, versions?: { [index: string]: number }, missing?: string[]) => void
public headBulk (names: string[], callback: StorageHeadBulkCallback)

The reason we have a missing argument in the response is that we aren’t guaranteed that everything we requested is in the cache. If that’s the case, the missing records will be loaded from storage using the classic recordRequest approach and sent lazily as they are loaded.
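
To tie this back to the split version/data keys from earlier, here is a hedged sketch of how a cache could implement headBulk with a single round trip. The key naming and the ioredis client are assumptions, not the shipped adaptor.

import { Redis } from 'ioredis'

export type StorageHeadBulkCallback = (error: string | null, versions?: { [index: string]: number }, missing?: string[]) => void

// Hedged sketch: resolve all version keys in one MGET and report anything
// that isn't in the cache through the `missing` argument
const headBulk = (redis: Redis, names: string[], callback: StorageHeadBulkCallback): void => {
  redis.mget(names.map((name) => `${name}_version`))
    .then((results) => {
      const versions: { [index: string]: number } = {}
      const missing: string[] = []
      results.forEach((version, index) => {
        if (version === null) {
          missing.push(names[index])
        } else {
          versions[names[index]] = parseInt(version, 10)
        }
      })
      callback(null, versions, missing)
    })
    .catch((error) => callback(error.message))
}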

Notify API

Deepstream is used in many different ways and scenarios. Some users have played with it on their IoT devices, combining HTTP output from sensors and WebSocket connections for interactive realtime dashboards. Others have used it for communication applications and gaming. However, one thing was pretty much always a given: integrating deepstream records into an existing app, especially one that is CRUD/HTTP based, was a pain. Your application would update your data via your current trusted/battle-tested approach, and you would just want deepstream to consume those changes. Prior to this, you would need to set the data using the setData APIs. Not any more! As long as your database is deepstream compatible (and if not, you can always write a thin StoragePlugin to conform to your standards!), all you need to do is notify deepstream that something changed in the database.

Let’s see code!

Previously:

// Pseudo DB API
await this.db.save(name, value)
await this.ds.record.setDataWithAck(name, value)
// Now
await this.db.save(name, value)
await httpPost('/', {
    topic: 'record',
    action: 'notify',
    names: [name]
})

Okay, so this doesn’t look more performant, does it? 😅

Let’s dive a tiny bit into what happens with the first one.

await this.ds.record.setDataWithAck(name, value)

This will do the following on deepstream:

  1. Permission the record on deepstream
  2. Load the data from cache to increment the version; if it doesn’t exist, get it from the DB
  3. Set the data in the database and cache
  4. Respond with a successful write
  5. Notify all interested users
  6. Send to other nodes in the cluster to notify their users

That is done for every single update. So if you’re updating a thousand records, that’s 6000 steps, and if you’re writing things really quickly it will get quite busy quite fast. Previously we optimized this by having config options (still supported) like hotPathPrefixes, which skip the second step.

Now with

await httpPost('/', {
    topic: 'record',
    action: 'notify',
    names: [... 100 thousand names ...]
})

This will do the following on deepstream:

  1. Check if the user can use notify, which is a blanket yes or no permission (god mode)
  2. Delete the data from cache using a bulk API
  3. Check for all active subscriptions, retrieve their data from storage and set it in the cache (repeated)
  4. Send to other nodes in the cluster to notify their users

All steps other than 3 are run once, and the data is only loaded from storage if a user on that deepstream node is interested. This means we can go from minutes down to milliseconds to get thousands of updates across!

Changing development tools

In order to be consistent with all our other repos, we have focused on minimizing the amount of variation between toolsets. As such, we now have a consistent toolset of mocha, sinon and typescript for our V4 development environments. All adaptors also now use docker to run their tests, as it really simplifies testing and development for all the separate variations.