Hello all,

My question is hopefully specific enough not to duplicate any of the other ones I've read. I want to use subprocess and multiprocessing to spawn a bunch of jobs serially and return the return code to me. The problem is that I don't want to wait(), so that I can spawn all the jobs at once, but I do want to know when each one finishes so I can get its return code. I'm having this weird problem where if I poll() the process it won't run; it just sits in Activity Monitor without running (I'm on a Mac). I thought I could use a watcher thread, but now I'm hanging on out_q.get(), which leads me to believe that maybe I'm filling up the buffer and deadlocking. I'm not sure how to get around this. This is basically what my code looks like. If anyone has a better idea of how to do this, I'd be happy to completely change my approach.

import threading
from multiprocessing import Process, Queue
from subprocess import Popen, PIPE

def watchJob(p1, out_q):
    while p1.poll() == None:
        pass
    print "Job is done"
    out_q.put(p1.returncode)

def runJob(out_q):
    p1 = Popen(['../../bin/jobexe','job_to_run'], stdout=PIPE)
    t = threading.Thread(target=watchJob, args=(p1,out_q))
    t.start()

out_q= Queue()
outlst=[]
proc = Process(target=runJob, args=(out_q,))
proc.start()
outlst.append(out_q.get()) # This hangs indefinitely
proc.join()

It seems to me that with stdout=PIPE, the subprocess may hang if nobody reads its stdout. Apparently you are not doing anything with the output of jobexe, so you don't need stdout=PIPE. You can also use the communicate() method instead of a watching thread, so why not do something like this:

def runJob(out_q):
    p1 = Popen(['../../bin/jobexe','job_to_run'])
    p1.communicate()
    out_q.put(p1.returncode)

out_q= Queue()
outlst=[]
proc = Process(target=runJob, args=(out_q,))
proc.start()
outlst.append(out_q.get())
proc.join()

Sorry for not being clearer. I actually want to use the output for a logfile. Also, I'm trying to use multiprocessing to spawn this across multiple processors. That's why I can't use communicate(): it calls wait(), and then all of my processes won't spawn. I was just trying to get it working with one job before doing it in a loop. Without stdout=PIPE it still hangs on the out_q.get() line.

The following code works for me

#!/usr/bin/env python
# -*-coding: utf8-*-
# ./main.py
from multiprocessing import Process, Queue
from subprocess import Popen, PIPE

def runJob(out_q):
    p1 = Popen(['./jobexe','job_to_run'], stdout=PIPE)
    out, err = p1.communicate()
    out_q.put((p1.returncode, out))

out_q= Queue()
outlst=[]
proc = Process(target=runJob, args=(out_q,))
proc.start()
outlst.append(out_q.get())
proc.join()
print(outlst)

I used this jobexe program

#!/usr/bin/env python
# -*-coding: utf8-*-
# jobexe

if __name__ == '__main__':
    import sys
    print(sys.argv)

Did you by any chance import Queue from the Queue module instead of importing it from multiprocessing? That would block.

Your argument about communicate() does not hold water, because it is the child process (the one running runJob()) that calls communicate(). That does not prevent the main process from starting new children.

If jobexe outputs a lot of data, then it makes sense to write a loop that reads the output and writes it to a file instead of using communicate().
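For instance, a rough drop-in variant of runJob along those lines could look like this (the logfile name here is just a placeholder):

from subprocess import Popen, PIPE

def runJob(out_q):
    p1 = Popen(['./jobexe', 'job_to_run'], stdout=PIPE)
    with open('job.log', 'wb') as logfile:   # placeholder logfile name
        for line in p1.stdout:               # read as the job runs, so the pipe never fills up
            logfile.write(line)
    p1.wait()                                # safe now that stdout has been drained
    out_q.put(p1.returncode)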

Thanks for the response. You're correct: that code does work for me and returns the return code as expected. The problem is that it doesn't work the way I need it to when spawning multiple jobs. This code just runs one job, finishes it, and then runs the next one. It doesn't run them simultaneously on two different processors.

#!/usr/bin/env python
# -*-coding: utf8-*-
# ./main.py
from multiprocessing import Process, Queue
from subprocess import Popen, PIPE

def runJob(out_q):
    p1 = Popen(['./jobexe','job_to_run'], stdout=PIPE)
    out, err = p1.communicate()
    out_q.put((p1.returncode, out))

out_q= Queue()
outlst=[]
for i in range(2):
    proc = Process(target=runJob, args=(out_q,))
    proc.start()
    outlst.append(out_q.get())
    proc.join()
print(outlst)

You will have to continuously check the status of the processes, or the value of some variable/dictionary/object, to know when it changes. You can use is_alive() to check the status of a process in multiprocessing. You could also use a manager.dict() with an entry set to True for each process's key, and have the function set it to False (or to the return code) when finished.

Also take a look at Pool from multiprocessing, as I am not sure whether it is what you want or not.
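If it is, a rough sketch might look something like this (the pool size and the job list here are just placeholders):

from multiprocessing import Pool
from subprocess import call

def runJob(jobname):
    # call() blocks inside the worker, but the pool keeps N workers busy at once
    return call(['./jobexe', jobname])

if __name__ == '__main__':
    pool = Pool(processes=2)              # number of jobs to run concurrently
    jobs = ['job_to_run', 'job_to_run']   # placeholder job list
    returncodes = pool.map(runJob, jobs)  # hands out a new job whenever a worker frees up
    pool.close()
    pool.join()
    print(returncodes)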

Yes, I'm more worried about getting them to run concurrently right now. My first attempt was to avoid anything that calls wait() and instead have a thread watching to see when the job finished. That did not work for me, however. Do you have any ideas of how I can spawn multiple jobs that run in parallel?

Write a program that runs some test code in parallel, and use a while loop to check is_alive() for each process and/or a manager dictionary that is updated when the function/thread finishes.

import time

def process_status(list_of_procs):
    """Block until every Process in the list has finished."""
    while True:
        done = True
        for proc in list_of_procs:
            if proc.is_alive():
                done = False

        if done:
            return
        time.sleep(0.1)  # don't hog the processor
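The manager dictionary variant could look roughly like this (the job command and key scheme are just placeholders):

import time
from multiprocessing import Manager, Process
from subprocess import call

def runJob(status, key):
    # placeholder command; store the return code when the job finishes
    status[key] = call(['./jobexe', 'job_to_run'])

manager = Manager()
status = manager.dict()
procs = []
for i in range(2):
    status[i] = True                     # True means "still running"
    proc = Process(target=runJob, args=(status, i))
    proc.start()
    procs.append(proc)

while any(v is True for v in status.values()):
    time.sleep(0.1)                      # don't hog the processor

for proc in procs:
    proc.join()
print(dict(status))                      # each key now maps to a return code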

You only need to start all the jobs before you call out_q.get().
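For example, something along these lines (same runJob as earlier in the thread, with two jobs just for illustration):

from multiprocessing import Process, Queue
from subprocess import Popen, PIPE

def runJob(out_q):
    p1 = Popen(['./jobexe', 'job_to_run'], stdout=PIPE)
    out, err = p1.communicate()
    out_q.put((p1.returncode, out))

out_q = Queue()
procs = []
for i in range(2):
    proc = Process(target=runJob, args=(out_q,))
    proc.start()                          # start every job before collecting any result
    procs.append(proc)

outlst = [out_q.get() for _ in procs]     # each get() returns as soon as some job reports back
for proc in procs:
    proc.join()
print(outlst)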

I'm now able to get the jobs to run concurrently, both with and without using Pool. I'm having one last issue, though. I have a database of, say, 100,000 jobs to run; if I had, for example, 128 processors, I could do mp.Pool(128) and spawn 128 jobs. The problem is that I want to keep all 128 processors performing a job at all times, so when one finishes I want to hand it a different job. Is this possible with Pool, or is it only possible using Process()? I'm really stumped on a good way of monitoring jobs and adding new ones across all the processors. Would it be proper etiquette to resubmit this question in a new thread, since it might be rather involved? Thanks!

I believe your problem is better suited to a "job queue" library rather than trying to implement monitoring, scheduling, etc. on top of a raw process pool. Take a look at Celery, which implements a job queue and provides monitoring facilities.

http://www.celeryproject.org/
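A minimal sketch of what a Celery task wrapping jobexe could look like, assuming Celery is installed and a Redis broker is running (the broker URL, module name, and worker count below are assumptions, not anything from this thread):

# tasks.py
from celery import Celery
from subprocess import call

app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def run_job(jobname):
    return call(['../../bin/jobexe', jobname])   # the return code becomes the task result

# Start 128 workers with:  celery -A tasks worker --concurrency=128
# Then queue all 100,000 jobs; each worker pulls a new job as soon as it finishes one:
#   results = [run_job.delay(name) for name in job_names]
#   returncodes = [r.get() for r in results]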
