知识关系

所属体系: 工程工具与自动化 / 工程工具链 主题节点: 记录初学Python开发fio测试工具 推荐前置: 服务器稳定性及基准测试方法 相关主题: C++编写fio测试工具 / C++写脚本,我在发疯 原始来源: source/_posts/记录初学Python开发fio测试工具.md 从旧博客迁移;已按知识图谱结构重新归档。


使用Python自动测试IOPS性能并格式化输出

代码过多,请点击阅读全文查看

#!/usr/bin/python3
 
"""
TODO:
    此脚本已测试兼容环境为Debian 11.6
    此脚本的测试路径为"/iopsTest",请提前将你要测试的设备挂载到"/iopsTest"
    注意自行根据盘位修改下列numjobs参数
"""
 
import subprocess, re, os
 
 
# 随机写
def randwrite():
    # 初始化用于存储运行结果的列表
    bw = [0, 0, 0]
    iops = [0, 0, 0]
 
    # fio重复运行4次
    print("随机写进行中...")
    for i in range(4):
        cmd = [
            "fio",
            "-name=YEOS",
            "-size=32G",
            "-runtime=60s",
            "-time_base",
            "-bs=4k",
            "-direct=1",
            "-rw=randwrite",
            "-ioengine=libaio",
            "-numjobs=8",
            "-group_reporting",
            "-iodepth=64",
            f"-filename=/iopsTest/{i}",
            "-randrepeat=0",
        ]
 
        # 将fio运行结果标准输出到管道
        fio1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=False)
        fio2 = subprocess.Popen(
            ["grep", "samples"], stdin=fio1.stdout, stdout=subprocess.PIPE, shell=False
        )
 
        # 使用communicate获取子进程的标准输出并格式化为utf-8编码
        fio = fio2.communicate()[0].decode("utf-8")
 
        # 初始化列表
        bw_num = []
        iops_num = []
 
        # 匹配数字和小数点,并将其元素更新
        for line in fio.split("\n"):
            if "bw" in line:
                bw_num.extend(re.findall(r"\d+\.\d+|\d+", line))
            elif "iops" in line:
                iops_num.extend(re.findall(r"\d+\.\d+|\d+", line))
 
        # 将str类型转换为float后再转换为int
        bw_num = [int(float(e)) for e in bw_num]
        iops_num = [int(float(e)) for e in iops_num]
        print(f"单次带宽运行结果:{bw_num}")
        print(f"单次IOPS运行结果:{iops_num}")
 
        # 跳过第一次运行结果
        if i == 0:
            continue
        else:
            # 格式化fio结果
            for KorM in fio.split("\n"):
                # 判断格式化的结果中是否存在MiB单位的值
                if "MiB" in KorM:
                    # 若有则转换为KiB
                    print("输出结果为MiB单位,将进行转换")
                    bw[0] += bw_num[0] * 1024
                    bw[1] += bw_num[1] * 1024
                    bw[2] += bw_num[3] * 1024
                    iops[0] += iops_num[0]
                    iops[1] += iops_num[1]
                    iops[2] += iops_num[2]
                    print(f"带宽第{i}次值:{bw}")
                    print(f"IOPS第{i}次值:{iops}")
                elif "KiB" in KorM:
                    print("输出结果为KiB单位,不转换")
                    bw[0] += bw_num[0]
                    bw[1] += bw_num[1]
                    bw[2] += bw_num[3]
                    iops[0] += iops_num[0]
                    iops[1] += iops_num[1]
                    iops[2] += iops_num[2]
                    print(f"带宽第{i}次值:{bw}")
                    print(f"IOPS第{i}次值:{iops}")
 
    bwMin = int(bw[0] / 3)
    bwMax = int(bw[1] / 3)
    bwAvg = int(bw[2] / 3)
    iopsMin = int(iops[0] / 3)
    iopsMax = int(iops[1] / 3)
    iopsAvg = int(iops[2] / 3)
 
    print(
        f"\n\n\n随机写均值如下:\n带宽最小值:{bwMin},最大值{bwMax},均值{bwAvg}\nIOPS最小值:{iopsMin},"
        f"最大值{iopsMax},均值{iopsAvg}"
    )
 
 
# 创建读文件
def create_readFile():
    print("初始化读测试环境,至少需要十几分钟甚至几十分钟,等着...")
    clear = subprocess.Popen(["rm", "-rf", "/iopsTest/*"], shell=False)
    clear.wait()
    print("环境检测完成,创建读测试文件...")
    cmd = [
        "fio",
        "-name=create_read",
        "-size=32G",
        "-bs=1M",
        "-direct=1",
        "-rw=write",
        "-ioengine=libaio",
        "-numjobs=8",
        "-filename=/iopsTest/read",
    ]
    create = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=False)
    done = create.communicate()[0].decode("utf-8")
 
 
