Guest post by Daniel Bernao, Startup Solutions Architect Manager, AWS
Data storage, transformation, and analysis are part of the core business of many startups around the world. Data analysts, data scientists, and data engineers often choose the popular Pandas and NumPy libraries to work with data in their Jupyter notebooks and other Python environments. However, when working with large files in Pandas, memory limitations quickly become a data processing bottleneck. Pandas loads the data it manipulates into the machine's memory, and it often needs 5 to 10 times more memory than the size of the original data file.
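If you want to check this ratio on your own data, one approach is to compare the size of the CSV text with the memory pandas reports for the resulting DataFrame. The sketch below builds a small, hypothetical taxi-like table in memory (the column names mirror the NYC Taxi data, but the values are made up). `memory_usage(deep=True)` also counts the Python string objects behind text columns, which is often where much of the overhead comes from. The exact ratio depends on your column types and your pandas version, so treat the result as a measurement of your data rather than a constant.

```python
import io

import pandas as pd

# Hypothetical taxi-like sample; real files have more columns and millions of rows.
rows = ["B00013,2019-02-01 00:08:00,N,161"] * 100_000
csv_text = "dispatching_base_num,pickup_datetime,SR_Flag,PULocationID\n" + "\n".join(rows)
csv_bytes = len(csv_text.encode("utf-8"))

df = pd.read_csv(io.StringIO(csv_text))

# deep=True includes the memory held by string values, not just pointers to them.
in_memory_bytes = int(df.memory_usage(deep=True).sum())
ratio = in_memory_bytes / csv_bytes
print(f"CSV: {csv_bytes:,} bytes, DataFrame: {in_memory_bytes:,} bytes, ratio: {ratio:.1f}x")
```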
A common solution to this problem is to upgrade the machine's specs. This adds cost and under-utilizes processing capacity, because most instances with large amounts of memory also have multiple CPU cores, while Pandas uses only one. Another solution is to move to a distributed environment such as Hadoop or Spark and use several machines to handle and manipulate the data.
Most startups are cost conscious and cannot always dedicate infrastructure resources to building a Hadoop or Spark environment or to purchasing new machines with more memory. Techniques like filtering data at load time or reading it in chunks can help, but they add more work for already overloaded data engineers.
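As an illustration of those techniques, the sketch below reads only the columns it needs (`usecols`), processes the input in fixed-size pieces (`chunksize`), and keeps only the rows that pass a filter. The in-memory CSV, its column names, and the `PULocationID > 200` filter are assumptions chosen to mirror the taxi example; with a real file you would pass its path instead of the `StringIO` buffer.

```python
import io

import pandas as pd

# Hypothetical taxi-like CSV; in practice, pass a file path such as "taxifeb.csv".
csv_text = "dispatching_base_num,pickup_datetime,PULocationID,DOLocationID\n" + "\n".join(
    f"B{i % 50:05d},2019-02-01 00:{i % 60:02d}:00,{i % 265 + 1},{(i * 7) % 265 + 1}"
    for i in range(10_000)
)

filtered_chunks = []
# usecols skips unneeded columns while parsing; chunksize yields DataFrames
# of at most 2,000 rows, so only one chunk is fully materialized at a time.
reader = pd.read_csv(
    io.StringIO(csv_text),
    usecols=["PULocationID", "DOLocationID"],
    chunksize=2_000,
)
for chunk in reader:
    # Filter each chunk before keeping it, so memory grows only with the result.
    filtered_chunks.append(chunk[chunk["PULocationID"] > 200])

result = pd.concat(filtered_chunks, ignore_index=True)
print(len(result), list(result.columns))
```

The trade-off is that every operation now has to be written with chunks in mind, which is the extra burden the libraries below aim to remove.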
On AWS, by pairing Amazon SageMaker with memory-mapped, out-of-core libraries, startups can work with huge files in a cost-effective environment while preserving flexibility and freedom of choice. In this post, we walk through how to achieve this with a practical example built on Jupyter notebooks. Amazon SageMaker provides a managed Jupyter notebook service in which customers choose the compute specs that fit their work from a set of instance families. For our examples, we use the most basic instance type and show how to work with datasets larger than the machine's memory. If you can use more powerful instance types, you will benefit from additional capabilities (faster processing, more computing power, better I/O) and a faster workflow, but here we try to keep our cost as low as possible.
It is worth mentioning that once data is analyzed, manipulated, and cleaned, it can easily be used to train algorithms and create models. For training, Amazon SageMaker lets you select the instance type to use and bills only for the seconds the training runs, so you don't train on the notebook instance itself. This is important because it decouples the infrastructure used for data analysis in the Jupyter notebook from the infrastructure used for training, which further improves cost efficiency.
The memory bottleneck scenario
Let's work with a basic example. I launched the most basic instance type in the set of managed Jupyter instances in Amazon SageMaker (read the documentation here on how to launch a managed notebook instance): the "ml.t2.medium" instance type, which provides 2 vCPUs and 4 GiB of memory. At the time of writing, it costs $0.0464 per hour, or around $33.40 per month if running 24/7.
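The monthly figure is the hourly rate multiplied by the number of hours in a month. The sketch below assumes a 30-day month (720 hours), which gives the figure above, and also shows an average-length month of about 730 hours for comparison. Prices vary by Region and change over time, so check current SageMaker pricing before relying on these numbers.

```python
HOURLY_RATE_USD = 0.0464  # ml.t2.medium notebook rate quoted in this post


def monthly_cost(hourly_rate: float, hours: float) -> float:
    """Cost, in USD rounded to cents, of running an instance for `hours` hours."""
    return round(hourly_rate * hours, 2)


cost_30_day = monthly_cost(HOURLY_RATE_USD, 24 * 30)  # 720 hours
cost_avg_month = monthly_cost(HOURLY_RATE_USD, 730)   # average month length
print(cost_30_day, cost_avg_month)
```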
I also downloaded several files from the NYC Taxi public datasets onto the instance. Each file is roughly 1.3 to 1.5 GB.
Now, what happens if I load a couple of those files with Pandas to manipulate them and potentially join them?
`MemoryError`: our current memory can't handle it. Loading `taxiapril.csv` (about 1.4 GB on disk) alone takes roughly 2 GB of RAM and leaves about 1.2 GB free, so loading a second file of similar size fails. See the extract below from the Jupyter notebook in this GitHub repository.
```python
import pandas as pd
```

```python
!ls -l tax*
```

```
-rw-rw-r-- 1 ec2-user ec2-user 1378874062 Jul 18 22:21 taxiapril.csv
-rw-rw-r-- 1 ec2-user ec2-user 1279994273 Jul 18 22:20 taxifeb.csv
-rw-rw-r-- 1 ec2-user ec2-user 1514101716 Jul 18 22:21 taximarch.csv
-rw-rw-r-- 1 ec2-user ec2-user 1416176511 Jul 18 22:22 taximay.csv
```

```python
!free -th
```

```
             total       used       free     shared    buffers     cached
Mem:          3.9G       621M       3.2G        64K        20M       108M
-/+ buffers/cache:       492M       3.4G
Swap:           0B         0B         0B
Total:        3.9G       621M       3.2G
```

```python
%time dfapril = pd.read_csv("taxiapril.csv")
```

```
CPU times: user 19.5 s, sys: 2.34 s, total: 21.9 s
Wall time: 28.4 s
```

```python
!free -th
```

```
             total       used       free     shared    buffers     cached
Mem:          3.9G       2.6G       1.2G        64K        68M       418M
-/+ buffers/cache:       2.1G       1.7G
Swap:           0B         0B         0B
Total:        3.9G       2.6G       1.2G
```

```python
dffeb = pd.read_csv("taxifeb.csv")
```

```
---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
```
What are out-of-core processing and memory mapping?
Out-of-core processing refers to processing data that is too large to fit into a computer's main memory.
Memory mapping is a common technique that lets a process access a file directly by mapping the file's contents into the process address space. Using mapped files can significantly reduce I/O data transfer, because file data does not have to be copied into process buffers through explicit read and write calls. The benefits of memory mapping include not only fast file access, but also the ability to share memory between applications without duplicating I/O operations on storage devices, and lazy loading, where only the parts of a very large file that are actually accessed are brought into RAM.
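NumPy exposes this technique directly through `numpy.memmap`, which makes it easy to see in isolation. The sketch below writes a hypothetical binary file of one million float64 values to a temporary directory, maps it, and computes two statistics. Only the pages that a computation touches are read from disk, and the operating system can evict them again under memory pressure. This uses plain NumPy to show the underlying idea; it is not how Vaex or Dask are implemented internally.

```python
import os
import tempfile

import numpy as np

n = 1_000_000
# Hypothetical data file: n float64 values (about 8 MB) stored as raw binary.
path = os.path.join(tempfile.mkdtemp(), "values.bin")
np.arange(n, dtype=np.float64).tofile(path)

# Map the file into the process address space; no data is read at this point.
values = np.memmap(path, dtype=np.float64, mode="r", shape=(n,))

# Slicing touches only the pages that back elements 1,000 to 1,999.
window_mean = float(values[1_000:2_000].mean())

# A full reduction still works; pages are faulted in from disk as needed.
total = float(values.sum())
print(window_mean, total)
```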
Vaex and Dask are two Python libraries that can help in this scenario, and it is simple to start using them in your Amazon SageMaker managed notebook instance. Each has its own capabilities, so let's go over the details of each one with a dedicated Jupyter notebook.
While we focus on Vaex and Dask, there are other out-of-core tools that extend Pandas that you might want to check out. Take a look at the Pandas ecosystem documentation here.
Using Vaex in Amazon SageMaker
Vaex is a library that in its own words states: “Vaex is a Python library for lazy Out-of-Core DataFrames (similar to Pandas), to visualize and explore big tabular datasets. It can calculate statistics such as mean, sum, count, standard deviation etc, on an N-dimensional grid up to a billion objects/rows per second.”
I found it really useful to read the paper on arXiv in which the authors of Vaex describe their work on the library in detail. In case you're wondering, Vaex grew out of the analysis of astronomical data, and it offers Pandas-like APIs. Lazy evaluation is a key concept in Vaex. Vaex works best with the HDF5 file format; it includes methods to convert data into HDF5 easily, as well as a method to open files directly from Amazon S3.
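To make lazy evaluation concrete, here is a toy sketch in plain Python; it is not Vaex's API or implementation. Operations such as `fillna` and `filter` only record what should be computed, and the data source is read when a result is actually requested with `compute`. The `LazyColumn` class and the `read_pickup_ids` data source are hypothetical. The same idea helps explain why the `fillna` call in the Vaex notebook below returns in about 2 ms even on a file of more than a gigabyte.

```python
from typing import Any, Callable, Dict, Iterable, List


class LazyColumn:
    """Toy lazy column (hypothetical, not Vaex's API): stores a recipe, not values."""

    def __init__(self, source: Callable[[], Iterable[Any]]):
        # `source` is only called when a result is requested.
        self._source = source

    def fillna(self, value: Any) -> "LazyColumn":
        # Record the operation; no data is read here.
        parent = self._source
        return LazyColumn(lambda: (value if v is None else v for v in parent()))

    def filter(self, predicate: Callable[[Any], bool]) -> "LazyColumn":
        # Also just recorded; rows are tested only during compute().
        parent = self._source
        return LazyColumn(lambda: (v for v in parent() if predicate(v)))

    def compute(self) -> List[Any]:
        # Only now is the source read, applying every recorded step in one pass.
        return list(self._source())


read_count: Dict[str, int] = {"n": 0}


def read_pickup_ids() -> Iterable[Any]:
    """Hypothetical data source standing in for a large file; counts reads."""
    read_count["n"] += 1
    return iter([161, None, 230, 205, None])


column = LazyColumn(read_pickup_ids)
cleaned = column.fillna(0)                # nothing has been read yet
busy = cleaned.filter(lambda v: v > 200)  # still nothing read
reads_before = read_count["n"]            # 0
result = busy.compute()                   # [230, 205]; the source is read once
reads_after = read_count["n"]             # 1
```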
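In the example below, we work on the same ml.t2.medium managed notebook instance with 2 vCPUs and 4 GiB of memory. Notice that opening the NYC Taxi files (here converted to HDF5) has little memory impact, and that operations are fast considering the size of the files. The `free` output can look alarming (only about 184M free), but most of the "used" memory is buffers and page cache that the kernel can reclaim; the `-/+ buffers/cache` row shows about 1.0G actually in use by applications. More operations and the entire Jupyter notebook used for the example can be found in this GitHub repo.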
With Vaex, we keep our data analysis cost low while analyzing datasets larger than memory through a familiar, Pandas-like API. And it is really fast. Remember, though, that instance performance is not only about memory: you might need to evaluate other instance types to balance your CPU, memory, and I/O requirements and find the right trade-off between performance and cost.
```python
import vaex
```

```python
%time vaexdffeb = vaex.open('taxifeb.hdf5')
```

```
CPU times: user 500 ms, sys: 48.5 ms, total: 548 ms
Wall time: 548 ms
```

```python
!free -th
```

```
             total       used       free     shared    buffers     cached
Mem:          3.9G       3.7G       184M        64K       1.5G       1.1G
-/+ buffers/cache:       1.0G       2.8G
Swap:           0B         0B         0B
Total:        3.9G       3.7G       184M
```

```python
%time vaexdfapr = vaex.open('taxiapril.hdf5')
```

```
CPU times: user 8.17 ms, sys: 0 ns, total: 8.17 ms
Wall time: 7.6 ms
```

```python
!free -th
```

```
             total       used       free     shared    buffers     cached
Mem:          3.9G       3.7G       184M        64K       1.5G       1.1G
-/+ buffers/cache:       1.0G       2.8G
Swap:           0B         0B         0B
Total:        3.9G       3.7G       184M
```

```python
%time df1 = vaexdffeb.fillna(value=0, column_names=['SR_Flag'])
```

```
CPU times: user 1.96 ms, sys: 0 ns, total: 1.96 ms
Wall time: 1.98 ms
```

```python
!free -th
```

```
             total       used       free     shared    buffers     cached
Mem:          3.9G       3.7G       183M        64K       1.5G       1.1G
-/+ buffers/cache:       1.0G       2.8G
Swap:           0B         0B         0B
Total:        3.9G       3.7G       183M
```

```python
# vaexfroms3 is a Vaex DataFrame created in a cell not shown in this extract
# (see the full notebook in the GitHub repo).
%time dsvaex = vaexfroms3.fillna(value=0, column_names=['SR_Flag'])
```

```
CPU times: user 1.47 ms, sys: 0 ns, total: 1.47 ms
Wall time: 1.48 ms
```

```python
%time dsfiltered = dsvaex[dsvaex.PULocationID > 200]
dsfiltered.head(10)
```

```
CPU times: user 50.5 ms, sys: 0 ns, total: 50.5 ms
Wall time: 49.8 ms
```
Using Dask in Amazon SageMaker
Dask is another out-of-core Python library that can help in this scenario. Dask also provides a familiar, Pandas-like API, and it focuses on scaling data analysis and manipulation across a distributed environment with little effort and a minimal learning curve for Pandas, NumPy, and scikit-learn users.
Dask works with a variety of file formats (Parquet is a good option) and can open files directly from Amazon S3. Dask DataFrames load data lazily, so they can work with data larger than memory: operations only build up a task graph, and nothing is computed until you call `.compute()` (note the `.compute()` call in the example). You can read more about Dask memory management here.
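A Dask DataFrame is made up of many smaller pandas DataFrames (partitions), and aggregations such as a group-by count are computed per partition and then combined. The pandas-only sketch below imitates that partition-then-combine pattern so it runs without Dask installed. The sample data and partition size are assumptions, and real Dask also schedules the per-partition steps in parallel and reads partitions from disk lazily, which this sketch does not do.

```python
import pandas as pd

# Hypothetical trips table; with Dask, each partition could be a block of a large file.
trips = pd.DataFrame({
    "PULocationID": [161, 230, 161, 205, 230, 230, 48, 161],
    "SR_Flag": [None, 1.0, None, None, 1.0, None, None, None],
})

# Split into partitions of 3 rows, as Dask would split a large file into blocks.
partition_size = 3
partitions = [trips.iloc[i:i + partition_size] for i in range(0, len(trips), partition_size)]

# Step 1 (per partition): fill missing values and count rows per pickup location.
partial_counts = [p.fillna(0).groupby("PULocationID").count() for p in partitions]

# Step 2 (combine): add the partial counts for each location across partitions.
combined = pd.concat(partial_counts).groupby(level=0).sum()

# The partitioned result should match computing everything at once in pandas.
full = trips.fillna(0).groupby("PULocationID").count()
matches = combined.equals(full)
print(combined)
```

Because each partition is processed independently, only one partition's worth of rows needs to be in memory at a time, plus the small partial results.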
In the example below, I created a "local" cluster that uses all the cores available on the instance. You could extend it, or create a new cluster, to scale the analysis across multiple instances and take advantage of a distributed environment with little extra effort. Again, we are working with the same ml.t2.medium managed notebook instance in Amazon SageMaker with 2 vCPUs and 4 GiB of memory.
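Notice that files Pandas could not load together on this instance are read and manipulated quickly with minimal memory impact. The `read_csv`, `concat`, and `fillna` calls return in milliseconds because they only set up lazy work (Dask reads just a sample of each file to infer column types); the real processing happens in the final `.compute()` call, which took about 20 seconds of wall time. With Dask, we can also keep our data analysis cost low while analyzing datasets larger than memory through a familiar, Pandas-like API.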
Take a look at this GitHub repo for more details and the Jupyter notebook used for the example.
```python
!free -th
```

```
             total       used       free     shared    buffers     cached
Mem:          3.9G       1.6G       2.3G       520K        80M       446M
-/+ buffers/cache:       1.1G       2.8G
Swap:           0B         0B         0B
Total:        3.9G       1.6G       2.3G
```

```python
from dask.distributed import Client, LocalCluster

# Start a local cluster that uses the cores available on this instance.
cluster = LocalCluster()
client = Client(cluster)
```

```python
import dask.dataframe as dd
```

```python
%time dfdaskapril = dd.read_csv('./taxiapril.csv')
```

```
CPU times: user 28.7 ms, sys: 12.2 ms, total: 40.9 ms
Wall time: 38.5 ms
```

```python
!free -th
```

```
             total       used       free     shared    buffers     cached
Mem:          3.9G       1.9G       2.0G       568K        80M       508M
-/+ buffers/cache:       1.3G       2.6G
Swap:           0B         0B         0B
Total:        3.9G       1.9G       2.0G
```

```python
%time dfdaskfeb = dd.read_csv('./taxifeb.csv')
```

```
CPU times: user 29.6 ms, sys: 0 ns, total: 29.6 ms
Wall time: 34.6 ms
```

```python
!free -th
```

```
             total       used       free     shared    buffers     cached
Mem:          3.9G       1.9G       1.9G       568K        80M       569M
-/+ buffers/cache:       1.3G       2.5G
Swap:           0B         0B         0B
Total:        3.9G       1.9G       1.9G
```

```python
%time dfjoin = dd.concat([dfdaskfeb, dfdaskapril], axis=0)
```

```
CPU times: user 28.3 ms, sys: 3.95 ms, total: 32.2 ms
Wall time: 31.1 ms
```

```python
%time dfclean = dfdaskfeb.fillna(value=0)
dfclean.head(5)
```

```
CPU times: user 8.63 ms, sys: 0 ns, total: 8.63 ms
Wall time: 7.85 ms
```

```python
# .compute() triggers the actual work: reading the CSV, filling NaNs, and counting.
%time dfclean.groupby('PULocationID').count().compute()
```

```
CPU times: user 733 ms, sys: 130 ms, total: 863 ms
Wall time: 20 s
```
Striking a balance between data analysis capabilities and cost effectiveness is always tough. With Amazon SageMaker and out-of-core libraries, startups can achieve a better balance and make better use of resources across their data manipulation operations. Amazon SageMaker managed notebook instances give you the flexibility to choose the right specs for your analysis work, independently of the specs you need for training. Being able to choose powerful instances on demand to reduce training time, paying only for the seconds you use them, while keeping the notebook instance and tooling of your choice, opens up significant opportunities for cost savings and productivity across startups.
Interested in ML on AWS? Contact us today!