7种检测Python程序运行时间、CPU和内存占用的方法

7种检测Python程序运行时间、CPU和内存占用的方法,第1张

1. 使用装饰亮喊器来衡量函数执行时间

有一个简单方法,那就是定义一个装饰器来测量函数的执行时间,并输出结果:

import time

from functoolsimport wraps

import random

def fn_timer(function):

  @wraps(function)

  def function_timer(*args, **kwargs):

      t0= time.time()

      result= function(*args, **kwargs)

      t1= time.time()

      print("Total time running %s: %s seconds" %

          (function.__name__, str(t1- t0))

)

      return result

return function_timer

@fn_timer

def random_sort(n):

  return sorted([random.random() for i in range(n)])

if __name__== "__main__":

  random_sort(2000000)

输出:Total time running random_sort: 0.6598007678985596 seconds

使用方式的话,就是在要监控的函数定义上面加上 @fn_timer 就行了

或者

# 可监控程序运行时间

import time

import random

def clock(func):

    def wrapper(*args, **kwargs):

        start_time= time.time()

        result= func(*args, **kwargs)

        end_time= time.time()

        print("共耗时: %s秒" % round(end_time- start_time, 5))

        return result

return wrapper

@clock

def random_sort(n):

  return sorted([random.random() for i in range(n)])

if __name__== "__main__":

  random_sort(2000000)

输出结果:共耗时: 0.65634秒

2. 使用timeit模块

另一种方法是使用timeit模块,用来计算平均时间消耗。

执行下面的脚本可以运行该模块。

这里的timing_functions是Python脚本文件名称。

在输出的末尾,可以看到以下结果:4 loops, best of 5: 2.08 sec per loop

这表示测试了4次,平均每次测试重复5次,最好的测试结果是2.08秒。

如果不指定测试或重复次数,默认值为10次测试,每次重复5次。

3. 使用Unix系统中的time命令

然而,装饰器和timeit都是基于Python的。在外部环境测试Python时,unix time实用工具就非常有用。

运行time实用工具:

输出结果为:

Total time running random_sort: 1.3931210041 seconds

real 1.49

user 1.40

sys 0.08

第一行来自预定义的装饰器,其他三行为:

    real表示的是执行脚本的总时间

    user表示的是执行脚本消耗的CPU时间。

    sys表示陆键或的是执行内核函数消耗的时间。

注意:根据维基百科的定义,内核是一个计算机程序,用来管理软件的输入输出,并将其翻译成CPU和其他计算机中的电子设备能够执行的数据处理指令。

因此,Real执行时间和User+Sys执行时间的差就是消耗在输入/输出和系统执行其他任务时消耗的时间。

4. 使用cProfile模块

5. 使用line_profiler模块

6. 使用早伍memory_profiler模块

7. 使用guppy包

如果被监测程序是你自己编羡大此写的,那你可以在程序进程结束的时候主动去通知监测进程。

如果不是仿李,而你要监兄迅测它的运行情况,那么这和 *** 作系统有关。Windows和Linux下的处理方式是不一样的。

在公司里做的一个接口系统,主要是对接第三方的系统接口,所以,这个系统里会和很多其他公司的项目交互。随之而来一个很蛋疼的问题,这么多公司的接口,不同公司接口的稳定性差别很大,访问量大的时候,有的不怎么行的接口就各种出错了。

这个接口系统刚刚开发不久,整个系统中,处于比较边缘的位置,不像其他项目,有日志库,还有短信告警,一旦出问题,很多情况下都是用户反馈回来,所以,我的想法是,拿起python,为这个项目写一个监控。如果在调用某个第三方接口的过程中,大量出错了,说明这个接口有有问题了,就可以更快的采取措施。

项目的也是有日志库的,所有的info,error日志都是每隔一分钟禅败扫描入库,日志库是用的mysql,表里有几个特别重要的字段:

有日志库,就不用自己去线上环境扫日志分析了,直接从日志库入手。由于日志库在线上时每隔1分钟扫,那我就去日志库每隔2分钟扫一次,如果扫到有一定数量的error日志就报警,如果只有一两条错误就可以无视了,也就是短时间爆发大量错误日志,就可以断定系统有问题了。报警方式就用发送邮件,所以,需要做下面几件事情拿袭嫌:

1. *** 作MySql。

2. 发送邮件。

3. 定时任务。

4. 日志。

5. 运行脚本。

明确了以上几件事情,就可以动手了。

*** 作数据库

使用MySQLdb这个驱动,直接 *** 作数据库,主要就是查询 *** 作。

获取数据库的连接:

def get_con():

host = "127.0.0.1"

port = 3306

logsdb = "logsdb"

user = "root"

password = "never tell you"

con = MySQLdb.connect(host=host, user=user, passwd=password, db=logsdb, port=port, charset="utf8")

return con

从日志库里获取数据,获取当前时间之前2分钟的数据,首消手先,根据当前时间进行计算一下时间。之前,计算有问题,现在已经修改。

def calculate_time():

