
Python Multiprocessing

Differences between apply, apply_async, map, and map_async in multiprocessing.Pool

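| Method      | Multi-args | Concurrence | Blocking | Ordered-results |
|-------------|------------|-------------|----------|-----------------|
| map         | no         | yes         | yes      | yes             |
| apply       | yes        | no          | yes      | no              |
| map_async   | no         | yes         | no       | yes             |
| apply_async | yes        | yes         | no       | no              |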
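  • apply vs. apply_async: apply blocks until the call returns, so it is rarely used in practice. apply_async also accepts a callback. If a callback is provided, it is called when the function completes. callback must be a callable that takes a single argument, and the result of func is passed to it as soon as that result becomes available. callback must not perform any blocking operation. Otherwise it blocks the thread that handles results, which delays the results of the other asynchronous calls.
  • map vs. map_async: map_async does not block the main process. It returns immediately.
  • map and map_async return results in the order of the input arguments. apply and apply_async submit one call at a time, so they produce no ordered result list. Callbacks registered with apply_async fire in completion order.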
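The sketch below shows the blocking difference and the "Multi-args" column in practice. It is a minimal example that assumes a hypothetical two-argument task `add`, which is not from the original. `apply` hands back the value itself. `apply_async` returns an `AsyncResult` handle instead. Keeping those handles in a list and calling `get()` on them in submission order is one way to recover ordered results from `apply_async`.

```python
from multiprocessing import Pool


def add(a, b):
    # Hypothetical two-argument task: apply/apply_async accept several args via the args tuple
    return a + b


if __name__ == '__main__':
    with Pool(4) as pool:
        # apply blocks until this single call has finished and returns the value itself
        blocking_value = pool.apply(add, (1, 2))

        # apply_async returns an AsyncResult immediately; the call runs in a worker
        handles = [pool.apply_async(add, (i, i)) for i in range(5)]
        # Calling get() on the handles in submission order yields an ordered list,
        # even though the tasks themselves may finish in any order
        async_values = [h.get() for h in handles]

    print(blocking_value)  # 3
    print(async_values)    # [0, 2, 4, 6, 8]
```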
from multiprocessing import Pool
import time

result = []


def mycallback(x):
    # Runs in the main process each time a task finishes
    result.append(x)


def worker(num):
    time.sleep(1)
    return num


if __name__ == '__main__':
    e1 = time.time()
    pool = Pool()  # defaults to os.cpu_count() worker processes
    for i in range(10):
        pool.apply_async(worker, args=(i,), callback=mycallback)
    pool.close()
    pool.join()
    e2 = time.time()
    print(float(e2 - e1))  # e.g. 2.07 s; depends on the number of CPU cores

    print(result)  # e.g. [2, 4, 3, 0, 1, 5, 6, 7, 8, 9]: completion order, varies per run
from multiprocessing import Pool
import time


def worker(num):
    time.sleep(1)
    return num


if __name__ == '__main__':
    e1 = time.time()
    pool = Pool()

    # map blocks until all tasks are done, then returns results in input order
    result = pool.map(worker, range(10))
    pool.close()
    pool.join()

    e2 = time.time()
    print(float(e2 - e1))

    print(result)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
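The table marks map as not supporting multiple arguments because it passes each item of the iterable to the worker as one argument. A common workaround is `Pool.starmap`, available since Python 3.3 and not part of the original comparison. It unpacks each tuple into positional arguments. The sketch below is a minimal example that assumes a hypothetical worker `power`.

```python
from multiprocessing import Pool


def power(base, exp):
    # Hypothetical two-argument task
    return base ** exp


if __name__ == '__main__':
    pairs = [(2, 1), (2, 2), (2, 3), (3, 2)]
    with Pool(4) as pool:
        # map would pass each tuple as ONE argument; starmap unpacks it into (base, exp)
        powers = pool.starmap(power, pairs)
    print(powers)  # [2, 4, 8, 9]
```

Like map, starmap blocks and keeps the input order.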
from multiprocessing import Pool
import time

result = []


def mycallback(x):
    # map_async calls this once, with the complete result list
    result.append(x)


def worker(num):
    time.sleep(1)
    return num


if __name__ == '__main__':
    e1 = time.time()
    pool = Pool(4)

    pool.map_async(worker, range(10), callback=mycallback)

    pool.close()
    pool.join()

    e2 = time.time()
    print(float(e2 - e1))  # about 3 s: 10 one-second tasks on 4 workers need 3 rounds

    print(result)  # [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]
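Because map_async returns an `AsyncResult` right away, the caller can do other work and collect the list later with `get()`. The sketch below is a minimal example that assumes a hypothetical squaring worker and an arbitrarily chosen 30-second timeout.

```python
from multiprocessing import Pool
import time


def worker(num):
    time.sleep(0.5)
    return num * num


if __name__ == '__main__':
    with Pool(4) as pool:
        # map_async returns an AsyncResult immediately instead of waiting for the workers
        async_result = pool.map_async(worker, range(8))
        # The main process is free here; ready() reports whether all tasks have finished
        was_ready_immediately = async_result.ready()
        # get() blocks until every task is done and returns the list in input order
        squares = async_result.get(timeout=30)
    print(was_ready_immediately)  # False: the tasks sleep, so they cannot be done yet
    print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49]
```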

When several processes write to the same file, their output can end up interleaved. There are several ways to avoid this.

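Lock the write operation. The process that is currently writing holds the lock until it has finished one complete write (for example, all ten characters of a record). Only then is the lock released and another process allowed to write. Only one process writes at a time, and each write must complete before another process takes over, so the file contents do not get mixed up. The cost is that locking generally slows the program down.

There is also a practical problem when the writes are spread across several places in the code. A multiprocessing.Lock cannot be pickled, so it cannot simply be passed as an argument to the function handed to Pool.map. There are two ways around this. The first is to create the lock with a Manager() and bind Manager().Lock() to the function with functools.partial. The second is to hand a multiprocessing.Lock() to the pool when it is created.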
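The "cannot be passed directly" problem comes down to pickling, because Pool sends every argument to its workers by pickling it. The sketch below checks this directly with the `pickle` module rather than through a Pool, which is a simplification. A plain `Lock` raises `RuntimeError`, while the proxy returned by `Manager().Lock()` pickles without error.

```python
import pickle
from multiprocessing import Lock, Manager


if __name__ == '__main__':
    # A plain multiprocessing.Lock refuses to be pickled outside of process creation,
    # which is what Pool.map would need to do to send it as an argument
    try:
        pickle.dumps(Lock())
        plain_lock_picklable = True
    except RuntimeError as exc:
        plain_lock_picklable = False
        print('plain Lock:', exc)

    # A Manager lock is a proxy object; the proxy itself can be pickled and sent to workers
    with Manager() as manager:
        proxy_bytes = pickle.dumps(manager.Lock())
    print('Manager lock proxy pickled to', len(proxy_bytes), 'bytes')
```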

from multiprocessing import Pool, Manager
import time
from functools import partial


def worker(lock, x):
    time.sleep(1)

    # Only one process at a time may hold the lock and append to the file
    with lock:
        with open('result.txt', 'a+') as f:
            f.write(str(x))


if __name__ == '__main__':
    e1 = time.time()
    pool = Pool(4)
    manager = Manager()
    lock = manager.Lock()  # a picklable proxy, unlike multiprocessing.Lock()

    # Bind the lock as the first argument so map only has to supply x
    partial_worker = partial(worker, lock)

    # for i in range(10):
    #     pool.apply_async(partial_worker, (i,))
    pool.map(partial_worker, range(10))

    pool.close()
    pool.join()
    manager.shutdown()
    e2 = time.time()
    print(float(e2 - e1))

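The second way is to pass the Lock object through the initializer parameter when the Pool is created. This turns the Lock into a global object in every child process.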

from multiprocessing import Pool, Lock
import time
import urllib.request


def init(l):
    # Runs once in every worker process: store the lock as a module-level global
    global lock
    lock = l


def send_request(data):
    api_url = 'http://api.xxxx.com/?data=%s'
    start_time = time.perf_counter()
    with urllib.request.urlopen(api_url % data) as resp:
        print(resp.read())
    end_time = time.perf_counter()
    # Only one worker at a time may append to the log file
    with lock:
        with open('request.log', 'a+') as logs:
            logs.write('request %s cost: %s\n' % (data, end_time - start_time))


if __name__ == '__main__':
    data_list = ['data1', 'data2', 'data3']
    lock = Lock()
    pool = Pool(8, initializer=init, initargs=(lock,))
    pool.map(send_request, data_list)
    pool.close()
    pool.join()

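A more elegant approach is to use multiprocessing callbacks:
  • Move the write operation into a separate function.
  • Have each worker return the content to be written instead of writing it.
  • Write the returned content from a callback.
This works without a lock because all callbacks run one at a time, in a single result-handling thread of the main process.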

from multiprocessing import Pool
import time


def mycallback(x):
    # Runs in the main process, so only one writer ever touches the file
    with open('result.txt', 'a+') as f:
        f.write(str(x))


def worker(num):
    time.sleep(1)
    return num  # return the data to be written instead of writing it here


if __name__ == '__main__':
    e1 = time.time()
    pool = Pool()

    for i in range(10):
        pool.apply_async(worker, (i,), callback=mycallback)

    pool.close()
    pool.join()
    e2 = time.time()
    print(float(e2 - e1))
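The claim that callbacks never run concurrently can be checked directly. The sketch below records which process and which thread invoke each callback. The `record` function and the shared `callback_pids` and `callback_threads` collections are hypothetical additions made only for this check.

```python
from multiprocessing import Pool
import os
import threading
import time

callback_pids = []
callback_threads = set()


def record(x):
    # Executed by the pool's result-handler thread inside the MAIN process
    callback_pids.append(os.getpid())
    callback_threads.add(threading.current_thread().name)


def worker(num):
    time.sleep(0.1)
    return num


if __name__ == '__main__':
    with Pool(4) as pool:
        for i in range(10):
            pool.apply_async(worker, (i,), callback=record)
        pool.close()
        pool.join()  # after join, every callback has already run
    print(set(callback_pids) == {os.getpid()})  # True: every callback ran in the parent
    print(len(callback_threads))  # 1: a single thread invoked all callbacks
```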