# 随机读
def randread():
    # 初始化用于存储运行结果的列表
    bw = [0, 0, 0]
    iops = [0, 0, 0]
    # fio重复运行4次
    print("随机读进行中...")
 
    for i in range(4):
        cmd = [
            "fio",
            "-name=YEOS",
            "-size=32G",
            "-runtime=60s",
            "-time_base",
            "-bs=4k",
            "-direct=1",
            "-rw=randwrite",
            "-ioengine=libaio",
            "-numjobs=8",
            "-group_reporting",
            "-iodepth=64",
            "-filename=/iopsTest/read",
            "-randrepeat=0",
        ]
 
        # 将fio运行结果标准输出到管道
        fio1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=False)
        fio2 = subprocess.Popen(
            ["grep", "samples"], stdin=fio1.stdout, stdout=subprocess.PIPE, shell=False
        )
 
        # 使用communicate获取子进程的标准输出并格式化为utf-8编码
        fio = fio2.communicate()[0].decode("utf-8")
 
        # 初始化列表
        bw_num = []
        iops_num = []
 
        # 匹配数字和小数点,并将其元素更新
        for line in fio.split("\n"):
            if "bw" in line:
                bw_num.extend(re.findall(r"\d+\.\d+|\d+", line))
            elif "iops" in line:
                iops_num.extend(re.findall(r"\d+\.\d+|\d+", line))
 
        # 将str类型转换为float后再转换为int
        bw_num = [int(float(e)) for e in bw_num]
        iops_num = [int(float(e)) for e in iops_num]
        print(f"单次带宽运行结果:{bw_num}")
        print(f"单次IOPS运行结果:{iops_num}")
 
        # 跳过第一次运行结果
        if i == 0:
            continue
        else:
            # 格式化fio结果
            for KorM in fio.split("\n"):
                # 判断格式化的结果中是否存在MiB单位的值
                if "MiB" in KorM:
                    # 若有则转换为KiB
                    print("输出结果为MiB单位,将进行转换")
                    bw[0] += bw_num[0] * 1024
                    bw[1] += bw_num[1] * 1024
                    bw[2] += bw_num[3] * 1024
                    iops[0] += iops_num[0]
                    iops[1] += iops_num[1]
                    iops[2] += iops_num[2]
                    print(f"带宽第{i}次值:{bw}")
                    print(f"IOPS第{i}次值:{iops}")
                elif "KiB" in KorM:
                    print("输出结果为KiB单位,不转换")
                    bw[0] += bw_num[0]
                    bw[1] += bw_num[1]
                    bw[2] += bw_num[3]
                    iops[0] += iops_num[0]
                    iops[1] += iops_num[1]
                    iops[2] += iops_num[2]
                    print(f"带宽第{i}次值:{bw}")
                    print(f"IOPS第{i}次值:{iops}")
 
    bwMin = int(bw[0] / 3)
    bwMax = int(bw[1] / 3)
    bwAvg = int(bw[2] / 3)
    iopsMin = int(iops[0] / 3)
    iopsMax = int(iops[1] / 3)
    iopsAvg = int(iops[2] / 3)
 
    print(
        f"\n\n\n随机读均值如下:\n带宽最小值:{bwMin},最大值{bwMax},均值{bwAvg}\nIOPS最小值:{iopsMin},"
        f"最大值{iopsMax},均值{iopsAvg}"
    )
 
 
