Data partitioning optimization in Apache Druid. Part I.
If you are going to use (or already use) Apache Druid, sooner or later you will encounter performance issues and will have to tune the process of ingesting data. In this short two-article series I will outline the most important things that you need to consider, and what might be tricky when you start diving into the Druid ingestion process.
First, we will cover the things that you should examine when preparing for data ingestion in your Apache Druid cluster. Though this article is mostly for people who are already familiar with Apache Druid, let’s start with some basics to get us all on the same page:
- Segment: the smallest unit of storage in Apache Druid. Data in Druid has to be partitioned into time chunks (the range of the time chunk is called the segment granularity), and these time chunks are divided into segments. The target segment size should be in the range of 300–700 MB. Segments in Druid are immutable.
- Query granularity: a parameter that directly impacts the achievable roll-up ratio. It is also the finest time resolution at which we will be able to query the data (it can’t be coarser than the segment granularity).
- Ingestion: the process of loading data into Apache Druid.
- Roll-up ratio: one of the most meaningful factors to consider when ingesting data into Druid. It is the ratio of the number of events before ingestion to the number of rows after ingestion. When we ingest data into Druid, it is aggregated by the timestamp (truncated to the chosen query granularity) and all of the dimensions specified in the ingestion specification, much like a simple SQL GROUP BY statement.
- Druid basic ingestion partitioning types (see table below): there are three main types of partitioning available in Apache Druid. Dynamic partitioning is based on the number/size of the rows in the segments and is characterized by a single step of ingestion; hashed partitioning will improve data locality but requires an additional ingestion step; single dimension partitioning uses one of the dataset fields for partitioning the data (though potentially highly beneficial, it may also lead to a skewed segment distribution).
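To make the roll-up definition above more concrete, here is a minimal sketch in plain Python (not Druid code). The event tuples and the `roll_up` helper are hypothetical, and a query granularity of one minute is assumed; the only aggregation is a row count.

```python
from collections import Counter
from datetime import datetime

# Hypothetical raw events: (ISO timestamp, page, source)
events = [
    ("2021-03-01T10:00:05", "/home", "mobile"),
    ("2021-03-01T10:00:40", "/home", "mobile"),
    ("2021-03-01T10:00:59", "/home", "pc"),
    ("2021-03-01T10:01:10", "/home", "mobile"),
    ("2021-03-01T10:01:30", "/cart", "pc"),
    ("2021-03-01T10:01:45", "/cart", "pc"),
]


def roll_up(rows):
    """Group rows by (timestamp truncated to the minute, all dimensions) and count them."""
    counts = Counter()
    for ts, *dims in rows:
        minute = datetime.fromisoformat(ts).replace(second=0, microsecond=0)
        counts[(minute, *dims)] += 1
    return counts


rolled = roll_up(events)
# Roll-up ratio: events before ingestion divided by rows after ingestion.
rollup_ratio = len(events) / len(rolled)
print(len(rolled), rollup_ratio)
```

Six events collapse into four rows here, which is the equivalent of grouping by the truncated timestamp and every dimension in SQL.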
Data exploration — three most important questions
Choosing and configuring the right type of partitioning is one of the most important factors in achieving both solid query performance and the lowest possible ingestion time. When choosing the right approach, you should first explore your data and understand the way that your system will be used.
Is there a need to make periodic updates to your data? Are we still in the process of early development?
Data stored in Druid is immutable, so there is no direct means of editing single rows of data. It’s possible to work around this by using lookups or (since Druid 0.18) joins, which can partially cover updates, but each has its limitations: both lookups and joins should operate on small data, as they impact performance. Going for lookups or joins also means that we have to maintain another datasource (more on lookups and joins).
Typically, some kind of update is always needed, not only because specific parts of our system require it (e.g. updating statuses, adding new information, changing dates) but also because new features get added. This is especially important during development of our system. In most cases when we are using Druid, we are also using another technology to prepare the data (e.g. Apache Spark, one of the most widely used). As you may have guessed, this can lead to frequent reingestion of data into Druid, because it’s hard to establish all of the system requirements at one time and keep them fixed throughout the project lifecycle. It’s also common that we want to show the system to users before all of the features have been developed.
We can take as an example a system that collects data from website activity. At first, all we need is to know which parts of the website are the most viewed, what is most important to users, how many new users we get every day, what is the user retention, etc. That can be our first step (Minimum Viable Product) but at some point we are probably going to enhance our system and implement something like sessionization logic, for instance, which will mark the events performed by one user during a single session on our website. For this purpose, Apache Spark seems like a perfect fit (if you want to dig into this process, I can recommend looking at the streaming and batch sessionization articles), but after performing Spark calculations to prepare data we need to reingest the data into Druid to add those newly calculated fields to our dataset (like session id, sequential number of events in the session, etc.). As you might imagine, this situation can occur more than once (requirements tend to change and evolve over time). Based on this, in most cases it is recommended to start with dynamic partitioning, and change it later to hashed or single dimension partitioning as required. Although dynamic partitioning is faster, it comes with some inevitable drawbacks — by using this type we won’t achieve perfect roll-up.
It is worth mentioning that at some point the system will become much more stable, and periodic reingestion due to the implementation of new features is going to occur much less often. If at that point we don’t need to do regular reingestion, then it is worth considering hashed partitioning.
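As a rough illustration of what switching from dynamic to hashed partitioning looks like in practice, the sketch below builds two `tuningConfig` fragments as Python dictionaries. The field names (`partitionsSpec`, `maxRowsPerSegment`, `numShards`, `forceGuaranteedRollup`) follow the native batch (`index_parallel`) ingestion spec as I understand it, so check them against the documentation for your Druid version. The numeric values are placeholders, not recommendations.

```python
import json

# Early development: single-step ingestion with best-effort rollup.
dynamic_tuning = {
    "type": "index_parallel",
    "partitionsSpec": {
        "type": "dynamic",
        "maxRowsPerSegment": 5_000_000,  # placeholder value
    },
}

# Stable system: two-step ingestion with perfect rollup.
hashed_tuning = {
    "type": "index_parallel",
    "forceGuaranteedRollup": True,  # needed for perfect rollup
    "partitionsSpec": {
        "type": "hashed",
        "numShards": 4,  # placeholder: segments per time chunk
    },
}

print(json.dumps(hashed_tuning, indent=2))
```

Only the `tuningConfig` part changes between the two variants, so the rest of the ingestion spec can stay as it is when the system matures.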
What are the most important things from the user perspective? Understand your data!
This is something that we developers often have a hard time implementing properly — the system that we are developing needs to be efficient, not only from a technical point of view but also from a user perspective. A solution that seems ideal from a pure infrastructure and data perspective won’t always be the best from the standpoint of real users. It is worth investing some time to properly understand the data and user requirements.
Data partitioning is very important in all systems that process data. In Druid we have obligatory partitioning by event timestamp, but we can also use single dimension partitioning (data within a single time chunk is additionally partitioned by one of the dimensions). In this case it is crucial to understand the users’ requirements, because picking the correct field can impact our system performance. Druid core queries are based on time series and group by; if we can pick a dimension that is used very often in queries and partition by it, this can provide a marked performance boost. In the case of our web application, one of the natural candidates for the partitioning dimension is the source (like mobile/tablet/PC). We can aim for the benefits of single dimension partitioning, but we have to bear in mind that the dimension used for this should not only be frequently used but also evenly distributed across the data (to avoid skewed segment sizes).
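Before committing to a partitioning dimension, it can help to check how evenly its values are distributed. The following is a minimal sketch in plain Python; the `skew_report` helper and the sample `source_column` are hypothetical, and the skew factor (largest share divided by the share under a perfectly even split) is just one possible measure.

```python
from collections import Counter


def skew_report(values):
    """Return each value's share of rows and a max/mean skew factor."""
    counts = Counter(values)
    total = sum(counts.values())
    shares = {value: n / total for value, n in counts.items()}
    even_share = 1 / len(counts)  # share of each value if perfectly even
    skew = max(shares.values()) / even_share
    return shares, skew


# Hypothetical sample of the "source" dimension.
source_column = ["mobile"] * 70 + ["pc"] * 25 + ["tablet"] * 5
shares, skew = skew_report(source_column)
print(shares, round(skew, 2))
```

A skew factor close to 1 suggests an even distribution; in this sample the mobile traffic dominates, which hints that segments partitioned by source would differ a lot in size.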
The second thing to consider is what data is important from a user’s perspective. We often seek to put as much information as we can into our dataset, and in most cases it is not possible to achieve good performance with this approach. When designing the structure of events in our system, we may be trying to cram in as much information as we can, which can sometimes result in hundreds or even thousands of different values arising from each event added to our database. Do we need such a large number of columns? Will anyone be able to reach any interesting conclusions based on hundreds of columns? It seems unlikely, and that is why you should consider limiting the columns that you want to put into Druid to the ones that are really necessary (or dividing them into multiple datasets). The fewer columns you put in Druid, the better your rollup ratio will be and the smaller the size of the data. This doesn’t mean that you have to abandon all of the other information, however: you can still store it and use some fast SQL solution on top (like Presto or Hive with Spark) to extract data if needed (e.g. for purposes of Machine Learning or user support).
Can you achieve a good rollup ratio in your data?
In the case of this parameter, the biggest impact comes from query granularity, the number of dimensions we want to keep in our dataset, and their cardinality. Typically, when we want to ingest a dataset with hundreds of columns, our rollup ratio will likely be poor (so there is no need to go for hashed partitioning, as we won’t benefit from it). It is hard to establish solid guidelines to indicate when the rollup ratio is poor, but we can assume that with anything below two, hashed partitioning is not worth considering, as it is the slowest of all the ingestion types.
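One way to get a feel for these factors is to estimate the rollup ratio on a sample of your data before ingesting it. The sketch below is plain Python with hypothetical events and a hypothetical `rollup_ratio` helper; it compares two query granularities and shows the effect of keeping a high-cardinality column (here a user id).

```python
from datetime import datetime

# Hypothetical events: (timestamp, page, source, user_id)
EVENTS = [
    ("2021-03-01T10:00:05", "/home", "mobile", "u1"),
    ("2021-03-01T10:00:40", "/home", "mobile", "u2"),
    ("2021-03-01T10:20:10", "/home", "mobile", "u3"),
    ("2021-03-01T10:45:00", "/cart", "pc", "u1"),
    ("2021-03-01T10:45:30", "/cart", "pc", "u4"),
    ("2021-03-01T10:59:59", "/cart", "pc", "u5"),
]

# Timestamp truncation for each supported query granularity.
TRUNCATE = {
    "minute": lambda t: t.replace(second=0, microsecond=0),
    "hour": lambda t: t.replace(minute=0, second=0, microsecond=0),
}


def rollup_ratio(events, granularity, dim_indexes):
    """Ratio of input events to distinct (time bucket, dimensions) combinations."""
    keys = set()
    for row in events:
        bucket = TRUNCATE[granularity](datetime.fromisoformat(row[0]))
        keys.add((bucket, *(row[i] for i in dim_indexes)))
    return len(events) / len(keys)


for granularity in ("minute", "hour"):
    for dims in ([1, 2, 3], [1, 2]):
        print(granularity, dims, rollup_ratio(EVENTS, granularity, dims))
```

With the user id kept, no rows merge at either granularity. Dropping it and moving to hourly granularity lifts the ratio above the threshold of two mentioned above.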
Selected technical aspects of partitioning during data ingestion
In the above section we spoke about data exploration and its impact on choosing the right ingestion method for Druid. From a technical point of view, most of the important parameters are already well covered in Apache Druid documentation, but let’s now dive into some chosen areas:
Resources — historical vs middle manager / indexers
I like to think of these nodes as the foundation of an Apache Druid system. Historicals are responsible for serving queries on historical data (data outside the real-time analytics window, which is typically 99% of the data stored in Druid), while middle managers / indexers serve queries on real-time data and handle the ingestion of both historical and real-time data.
So here comes the question, which is also related to the ones above. How should I divide resources between those nodes? Can I allocate enough resources to maintain good performance of queries on historical data while still being able to perform reingestions?
If we are low on resources for middle managers we may not be able to achieve perfect rollup during data ingestion. Here the most crucial thing will be our rollup ratio — is it worth investing more time and resources to achieve that? Typically, if we have a rollup ratio below two, it’s not worth aiming for perfect rollup, as it takes much more time to ingest.
On the other hand, if we give too many resources to the middle managers at the cost of historicals, queries on the stored data will slow down, and fast data access is the key trait of any system built on top of Druid.
Best-effort vs perfect rollup
Rollup can be divided into two main types: best-effort and perfect. Perfect rollup requires an additional step in the ingestion process, which makes it slower, but on the other hand it usually leads to better query performance and lower data size.
As I mentioned earlier, perfect rollup will in most cases achieve better results than best-effort, but at the cost of resources and ingestion time. Perfect rollup means that our data can’t be rolled up any further after ingestion (we are achieving the best rollup possible for the chosen query granularity and selected columns). If we are going for perfect rollup we need to perform two-step ingestion. In the first step, we ingest data into Druid, and in the second step (called merge) we merge the results from the first step to achieve perfect rollup and a given segment size. As you can imagine, this heavily impacts the ingestion time (in the second article I will provide some real-life comparisons regarding efficiency).
On the other hand best-effort rollup consists of just a single step (the same as the first step from perfect rollup ingestion). This can greatly reduce the ingestion time if we are willing to compromise on rollup efficiency. To minimize the difference you should…
…prepare your data prior to ingestion
When you choose best-effort roll-up you have to take certain things into consideration (which are not that important when going for perfect rollup):
- At least one segment must be created from a single input file. We can’t make one Druid segment from two input files (which is not the case in perfect rollup, as we have the merge stage).
- Partition the data in accordance with the expected segment size (the number of input files should be equal to or lower than the expected number of segments; preferred segment size = 400–700 MB)
- Sort the data by timestamp prior to ingestion. If we can do this, then rollup will be better, as data from the same time range will be in the same file. Of course, it heavily depends on the chosen segment and query granularity, because it may in fact turn out that data from within a single query granularity size will be shared among multiple files.
- Choose a proper data format for ingestion. Right now, there are plenty of formats that we can choose from. In terms of pure ingestion performance there is no major difference between formats, but the format will definitely impact the time needed to transfer files to middle managers. Consider using something other than JSON (like Parquet or Avro, which will also be less demanding for your preprocessing application and better suited for fast SQL solutions, if required).
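The sorting and partitioning advice above can be sketched in a few lines of plain Python. The sketch assumes a DAY segment granularity, ISO-8601 timestamp strings (which sort correctly as text) and a hypothetical `rows_per_file` limit standing in for the expected segment size; in a real pipeline this step would more likely live in your preprocessing job (e.g. Spark).

```python
from itertools import groupby


def plan_input_files(events, rows_per_file):
    """Sort events by timestamp, then cut each day into files of at most rows_per_file rows."""
    ordered = sorted(events, key=lambda e: e["timestamp"])
    files = []
    # Assumption: DAY segment granularity, so the first 10 characters identify the time chunk.
    for day, day_events in groupby(ordered, key=lambda e: e["timestamp"][:10]):
        day_events = list(day_events)
        for start in range(0, len(day_events), rows_per_file):
            files.append((day, day_events[start:start + rows_per_file]))
    return files


# Hypothetical unsorted events spanning two days.
events = [
    {"timestamp": "2021-03-02T08:00:00", "page": "/home"},
    {"timestamp": "2021-03-01T10:05:00", "page": "/cart"},
    {"timestamp": "2021-03-01T09:00:00", "page": "/home"},
    {"timestamp": "2021-03-02T07:30:00", "page": "/cart"},
    {"timestamp": "2021-03-01T23:59:00", "page": "/home"},
]

files = plan_input_files(events, rows_per_file=2)
for day, rows in files:
    print(day, [r["timestamp"] for r in rows])
```

Each planned file holds rows from a single day in timestamp order, so rows that can be rolled up together are more likely to end up in the same file.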
There are a lot of factors to consider when planning data ingestion into Druid. As mentioned before, Apache Druid has very good documentation that clearly explains the role of each parameter, but before you dig into the ingestion parameters, you first need to choose the proper type of data partitioning. In the second article I will present the way we handled ingestion in one of our projects, along with the chosen parameters and explanations. I will walk you through the detailed process of optimizing Druid ingestion on real-life production examples (from zero to hero). A summary of the factors outlined above, which can impact the choice of the best ingestion scheme, is contained in the table below (these are just suggestions, applicable to most data, but corner cases may differ).