On September 26th, we had the opportunity to present a collaboration between BBVA Data & Analytics and BEEVA at the Theatre of Partners during the AWS Summit Madrid 2017.

At the conference, we presented a cost-effective approach for collaborative-filtering-based Recommender Systems (RS) that scales to millions of users and a million products. Our implementation used AWS EMR with Spark to produce an implicit matrix factorization model of the original rating matrix. Early in the project, we found that the CPU-based implementation was too expensive and impractical for computing the recommendations themselves, i.e., multiplying user feature vectors by product feature vectors, which involves $10^{12}$ to $10^{14}$ operations. As matrix multiplication is an embarrassingly parallel operation, we decided the task could be done on GPUs.

The talk was well received, so we thought it worth explaining in greater detail in a blog post. We start by motivating the need for very large RS when targeting the long tail of the product catalogue. Next, we briefly review some RS techniques and describe how we implemented them on Spark and AWS EMR clusters. After showing why Spark failed in our use case, we go back to the origin of the problem, i.e., the sheer number of recommendations that had to be made. Such a huge scale forced us to move to GPUs, so we then explain why GPUs are best suited for this task and detail our implementation using TensorFlow on AWS p2.8xlarge instances (released for production use only one week before we started using them). Afterwards, we describe some of the key aspects of going into production in the AWS cloud with this sort of technology. Finally, we draw some conclusions and outline future work.

### Recommender Systems and the Long-Tail

Recommender Systems (RS) are becoming ubiquitous in the most common online applications: from personalized suggestions of the next song to listen to, the next movie to watch or the next financial product to purchase, to the discovery of a nearby shop for the next purchase. Such RS present customers with tailored suggestions, allowing companies to promote less popular, yet profitable, products in the long tail of their catalogue.

The long tail phenomenon is a statistical property of distributions that follow a power-law decay (among others): a few elements occur with very high frequency, while a huge number of elements show up in relatively few cases. When you plot the frequency (aka popularity) of each element, sorted in decreasing order, you see a tail that appears to have no end, hence the name long tail. This is schematically depicted in the figure above. There are many examples of distributions exhibiting a long tail. The number of followers each Twitter user has is one example, with Barack Obama certainly being a short-tail element of such a distribution (very popular), and the authors of this post being elements of the long tail (much less popular than Obama, we reckon). Other classical examples include the distribution of sales per product at a web retailer like Amazon, or the popularity distribution of songs and films on platforms like Spotify or Netflix.

So, why is the long tail relevant at all? It turns out to be quite important for economies of scale, as pointed out by Chris Anderson in his famous book *The Long Tail: Why the Future of Business Is Selling Less of More*. The basic idea is that if you have a very large product catalogue, the accumulated sales volume of rare products can match or even exceed the sales volume of the few well-known products. In fact, this business model is the essence of several successful companies such as Amazon, Spotify or Netflix. The problem is: how do we show the entire catalogue to users? This is where Recommender Systems come in: RS are tools designed to filter out irrelevant information while providing personalized suggestions. Furthermore, if you want to provide users with a truly personalized experience, you need a large product catalogue that satisfies the variety of tastes of every possible user.
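A quick calculation shows how this argument can play out. The sketch below assumes, purely for illustration, that popularity follows a Zipf law (the item at rank r is adopted in proportion to 1/r) over a hypothetical catalogue of one million products, and measures the share of total volume that comes from everything outside the top 100 items. The function name `tail_share` is our own.

```python
# Hypothetical long-tail illustration: the popularity of the item at rank r is
# assumed proportional to 1 / r**exponent (a Zipf law; exponent 1 by default).
def tail_share(n_items: int, head_size: int, exponent: float = 1.0) -> float:
    """Fraction of the total volume contributed by items ranked below the head."""
    weights = [1.0 / rank ** exponent for rank in range(1, n_items + 1)]
    total = sum(weights)
    head = sum(weights[:head_size])
    return (total - head) / total


share = tail_share(n_items=1_000_000, head_size=100)
print(f"Volume outside the top 100 items: {share:.1%}")
```

Under this assumption, roughly 64% of the volume comes from outside the top 100 products; real catalogues will of course deviate from an exact Zipf law.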

### Collaborative Filtering methods

The next question is how to build an RS for niche products in an affordable and scalable way. When the history of user adoptions (i.e., listens, views, clicks, purchases and so on) is available, a well-established technique is Collaborative Filtering (CF) for implicit feedback. Implicit CF methods try to predict the preference a user will have for unseen products in the catalogue, based on the preferences all users have shown for all products. A well-known example is Amazon's "users who bought this product also bought these other products".

A shortcoming of this kind of CF method (usually known as K-Nearest Neighbors, or KNN) is that it does not provide users with a recommendation for every possible product (at least not at a low cost), but only for the K products most similar to a given user or product. However, our business model required a recommendation for every possible product, since only a subset of the product catalogue is available at any given time. This restriction is not so rare in the business world: consider, for instance, discontinued products, which may contain valuable information about users' preferences and yet cannot be offered.
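To illustrate the limitation, here is a minimal item-based KNN sketch (a toy example of our own, not Amazon's algorithm): products are compared through the cosine similarity of their binary user-adoption vectors, and only the K best neighbours are kept. The function name `item_knn` and the data are hypothetical.

```python
from collections import defaultdict
from math import sqrt


def item_knn(interactions, item, k):
    """Return the k items most similar to `item`, using cosine similarity
    between binary user-adoption vectors ("users who bought X also bought Y")."""
    users_by_item = defaultdict(set)
    for user, it in interactions:
        users_by_item[it].add(user)
    target = users_by_item[item]
    scores = []
    for other, users in users_by_item.items():
        if other == item:
            continue
        overlap = len(target & users)
        if overlap:  # items never co-adopted with `item` get no score at all
            scores.append((overlap / sqrt(len(target) * len(users)), other))
    scores.sort(key=lambda s: (-s[0], s[1]))  # best similarity first, ties by name
    return [other for _, other in scores[:k]]


interactions = [("ana", "book"), ("ana", "lamp"), ("ben", "book"), ("ben", "lamp"),
                ("ben", "pen"), ("eva", "pen"), ("eva", "mug")]
print(item_knn(interactions, "book", 2))
```

Note that `mug` never receives a score for `book`, because no user adopted both: KNN simply has nothing to say about most product pairs.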

This requirement can be satisfied by another very popular collaborative filtering method: matrix factorization. In this technique, the interactions between users and products are stacked into a large matrix (the rating matrix), with one row per user and one column per product. See the matrix R in the figure below. Note that this rating matrix is mostly zeros, as the vast majority of user-item interactions are unknown, and only a few of its entries are equal to 1 (indicating an observed user-item interaction). The task of Implicit Matrix Factorization models is to complete the rating matrix R, thus predicting the preference of each user for every product. These numeric predictions can then be sorted to produce a list of recommendations.
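Building the implicit rating matrix can be sketched as follows. Only the observed cells are stored, since almost every entry of R is zero; the event format and the helper name `build_rating_matrix` are illustrative assumptions.

```python
def build_rating_matrix(events):
    """Map raw (user, product) adoption events to a sparse binary matrix R.

    Returns the row and column index dictionaries plus the set of observed
    (row, col) cells; every cell not in the set is an implicit zero (unknown)."""
    user_index, product_index, observed = {}, {}, set()
    for user, product in events:
        u = user_index.setdefault(user, len(user_index))
        p = product_index.setdefault(product, len(product_index))
        observed.add((u, p))  # repeated events collapse into a single 1
    return user_index, product_index, observed


events = [("u1", "p1"), ("u1", "p3"), ("u2", "p2"), ("u3", "p1"), ("u1", "p1")]
users, products, R = build_rating_matrix(events)
density = len(R) / (len(users) * len(products))
print(f"{len(users)}x{len(products)} matrix, {len(R)} ones, density {density:.2f}")
```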

Without going into too many technical details, Matrix Factorization consists of decomposing the original rating matrix into two smaller matrices, U and P in the figure above. The predictions are obtained by multiplying the two of them. Because these matrices are constrained to be much smaller than the original one, their product is a dense matrix in which the unknown entries are filled in with predicted scores (mathematically, the factor matrices U and P have rank k, typically a few tens to a hundred, which is much smaller than the number of users or items).
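In symbols, using notation of our own (m users, n products, rank k, with both factor matrices stored one row per user or product), the factorization and the predicted score of user u for product i read:

$$
R \approx \hat{R} = U P^{\top}, \qquad U \in \mathbb{R}^{m \times k},\quad P \in \mathbb{R}^{n \times k},\quad k \ll \min(m, n),
$$

$$
\hat{r}_{ui} = \mathbf{u}_u^{\top} \mathbf{p}_i = \sum_{f=1}^{k} U_{uf}\, P_{if}.
$$

Storing U and P takes $(m + n)k$ numbers instead of the $mn$ of a dense R, which is what makes the model compact; the catch is that computing every $\hat{r}_{ui}$ still requires $mn$ dot products of length $k$.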

At this point, it is important to grasp the scale of the rating matrix: in our case, it had tens of millions of users and more than a million products. Thus, we needed a scalable and, hopefully, affordable solution to this problem.
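A back-of-the-envelope calculation makes the scale concrete. The user and product counts below come from the text (taking the largest case, about 20 million users); the rank k = 100 and 4-byte float32 storage are our assumptions.

```python
# Assumption: every value stored as a 4-byte float32.
BYTES_PER_FLOAT32 = 4


def dense_vs_factors(n_users: int, n_products: int, rank: int):
    """Bytes needed for a dense rating matrix versus its two rank-k factors."""
    dense_bytes = n_users * n_products * BYTES_PER_FLOAT32
    factor_bytes = (n_users + n_products) * rank * BYTES_PER_FLOAT32
    return dense_bytes, factor_bytes


dense, factors = dense_vs_factors(20_000_000, 1_000_000, rank=100)
print(f"dense R: {dense / 1e12:.0f} TB, U and P: {factors / 1e9:.1f} GB")
```

Under these assumptions, a dense R would need about 80 TB, whereas U and P fit in roughly 8.4 GB: the factorization itself is compact, and the difficulty lies in computing its full product.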

### Deep dive into Spark’s Recommender System

There are several efficient and scalable implementations of Implicit Matrix Factorization in industry. A prominent one is provided by Apache Spark, a distributed data-processing engine that can easily be run on Amazon Web Services on an Elastic MapReduce (EMR) cluster.

Apache Spark implements a distributed version of the Alternating Least Squares method with Weighted Regularization (ALS-WR). We used the Scala API, together with the implicit version of the model, which is designed for modelling preferences rather than explicit ratings. We typically ran our experiments on r3/r4 AWS instances, as they are optimized for memory-intensive engines such as Spark. Cluster sizes ranged from a few tens to more than a hundred executors, depending on the case.
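To make the ALS idea concrete, here is a small pure-Python sketch of implicit-feedback ALS in the style of Hu, Koren and Volinsky. It is not Spark's code, and everything in it (function names, toy data, `alpha`, `reg`, rank) is an assumption for illustration. Observed counts $r_{ui}$ become a binary preference $b_{ui}$ with confidence $c_{ui} = 1 + \alpha r_{ui}$, and each half-step solves, row by row, the regularized least-squares problem for $\sum_{u,i} c_{ui}(b_{ui} - \mathbf{u}_u^{\top}\mathbf{p}_i)^2 + \lambda\big(\sum_u n_u\|\mathbf{u}_u\|^2 + \sum_i n_i\|\mathbf{p}_i\|^2\big)$, where $n_u$ and $n_i$ count the observed interactions of each user and product (the ALS-WR weighting; we use $\max(n, 1)$ so rows without observations stay regularized).

```python
import random


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def solve(a, b):
    """Solve a x = b (a square, non-singular) by Gaussian elimination with partial pivoting."""
    n = len(b)
    m = [row[:] + [b[i]] for i, row in enumerate(a)]
    for col in range(n):
        piv = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[piv] = m[piv], m[col]
        for r in range(col + 1, n):
            f = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= f * m[col][c]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):
        x[r] = (m[r][n] - sum(m[r][c] * x[c] for c in range(r + 1, n))) / m[r][r]
    return x


def half_step(fixed, rows, k, reg, alpha):
    """Recompute every row's factors while the other side (`fixed`) is held constant.

    rows[i] maps the index of each observed column to its raw count r > 0."""
    # Y^T Y is shared by all rows: every cell has confidence at least 1.
    gram = [[sum(y[p] * y[q] for y in fixed) for q in range(k)] for p in range(k)]
    new = []
    for obs in rows:
        a = [g[:] for g in gram]
        b = [0.0] * k
        for j, r in obs.items():
            y, c = fixed[j], 1.0 + alpha * r  # confidence of an observed cell
            for p in range(k):
                b[p] += c * y[p]  # preference is 1 for observed cells
                for q in range(k):
                    a[p][q] += (c - 1.0) * y[p] * y[q]
        lam = reg * max(len(obs), 1)  # ALS-WR: lambda scaled by #observations
        for p in range(k):
            a[p][p] += lam
        new.append(solve(a, b))
    return new


def objective(U, P, by_user, by_item, reg, alpha):
    """Weighted squared error plus weighted L2 penalty (the loss ALS minimizes)."""
    loss = 0.0
    for u, x in enumerate(U):
        for i, y in enumerate(P):
            r = by_user[u].get(i, 0)
            pref = 1.0 if r > 0 else 0.0
            loss += (1.0 + alpha * r) * (pref - dot(x, y)) ** 2
    loss += reg * sum(max(len(by_user[u]), 1) * dot(x, x) for u, x in enumerate(U))
    loss += reg * sum(max(len(by_item[i]), 1) * dot(y, y) for i, y in enumerate(P))
    return loss


def implicit_als(counts, n_users, n_items, k=2, reg=0.1, alpha=10.0, iters=5, seed=0):
    """counts: {(user, item): number of adoptions}. Returns U, P and the loss per iteration."""
    by_user = [{} for _ in range(n_users)]
    by_item = [{} for _ in range(n_items)]
    for (u, i), r in counts.items():
        by_user[u][i] = r
        by_item[i][u] = r
    rng = random.Random(seed)
    U = [[rng.gauss(0.0, 0.1) for _ in range(k)] for _ in range(n_users)]
    P = [[rng.gauss(0.0, 0.1) for _ in range(k)] for _ in range(n_items)]
    history = []
    for _ in range(iters):
        U = half_step(P, by_user, k, reg, alpha)
        P = half_step(U, by_item, k, reg, alpha)
        history.append(objective(U, P, by_user, by_item, reg, alpha))
    return U, P, history


# Toy data (hypothetical): users 0-1 share products 0-1, users 2-3 share products 2-3.
counts = {(0, 0): 3, (0, 1): 1, (1, 0): 2, (2, 2): 4, (2, 3): 1, (3, 3): 2}
U, P, history = implicit_als(counts, n_users=4, n_items=4)
print("loss per iteration:", [round(h, 4) for h in history])
```

Since each half-step solves its subproblem exactly, the loss never increases from one iteration to the next. Note that, just as in Spark, the shared Gram matrix is computed once per half-step, so the cost per row depends only on that row's observed interactions.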

As a clean-code best practice, we implemented a custom definition for every class, enforcing a clear separation between the modelling and recommendation phases. These classes cover the initial mapping from source files, the calculation of the matrix factorization model, and the generation of recommendations. We would like to highlight that properly defining fields and choosing the right types was critical to improving overall performance. Furthermore, we fine-tuned all Spark configuration parameters, such as the level of parallelism, memory and storage fractions, JVM heap size, and so on.

After all these efforts, we were able to compute the matrix factorization model with reasonable resources (both in time and cost). However, generating all the required recommendations failed systematically, no matter how many executors were used. Interestingly, CPU usage during the recommendation phase always remained below 30%.

This relatively low CPU usage led us to realize that ALS performance is heavily affected by its block size. The block size determines how the user and product matrices (U and P in the description above) are split, so that matrix multiplication can be performed block by block using standard linear algebra libraries. Since Spark is a distributed system, the block size determines both CPU usage and the amount of data that needs to be transferred among executors. Unfortunately, the block size was hardcoded in the blockify method of the Spark implementation we used (and remains so as of Spark version 2.2.0); see the figure below. After performing several stress tests, we found that a block size different from the default gave better performance. Consequently, we implemented a custom version of the blockify method that allowed us to parametrize the block size.
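The role of the block size can be illustrated with a single-process sketch of blocked top-N recommendation. It is not Spark's `blockify` code: in Spark, each pair of blocks is multiplied with BLAS and data is shuffled between executors, whereas here a plain loop scores one (user block, product block) pair at a time and keeps a bounded min-heap per user. `block_size` plays the role of the parameter we exposed; function names and data are hypothetical.

```python
import heapq


def blockify(rows, block_size):
    """Split a list of (id, factor_vector) pairs into chunks of at most block_size."""
    return [rows[s:s + block_size] for s in range(0, len(rows), block_size)]


def recommend_all(users, products, top_n, block_size):
    """Top-N products per user, scoring one (user block, product block) pair at a time."""
    best = {uid: [] for uid, _ in users}  # per-user min-heaps of (score, product_id)
    for u_block in blockify(users, block_size):
        for p_block in blockify(products, block_size):
            for uid, x in u_block:
                heap = best[uid]
                for pid, y in p_block:
                    score = sum(a * b for a, b in zip(x, y))
                    if len(heap) < top_n:
                        heapq.heappush(heap, (score, pid))
                    elif score > heap[0][0]:  # beats the worst of the current top-N
                        heapq.heapreplace(heap, (score, pid))
    return {uid: [pid for _, pid in sorted(h, reverse=True)] for uid, h in best.items()}


users = [("a", [1, 0]), ("b", [0, 1]), ("c", [1, 1])]
products = [("p1", [3, 0]), ("p2", [0, 2]), ("p3", [1, 1]), ("p4", [2, 2])]
print(recommend_all(users, products, top_n=2, block_size=2))
```

Because every user sees the products in the same order whatever the block size, the output does not depend on `block_size`. In a distributed setting, the block size only changes how much work each task does and how much data is moved, which is exactly the trade-off described above.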

Thanks to this parametrization, we managed to increase CPU usage from 30% to 70%. Not bad at all! And yet, we were unable to finish the recommendation phase with an 80-node cluster running for 6 hours. What could be going wrong?

### Rethinking the problem: CPU vs GPU

Given the efforts we had made to optimize several rather large, already tuned Spark clusters, we thought it was time to stop and rethink the problem we were facing. It was clear that we were reaping all the benefits of Spark on CPUs, and yet we were unable to complete the task.

Could it be that the number of operations required for the recommendations was simply too large? Well, it might be. We had between 2 and 20 million users, depending on the country the RS was applied to, and more than a million items per country. Thus, the number of recommendations was somewhere between 2 and more than 20 trillion. That is certainly a lot, but it was difficult to grasp its size.
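The arithmetic behind these figures is straightforward. The user and product counts come from the text; the rank k = 100 is an assumption at the upper end of the typical range mentioned earlier. Counting scalar multiply-adds instead of dot products multiplies the totals by a further factor of k.

```python
def recommendation_counts(n_users: int, n_products: int, rank: int):
    """Number of user-product scores and the scalar multiply-adds needed to compute them."""
    scores = n_users * n_products   # one dot product per (user, product) pair
    multiply_adds = scores * rank   # each dot product has `rank` multiply-adds
    return scores, multiply_adds


# Assumption: rank = 100; user and product counts as quoted in the text.
for n_users in (2_000_000, 20_000_000):
    scores, madds = recommendation_counts(n_users, 1_000_000, rank=100)
    print(f"{n_users:>11,} users: {scores:.0e} scores, {madds:.0e} multiply-adds")
```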

To visualize how much a trillion operations is, we started looking for comparisons. For instance, we found on the web that the number of dollars in cash all around the world was around 100 billion. But that is not even close to a trillion! So we turned to another field where almost everything is counted in very big numbers (or very small ones, but nothing in between): astrophysics. There, we found several examples that helped us visualize the size of our problem (see the figure above, where the surface of each ball is proportional to the number of operations). For instance, the number of stars in the Milky Way (our galaxy, which is not that big) is estimated at 400 billion at most; not close enough. But the number of galaxies in the observable Universe is thought to be around two trillion. Bingo! So, for a medium-sized country, we had to make as many recommendations as there are galaxies; and for the largest country, ten times that!

Therefore, given the astonishing number of recommendations we had to compute, we needed to parallelize the computation even further. And this is what Graphics Processing Units (GPUs) are made for. You have probably heard of them in the context of video games, where they are used to render video frames as fast as possible. The thing is that an image is nothing but a matrix, so that was the hardware we needed. Indeed, general-purpose GPUs are used nowadays for Deep Learning optimization precisely for the same reason: Deep Learning consists largely of matrix multiplications.

Regarding the level of parallelism achieved by the two kinds of hardware, Ricardo Guerrero, one of the Data Scientists at BEEVA Labs, gave us a brilliant cartoon picture of the differences between CPUs and GPUs. CPUs are optimized for executing complicated tasks one after another with low latency. They are like a Formula One car: it can go really fast, but carries only one person at a time. GPUs, on the contrary, are optimized for simple, parallel tasks (such as matrix multiplications). So you can think of them as trucks, which are not that fast but can carry many things at once, hence providing more throughput, depending on the nature of the workload. The typical number of cores in a CPU ranges from 2 to 64, whereas an Nvidia Tesla K80 GPU provides 2,496. For reference, the current price of an m4.16xlarge on-demand instance with 64 cores running Linux is 3.55 USD per hour, while the price of a p2.8xlarge on-demand instance with 19,968 GPU cores running Linux is 7.78 USD per hour.
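Normalizing the quoted prices by core count gives a rough idea of the raw parallelism each dollar buys. The 19,968 figure is simply 8 × 2,496, i.e., eight K80 GPUs. Keep in mind that a GPU core and a CPU core are not comparable units of work, so this is only an illustration of the analogy above, not a benchmark; the helper name is our own.

```python
def cores_per_usd_hour(cores: int, usd_per_hour: float) -> float:
    """Raw core count bought per dollar per hour (ignores per-core speed)."""
    return cores / usd_per_hour


# On-demand Linux prices and core counts as quoted in the text.
instances = {
    "m4.16xlarge (CPU)": (64, 3.55),
    "p2.8xlarge (GPU)": (19_968, 7.78),
}
for name, (cores, price) in instances.items():
    print(f"{name}: {cores_per_usd_hour(cores, price):,.0f} cores per USD-hour")
```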

Thus, the only thing left was learning to program such GPU devices. At a low level, this is done with platforms and libraries like Nvidia CUDA and cuDNN. But we wanted a more productive, higher-level API that was open source, stable and backed by a large community. Enter TensorFlow.

In November 2015, Google released TensorFlow as an open source software library for numerical computation, with first-class GPU support. Since then, it has become a popular tool for developing and deploying all kinds of machine learning models. So we decided to take our chances with TensorFlow.

To fit the matrices resulting from the factorization into GPU memory, we developed a simple partitioning strategy so that the matrix multiplication was performed in blocks. We also made use of TF Queues, which resulted in a 33% performance increase. Long story short, we managed to produce (and sort) all the recommendations in less than 2 hours! Compare this with the 6-hour run on an 80-node Apache Spark cluster which, as a matter of fact, never finished. Overall, the reduction in cost is above a factor of 20.
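The pattern behind TF Queues, overlapping data loading with computation, can be sketched with Python's standard `queue` and `threading` modules. This is an analogy rather than the TensorFlow API: a producer thread stages blocks of user factors into a bounded queue while the consumer scores each block against all products and keeps the top N. On a GPU, the inner loop would become one matrix multiplication per block followed by a top-k selection (in TensorFlow, `tf.matmul` and `tf.nn.top_k`). Function names, block size and queue depth are hypothetical.

```python
import heapq
import queue
import threading


def producer(user_rows, block_size, q):
    """Stage user blocks into a bounded queue (stands in for loading data onto the device)."""
    for s in range(0, len(user_rows), block_size):
        q.put(user_rows[s:s + block_size])  # blocks when the queue is full
    q.put(None)  # sentinel: no more blocks


def consumer(products, top_n, q, results):
    """Score each staged user block against all products and keep the top-N."""
    while (block := q.get()) is not None:
        for uid, x in block:
            scored = ((sum(a * b for a, b in zip(x, y)), pid) for pid, y in products)
            results[uid] = [pid for _, pid in heapq.nlargest(top_n, scored)]


def pipelined_recommend(user_rows, products, top_n, block_size=2, max_queued=4):
    q = queue.Queue(maxsize=max_queued)
    results = {}
    loader = threading.Thread(target=producer, args=(user_rows, block_size, q))
    loader.start()  # loading runs concurrently with scoring below
    consumer(products, top_n, q, results)
    loader.join()
    return results


users = [("a", [1, 0]), ("b", [0, 1]), ("c", [1, 1])]
products = [("p1", [3, 0]), ("p2", [0, 2]), ("p3", [1, 1]), ("p4", [2, 2])]
print(pipelined_recommend(users, products, top_n=2))
```

The bounded queue is the key design choice: it lets the next block be prepared while the current one is being scored, without letting the loader run arbitrarily far ahead and exhaust memory.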

### Going into production

To run the recommendation system in a production environment, we defined a scalable, reliable and flexible architecture on Amazon Web Services. This solution is based on the foundations of cloud architecture, such as infrastructure as code, automation, scalability and managed services.

We created both scheduled and event-triggered AWS Lambda functions to orchestrate each step of the recommendation process in a serverless fashion, from feeding the process and generating the model to providing access to the final recommendations.

This solution follows two different approaches to data processing. First, we use EMR running Apache Spark for more general computations, i.e., generating our own custom models. For more resource-hungry, specific computing processes, we build our own processing clusters on Nvidia GPU-based EC2 P2 instances to run TensorFlow code. These clusters grow and shrink depending on the workload size.
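The "grow and shrink" behaviour boils down to a sizing rule evaluated before each run. The function below is a hypothetical example of such a rule, not our production logic: the per-instance throughput, the deadline and the instance cap are illustrative parameters that would come from configuration.

```python
import math


def gpu_instances_needed(n_users: int, users_per_instance_hour: int,
                         deadline_hours: float, max_instances: int) -> int:
    """Hypothetical sizing rule: enough GPU instances to score every user before the deadline."""
    needed = math.ceil(n_users / (users_per_instance_hour * deadline_hours))
    return max(1, min(needed, max_instances))  # at least one, never above the cap


# Illustrative numbers only: a small and a large country, same throughput assumption.
for n_users in (2_000_000, 20_000_000):
    print(n_users, "users ->", gpu_instances_needed(n_users, 2_000_000, 2, 10), "instances")
```

Clamping to a maximum keeps costs bounded if the workload spikes, while the lower bound guarantees that every run gets at least one instance.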

We make intensive use of services such as CloudFormation and EC2 Container Registry, from both an infrastructure and a code-management point of view, to automate the creation and management of cloud resources. It is important to note that, due to the characteristics of our recommendation system, scalability was one of the main pain points we needed to address. Being able to configure and provision our infrastructure, especially GPU instances, as code allows us to adapt to production processing workloads simply by changing configuration properties.

### Lessons learned and next steps

We consider that one of the key strategies for succeeding with this use case was involving both the Data Science and Data Engineering teams from the very beginning. We often find projects where one of the two sides is dismissed or brought in too late in the delivery chain, leading to wrong formal verification techniques, inaccurate algorithms, non-scalable solutions, costly infrastructure, security concerns or poor performance. Keeping both mathematical and engineering aspects in mind when building from scratch pays off in the long term.

Speaking of scalability, Big Data complexity has traditionally been measured in volume: the bigger your data source, the fancier your system. That is not the case here, as we were handling a relatively small dataset (terabyte scale), but we had to compute trillions of vector operations on it. This is where traditional CPU-based frameworks fall short: GPUs can run thousands of mathematical operations in parallel, whereas current CPUs can only run fewer than one hundred. GPUs have disrupted the landscape of Data Science and Big Data frameworks. In our opinion, the frameworks that provide better support for them will lead the race in the coming years.

This trend has become increasingly obvious in recent months, with the rapid rise in popularity of GPU frameworks like TensorFlow and MXNet. But it was not so clear one year ago, when we started this collaboration. For instance, the AWS P2 GPU instance family was announced for production use just one week before we started. Being able to develop with state-of-the-art technologies and to experiment quickly at low cost on a cloud provider was a huge advantage for reducing time to market. However, our container-based architecture faced some challenges; for instance, it was not easy (and still is not) to deploy customized GPU instances using Docker containers. Also, from the perspective of an end-to-end solution, we had to implement ad-hoc monitoring and scaling solutions.

Regarding our next steps, although the presented solution is cheap and scalable, standard matrix factorization Recommender Systems such as ALS-WR are known to provide recommendations of moderate quality for very sparse rating matrices. Thus, our current work focuses on implementing fully GPU-based neural network collaborative filtering methods that leverage the AWS infrastructure to provide relevant, personalized long-tail recommendations. Indeed, our architecture has been built to support a multi-model scheme, so that custom developments can be added to the image registry and then deployed to new processing pipelines.

### Acknowledgment

We would like to thank all the teams that made this project a reality: the Payments Business Unit at BBVA, the engineering and support teams at BEEVA, the Data and Open Innovation Platform, and the payments team at BBVA Data & Analytics.

Jessica Olmos, Cristian Galán, Valentín Quintas, Pedro García, Roberto Andradas, Iván Fernández and César Silgo from BEEVA

Juan Ramón Duque, Marco Creatura, Javier García Santamaría and Juan Arévalo from BBVA Data & Analytics