One of the most common issues we've seen in retail is that decision-makers are stuck with reports that take hours to run. To give that problem a technical spin, we often hear the performance tuning mantra: “The fastest function call is the call that’s never made.”
Let’s apply the principle to data processing. This should mean we favor pre-computing information over costly aggregates at run time. But in practice, retailers often struggle with pre-computation because of the complexity of user experience design and the dynamic nature of the metrics themselves.
Consider the periodic portfolio review cycle: the purchasing department (buyers) have to decide which products are performing better than others and suggest changes to their product ranges based on their understanding of customer demand. This process can take weeks to months; the buyers have to analyze hundreds of matrices across different time periods before taking this decision. When we compare these matrices across time, we have to normalize the value to accommodate for events like opening/closing of stores in a region. If you have about three years of data in the system, the combination of different time periods and matrices make per-computation difficult.
As a result, most retailers end up running analytical workloads as batch processes inside their data warehouse — with all the latency that entails.
These are exactly the challenges that we faced in one of our large retail engagements. In this article, we’ll explore the approaches we took to deliver rapid retail analytics using solutions based on open source technology. We’ll also share some of the lessons we’ve learned from building the system and maintaining it for the past four years.When it comes to analyzing data, the volumes will vary from retailer to retailer; some may need to analyze a few gigabytes, others may have terabytes and beyond. In our use case, the retailer had about ten terabytes in their data warehousing system. That’s a lot of data. We started by trying to reduce that, using whiteboarding and tracing the source of data. That allowed us to identify redundant copies of data, as well as instances of aggregates that weren’t relevant to the problems we were trying to solve.
We were left with a data pool of about one terabyte, which you could argue isn’t sufficiently large to qualify as ‘big data’. But it is big enough to stretch the relational database solutions for responsive analytics. We tried a few options — Spark, Hbase, and monetdb — but finally selected R.One of the factors which favored R was its data manipulation capabilities. Even at the prototype stage, we could appreciate the expressive nature of the language and were able to concisely represent our model. At the start of our engagement, R was widely viewed as being solely for interactive use and not at all ideal for ‘server’ use. Today, that situation is changing — but even so, the fact that it runs on a single thread of the CPU — which in theory limits its performance — was seen as making it ill-suited for server-side analytic processing.
In fact, being single threaded by itself isn’t a serious concern. We realized we could overcome the resource limitation by using multiple R processes behind a load balancer. The publication of the COST paper — which argues that a cleverly written single thread app can outperform a large cluster — has done much to establish the possibilities of the single threaded paradigm. Without doubt, we’d have found it much easier convincing our stakeholders that R was the right choice if that paper had been published a little earlier!Another big plus for R is its out-of-the-box capability to manipulate columnar data via data frames. This has been enhanced further by the work of Matt Dowle and others, with their work on data.table, which make incredible improvements in memory and compute efficiency for very large data sets.
For those of you interested in comparing data.table’s group performance with other options in R, such as dplyr, or in Python, where there’s pandas, we recommend this extensive report. As with all benchmarks, the numbers will vary by use case, so remember to test and profile the performance in the context of your data problem.The rapid improvements in memory also played into our thinking when it came to the project design. These days, we think nothing of getting over a terabyte of RAM on a single host. That mattered to us because infrastructure sizing demands that you strike a delicate balance between operational cost, complexity, performance and business needs. The kind of data analytics metrics we were after required random scans, aggregates and lots of look-up tables. And because RAM is faster than disk by orders of magnitude, it was best suited to the kinds of data operations we would encounter.
Ultimately, we went with a cluster of nodes with enough RAM to hold our entire data set in memory.So far, we have discussed general techniques of using a load balancer to overcome single-threaded nature of R and the speed of the data.table package when working with data in memory. To maximize the business benefits of this setup, we looked at how we could apply our deep knowledge of retail data so that we could identify levers that would enable us to fine tune the system. The two most important levers we found are granularity and partition.
Granularity of aggregates: For most use cases, a daily or weekly aggregate of sales, stock and movements data would be sufficient. Since granularity is inversely proportional to data volume, pick the right level of aggregates to suit your needs.
Partition: No matter what granularity you choose it is always better to partition them so that you don’t have to hold the entire data set in a single instance of R. We found it easier to partition by time so every single R process will hold one week of data as per the defined granularity.With the right granularity and partition, we’re able to scale the solution across multiple machines both horizontally and vertically. This in effect became a full-blown distributed system — and that means coping with failures at various levels.
One benefit of working with an analytical system is that by its nature, it’s not ‘transactional’ — so we could afford a few seconds of downtime. R enables us to take snapshots of current working sessions, which helped us when it came to fault tolerance.
The data pipeline would create R snapshots during data load; the R processes are spawned from these snapshots and respond to requests. In case of failure, we can spin up additional R instances from these snapshots in a matter of seconds. Given that our retail data was only changing every few hours, downtime of a few seconds is acceptable. If the frequency of change is higher — or you want to deal with real-time data — the snapshot approach may not be practical.Programming in a distributed system can get tricky very quickly. With so many moving parts we decided to embrace shared-nothing architecture. This means that each R node is unaware of the existence of any other R nodes. We were still left with one problem: the control node should be aware of which R process holds what partition of data. We solved that with a simple convention of what year week should listen on what port and what node - if the setup is much more complicated we would have gone with some form of service discovery.
You can think of this paradigm as some kind of Map Reduce where individual R partitions act like map and the control node act as the reducer.
Having partitioned the data and having a single R process for each partition, our setup looks like this:
Though MapReduce is usually associated with Hadoop, the paradigm itself is both simple and sufficiently responsive to make it suitable for a wide variety of problems. Because we have partitioned the data, our setup has enough data parallelism built in to successfully leverage the MapReduce paradigm.As a result of running our data analytics in R, we were able to cut reporting times for our client massively. The simulation and reports that previously took between three to six hours are now done in less than 20 seconds. The system had been in production since 2014 and had dramatically improved the retailer’s decision making capabilities.
- Track data to its source. Usually, in a legacy system, the total volume of data required to solve the problem is at least few orders of magnitude larger than what is needed
- The single threaded model is more powerful many realize. If you were to consume more resources, consider a load balancer across multiple forked processes to scale horizontally
- RAM is faster than disk and getting more affordable. Consider keeping as much data in RAM as possible
- Embrace immutable server. Spin up a new one in case of failure from snapshots
- Consider MapReduce as programming paradigm for distributed R models