# 随机读写
def randrw():
    # 初始化用于存储运行结果的列表
    bw = [0, 0, 0, 0, 0, 0]
    iops = [0, 0, 0, 0, 0, 0]
    # fio重复运行4次
    print("随机读写进行中...")
    for i in range(4):
        cmd = [
            "fio",
            "-name=YEOS",
            "-size=32G",
            "-runtime=60s",
            "-time_base",
            "-bs=4k",
            "-direct=1",
            "-rw=randrw",
            "-ioengine=libaio",
            "-numjobs=8",
            "-group_reporting",
            "-iodepth=64",
            "-filename=/iopsTest/read",
            "-rwmixwrite=30",
            "-randrepeat=0",
        ]
 
        # 将fio运行结果标准输出到管道
        fio1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=False)
 
        fio2 = subprocess.Popen(
            ["grep", "samples"], stdin=fio1.stdout, stdout=subprocess.PIPE, shell=False
        )
 
        # 使用communicate获取子进程的标准输出并格式化为utf-8编码
        fio = fio2.communicate()[0].decode("utf-8")
 
        # 初始化列表
        bw_num = []
        iops_num = []
        # 匹配数字和小数点,并将其元素更新
        for line in fio.split("\n"):
            if "bw" in line:
                bw_num.extend(re.findall(r"\d+\.\d+|\d+", line))
            elif "iops" in line:
                iops_num.extend(re.findall(r"\d+\.\d+|\d+", line))
 
        # 将str类型转换为float后再转换为int
        bw_num = [int(float(e)) for e in bw_num]
        iops_num = [int(float(e)) for e in iops_num]
 
        print(f"单次带宽运行结果:{bw_num}")
        print(f"单次IOPS运行结果:{iops_num}")
        # 跳过第一次运行结果
        if i == 0:
            continue
        else:
            # 格式化fio结果
            for KorM in fio.split("\n"):
                # 判断格式化的结果中是否存在MiB单位的值
                if "MiB" in KorM:
                    print("输出结果为MiB单位,将进行转换")
                    # 若有则转换为KiB
                    # 读带宽
                    bw[0] += bw_num[0] * 1024
                    bw[1] += bw_num[1] * 1024
                    bw[2] += bw_num[3] * 1024
                    # 写带宽
                    bw[3] += bw_num[6] * 1024
                    bw[4] += bw_num[7] * 1024
                    bw[5] += bw_num[9] * 1024
                    # 读IOPS
                    iops[0] += iops_num[0]
                    iops[1] += iops_num[1]
                    iops[2] += iops_num[2]
                    # 写IOPS
                    iops[3] += iops_num[5]
                    iops[4] += iops_num[6]
                    iops[5] += iops_num[7]
                    print(f"带宽第{i}次值:{bw}")
                    print(f"IOPS第{i}次值:{iops}")
                    # 因fio结果存在读写两行,避免重复执行,所以直接跳过后续循环
                    break
                elif "KiB" in KorM:
                    print("输出结果为KiB单位,不转换")
                    # 读带宽
                    bw[0] += bw_num[0]
                    bw[1] += bw_num[1]
                    bw[2] += bw_num[3]
                    # 写带宽
                    bw[3] += bw_num[6]
                    bw[4] += bw_num[7]
                    bw[5] += bw_num[9]
                    # 读IOPS
                    iops[0] += iops_num[0]
                    iops[1] += iops_num[1]
                    iops[2] += iops_num[2]
                    # 写IOPS
                    iops[3] += iops_num[5]
                    iops[4] += iops_num[6]
                    iops[5] += iops_num[7]
                    print(f"带宽第{i}次值:{bw}")
                    print(f"IOPS第{i}次值:{iops}")
                    # 因fio结果存在读写两行,避免重复执行,所以直接跳过后续循环
                    break
 
    RbwMin = int(bw[0] / 3)
    RbwMax = int(bw[1] / 3)
    RbwAvg = int(bw[2] / 3)
 
    WbwMin = int(bw[3] / 3)
    WbwMax = int(bw[4] / 3)
    WbwAvg = int(bw[5] / 3)
 
    RiopsMin = int(iops[3] / 3)
    RiopsMax = int(iops[4] / 3)
    RiopsAvg = int(iops[5] / 3)
 
    WiopsMin = int(iops[0] / 3)
    WiopsMax = int(iops[1] / 3)
    WiopsAvg = int(iops[2] / 3)
    print(
        f"\n\n\n随机读写均值如下:\n"
        f"读:\n带宽最小值:{RbwMin},最大值{RbwMax},均值{RbwAvg}\nIOPS最小值:{RiopsMin},"
        f"最大值{RiopsMax},均值{RiopsAvg}"
        "\n"
        f"写:\n带宽最小值:{WbwMin},最大值{WbwMax},均值{WbwAvg}\nIOPS最小值:{WiopsMin},"
        f"最大值{WiopsMax},均值{WiopsAvg}"
    )
 
 
def rm_file():
    print("请等待程序清除测试残留文件...")
    rm = os.system("rm -rf /smbTest/*")
    print("清除完毕,程序结束!")
 
 
if __name__ == "__main__":
    print("欢迎使用群晖测试工具\n本工具测试内容:\n路径挂载模式下IOPS性能测试")
    rm_file()
    randwrite()
    rm_file()
    create_readFile()
    randread()
    randrw()
    rm_file()