Python入门到实战之Python 用 5000 行代码,实现强大的 logging 模块
小职 2021-03-11 来源 : 阅读 590 评论 0

摘要:本文主要介绍了Python入门到实战之Python 用 5000 行代码,实现强大的 logging 模块,通过具体的内容向大家展现,希望对大家Python的学习有所帮助。

本文主要介绍了Python入门到实战之Python 用 5000 行代码,实现强大的 logging 模块,通过具体的内容向大家展现,希望对大家Python的学习有所帮助。

Python入门到实战之Python 用 5000 行代码,实现强大的 logging 模块

Python 的 logging 模块实现了灵活的日志系统。整个模块仅仅 3 个类,不到 5000 行代码的样子,学习它可以加深对程序日志的了解,本文分下面几个部分:

 

logging 简介

 

logging API 设计

 

记录器对象 Logger

 

日志记录对象 LogRecord

 

处理器对象 Hander

 

格式器对象 Formatter

 

滚动日志文件处理器

 

小结

 

小技巧

 

logging 简介

本次代码使用的是 python 3.8.5 的版本,官方中文文档 3.8.8 。参考链接中官方中文文档非常详细,建议先看一遍了解日志使用。

 Python入门到实战之Python 用 5000 行代码,实现强大的 logging 模块

 

我们主要研究日志如何输出到标准窗口这一主线;日志的配置,日志的线程安全及各种特别的Handler等支线可以先忽略。

 

logging API 设计

先看看日志使用:

 

import logging

 

logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(name)-10s %(asctime)s %(message)s')

lang = {"name": "python", "age":20}

logging.info('This is a info message %s', lang)

logging.debug('This is a debug message')

logging.warning('This is a warning message')

 

logger = logging.getLogger(__name__)

logger.warning('This is a warning')

输出内容如下:

 

INFO     root       2021-03-04 00:03:53,473 This is a info message {'name': 'python', 'age': 20}

WARNING  root       2021-03-04 00:03:53,473 This is a warning message

WARNING  __main__   2021-03-04 00:03:53,473 This is a warning

可以看到 logging 的使用非常方便,模块直接提供了一组API。

 

root = RootLogger(WARNING)  # 默认提供的logger

Logger.root = root

Logger.manager = Manager(Logger.root)

 

def debug(msg, *args, **kwargs): # info,warning等api类似

    if len(root.handlers) == 0:

        basicConfig()  # 默认配置

    root.debug(msg, *args, **kwargs)

 

def getLogger(name=None):

    if name:

        return Logger.manager.getLogger(name)  # 创建特定的logger

    else:

        return root  # 返回默认的logger

这种API的提供方式,我们在 requests 中也有看到。api中很重要的设置config的方式:

 

def basicConfig(**kwargs):

    ...

    if handlers is None:

        filename = kwargs.pop("filename", None)

        mode = kwargs.pop("filemode", 'a')

        if filename:

            h = FileHandler(filename, mode)

        else:

            stream = kwargs.pop("stream", None)

            h = StreamHandler(stream)  # 默认的handler

        handlers = [h]

    dfs = kwargs.pop("datefmt", None)

    style = kwargs.pop("style", '%')

    fs = kwargs.pop("format", _STYLES[style][1])

    fmt = Formatter(fs, dfs, style)  # 生成formatter

    for h in handlers:

        if h.formatter is None:

            h.setFormatter(fmt)

        root.addHandler(h)  # 设置root的handler

    level = kwargs.pop("level", None)

    if level is not None:

        root.setLevel(level)  # 设置日志级别

可以看到,日志的配置主要包括下面几项:

 

level 日志级别

 

format 信息格式化模版

 

filename 输出到文件

 

datefmt %Y-%m-%d %H:%M:%S,uuu 时间的格式模版

