#####################################################
### Examples for using the multiprocessing module.
###
### First: if you think all that is needed is:  python my_program -parallel
### YOU ARE WRONG.  Quite a bit of work is normally needed, for many reasons.
### To start with: what are the tasks to be subdivided into separate
### processes?  This is not commonly obvious from a linearly designed
### program.  Moreover, there are several idiosyncratic details to be
### considered when programming for parallel processing.
###
### Second: ipython and multiprocessing do not mix well, and I do not know
### why.  There are some explanations on the internet, but the fact is that
### you are better off doing your linear testing in ipython and running
### things in plain python.

### Import everything needed.  The highest-level multiprocessing interfaces
### are 'Pool' and 'Manager'.
from multiprocessing import Pool, cpu_count, TimeoutError, Manager
import time
from numpy import zeros, ceil
from numpy.random import rand, randint, normal, seed
from numpy.linalg import det

### numpy.random.random_integers was deprecated in NumPy 1.11 and REMOVED in
### NumPy 1.25, which makes the old unconditional import fail.  Keep a
### backward-compatible fallback so call sites below still work:
### random_integers(low, high) is inclusive of BOTH endpoints.
try:
    from numpy.random import random_integers
except ImportError:
    def random_integers(low, high):
        """Drop-in replacement: uniform int in [low, high], both inclusive."""
        return randint(low, high + 1)

### This gives you process id. information, if you want to follow
### the process assignment hierarchy, etc.
import os


def info(title):
    """Print process identification info: module name, parent pid, own pid."""
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())


### This is the function to be evaluated in parallel.
def f(d, i, s):
    """Worker task: simulate a random-size computation and store the result
    in the shared dictionary d under key i.

    Parameters
    ----------
    d : shared dict (from multiprocessing.Manager)
        Output container.  If returning anything other than a single item
        (e.g. a number or boolean) you need to pass results back this way.
    i : int
        Task index, used as the key in d.
    s : int
        Seed for the random number generator (see the note below).

    Returns
    -------
    bool : True, as a simple completion flag; the real results are in d[i].
    """
    # info('function f')
    ### Be careful with the random number generator initializers!!
    ### Each new process starts with a new state, probably taken from the
    ### internal clock, so parallel processes may end up with the SAME seed.
    ### A way to solve it is to pass a random number and use that as seed:
    seed(s)  # Check it out without using seed!
    n = int(ceil(10 * rand()))   # matrix size, n = 1 .. 10
    time.sleep((n / 2.0) ** 2)   # fake some random computing time of order n**2
    ### Calculate the determinant of a random n x n matrix.
    M = rand(n, n)
    # NOTE(review): the determinant is taken of a FRESH rand(n, n), not of M.
    # Possibly det(M) was intended -- behavior kept as found; confirm intent.
    d[i] = [i, n, normal(), M, det(rand(n, n))]
    ### The matrix size, the matrix and its determinant travel back through
    ### the (shared) dictionary d!!!
    return True


### Here we calculate f num_eval times in parallel.
def FunctionalMultiProc(num_eval=10):
    """Procedural multiprocessing example: run f() num_eval times in a Pool
    and collect the determinants.

    Parameters
    ----------
    num_eval : int, number of parallel evaluations of f (default 10).

    Returns
    -------
    numpy.ndarray of length num_eval holding the computed determinants.
    """
    print("This is the FunctionalMultiProc example\n")
    # info('main line')
    num_cpus = cpu_count()  # number of cpu's available
    print("Number of available processors: %d" % (num_cpus,))
    pool = Pool(processes=num_cpus)  # start num_cpus worker processes
                                     # ok even if num_cpus = 1
    ### Start a 'Manager' to share data across processes:
    mgr = Manager()
    ### Start a shared dictionary
    d = mgr.dict()
    ### A procedural way to send the processes; if some preprocessing is
    ### needed for the function arguments this might be better:
    queue = [[] for i in range(num_eval)]  # initialize the queue
    for i in range(num_eval):
        s = randint(0, 32000 + 1)
        # Send f to the next available processor
        queue[i] = pool.apply_async(f, (d, i, s))
    ### the same, in list form:
    # queue = [pool.apply_async(f, (d, i, s)) for i in range(num_eval)]
    unfinished = list(range(num_eval))  # list of unfinished processes in queue
    result = zeros(num_eval)            # vector to hold the results
    while 0 < len(unfinished):
        # Iterate over a COPY: unfinished is mutated inside the loop, and
        # removing items from a list while iterating it skips elements.
        for i in list(unfinished):
            try:
                queue[i].get(timeout=0.1)
                ### Only gets here if process i has ended.
                ### Do something with the result, stored in d[i]:
                result[i] = d[i][4]  # We keep the determinant
                print(d[i], result[i])
                ### Now, remove the process from the queue
                unfinished.remove(i)
                ### also remove its output from the dictionary
                del d[i]
            except TimeoutError:
                ### Nothing, no problem: task i is simply not done yet.
                continue
    ### Finish using the Pool of processors
    pool.close()
    pool.join()  # wait for the workers to shut down cleanly
    print("At the end, the dictionary is empty:", d)
    return result
### Third: Pool.apply_async cannot call a method!!
### That is, an instance of a class has a lot of information
### (method names, variable values, etc.) and apparently multiprocessing
### has problems passing this information across processes.

### A similar example, using now a method, in OO programming,
### being evaluated on many processors.


