背景:接口1返回数据用于接口2,3请求,接口3返回数据用于接口4请求
实现代码如下
#!/usr/local/bin/python3
import queue
import random
import sys
import os
import time
import threading
import requests,json
import math
#全局计数器
def in_counter(rtype):
global errornum,rernum,rernum_y,allnum_y,allnum_zs,allnum_uy,allnum_uz#声明为全局变量,否则无法在此子函数中计算它
if rtype == "nre":
errornum += 1
elif rtype == "yre":
rernum_y += 1
elif rtype == "re":
rernum += 1
elif rtype == "all_y":
allnum_y += 1
elif rtype == "all_uy":
allnum_uy += 1
elif rtype == "all_zs":
allnum_zs += 1
elif rtype == "all_uz":
allnum_uz += 1
else:
print("统计信息出现错误",flush=True)
def get_phone_num(th):
area_code = ['111','112','113','114','115','116','117','118','119','120','121','122','123','124','125','126','127','128','129','130','131']
phone_num=""
phone_num += area_code[th] #根据线程号获取地区码,防止重复
phone_num += str(random.randint(0,999)).zfill(3) #生成中间三位号码,不足三位前补零
phone_num += str(random.randint(0,9999)).zfill(4) #生成中间四位号码,不足四位前补零
return phone_num
#接口1
def query1(onenum,th):
url="http://"+server_ip+":port/url_path"
for i in range(onenum):
time.sleep(sleep_time)
callingphone=get_phone_num(th)
data={
"***":"****",
"***":"****",
"****":citycode,
"****":callingphone,
"***":""
}
try:
r=requests.post(url=url,json=data,headers=headers)
jsonStr = json.loads(r.text)
retCodeStr=jsonStr['retCode']
if retCodeStr!=0:
with open(log_file,'a') as file:
file.write("1接口报错如下:"+r.text+"
")
in_counter("yre")
elif un_pro_num !=0 and i% un_pro_num == 0:
shared_queue_q.put(callingphone+","+jsonStr['***'])
else :
shared_queue_x.put(callingphone+","+jsonStr['***'])
in_counter("all_y")
except Exception as e:
in_counter("nre")
print("1接口请求失败:",{str(e)},flush=True)
#接口2
def unbind1(dh):
url="http://"+server_ip+":port/url_path"
while True:
#从队列1中获取待处理数据
item = shared_queue_x.get()
#遇到结束标识结束线程并写入一个新的标记供其他线程使用
if item is None:
shared_queue_x.put(None)
break
time.sleep(sleep_time)
data={
"****": item.split(",")[0],
"****": "1",
"***": "****",
"***": item.split(",")[1]
}
try:
r=requests.post(url=url,json=data,headers=headers)
jsonStr = json.loads(r.text)
retCodeStr=jsonStr['retCode']
if retCodeStr!=0:
with open(log_file,'a') as file:
file.write("2接口报错如下:"+r.text+"
")
in_counter("re")
in_counter("all_uy")
except Exception as e:
in_counter("nre")
print("2接口请求失败:",{str(e)},flush=True)
#接口3
def query2(zhs):
url="http://"+server_ip+":port/url_path"
while True:
#从2队列中获取数据
item = shared_queue_q.get()
if item is None:
shared_queue_q.put(None)
break
toMobile=get_phone_num(zhs)
time.sleep(sleep_time)
data={
"*****": item.split(",")[0],
"****":toMobile,
"***":"****",
"****":citycode,
"****":item.split(",")[1],
}
try:
r=requests.post(url=url,json=data,headers=headers)
jsonStr = json.loads(r.text)
retCodeStr=jsonStr['retCode']
if retCodeStr!=0:
with open(log_file,'a') as file:
file.write("3接口报错如下:"+r.text+"--"+str(item)+","+toMobile+"
")
in_counter("re")
else :
shared_queue_z.put(item.split(",")[0]+","+toMobile+","+jsonStr['***'])
in_counter("all_zs")
except Exception as e:
in_counter("nre")
print("3接口请求失败:",{str(e)},flush=True)
#接口4
def unbind2(uzs):
url="http://"+server_ip+":port/url_path"
while True:
#从确认绑定队列中获取待解绑数据
item = shared_queue_z.get()
if item is None:
shared_queue_z.put(None)
break
time.sleep(sleep_time)
data={
"****": "****",
"****": "****",
"***": item.split(",")[0],
"****": item.split(",")[1],
"****": item.split(",")[2]
}
try:
r=requests.post(url=url,json=data,headers=headers)
jsonStr = json.loads(r.text)
retCodeStr=jsonStr['retCode']
if retCodeStr!=0:
with open(log_file,'a') as file:
file.write("4接口报错如下:"+r.text+"
")
in_counter("re")
in_counter("all_uz")
except Exception as e:
in_counter("nre")
print("4接口请求失败:",{str(e)},flush=True)
errornum=0#统计报错信息
rernum_y=0#单独统计接口1retCode非0的请求
rernum=0#非1接口解析错误数统计,retCode非0的请求
allnum_y=0#1接口执行总次数统计
allnum_zs=0#2接口执行总次数统计
allnum_uy=0#3接口执行总次数统计
allnum_uz=0#4接口执行总次数统计
sleep_time=0.05#设置休眠时长,控制请求频率
#错误日志存储路径
log_file="/home/saicops/zcgpy/virmobile/error.log"
citycode="**"
dyeing="***"#染色标记位,控制请求环境
headers={
"*****":"****",
"Content-Type":"application/json",
"x-dyeing":dyeing
}
# 创建一个先进先出队列对象,存储数据用于2接口
shared_queue_x = queue.Queue()
# 创建一个先进先出队列对象,存储数据用于3接口
shared_queue_q = queue.Queue()
# 创建一个先进先出队列对象,存储数据用于4接口
shared_queue_z = queue.Queue()
try:
fornum=int(sys.argv[1])#执行入参获取启动线程数
onenum=int(sys.argv[2])#执行入参获取单线程运行次数
server_ip=sys.argv[3]#获取请求服务器地址
un_pro_num=int(sys.argv[4])#获取不2的比例,为n则1/n的预绑不参与2,0则全部2,1为全部不2
except Exception as e:
print("入参错误,终止程序运行,报错:",{str(e)})
print("请按'**.sh 启动线程数 单线程运行次数 服务器ip地址 不2的比例-正整数-n则为n分之一预绑不参与2'的格式运行命令")
sys.exit(0)
StratTime=time.time()
# 创建并启动M个1线程
print("1开始了......",flush=True)
threads_put = []
for th in range(fornum):
t = threading.Thread(target=query1,args=(onenum,th,))
threads_put.append(t)
t.start()
threads_get = []
# 创建并启动N个2线程
if un_pro_num !=1:
print("2开始了......",flush=True)
if fornum >1:
fornum_u=fornum-1
else:
fornum_u=1
for dh in range(fornum_u):
p = threading.Thread(target=unbind1,args=(dh,))
threads_get.append(p)
p.start()
# 创建并启动N个3线程&4线程
threads_zhs = []
threads_uq = []
if un_pro_num != 0 :
print("3开始了......",flush=True)
fornum_z=math.ceil(fornum/un_pro_num)
for zhs in range(fornum_z):
z = threading.Thread(target=query2,args=(zhs,))
threads_zhs.append(z)
z.start()
print("4开始了......线",flush=True)
for uzs in range(fornum_z):
uq = threading.Thread(target=unbind2,args=(uzs,))
threads_uq.append(uq)
uq.start()
#等待1线程全部结束
for t in threads_put:
t.join()
#通知1队列&2队列结束
shared_queue_x.put(None)
shared_queue_q.put(None)
endtime_p=time.time()
runtime_p=endtime_p-StratTime
print("1接口运行"+str(fornum)+"个线程"+str(onenum)+"次,合计执行"+str(allnum_y)+"次,总耗时:"+str(runtime_p)+"s,请求失败次数:"+str(errornum)+",接口解析失败次数:"+str(rernum_y)+",TPS约为"+str(allnum_y/runtime_p)+"次/s")
#等待2程全部结束
for p in threads_get:
p.join()
if un_pro_num !=1:
endtime1_g=time.time()
runtime1_g=endtime1_g-StratTime
print("2接口运行"+str(fornum_u)+"个线程合计执行"+str(allnum_uy)+"次,总耗时:"+str(runtime1_g)+"s,TPS约为"+str(allnum_uy/runtime1_g)+"次/s")
#等待3线程全部结束
for z in threads_zhs:
z.join()
#通知3队列结束
shared_queue_z.put(None)
if un_pro_num != 0 :
endtime1_z=time.time()
runtime1_z=endtime1_z-StratTime
print("3接口运行"+str(fornum_z)+"个线程合计执行"+str(allnum_zs)+"次,总耗时:"+str(runtime1_z)+"s,TPS约为"+str(allnum_zs/runtime1_z)+"次/s")
for uq in threads_uq:
uq.join()
if un_pro_num != 0 :
endtime1_uq=time.time()
runtime1_uq=endtime1_uq-StratTime
print("4接口"+str(fornum_z)+"个线程合计执行"+str(allnum_uz)+"次,除1接口外解析失败次数:"+str(rernum)+"次,总耗时:"+str(runtime1_uq)+"s,TPS约为"+str(allnum_uz/runtime1_uq)+"次/s")