使用python开发一个多接口并发压测脚本

背景:接口1返回数据用于接口2,3请求,接口3返回数据用于接口4请求

实现代码如下

#!/usr/local/bin/python3
import queue
import random
import sys
import os
import time
import threading
import requests,json
import math

#全局计数器
def in_counter(rtype):
    global errornum,rernum,rernum_y,allnum_y,allnum_zs,allnum_uy,allnum_uz#声明为全局变量,否则无法在此子函数中计算它
    if rtype == "nre":
        errornum += 1
    elif rtype == "yre":
        rernum_y += 1
    elif rtype == "re":
        rernum += 1
    elif rtype == "all_y":
        allnum_y += 1
    elif rtype == "all_uy":
        allnum_uy += 1
    elif rtype == "all_zs":
        allnum_zs += 1
    elif rtype == "all_uz":
        allnum_uz += 1
    else:
        print("统计信息出现错误",flush=True)

def get_phone_num(th):
    area_code = ['111','112','113','114','115','116','117','118','119','120','121','122','123','124','125','126','127','128','129','130','131']
    phone_num=""
    phone_num += area_code[th] #根据线程号获取地区码,防止重复
    phone_num += str(random.randint(0,999)).zfill(3) #生成中间三位号码,不足三位前补零
    phone_num += str(random.randint(0,9999)).zfill(4) #生成中间四位号码,不足四位前补零
    
    return phone_num
