
Processing in Python: How I learned to love parallelized applies with Dask and Numba

Go fast with Numba and Dask

As a master’s candidate in Data Science at the University of San Francisco, I get to wrangle with data regularly. Applies are one of the many tricks I’ve picked up to help create new features or clean up data. Now, I’m only data scientist-ish and not an expert in computer science. I am, however, a tinkerer who enjoys making code faster. Today, I’ll be sharing my experiences with parallelizing applies, with a particular focus on common data-prep tasks.

Python aficionados may know that Python implements what’s known as the Global Interpreter Lock (GIL). Those more grounded in computer science can tell you more, but for our purposes, the GIL can make using all of those CPU cores in your computer tricky. What’s worse, our chief data-wrangling package, Pandas, rarely implements multiprocessing code.

Apply vs Multiprocessing.map

%time df.some_col.apply(lambda x: clean_transform_kthx(x))
Wall time: HAH! RIP BUDDY
# WHY YOU NO RUN IN PARALLEL!?
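Joking aside, the single-core baseline looks something like this (the DataFrame and the transform are made up for illustration; clean_transform_kthx stands in for whatever per-row cleanup you actually do):

import numpy as np
import pandas as pd

df = pd.DataFrame({"some_col": np.random.rand(1_000_000)})

def clean_transform_kthx(x):
    # stand-in for a real per-row cleanup/feature function
    return round(x * 100, 2)

# one core, one row at a time: this is the baseline to beat
result = df.some_col.apply(clean_transform_kthx)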

Those of us crossing over from the R realm know that the Tidyverse has done some wonderful things for handling data. One of my favorite packages, plyr, allows R users to easily parallelize their applies on data frames. From Hadley Wickham:

plyr is a set of tools for a common set of problems: you need to split up a big data structure into homogeneous pieces, apply a function to each piece and then combine all the results back together

What I wanted was plyr for Python! Sadly, it does not yet exist, but I used a hacky solution from the multiprocessing package for a while. It certainly works, but I wanted something that was more akin to regular Pandas applies…but like, parallel and stuff.
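The multiprocessing hack goes something like the sketch below (illustrative names, not my exact code): split the frame into chunks, map a function over the chunks in worker processes, and concatenate the results.

import numpy as np
import pandas as pd
from multiprocessing import Pool, cpu_count

def process_chunk(chunk):
    # whatever per-row work you need, applied to one chunk of the frame
    return chunk.some_col.apply(lambda x: round(x * 100, 2))

def parallel_apply(df, n_workers=cpu_count()):
    chunks = np.array_split(df, n_workers)      # one chunk per worker
    with Pool(n_workers) as pool:
        results = pool.map(process_chunk, chunks)
    return pd.concat(results)

It works, but you have to manage the chunking, the pool, and the re-assembly yourself every single time.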

Dask

Thanks for all the cores AMD!

We spent a bit of class time on Spark, so when I started using Dask it was easier to grasp its main concepts. Dask is designed to run in parallel across many cores or machines while mirroring much of the functionality and syntax of Pandas.

Let’s dive into an example! For a recent data challenge, I was trying to take an external source of data (many geo-encoded points) and match them to a bunch of street blocks we were analyzing. I was calculating Euclidean distances and using a simple max-heuristic to assign each point to a block:

Is the point close to L3? The L1 + L2 may shock you…
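The nearest_street() function itself isn’t shown in this post, but a hypothetical version of the idea (with a made-up blocks array of endpoint coordinates) could look like:

import numpy as np

# hypothetical: each row is one street block as (x1, y1, x2, y2)
blocks = np.random.rand(500, 4)

def nearest_street(lat, lon):
    # L3: block length, endpoint to endpoint
    l3 = np.sqrt((blocks[:, 0] - blocks[:, 2]) ** 2 +
                 (blocks[:, 1] - blocks[:, 3]) ** 2)
    # L1, L2: distance from the point to each endpoint
    l1 = np.sqrt((lat - blocks[:, 0]) ** 2 + (lon - blocks[:, 1]) ** 2)
    l2 = np.sqrt((lat - blocks[:, 2]) ** 2 + (lon - blocks[:, 3]) ** 2)
    dist = l1 + l2
    # max-heuristic: the point lies "on" a block if L1 + L2 is within 20% of L3
    candidates = np.where(dist < l3 * 1.2)[0]
    if candidates.size == 0:
        return -1                     # no block is close enough
    return candidates[np.argmin(dist[candidates])]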

My original apply:

my_df.apply(lambda x: nearest_street(x.lat, x.lon), axis=1)

My Dask apply:

dd.from_pandas(my_df, npartitions=nCores).\
   map_partitions(
      lambda df: df.apply(
         lambda x: nearest_street(x.lat, x.lon), axis=1)).\
   compute(get=get)
# imports at the end

Pretty similar, right? The apply statement is wrapped in a map_partitions, there’s a compute() at the end, and I had to initialize npartitions. Spark users will find this familiar, but let’s disentangle it a bit for the rest of us. Partitions are just that: your Pandas data frame divided up into chunks. On my computer with 6 cores/12 threads, I told it to use 12 partitions. Thankfully, Dask handles the rest for you.

Next, map_partitions simply applies that lambda function to each partition. Since most of our data-processing code operates on each row independently, we do not have to worry too much about the order of these operations (which row goes first or last is irrelevant). Lastly, compute() tells Dask to process everything that came before and deliver the end product to me. Many distributed libraries like Dask or Spark implement ‘lazy evaluation’: they build up a list of tasks and only execute it when prompted to do so. Here, compute() has Dask map the apply over each partition, and get=get (the multiprocessing scheduler imported below) tells Dask to run those partitions in parallel.
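As a toy illustration of that lazy behavior (made-up data, not the street-matching code), nothing actually runs until compute() is called:

import pandas as pd
from dask import dataframe as dd

pdf = pd.DataFrame({"lat": [37.77, 37.78], "lon": [-122.42, -122.41]})
ddf = dd.from_pandas(pdf, npartitions=2)

# this only builds a task graph; no work has happened yet
lazy_sum = ddf.map_partitions(lambda df: df.lat + df.lon)

# only now does Dask execute the graph across the partitions
result = lazy_sum.compute()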

I did not use a Dask apply because I am iterating over rows to generate a new array that will become a feature. Using map_partitions with a regular Pandas apply lets me send two columns of a single row into the function nearest_street(). A Dask apply maps a function across an entire column or Series at a time, which would not work with the function as written.

Here are the imports for the Dask code:

from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count

nCores = cpu_count()

Since I was classifying my data based on some simple algebraic calculations (basically the Pythagorean theorem), I figured it would run quickly enough in typical Python code that looks like this:

matches = []
for i in intersections:
   l3 = np.sqrt( (i[0] - i[1])**2 + (i[2] - i[3])**2 )
   # ... Some more of these
   dist = l1 + l2
   if dist < (l3 * 1.2):
      matches.append(dist)
      # ... More stuff
### you get the idea, there's a for-loop checking to see if
### my points are close to my streets and then returning closest
### I even used numpy, that means fast right?
It was not.

Broadcasting is the idea of writing code with a vector mindset as opposed to a scalar one. Say I have an array, and I want to futz with it. Normally, I would iterate over it and transform each cell individually.

# over one array
for i, cell in enumerate(array):
   array[i] = cell * CONSTANT - CONSTANT2

# over two arrays
for i in range(len(array)):
   array[i] = array[i] + array2[i]

Instead, I can skip the for loops entirely and perform operations across the entire array. NumPy functions incorporate broadcasting and can be used to perform element-wise computations (each element of one array paired with the corresponding element of another).

# over one array
array = array * CONSTANT - CONSTANT2

# over two arrays of the same length
# (different lengths follow broadcasting rules)
array = array + array2
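As a quick refresher on those broadcasting rules (a standalone NumPy example, not from my project code):

import numpy as np

a = np.array([1.0, 2.0, 3.0])        # shape (3,)
b = np.array([[10.0], [20.0]])       # shape (2, 1)

# scalars broadcast against any shape
print(a * 2 - 1)                     # [1. 3. 5.]

# (2, 1) and (3,) broadcast to (2, 3): every b paired with every a
print(b + a)

This is exactly how a single point’s coordinates can be compared against every intersection at once.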

Broadcasting can accomplish so much more, but let’s look at my skeleton code:

from numba import jit
@jit  # numba magic
def some_func():
   l3_arr = np.sqrt( (intersections[:,0] - intersections[:,1])**2 +
                     (intersections[:,2] - intersections[:,3])**2 )
   # now l3_arr is an array containing all of my block lengths
   # likewise, l1_arr and l2_arr are now equal-sized arrays
   # containing the distance of a point to all intersections
   dist = l1_arr + l2_arr
   match_arr = dist < (l3_arr * 1.2)
   # so instead of iterating, I just immediately compare all of my
   # point-to-street distances at once and have a handy
   # boolean index

Essentially, we’re changing “for i in array: do stuff” into “do stuff on array”. The best part is that it’s fast, even compared to parallelizing with Dask. The good part is that if we stick to basic NumPy and Python, we can just-in-time compile just about any function. The bad part is that Numba only plays well with NumPy and simple Python syntax. I had to strip out all of the numerical calculations from my functions into sub-functions, but the speed increase was magical…
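To make that “strip the math into sub-functions” pattern concrete, here is a rough sketch (hypothetical names, not my exact refactor):

import numpy as np
from numba import njit   # njit is shorthand for jit(nopython=True)

@njit
def point_to_endpoints(lat, lon, blocks):
    # pure array math: exactly the kind of thing Numba compiles well
    l1 = np.sqrt((lat - blocks[:, 0]) ** 2 + (lon - blocks[:, 1]) ** 2)
    l2 = np.sqrt((lat - blocks[:, 2]) ** 2 + (lon - blocks[:, 3]) ** 2)
    return l1 + l2

def classify_point(lat, lon, blocks, threshold=1.2):
    # the outer function stays plain Python and can touch Pandas, I/O, etc.
    l3 = np.sqrt((blocks[:, 0] - blocks[:, 2]) ** 2 +
                 (blocks[:, 1] - blocks[:, 3]) ** 2)
    return point_to_endpoints(lat, lon, blocks) < (l3 * threshold)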

Putting it all together

To combine my Numba function with Dask, I simply applied the function with map_partitions() (a sketch of the combined pattern follows the charts below). I was curious whether parallelized operations and broadcasting could work hand in hand for a speed-up. I was pleasantly surprised to see a large speed-up, especially with larger data sets:

Go Numba go!
The x-axis is the number of rows: 1, 10, 100, 1,000…

The first graph indicates that serial computation without broadcasting performs poorly. We see that parallelizing the code with Dask is almost as effective as using Numba + broadcasting, but clearly, Dask + Numba outperforms the others.

I include the second graph to anger people who like simple and interpretable graphics. Or rather, it’s there to show that Dask comes with some overhead costs, while Numba does not. I took head(nRows) to create these charts and noticed it was not until 1k to 10k rows that Dask came into its own. I also found it curious that Numba alone was consistently faster than Dask, although the combination of Dask + Numba could not be beaten at large nRows.
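For reference, the combined pattern was roughly this shape (a sketch; block_match() is a stand-in for my Numba-compiled function, and the data here is random):

import numpy as np
import pandas as pd
from dask import dataframe as dd
from numba import njit

@njit
def block_match(lat, lon, blocks):
    # broadcast the whole numeric comparison at once
    l3 = np.sqrt((blocks[:, 0] - blocks[:, 2]) ** 2 +
                 (blocks[:, 1] - blocks[:, 3]) ** 2)
    l1 = np.sqrt((lat - blocks[:, 0]) ** 2 + (lon - blocks[:, 1]) ** 2)
    l2 = np.sqrt((lat - blocks[:, 2]) ** 2 + (lon - blocks[:, 3]) ** 2)
    return np.argmin((l1 + l2) / l3)

blocks = np.random.rand(500, 4)
my_df = pd.DataFrame(np.random.rand(10_000, 2), columns=["lat", "lon"])

result = (
    dd.from_pandas(my_df, npartitions=12)
      .map_partitions(
          lambda df: df.apply(
              lambda x: block_match(x.lat, x.lon, blocks), axis=1),
          meta=("match", "i8"))
      .compute()  # the post used compute(get=get); newer Dask spells this compute(scheduler="processes")
)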

Optimizations

To be able to JIT compile with Numba, I re-wrote my functions to take advantage of broadcasting. Out of curiosity, I re-ran these functions to compare Numba + broadcasting vs. just broadcasting (basically NumPy only). On average, @jit executes about 24% faster for identical code.

Thanks JIT!
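If you want to reproduce that kind of comparison yourself, a minimal timing sketch (made-up function; your numbers will vary):

import timeit
import numpy as np
from numba import njit

def broadcast_only(pts):
    # plain NumPy broadcasting
    return np.sqrt(pts[:, 0] ** 2 + pts[:, 1] ** 2)

jitted = njit(broadcast_only)   # identical code, JIT-compiled

pts = np.random.rand(1_000_000, 2)
jitted(pts)                     # warm-up call so compilation isn't timed

print(timeit.timeit(lambda: broadcast_only(pts), number=20))
print(timeit.timeit(lambda: jitted(pts), number=20))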

I’m sure there are ways to optimize even further, but I liked that I was able to quickly port my previous work into Dask and Numba for a 60x speed increase. Numba only really requires that I stick to Numpy functions and think about arrays all at once. Dask is very user friendly and offers a familiar syntax for Pandas or Spark users. If there are other speed tricks that are easy to implement, please feel free to share!