Thursday, August 4, 2011

A Storm is coming: more details and plans for release

We've received a lot of questions about what's going to happen to Storm now that BackType has been acquired by Twitter. I'm pleased to announce that I will be releasing Storm at Strange Loop on September 19th! Check out the session info for more details.

In my preview post about Storm, I discussed how Storm can be applied to a huge variety of realtime computation problems. In this post, I'll give more details on Storm and what it's like to use.

Here's a recap of the three broad use cases for Storm:

  1. Stream processing: Storm can be used to process a stream of new data and update databases in realtime. Unlike the standard approach of doing stream processing with a network of queues and workers, Storm is fault-tolerant and scalable.
  2. Continuous computation: Storm can do a continuous query and stream the results to clients in realtime. An example is streaming trending topics on Twitter into browsers. The browsers will have a realtime view on what the trending topics are as they happen.
  3. Distributed RPC: Storm can be used to parallelize an intense query on the fly. The idea is that your Storm topology is a distributed function that waits for invocation messages. When it receives an invocation, it computes the query and sends back the results. Examples of Distributed RPC are parallelizing search queries or doing set operations on large numbers of large sets.

The beauty of Storm is that it's able to solve such a wide variety of use cases with just a simple set of primitives.

Components of a Storm cluster

A Storm cluster is superficially similar to a Hadoop cluster. Whereas on Hadoop you run "MapReduce jobs", on Storm you run "topologies". "Jobs" and "topologies" themselves are very different -- one key difference is that a MapReduce job eventually finishes, whereas a topology processes messages forever (or until you kill it).

There are two kinds of nodes on a Storm cluster: the master node and the worker nodes. The master node runs a daemon called "Nimbus" that is similar to Hadoop's "JobTracker". Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures.

Each worker node runs a daemon called the "Supervisor". The supervisor listens for work assigned to its machine and starts and stops worker processes as necessary based on what Nimbus has assigned to it. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.

All coordination between Nimbus and the Supervisors is done through a Zookeeper cluster. Additionally, the Nimbus daemon and Supervisor daemons are fail-fast and stateless; all state is kept in Zookeeper or on local disk. This means you can kill -9 Nimbus or the Supervisors and they'll start back up like nothing happened. This design leads to Storm clusters being incredibly stable. We've had topologies running for months without requiring any maintenance.

Running a Storm topology

Running a topology is straightforward. First, you package all your code and dependencies into a single jar. Then, you run a command like the following:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

This runs the class backtype.storm.MyTopology with the arguments arg1 and arg2. The main function of the class defines the topology and submits it to Nimbus. The storm jar part takes care of connecting to Nimbus and uploading the jar.
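
For illustration, a minimal version of such a class might look something like the following sketch. It assumes Storm's TopologyBuilder, Config, and StormSubmitter classes (exact signatures can vary between versions), and it arbitrarily uses the first command-line argument as the topology name; the spout and bolt definitions are filled in by the word count example later in this post.

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;

    public class MyTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // ... setSpout/setBolt calls go here (see the word count example below) ...

            Config conf = new Config();
            // the first argument is used, arbitrarily, as the topology name
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
    }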

Since topology definitions are just Thrift structs, and Nimbus is a Thrift service, you can create and submit topologies using any programming language. The above example is the easiest way to do it from a JVM-based language.

Streams and Topologies

Let's dig into the abstractions Storm exposes for doing scalable realtime computation. After I go over the main abstractions, I'll tie everything together with a concrete example of a Storm topology.

The core abstraction in Storm is the "stream". A stream is an unbounded sequence of tuples. Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you may transform a stream of tweets into a stream of trending topics.

The basic primitives Storm provides for doing stream transformations are "spouts" and "bolts". Spouts and bolts have interfaces that you implement to run your application-specific logic.

A spout is a source of streams. For example, a spout may read tuples off of a Kestrel queue and emit them as a stream. Or a spout may connect to the Twitter API and emit a stream of tweets.

A bolt does single-step stream transformations. It creates new streams based on its input streams. Complex stream transformations, like computing a stream of trending topics from a stream of tweets, require multiple steps and thus multiple bolts.

Multi-step stream transformations are packaged into a "topology" which is the top-level abstraction that you submit to Storm clusters for execution. A topology is a graph of stream transformations where each node is a spout or bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt that subscribed to that stream.

Everything in Storm runs in parallel in a distributed way. Spouts and bolts execute as many threads across the cluster, and they pass messages to each other in a distributed way. Messages never pass through any sort of central router, and there are no intermediate queues. A tuple is passed directly from the thread that created it to the threads that need to consume it.

Storm guarantees that every message flowing through a topology will be processed, even if a machine goes down and the messages it was processing get dropped. How Storm accomplishes this without any intermediate queuing is the key to how it works and what makes it so fast.

Let's look at a concrete example of spouts, bolts, and topologies to solidify the concepts.

A simple example topology

The example topology I'm going to show is "streaming word count". The topology contains a spout that emits sentences, and the final bolt emits the number of times each word has appeared across all sentences. Every time the count for a word is updated, a new count is emitted for it. The topology is a simple chain: a Kestrel spout that emits sentences feeds a bolt that splits them into words, which in turn feeds a bolt that maintains the word counts.

Here's how you define this topology in Java:
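
A sketch of that definition (the KestrelSpout constructor arguments, the integer component ids, and the WordCount parallelism of 20 are illustrative assumptions; exact setSpout/setBolt signatures vary between Storm versions):

    // assumes imports of backtype.storm.topology.TopologyBuilder,
    // backtype.storm.tuple.Fields, backtype.storm.spout.StringScheme,
    // and the KestrelSpout from the storm-kestrel integration
    TopologyBuilder builder = new TopologyBuilder();

    // component 1: the spout that reads sentences off the Kestrel queue
    builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",
                                         22133,
                                         "sentence_queue",
                                         new StringScheme()));

    // component 2: splits sentences into words, with a parallelism of 10
    builder.setBolt(2, new SplitSentence(), 10)
           .shuffleGrouping(1);

    // component 3: counts words; the parallelism of 20 is an arbitrary choice
    builder.setBolt(3, new WordCount(), 20)
           .fieldsGrouping(2, new Fields("word"));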

The spout for this topology reads sentences off of the "sentence_queue" on a Kestrel server located at kestrel.backtype.com on port 22133.

The spout is inserted into the topology with a unique id using the setSpout method. Every node in the topology must be given an id, and the id is used by other bolts to subscribe to that node's output streams. The KestrelSpout is given the id "1" in this topology.

setBolt is used to insert bolts in the topology. The first bolt defined in this topology is the SplitSentence bolt. This bolt transforms a stream of sentences into a stream of words. Let's take a look at the implementation of SplitSentence:
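
A sketch of the bolt, written against Storm's basic-bolt interface (method signatures may differ slightly between versions):

    import java.util.Map;

    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.IBasicBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class SplitSentence implements IBasicBolt {
        public void prepare(Map conf, TopologyContext context) {
        }

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                // emit each word as a new 1-tuple
                collector.emit(new Values(word));
            }
        }

        public void cleanup() {
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // the output tuples have a single field called "word"
            declarer.declare(new Fields("word"));
        }
    }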

The key method is the execute method. As you can see, it splits the sentence into words and emits each word as a new tuple. Another important method is declareOutputFields, which declares the schema for the bolt's output tuples. Here it declares that it emits 1-tuples with a field called "word".

Bolts can be implemented in any language; the same bolt could just as easily be written in a few lines of Python.

The last parameter to setBolt is the amount of parallelism you want for the bolt. The SplitSentence bolt is given a parallelism of 10, which will result in 10 threads executing the bolt in parallel across the Storm cluster. To scale a topology, all you have to do is increase the parallelism for the bolts at its bottleneck.

The setBolt method returns an object that you use to declare the inputs for the bolt. Continuing with the example, the SplitSentence bolt subscribes to the output stream of component "1" using a shuffle grouping. "1" refers to the KestrelSpout that was already defined. I'll explain the shuffle grouping part in a moment. What matters so far is that the SplitSentence bolt will consume every tuple emitted by the KestrelSpout.

A bolt can subscribe to multiple input streams by chaining input declarations, like so:
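
For example, a hypothetical bolt might subscribe to two upstream streams like this (component 4, MyAggregatorBolt, and the upstream components 5 and 6 are all illustrative, not part of the word count topology):

    builder.setBolt(4, new MyAggregatorBolt(), 10)
           .fieldsGrouping(5, new Fields("id"))
           .fieldsGrouping(6, new Fields("id"));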

You would use this functionality to implement a streaming join, for instance.

The final bolt in the streaming word count topology, WordCount, reads in the words emitted by SplitSentence and emits updated counts for each word. WordCount maintains a map in memory from word to count. Whenever it sees a word, it updates the count for that word in its internal map and then emits the updated count as a new tuple. Finally, in declareOutputFields the bolt declares that it emits a stream of 2-tuples with the fields "word" and "count".
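
A sketch of the bolt, again written against the basic-bolt interface (the HashMap corresponds to the in-memory map described above):

    // assumes the same backtype.storm imports as SplitSentence, plus
    // java.util.HashMap and java.util.Map
    public class WordCount implements IBasicBolt {
        private Map<String, Integer> counts = new HashMap<String, Integer>();

        public void prepare(Map conf, TopologyContext context) {
        }

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            // emit the updated count for this word as a 2-tuple
            collector.emit(new Values(word, count));
        }

        public void cleanup() {
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }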

The internal map kept in memory will be lost if the task dies. If it's important that the bolt's state persist even if a task dies, you can use an external database like Riak, Cassandra, or Memcached to store the state for the word counts. An in-memory HashMap is used here for simplicity.

Finally, the WordCount bolt declares its input as coming from component "2", the SplitSentence bolt. It consumes that stream using a "fields grouping" on the "word" field.

"Fields grouping", like the "shuffle grouping" that I glossed over before, is a type of "stream grouping". "Stream groupings" are the final piece that ties topologies together.

Stream groupings

A stream grouping tells a topology how to send tuples between two components. Remember, spouts and bolts execute in parallel as many tasks across the cluster. If you look at how a topology executes at the task level, each spout and bolt is really a set of tasks, and any task of one component may need to send tuples to any task of the components that subscribe to its output.

When a task for one bolt, Bolt A, emits a tuple destined for another bolt, Bolt B, which of Bolt B's tasks should it send the tuple to?

A "stream grouping" answers this question by telling Storm how to send tuples between sets of tasks. There's a few different kinds of stream groupings.

The simplest kind of grouping is called a "shuffle grouping", which sends the tuple to a random task. A shuffle grouping is used in the streaming word count topology to send tuples from KestrelSpout to the SplitSentence bolt. It has the effect of evenly distributing the work of processing the tuples across all of the SplitSentence bolt's tasks.

A more interesting kind of grouping is the "fields grouping". A fields grouping is used between the SplitSentence bolt and the WordCount bolt. It is critical for the functioning of the WordCount bolt that the same word always go to the same task. Otherwise, more than one task will see the same word, and they'll each emit incorrect values for the count since each has incomplete information. A fields grouping lets you group a stream by a subset of its fields. This causes equal values for that subset of fields to go to the same task. Since WordCount subscribes to SplitSentence's output stream using a fields grouping on the "word" field, the same word always goes to the same task and the bolt produces the correct output.

Fields groupings are the basis of implementing streaming joins and streaming aggregations as well as a plethora of other use cases. Underneath the hood, fields groupings are implemented using consistent hashing.
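
As a rough illustration of the idea (this is not Storm's actual implementation), here's a toy consistent-hash ring in Java. Every task that emits on a stream can build the same ring, so a given field value, such as a particular word, is always routed to the same downstream task:

    import java.util.SortedMap;
    import java.util.TreeMap;

    // A toy consistent-hash ring: each task is placed at several points on a
    // ring, and a grouping-field value is routed to the first task at or after
    // the value's hash. Equal values always map to the same task.
    public class FieldsGroupingSketch {
        private final SortedMap<Integer, Integer> ring = new TreeMap<Integer, Integer>();

        public void addTask(int taskId) {
            for (int replica = 0; replica < 16; replica++) {
                ring.put(hash("task-" + taskId + "-" + replica), taskId);
            }
        }

        public int taskFor(Object fieldValue) {
            SortedMap<Integer, Integer> tail = ring.tailMap(hash(String.valueOf(fieldValue)));
            return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
        }

        private static int hash(String s) {
            return s.hashCode();  // a stand-in for a stronger hash function
        }
    }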

There are a few other kinds of groupings, but talking about those is beyond the scope of this post.

With that, you should now have everything you need to understand the streaming word count topology. The topology doesn't require that much code, and it's completely scalable and fault-tolerant. Whether you're processing 10 messages per second or 100K messages per second, this topology can scale up or down as necessary by just tweaking the amount of parallelism for each component.

The complexity that Storm hides

The abstractions that Storm provides are ultimately pretty simple. A topology is composed of spouts and bolts that you connect together with stream groupings to get data flowing. You specify how much parallelism you want for each component, package everything into a jar, submit the topology and code to Nimbus, and Storm keeps your topology running forever. Here's a glimpse at what Storm does underneath the hood to implement these abstractions in an extremely robust way.

  1. Guaranteed message processing: Storm guarantees that each tuple coming off a spout will be fully processed by the topology. To do this, Storm tracks the tree of messages that a tuple triggers. If a tuple fails to be fully processed, Storm will replay the tuple from the Spout. Storm incorporates some clever tricks to track the tree of messages in an efficient way.

  2. Robust process management: One of Storm's main tasks is managing processes around the cluster. When a new worker is assigned to a supervisor, that worker should be started as quickly as possible. When that worker is no longer assigned to that supervisor, it should be killed and cleaned up.

    An example of a system that does this poorly is Hadoop. When Hadoop launches a task, the burden for the task to exit is on the task itself. Unfortunately, tasks sometimes fail to exit and become orphan processes, sucking up memory and resources from other tasks.

    In Storm, the burden of killing a worker process is on the supervisor that launched it. Orphaned tasks simply cannot happen with Storm, no matter how much you stress the machine or how many errors there are. Accomplishing this is tricky because Storm needs to track not just the worker processes it launches, but also subprocesses launched by the workers (a subprocess is launched when a bolt is written in another language).

    The Nimbus daemon and Supervisor daemons are stateless and fail-fast. If they die, the running topologies aren't affected; the daemons just start back up like nothing happened. This is again in contrast to how Hadoop works.

  3. Fault detection and automatic reassignment: Tasks in a running topology heartbeat to Nimbus to indicate that they are running smoothly. Nimbus monitors heartbeats and will reassign tasks that have timed out. Additionally, all the tasks throughout the cluster that were sending messages to the failed tasks quickly reconnect to the new location of the tasks.

  4. Efficient message passing: No intermediate queuing is used for message passing between tasks. Instead, messages are passed directly between tasks using ZeroMQ. This is simpler and way more efficient than using intermediate queuing. ZeroMQ is a clever "super-socket" library that employs a number of tricks for maximizing the throughput of messages. For example, it will detect if the network is busy and automatically batch messages to the destination.

    Another important part of message passing between processes is serializing and deserializing messages in an efficient way. Again, Storm automates this for you. By default, you can use any primitive type, strings, or binary records within tuples. If you want to be able to use another type, you just need to implement a simple interface to tell Storm how to serialize it. Then, whenever Storm encounters that type, it will automatically use that serializer.

  5. Local mode and distributed mode: Storm has a "local mode" where it simulates a Storm cluster completely in-process. This lets you iterate on your topologies quickly and write unit tests for your topologies. You can run the same code in local mode as you run on the cluster.
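
    For example, a local-mode run might look something like this sketch (it reuses the builder from the word count example; LocalCluster, Config, and Utils come from the backtype.storm packages, and exact method names may differ by version):

        // assumes imports of backtype.storm.Config, backtype.storm.LocalCluster,
        // and backtype.storm.utils.Utils
        LocalCluster cluster = new LocalCluster();

        Config conf = new Config();
        // submit the same topology object you would submit to a real cluster
        cluster.submitTopology("word-count-test", conf, builder.createTopology());

        // let the topology run for a little while, then tear everything down
        Utils.sleep(10000);
        cluster.killTopology("word-count-test");
        cluster.shutdown();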

Storm is easy to use, configure, and operate. It is accessible for everyone from the individual developer processing a few hundred messages per second to the large company processing hundreds of thousands of messages per second.

Relation to “Complex Event Processing”

Storm exists in the same space as “Complex Event Processing” systems like Esper, Streambase, and S4. Among these, the most closely comparable system is S4. The biggest difference between Storm and S4 is that Storm guarantees messages will be processed even in the face of failures whereas S4 will sometimes lose messages.

Some CEP systems have a built-in data storage layer. With Storm, you would use an external database like Cassandra or Riak alongside your topologies. It’s impossible for one data storage system to satisfy all applications since different applications have different data models and access patterns. Storm is a computation system and not a storage system. However, Storm does have some powerful facilities for achieving data locality even when using an external database.

Summary

I've only scratched the surface on Storm. The "stream" concept at the core of Storm can be taken so much further than what I've shown here -- I didn't talk about things like multi-streams, implicit streams, or direct groupings. I showed two of Storm's main abstractions, spouts and bolts, but I didn't talk about Storm's third, and possibly most powerful abstraction, the "state spout". I didn't show how you do distributed RPC over Storm, and I didn't discuss Storm's awesome automated deploy that lets you create a Storm cluster on EC2 with just the click of a button.

For all that, you're going to have to wait until September 19th. Until then, I will be working on adding documentation to Storm so that you can get up and running with it quickly once it's released. We're excited to release Storm, and I hope to see you there at Strange Loop when it happens.

- Nathan Marz (@nathanmarz)

Friday, July 1, 2011

Fast Core Animation UI for the Mac

Starting today, Twitter is offering TwUI as an open-source framework (https://github.com/twitter/twui) for developing interfaces on the Mac.

Until now, there was not a simple and effective way to design interactive, hardware-accelerated interfaces on the Mac. Core Animation can create hardware-accelerated drawings, but doesn't provide interaction mechanisms. AppKit and NSView have excellent interaction mechanisms, but the drawing operations are CPU-bound, which makes fluid scrolling, animations, and other effects difficult – if not impossible – to accomplish.

UIKit on Apple’s iOS platform has offered developers a fresh start. While UIKit borrows many ideas from AppKit regarding interaction, it can offload compositing to the GPU because it is built on top of Core Animation. This architecture has enabled developers to create many applications that were previously impossible to build.

TwUI as a solution

TwUI brings the philosophy of UIKit to the desktop. It is built on top of Core Animation, and it borrows interaction ideas from AppKit. It allows for all the things Mac users expect, including drag & drop, mouse events, tooltips, Mac-like text selection, and so on. And, since TwUI isn’t bound by the constraints of an existing API, developers can experiment with new features like block-based drawRect and layout.

How TwUI works

You will recognize the fundamentals of TwUI if you are familiar with UIKit. For example, a "TUIView" is a simple, lightweight wrapper around a Core Animation layer – much like UIView on iOS.

TUIView offers useful subclasses such as scroll views, table views, buttons, and so on. More importantly, TwUI makes it easy to build your own custom interface components. And because all of these views are backed by layers, composited by Core Animation, your UI is rendered at optimal speed.

Xcode running the TwUI example project

Ongoing development

Since TwUI forms the basis of Twitter for the Mac, it is an integral part of our shipping code. Going forward, we need to stress test it in several implementations. We’ll continue to develop additional features and make improvements. And, we encourage you to experiment, as that will help us build a robust and exciting UI framework for the Mac.

Acknowledgements

The following engineers were mainly responsible for the TwUI development:

-Loren Brichter (@lorenb), Ben Sandofsky (@sandofsky)

Wednesday, June 22, 2011

Join the Flock!

Engineering Open House

Twitter’s engineering team is growing quickly. Two-thirds of our engineers were hired in the last 12 months. Those engineers joined us from cities and countries around the world and from companies of various sizes.

As part of our effort to find and hire great people to build great products and solve complicated problems, last Thursday we invited several dozen engineers to Twitter HQ for our first engineering open house. Presentations from @wfarner, @michibusch, @mracus and @esbie showcased the depth and range of the effort required to present twitter.com to the world. The presentations covered these key areas of development:

  • Dynamic deployment and resource management with Mesos - @wfarner

    Using Mesos as a platform, we have built a private cloud system on which we hope to eventually run most, if not all, of our services. We expect this to simplify deployment and improve the reliability of our systems, while making more efficient use of our compute resources.

  • Real-time search at Twitter - @michibusch

    Since 2008, Twitter has made dramatic enhancements to our real-time search engine, scaling it from 200 QPS to 18,000 QPS. At the core of our infrastructure is Earlybird, a version of Lucene modified for real-time search. This work, combined with other key infrastructure components, led to our recent revamp of the search experience and will enable future innovation in real-time search.

  • The client-side architecture of #NewTwitter - @mracus and @esbie

    Client-side applications for desktop and mobile environments have access to a class of well-rounded tools and framework components that aren't as yet widely available for the browser. Therefore, a fully in-browser app like #NewTwitter requires investment in solid architecture in order to remain clean and extensible as it grows. At Twitter, we're constantly iterating on the in-house and open source JavaScript tools we use to address this need.

This was Twitter's first engineering open house, but it certainly won’t be our last. We plan to hold these regularly - every couple months or so. In the meantime, if you’re interested in keeping up with our engineering team, you can follow @twittereng or check out our jobs page.

- Mike Abbott (@mabb0tt), VP Engineering

Tuesday, May 31, 2011

The Engineering Behind Twitter’s New Search Experience

Today, Twitter launched a personalized search experience to help our users find the most relevant Tweets, images, and videos. To build this product, our infrastructure needed to support two major features: relevance-filtering of search results and the identification of relevant images and photos. Both features leverage a ground-up rewrite of the search infrastructure, with Blender and Earlybird at the core.

Investment in Search

Since the acquisition of Summize in 2008, Twitter has invested heavily in search. We've grown our search team from three to 15 engineers and scaled our real-time search engine by two orders of magnitude — all this, while we replaced the search infrastructure in flight, with no major service interruptions.

The engineering story behind the evolution of search is compelling. The Summize infrastructure used Ruby on Rails for the front-end and MySQL for the back-end (the same architecture as the one used by Twitter and many other start-ups). At the time, Lucene and other open-source search technology did not support real-time search. As a result, we constructed our reverse indexes in MySQL, leveraging its concurrent transactions and B-tree data structures to support concurrent indexing and searching. We were able to scale our MySQL-based solution surprisingly far by partitioning the index across multiple databases and replicating the Rails front-end. In 2008, Twitter search handled an average of 20 TPS and 200 QPS. By October 2010, when we replaced MySQL with Earlybird, the system was handling 1,000 TPS and 12,000 QPS on average.

Earlybird, a real-time, reverse index based on Lucene, not only gave us an order of magnitude better performance than MySQL for real-time search, it doubled our memory efficiency and provided the flexibility to add relevance filtering. However, we still needed to replace the Ruby on Rails front-end, which was only capable of synchronous calls to Earlybird and had accrued significant technical debt through years of scaling and transition to Earlybird.

In April 2011, we launched a replacement, called Blender, which improved our search latencies by 3x, gave us 10x throughput, and allowed us to remove Ruby on Rails from the search infrastructure. Today, we are indexing an average of 2,200 TPS while serving 18,000 QPS (1.6B queries per day!). More importantly, Blender completed the infrastructure necessary to make the most significant user-facing change to Twitter search since the acquisition of Summize.

From Hack-Week Project to Production

When the team launched Earlybird, we were all excited by its potential — it was fast and the code was clean and easy to extend. While on vacation in Germany, Michael Busch, one of our search engineers, implemented a demo of image and video search. A few weeks later, during Twitter's first Hack Week, the search team, along with some members of other teams, completed the first demo of our new search experience. Feedback from the company was so positive that the demo became part of our product roadmap.

Surfacing Relevant Tweets

There is a lot of information on Twitter — on average, more than 2,200 new Tweets every second! During large events, for example the #tsunami in Japan, this rate can increase by 3 to 4x. Often, users are interested in only the most memorable Tweets or those that other users engage with. In our new search experience, we show search results that are most relevant to a particular user. So search results are personalized, and we filter out the Tweets that do not resonate with other users.

To support relevance filtering and personalization, we needed three types of signals:

  • Static signals, added at indexing time
  • Resonance signals, dynamically updated over time
  • Information about the searcher, provided at search time

Getting all of these signals into our index required changes to our ingestion pipeline, Earlybird (our reverse index), and Blender (our front-ends). We also created a new updater component that continually pushes resonance signals to Earlybird. In the ingestion pipeline, we added a pipeline stage that annotates Tweets with static information, for example, information about the user and the language of the Tweet's text. The Tweets are then replicated to the Earlybird indexes (in real time), where we have extended Lucene’s internal data structures to support dynamic updates to arbitrary annotations. Dynamic updates, for example, the users' interactions with Tweets, arrive over time from the updater. Together, Earlybird and the updater support a high and irregular rate of updates without requiring locks or slowing down searches.

At query time, a Blender server parses the user’s query and passes it along with the user’s social graph to multiple Earlybird servers. These servers use a specialized ranking function that combines relevance signals and the social graph to compute a personalized relevance score for each Tweet. The highest-ranking, most-recent Tweets are returned to the Blender, which merges and re-ranks the results before returning them to the user.

Twitter search architecture with support for relevance

Removing Duplicates

Duplicate and near-duplicate Tweets are often not particularly helpful in Twitter search results. During popular and important events, when search should be most helpful to our users, nearly identical Tweets increase in number. Even when the quality of the duplicates is high, the searcher would benefit from a more diverse set of results. To remove duplicates we use a technique based on MinHashing, where several signatures are computed per Tweet and two Tweets sharing the same set of signatures are considered duplicates. The twist? Like everything at Twitter, brevity is key: We have a very small memory budget to store the signatures. Our algorithm compresses each Tweet to just 4 bytes while still identifying the vast majority of duplicates with very low computational requirements.
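
As a generic illustration of the MinHash idea (this is a sketch, not Twitter's production code), here's how a Tweet's set of words could be reduced to a 4-byte signature in Java; two Tweets whose word sets mostly overlap are likely to end up with identical signatures and can then be treated as duplicates:

    import java.util.Set;

    public class MinHashSignature {
        // Compute a 4-byte MinHash-style signature for a Tweet's set of words.
        // Each byte comes from the minimum of one hash function over the words,
        // so near-identical word sets tend to collapse to the same signature.
        public static int signature(Set<String> words) {
            int sig = 0;
            for (int i = 0; i < 4; i++) {
                int min = Integer.MAX_VALUE;
                for (String word : words) {
                    // mix the word's hash with a per-function seed (a stand-in
                    // for a proper family of hash functions)
                    int h = (word.hashCode() * (31 + i)) ^ (0x9e3779b9 * (i + 1));
                    if (h < min) {
                        min = h;
                    }
                }
                // keep one byte of this hash function's minimum
                sig = (sig << 8) | (min & 0xff);
            }
            return sig;  // four bytes packed into a single int
        }
    }

Because each signature is only 4 bytes, storing one per Tweet fits comfortably within a small memory budget.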

Personalization

Twitter is most powerful when you personalize it by choosing interesting accounts to follow, so why shouldn’t your search results be more personalized too? They are now! Our ranking function accesses the social graph and uses knowledge about the relationship between the searcher and the author of a Tweet during ranking. Although the social graph is very large, we compress the meaningful part for each user into a Bloom filter, which gives us space-efficient constant-time set membership operations. As Earlybird scans candidate search results, it uses the presence of the Tweet's author in the user's social graph as a relevance signal in its ranking function.
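
As a rough sketch of that idea (not the actual Earlybird code), here's a minimal Bloom filter in Java over the ids of the accounts a searcher follows; the sizing and the hashing constants are illustrative choices:

    import java.util.BitSet;

    // A minimal Bloom filter over followed-account ids: space-efficient,
    // constant-time membership tests with a small false-positive rate.
    public class FollowBloomFilter {
        private final BitSet bits;
        private final int numBits;
        private final int numHashes;

        public FollowBloomFilter(int numBits, int numHashes) {
            this.bits = new BitSet(numBits);
            this.numBits = numBits;
            this.numHashes = numHashes;
        }

        public void add(long accountId) {
            for (int i = 0; i < numHashes; i++) {
                bits.set(indexFor(accountId, i));
            }
        }

        // May return a false positive, but never a false negative.
        public boolean mightFollow(long accountId) {
            for (int i = 0; i < numHashes; i++) {
                if (!bits.get(indexFor(accountId, i))) {
                    return false;
                }
            }
            return true;
        }

        private int indexFor(long accountId, int i) {
            long h = accountId * 0x9e3779b97f4a7c15L + i * 0xc2b2ae3d27d4eb4fL;
            h ^= (h >>> 33);
            return (int) ((h & Long.MAX_VALUE) % numBits);
        }
    }

A check like mightFollow(tweetAuthorId) during scoring then plays the role of the social-graph relevance signal described above, at the cost of an occasional false positive.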

Even users that follow few or no accounts will benefit from other personalization mechanisms; for example, we now automatically detect the searcher's preferred language and location.

Images and Videos in Search

Images and videos have an amazing ability to describe people, places, and real-time events as they unfold. Take for example @jkrums' Twitpic of US Airways Flight 1549 Hudson river landing, and @stefmara's photos and videos of space shuttle Endeavour's final launch.

There is a fundamental difference between searching for Tweets and searching for entities in Tweets, such as images and videos. In the former case, the decision about whether a Tweet matches a query can be made by looking at the text of the Tweet, with no other outside information. Additionally, per-Tweet relevance signals can be used to rank and compare matching Tweets to find the best ones. The situation is different when searching for images or videos. For example, the same image may be tweeted many times, with each Tweet containing different keywords that all describe the image. Consider several Tweets that all link to the same image of a dog, each describing it with different keywords.

One possible description of the image is formed from the union of keywords in the Tweets' text; that is, "dog", "Australian", and "shepherd" all describe the image. If an image is repeatedly described by a term in the Tweet's text, it is likely to be about that term.

So what makes this a difficult problem? Twitter allows you to search Tweets within seconds; images and photos in Tweets should be available in realtime too! Earlybird uses inverted indexes for search. While these data structures are extremely efficient, they do not support inline updates, which makes it nearly impossible to append additional keywords to indexed documents.

If timeliness were not important, we could use MapReduce jobs that periodically aggregate keyword unions and produce inverted indexes. In these offline indexes, each image or photo link would be a document, with the aggregated keywords as the document’s text. However, to meet our indexing latency goals, we would have to run these MapReduce jobs every few seconds, an impractical solution.

Instead, we extended Earlybird's data structures to support efficient lookups of entities contained in Tweets. At query time, we look up the images and videos for matching Tweets and store them in a custom hash map. The keys of the map are URLs and the values are score counters. Each time the same URL is added to the map, its corresponding score counter is incremented. After this aggregation is complete, the map is sorted and the best images and photos are returned for rendering.
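
A simplified sketch of that aggregation step (the class and method here are hypothetical; the real implementation lives inside Earlybird's query path):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Aggregates image/video URLs across matching Tweets: each URL's score is
    // incremented every time it appears, and the best URLs are returned.
    public class FacetAggregator {
        public static List<String> topUrls(List<String> urlsFromMatchingTweets, int limit) {
            final Map<String, Integer> scores = new HashMap<String, Integer>();
            for (String url : urlsFromMatchingTweets) {
                Integer score = scores.get(url);
                scores.put(url, score == null ? 1 : score + 1);
            }

            List<String> urls = new ArrayList<String>(scores.keySet());
            Collections.sort(urls, new Comparator<String>() {
                public int compare(String a, String b) {
                    return scores.get(b) - scores.get(a);  // highest score first
                }
            });
            return urls.subList(0, Math.min(limit, urls.size()));
        }
    }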

What’s next?

The search team is excited to build innovative search products that drive discovery and help our users. While the new search experience is a huge improvement over pure real-time search, we are just getting started. In the coming months, we will improve quality, scale our infrastructure, expand our indexes, and bring relevance to mobile.

If you are a talented engineer and want to work on the largest real-time search engine in the world, Twitter search is hiring for search quality and search infrastructure!

Acknowledgements

The following people contributed to the launch: Abhi Khune, Abdur Chowdhury, Aneesh Sharma, Ashok Banerjee, Ben Cherry, Brian Larson, Coleen Baik, David Chen, Frost Li, Gilad Mishne, Isaac Hepworth, Jon Boulle, Josh Brewer, Krishna Gade, Michael Busch, Mike Hayes, Nate Agrin, Patrick Lok, Raghavendra Prabu, Sarah Brown, Sam Luckenbill, Stephen Fedele, Tian Wang, Yi Zhuang, Zhenghua Li.

We would also like to thank the original Summize team, former team members, hack-week contributors, and management for their contributions and support.

—@twittersearch

Thursday, May 19, 2011

Faster Ruby: Kiji Update

In March 2011, we shared Kiji, an improved Ruby runtime. The initial performance gains were relatively modest, but laid the foundation for future improvements. We continued the work and now have some excellent results.

FASTER REMEMBERED SET CALCULATIONS

In Kiji 0.10, every change to the longlife heap required full recalculation of the "remembered set," the boundary objects referenced from the longlife to the eden heap. For Kiji 0.11, we changed the calculation to an incremental model that only includes newly-allocated objects.

We made this easier by disabling garbage collection during source code parsing, which has a tendency to mutate references in place. Now, if the parser needs more memory, it merely allocates a new heap chunk. This lets us allocate all AST nodes, including those created in instance_eval, on the longlife heap. The result is a big performance boost for applications like template engines that use lots of instance_eval.

MORE OBJECTS IN LONGLIFE

For Kiji 0.11, we now allocate non-transient strings in the longlife heap, along with the AST nodes. This includes strings allocated during parsing, assigned to constants (or members of a constant hash or array), and those that are members of frozen objects. With Ruby's Kernel.freeze method, big parts of frozen objects are now evicted from the ordinary heap and moved to the longlife heap.

This change is significant. When the twitter.com web application ran Kiji 0.10, it had 450,000 live objects after garbage collection in its ordinary heap. Kiji 0.11 places over 300,000 string objects in the longlife heap, reducing the number of live objects in the ordinary heap to under 150,000. This reduction of nearly 66 percent allows the ordinary heap to be collected much less frequently.

SIMPLIFIED HEAP GROWTH STRATEGY

Ruby Enterprise Edition has a set of environment variables that govern when to run the garbage collector and how to grow and shrink the heaps. After evaluating Ruby’s heap growth strategy, we replaced it with one that is much simpler to configure and works better for server workloads.

As a first step, we eliminated GC_MALLOC_LIMIT. This environment variable prescribes when to force a garbage collection after a set amount of C-level malloc() calls. We found this setting to be capricious; it performed best when it was set so high as to be effectively off. With the malloc limit eliminated entirely, the Kiji 0.11 garbage collector runs only when heaps are full, or when no more memory can be allocated from the operating system. This also means that under UNIX-like systems, you can more effectively size the process with ulimit -v.

0.11 now has only these three GC-tuning environment variables:

  • The first parameter is RUBY_GC_HEAP_SIZE. It determines the number of objects in a heap slab; its default value is 32768.
  • The next parameter is RUBY_GC_EDEN_HEAPS. This parameter specifies the target number of heap slabs for the ordinary heap. Its default value is 24.

    The runtime starts out with a single heap slab, and when it fills up, it collects the garbage and allocates a new slab until it reaches the target number. This gradual strategy keeps fragmentation in the heaps low, as it tends to concentrate longer-lived objects in the earlier heap slabs. If the heap is forced to grow beyond the target number of slabs, the runtime releases vacated slabs after each garbage collection in order to restore the target size. Once the application reaches the target size of the ordinary heap, it does not go below it.

    Since performance is tightly bound to the rate of eden collections (a classic memory for speed tradeoff), this makes the behavior of a long-lived process very predictable. We have had very good results with settings as high as 64.

  • The final parameter is RUBY_GC_LONGLIFE_LAZINESS, a decimal between 0 and 1, with a default of 0.05. This parameter governs a different heap growth strategy for longlife heap slabs. The runtime releases vacant longlife heap slabs when the ratio of free longlife heap slots to all longlife heap slots after the collection is higher than this parameter. Also, if the ratio is lower after collection, a new heap slab is allocated.

    The default value is well-tuned for our typical workload and prevents memory bloat.

We also reversed the order of adding the freed slots onto the free list. Now, new allocations are fulfilled with free slots from older (presumably, more densely-populated) heap slabs first, allowing recently allocated heap slabs to become completely vacant in a subsequent GC run. This may slightly impact locality of reference, but works well for us.

ADDITIONAL CHANGES

We replaced the old profiling methods, which no longer applied, with our improved memory debugging tools.

We also removed the “fastmarktable” mode, where the collector used a mark bit in the object slots. Kiji 0.11 uses only the copy-on-write friendly mark table. This lets us reset the mark bits after collection by zeroing out the entire mark table, instead of flipping a bit in every live object.

IT’S IN THE NUMBERS

We updated the performance chart from the first blog post about Kiji with the 0.11 data. The new data shows a dramatic improvement for our intensive example workload. While Kiji 0.9 responded to all requests up to 90 requests/sec and peaked at 95 responses out of 100 requests/sec, Kiji 0.11 responds to all requests up to 120 requests/sec. This is a 30% improvement in throughput across the board, and 2.7x the speed of standard Ruby 1.8.

FULL ALLOCATION TRACING

We found that in order to effectively develop Kiji 0.11, we needed to add more sophisticated memory instrumentation than is currently available for Ruby. As a result, we ended up with some really useful debugging additions that you can turn on as well.

The first tool is a summary of memory stats after GC. It lets you cheaply measure the impact of memory-related changes.

The second tool is an allocation tracer (a replacement for BleakHouse and similar tools). After each GC, the runtime writes files containing full stack traces for the allocation points of all freed and surviving objects. You can easily parse this with AWK to list common object types, allocation sites, and number of objects allocated. This makes it easy to identify allocation hotspots, memory leaks, or objects that persist on the eden and should be manually moved to the longlife.

You can generate sample allocation-tracing output by running RubySpec under Kiji.

For more information, refer to the README-kiji file in the distribution.

FUTURE DIRECTIONS

0.11 is a much more performant and operable runtime than Kiji 0.10. However, through this work we identified a practical strategy for making an even better, fully-generational version that would apply well to Ruby 1.9. Time will tell if we get to implement it.

We also would like to investigate the relative performance of JRuby.

TRY IT!

We have released the Kiji REE branch on GitHub.

ACKNOWLEDGEMENTS

The following engineers at Twitter contributed to the REE improvements: Rob Benson, Brandon Mitchell, Attila Szegedi, and Evan Weaver.

If you want to work on projects like this, join the flock!

— Attila (@asz)