style [ % , { ,$] 格式样板

演示代码输出中,可以看到debug日志没有显示,是因为 debug < info :

 

CRITICAL = 50

FATAL = CRITICAL

ERROR = 40

WARNING = 30

WARN = WARNING

INFO = 20

DEBUG = 10

NOTSET = 0

记录器对象 Logger

查看Logger之前,先看logger对象的管理类Manager

 

_loggerClass = Logger

 

class Manager(object):

    def __init__(self, rootnode):

        self.root = rootnode

        self.disable = 0

        self.loggerDict = {}  # 所有日志记录对象的字典

    ...

    def getLogger(self, name):

        rv = None

        if name in self.loggerDict:

            rv = self.loggerDict[name]  # 获取已经创建过的同名logger

            ...

        else:

            rv = (self.loggerClass or _loggerClass)(name)  # 创建新的logger

            rv.manager = self

            self.loggerDict[name] = rv

            ...

        return rv

日志过滤器

 

class Filterer(object):

 

    def __init__(self):

        self.filters = []

 

    def addFilter(self, filter):

        self.filters.append(filter)

 

    def removeFilter(self, filter):

        self.filters.remove(filter)

 

    def filter(self, record):

        rv = True

        for f in self.filters:  # 过滤日志

            if hasattr(f, 'filter'):

                result = f.filter(record)

            else:

                result = f(record) # assume callable - will raise if not

            if not result:

                rv = False

                break

        return r

核心的 Logger 实际上只是一个控制中心:

 

class Logger(Filterer):  # logger可以过滤日志

    def __init__(self, name, level=NOTSET):

        Filterer.__init__(self)

        self.name = name

        self.level = _checkLevel(level)

        self.parent = None  # 日志可以有层级

        self.propagate = True

        self.handlers = []  # 可以输出到多个handler

        self.disabled = False  # 可以关闭

        self._cache = {}

     

    def debug(self, msg, *args, **kwargs):  # 输出debug日志

        if self.isEnabledFor(DEBUG):

            self._log(DEBUG, msg, args, **kwargs)

logger可以判断日志级别:

 

def isEnabledFor(self, level):

    if self.disabled:

        return False

 

    try:

        return self._cache[level]

    except KeyError:

        try:

            if self.manager.disable >= level:

                is_enabled = self._cache[level] = False

            else:

                is_enabled = self._cache[level] = (

                    level >= self.getEffectiveLevel()

                )

        return is_enabled

 

def getEffectiveLevel(self):

    logger = self

    while logger:

        if logger.level:

            return logger.level

        logger = logger.parent

    return NOTSET

日志输出:

 

def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False,

         stacklevel=1):

    ...

    fn, lno, func = "(unknown file)", 0, "(unknown function)"

    ...

    # 生成日志记录

    record = self.makeRecord(self.name, level, fn, lno, msg, args,

                             exc_info, func, extra, sinfo)

    # 使用handler处理日志

    self.handle(record)

日志记录的生产,就是创建一个LogRecord对象:

 

_logRecordFactory = LogRecord

 

def makeRecord(self, name, level, fn, lno, msg, args, exc_info,

               func=None, extra=None, sinfo=None):

    ...

    rv = _logRecordFactory(name, level, fn, lno, msg, args, exc_info, func,

                         sinfo)

    ...

    return rv

使用logger对象的所有handler处理日志:

 

def handle(self, record):

    c = self

    found = 0

    while c:

        for hdlr in c.handlers:  # 使用所有的handler处理日志

            found = found + 1

            if record.levelno >= hdlr.level:

                hdlr.handle(record)

root-logger的handler是在config中配置的:

 

def basicConfig(**kwargs):

    ...

    root.addHandler(h)  # 设置root的handler

日志记录对象 LogRecord

日志记录对象非常简单:

 

class LogRecord(object):

    def __init__(self, name, level, pathname, lineno,

                 msg, args, exc_info, func=None, sinfo=None, **kwargs):

        ct = time.time()

        self.name = name  # logger名称

        self.msg = msg  # 日志标识信息

        ...

        self.args = args  # 变量

        self.levelname = getLevelName(level)

        ...

     

    def getMessage(self):

        msg = str(self.msg)

        if self.args:

            msg = msg % self.args  # 格式化消息

        return msg

处理器对象 Hander

顶级Handler定义了Handler的模版方法

 

class Handler(Filterer):  # 处理器也可以过滤日志

    def __init__(self, level=NOTSET):

        Filterer.__init__(self)

        self._name = None

        self.level = _checkLevel(level)  # handler也有日志级别

        self.formatter = None

        _addHandlerRef(self)

        self.createLock()

         

    def handle(self, record):  # 处理日志

        rv = self.filter(record)  # 过滤日志

        if rv:

            self.acquire()  # 申请锁

            try:

                self.emit(record)  # 提交记录,由不同子类实现  

            finally:

                self.release()  # 释放锁

        return rv

