測試框架實踐--多線程

iTesting2019-04-14 03:25:38

iTesting,愛測試,愛分享


前面幾次的分享,我從一個數據驅動的實現展開去,先後討論了什麼是數據驅動,如何實現數據驅動,數據驅動在自動化框架裏如何應用。
Python數據驅動實踐(一)–ddt實現數據驅動
Python數據驅動實踐(二)–教你用Python實現數據驅動
Python數據驅動實踐(三)–動態添加測試用例
Python測試框架實現(四)–動態挑選測試用例

為什麼要講這些呢?

因為這些是一個測試框架必不可少的部分,看看前面4次的分享,雖然可以成功的運行,但我們採用了順序運行的方式,跟實際使用相差很遠,現實中我們一般實現“併發”運行我們的測試用例。

説起併發,Python 有多線程(Multiple Threading)和多進程(Multiple Process)之分, 由於GIL即Global Interpreter Lock(全局解釋器鎖)的存在,python多線程並不能實現真正的併發(無論你的CPU是否多核),相反,多進程因為有獨佔的內存空間可以真正的實現併發。但在我們的自動化測試框架裏,我們還是希望利用線程的公用內存空間特性,特別是同一個測試類我們希望有統一的setup, teardown特性。而這個多進程就不太適用,加上我們測試用例也不是CPU計算密集型,多線程方案的“併發”看起來是最佳選擇。

但是是不是就一定要用threading.Thread呢?

我們先看看”傳統“的多線程併發。 一個通用的多線程模板是這樣的:

 1import threading
2import queue
3import time
4exitFlag = 0
5class MyThread(threading.Thread):
6    def __init__(self, q):
7        threading.Thread.__init__(self)
8        self.q = q
9    def run(self):
10        do_something(self.q)
11def do_something(q):
12    while not exitFlag:
13        queue_lock.acquire()
14        if not work_queue.empty():
15            data = q.get()
16            queue_lock.release()
17            print("Now thread %s is processing %s" % (threading.currentThread(), data))
18            time.sleep(1)
19        else:
20            queue_lock.release()
21if __name__ == "__main__":
22    num_worker_threads = 3
23    target_case = ['case1''case2''case3''case4''case5']
24    queue_lock = threading.Lock()
25    work_queue = queue.Queue()
26    threads = []
27    start_time = time.time()
28    # Create new thread
29    for case in range(num_worker_threads):
30        thread = MyThread(work_queue)
31        thread.start()
32        threads.append(thread)
33    # Fill queue
34    queue_lock.acquire()
35    for case in target_case:
36        work_queue.put(case)
37    queue_lock.release()
38    # Wait queue empty
39    while not work_queue.empty():
40        pass
41    # Notify thread quite
42    exitFlag = 1
43    # Wait all threads done
44    for t in threads:
45        t.join()
46    end_time = time.time()
47    print("All cases finished with running time --%s" % (end_time - start_time))

把我們前面實現的順序運行改成併發, 只需要改成如下就可以實現多線程:

 1import threading
