TAGS :Viewed: 2 - Published at: a few seconds ago

[ Why does using multiprocessing with pandas apply lead to such a dramatic speedup? ]

Suppose I have a pandas dataframe and a function I'd like to apply to each row. I can call df.apply(apply_fn, axis=1), which should take time linear in the size of df. Or I can split df and use pool.map to call my function on each piece, and then concatenate the results.

I was expecting the speedup factor from using pool.map to be roughly equal to the number of processes in the pool (new_execution_time = original_execution_time/N if using N processors -- and that's assuming zero overhead).

Instead, in this toy example, time falls to around 2% (0.005272 / 0.230757) when using 4 processors. I was expecting 25% at best. What is going on and what am I not understanding?

import numpy as np
from multiprocessing import Pool
import pandas as pd
import pdb
import time

n = 1000
variables = {"hello":np.arange(n), "there":np.random.randn(n)}
df = pd.DataFrame(variables)

def apply_fn(series):
    return pd.Series({"col_5":5, "col_88":88,
                      "sum_hello_there":series["hello"] + series["there"]})

def call_apply_fn(df):
    return df.apply(apply_fn, axis=1)

n_processes = 4  # My machine has 4 CPUs
pool = Pool(processes=n_processes)

t0 = time.process_time()
new_df = df.apply(apply_fn, axis=1)
t1 = time.process_time()
df_split = np.array_split(df, n_processes)
pool_results = pool.map(call_apply_fn, df_split)
new_df2 = pd.concat(pool_results)
t2 = time.process_time()
new_df3 = df.apply(apply_fn, axis=1)  # Try df.apply a second time
t3 = time.process_time()

print("identical results: %s" % np.all(np.isclose(new_df, new_df2)))  # True
print("t1 - t0 = %f" % (t1 - t0))  # I got 0.230757
print("t2 - t1 = %f" % (t2 - t1))  # I got 0.005272
print("t3 - t2 = %f" % (t3 - t2))  # I got 0.229413

I saved the code above and ran it using python3 my_filename.py.

PS I realize that in this toy example new_df can be created in a much more straightforward way, without using apply. I'm interested in applying similar code with a more complex apply_fn that doesn't just add columns.

Answer 1

Edit (My previous answer was actually wrong.)

time.process_time() (doc) measures time only in the current process (and doesn't include sleeping time). So the time spent in child processes is not taken into account.

I run your code with time.time(), which measures real-world time (showing no speedup at all) and with a more reliable timeit.timeit (about 50% speedup). I have 4 cores.