Design patterns in TypeScript: Chain of Responsibility

In event-driven systems, messages produced by one object can be handled by one or more other objects. None of the objects needs to know about the others; all they share is a common mechanism for distributing messages. Messages pass from one object to another, making each object a link in a chain. This pattern is called “Chain of Responsibility.”

Chain of responsibility in TypeScript

Node.js, and TypeScript by association, has idiomatic support for asynchronous event-driven programming. A straightforward way to implement this pattern is to use the EventEmitter class from Node's built-in events module.

EventEmitter acts as a message bus. Objects can emit (publish) events and subscribe to them, and the objects emitting events are independent of the objects subscribing to them. The message bus needs to be a single shared instance (a Singleton), since event publishers and event subscribers must all use the same one.

Source: ChainOfResponsibility.ts

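import { EventEmitter } from 'events';

// Shared message bus: publishers and subscribers only know about this object.
const messageBus = new EventEmitter();

function messageConsumer(msg: object) {
    console.log(JSON.stringify(msg));
}

// Subscribe to the "topic" event.
messageBus.on("topic", (msg: object) => {
    messageConsumer(msg);
});

// Publish a message; every subscriber to "topic" receives it.
messageBus.emit("topic", {
    payload: "Hello world"
});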
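
To illustrate that publishers and subscribers stay independent, here is a minimal sketch with two subscribers on the same topic. The Greeting interface, the received array, and the "logger"/"auditor" labels are hypothetical names introduced only for this example; the EventEmitter calls themselves (on, emit) are standard Node APIs.

```typescript
import { EventEmitter } from 'events';

// Hypothetical payload shape, introduced only for this sketch.
interface Greeting {
    payload: string;
}

const bus = new EventEmitter();
const received: string[] = [];

// Two independent subscribers to the same topic; neither knows about the other.
bus.on("topic", (msg: Greeting) => {
    received.push("logger:" + msg.payload);
});
bus.on("topic", (msg: Greeting) => {
    received.push("auditor:" + msg.payload.toUpperCase());
});

// emit() returns true if the event had listeners, false otherwise.
const delivered = bus.emit("topic", { payload: "Hello world" } as Greeting);
const ignored = bus.emit("unknown-topic", { payload: "nobody listens" } as Greeting);

console.log(received);           // [ 'logger:Hello world', 'auditor:HELLO WORLD' ]
console.log(delivered, ignored); // true false
```

The publisher never references either subscriber, so subscribers can be added or removed without touching the publishing code.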

Case study: equities trading system backend

Events are useful constructs that represent real-world business workflows. In an equities trading system, there is typically a front-end application that allows traders to enter orders. The orders are published on a message bus, and there are multiple consumers:

  1. Order Management System (OMS) whose purpose is to capture and track orders and executions of those orders. Large orders are rarely executed all in one shot – they are typically executed as some number of smaller parts.
  2. Compliance and reporting system whose purpose is to track everything that is happening and analyze data for potential patterns of fraud.

There can be many more participants in the ecosystem, such as market data and reporting. For this article we’ll stick to a simple simulation of OMS and compliance.

Source: HFT.ts

import {EventEmitter} from 'events';

const messageBus = new EventEmitter();

interface Order {
    orderId: number;
    side: "Buy" | "Sell";
    symbol: string;
    quantity: number;
}

interface Execution {
    orderId: number;
    executionId: number;
    symbol: string;
    quantity: number;
    price: number;
}

class OrderManagementSystem {
    constructor() {
        messageBus.on("order", (orderMessage) => {
            this.processOrder(orderMessage as Order);
        });
        messageBus.on("execution", (executionMessage) => {
            this.processExecution(executionMessage as Execution);
        });
    }
    
    processOrder(order: Order) {
        console.log(new Date().getTime() + ":OMS:order:" + JSON.stringify(order));
        // Simulate filling the order as a number of equal-sized partial executions.
        const numberOfExecutions = 10;
        const quantityPerExecution = order.quantity / numberOfExecutions;
        for (let i = 0; i < numberOfExecutions; i++) {
            messageBus.emit("execution", {
                orderId: order.orderId,
                executionId: i,
                symbol: order.symbol,
                quantity: quantityPerExecution,
                price: 101.5 // fixed placeholder price for the simulation
            } as Execution);
        }
    }
    
    processExecution(execution:Execution) {
        console.log(new Date().getTime()+":OMS:execution:"+JSON.stringify(execution));
    }
}

class ComplianceSystem {
    constructor() {
        messageBus.on("order", (orderMessage) => {
            this.trackOrder(orderMessage as Order);
        });
        messageBus.on("execution", (executionMessage) => {
            this.trackExecution(executionMessage as Execution);
        });
    }
    
    trackOrder(order:Order) {
        console.log(new Date().getTime()+":COMPLIANCE:order:"+JSON.stringify(order));
    }
    
    trackExecution(execution:Execution) {
        console.log(new Date().getTime()+":COMPLIANCE:execution:"+JSON.stringify(execution));
    }
}

// Constructing each system registers its listeners on the message bus.
const oms = new OrderManagementSystem();
const compliance = new ComplianceSystem();

/**
 * Simulate orders coming in from the front end
 **/
for (let orderId = 0; orderId < 10; orderId++) {
    messageBus.emit("order", {
        orderId: orderId,
        symbol: "MSFT",
        quantity: 1000,
        side: "Buy"
    } as Order);
}
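
The listing above casts each message with "as Order" or "as Execution" because EventEmitter listeners receive untyped arguments. One possible alternative, sketched here under assumptions, is a thin typed wrapper that ties each topic name to its payload type. TypedBus and TradingEvents are hypothetical names that do not appear in the listing above; the Order and Execution interfaces are repeated so the sketch is self-contained.

```typescript
import { EventEmitter } from 'events';

interface Order {
    orderId: number;
    side: "Buy" | "Sell";
    symbol: string;
    quantity: number;
}

interface Execution {
    orderId: number;
    executionId: number;
    symbol: string;
    quantity: number;
    price: number;
}

// Hypothetical map from topic name to payload type.
interface TradingEvents {
    order: Order;
    execution: Execution;
}

// Hypothetical wrapper: delegates to EventEmitter but checks payload types at compile time.
class TypedBus<Events> {
    private readonly emitter = new EventEmitter();

    on<K extends keyof Events & string>(topic: K, handler: (msg: Events[K]) => void): void {
        this.emitter.on(topic, handler);
    }

    emit<K extends keyof Events & string>(topic: K, msg: Events[K]): boolean {
        return this.emitter.emit(topic, msg);
    }
}

const typedBus = new TypedBus<TradingEvents>();
const filledQuantities: number[] = [];

// The handler parameter is inferred as Execution; no cast is needed.
typedBus.on("execution", (execution) => {
    filledQuantities.push(execution.quantity);
});

// Fill each incoming order in a single execution (simplified for the sketch).
typedBus.on("order", (order) => {
    typedBus.emit("execution", {
        orderId: order.orderId,
        executionId: 0,
        symbol: order.symbol,
        quantity: order.quantity,
        price: 101.5
    });
});

typedBus.emit("order", { orderId: 1, side: "Buy", symbol: "MSFT", quantity: 1000 });
console.log(filledQuantities); // [ 1000 ]
```

With this wrapper, emitting an "order" with a missing field or subscribing to a misspelled topic becomes a compile-time error rather than a runtime surprise.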

Node events implications for concurrency and parallelism

Even though Node supports asynchronous event processing, it is important to remember that each Node process runs JavaScript on a single thread by default. EventEmitter itself calls listeners synchronously, in the order in which they were registered. That means that asynchronous processing of events does not necessarily happen in parallel.
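
The following minimal sketch makes this ordering visible. The trace array and the listener labels are illustrative names only; the point is that emit() does not return until every listener has run.

```typescript
import { EventEmitter } from 'events';

const bus = new EventEmitter();
const trace: string[] = [];

// Listeners run in registration order, on the same thread as the caller of emit().
bus.on("order", (id: number) => {
    trace.push("first listener: " + id);
});
bus.on("order", (id: number) => {
    trace.push("second listener: " + id);
});

trace.push("before emit");
bus.emit("order", 1); // returns only after both listeners have finished
trace.push("after emit");

console.log(trace);
// [ 'before emit', 'first listener: 1', 'second listener: 1', 'after emit' ]
```

A listener that needs to yield to other work can defer its body with setImmediate, but the deferred code still runs on the same thread.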

To achieve true parallel event processing, Node.js offers the cluster module. It allows multiple forked Node processes to run in parallel on the same physical machine and take advantage of multi-core CPUs.

Event processing at scale

While the cluster module helps with taking advantage of multi-core CPUs, that is often not enough. In large enterprise systems, it is not uncommon for event publishers and event subscribers to exist independently of one another, often as separate processes or services. In a cloud environment, it is also possible to auto-scale event processors based on workload requirements.

Message queues are crucial for scalable event processing. While it is beyond the scope of this article to get into the details of message queues, here are some examples:
