web_test/run.py
2024-07-31 20:54:28 +08:00

70 lines
2.2 KiB
Python

import json
import subprocess
def run_web_test(url, count, bind, name):
command = ["./target/release/web_test", url, count, f"socks5h://{bind}", name]
result = subprocess.run(command, capture_output=True, text=True)
print(result)
return result.stdout.strip()
def sort_by_field(input_dict):
# 提取字典中的键值对,并根据值中的 'field' 进行排序
sorted_items = sorted(input_dict.items(), key=lambda item: item[1]['latency'])
# 将排序后的键值对转换为包含二元组的数组
result = [(key, value) for key, value in sorted_items]
return result
def main(json_file, url, count):
# 读取JSON文件
with open(json_file, 'r') as file:
data = json.load(file)
results = {}
# 针对每一个name和bind运行程序
for entry in data:
name = entry['name']
bind = entry['bind']
result = run_web_test(url, count, bind, name)
if result.find("error") == -1:
data = process_string(result)
results[name] = data
results = sort_by_field(results)
return results
def process_string(input_string):
# 将字符串按逗号分割
parts = input_string.split(',')
# 确保字符串有5个部分
if len(parts) != 5:
raise ValueError("输入字符串必须由5部分组成")
# 提取并转换各部分数据
name = parts[0].strip()
latency = float(parts[1].strip())
variance = float(parts[2].strip())
success_count = int(parts[3].strip())
failure_count = int(parts[4].strip())
# 返回提取的信息
return {
'name': name,
'latency': latency,
'variance': variance,
'success_count': success_count,
'failure_count': failure_count
}
if __name__ == "__main__":
json_file = "all_server.json" # 替换为你的JSON文件名
url = "https://cp.cloudflare.com/generate_204" # 替换为你的url
count = "10" # 替换为你的count
results = main(json_file, url, count)
# 打印或保存结果
print(json.dumps(results, indent=4))
# 如果你想保存到文件,可以取消以下注释
with open('output.json', 'w') as outfile:
json.dump(results, outfile, indent=4)