2import queue
3import time
4from common.test_case_finder import DiscoverTestCases, unpack_test_cases_from_functions
5exitFlag = 0
6def f(case):
7    name, func, value = case
8    try:
9        if value:
10            func.__call__(name, *value)
11        else:
12            func.__call__(name)
13    except:
14        # traceback.print_exc()
15        cases_run_fail.append(name)
16    else:
17        cases_run_success.append(name)
18    return cases_run_fail, cases_run_success
19class MyThread(threading.Thread):
20    def __init__(self, q):
21        threading.Thread.__init__(self)
22        self.q = q
23    def run(self):
24        while not exitFlag:
25            queue_lock.acquire()
26            if not work_queue.empty():
27                data = self.q.get()
28                f(data)
29                time.sleep(5)
30                queue_lock.release()
31                print("Now thread %s is processing %s" % (threading.currentThread(), data))
32            else:
33                queue_lock.release()
34if __name__ == "__main__":
35    num_worker_threads = 5
36    mypath = r"D:\ktest\tests\test_page1"
37    cases_to_run = []
38    cases_run_success = []
39    cases_run_fail = []
40    discover_cases = DiscoverTestCases(mypath)
41    mds = discover_cases.get_modules_spec()
42    raw_test_cases = discover_cases.find_classes_in_module(mds)
43    cases_to_run = unpack_test_cases_from_functions(raw_test_cases)
44    queue_lock = threading.Lock()
45    work_queue = queue.Queue()
46    threads = []
47    start_time = time.time()
48    # Create new thread
49    for case in range(num_worker_threads):
50        thread = MyThread(work_queue)
51        thread.start()
52        threads.append(thread)
53    # Fill queue
54    queue_lock.acquire()
55    for case in cases_to_run:
56        work_queue.put(case)
57    queue_lock.release()
58    # Wait queue empty
59    while not work_queue.empty():
60        pass
61    # Notify thread quite
62    exitFlag = 1
63    # Wait all threads done
64    for t in threads:
65        t.join()
66    end_time = time.time()
67    print("All cases finished with running time --{:.2f} seconds" .format(end_time - start_time))
68    print('Below cases are passed:\n %s' %cases_run_success)
69    print('Below cases are failed:\n %s' % cases_run_fail)

可以看到,你自己要做很多事情來保證多線程的正常運行,你要維護queue,要注意資源鎖,你要使用join來等待子線程都完成。 多線程難道都這麼麻煩嗎?

“人生苦短,我用python”, 不知道你們聽過沒 :)

multiprocessing.dummy 來助你一臂之力!

dummy是multiprocessing的一個克隆體,唯一的區別在於,dummy使用線程而multiprocessing使用進程。

 1from multiprocessing.dummy import Pool as ThreadPool
2import time
3from common.test_case_finder import DiscoverTestCases, unpack_test_cases_from_functions
4def f(case):
5    name, func, value = case
6    try:
7        if value:
8            func.__call__(name, *value)
9        else:
10            func.__call__(name)
11    except:
12        # traceback.print_exc()
13        cases_run_fail.append(name)
14    else:
15        cases_run_success.append(name)
16    return cases_run_fail, cases_run_success
17if __name__ == "__main__":
18    number_of_threads = 5
19    mypath = r"D:\ktest\tests\test_page1"
20    cases_to_run = []
21    cases_run_success = []
22    cases_run_fail = []
23    discover_cases = DiscoverTestCases(mypath)
24    mds = discover_cases.get_modules_spec()
25    raw_test_cases = discover_cases.find_classes_in_module(mds)
26    cases_to_run = unpack_test_cases_from_functions(raw_test_cases)
27    start_time = time.time()
28    with ThreadPool(number_of_threads) as p:
29        p.map(f, cases_to_run)
30    p.close()
31    p.join()
32    end_time = time.time()
33    print('Below cases are passed:\n %s' %cases_run_success)
34    print('Below cases are failed:\n %s' % cases_run_fail)
35    print("All cases finished with running time --{:.2f} seconds" .format(end_time - start_time))

可以看到,傳統的threading.Thread除開定義Thread類外,還要用了這麼多篇幅來做多線程,multiprocessing.dummy只用一個Pool就全搞定了。

好了,今天我們講到這裏,併發也實現了,動態挑選也實現了,後面我們就要實現test fixture, 敬請期待!


 -  End  -

作者:

Kevin Cai, 江湖人稱蔡老師。

兩性情感專家,非著名測試開發。

技術路線的堅定支持者,始終相信Nobody can be somebody。      

                     

· 猜你喜歡的文章 ·

功能測試進階系列直播説明

爬蟲入門 --打造網站自生成系統(預告篇)

測試往何處去 -- 新時期測試如何面對挑戰

先點贊、再轉發、麼麼噠↓↓

https://hk.wxwenku.com/d/200085804