默认的console流 StreamHandler

 

class StreamHandler(Handler):

 

    terminator = '\n'  # 自动换行

 

    def __init__(self, stream=None):

        Handler.__init__(self)

        if stream is None:

            stream = sys.stderr  # 默认使用stderr输出

        self.stream = stream

     

    def emit(self, record):

        try:

            msg = self.format(record)  # 格式化日志记录

            stream = self.stream

            stream.write(msg + self.terminator)  # 写日志

            self.flush()  # 刷新写缓存

        except Exception:

            ...

     

    def format(self, record):

        if self.formatter:

            fmt = self.formatter

        else:

            fmt = _defaultFormatter

        return fmt.format(record)  # 使用格式化器格式化日志记录

为什么使用stderr,可以看下面的测试中的输出都是到console:

 

print("haha")

print("fatal error", file=sys.stderr)

sys.stderr.write("fatal error\n")

格式器对象 Formatter

格式化器主要使用Formatter和Style实现

 

class Formatter(object):

    def __init__(self, fmt=None, datefmt=None, style='%', validate=True):

        self._style = _STYLES[style][0](fmt)

        self._fmt = self._style._fmt

        self.datefmt = datefmt

     

    def format(self, record):

        record.message = record.getMessage()

        s = self.formatMessage(record)

        return s

         

    def formatMessage(self, record):

        return self._style.format(record)  # 格式化

Style类

 

class PercentStyle(object):

 

    default_format = '%(message)s'

    asctime_format = '%(asctime)s'

    asctime_search = '%(asctime)'

    validation_pattern = re.compile(r'%\(\w+\)[#0+ -]*(\*|\d+)?(\.(\*|\d+))?[diouxefgcrsa%]', re.I)

 

    def __init__(self, fmt):

        self._fmt = fmt or self.default_format

 

    def usesTime(self):

        return self._fmt.find(self.asctime_search) >= 0

 

    def validate(self):

        """Validate the input format, ensure it matches the correct style"""

        if not self.validation_pattern.search(self._fmt):

            raise ValueError("Invalid format '%s' for '%s' style" % (self._fmt, self.default_format[0]))

 

    def _format(self, record):

        return self._fmt % record.__dict__  # 格式化日志记录对象

 

    def format(self, record):

        try:

            return self._format(record)

        except KeyError as e:

            raise ValueError('Formatting field not found in record: %s' % e)

滚动日志文件处理器

线上的日志持续输出到一个文件的话,会让文件巨大,即有加剧了丢失的风险,也难以处理。通常有按照大小滚动或者按照日期滚动的方法,这个功能非常重要。先看滚动日志处理器模版:

 

class BaseRotatingHandler(logging.FileHandler):

    def emit(self, record):

        try:

            if self.shouldRollover(record): # 判断是否需要滚动

                self.doRollover()  # 滚动日志

            logging.FileHandler.emit(self, record)  # 输出日志

        except Exception:

            self.handleError(record)

     

    def rotate(self, source, dest):

        if not callable(self.rotator):

            if os.path.exists(source):

                os.rename(source, dest)  # 重命名日志文件

        else:

            self.rotator(source, dest)

按大小滚动 RotatingFileHandler

按照文件大小滚动的处理器:

 

