How Airbnb built a persistent, highly available, and low latency key-value storage engine for accessing derived data from offline and streaming events.
By: Chandramouli Rangarajan, Shouyan Guo, Yuxi Jin
Within Airbnb, many online services need access to derived data, which is data computed with large scale data processing engines like Spark or from streaming events like Kafka, and stored offline. These services require a high quality derived data storage system, with strong reliability, availability, scalability, and latency guarantees for serving online traffic. For example, the user profiler service stores and accesses real-time and historical user activities on Airbnb to deliver a more personalized experience.
In this post, we will talk about how we leveraged a number of open source technologies, including HRegion, Helix, Spark, Zookeeper, and Kafka, to build a scalable and low latency key-value store for hundreds of Airbnb product and platform use cases.
Over the past few years, Airbnb has evolved and enhanced our support for serving derived data, moving from teams rolling out custom solutions to a multi-tenant storage platform called Mussel. This evolution can be summarized in three stages:
Stage 1 (01/2015): Unified read-only key-value store (HFileService)
Before 2015, there was no unified key-value store solution inside Airbnb that met four key requirements:
- Scale to petabytes of data
- Efficient bulk load (batch generation and uploading)
- Low latency reads (<50ms p99)
- Multi-tenant storage service that can be used by multiple customers
Also, none of the existing solutions were able to meet these requirements. MySQL doesn’t support bulk loading, HBase’s massive bulk loading (distcp) is neither optimal nor reliable, RocksDB had no built-in horizontal sharding, and we didn’t have enough C++ expertise to build a bulk load pipeline to support the RocksDB file format.
So we built HFileService, which internally used HFile (the building block of Hadoop HBase, which is based on Google’s SSTable):
- Servers were sharded and replicated to address scalability and reliability issues
- The number of shards was fixed (equal to the number of Hadoop reducers in the bulk load jobs) and the mapping of servers to shards was stored in Zookeeper. We configured the number of servers mapped to a specific shard by manually changing the mapping in Zookeeper
- A daily Hadoop job transformed offline data to HFile format and uploaded it to S3. Each server downloaded the data of its own partitions to local disk and removed the old versions of data
- Different data sources were partitioned by primary key. Clients determined the correct shard for a request by hashing the primary key and taking the result modulo the total number of shards. They then queried Zookeeper to get a list of servers that held that shard and sent the request to one of them
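The client-side routing in the last bullet can be sketched as follows. This is a minimal illustration, not HFileService code: the MD5-based hash, the server names, and the in-memory `SHARD_TO_SERVERS` dictionary (standing in for the mapping kept in Zookeeper) are all assumptions made for the example.

```python
import hashlib
from typing import List

NUM_SHARDS = 8  # fixed shard count; 8 is the HFileService figure quoted later in this post

# Hypothetical stand-in for the shard -> servers mapping stored in Zookeeper.
SHARD_TO_SERVERS = {
    shard: [f"server-{shard}-a", f"server-{shard}-b"] for shard in range(NUM_SHARDS)
}


def shard_for_key(primary_key: str, num_shards: int = NUM_SHARDS) -> int:
    """Hash the primary key and take the result modulo the number of shards."""
    # The post does not name the hash function; MD5 is an assumption chosen
    # only because it is stable across processes and machines.
    digest = hashlib.md5(primary_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


def servers_for_key(primary_key: str) -> List[str]:
    """Return the servers holding the shard that owns this primary key."""
    return SHARD_TO_SERVERS[shard_for_key(primary_key)]
```

A client would call `servers_for_key("user:42")` and send the read to any one of the returned servers. Because the shard count appears in the modulo, changing it remaps most keys, which is one reason a fixed shard count is convenient here.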
Stage 2 (10/2015): Store both real-time and derived data (Nebula)
While we had built a multi-tenant key-value store that supported efficient bulk load and low latency reads, it had its drawbacks. For example, it didn’t support point, low-latency writes, and any update to the stored data had to go through the daily bulk load job. As Airbnb grew, there was an increased need for low latency access to real-time data.
Therefore, Nebula was built to support both batch-update and real-time data in a single system. It internally used DynamoDB to store real-time data and S3/HFile to store batch-update data. Nebula introduced timestamp-based versioning as a version control mechanism. For read requests, data would be read from both a list of dynamic tables and the static snapshot in HFileService, and the results merged based on timestamp.
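The timestamp-based merge on the read path can be sketched as below. This is a simplified illustration under stated assumptions: plain dictionaries stand in for the DynamoDB dynamic tables and the HFileService snapshot, each record is a hypothetical `(timestamp, value)` pair, and ties are broken in favor of the dynamic tables, which the post does not specify.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[int, str]  # (timestamp, value); hypothetical record shape


def merged_read(
    key: str,
    dynamic_tables: List[Dict[str, Record]],
    static_snapshot: Dict[str, Record],
) -> Optional[str]:
    """Read a key from every source and return the value with the newest timestamp."""
    # Dynamic (real-time) tables are listed first, so on equal timestamps
    # max() returns the real-time record. That tie-break is an assumption.
    candidates = [table[key] for table in dynamic_tables if key in table]
    if key in static_snapshot:
        candidates.append(static_snapshot[key])
    if not candidates:
        return None
    return max(candidates, key=lambda record: record[0])[1]
```

For example, with a snapshot holding `("u1", (100, "batch"))` and a dynamic table holding `("u1", (200, "realtime"))`, the read returns `"realtime"`. Every read fans out to all sources, which is the online merge cost that the daily merge job is meant to reduce.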
To minimize online merge operations, Nebula also had scheduled Spark jobs that ran daily and merged snapshots of DynamoDB data with the static snapshot of HFileService. Zookeeper was used to coordinate write availability of dynamic tables, snapshots being marked ready for read, and dropping of stale tables.
Stage 3 (2018): Scalable and low latency key-value storage engine (Mussel)
Going into Stage 3, we had a system that supported both read and write on real-time and batch-update data with timestamp-based conflict resolution. However, there were opportunities for improvement:
- Scale-out challenge: It was cumbersome to manually edit partition mappings inside Zookeeper as data grew, or to horizontally scale the system for increasing traffic by adding more nodes
- Read performance under spiky write traffic needed to improve
- High maintenance overhead: We needed to maintain HFileService and DynamoDB at the same time
- Inefficient merging process: The daily process of merging the delta update from DynamoDB with HFileService became very slow as our total data size grew. The daily update data in DynamoDB was just 1–2% of the baseline data in HFileService. However, we re-published the full snapshot (102% of total data size) back to HFileService daily
To address these drawbacks, we came up with a new key-value store system called Mussel.
- We introduced Helix to manage the partition mapping within the cluster
- We leveraged Kafka as a replication log to replicate each write to all of the replicas instead of writing directly to the Mussel store
- We used HRegion as the only storage engine in the Mussel storage nodes
- We built a Spark pipeline to load the data from the data warehouse into storage nodes directly
Let’s go into more detail in the following paragraphs.
Manage partitions with Helix
In Mussel, in order to make our cluster more scalable, we increased the number of shards from 8 in HFileService to 1024. Data is partitioned into those shards by the hash of the primary keys, so we introduced Apache Helix to manage these many logical shards. Helix manages the mapping of logical shards to physical storage nodes automatically. Each Mussel storage node can hold multiple logical shards. Each logical shard is replicated across multiple Mussel storage nodes.
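To make the shard-to-node relationship concrete, here is a minimal sketch of one possible placement. It is not Helix’s rebalancing algorithm and does not use the Helix API: the round-robin rule, the node names, and the replication factor of 3 are assumptions chosen only to show many logical shards per node and several nodes per shard.

```python
from typing import Dict, List


def assign_shards(num_shards: int, nodes: List[str], replicas: int) -> Dict[int, List[str]]:
    """Place each logical shard on `replicas` distinct nodes, round-robin."""
    if replicas > len(nodes):
        raise ValueError("replication factor cannot exceed the number of nodes")
    return {
        shard: [nodes[(shard + r) % len(nodes)] for r in range(replicas)]
        for shard in range(num_shards)
    }


# Hypothetical cluster: 16 storage nodes, 1024 logical shards, 3 replicas each.
nodes = [f"mussel-node-{i}" for i in range(16)]
mapping = assign_shards(1024, nodes, replicas=3)

# Invert the mapping to see how many logical shards each node holds.
shards_per_node: Dict[str, int] = {node: 0 for node in nodes}
for holders in mapping.values():
    for node in holders:
        shards_per_node[node] += 1
```

With these numbers every node ends up holding 192 logical shards (1024 × 3 / 16). A cluster manager such as Helix additionally recomputes the mapping when nodes join or leave, which this static sketch does not attempt.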
Leaderless Replication with Kafka
Since Mussel is a read-heavy store, we adopted a leaderless architecture. Read requests can be served by any of the Mussel storage nodes that hold the same logical shard, which increases read scalability. In the write path, we needed to consider the following:
- We want to smooth the write traffic to avoid impacting the read path
- Since we don’t have a leader node in each shard, we need a way to make sure each Mussel storage node applies the write requests in the same order so the data is consistent across different nodes
To solve these problems, we introduced Kafka as a write-ahead log. A write request is not written directly to a Mussel storage node; it is first written to Kafka asynchronously. We have 1024 partitions for the Kafka topic, each partition belonging to one logical shard in Mussel. Each Mussel storage node polls the events from Kafka and applies the changes to its local store. Although there is no leader-follower relationship among the replicas of a shard, every replica consumes the same partition in the same order, so writes within a partition are applied in a consistent order, ensuring consistent updates. The drawback is that this can only provide eventual consistency. However, given the derived data use case, it is an acceptable tradeoff to compromise on consistency in the interest of ensuring availability and partition tolerance.
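The ordering argument can be illustrated with a small in-memory simulation. No Kafka client is used here: `PartitionLog` is a hypothetical stand-in for a single Kafka partition, and `Replica` for a storage node consuming it, both invented for this sketch.

```python
from typing import Dict, List, Optional, Tuple


class PartitionLog:
    """Append-only, totally ordered log for one logical shard."""

    def __init__(self) -> None:
        self.entries: List[Tuple[str, str]] = []

    def append(self, key: str, value: str) -> int:
        """Append a write and return its offset in the log."""
        self.entries.append((key, value))
        return len(self.entries) - 1


class Replica:
    """A storage node that applies writes strictly in log order."""

    def __init__(self, log: PartitionLog) -> None:
        self.log = log
        self.offset = 0  # next log position this replica will apply
        self.store: Dict[str, str] = {}

    def poll(self, max_records: Optional[int] = None) -> int:
        """Apply up to max_records pending writes; return how many were applied."""
        end = len(self.log.entries)
        if max_records is not None:
            end = min(end, self.offset + max_records)
        for key, value in self.log.entries[self.offset:end]:
            self.store[key] = value
        applied = end - self.offset
        self.offset = end
        return applied


log = PartitionLog()
fast, slow = Replica(log), Replica(log)
log.append("k", "v1")
log.append("k", "v2")
fast.poll()               # fast replica is fully caught up
slow.poll(max_records=1)  # slow replica lags and still serves the stale "v1"
stale_value = slow.store["k"]
slow.poll()               # once caught up, both replicas hold identical data
```

Between the two `slow.poll` calls a read served by the lagging replica returns the older value, which is the eventual consistency tradeoff described above. After both replicas reach the end of the log their stores are identical, because they applied the same writes in the same order.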
Supporting read, write, and compaction in one storage engine
In order to reduce the hardware cost and operational load of managing DynamoDB, we decided to remove it and extend HFileService as the only storage engine to serve both real-time and offline data. To better support both read and write operations, we used HRegion instead of HFile. HRegion is a fully functional key-value store with MemStore and BlockCache. Internally it uses a Log-Structured Merge (LSM) Tree to store the data and supports both read and write operations.
An HRegion table contains column families, which are the logical and physical grouping of columns. Within a column family there are column qualifiers, which are the columns. Column families contain columns with timestamped versions. Columns only exist when they are inserted, which makes HRegion a sparse database. We mapped our client data to HRegion as follows:
With this mapping, for read queries, we are able to support:
- Point query by looking up the data with primary key
- Prefix/range query by scanning data on secondary key
- Queries for the latest data or data within a specific time range, as both real-time and offline data written to Mussel have a timestamp
Because we have over 4000 client tables in Mussel, each client table is mapped to a column family in HRegion instead of its own table, to reduce scalability challenges at the metadata management layer. Also, as HRegion is a column-based storage engine, each column family is stored in a separate file so they can be read and written independently.
For write requests, the Mussel storage node consumes the write request from Kafka and calls the HRegion put API to write the data directly. Each table can also customize its max version and TTL (time-to-live).
When we serve write requests with HRegion, another thing to consider is compaction. Compaction needs to run in order to clean up data that is deleted or has reached max version or max TTL. Also, when the MemStore in HRegion reaches a certain size, it is flushed to disk into a StoreFile. Compaction merges those files together in order to reduce disk seeks and improve read performance. On the other hand, while compaction is running, it causes higher CPU and memory usage and blocks writes to prevent JVM (Java Virtual Machine) heap exhaustion, which impacts the read and write performance of the cluster.
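What compaction has to do can be sketched in a few lines. This is a simplified model, not HBase code: each store file is a list of hypothetical `(key, timestamp, value)` cells, a `None` value marks a delete, and a delete is assumed to hide every older version of the same key.

```python
from typing import Dict, List, Optional, Tuple

Cell = Tuple[str, int, Optional[str]]  # (key, timestamp, value); None marks a delete


def compact(store_files: List[List[Cell]], max_versions: int, ttl: int, now: int) -> List[Cell]:
    """Merge store files into one, dropping deleted, expired, and surplus versions."""
    by_key: Dict[str, List[Tuple[int, Optional[str]]]] = {}
    for store_file in store_files:
        for key, ts, value in store_file:
            by_key.setdefault(key, []).append((ts, value))

    merged: List[Cell] = []
    for key in sorted(by_key):
        # Newest version first, so each cut-off below also covers all older versions.
        versions = sorted(by_key[key], key=lambda v: v[0], reverse=True)
        kept = 0
        for ts, value in versions:
            if value is None:          # delete marker hides everything older
                break
            if now - ts > ttl:         # this version and all older ones have expired
                break
            if kept == max_versions:   # already holding the allowed number of versions
                break
            merged.append((key, ts, value))
            kept += 1
    return merged


files = [
    [("a", 1, "a1"), ("b", 1, "b1"), ("c", 1, "c1")],
    [("a", 5, "a5"), ("b", 4, None)],
    [("a", 7, "a7"), ("a", 9, "a9"), ("c", 8, "c8")],
]
compacted = compact(files, max_versions=2, ttl=6, now=10)
```

Here three files containing eight cells shrink to a single file with three: key `a` keeps its two newest versions, `b` disappears because of the delete marker, and `c` loses a version older than the TTL. The whole input has to be read and rewritten to get there, which is why compaction competes with normal reads and writes for CPU, memory, and disk.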
To limit this impact, we use Helix to divide the Mussel storage nodes for each logical shard into two types of resources: online nodes and batch nodes. For example, if we have 9 Mussel storage nodes for one logical shard, 6 of them are online nodes and 3 of them are batch nodes. The relationship between online and batch nodes is as follows:
- They both serve write requests
- Only online nodes serve read requests, and we rate limit compaction on online nodes to maintain good read performance
- Helix schedules a daily rotation between online nodes and batch nodes. In the example above, it moves 3 online nodes to batch and 3 batch nodes to online so those 3 new batch nodes can perform full speed major compaction to clean up old data
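The daily rotation can be sketched with two lists. This is only an illustration and does not use the Helix API: the node names are hypothetical, and so is the rule that the online nodes which have gone longest without a full compaction are the ones moved to batch.

```python
from typing import List, Tuple


def rotate(online: List[str], batch: List[str]) -> Tuple[List[str], List[str]]:
    """Swap the current batch nodes with the same number of online nodes."""
    k = len(batch)
    if k > len(online):
        raise ValueError("need at least as many online nodes as batch nodes")
    # Assumption: `online` is ordered by time since last full compaction,
    # longest first, so the first k online nodes become the new batch nodes.
    new_batch = online[:k]
    new_online = online[k:] + batch
    return new_online, new_batch


# The example from the text: 9 nodes for one logical shard, 6 online and 3 batch.
online = [f"n{i}" for i in range(1, 7)]
batch = [f"n{i}" for i in range(7, 10)]

day1_online, day1_batch = rotate(online, batch)
day2_online, day2_batch = rotate(day1_online, day1_batch)
day3_online, day3_batch = rotate(day2_online, day2_batch)
```

After the first rotation `n1` to `n3` are compacting at full speed while `n4` to `n9` serve reads. Under this rule every node takes a batch turn once every three days, and six nodes are always available for reads.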
With this change, we are now able to support both read and write with a single storage engine.
Supporting bulk load from data warehouse
We support two types of bulk load pipelines from the data warehouse to Mussel via Airflow jobs: merge type and replace type. Merge type means merging the data from the data warehouse with the data from previous writes with older timestamps in Mussel. Replace type means importing the data from the data warehouse and deleting all the data with previous timestamps.
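The difference between the two load types can be sketched on a toy versioned store. This is an interpretation of the description above, not Mussel code: the `{key: {timestamp: value}}` layout, the `bulk_load` and `latest` helpers, and the rule that "previous" means strictly older than the load timestamp are all assumptions.

```python
from typing import Dict, Optional

Store = Dict[str, Dict[int, str]]  # key -> {timestamp: value}; hypothetical layout


def bulk_load(store: Store, batch: Dict[str, str], load_ts: int, mode: str) -> Store:
    """Return a new store with `batch` loaded at `load_ts` in merge or replace mode."""
    if mode not in ("merge", "replace"):
        raise ValueError(f"unknown bulk load mode: {mode}")
    result: Store = {}
    for key, versions in store.items():
        if mode == "replace":
            # Replace: delete every version written before this load.
            versions = {ts: v for ts, v in versions.items() if ts >= load_ts}
        if versions:
            result[key] = dict(versions)
    for key, value in batch.items():
        result.setdefault(key, {})[load_ts] = value
    return result


def latest(store: Store, key: str) -> Optional[str]:
    """Return the value with the newest timestamp for a key, if any."""
    versions = store.get(key)
    return versions[max(versions)] if versions else None


existing: Store = {"u1": {100: "rt-old", 300: "rt-new"}, "u2": {100: "stale"}}
warehouse = {"u1": "batch", "u3": "batch3"}

merged = bulk_load(existing, warehouse, load_ts=200, mode="merge")
replaced = bulk_load(existing, warehouse, load_ts=200, mode="replace")
```

In both modes `u1` still reads as `"rt-new"`, because that real-time write carries a newer timestamp than the load. The modes differ for `u2`, which is absent from the warehouse batch: merge keeps its older value, while replace removes it.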
We use Spark to transform data from the data warehouse into HFile format and upload it to S3. Each Mussel storage node downloads the files and uses the HRegion bulkLoadHFiles API to load those HFiles into the column family.
With this bulk load pipeline, we can load just the delta data into the cluster instead of the full data snapshot every day. Before the migration, the user profile service needed to load about 4TB of data into the cluster per day. After, it only needs to load about 40–80GB, drastically reducing the cost and improving the performance of the cluster.
In the last few years, Airbnb has come a long way in providing a high-quality derived data store for our engineers. The most recent key-value store, Mussel, is widely used within Airbnb and has become a foundational building block for any key-value based application with strong reliability, availability, scalability, and performance guarantees. Since its introduction, ~4000 tables have been created in Mussel, storing ~130TB of data in our production clusters, not counting replication. Mussel has been working reliably to serve large amounts of read, write, and bulk load requests. For example, mussel-general, our largest cluster, has achieved >99.9% availability, average read QPS > 800k and write QPS > 35k, with average P95 read latency less than 8ms.
Even though Mussel serves our current use cases well, there are still many opportunities to improve. For example, we’re looking forward to providing read-after-write consistency to our customers. We also want to enable auto-scaling and repartitioning based on the traffic in the cluster. We’re looking forward to sharing more details about this soon.
Mussel is a collaborative effort of Airbnb’s storage team including: Calvin Zou, Dionitas Santos, Ruan Maia, Wonhee Cho, Xiaomou Wang, Yanhan Zhang.
Interested in working on the Airbnb Storage team? Check out this role: Staff Software Engineer, Distributed Storage