[ How to stream results from Multiprocessing.Pool to csv? ]
I have a Python (2.7) process that takes a key, does a bunch of calculations, and returns a list of results. Here is a very simplified version.
I am using multiprocessing to create worker processes so this can be processed faster. However, my production data has several million rows, and each loop takes progressively longer to complete. The last time I ran this, each loop took over 6 minutes to complete, while at the start each took a second or less. I think this is because all the workers are appending their results to `resultset`, which keeps growing until it contains all the records.
Is it possible to use multiprocessing to stream the results of each worker (a list) into a CSV, or to batch the resultset so it writes to the CSV after a set number of rows?
Any other suggestions for speeding up or optimizing the approach would be appreciated.
```
import numpy as np
import pandas as pd
import csv
import os
import multiprocessing
from multiprocessing import Pool

global keys
keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)
        resultset = pool.imap(key_loop, (key for key in keys))
        loaddata = []
        for sublist in resultset:
            loaddata.append(sublist)
        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in loaddata:
                writer.writerow(listitem)
        print "finished load"
    except:
        print 'There was a problem multithreading the key Pool'
        raise
```
Here is an answer consolidating the suggestions Eevee and I made:
```
import numpy as np
import pandas as pd
import csv
from multiprocessing import Pool

keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)
        resultset = pool.imap(key_loop, keys, chunksize=200)
        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in resultset:
                writer.writerow(listitem)
        print "finished load"
    except:
        print 'There was a problem multithreading the key Pool'
        raise
```
Again, the changes here are:
- Iterate over `resultset` directly, rather than needlessly copying it to a list first.
- Feed the `keys` list directly to `pool.imap` instead of creating a generator comprehension out of it.
- Provide a larger `chunksize` to `imap` than the default of 1. A larger `chunksize` reduces the cost of the inter-process communication required to pass the values inside `keys` to the sub-processes in your pool, which can give big performance boosts when `keys` is very large (as it is in your case). You should experiment with different values for `chunksize` (try something considerably larger than 200, like 5000, etc.) and see how it affects performance; a sketch for timing this follows the list. I'm making a wild guess with 200, though it should definitely do better than 1.
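If you want to measure this, here is a minimal sketch for timing a few `chunksize` values; it reuses the `keys` and `key_loop` from the consolidated answer above, and the candidate values are just guesses:

```
import time
from multiprocessing import Pool

if __name__ == "__main__":
    pool = Pool(processes=8)
    for chunksize in [1, 200, 1000, 5000]:   # arbitrary candidates to compare
        start = time.time()
        for _ in pool.imap(key_loop, keys, chunksize=chunksize):
            pass                             # just drain the iterator
        print "chunksize=%d: %.2f seconds" % (chunksize, time.time() - start)
```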
The following very simple code collects many workers' data into a single CSV file. A worker takes a key and returns a list of rows. The parent processes several keys at a time, using several workers. When each key is done, the parent writes the output rows, in order, to a CSV file.
Be careful about order. If each worker writes to the CSV file directly, the rows will be out of order, or the workers will stomp on each other. Having each worker write to its own CSV file will be fast, but will require merging all the data files together afterward.
```
import csv, multiprocessing, sys

def worker(key):
    return [ [key, 0], [key+1, 1] ]

pool = multiprocessing.Pool()   # default 1 proc per CPU
writer = csv.writer(sys.stdout)

for resultset in pool.imap(worker, [1,2,3,4]):
    for row in resultset:
        writer.writerow(row)
```
```
1,0
2,1
2,0
3,1
3,0
4,1
4,0
5,1
```
My bet would be that dealing with the large structure at once, using appending, is what makes it slow. What I usually do is open as many files as there are cores and use modulo to write to each file immediately, so the streams don't cause trouble the way they would if you directed them all into the same file (write errors), and you also avoid storing huge data in memory. Probably not the best solution, but really quite easy. In the end you just merge the results back together.
Define these at the start of the run:
```
num_cores = 8
file_sep = ","
outFiles = [open('out' + str(x) + ".csv", "a") for x in range(num_cores)]
```
Then, in the `key_loop` function:
```
def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    outFiles[key % num_cores].write(file_sep.join([str(x) for x in test_list]) + "\n")
```
Afterwards, don't forget to close:
```
[x.close() for x in outFiles]
```
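The merge step itself isn't shown above; here is one minimal way to do it, assuming the `out*.csv` filenames from the snippet above (the merged filename is my own choice):

```
import shutil

# Concatenate the per-core CSV files into a single output file.
with open("out_merged.csv", "w") as merged:
    for x in range(num_cores):
        with open('out' + str(x) + ".csv") as part:
            shutil.copyfileobj(part, merged)
```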
Two more things worth doing:

- Iterate over blocks, as mentioned in the comments; writing/processing one line at a time is going to be much slower than writing in blocks (see the sketch after this list).
- Handle errors properly, so the files get closed even when something goes wrong.
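As a rough illustration of block writing (the block size of 1000 and the output filename are arbitrary choices here, and it reuses `key_loop` and `keys` from the question):

```
import csv
from itertools import islice
from multiprocessing import Pool

if __name__ == "__main__":
    pool = Pool(processes=8)
    results = pool.imap(key_loop, keys, chunksize=200)
    with open("out_blocks.csv", "w") as f:
        writer = csv.writer(f)
        while True:
            block = list(islice(results, 1000))  # pull up to 1000 rows at a time
            if not block:
                break
            writer.writerows(block)              # one write call per block
```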
IMPORTANT: I'm not sure what the "keys" variable means, but the numbers there will not allow modulo to guarantee that each process writes to its own individual stream (with 12 keys modulo 8, two processes will end up writing to the same file).
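To make the collision concrete, with the twelve keys from the question and `num_cores = 8`:

```
>>> keys = [1,2,3,4,5,6,7,8,9,10,11,12]
>>> [key % 8 for key in keys]
[1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4]
```

Files `out1.csv` through `out4.csv` each receive rows from two different keys.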