怎么用Python实现强大的logging模块

本篇内容介绍了“怎么用Python实现强大的 logging 模块”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

目前成都创新互联已为千余家的企业提供了网站建设、域名、雅安服务器托管成都网站托管、企业网站设计、云浮网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。

Python 的 logging 模块实现了灵活的日志系统。整个模块仅仅 3 个类,不到 5000 行代码的样子,学习它可以加深对程序日志的了解,本文分下面几个部分:

  • logging 简介

  • logging API 设计

  • 记录器对象 Logger

  • 日志记录对象 LogRecord

  • 处理器对象 Hander

  • 格式器对象 Formatter

  • 滚动日志文件处理器

  • 小结

  • 小技巧

logging 简介

本次代码使用的是 python 3.8.5 的版本,官方中文文档 3.8.8 。参考链接中官方中文文档非常详细,建议先看一遍了解日志使用。

功能
logging-modulelogging的API
Logger日志记录器对象类,可以创建一个对象用来记录日志
LogRecord日志记录对象,每条日志记录都封装成一个日志记录对象
Hander处理器对象,负责日志输出到流/文件的控制
Formatter格式器,负责日志记录的格式化
RotatingFileHandler按大小滚动的日志文件记录器
TimedRotatingFileHandler按时间滚动的日志文件处理器

我们主要研究日志如何输出到标准窗口这一主线;日志的配置,日志的线程安全及各种特别的Handler等支线可以先忽略。

logging API 设计

先看看日志使用:

import logging  logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(name)-10s %(asctime)s %(message)s') lang = {"name": "python", "age":20} logging.info('This is a info message %s', lang) logging.debug('This is a debug message') logging.warning('This is a warning message')  logger = logging.getLogger(__name__) logger.warning('This is a warning')

输出内容如下:

INFO     root       2021-03-04 00:03:53,473 This is a info message {'name': 'python', 'age': 20} WARNING  root       2021-03-04 00:03:53,473 This is a warning message WARNING  __main__   2021-03-04 00:03:53,473 This is a warning

可以看到 logging 的使用非常方便,模块直接提供了一组API。

root = RootLogger(WARNING)  # 默认提供的logger Logger.root = root Logger.manager = Manager(Logger.root)  def debug(msg, *args, **kwargs): # info,warning等api类似     if len(root.handlers) == 0:         basicConfig()  # 默认配置     root.debug(msg, *args, **kwargs)  def getLogger(name=None):     if name:         return Logger.manager.getLogger(name)  # 创建特定的logger     else:         return root  # 返回默认的logger

这种API的提供方式,我们在 requests 中也有看到。api中很重要的设置config的方式:

def basicConfig(**kwargs):     ...     if handlers is None:         filename = kwargs.pop("filename", None)         mode = kwargs.pop("filemode", 'a')         if filename:             h = FileHandler(filename, mode)         else:             stream = kwargs.pop("stream", None)             h = StreamHandler(stream)  # 默认的handler         handlers = [h]     dfs = kwargs.pop("datefmt", None)     style = kwargs.pop("style", '%')     fs = kwargs.pop("format", _STYLES[style][1])     fmt = Formatter(fs, dfs, style)  # 生成formatter     for h in handlers:         if h.formatter is None:             h.setFormatter(fmt)         root.addHandler(h)  # 设置root的handler     level = kwargs.pop("level", None)     if level is not None:         root.setLevel(level)  # 设置日志级别

