Trying To Create A Message Bus Using An RxJS Subject In Angular 6.1.10

Ben Nadel

26 Oct 2018

CAUTION: I'm very new to RxJS. And, I'm not particularly good at TypeScript. As such, this post should be viewed as an experiment and not as a confident suggestion.

I've been thinking a lot about state-management in an Angular application lately. And, one of the thoughts that I keep coming back to is the fact that not every event in an application pertains directly to state-change. As such, I believe that not every event should be piped directly through a state container like Redux. This means that we need to create a parallel communication pathway within our Angular applications. In the past, for this purpose, I've created very simple "message buses" that just use string-comparison. But, now that we have strong-typing with TypeScript and a powerful streaming utility like RxJS, I wanted to see if I could use something like an RxJS Subject to create a type-safe(ish) message bus in Angular 6.1.10.

Run this demo in my JavaScript Demos project on GitHub.

View this code in my JavaScript Demos project on GitHub.

Now, when I say "type safe", I am not referring to the Message Bus itself. Since a message bus is - by definition - open-ended, I don't think it's necessary (or worthwhile) to consider type safety at that level. In fact, with the lazy loading of feature modules, I am not sure that it's even feasible for a message bus to be comprehensively aware of the events that it may transport over its life-time.

So, when I say "type safe", I am referring to what we can know when inside the message bus event handlers. And, to that end, I wanted to take the same kind of path that all of the other action-oriented systems take: I'm going to create a set of action classes that assert a "type" property that can be used in a discriminated union.

But, I wanted to try a slightly different approach. Rather than individually exporting classes and action types, I wanted to see if I could keep the two concepts more closely coupled. To do this, I tried defining the "type" as a static property on the action class; and then, using that static property to also define the instance property. And, to much celebration, this seems to work quite well. It allows a single Class to house the value on both sides of the discriminated union comparison.

I also wanted to try and cut down on some repetitive code; so, I created a generic abstract base class that would take care of assigning the payload:

// I am the base-class for all of the events that this application pushes onto the
// message bus. The only guarantee that this class makes is a read-only Type.
abstract class Event {

    // NOTE: Declared abstract so that each concrete class must supply its own value
    // (this also keeps the compiler happy under strictPropertyInitialization).
    public abstract readonly type: string;

}

// I am the sub-class / base-class for all of the payload-heavy events that this
// application pushes onto the message bus. This class guarantees a payload with a
// given interface.
abstract class EventWithPayload<T> extends Event {
 
    public readonly payload: T;
 
    constructor( payload: T ) {
 
        super();
        this.payload = payload;
 
    }
 
}
 
// ----------------------------------------------------------------------------------- //
// Each of the following classes has both a STATIC and an INSTANCE [type] of the same
// value (the instance value is read from the static value). This allows you to use the
// instance type in a discriminating union while comparing it to the static type. In
// other words, you only have to import the one Event class to get access to both values.
// ----------------------------------------------------------------------------------- //
 
export interface EventTypeAPayload {
    foo: string;
}
 
export class EventTypeA extends EventWithPayload<EventTypeAPayload> {
    static readonly type = "EventTypeA";
    public readonly type = EventTypeA.type;
}
 
// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //
 
export interface EventTypeBPayload {
    bar: string;
}
 
export class EventTypeB extends EventWithPayload<EventTypeBPayload> {
    static readonly type = "EventTypeB";
    public readonly type = EventTypeB.type;
}
 
// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //
 
export interface EventTypeCPayload {
    baz: string;
}
 
export class EventTypeC extends EventWithPayload<EventTypeCPayload> {
    static readonly type = "EventTypeC";
    public readonly type = EventTypeC.type;
}
 
// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //
 
export class EventTypeD extends Event {
    static readonly type = "EventTypeD";
    public readonly type = EventTypeD.type;
}
 
// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //
 
// For convenience in our type annotations, we're going to export all of the Event
// types as one type union. This way, our message bus event handler can use this type
// in its signature and then narrow down the type using a discriminating union.
export type EventTypes =
    EventTypeA |
    EventTypeB |
    EventTypeC |
    EventTypeD
;

As you can see, I have two abstract classes, Event and EventWithPayload. Both contain the "type" property; but, only the latter of the two contains a payload. When we look at the concrete classes, we can see that all of them contain both a static and an instance "type" property. Again, this is for the purposes of the discriminated union. And, because of the base classes, none of the concrete classes need a constructor.
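To make the static / instance pairing a bit more concrete, here is a minimal, stand-alone sketch of how the two "type" values can drive a discriminated union. The names BusEvent, UserSaved, CacheCleared, and describeEvent are hypothetical and exist only for this illustration. The "never" assignment in the default branch is an extra exhaustiveness check that is not part of the demo. I'm also assuming the base class declares "type" as abstract.

```typescript
// Hypothetical base class (named BusEvent to avoid shadowing the DOM's Event type).
abstract class BusEvent {
    public abstract readonly type: string;
}

// The static value and the instance value share the same string-literal type.
class UserSaved extends BusEvent {
    static readonly type = "UserSaved";
    public readonly type = UserSaved.type;

    constructor( public readonly payload: { id: number } ) {
        super();
    }
}

class CacheCleared extends BusEvent {
    static readonly type = "CacheCleared";
    public readonly type = CacheCleared.type;
}

type BusEvents = UserSaved | CacheCleared;

// I narrow the union using the [type] property. Inside each case, the compiler knows
// which concrete class it is looking at.
function describeEvent( event: BusEvents ) : string {

    switch ( event.type ) {
        case UserSaved.type:
            return( "UserSaved with id " + event.payload.id );
        case CacheCleared.type:
            return( "CacheCleared (no payload)" );
        default: {
            // If a new class is added to BusEvents but not handled above, this
            // assignment stops compiling.
            const unhandled: never = event;
            return( unhandled );
        }
    }

}

console.log( describeEvent( new UserSaved({ id: 7 }) ) );
console.log( describeEvent( new CacheCleared() ) );
```

The nice part of this arrangement is that a consumer only needs to import the one class in order to get both the value to compare against and the type to narrow to.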

NOTE: Generics in TypeScript are still mostly a mystery to me. Getting here required a lot of trial-and-error, seeing what the compiler would accept. If there is a better way to do this, please let me know!

Now that we have these action types in distinct classes, let's look at how they can be used in the message bus event handlers to provide type-safety. To demonstrate, I created an App component that could subscribe to, unsubscribe from, and emit events on the message bus:


// Import the core angular services.
import { Component } from "@angular/core";
 
// Import the application components and services.
import { EventTypeA } from "./message-bus-events";
import { EventTypeB } from "./message-bus-events";
import { EventTypeC } from "./message-bus-events";
import { EventTypeD } from "./message-bus-events";
import { EventTypes } from "./message-bus-events";
import { MessageBusGroup } from "./message-bus";
import { MessageBusService } from "./message-bus";
 
// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //
 
@Component({
    selector: "my-app",
    styleUrls: [ "./app.component.less" ],
    template:
    `
        <button (click)="sendMessages()">
            Send Messages ( view in Console )
        </button>
 
        <button (click)="subscribeToMessages()">
            Subscribe To Messages
        </button>
 
        <button (click)="unsubscribeFromMessages()">
            Unsubscribe From Messages
        </button>
    `
})
export class AppComponent {
 
    private messageBusGroup: MessageBusGroup;
 
    // I initialize the app component.
    constructor( messageBus: MessageBusService ) {
 
        // Let's create a GROUP for this message bus. The GROUP keeps track of all the
        // subscriptions that we make within this context. As such, it allows us to
        // unsubscribe from all the events with a single method.
        this.messageBusGroup = messageBus.group();
 
    }
 
    // ---
    // PUBLIC METHODS.
    // ---
 
    // I put several messages onto the message bus.
    public sendMessages() : void {
 
        // NOTE: Since the message bus is SYNCHRONOUS, we know that anything logged
        // during the .emit() calls will be logged inside of our grouping.
        console.group( "Events" );
        this.messageBusGroup.emit( new EventTypeA({ foo: "foo" }) );
        this.messageBusGroup.emit( new EventTypeB({ bar: "bar" }) );
        this.messageBusGroup.emit( new EventTypeC({ baz: "bazzzy" }) );
        this.messageBusGroup.emit( new EventTypeD() );
        console.groupEnd();
 
    }
 
 
    // I subscribe to events on the message bus.
    public subscribeToMessages() {
 
        console.warn( "Subscribed to events!" );
 
        // Subscribe to all events on the message bus.
        this.messageBusGroup.subscribe(
            ( event: EventTypes ) : void => {
 
                // Try navigating the discriminating union using the [type] property.
                switch ( event.type ) {
                    case EventTypeA.type:
 
                        console.log( "Event-A happened [type]:", event.payload.foo );
 
                    break;
                    case EventTypeB.type:
 
                        console.log( "Event-B happened [type]:", event.payload.bar );
 
                    break;
                    case EventTypeD.type:
 
                        console.log( "Event-D happened [type]: (no payload)" );
 
                    break;
                }
 
                // Try navigating the discriminating union using the instance type.
                if ( event instanceof EventTypeA ) {
 
                    console.log( "Event-A happened [instanceof]:", event.payload.foo );
 
                } else if ( event instanceof EventTypeB ) {
 
                    console.log( "Event-B happened [instanceof]:", event.payload.bar );
 
                }
 
            }
        );
 
        // Subscribe to a specific event. Since we know our callback will only be
        // invoked for a specific event type, there will be automatic type inference
        // and we don't have to explicitly type the event arguments (magic!).
        this.messageBusGroup.on(
            EventTypeC,
            ( event ) : void => {
 
                console.log( "Event-C happened [on]:", event.payload.baz );
 
            }
        );
 
        // Subscribe to a specific type of event, but execute the callback in the
        // given context.
        // --
        // CAUTION: This won't have automatic type inference (as above) since the class
        // method can be called by anything (not just the message bus). As such, no
        // implicit type guarantee can be made by the compiler.
        this.messageBusGroup.on( EventTypeC, this.handleC, this );
 
    }
 
 
    // I unsubscribe from all of the events being tracked by the group.
    public unsubscribeFromMessages() {
 
        console.warn( "Unsubscribed from events!" );
        // NOTE: Because we are using a message bus GROUP, this will automatically
        // unsubscribe from all of the events that this component is listening to.
        this.messageBusGroup.unsubscribe();
 
    }
 
    // ---
    // PRIVATE METHODS.
    // ---
 
    // I'm just here to demonstrate the .on( Type, callback, CONTEXT ) signature.
    private handleC( event: EventTypeC ) : void {
 
        console.log( "Event-C happened [on(this)]:", event.payload.baz, ( this instanceof AppComponent ) );
 
    }
 
}

In my experimental message bus, there are several ways to subscribe to events. First, you can subscribe to all events with .subscribe(). In that case, you have to explicitly annotate the expected types since the message bus cannot infer anything. Then, within the .subscribe() event handler, you can see that I am leveraging the discriminated union in two ways: using the "type" property and using the "instanceof" operator. In both approaches, the compiler will allow me to reach into class-specific payloads while also warning me if I try to reference a property that the class-specific payload doesn't expose.

In the second approach - .on() - I tell the message bus what [class] type of event I want to listen for. In that case, the TypeScript compiler can perform some type inference and I don't [necessarily] have to annotate the incoming type. In fact, in the demo, you can see that I omit the event type in the signature. And still, the compiler will apply type-safety.
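As a rough, stand-alone illustration of why the compiler can infer the callback argument here, consider the following sketch. It mirrors the NewableType<T> interface used by the message bus implementation, but everything else (Ping, Pong, handleIfType) is hypothetical and has nothing to do with RxJS. The idea is that passing a class reference binds T to that class's instance type, which then flows into the callback signature.

```typescript
// A constructor signature: anything that can be called with "new" to produce a T.
interface NewableType<T> {
    new( ...args: any[] ): T;
}

class Ping {
    constructor( public readonly sentAt: number ) {}
}

class Pong {
    constructor( public readonly latency: number ) {}
}

// Hypothetical helper: I invoke the callback only when the event is an instance of
// the given class. I return true if the callback was invoked.
function handleIfType<T>(
    event: any,
    typeFilter: NewableType<T>,
    callback: ( event: T ) => void
    ) : boolean {

    if ( event instanceof typeFilter ) {

        callback( event );
        return( true );

    }

    return( false );

}

const latencies: number[] = [];

// No annotation on (event) below; the compiler infers Pong from the class reference,
// so event.latency type-checks and event.sentAt would be a compile-time error.
handleIfType( new Pong( 42 ), Pong, ( event ) => { latencies.push( event.latency ); } );
handleIfType( new Ping( 1 ), Pong, ( event ) => { latencies.push( event.latency ); } );

console.log( latencies );
```

Only the first call invokes its callback, since the second event is a Ping and not a Pong.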

Now, you might notice that when I inject the message bus, I am calling a .group() method on it. This actually returns an instance of another class, MessageBusGroup, that keeps track of all the subscriptions being formed by the current context. This allows me to unsubscribe from all relevant events at the same time.

That said, let's finally look at my message bus implementation. What you will see is that it is really just a light-weight wrapper around an RxJS Subject that adds some error-handling:

// Import the core angular services.
import { ErrorHandler } from "@angular/core";
import { filter } from "rxjs/operators";
import { Injectable } from "@angular/core";
import { Subject } from "rxjs";
import { Subscription } from "rxjs";
 
// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //
 
interface CallbackFunction<T = any> {
    ( event: T ): void;
}
 
interface NewableType<T> {
    new( ...args: any[] ): T;
}
 
@Injectable({
    providedIn: "root"
})
export class MessageBusService {
 
    private errorHandler: ErrorHandler;
    private eventStream: Subject<any>;
 
    // I initialize the message bus service.
    constructor( errorHandler: ErrorHandler ) {
 
        this.errorHandler = errorHandler;
        this.eventStream = new Subject();
 
    }
 
    // ---
    // PUBLIC METHODS.
    // ---
 
    // I push the given event onto the message bus.
    public emit( event: any ) : void {
 
        this.eventStream.next( event );
 
    }
 
 
    // I create and return a new grouping of subscriptions on the message bus. This is
    // a convenience class that makes it easier to subscribe and unsubscribe to events
    // within a single, cohesive context (such as a component).
    public group() : MessageBusGroup {
 
        return( new MessageBusGroup( this ) );
 
    }
 
 
    // I subscribe to the message bus, but only invoke the callback when the event is
    // of the given newable type (ie, it's a Class definition, not an instance).
    // --
    // NOTE: The NewableType<T> will allow for Type inference.
    public on<T>(
        typeFilter: NewableType<T>,
        callback: CallbackFunction<T>,
        callbackContext: any = null
        ) : Subscription {
 
        var subscription = this.eventStream
            .pipe(
                filter(
                    ( event: any ) : boolean => {
 
                        return( event instanceof typeFilter );
 
                    }
                )
            )
            .subscribe(
                ( event: T ) : void => {
 
                    try {
 
                        callback.call( callbackContext, event );
 
                    } catch ( error ) {
 
                        this.errorHandler.handleError( error );
 
                    }
 
                }
            )
        ;
 
        return( subscription );
 
    }
 
 
    // I subscribe to all events on the message bus.
    public subscribe(
        callback: CallbackFunction,
        callbackContext: any = null
        ) : Subscription {
 
        var subscription = this.eventStream.subscribe(
            ( event: any ) : void => {
 
                try {
 
                    callback.call( callbackContext, event );
 
                } catch ( error ) {
 
                    this.errorHandler.handleError( error );
 
                }
 
            }
        );
 
        return( subscription );
 
    }
 
}
 
// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //
 
// I am a convenience class that keeps track of subscriptions within the group and can
// mass-unsubscribe from them as needed. Because of this tracking, the methods on this
// class return a reference to THIS class, instead of a Subscription, allowing for a
// more fluent API.
export class MessageBusGroup {
 
    private messageBus: MessageBusService;
    private subscriptions: Subscription[];
 
    // I initialize the message bus group service.
    constructor( messageBus: MessageBusService ) {
 
        this.messageBus = messageBus;
        this.subscriptions = [];
 
    }
 
    // ---
    // PUBLIC METHODS.
    // ---
 
    // I push the given event onto the message bus.
    public emit( event: any ) : MessageBusGroup {
 
        this.messageBus.emit( event );
 
        return( this );
 
    }
 
 
    // I subscribe to the message bus, but only invoke the callback when the event is
    // of the given newable type (ie, it's a Class definition, not an instance).
    public on<T>(
        typeFilter: NewableType<T>,
        callback: CallbackFunction<T>,
        callbackContext: any = null
        ) : MessageBusGroup {
 
        this.subscriptions.push(
            this.messageBus.on( typeFilter, callback, callbackContext )
        );
 
        return( this );
 
    }
 
 
    // I subscribe to all events on the message bus.
    public subscribe(
        callback: CallbackFunction,
        callbackContext: any = null
        ) : MessageBusGroup {
 
        this.subscriptions.push(
            this.messageBus.subscribe( callback, callbackContext )
        );
 
        return( this );
 
    }
 
 
    // I unsubscribe from all the current subscriptions.
    public unsubscribe() : MessageBusGroup {
 
        for ( var subscription of this.subscriptions ) {
 
            subscription.unsubscribe();
 
        }
 
        this.subscriptions = [];
 
        return( this );
 
    }
 
}

As you can see, there's really not that much logic here. The MessageBusService is really just a smart proxy around a private RxJS Subject. And, the MessageBusGroup is really just a smart proxy around a MessageBusService instance. Other than the .on() method, there's no type safety in the message bus because the message bus can't be aware of how it will be used. All the type safety is pushed to the type annotations in the subscribers.
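To illustrate what the try/catch around each callback buys us, here is a minimal sketch that does not use RxJS at all. TinyBus is a hypothetical stand-in for the Subject-backed service, and its reportedErrors array is a hypothetical stand-in for Angular's ErrorHandler. It only demonstrates the general idea: an error thrown by one subscriber gets reported instead of preventing the other subscribers from seeing the event.

```typescript
type Handler = ( event: any ) => void;

// Hypothetical, dependency-free stand-in for the message bus.
class TinyBus {

    private handlers: Handler[] = [];
    // Stand-in for ErrorHandler.handleError(); we just collect the errors.
    public reportedErrors: any[] = [];

    public subscribe( handler: Handler ) : void {

        this.handlers.push( handler );

    }

    // I synchronously pass the event to every handler, isolating failures.
    public emit( event: any ) : void {

        for ( const handler of this.handlers ) {

            try {

                handler( event );

            } catch ( error ) {

                this.reportedErrors.push( error );

            }

        }

    }

}

const bus = new TinyBus();
const received: string[] = [];

bus.subscribe( () => { throw new Error( "Subscriber blew up" ); } );
bus.subscribe( ( event ) => { received.push( event ); } );
bus.emit( "hello" );

console.log( received.length, bus.reportedErrors.length );
```

In this sketch, the second subscriber still receives "hello" even though the first one threw, and the error ends up in reportedErrors rather than propagating back to the code that called emit().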

Now, if we run this application and try subscribing to and triggering some events, we get the following browser output:

[Image: blog1.png, the browser console output from the demo]

As you can see, once we subscribe to the events, we are able to use both the "type" property and the "instanceof" operator in order to navigate the discriminated union of Event classes.

Anyway, this was just a fun exploration of TypeScript, type safety, discriminated unions, and RxJS streams. Like I said above, a lot of this was trial-and-error, seeing what would make the TypeScript compiler happy. But, this seems to work and provides an appropriate amount of type safety. I do strongly believe that Angular applications need an event bus for decoupled communication, even if they have a state management system like Redux or NgRx. The two pathways speak to very different concerns (even though they are both "streams").
