多线程

import threading
import time
from threading import current_thread


# 定义线程方法
def my_thread(arg1, arg2):
    print(threading.currentThread().getName(), 'start')
    print('%s %s' % (arg1, arg2))
    time.sleep(1)
    print(threading.currentThread().getName(), 'end')


# 测试
def test_func():
    for i in range(1, 6):
        t1 = threading.Thread(target=my_thread, args=(i, i + 1))
        t1.start()
    print(current_thread().getName(), 'end')


class MyThread(threading.Thread):
    def run(self):
        print(current_thread().getName(), 'start')
        print('run')
        print(current_thread().getName(), 'stop')


def test_func2():
    t1 = MyThread()
    t1.start()
    t1.join()
    print(current_thread().getName(), 'end')


# test_func()
test_func2()

Thread 方法

  • run(): 用以表示线程活动的方法。
  • start():启动线程活动。
  • join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
  • isAlive(): 返回线程是否活动的。
  • getName(): 返回线程名。
  • setName(): 设置线程名。

经典的消费者和生产者

from threading import Thread, current_thread
import time
import random
from queue import Queue

queue = Queue(5)
# 经典的消费者和生产者问题

class ProducerThread(Thread):
    def run(self):
        name = current_thread().getName()
        nums = range(100)
        global queue
        while True:
            num = random.choice(nums)
            queue.put(num)
            print('生产者 %s 生产了数据 %s' % (name, num))
            t = random.randint(1, 3)
            time.sleep(t)
            print('生产者 %s 睡眠了 %s 秒' % (name, t))


class ConsumerTheard(Thread):
    def run(self):
        name = current_thread().getName()
        global queue
        while True:
            num = queue.get()
            queue.task_done()
            print('消费者 %s 消耗了数据 %s' % (name, num))
            t = random.randint(1, 5)
            time.sleep(t)
            print('消费者 %s 睡眠了 %s 秒' % (name, t))

def test_func():
    p1 = ProducerThread(name='p1')
    p1.start()
    p2 = ProducerThread(name='p2')
    p2.start()
    p3 = ProducerThread(name='p3')
    p3.start()
    c1 = ConsumerTheard(name='c1')
    c1.start()
    c2 = ConsumerTheard(name='c2')
    c2.start()

test_func()

线程同步(锁)

  • 创建锁:lock = threading.Lock()
  • 加锁:lock.acquire()
  • 解锁:lock.release()
from queue import Queue
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print
        "Exiting " + self.name


def process_data(threadName, q):
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            data = q.get()
            queueLock.release()
            print("%s processing %s" % (threadName, data))
        else:
            queueLock.release()
        time.sleep(1)


threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue(10)
threads = []
threadID = 1

# 创建新线程
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1

# 填充队列
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# 等待队列清空
while not workQueue.empty():
    pass

# 通知线程是时候退出
exitFlag = 1

# 使用 json 方法等待所有线程完成
for t in threads:
    t.join()
print("Exiting Main Thread")

线程合并(join方法)

需要主线程要等待子线程运行完后,再退出可以使用 join 方法

# 创建的 thread 调用 join 确保子线程结束
def test_func2():
    t1 = MyThread()
    t1.start()
    t1.join()
    print(current_thread().getName(), 'end')

线程间通信

从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用 put() 和 get() 操作来向队列中添加或者删除元素。

Queue 方法

  • Queue.qsize() 返回队列的大小
  • Queue.empty() 如果队列为空,返回True,反之False
  • Queue.full() 如果队列满了,返回True,反之False
  • Queue.full 与 maxsize 大小对应
  • Queue.get([block[, timeout]])获取队列,timeout等待时间
  • Queue.get_nowait() 相当Queue.get(False)
  • Queue.put(item, block=True, timeout=None) 写入队列,timeout等待时间
  • Queue.put_nowait(item) 相当 Queue.put(item, False)
  • Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
  • Queue.join() 实际上意味着等到队列为空,再执行别的操作

参考