可以看到,日志的配置主要包括下面几项:

  • level 日志级别

  • format 信息格式化模版

  • filename 输出到文件

  • datefmt %Y-%m-%d %H:%M:%S,uuu 时间的格式模版

  • style [ % , { ,$] 格式样板

演示代码输出中,可以看到debug日志没有显示,是因为 debug < info :

CRITICAL = 50 FATAL = CRITICAL ERROR = 40 WARNING = 30 WARN = WARNING INFO = 20 DEBUG = 10 NOTSET = 0

记录器对象 Logger

查看Logger之前,先看logger对象的管理类Manager

_loggerClass = Logger  class Manager(object):     def __init__(self, rootnode):         self.root = rootnode         self.disable = 0         self.loggerDict = {}  # 所有日志记录对象的字典     ...     def getLogger(self, name):         rv = None         if name in self.loggerDict:             rv = self.loggerDict[name]  # 获取已经创建过的同名logger             ...         else:             rv = (self.loggerClass or _loggerClass)(name)  # 创建新的logger             rv.manager = self             self.loggerDict[name] = rv             ...         return rv

日志过滤器

class Filterer(object):      def __init__(self):         self.filters = []      def addFilter(self, filter):         self.filters.append(filter)      def removeFilter(self, filter):         self.filters.remove(filter)      def filter(self, record):         rv = True         for f in self.filters:  # 过滤日志             if hasattr(f, 'filter'):                 result = f.filter(record)             else:                 result = f(record) # assume callable - will raise if not             if not result:                 rv = False                 break         return r

核心的 Logger 实际上只是一个控制中心:

class Logger(Filterer):  # logger可以过滤日志     def __init__(self, name, level=NOTSET):         Filterer.__init__(self)         self.name = name         self.level = _checkLevel(level)         self.parent = None  # 日志可以有层级         self.propagate = True         self.handlers = []  # 可以输出到多个handler         self.disabled = False  # 可以关闭         self._cache = {}          def debug(self, msg, *args, **kwargs):  # 输出debug日志         if self.isEnabledFor(DEBUG):             self._log(DEBUG, msg, args, **kwargs)

logger可以判断日志级别:

def isEnabledFor(self, level):     if self.disabled:         return False      try:         return self._cache[level]     except KeyError:         try:             if self.manager.disable >= level:                 is_enabled = self._cache[level] = False             else:                 is_enabled = self._cache[level] = (                     level >= self.getEffectiveLevel()                 )         return is_enabled  def getEffectiveLevel(self):     logger = self     while logger:         if logger.level:             return logger.level         logger = logger.parent     return NOTSET

日志输出:

def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False,          stacklevel=1):     ...     fn, lno, func = "(unknown file)", 0, "(unknown function)"     ...     # 生成日志记录     record = self.makeRecord(self.name, level, fn, lno, msg, args,                              exc_info, func, extra, sinfo)     # 使用handler处理日志     self.handle(record)

日志记录的生产,就是创建一个LogRecord对象:

_logRecordFactory = LogRecord  def makeRecord(self, name, level, fn, lno, msg, args, exc_info,                func=None, extra=None, sinfo=None):     ...     rv = _logRecordFactory(name, level, fn, lno, msg, args, exc_info, func,                          sinfo)     ...     return rv

使用logger对象的所有handler处理日志:

def handle(self, record):     c = self     found = 0     while c:         for hdlr in c.handlers:  # 使用所有的handler处理日志             found = found + 1             if record.levelno >= hdlr.level:                 hdlr.handle(record)

root-logger的handler是在config中配置的:

def basicConfig(**kwargs):     ...     root.addHandler(h)  # 设置root的handler

日志记录对象 LogRecord

日志记录对象非常简单:

class LogRecord(object):     def __init__(self, name, level, pathname, lineno,                  msg, args, exc_info, func=None, sinfo=None, **kwargs):         ct = time.time()         self.name = name  # logger名称         self.msg = msg  # 日志标识信息         ...         self.args = args  # 变量         self.levelname = getLevelName(level)         ...          def getMessage(self):         msg = str(self.msg)         if self.args:             msg = msg % self.args  # 格式化消息         return msg

处理器对象 Hander

顶级Handler定义了Handler的模版方法

class Handler(Filterer):  # 处理器也可以过滤日志     def __init__(self, level=NOTSET):         Filterer.__init__(self)         self._name = None         self.level = _checkLevel(level)  # handler也有日志级别         self.formatter = None         _addHandlerRef(self)         self.createLock()              def handle(self, record):  # 处理日志         rv = self.filter(record)  # 过滤日志         if rv:             self.acquire()  # 申请锁             try:                 self.emit(record)  # 提交记录,由不同子类实现              finally:                 self.release()  # 释放锁         return rv

