February 23, 2012

Failing to Scale Out Push Web Services

Problem: on the web, enable a large number of message producers to send a very large number of messages to a much larger number of message consumers. Example: allow 100,000 publishers to send a total of 1 million messages per second to 100 million concurrently connected consumers.

We are dealing with the problem of connection channels, an abstraction that allows a producer to distribute a message to many connected consumers. Our challenge is to design a distributed channel delivery mechanism that can scale out to millions of connected consumers. Throughout, we assume a stateless delivery system: messages are either delivered or dropped, and no persistence guarantees exist; if a consumer is not connected, it misses the message.

The naïve approach is to perform consistent hashing by channel. In this model, each channel and all its consumers live on the same server. Since the channel identifier is part of the URI, the load balancer can perform this mapping cheaply, and we can add servers as required without re-balancing the existing ones. With many channels per server, the distribution is eventually uniform. Problems arise, however, because some channels have an order of magnitude more consumers than others, and because a single channel may have more consumers than one server can sustain.
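
To make this concrete, here is a minimal sketch of a consistent hash ring keyed by channel URI; the server names, the replica count, and the md5-based hash are illustrative choices, not the original implementation:

```python
import bisect
import hashlib

def _hash(key):
    """Stable mapping of a string onto the hash ring."""
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)

class ConsistentHashRing:
    """Consistent hash ring with virtual nodes for smoother distribution."""

    def __init__(self, servers, replicas=100):
        points = sorted((_hash("%s#%d" % (server, i)), server)
                        for server in servers
                        for i in range(replicas))
        self._hashes = [h for h, _ in points]
        self._servers = [s for _, s in points]

    def server_for(self, key):
        """Walk clockwise to the first virtual node at or after the key."""
        idx = bisect.bisect(self._hashes, _hash(key)) % len(self._hashes)
        return self._servers[idx]

# All consumers of a channel land on the same server as the channel itself,
# and adding a server only remaps a small fraction of existing channels.
ring = ConsistentHashRing(["node-1", "node-2", "node-3"])
print(ring.server_for("/channels/scores"))
```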

To overcome the limitations of hashing by channel, we can instead perform consistent hashing by channel and by connection (consumer). In this model, each consumer is consistently assigned to a server in the pool, and we can add servers without re-balancing consumers among them. Each channel stores a list of all its consumer identifiers, and channels themselves are consistently hashed across servers. To deliver a message, the load balancer finds the server holding the channel and dispatches the request; the channel server then looks up the list of consumer identifiers and applies the consistent hashing algorithm again to reach each consumer.
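
A sketch of this second level, building on the ring above; ChannelServer and send_to_connection are hypothetical names standing in for the real transport:

```python
from collections import defaultdict

class ChannelServer:
    """Holds the consumer-id lists for the channels hashed onto this node."""

    def __init__(self, ring):
        self.ring = ring                    # the ConsistentHashRing sketched above
        self.consumers = defaultdict(set)   # channel -> consumer identifiers

    def subscribe(self, channel, consumer_id):
        self.consumers[channel].add(consumer_id)

    def deliver(self, channel, message):
        # Second hashing step: each consumer id maps to the node that
        # actually holds that consumer's connection.
        for consumer_id in self.consumers[channel]:
            node = self.ring.server_for(consumer_id)
            send_to_connection(node, consumer_id, message)  # hypothetical transport
```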

Although hashing by channel and connection is conceptually simple, it presents significant operability challenges. First, the loss of the server holding a channel’s metadata and list of connected consumers requires a watchdog to clean up all the stale consumer connections. Second, as consumers join and disappear, the channel server must maintain a consistent view of the consumer list by means of locks, with the performance degradation that locking incurs at very large consumer counts. Third, the more consumers connect uniformly across the nodes, the more inter-node chattiness occurs; at some point, every node holds consumer connections for a given channel. To fulfill each delivery we must then issue N requests, one to every node, where N is the number of nodes in the cluster. For the cluster to process and deliver M messages, every node must handle all M of them, so the cluster performs N*M message operations in aggregate. This design is limited in the number of connections it can hold because of the centralized channel-consumer tracking, and it also scales only to the maximum request processing capacity of an individual node.
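
To put numbers on the fan-out cost: with N = 100 nodes and a target of M = 1,000,000 messages per second, the cluster performs 100 million node-to-node requests per second in aggregate, while each individual node must still ingest all 1,000,000 messages. Adding nodes buys more connections, never more message throughput.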

We can solve some of these operability challenges by removing the channel’s management of consumer connections: instead of keeping a per-channel list of consumers, we keep only a view of the peer nodes. The thinking here is that, since we asymptotically reach the point where every node holds consumer connections for a given channel, all we really need is a list of all the nodes in the cluster. A centralized agent keeps a directory of all active peers holding consumer connections, e.g. a ZooKeeper ensemble.
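
A minimal sketch of such a peer directory, assuming the ZooKeeper route via the kazoo Python client; the znode paths, node addresses and ensemble hosts are illustrative:

```python
from kazoo.client import KazooClient

# Connect to the ZooKeeper ensemble acting as the central directory.
zk = KazooClient(hosts="zk1:2181,zk2:2181,zk3:2181")
zk.start()

def register_peer(node_id, address):
    """Announce this node; the ephemeral znode disappears if the node dies."""
    zk.create("/push-cluster/peers/" + node_id, address.encode("utf-8"),
              ephemeral=True, makepath=True)

def active_peers():
    """List every node currently holding consumer connections."""
    return zk.get_children("/push-cluster/peers")
```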

Ring-based Cluster

Consumers are connected uniformly to the nodes in the cluster by a “good” load-balancing scheme. Since any node can hold connections to consumers on any channel, there is no snapshot of a channel’s consumers, and identifying all consumers connected to a channel requires interrogating every node in the cluster. Whereas this design improves on the previous ones in that it scales to an effectively unlimited number of connections, it still scales only to the message processing throughput of an individual node.
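
Delivery then degenerates into a full broadcast. A sketch, reusing the hypothetical active_peers() directory above; forward stands in for whatever node-to-node call the cluster actually uses:

```python
def publish(channel, message):
    # Any node may hold consumers for any channel, so every message
    # must be fanned out to every peer in the directory.
    for node_id in active_peers():
        forward(node_id, channel, message)   # hypothetical node-to-node RPC
```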

A popular alternative to the directory of nodes is a tree of nodes. In this model, we start with a single node. As we reach the maximum number of connections that node can hold, we add two new nodes; the original node still accepts messages from publishers, but brokers delivery to the two new nodes. As those nodes themselves become saturated, we add a new layer of four nodes, and so forth. This approach has the same limitation as the directory of nodes: because the root still receives and relays every message, maximum throughput is bound by that of an individual node.
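
A sketch of the tree growth and delivery path, with invented names; note that deliver() on the root still touches every message, which is exactly the bound described above:

```python
class TreeNode:
    """A broker node that relays every message it receives to its children."""

    def __init__(self, capacity):
        self.capacity = capacity   # max direct consumer connections per node
        self.consumers = []        # consumers attached to this node
        self.children = []         # grown in pairs once this node saturates

    def attach(self, consumer):
        if len(self.consumers) < self.capacity:
            self.consumers.append(consumer)
            return
        if not self.children:
            # Saturated: add a new layer of two nodes below this one.
            self.children = [TreeNode(self.capacity), TreeNode(self.capacity)]
        min(self.children, key=lambda c: len(c.consumers)).attach(consumer)

    def deliver(self, message):
        for consumer in self.consumers:
            consumer.receive(message)   # hypothetical consumer handle
        for child in self.children:
            child.deliver(message)      # the root still relays every message
```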

We’ve seen how to hold connections to an effectively unlimited number of consumers, but not how to deliver an unlimited number of messages. These solutions scale very well to tens of thousands of messages per second and millions of active connected consumers, but they have an upper limit. For most producers out there, that upper limit is probably high enough to be fine. But the limit exists in any push-based system.

Both the messaging literature and messaging praxis have historically preferred pull-based models over push-based ones. In a pull model, consumers come back to the broker to fetch messages, each at its own rate, so the problem is no longer one of dispatching across millions of connections. Pull-based messaging systems store messages until consumers come back to fetch them. In fact, the only messaging system we know of that scales to millions of messages and millions of consumers uses store-and-forward: SMTP.
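
For contrast, a minimal sketch of a pull-model consumer loop; the /messages endpoint, its after cursor parameter, and the handle() callback are invented for illustration:

```python
import time
import requests   # assuming a plain HTTP polling endpoint on the broker

def poll_forever(broker_url, consumer_id, interval=5.0):
    """Fetch at the consumer's own pace; the broker stores messages meanwhile."""
    cursor = None
    while True:
        params = {"consumer": consumer_id}
        if cursor is not None:
            params["after"] = cursor          # resume past the last message seen
        resp = requests.get(broker_url + "/messages", params=params, timeout=10)
        for msg in resp.json().get("messages", []):
            handle(msg)                       # application-specific processing
            cursor = msg["id"]
        time.sleep(interval)                  # the consumer sets the rate, not the broker
```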

As much as I may think that the techniques enabling web push models, such as HTTP streaming, long-poll and WebSockets, are genuinely useful for solving point problems, they are not techniques we can use to implement Internet-scale push-based web services, because they are fundamentally based on a PubSub model. The scalability of PubSub under high load remains an unresolved research question, and as such it is not a paradigm we should apply at Internet scale.

In fact, I am now almost convinced that we’ve been looking at this the wrong way, and that the right solution to this problem is store-and-forward: web consumers connect at their own rate to fetch messages, and intermediaries throttle concurrent connection rates in order to achieve linear scalability. Essentially, this is a web of partially connected, store-and-forward, almost-real-time async data peers. And that’s a mouthful, but a really exciting one.
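
One way such an intermediary might throttle, sketched with a bounded semaphore; RetryLater is an invented back-off signal, not an existing API:

```python
import threading

class RetryLater(Exception):
    """Signal to the consumer to back off and poll again later."""

class ConnectionThrottle:
    """Caps concurrent fetches so an intermediary's load stays bounded
    no matter how many consumers sit behind it."""

    def __init__(self, max_concurrent):
        self._slots = threading.BoundedSemaphore(max_concurrent)

    def fetch(self, do_fetch):
        if not self._slots.acquire(blocking=False):
            raise RetryLater()   # full: push back on the consumer, not a queue
        try:
            return do_fetch()
        finally:
            self._slots.release()
```

A rejected fetch simply retries after a back-off, pushing load onto the consumers’ schedules rather than queueing it inside the node.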

14 thoughts on “Failing to Scale Out Push Web Services”

  1. carl sturmer

    Bingo! However, there are situations where push works at large/Internet scale, but it is not typically a generalizable solution; it is almost always a situation-specific design. To work, it needs model/goal-level constraints, such as very loose delivery times, the ability for messages/notifications to fail gracefully and efficiently when consumers are offline, and optimizations like one producer sending the same message to (n) consumers, so that storage is a single message and the send can be parallelized across as many devices as necessary. Basically, it can be done successfully at some level, but at scale it likely needs a big dynamic infrastructure behind it (cloud/grid). Having said all this, I would vote with you and use the async poll method: it is more controllable, robust to failure, and flexible/adaptable to change.

     
    1. Bruno Fernandez-Ruiz

      Thanks for the comment. I agree that adding constraints, especially around graceful degradation, makes push workable “practically”. More of us need to think this way, using async poll and store-and-forward patterns to scale the web in a decentralized fashion as we look ahead at the evolution of the web toward realtime delivery.

       
  2. Pingback: Distributed Weekly 144 — Scott Banwart's Blog

  3. Trip Hawkins

    Hi Bruno, can you help me connect with someone in BD at Yahoo?

    Trip, CEO of Digital Chocolate (founder of EA)

     