# Reading a Single Log

## 1. Read the entire log

Approach: read the whole file content at once, or read it line by line and return a list of lines.
```python
def readlog(logfile):
    with open(logfile, 'r') as f:
        return f.read()          # whole file as a single string
        # or, for a list of lines:
        # return f.readlines()
```

## 2. Read part of the log (within a time range)
Concern: the log file can be very large, and checking the timestamp on every line is too slow.
Approach:
- Binary search for the start position
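The snippets below all rely on a `logline_time` helper that extracts the timestamp from a log line; it is never defined here, so this is a minimal sketch, assuming each line starts with a `YYYY-MM-DD HH:MM:SS` prefix (adapt the slice and format string to your log layout):

```python
import datetime

def logline_time(line):
    """Return the leading 'YYYY-MM-DD HH:MM:SS' timestamp of a log line
    as a string, or None if the line does not start with one."""
    if isinstance(line, bytes):
        line = line.decode(errors='ignore')
    ts = line[:19]  # length of 'YYYY-MM-DD HH:MM:SS'
    try:
        datetime.datetime.strptime(ts, '%Y-%m-%d %H:%M:%S')
    except ValueError:
        return None
    return ts
```

Returning the timestamp as a string keeps comparisons such as `line_tm < st` valid against plain string boundaries, since ISO-style timestamps sort lexicographically.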
```python
import os

def bs_read_log(logfile, st, et):
    text = []
    file_size = os.path.getsize(logfile)

    # Open in binary mode: seeking to an arbitrary byte offset in text
    # mode can land mid-character and raise a decode error.
    with open(logfile, 'rb') as f:
        # Binary search for the first line at or after st
        low, high = 0, file_size
        start_pos = 0

        while low < high:
            mid = (low + high) // 2
            f.seek(mid)
            f.readline()  # skip the (likely partial) line to align to a line start
            line = f.readline().decode(errors='ignore')

            if not line:
                high = mid
                continue

            line_tm = logline_time(line)
            if line_tm is None:
                # Invalid line: shrink the range. A bare `continue` here
                # would loop forever, since mid would never change.
                high = mid
                continue

            if line_tm < st:
                low = mid + 1
                start_pos = f.tell()  # remember the last position before st
            else:
                high = mid

        # Scan forward from the start position
        f.seek(start_pos)
        while True:
            line = f.readline().decode(errors='ignore')
            if not line:
                break

            line_tm = logline_time(line)
            if line_tm is None:
                continue

            if line_tm > et:
                break

            if st <= line_tm <= et:
                text.append(line.strip())

    return text
```

- Binary search for the start position + chunked timestamp checks
```python
import os

def readlog(logfile, st, et):
    # Validate arguments
    if st > et:
        return None
    # Tune the chunk size to the log volume: timestamps are only
    # checked every `chunksize` lines to cut down on parsing cost.
    chunksize = 1024
    # `chunk` starts at chunksize so the very first line is checked;
    # if its time is already past et we can exit immediately.
    chunk = 1024
    text = []
    file_size = os.path.getsize(logfile)
    with open(logfile, 'rb') as f:
        # Binary search for the start position
        low, high = 0, file_size
        start_pos = 0

        while low < high:
            mid = (low + high) // 2
            f.seek(mid)
            f.readline()  # align to a line start
            line = f.readline().decode(errors='ignore')

            if not line:
                high = mid
                continue

            line_tm = logline_time(line)
            if line_tm is None:
                # Invalid line: shrink the range (a bare `continue`
                # would loop forever, since mid never changes).
                high = mid
                continue

            if line_tm < st:
                low = mid + 1
                start_pos = f.tell()  # remember the last position before st
            else:
                high = mid

        # Scan forward from the start position
        f.seek(start_pos)
        while (line := f.readline().decode(errors='ignore')):
            chunk += 1
            if chunk >= chunksize:
                chunk = 1
                if (tm := logline_time(line)) is not None and tm > et:
                    break

            text.append(line)

    # If the end boundary must be exact, trim trailing lines beyond et
    # from the back; skip this if a fuzzy end time is acceptable.
    while text and ((tm := logline_time(text[-1])) is None or tm > et):
        text.pop()
    return text
```

## 3. Read the log in reverse
Idea: seek to the end of the file, then read line by line backwards.
Key points:
- Seek to the end of the file: `file.seek(0, 2)`
- Read one character at a time, prepending to a buffer: `buffer = file.read(1) + buffer`
- `file.read` can fail mid-character in text mode, so either catch the exception or read in binary mode
- When a newline is hit, yield the buffered line: `if buffer[0] == '\n': yield buffer`
```python
def r_read(fp):
    with open(fp, 'rb') as f:
        f.seek(0, 2)   # seek to end of file
        pos = f.tell()
        buffer = b''
        while pos >= 0:
            f.seek(pos)
            # binary mode avoids the decode errors that text mode can
            # raise when a seek lands in the middle of a character
            c = f.read(1)
            if c == b'\n':
                yield buffer.decode(errors='ignore')
                buffer = b''
            else:
                buffer = c + buffer
            pos -= 1
        # boundary case: the first line has no newline before it
        yield buffer.decode(errors='ignore')

for line in r_read(logfile):
    # stop once a matching line is found
    if condition(line):
        break
else:
    # no matching log line was found
    print('NA')
```

# Batch reading logs
- Requirement: read the log content within a given time range from a batch of log files.
- Approach:
  - Collect every log matching the file-name pattern with `Path` globbing
  - Filter out files outside the time range using the timestamp embedded in each file name
  - Sort, read the files one by one with the single-file reader, and merge the results
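The batch reader below also assumes a `timestr2ts` helper that normalizes the timestamp embedded in a file name (format `%Y_%m_%d_%H_%M_%S`) into a value comparable with `logline_time` output. A minimal sketch; the returned string format is an assumption chosen to match the timestamps inside the log lines:

```python
import datetime

def timestr2ts(timestr, fmt='%Y_%m_%d_%H_%M_%S'):
    """Parse a file-name timestamp into a 'YYYY-MM-DD HH:MM:SS' string."""
    return datetime.datetime.strptime(timestr, fmt).strftime('%Y-%m-%d %H:%M:%S')
```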
```python
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

def batch_read_log(logpath, timerange):
    text = []
    logpattern = "*.log"
    # file names are assumed to end in a %Y_%m_%d_%H_%M_%S timestamp
    get_log_time = lambda log: timestr2ts(log.stem.split('-')[-1].split('.')[0], '%Y_%m_%d_%H_%M_%S')
    logfiles = sorted(Path(logpath).rglob(logpattern), key=lambda p: p.stem)
    lognum = len(logfiles)
    logger.info(f'Reading logs (total {lognum})...')
    # skip files whose name timestamp is before the start of the range
    i = 0
    for i, log in enumerate(logfiles):
        if get_log_time(log) >= timerange[0]:
            break

    readlogs = []
    while i < lognum and (logtext := readlog(logfiles[i], timerange[0], timerange[1])):
        if logfiles[i].name in readlogs:
            i += 1
            continue
        logger.debug(f'reading {logfiles[i].name}')
        text.extend(logtext)
        readlogs.append(logfiles[i].name)
        i += 1

    size = len(text)
    if size == 0:
        logger.warning('read 0 logs')
        return text
    logbegin = logline_time(text[0])
    # drop trailing lines without a valid timestamp before reporting the end time
    while (logend := logline_time(text[-1])) is None:
        text.pop()
    logger.info(f'read {len(readlogs):>2} logs [ {logbegin[:19]} ~ {logend[:19]} ] size:{size}')
    return text
```

# todo
Optimization: pass the time range into the single-file reader only for the files sitting on the range boundary; files entirely inside the range can skip the per-line time checks and be read straight through.
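A minimal sketch of that optimization, with hypothetical parameters: `logfiles` is sorted by name-embedded start time and already filtered to the files overlapping `[st, et]`, `get_log_time` maps a file path to that start time, and `read_range(path, st, et)` is the slow, per-line-filtered reader (e.g. `readlog` above):

```python
def batch_read_log_fast(logfiles, st, et, get_log_time, read_range):
    """Only boundary files are filtered line by line; files that lie
    entirely inside [st, et] are read straight through."""
    text = []
    n = len(logfiles)
    for i, log in enumerate(logfiles):
        starts_in_range = get_log_time(log) >= st
        # file i is assumed to end roughly where file i+1 begins
        ends_in_range = i + 1 < n and get_log_time(logfiles[i + 1]) <= et
        if starts_in_range and ends_in_range:
            # fast path: the whole file lies inside the range, no time checks
            with open(log, 'r', errors='ignore') as f:
                text.extend(line.rstrip('\n') for line in f)
        else:
            # slow path: boundary file, filter line by line
            text.extend(read_range(log, st, et))
    return text
```

Passing `read_range` in as a parameter keeps the sketch self-contained; in the original code it would simply call `readlog` directly.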