now = time.mktime(datetime.now().timetuple())-60*2

result = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(now))

return result

然后,根据时间和日志级别去日志库查询数据

def get_data():

select_time = calculate_time()

logger.info("select time:"+select_time)

sql = "select file_name,message from logsdb.app_logs_record " \

"where log_time >"+"'"+select_time+"'" \

"and level="+"'ERROR'" \

"order by log_time desc"

conn = get_con()

cursor = conn.cursor()

cursor.execute(sql)

results = cursor.fetchall()

cursor.close()

conn.close()

return results

发送邮件

使用python发送邮件比较简单,使用标准库smtplib就可以

这里使用163邮箱进行发送,你可以使用其他邮箱或者企业邮箱都行,不过host和port要设置正确。

def send_email(content):

sender = "sender_monitor@163.com"

receiver = ["rec01@163.com", "rec02@163.com"]

host = 'smtp.163.com'

port = 465

msg = MIMEText(content)

msg['From'] = "sender_monitor@163.com"

msg['To'] = "rec01@163.com,rec02@163.com"

msg['Subject'] = "system error warning"

try:

smtp = smtplib.SMTP_SSL(host, port)

smtp.login(sender, '123456')

smtp.sendmail(sender, receiver, msg.as_string())

logger.info("send email success")

except Exception, e:

logger.error(e)

定时任务

使用一个单独的线程,每2分钟扫描一次,如果ERROR级别的日志条数超过5条,就发邮件通知。

def task():

while True:

logger.info("monitor running")

results = get_data()

if results is not None and len(results) >5:

content = "recharge error:"

logger.info("a lot of error,so send mail")

for r in results:

content += r[1]+'\n'

send_email(content)

sleep(2*60)

日志

为这个小小的脚本配置一下日志log.py,让日志可以输出到文件和控制台中。

# coding=utf-8

import logging

logger = logging.getLogger('mylogger')

logger.setLevel(logging.DEBUG)

fh = logging.FileHandler('monitor.log')

fh.setLevel(logging.INFO)

ch = logging.StreamHandler()

ch.setLevel(logging.INFO)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

fh.setFormatter(formatter)

ch.setFormatter(formatter)

logger.addHandler(fh)

logger.addHandler(ch)

所以,最后,这个监控小程序就是这样的app_monitor.py

# coding=utf-8

import threading

import MySQLdb

from datetime import datetime

import time

import smtplib

from email.mime.text import MIMEText

from log import logger

def get_con():

host = "127.0.0.1"

port = 3306

logsdb = "logsdb"

user = "root"

password = "never tell you"

con = MySQLdb.connect(host=host, user=user, passwd=password, db=logsdb, port=port, charset="utf8")

return con

def calculate_time():

now = time.mktime(datetime.now().timetuple())-60*2

result = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(now))

return result

def get_data():

select_time = calculate_time()

logger.info("select time:"+select_time)

sql = "select file_name,message from logsdb.app_logs_record " \

"where log_time >"+"'"+select_time+"'" \

"and level="+"'ERROR'" \

"order by log_time desc"

conn = get_con()

cursor = conn.cursor()

cursor.execute(sql)

results = cursor.fetchall()

cursor.close()

conn.close()

return results

def send_email(content):

sender = "sender_monitor@163.com"

receiver = ["rec01@163.com", "rec02@163.com"]

host = 'smtp.163.com'

port = 465

msg = MIMEText(content)

msg['From'] = "sender_monitor@163.com"

msg['To'] = "rec01@163.com,rec02@163.com"

msg['Subject'] = "system error warning"

try:

smtp = smtplib.SMTP_SSL(host, port)

smtp.login(sender, '123456')

smtp.sendmail(sender, receiver, msg.as_string())

logger.info("send email success")

except Exception, e:

logger.error(e)

def task():

while True:

logger.info("monitor running")

results = get_data()

if results is not None and len(results) >5:

content = "recharge error:"

logger.info("a lot of error,so send mail")

for r in results:

content += r[1]+'\n'

send_email(content)

time.sleep(2*60)

def run_monitor():

monitor = threading.Thread(target=task)

monitor.start()

if __name__ == "__main__":

run_monitor()

运行脚本

脚本在服务器上运行,使用supervisor进行管理。

在服务器(centos6)上安装supervisor,然后在/etc/supervisor.conf中加入一下配置:

复制代码代码如下:

[program:app-monitor]

command = python /root/monitor/app_monitor.py

directory = /root/monitor

user = root

然后在终端中运行supervisord启动supervisor。

在终端中运行supervisorctl,进入shell,运行status查看脚本的运行状态。

总结

这个小监控思路很清晰,还可以继续修改,比如:监控特定的接口,发送短信通知等等。

因为有日志库,就少了去线上正式环境扫描日志的麻烦,所以,如果没有日志库,就要自己上线上环境扫描,在正式线上环境一定要小心哇~


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/12354543.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-24
下一篇 2023-05-24

发表评论

登录后才能评论

评论列表(0条)

保存