
Reading Logs


Reading a single log file

1. Read the entire log

Approach: read the whole file content in one go, or read it line by line and return the list of lines.

</> python
def readlog(logfile):
    with open(logfile, 'r') as f:
        # f.read() returns the whole content as one string;
        # f.readlines() returns a list of lines
        return f.readlines()

2. Read part of a log (limited to a time range)

Things to consider: the log file may be very large, and checking the timestamp of every single line is too slow.
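The snippets below rely on a logline_time(line) helper that the original does not show; it is assumed to parse the timestamp at the start of a log line and return None when the line has no valid timestamp. A minimal sketch, assuming lines begin with a 'YYYY-mm-dd HH:MM:SS' prefix (the exact format is an assumption):

</> python
from datetime import datetime

def logline_time(line):
    # Assumed helper: parse the leading timestamp of a log line.
    # The 19-character '%Y-%m-%d %H:%M:%S' prefix is an assumption;
    # adjust it to the actual log format.
    try:
        return datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S')
    except ValueError:
        return None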
Approaches:

  1. Binary-search the start position
</> python
import os

def bs_read_log(logfile, st, et):
    text = []
    file_size = os.path.getsize(logfile)

    with open(logfile, 'r', encoding='utf-8') as f:
        # binary search for the start position
        low, high = 0, file_size
        start_pos = 0

        while low < high:
            mid = (low + high) // 2
            f.seek(mid)
            f.readline()  # discard the partial line, align to the next full line
            line = f.readline()

            if not line:
                break

            line_tm = logline_time(line)
            # read ahead past lines without a valid timestamp instead of
            # 'continue', which would loop forever on the same mid
            while line and line_tm is None:
                line = f.readline()
                line_tm = logline_time(line) if line else None
            if line_tm is None:
                break

            if line_tm < st:
                low = mid + 1
                start_pos = f.tell()  # remember the last position known to be before st
            else:
                high = mid

        # scan forward from the start position
        f.seek(start_pos)
        while True:
            line = f.readline()
            if not line:
                break

            line_tm = logline_time(line)
            if line_tm is None:
                continue

            if line_tm > et:
                break

            if st <= line_tm <= et:
                text.append(line.strip())

    return text
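The binary search turns locating the first in-range line from a full O(n) scan into O(log n) seeks, which is what makes this practical for very large files.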
  2. Binary-search the start position + check log times in chunks
</> python
import os

def readlog(logfile, st, et):
    # validate arguments
    if st > et:
        return None
    # check one timestamp every chunksize lines instead of every line;
    # tune this to the log size
    chunksize = 1024
    # chunk starts at chunksize so the very first line's timestamp is
    # checked immediately; if it is already outside (st, et) we can exit
    chunk = chunksize
    text = []
    file_size = os.path.getsize(logfile)
    with open(logfile, 'rb') as f:
        # binary search for the start position
        low, high = 0, file_size
        start_pos = 0

        while low < high:
            mid = (low + high) // 2
            f.seek(mid)
            f.readline()  # discard the partial line, align to the next full line
            line = f.readline().decode(errors='ignore')

            if not line:
                break

            line_tm = logline_time(line)
            # read ahead past lines without a valid timestamp instead of
            # 'continue', which would loop forever on the same mid
            while line and line_tm is None:
                line = f.readline().decode(errors='ignore')
                line_tm = logline_time(line) if line else None
            if line_tm is None:
                break

            if line_tm < st:
                low = mid + 1
                start_pos = f.tell()  # remember the last position known to be before st
            else:
                high = mid

        # scan forward from the start position
        f.seek(start_pos)
        while (line := f.readline().decode(errors='ignore')):
            chunk += 1
            if chunk >= chunksize:
                chunk = 1
                if (tm := logline_time(line)) is not None and tm > et:
                    break
            text.append(line)

    # If the exact end boundary matters, trim trailing out-of-range lines
    # from the back; skip this step if a loose end time is acceptable.
    while text and (tm := logline_time(text[-1])) is not None and tm > et:
        text.pop()
    return text
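For illustration, a hypothetical call, assuming logline_time (sketched earlier) returns datetime objects; the path and range values are placeholders:

</> python
from datetime import datetime

st = datetime(2024, 1, 1, 12, 0, 0)  # hypothetical start of the range
et = datetime(2024, 1, 1, 13, 0, 0)  # hypothetical end of the range
lines = readlog('app.log', st, et)   # 'app.log' is a placeholder path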

3. Read a log backwards

Idea: seek to the end of the file, then read lines one at a time backwards.
Key points:

  1. Seek to the end of the file:
    file.seek(0, 2)
  2. Read one character at a time; once a newline is hit, yield the buffered line:
    1. buffer = file.read(1) + buffer, catching and handling any exception raised by file.read
    2. if buffer[0] == '\n': yield buffer
</> python
def r_read(fp):
    # open in text mode so c is a str and '\n' comparisons work
    with open(fp, 'r', encoding='utf-8') as f:
        f.seek(0, 2)          # seek to the end of the file
        pos = f.tell() - 1    # position of the last character
        buffer = ''
        while pos >= 0:
            f.seek(pos)
            try:
                c = f.read(1)
            except UnicodeDecodeError:
                # pos landed inside a multi-byte character; skip it
                c = ''
            if c == '\n':
                yield buffer
                buffer = ''
            else:
                buffer = c + buffer
            pos -= 1
        # boundary case: the first line has no '\n' before it
        yield buffer

for line in r_read(logfile):  # logfile: path of the log to search
    # exit once a matching line is found
    if condition(line):       # condition: placeholder predicate
        break
else:
    # no log line satisfied the condition
    print('NA')
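Reading a single character per seek keeps the logic simple but is slow on large files; a common refinement (not shown in the original) is to read fixed-size blocks backwards and split them on newlines.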

Batch reading logs

  1. Requirement: from a batch of log files, read the log content within a given time range.
  2. Approach:
    1. Use Path to collect every log file matching a filename pattern
    2. Use the time embedded in each filename to filter out files outside the range
    3. Sort, read the files one by one, and merge the results
</> python
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

def batch_read_log(logpath, timerange):
    text = []
    logpattern = "*.log"
    # parse the time embedded in the filename, e.g. 'app-2024_01_01_12_00_00.log'
    get_log_time = lambda log: timestr2ts(log.stem.split('-')[-1].split('.')[0], '%Y_%m_%d_%H_%M_%S')
    logfiles = sorted(Path(logpath).rglob(logpattern), key=lambda p: p.stem)
    lognum = len(logfiles)
    logger.info(f'Reading logs (total {lognum})...')
    i = 0
    # skip files whose filename time is before the start of the range
    for i, log in enumerate(logfiles):
        if get_log_time(log) >= timerange[0]:
            break

    readlogs = []
    while i < lognum and (logtext := readlog(logfiles[i], timerange[0], timerange[1])):
        if logfiles[i].name in readlogs:
            i += 1
            continue
        logger.debug(f'reading {logfiles[i].name}')
        text.extend(logtext)
        readlogs.append(logfiles[i].name)
        i += 1

    size = len(text)
    if size == 0:
        logger.warning('read 0 logs')
        return text
    # logline_time follows the same contract as above: timestamp or None
    logbegin = logline_time(text[0])
    # drop trailing lines without a valid timestamp so logend is meaningful
    while text and (logend := logline_time(text[-1])) is None:
        text.pop()
    logger.info(f'read {len(readlogs):>2} logs [ {str(logbegin)[:19]} ~ {str(logend)[:19]} ]  size:{size}')
    return text
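timestr2ts is another helper the snippet assumes but does not define; a minimal sketch, returning a datetime so the result compares directly with the values produced by logline_time:

</> python
from datetime import datetime

def timestr2ts(timestr, fmt):
    # Assumed helper: parse a time string from a filename (for example
    # '2024_01_01_12_00_00') into a comparable time value.
    return datetime.strptime(timestr, fmt)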

todo

Optimization: when reading files individually, pass the time range only to the boundary files (the ones whose content straddles the start or end of the range); read all the other files quickly, skipping the per-line time checks entirely. A possible shape is sketched below.
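A hypothetical, untested sketch of that optimization (the function name and the assumption that logfiles is already narrowed to the overlapping files are mine, not the original's):

</> python
def batch_read_log_fast(logfiles, st, et):
    # Assumes logfiles is sorted and already narrowed down to exactly
    # the files that overlap (st, et).
    text = []
    last = len(logfiles) - 1
    for idx, log in enumerate(logfiles):
        if idx == 0 or idx == last:
            # boundary files may straddle st or et: filter per line
            text.extend(readlog(log, st, et))
        else:
            # middle files lie fully inside the range: read them whole
            with open(log, 'r', encoding='utf-8', errors='ignore') as f:
                text.extend(f.readlines())
    return text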