40天训练-第3天-4-threading、multiprocessing、gevent、multiprocessingpool、threadingpool、gevent pool各种demo

  • by

这里记录几种遇到的demo代码,方便查阅,有的是python库自带的、也有网上网友写的,如果可以,会对性能进行一个简单的比对,毕竟性能效率才是我们考虑的重中之重

这些示例都是类似或相通的,本人也没有重新去写相关代码,基本都来自网上,就不一一贴链接了

1.threading

# https://www.cnblogs.com/xiaobeibei26/p/6481707.html
import time
import threading


def f0():
    """Placeholder callee for f1; intentionally does nothing."""
    return None

def f1(a1, a2):
    """Ignore both arguments, sleep five seconds, then invoke f0()."""
    time.sleep(5)
    f0()


'''The code below continues straight on without waiting for the sleep inside f1.'''
t = threading.Thread(target=f1, args=(111, 112))  # create the thread
# Daemon threads (default is False) are not waited for at interpreter
# exit.  Assigning t.daemon replaces setDaemon(), which has been
# deprecated since Python 3.10.
t.daemon = True
t.start()  # start the thread

t = threading.Thread(target=f1, args=(111, 112))
t.start()

t = threading.Thread(target=f1, args=(111, 112))
t.start()
# By default the program only exits once every non-daemon thread has
# finished; a daemon thread is abandoned as soon as the main thread ends.

# https://www.cnblogs.com/xiaobeibei26/p/6481707.html
import threading
import time


def do(event):
    """Announce start, block on *event* until it is set, then announce end."""
    print('start')
    # Every thread parks here ("red light") until event.set() is called.
    event.wait()
    print('end')


event_obj = threading.Event()  # create one shared Event ("traffic light")
for i in range(10):  # spawn 10 threads, all waiting on the same event
    t = threading.Thread(target=do, args=(event_obj,))
    t.start()

time.sleep(5)

# Make the light red (a fresh Event is already unset, i.e. red), keeping
# every worker blocked inside event.wait().
event_obj.clear()
data = input('请输入要:')
if data == 'True':
    event_obj.set()  # green light: release every waiting thread at once
import threading
from threading import Thread, Lock


def add(a, b):
    """Print a + b along with the live-thread roster and thread count."""
    total = a + b
    print("result: {} \n".format(total))
    print("Thread describe: {}".format(threading.enumerate()))
    print("Counts of thread: {}".format(threading.active_count()))


def py_thread():
    """Launch three non-daemon threads named a0..a2, each running add(2, 2)."""
    for idx in range(3):
        worker = Thread(target=add, args=(2, 2), name="a" + str(idx), daemon=False)
        worker.start()


if __name__ == "__main__":
    py_thread()
    print("starting")
import threading, time


class MyThread(threading.Thread):
    """Thread that increments the module-global counter ``n`` under ``lock``.

    Relies on the module globals ``n`` (int counter) and ``lock``
    (threading.Lock) being assigned before run() executes.
    """

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        global n, lock
        time.sleep(1)
        # ``with`` guarantees the lock is released even if print/format
        # raises, unlike the original bare acquire()/release() pair.
        with lock:
            print(n, self.name)
            n += 1


if "__main__" == __name__:
    n = 1
    ThreadList = []
    lock = threading.Lock()
    for i in range(1, 200):
        t = MyThread()
        ThreadList.append(t)
    for t in ThreadList:
        t.start()
    for t in ThreadList:
        t.join()

1.1.threadpool

# NOTE(review): fragment — `queue`, `urls`, `request_url`, and the
# third-party `threadpool` package must be defined/imported elsewhere.
q = queue.Queue()
for url in urls:
    q.put(url)
# Drain the queue back into a list; the round-trip through Queue adds no
# visible benefit over list(urls) here.
lst = [q.get() for i in range(q.qsize())]
thread_pool = threadpool.ThreadPool(30)
reqs = threadpool.makeRequests(request_url, lst)
[thread_pool.putRequest(req) for req in reqs]  # submit all requests to the pool
thread_pool.wait()  # block until every submitted request has finished
import requests
import time
import multiprocessing
import threading
import queue


def request_url(target):
    """Fire a GET request at *target*, ignoring any failure (best-effort)."""
    try:
        requests.get(target)
    except Exception:
        # Deliberately swallow request errors; a bare `except:` would also
        # trap SystemExit/KeyboardInterrupt, so narrow it to Exception.
        pass

class threadPoolManager:
    """Simple thread pool: enqueue every URL, spawn workers, wait for them.

    Parameters:
        urls: iterable of targets handed to request_url.
        workNum: unused; kept for backward compatibility with callers.
        threadNum: number of `work` threads to spawn.
    """

    def __init__(self, urls, workNum=10000, threadNum=20):
        self.workQueue = queue.Queue()
        self.threadPool = []
        self.__initWorkQueue(urls)
        self.__initThreadPool(threadNum)

    # Initialise the work queue.
    # Tasks are enqueued as (callable, argument) pairs; Queue is
    # internally synchronized.
    def __initWorkQueue(self, urls):
        for i in urls:
            self.workQueue.put((request_url, i))

    # Initialise the worker threads (each `work` starts itself on construction).
    def __initThreadPool(self, threadNum):
        for i in range(threadNum):
            self.threadPool.append(work(self.workQueue))

    # Wait for every worker thread to finish.
    def waitAllComplete(self):
        for i in self.threadPool:
            # is_alive(): the camelCase isAlive() alias was removed in
            # Python 3.9, so the original code breaks on modern interpreters.
            if i.is_alive():
                i.join()