默认的console流 StreamHandler

class StreamHandler(Handler):      terminator = '\n'  # 自动换行      def __init__(self, stream=None):         Handler.__init__(self)         if stream is None:             stream = sys.stderr  # 默认使用stderr输出         self.stream = stream          def emit(self, record):         try:             msg = self.format(record)  # 格式化日志记录             stream = self.stream             stream.write(msg + self.terminator)  # 写日志             self.flush()  # 刷新写缓存         except Exception:             ...          def format(self, record):         if self.formatter:             fmt = self.formatter         else:             fmt = _defaultFormatter         return fmt.format(record)  # 使用格式化器格式化日志记录

为什么使用stderr,可以看下面的测试中的输出都是到console:

print("haha") print("fatal error", file=sys.stderr) sys.stderr.write("fatal error\n")

格式器对象 Formatter

格式化器主要使用Formatter和Style实现

class Formatter(object):     def __init__(self, fmt=None, datefmt=None, style='%', validate=True):         self._style = _STYLES[style][0](fmt)         self._fmt = self._style._fmt         self.datefmt = datefmt          def format(self, record):         record.message = record.getMessage()         s = self.formatMessage(record)         return s              def formatMessage(self, record):         return self._style.format(record)  # 格式化

Style类

class PercentStyle(object):      default_format = '%(message)s'     asctime_format = '%(asctime)s'     asctime_search = '%(asctime)'     validation_pattern = re.compile(r'%\(\w+\)[#0+ -]*(\*|\d+)?(\.(\*|\d+))?[diouxefgcrsa%]', re.I)      def __init__(self, fmt):         self._fmt = fmt or self.default_format      def usesTime(self):         return self._fmt.find(self.asctime_search) >= 0      def validate(self):         """Validate the input format, ensure it matches the correct style"""         if not self.validation_pattern.search(self._fmt):             raise ValueError("Invalid format '%s' for '%s' style" % (self._fmt, self.default_format[0]))      def _format(self, record):         return self._fmt % record.__dict__  # 格式化日志记录对象      def format(self, record):         try:             return self._format(record)         except KeyError as e:             raise ValueError('Formatting field not found in record: %s' % e)

滚动日志文件处理器

线上的日志持续输出到一个文件的话,会让文件巨大,即有加剧了丢失的风险,也难以处理。通常有按照大小滚动或者按照日期滚动的方法,这个功能非常重要。先看滚动日志处理器模版:

class BaseRotatingHandler(logging.FileHandler):     def emit(self, record):         try:             if self.shouldRollover(record): # 判断是否需要滚动                 self.doRollover()  # 滚动日志             logging.FileHandler.emit(self, record)  # 输出日志         except Exception:             self.handleError(record)          def rotate(self, source, dest):         if not callable(self.rotator):             if os.path.exists(source):                 os.rename(source, dest)  # 重命名日志文件         else:             self.rotator(source, dest)

按大小滚动 RotatingFileHandler

按照文件大小滚动的处理器:

class RotatingFileHandler(BaseRotatingHandler):      def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False):         if maxBytes > 0:             mode = 'a'         BaseRotatingHandler.__init__(self, filename, mode, encoding, delay)         self.maxBytes = maxBytes  # 单个文件大小上限         self.backupCount = backupCount  # 日志备份数量              def doRollover(self):  # 执行滚动         if self.stream:             self.stream.close()  # 关闭当前的流             self.stream = None         if self.backupCount > 0:             for i in range(self.backupCount - 1, 0, -1):                 sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i))                 dfn = self.rotation_filename("%s.%d" % (self.baseFilename,                                                         i + 1))                 if os.path.exists(sfn):                     if os.path.exists(dfn):                         os.remove(dfn)                     os.rename(sfn, dfn)             dfn = self.rotation_filename(self.baseFilename + ".1")             if os.path.exists(dfn):                 os.remove(dfn)             self.rotate(self.baseFilename, dfn)  # 重命名文件         if not self.delay:             self.stream = self._open()  # 如果shouldRollover延迟,可以打开新的流      def shouldRollover(self, record):  # 判断是否需要滚动         if self.stream is None:  # 立即打开流             self.stream = self._open()         if self.maxBytes > 0:                msg = "%s\n" % self.format(record)             self.stream.seek(0, 2)  #due to non-posix-compliant Windows feature             if self.stream.tell() + len(msg) >= self.maxBytes:  # 判断大小                 return 1         return 0

