链接文章
- 自动化测试基础设施—介绍
- 自动化测试基础设施—部署loki,grafana,influxdb
- 自动化测试基础设施—git安装部署
- 自动化测试基础设施—基础设施集成方案
- 自动化测试基础设施—git创建自动化测试工程
为了记录测试过程,我们会把一些测试相关信息,记录在测试报告里,方便分析失败的测试用例的时候,能够根据一份报告就可以了解测试过程,方便进行问题定位。但是随着用例数量的增加,会导致测试报告逐渐增大,少则几十兆,大则一两个G,导致测试报告通过浏览器打开的时候要么加载很慢,要么就是浏览器崩溃。
针对以上情况,就需要把测试过程日志提取出来,写入一个日志系统,测试报告中每条用例嵌入对应的日志链接或者日志查询的关键字信息即可。这里采用loki作为日志服务器来保存日志。
针对测试过程中需要记录的数据相关信息,如:接口返回稳定性情况,界面加载时间,内存,cpu等数据相关的信息时候,就需要一个直观的,可视化好的界面来展示这些数据变化,这里引入了influxdb数据库。
接下来就看一下怎么把loki和influxdb集成到我们的测试中来。
集成loki通过loki的官方文档提供的API,我们通过http请求的方式,将日志发送到loki服务器即可实现日志的存储,通过关键字和时间区间来查询具体的日志信息。如下图:
打开官方文档,查找官方文档中下面的部分内容:
这是官方文档提供的http请求方式的示例代码,我们根据代码集成到自动化测试工程中
- 打开测试工程,在根目录下创建utils包,并创建influxdb_loki.py文件,用于编写loki和influxdb的写入方法。
- 在influxdb_loki.py中编写send_loki方法,发送日志信息并保存到服务器,代码内容如下:
# -*- coding:utf-8 -*-
import requests
import time
import json
import traceback
loki_push_url = "http://:/loki/api/v1/push" # ip,port根据个人部署服务器信息补充完整
def send_loki(tag, log):
# 封装发送请求body
body = {
"streams": [
{
"stream": {
"tag": tag
},
"values": [
[str(time.time_ns()), log]
]
}
]
}
try:
requests.post(loki_push_url, data=json.dumps(body), headers={"Content-Type": "application/json"})
except Exception as e:
traceback.print_exc(e)
- 日志已经可以写入到日志服务中,但是如何把测试报告中的测试用例和日志对应起来呢,为了能够准确的找到对应case的日志,我们需要针对每个用例生成一个唯一标识把测试报告中的用例和loki的日志对应起来,用例ID采用内置的uuid来生成,更新测试用例文件test_demo.py,,代码如下:
# -*- coding:utf-8 -*-
import time
import allure
import pytest
import uuid
from steps.step import send_get_req, verify_status
from utils.influxdb_loki import send_loki
@pytest.mark.testdemo
class TestDemo:
@pytest.fixture(autouse=True)
def init_case(self):
self.caseid = uuid.uuid4()
self.loki_tag = "testDemo"
self.loki_caseid = "caseid={}".format(self.caseid)
yield
allure.dynamic.description_html(
"""
- 测试用例ID: {}
- 日志查询地址:loki服务
""".format(self.caseid) # 写入到报告中,用于和loki日志对应
)
@allure.title("测试正常的url请求")
def test_pass(self):
send_loki(self.loki_tag, "{} message=开始请求服务器".format(self.loki_caseid))
res = send_get_req("https://postman-echo.com/get")
send_loki(self.loki_tag, "{} message=请求发送完成,收到响应结果为:{}".format(self.loki_caseid, res.text))
verify_status(res, 200)
send_loki(self.loki_tag, "{} message=用例断言通过".format(self.loki_caseid))
@allure.title("测试异常的url请求")
def test_fail(self):
send_loki(self.loki_tag, "{} message=开始请求服务器".format(self.loki_caseid))
res = send_get_req("https://postman-echo.com/get01")
send_loki(self.loki_tag, "{} message=请求发送完成,收到响应结果为:{}".format(self.loki_caseid, res.text))
verify_status(res, 404)
send_loki(self.loki_tag, "{} message=用例断言通过".format(self.loki_caseid))
- 现在已经完成了测试用例集成loki日志系统,现在运行一下看看效果。
pytest -m testdemo --alluredir=allure-results
allure serve allure-results
可以看到生成的报告中,用例描述信息中已经生成的用例ID和对应的loki的服务器地址,便于直接跳转日志服务器查询
日志集成完成了,下面我们看一下怎么配置grafana展示loki信息。
配置grafana展示loki信息打开我们的grafana服务的界面,配置loki数据源
- 浏览器打开“http://:3000”并登录,点击左侧Configuration图标
-
添加数据源,点击“Data sources"标签,并点击”Add data source“, 进入如下界面
-
搜索框搜索”loki“并选择,并在接下来界面中的url中输入loki服务地址,默认:http://:3100, 点击底部的”Save & Test" 按钮
-
点击左侧工具栏的“Explore”按钮,进入如下界面
选择数据源为配置的loki,并点击“Log browser",进入如下界面,用来查找日志选项
-
并根据如下序号进行选择,会显示出我们刚才自动化执行的日志信息
如果要根据caseid来筛选日志信息,则需要在搜索框中输入 {tag=“testDemo”} |=“caseid=“,并点击右上角的”Run query” 按钮, 如:
具体的loki的查询的写法,可以参考搜索框右侧的帮助按钮,如:
至此,我们和loki的集成基本上算是结束了。
在用例执行过程中,我们会记录一些测试数据并观察这些数据的变化,如接口响应时间,UI的界面渲染时间,应用的启动时间,系统的资源占用数据等,因为这些数据都是随着时间在不断变化,我们引入了influxdb来保存,influxdb提供了web界面,可以看到数据的变化图表。下面就看一下怎么配置influxdb数据服务
配置influxdb- 创建influxdb的数据库,登录influxdb的web系统,点击左侧“Load Data" 下面的 “Buckets”,在进入的界面中,点击右侧的“Create Bucket"按钮
- 输入要创建的数据库名,如”AutoTestBucket“,Delete Data选项根据个人业务需要,可以不删除(Never),或者删除多少天之前的数据(Older Than),选择好之后,点击”Create“按钮
- 创建APItoken,点击左侧“Load Data" 下面的 “API Tokens”,在进入的界面中,点击右侧的“Generate API Token"按钮,选择”Read/Write API Token“,输入描述信息,在Read和Wirte 区域选择对应的数据库名,并点击”Save“按钮
- 点击新创建的”API Token“并查看api token信息
复制token信息,用于接下来的自动化测试中上传数据信息。
先看一下influxdb给出的通过python发送数据的例子,点击左侧“Load Data" 下面的 “Sources”,在进入的界面中,在界面中找到python的图标并点击进去,会展示python的示例代码
根据界面提示的内容,安装对应的python库 influxdb-client
pip install influxdb-client
依赖库安装完成之后,接下来我们就根据示例代码,集成influxdb到我们自动化工程中
集成influxdb- 打开demo4py工程,更新influxdb_loki.py文件,根据示例代码我们提取出动态信息作为参数,针对token,url之类的信息作为常量,代码如下:
# -*- coding:utf-8 -*-
import requests
import time
import json
import traceback
from datetime import datetime
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
loki_push_url = "http://:3100/loki/api/v1/push"
influxdb_url = "http://:8086"
influxdb_bucket = "AutoTestBucket"
influxdb_org = "xxx"
influxdb_token = "x8Tw_WAjduaHANMLjMFDuV9FjdUIS9J03MZVsefPzSl99EhR9knDKGCELF4bX4qgMAJBffjXBLjDsmD9KAA=="
def send_loki(tag, log):
# 封装发送请求body
body = {
"streams": [
{
"stream": {
"tag": tag
},
"values": [
[str(time.time_ns()), log]
]
}
]
}
try:
requests.post(loki_push_url, data=json.dumps(body), headers={"Content-Type": "application/json"})
except Exception as e:
traceback.print_exc(e)
def send_influxdb(table_name, tag_key, tag_value, field_key, field_value):
"""
:param table_name: 表名
:param tag_key: 数据的tag标记,如按照什么分类,如:功能模块(func)
:param tag_value: 标记值,如分类的名称,如:订单,支付等
:param field_key: 数据的名称,如:query,submit等
:param field_value: 数据的值
:return:
"""
with InfluxDBClient(url=influxdb_url, token=influxdb_token, org=influxdb_org) as client:
try:
write_api = client.write_api(write_options=SYNCHRONOUS)
data = "{},{}={} {}={}".format(table_name, tag_key,tag_value,field_key,field_value)
write_api.write(influxdb_bucket, influxdb_org, data)
write_api.flush()
except Exception as e:
traceback.print_exc(e)
- 在测试用例中添加时间统计并上传数据到influxdb,为了演示数据上传,我们先用接口返回时间来统计,用例代码调整后如下:
# -*- coding:utf-8 -*-
import random
import time
import allure
import pytest
import uuid
from steps.step import send_get_req, verify_status
from utils.influxdb_loki import send_loki, send_influxdb
@pytest.mark.testdemo
class TestDemo:
@pytest.fixture(autouse=True)
def init_case(self):
self.caseid = uuid.uuid4()
self.loki_tag = "testDemo"
self.loki_caseid = "caseid={}".format(self.caseid)
yield
allure.dynamic.description_html(
"""
- 测试用例ID: {}
- 日志查询地址:loki服务
""".format(self.caseid) # 写入到报告中,用于和loki日志对应
)
@allure.title("测试正常的url请求")
def test_pass(self):
send_loki(self.loki_tag, "{} message=开始请求服务器".format(self.loki_caseid))
start_timestamp = time.time()*1000
res = send_get_req("https://postman-echo.com/get")
time.sleep(random.random()) # 为了效果临时添加
end_timestamp = time.time()*1000
send_influxdb("autotest","模块","订单模块","订单查询耗时",end_timestamp-start_timestamp)
send_loki(self.loki_tag, "{} message=请求发送完成,收到响应结果为:{}".format(self.loki_caseid, res.text))
verify_status(res, 200)
send_loki(self.loki_tag, "{} message=用例断言通过".format(self.loki_caseid))
@allure.title("测试异常的url请求")
def test_fail(self):
send_loki(self.loki_tag, "{} message=开始请求服务器".format(self.loki_caseid))
res = send_get_req("https://postman-echo.com/get01")
send_loki(self.loki_tag, "{} message=请求发送完成,收到响应结果为:{}".format(self.loki_caseid, res.text))
verify_status(res, 404)
send_loki(self.loki_tag, "{} message=用例断言通过".format(self.loki_caseid))
- 重新执行用例,为了体现出数据效果,可以多执行几次,借助于pytest-repeat库,可指定次数执行
安装pytest-repeat库,多次执行用例命令
pip install pytest-repeat
pytest -m testdemo --count=10
- 看一下influxdb页面的结果,能看到响应时间曲线图,针对视图的更多用法请参考influxDB官网
到现在为止,我们完成了git+jenkins+pytest+allure+loki+influxdb的集成,可以执行以下看看效果了。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)