class work(threading.Thread):
    """Worker thread: drain (callable, arg) tasks from a shared queue.

    The thread starts itself on construction and exits once the queue
    is empty.
    """

    def __init__(self, workQueue):
        threading.Thread.__init__(self)
        self.workQueue = workQueue
        self.start()

    def run(self):
        # Loop until the queue is exhausted.  The original checked qsize()
        # before a non-blocking get(), which races with other workers
        # draining the same queue; relying on queue.Empty is atomic.
        while True:
            try:
                do, args = self.workQueue.get(block=False)
            except queue.Empty:
                break
            do(args)
            self.workQueue.task_done()  # notify the queue the task is done


# Alternative multiprocessing approach kept for comparison:
# urls = ['http://www.ustchacker.com'] * 10
# pool = multiprocessing.Pool(PoolNum)
# data = pool.map(download_requests, urls)
# pool.close()
# pool.join()
url = "http://www.baidu.com"
# 1000 copies of the same URL; list repetition replaces the original
# side-effecting list comprehension ([urls.append(...) for ...]).
urls = [url] * 1000
start_time = time.time()
pool = threadPoolManager(urls, threadNum=30)
pool.waitAllComplete()
print(time.time() - start_time)  # elapsed wall-clock seconds


2.multiprocessing

import os
import threading
import multiprocessing

# Main: report the parent interpreter's PID at import time.
print('Main:', os.getpid())


# Worker shared by both demos: print its label and PID while holding the
# lock so output from concurrent workers does not interleave.
def worker(sign, lock):
    with lock:
        print(sign, os.getpid())


# Multi-thread
record = []
lock = threading.Lock()

# Multi-process
# NOTE(review): these two assignments overwrite the thread-demo `record`
# and `lock` above, so the threads below actually share the
# multiprocessing.Lock (which is still usable from threads) — confirm
# this is intentional.
record = []
lock = multiprocessing.Lock()

if __name__ == '__main__':
    for i in range(5):
        thread = threading.Thread(target=worker, args=('thread', lock))
        thread.start()
        record.append(thread)

    for thread in record:
        thread.join()

    for i in range(5):
        process = multiprocessing.Process(target=worker, args=('process', lock))
        process.start()
        record.append(process)

    # `record` still holds the already-joined threads plus the new
    # processes; re-joining a finished thread returns immediately.
    for process in record:
        process.join()

2.1.multiprocesspool

import multiprocessing as mul


def f(x):
    """Return *x* squared."""
    return x * x


if __name__ == '__main__':
    # `with` terminates the pool and reaps its 5 worker processes even if
    # map() raises; the original never called close()/join().
    with mul.Pool(5) as pool:
        rel = pool.map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    print(rel)

3.gevent

# coding=utf8
import gevent
from gevent import monkey
monkey.patch_all()      # 用于将标准库中大部分阻塞式调用修改为协作式运行

import requests


def fetch(url):
    """GET *url* and print how many bytes its response body contains."""
    print("get: {}".format(url))
    body = requests.get(url).content
    print("{}: {}".format(url, len(body)))


if __name__ == "__main__":
    gevent.joinall([
        gevent.spawn(fetch, "https://stackoverflow.com/"),
        gevent.spawn(fetch, "https://www.douban.com"),
        gevent.spawn(fetch, "https://www.github.com")
    ])
# coding:utf8
import gevent
from gevent import monkey
monkey.patch_all()      # 用于将标准库中大部分阻塞式调用修改为协作式运行

import requests


def fetch(url):
    """GET *url*; return (url, byte length of the response body)."""
    print("get: {}".format(url))
    body = requests.get(url).content
    return url, len(body)


if __name__ == "__main__":
    g_list = list()
    for url in ["https://stackoverflow.com/", "https://www.douban.com", "https://www.github.com"]:
        g = gevent.spawn(fetch, url)
        g_list.append(g)
    gevent.joinall(g_list)
    for g in g_list:
        print(g.value)

3.1.gevent pool


# 2
# NOTE(review): fragment — `time`, `gevent`, `request_url`, and `urls`
# must be defined elsewhere, and gevent.pool.Pool requires
# `import gevent.pool` (a plain `import gevent` does not expose the
# submodule) — confirm the missing imports.
# Unbounded: one greenlet per URL, all running at once.
start_time=time.time()
jobs = [gevent.spawn(request_url, url) for url in urls]
gevent.joinall(jobs)
end_time=time.time()
print(end_time-start_time)
# 10.902864933013916

# Bounded: at most 30 greenlets run concurrently.
start_time=time.time()
map_pool = gevent.pool.Pool(30)
data = map_pool.map(request_url, urls)
end_time=time.time()
print(end_time-start_time)
# 11.11320948600769

2019.1.13

发表评论

电子邮件地址不会被公开。 必填项已用*标注