Update: Here is a more specific example
Suppose I want to compile some statistical data from a sizable set of files. I can make a generator, (line for line in fileinput.input(files)), and some processor:
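```python
from collections import defaultdict

scores = defaultdict(int)

def process(line):
    if 'Result' in line:
        # Assumed format: the key is the quoted field, up to its first '-'.
        res = line.split('"')[1].split('-')[0]
        scores[res] += 1
```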
The question is how to handle this when one gets to the
Of course, it's possible to use multiprocessing.sharedctypes with a custom struct instead of a defaultdict, but this seems rather painful. On the other hand, I can't think of a Pythonic way to instantiate something before the processes start, or to return something to the main process after a generator has run out.
Answer:
So you are basically building a histogram. This can easily be parallelized, because histograms can be merged without complication: the counts for each key are simply added. This kind of problem is often called trivially parallelizable or "embarrassingly parallel". That is, you do not need to worry about communication among workers.
Just split your data set into multiple chunks, let your workers work on these chunks independently, collect the histogram of each worker, and then merge the histograms.
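Because counts simply add, merging the per-worker histograms is plain summation. A minimal sketch with collections.Counter on toy data (the word list and the three-way split are made up for illustration; in a real run each chunk would be counted by a separate worker):

```python
from collections import Counter

# Toy data standing in for the full data set (hypothetical values).
words = ["a", "b", "a", "c", "b", "a", "d", "c"]

# Split the data set into chunks; each chunk would go to one worker.
chunks = [words[0:3], words[3:6], words[6:]]

# Each worker builds its own histogram independently.
partial = [Counter(chunk) for chunk in chunks]

# Merge: Counter.update adds counts (unlike dict.update, which replaces).
merged = Counter()
for hist in partial:
    merged.update(hist)

# The merged histogram equals the one computed in a single pass.
assert merged == Counter(words)
print(merged)
```

The order in which the sub-histograms are merged does not matter, which is what makes collecting results from independent workers safe.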
In practice, this problem is best solved by letting each worker process read its own file. That is, a "task" can simply be a file name. Do not pickle file contents and send them between processes through pipes; let each worker process read the bulk data directly from the files. Otherwise your architecture spends too much time on inter-process communication instead of doing real work.
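Applied to the question's data, a per-file task could look like the following sketch. The function names `count_results` and `count_all` are hypothetical, and the parsing of a 'Result' line assumes the key is the first quoted field up to its first '-', since the exact line format is not given. Only the file path goes to a worker and only its small Counter comes back; the parent merges the sub-histograms as they arrive.

```python
import sys
from collections import Counter
from multiprocessing import Pool


def count_results(filepath):
    """Runs in a worker: read one file, return its sub-histogram.

    Only `filepath` is sent to the worker and only the Counter is
    sent back, so bulk file contents never travel through a pipe.
    """
    hist = Counter()
    with open(filepath) as f:
        for line in f:
            if 'Result' in line:
                # Hypothetical format: key is the quoted field up to its first '-'.
                key = line.split('"')[1].split('-')[0]
                hist[key] += 1
    return hist


def count_all(filepaths, processes=4):
    """Runs in the main process: one task per file, merge results as they arrive."""
    total = Counter()
    with Pool(processes=processes) as pool:
        for hist in pool.imap_unordered(count_results, filepaths):
            total.update(hist)
    return total


if __name__ == "__main__":
    # Usage: pass the data files as command-line arguments.
    paths = sys.argv[1:]
    if paths:
        print(count_all(paths))
```

imap_unordered lets merging start as soon as any worker finishes; pool.map, as used in the example implementation, works equally well when all results are merged at the end.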
Do you need an example or can you figure this out yourself?
Edit: example implementation
I have a number of data files with file names in this format: data1.txt, ... .
The goal is to create a histogram over the words contained in the data files. This is the code:
```python
import glob
from collections import Counter
from multiprocessing import Pool


def build_histogram(filepath):
    """This function is run by a worker process.
    The `filepath` argument is communicated to the worker
    through a pipe. The return value of this function is
    communicated to the manager through a pipe.
    """
    hist = Counter()
    with open(filepath) as f:
        for line in f:
            # Each stripped line is counted as one word (one word per line).
            hist[line.strip()] += 1
    return hist


def main():
    """This function runs in the manager (main) process."""
    # Collect paths to data files.
    datafile_paths = glob.glob("data*.txt")

    # Create a pool of worker processes and distribute work.
    # The input to worker processes (function argument) as well
    # as the output by worker processes is transmitted through
    # pipes, behind the scenes.
    pool = Pool(processes=3)
    histograms = pool.map(build_histogram, datafile_paths)

    # Properly shut down the pool of worker processes, and
    # wait until all of them have finished.
    pool.close()
    pool.join()

    if not histograms:
        return  # no data files found

    # Merge sub-histograms. Do not create too many intermediate
    # objects: update the first sub-histogram with the others.
    # Relevant docs: collections.Counter.update
    merged_hist = histograms[0]
    for h in histograms[1:]:
        merged_hist.update(h)

    for word, count in merged_hist.items():
        print("%s: %s" % (word, count))


if __name__ == "__main__":
    main()
```