文件大小滚动就是在记录日志时候判断文档是否超过上限,超过则重命名旧日志,生成新日志。

按照日期滚动 TimedRotatingFileHandler

按照日期滚动的处理器:

class TimedRotatingFileHandler(BaseRotatingHandler):     def __init__(self, filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None):         BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay)         self.when = when.upper()         self.backupCount = backupCount         self.utc = utc         self.atTime = atTime         # 日期设置,支持多种方式         if self.when == 'S':             self.interval = 1 # one second             self.suffix = "%Y-%m-%d_%H-%M-%S"             self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}(\.\w+)?$"         ...          self.extMatch = re.compile(self.extMatch, re.ASCII)         self.interval = self.interval * interval # multiply by units requested         filename = self.baseFilename         if os.path.exists(filename):             t = os.stat(filename)[ST_MTIME]  # 最后修改时间         else:             t = int(time.time())         self.rolloverAt = self.computeRollover(t)  # 提前计算终止时间      def computeRollover(self, currentTime):         # 判断的方法还是很长很复杂的,先pass      def shouldRollover(self, record):         t = int(time.time())         if t >= self.rolloverAt:  # 判断是否到期             return 1         return 0      def doRollover(self):         ...         dfn = self.rotation_filename(self.baseFilename + "." +                                      time.strftime(self.suffix, timeTuple))         #  滚动日志文件         if os.path.exists(dfn):             os.remove(dfn)         self.rotate(self.baseFilename, dfn)         if self.backupCount > 0:             for s in self.getFilesToDelete():                 os.remove(s)         ...         # 计算下一个时间点         newRolloverAt = self.computeRollover(currentTime)         ...         self.rolloverAt = newRolloverAt

日期滚动就是计算最后时间点,超过时间点则重新生成新的日志文件。

小结

logging的处理逻辑大概是这样的:

  • 创建Logger对象,提供API,用来接收应用程序日志

  • Logger对象包括多个Handler

  • 每个Handler有一个Formatter对象

  • 每条日志都会生成一个LogRecord对象

  • 使用不同的Handler对象将LogRecored对象提交到不同的流

  • 每个日志对象通过Formatter格式化输出

  • 可以使用按日期/文件大小的方式进行日志文件的滚动记录

小技巧

覆盖对象的 __reduce__ 方法,让对象支持 reduce 函数:

class RootLogger(Logger):     def __init__(self, level):         Logger.__init__(self, "root", level)      def __reduce__(self):         return getLogger, ()

线程锁的创建和释放:

_lock = threading.RLock()  def _acquireLock():     if _lock:         _lock.acquire()  def _releaseLock():     if _lock:         _lock.release()

线程锁的使用:

def addHandler(self, hdlr):     _acquireLock()     try:         self.handlers.append(hdlr)     finally:         _releaseLock()  def removeHandler(self, hdlr):     _acquireLock()     try:         self.handlers.remove(hdlr)     finally:         _releaseLock()

参考链接

  • Logging in Python https://realpython.com/python-logging/

  • 日志操作手册 https://docs.python.org/zh-cn/3.8/howto/logging-cookbook.html#cookbook-rotator-namer

  • Python 的日志记录工具 https://docs.python.org/zh-cn/3.8/library/logging.html

“怎么用Python实现强大的 logging 模块”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!


网站栏目:怎么用Python实现强大的logging模块
网页网址:http://myzitong.com/article/gseces.html