### This is the stand-alone version of method f in OOMultiProc.
def CallMethf(init, d, proc_num, *args):
    """Module-level trampoline: rebuild an OOMultiProc instance from its
    __dict__ (init) inside the worker process, run its method f, and return
    the result through the shared dictionary d under key proc_num."""
    # print("This is CallMethf.")
    ### Pass the calling instance's dictionary and make a local version of
    ### the instance in the current processor.
    tmp = OOMultiProc(init)
    ### The computational burden of this should be evaluated;
    ### perhaps the parallel version ends up slower!!!
    ### Then call the desired method, f:
    d[proc_num] = tmp.f(*args)
    return True


### A more generic trampoline like this one
def CallMeth(Obj, meth, init, *args):
    """Generic version: build an Obj, install the state dictionary init,
    and call the method whose NAME is the string meth with *args."""
    tmp = Obj()
    tmp.__dict__ = init
    # Look the method up by name: the original 'tmp.meth(*args)' would
    # (wrongly) search for an attribute literally called 'meth'.
    return getattr(tmp, meth)(*args)
### would be handy, but Pool.apply_async cannot pass classes and functions!


### A similar example, using now a method being evaluated on many processors.
class OOMultiProc:
    """Demo class whose parallel worker is a method: simulates a normal
    variate with parameters (mean, sd) and the determinant of a random
    matrix, fanned out over a multiprocessing Pool."""

    ### This is a constructor that may take a dictionary as initializer.
    def __init__(self, *args, **kwargs):
        if len(args) == 0:
            ### If no non-keyword arguments: conventional construction.
            self.__init_args__(*args, **kwargs)
        else:
            if isinstance(args[0], dict):
                ### Use a dictionary (an instance __dict__) as initializer.
                self.__dict__ = args[0]
            else:
                self.__init_args__(*args, **kwargs)

    ### This is the conventional constructor.
    def __init_args__(self, mean=0, sd=1):
        # mean, sd: parameters of the normal variate simulated in f.
        self.mean = mean
        self.sd = sd

    ### Now, the function f (as above) to be evaluated in parallel is a
    ### method.  It shares information from the instance: mean and sd.
    ### But apply_async cannot call a method -- hence CallMethf above.
    def f(self, s):
        """Worker task: seed the RNG with s, fake some work, then return
        [n, normal(mean, sd) variate, random n x n matrix, determinant]."""
        ### Be careful with the random number generator initializers!!
        ### Each new process starts with a new state, probably taken from
        ### the clock, so parallel processes may have the SAME seed.
        ### A way to solve it is to pass a random number and use it as seed:
        seed(s)  # Check it out without using seed!
        n = int(ceil(10 * rand()))  # matrix size, n = 1 .. 10
        time.sleep((n / 2.0) ** 2)  # fake computing time of order n**2
        M = rand(n, n)
        # NOTE(review): as in the procedural version, det is taken of a
        # fresh rand(n, n), not of M -- behavior kept as found.
        return [n, normal(loc=self.mean, scale=self.sd), M, det(rand(n, n))]

    ### A method version to do the multiprocessing.
    def DoMultiProc(self, num_eval=10):
        """Run self.f num_eval times in a Pool (through the CallMethf
        trampoline) and store the determinants in self.result."""
        print("This is the OO MultiProc. example, self.mean=", self.mean)
        num_cpus = cpu_count()  # number of cpu's available
        print("Number of available processors: %d" % (num_cpus,))
        pool = Pool(processes=num_cpus)  # start num_cpus worker processes
                                         # ok even if num_cpus = 1
        ### Use this dictionary to return results:
        mgr = Manager()
        d = mgr.dict()
        ### A procedural way to send the processes, in case preprocessing
        ### of the function arguments is needed.
        queue = [[] for i in range(num_eval)]
        for i in range(num_eval):
            # randint's upper bound is exclusive, so this is equivalent to
            # the old random_integers(0, 32000) (inclusive of both ends),
            # which was removed from NumPy in 1.25.
            s = randint(0, 32001)
            # Pass the instance's __dict__ so the worker can rebuild it.
            queue[i] = pool.apply_async(CallMethf, (self.__dict__, d, i, s))
        unfinished = list(range(num_eval))  # unfinished processes in queue
        self.result = zeros(num_eval)       # vector to hold the results
        while 0 < len(unfinished):
            # Iterate over a COPY: unfinished is mutated inside the loop.
            for i in list(unfinished):
                try:
                    ### If all processes take more or less the same time,
                    ### use queue[i].get(); otherwise use a short timeout
                    ### to catch the faster processes first:
                    queue[i].get(timeout=0.1)
                    ### Only gets here if the process has ended.
                    ### Do something with the result, stored in d[i]:
                    self.result[i] = d[i][3]  # keep the determinant
                    print(d[i], self.result[i])
                    ### Now, remove the process from the queue
                    unfinished.remove(i)
                    ### also remove its output from the dictionary
                    del d[i]
                except TimeoutError:
                    ### Nothing, no problem: task i is not done yet.
                    continue
        ### Finish using the Pool of processors
        pool.close()
        pool.join()  # wait for the workers to shut down cleanly
        print("\nAt the end, the dictionary is as we started:", d)
        print("But the determinats are stored in self.result:", self.result)


### NB: something is wrong with the ipython 'run' command and
### multiprocessing.  Run your multiprocessing like below.  If you type
### "run Multiproc.py" in ipython and then call FunctionalMultiProc(),
### it fails!!!
if __name__ == '__main__':
    ### Run the procedural example:
    result = FunctionalMultiProc()
    print("\nThe determinants:", result, "\n")
    ### Run the OO version, first make the instance:
    oomulp = OOMultiProc(mean=10, sd=5)
    ### Call the multiprocessing method:
    oomulp.DoMultiProc()
    print("\nEso es to...eso es to......eso es to...eso es tooooodo, amigos!\n")