class RotatingFileHandler(BaseRotatingHandler):

 

    def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False):

        if maxBytes > 0:

            mode = 'a'

        BaseRotatingHandler.__init__(self, filename, mode, encoding, delay)

        self.maxBytes = maxBytes  # 单个文件大小上限

        self.backupCount = backupCount  # 日志备份数量

         

    def doRollover(self):  # 执行滚动

        if self.stream:

            self.stream.close()  # 关闭当前的流

            self.stream = None

        if self.backupCount > 0:

            for i in range(self.backupCount - 1, 0, -1):

                sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i))

                dfn = self.rotation_filename("%s.%d" % (self.baseFilename,

                                                        i + 1))

                if os.path.exists(sfn):

                    if os.path.exists(dfn):

                        os.remove(dfn)

                    os.rename(sfn, dfn)

            dfn = self.rotation_filename(self.baseFilename + ".1")

            if os.path.exists(dfn):

                os.remove(dfn)

            self.rotate(self.baseFilename, dfn)  # 重命名文件

        if not self.delay:

            self.stream = self._open()  # 如果shouldRollover延迟,可以打开新的流

 

    def shouldRollover(self, record):  # 判断是否需要滚动

        if self.stream is None:  # 立即打开流

            self.stream = self._open()

        if self.maxBytes > 0:    

            msg = "%s\n" % self.format(record)

            self.stream.seek(0, 2)  #due to non-posix-compliant Windows feature

            if self.stream.tell() + len(msg) >= self.maxBytes:  # 判断大小

                return 1

        return 0

文件大小滚动就是在记录日志时候判断文档是否超过上限,超过则重命名旧日志,生成新日志。

 

按照日期滚动 TimedRotatingFileHandler

按照日期滚动的处理器:

 

class TimedRotatingFileHandler(BaseRotatingHandler):

    def __init__(self, filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None):

        BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay)

        self.when = when.upper()

        self.backupCount = backupCount

        self.utc = utc

        self.atTime = atTime

        # 日期设置,支持多种方式

        if self.when == 'S':

            self.interval = 1 # one second

            self.suffix = "%Y-%m-%d_%H-%M-%S"

            self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}(\.\w+)?$"

        ...

 

        self.extMatch = re.compile(self.extMatch, re.ASCII)

        self.interval = self.interval * interval # multiply by units requested

        filename = self.baseFilename

        if os.path.exists(filename):

            t = os.stat(filename)[ST_MTIME]  # 最后修改时间

        else:

            t = int(time.time())

        self.rolloverAt = self.computeRollover(t)  # 提前计算终止时间

 

    def computeRollover(self, currentTime):

        # 判断的方法还是很长很复杂的,先pass

 

    def shouldRollover(self, record):

        t = int(time.time())

        if t >= self.rolloverAt:  # 判断是否到期

            return 1

        return 0

 

    def doRollover(self):

        ...

        dfn = self.rotation_filename(self.baseFilename + "." +

                                     time.strftime(self.suffix, timeTuple))

        #  滚动日志文件

        if os.path.exists(dfn):

            os.remove(dfn)

        self.rotate(self.baseFilename, dfn)

        if self.backupCount > 0:

            for s in self.getFilesToDelete():

                os.remove(s)

        ...

        # 计算下一个时间点

        newRolloverAt = self.computeRollover(currentTime)

        ...

        self.rolloverAt = newRolloverAt

日期滚动就是计算最后时间点,超过时间点则重新生成新的日志文件。

 

小结

logging的处理逻辑大概是这样的:

 

创建Logger对象,提供API,用来接收应用程序日志

 

Logger对象包括多个Handler

 

每个Handler有一个Formatter对象

 

每条日志都会生成一个LogRecord对象

 

使用不同的Handler对象将LogRecored对象提交到不同的流

 

每个日志对象通过Formatter格式化输出

 

可以使用按日期/文件大小的方式进行日志文件的滚动记录

 

小技巧

覆盖对象的 __reduce__ 方法,让对象支持 reduce 函数:

 

class RootLogger(Logger):

    def __init__(self, level):

        Logger.__init__(self, "root", level)

 

    def __reduce__(self):

        return getLogger, ()

线程锁的创建和释放:

 

_lock = threading.RLock()

 

def _acquireLock():

    if _lock:

        _lock.acquire()

 

def _releaseLock():

    if _lock:

        _lock.release()

线程锁的使用:

 

def addHandler(self, hdlr):

    _acquireLock()

    try:

        self.handlers.append(hdlr)

    finally:

        _releaseLock()

 

def removeHandler(self, hdlr):

    _acquireLock()

    try:

        self.handlers.remove(hdlr)

    finally:

        _releaseLock() 



我是小职,记得找我

✅ 解锁高薪工作

✅ 免费获取学习教程,开发工具,代码大全,参考书籍

Python入门到实战之Python 用 5000 行代码,实现强大的 logging 模块

本文由 @小职 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程