本文原载于我的博客:https://ziyang.moe/article/mydb4.html
本章涉及代码都在 https://github.com/CN-GuoZiyang/MYDB/tree/master/src/main/java/top/guoziyang/mydb/backend/dm/logger 和 https://github.com/CN-GuoZiyang/MYDB/blob/master/src/main/java/top/guoziyang/mydb/backend/dm/Recover.java 中。
前言MYDB 提供了崩溃后的数据恢复功能。DM 层在每次对底层数据 *** 作时,都会记录一条日志到磁盘上。在数据库奔溃之后,再次启动时,可以根据日志的内容,恢复数据文件,保证其一致性。
日志读写日志的二进制文件,按照如下的格式进行排布:
[XChecksum][Log1][Log2][Log3]...[LogN][BadTail]
其中 XChecksum 是一个四字节的整数,是对后续所有日志计算的校验和。Log1 ~ LogN 是常规的日志数据,BadTail 是在数据库崩溃时,没有来得及写完的日志数据,这个 BadTail 不一定存在。
每条日志的格式如下:
[Size][Checksum][Data]
其中,Size 是一个四字节整数,标识了 Data 段的字节数。Checksum 则是该条日志的校验和。
单条日志的校验和,其实就是通过一个指定的种子实现的:
private int calChecksum(int xCheck, byte[] log) { for (byte b : log) { xCheck = xCheck * SEED + b; } return xCheck; }
这样,对所有日志求出校验和,求和就能得到日志文件的校验和了。
Logger 被实现成迭代器模式,通过 next() 方法,不断地从文件中读取下一条日志,并将其中的 Data 解析出来并返回。next() 方法的实现主要依靠 internNext(),大致如下,其中 position 是当前日志文件读到的位置偏移:
private byte[] internNext() { if(position + OF_DATA >= fileSize) { return null; } // 读取size ByteBuffer tmp = ByteBuffer.allocate(4); fc.position(position); fc.read(tmp); int size = Parser.parseInt(tmp.array()); if(position + size + OF_DATA > fileSize) { return null; } // 读取checksum+data ByteBuffer buf = ByteBuffer.allocate(OF_DATA + size); fc.position(position); fc.read(buf); byte[] log = buf.array(); // 校验 checksum int checkSum1 = calChecksum(0, Arrays.copyOfRange(log, OF_DATA, log.length)); int checkSum2 = Parser.parseInt(Arrays.copyOfRange(log, OF_CHECKSUM, OF_DATA)); if(checkSum1 != checkSum2) { return null; } position += log.length; return log; }
在打开一个日志文件时,需要首先校验日志文件的 XChecksum,并移除文件尾部可能存在的 BadTail,由于 BadTail 该条日志尚未写入完成,文件的校验和也就不会包含该日志的校验和,去掉 BadTail 即可保证日志文件的一致性。
private void checkAndRemoveTail() { rewind(); int xCheck = 0; while(true) { byte[] log = internNext(); if(log == null) break; xCheck = calChecksum(xCheck, log); } if(xCheck != xChecksum) { Panic.panic(Error.BadLogFileException); } // 截断文件到正常日志的末尾 truncate(position); rewind(); }
向日志文件写入日志时,也是首先将数据包裹成日志格式,写入文件后,再更新文件的校验和,更新校验和时,会刷新缓冲区,保证内容写入磁盘。
public void log(byte[] data) { byte[] log = wrapLog(data); ByteBuffer buf = ByteBuffer.wrap(log); lock.lock(); try { fc.position(fc.size()); fc.write(buf); } catch(IOException e) { Panic.panic(e); } finally { lock.unlock(); } updateXChecksum(log); } private void updateXChecksum(byte[] log) { this.xChecksum = calChecksum(this.xChecksum, log); fc.position(0); fc.write(ByteBuffer.wrap(Parser.int2Byte(xChecksum))); fc.force(false); } private byte[] wrapLog(byte[] data) { byte[] checksum = Parser.int2Byte(calChecksum(0, data)); byte[] size = Parser.int2Byte(data.length); return Bytes.concat(size, checksum, data); }恢复策略
恢复策略来自于 NYADB2 的恢复策略,比较烧脑(我感觉)。
DM 为上层模块,提供了两种 *** 作,分别是插入新数据(I)和更新现有数据(U)。至于为啥没有删除数据,这个会在 VM 一节叙述。
DM 的日志策略很简单,一句话就是:
在进行 I 和 U *** 作之前,必须先进行对应的日志 *** 作,在保证日志写入磁盘后,才进行数据 *** 作。
这个日志策略,使得 DM 对于数据 *** 作的磁盘同步,可以更加随意。日志在数据 *** 作之前,保证到达了磁盘,那么即使该数据 *** 作最后没有来得及同步到磁盘,数据库就发生了崩溃,后续也可以通过磁盘上的日志恢复该数据。
对于两种数据 *** 作,DM 记录的日志如下:
- (Ti, I, A, x),表示事务 Ti 在 A 位置插入了一条数据 x
- (Ti, U, A, oldx, newx),表示事务 Ti 将 A 位置的数据,从 oldx 更新成 newx
我们首先不考虑并发的情况,那么在某一时刻,只可能有一个事务在 *** 作数据库。日志会看起来像下面那样:
(Ti, x, x), ..., (Ti, x, x), (Tj, x, x), ..., (Tj, x, x), (Tk, x, x), ..., (Tk, x, x)单线程
由于单线程,Ti、Tj 和 Tk 的日志永远不会相交。这种情况下利用日志恢复很简单,假设日志中最后一个事务是 Ti:
- 对 Ti 之前所有的事务的日志,进行重做(redo)
- 接着检查 Ti 的状态(XID 文件),如果 Ti 的状态是已完成(包括 committed 和 aborted),就将 Ti 重做,否则进行撤销(undo)
接着,是如何对事务 T 进行 redo:
- 正序扫描事务 T 的所有日志
- 如果日志是插入 *** 作 (Ti, I, A, x),就将 x 重新插入 A 位置
- 如果日志是更新 *** 作 (Ti, U, A, oldx, newx),就将 A 位置的值设置为 newx
undo 也很好理解:
- 倒序扫描事务 T 的所有日志
- 如果日志是插入 *** 作 (Ti, I, A, x),就将 A 位置的数据删除
- 如果日志是更新 *** 作 (Ti, U, A, oldx, newx),就将 A 位置的值设置为 oldx
注意,MYDB 中其实没有真正的删除 *** 作,对于插入 *** 作的 undo,只是将其中的标志位设置为 invalid。对于删除的探讨将在 VM 一节中进行。
多线程经过以上的 *** 作,就能保证了 MYDB 在单线程下的恢复性。对于多线程的情况下呢?我们来考虑下面的两种情况。
第一种:
T1 begin T2 begin T2 U(x) T1 R(x) ... T1 commit MYDB break down
在系统崩溃时,T2 仍然是活跃状态。那么当数据库重新启动,执行恢复例程时,会撤销 T2,它对数据库的影响会被消除。但是由于 T1 读取了 T2 更新的值,既然 T2 被撤销,那么 T1 也应当被撤销。这种情况,就是级联回滚。但是,T1 已经 commit 了,所有 commit 的事务的影响,应当被持久化。这里就造成了矛盾。所以这里需要保证:
规定1:正在进行的事务,不会读取其他任何未提交的事务产生的数据。
第二种情况,假设 x 的初值是 0
T1 begin T2 begin T1 set x = x+1 // 产生的日志为(T1, U, A, 0, 1) T2 set x = x+1 // 产生的日志为(T1, U, A, 1, 2) T2 commit MYDB break down
在系统崩溃时,T1 仍然是活跃状态。那么当数据库重新启动,执行恢复例程时,会对 T1 进行撤销,对 T2 进行重做,但是,无论撤销和重做的先后顺序如何,x 最后的结果,要么是 0,要么是 2,这都是错误的。
出现这种问题的原因, 归根结底是因为我们的日志太过简单, 仅仅记录了"前相"和"后相". 并单纯的依靠"前相"undo, 依靠"后相"redo. 这种简单的日志方式和恢复方式, 并不能涵盖住所有数据库 *** 作形成的语义
解决方法有两种:
- 增加日志种类
- 限制数据库 *** 作
MYDB 采用的是限制数据库 *** 作,需要保证:
规定2:正在进行的事务,不会修改其他任何未提交的事务修改或产生的数据。
在 MYDB 中,由于 VM 的存在,传递到 DM 层,真正执行的 *** 作序列,都可以保证规定 1 和规定 2。VM 如何保证这两条规定,会在 VM 层一节中说明(VM 的坑还挺大)。有了这两条规定,并发情况下日志的恢复也就很简单了:
- 重做所有崩溃时已完成(committed 或 aborted)的事务
- 撤销所有崩溃时未完成(active)的事务
在恢复后,数据库就会恢复到所有已完成事务结束,所有未完成事务尚未开始的状态。
实现首先规定两种日志的格式:
private static final byte LOG_TYPE_INSERT = 0; private static final byte LOG_TYPE_UPDATE = 1; updateLog: [LogType] [XID] [UID] [OldRaw] [NewRaw] insertLog: [LogType] [XID] [Pgno] [Offset] [Raw]
和原理中描述的类似,recover 例程主要也是两步:重做所有已完成事务,撤销所有未完成事务:
private static void redoTranscations(TransactionManager tm, Logger lg, PageCache pc) { lg.rewind(); while(true) { byte[] log = lg.next(); if(log == null) break; if(isInsertLog(log)) { InsertLogInfo li = parseInsertLog(log); long xid = li.xid; if(!tm.isActive(xid)) { doInsertLog(pc, log, REDO); } } else { UpdateLogInfo xi = parseUpdateLog(log); long xid = xi.xid; if(!tm.isActive(xid)) { doUpdateLog(pc, log, REDO); } } } } private static void undoTranscations(TransactionManager tm, Logger lg, PageCache pc) { Map> logCache = new HashMap<>(); lg.rewind(); while(true) { byte[] log = lg.next(); if(log == null) break; if(isInsertLog(log)) { InsertLogInfo li = parseInsertLog(log); long xid = li.xid; if(tm.isActive(xid)) { if(!logCache.containsKey(xid)) { logCache.put(xid, new ArrayList<>()); } logCache.get(xid).add(log); } } else { UpdateLogInfo xi = parseUpdateLog(log); long xid = xi.xid; if(tm.isActive(xid)) { if(!logCache.containsKey(xid)) { logCache.put(xid, new ArrayList<>()); } logCache.get(xid).add(log); } } } // 对所有active log进行倒序undo for(Entry > entry : logCache.entrySet()) { List logs = entry.getValue(); for (int i = logs.size()-1; i >= 0; i --) { byte[] log = logs.get(i); if(isInsertLog(log)) { doInsertLog(pc, log, UNDO); } else { doUpdateLog(pc, log, UNDO); } } tm.abort(entry.getKey()); } }
updateLog 和 insertLog 的重做和撤销处理,分别合并成一个方法来实现:
private static void doUpdateLog(PageCache pc, byte[] log, int flag) { int pgno; short offset; byte[] raw; if(flag == REDO) { UpdateLogInfo xi = parseUpdateLog(log); pgno = xi.pgno; offset = xi.offset; raw = xi.newRaw; } else { UpdateLogInfo xi = parseUpdateLog(log); pgno = xi.pgno; offset = xi.offset; raw = xi.oldRaw; } Page pg = null; try { pg = pc.getPage(pgno); } catch (Exception e) { Panic.panic(e); } try { PageX.recoverUpdate(pg, raw, offset); } finally { pg.release(); } } private static void doInsertLog(PageCache pc, byte[] log, int flag) { InsertLogInfo li = parseInsertLog(log); Page pg = null; try { pg = pc.getPage(li.pgno); } catch(Exception e) { Panic.panic(e); } try { if(flag == UNDO) { DataItem.setDataItemRawInvalid(li.raw); } PageX.recoverInsert(pg, li.raw, li.offset); } finally { pg.release(); } }
注意,doInsertLog() 方法中的删除,使用的是 DataItem.setDataItemRawInvalid(li.raw);,dataItem 将在下一节中说明,大致的作用,就是将该条 DataItem 的有效位设置为无效,来进行逻辑删除。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)