#接口1
def query1(onenum,th):
    url="http://"+server_ip+":port/url_path"
    for i in range(onenum):
        time.sleep(sleep_time)
        callingphone=get_phone_num(th)
        data={
            "***":"****",
            "***":"****",
            "****":citycode,
            "****":callingphone,
            "***":""
        }
        try:
            r=requests.post(url=url,json=data,headers=headers)
            jsonStr = json.loads(r.text)
            retCodeStr=jsonStr['retCode']
            if retCodeStr!=0:
                with open(log_file,'a') as file:
                    file.write("1接口报错如下:"+r.text+"
")
                in_counter("yre")
            elif un_pro_num !=0 and i% un_pro_num == 0:
                shared_queue_q.put(callingphone+","+jsonStr['***'])
            else :
                shared_queue_x.put(callingphone+","+jsonStr['***'])
            in_counter("all_y")
        except Exception as e:
            in_counter("nre")
            print("1接口请求失败:",{str(e)},flush=True) 
#接口2
def unbind1(dh):
    url="http://"+server_ip+":port/url_path"
    while True:
        #从队列1中获取待处理数据
        item = shared_queue_x.get()
        #遇到结束标识结束线程并写入一个新的标记供其他线程使用
        if item is None: 
            shared_queue_x.put(None)
            break
        time.sleep(sleep_time)
        data={
            "****": item.split(",")[0],
            "****": "1",
            "***": "****",
            "***": item.split(",")[1]
        }
        try:
            r=requests.post(url=url,json=data,headers=headers)
            jsonStr = json.loads(r.text)
            retCodeStr=jsonStr['retCode']
            if retCodeStr!=0:
                with open(log_file,'a') as file:
                    file.write("2接口报错如下:"+r.text+"
")
                in_counter("re")
            in_counter("all_uy")
        except Exception as e:
            in_counter("nre")
            print("2接口请求失败:",{str(e)},flush=True)
#接口3
def query2(zhs):
    url="http://"+server_ip+":port/url_path"
    while True:
        #从2队列中获取数据
        item = shared_queue_q.get()
        if item is None:
            shared_queue_q.put(None)
            break
        toMobile=get_phone_num(zhs)
        time.sleep(sleep_time)
        data={
            "*****": item.split(",")[0],
            "****":toMobile,
            "***":"****",
            "****":citycode,
            "****":item.split(",")[1],
        }
        try:
            r=requests.post(url=url,json=data,headers=headers)
            jsonStr = json.loads(r.text)
            retCodeStr=jsonStr['retCode']
            if retCodeStr!=0:
                with open(log_file,'a') as file:
                    file.write("3接口报错如下:"+r.text+"--"+str(item)+","+toMobile+"
")
                in_counter("re")
            else :
                shared_queue_z.put(item.split(",")[0]+","+toMobile+","+jsonStr['***'])
            in_counter("all_zs")
        except Exception as e:
            in_counter("nre")
            print("3接口请求失败:",{str(e)},flush=True)
#接口4
def unbind2(uzs):
    url="http://"+server_ip+":port/url_path"
    while True:
        #从确认绑定队列中获取待解绑数据
        item = shared_queue_z.get()
        if item is None:
            shared_queue_z.put(None)
            break
        time.sleep(sleep_time)
        data={
            "****": "****",
            "****": "****",
            "***": item.split(",")[0],
            "****": item.split(",")[1],
            "****": item.split(",")[2]
        }
        try:
            r=requests.post(url=url,json=data,headers=headers)
            jsonStr = json.loads(r.text)
            retCodeStr=jsonStr['retCode']
            if retCodeStr!=0:
                with open(log_file,'a') as file:
                    file.write("4接口报错如下:"+r.text+"
")
                in_counter("re")
            in_counter("all_uz")
        except Exception as e:
            in_counter("nre")
            print("4接口请求失败:",{str(e)},flush=True)

errornum=0#统计报错信息
rernum_y=0#单独统计接口1retCode非0的请求
rernum=0#非1接口解析错误数统计,retCode非0的请求
allnum_y=0#1接口执行总次数统计
allnum_zs=0#2接口执行总次数统计
allnum_uy=0#3接口执行总次数统计
allnum_uz=0#4接口执行总次数统计
sleep_time=0.05#设置休眠时长,控制请求频率
#错误日志存储路径
log_file="/home/saicops/zcgpy/virmobile/error.log"

citycode="**"
dyeing="***"#染色标记位,控制请求环境

headers={
        "*****":"****",
        "Content-Type":"application/json",
        "x-dyeing":dyeing
    }

# 创建一个先进先出队列对象,存储数据用于2接口
shared_queue_x = queue.Queue()
# 创建一个先进先出队列对象,存储数据用于3接口
shared_queue_q = queue.Queue()
# 创建一个先进先出队列对象,存储数据用于4接口
shared_queue_z = queue.Queue()

try:
    fornum=int(sys.argv[1])#执行入参获取启动线程数
    onenum=int(sys.argv[2])#执行入参获取单线程运行次数
    server_ip=sys.argv[3]#获取请求服务器地址
    un_pro_num=int(sys.argv[4])#获取不2的比例,为n则1/n的预绑不参与2,0则全部2,1为全部不2
except Exception as e:
    print("入参错误,终止程序运行,报错:",{str(e)})
    print("请按'**.sh 启动线程数 单线程运行次数 服务器ip地址 不2的比例-正整数-n则为n分之一预绑不参与2'的格式运行命令")
    sys.exit(0)

StratTime=time.time()
# 创建并启动M个1线程
print("1开始了......",flush=True)
threads_put = []
for th in range(fornum):
    t = threading.Thread(target=query1,args=(onenum,th,))
    threads_put.append(t)
    t.start()

threads_get = []
# 创建并启动N个2线程
if un_pro_num !=1:
    print("2开始了......",flush=True)
    if fornum >1:
        fornum_u=fornum-1
    else:
        fornum_u=1
    for dh in range(fornum_u):
        p = threading.Thread(target=unbind1,args=(dh,))
        threads_get.append(p)
        p.start()
# 创建并启动N个3线程&4线程
threads_zhs = []
threads_uq = []
if un_pro_num != 0 :
    print("3开始了......",flush=True)
    fornum_z=math.ceil(fornum/un_pro_num)
    for zhs in range(fornum_z):
        z = threading.Thread(target=query2,args=(zhs,))
        threads_zhs.append(z)
        z.start()

    print("4开始了......线",flush=True)
    for uzs in range(fornum_z):
        uq = threading.Thread(target=unbind2,args=(uzs,))
        threads_uq.append(uq)
        uq.start()

#等待1线程全部结束
for t in threads_put:
    t.join()
#通知1队列&2队列结束
shared_queue_x.put(None)
shared_queue_q.put(None)

endtime_p=time.time()
runtime_p=endtime_p-StratTime
print("1接口运行"+str(fornum)+"个线程"+str(onenum)+"次,合计执行"+str(allnum_y)+"次,总耗时:"+str(runtime_p)+"s,请求失败次数:"+str(errornum)+",接口解析失败次数:"+str(rernum_y)+",TPS约为"+str(allnum_y/runtime_p)+"次/s")

#等待2程全部结束
for p in threads_get:
    p.join()
    
if un_pro_num !=1:
    endtime1_g=time.time()
    runtime1_g=endtime1_g-StratTime
    print("2接口运行"+str(fornum_u)+"个线程合计执行"+str(allnum_uy)+"次,总耗时:"+str(runtime1_g)+"s,TPS约为"+str(allnum_uy/runtime1_g)+"次/s")

#等待3线程全部结束
for z in threads_zhs:
    z.join()
#通知3队列结束
shared_queue_z.put(None)
    
if un_pro_num != 0 :
    endtime1_z=time.time()
    runtime1_z=endtime1_z-StratTime
    print("3接口运行"+str(fornum_z)+"个线程合计执行"+str(allnum_zs)+"次,总耗时:"+str(runtime1_z)+"s,TPS约为"+str(allnum_zs/runtime1_z)+"次/s")
    
for uq in threads_uq:
    uq.join()

if un_pro_num != 0 :
    endtime1_uq=time.time()
    runtime1_uq=endtime1_uq-StratTime
    print("4接口"+str(fornum_z)+"个线程合计执行"+str(allnum_uz)+"次,除1接口外解析失败次数:"+str(rernum)+"次,总耗时:"+str(runtime1_uq)+"s,TPS约为"+str(allnum_uz/runtime1_uq)+"次/s")