redis之AOF文件加载过程


redis支持2种持久化功能,分别是RDB持久化和AOF(Append Only File)持久化。

今天总结下redis加载AOF文件过程,以及加载AOF过程中遇到的问题。

由于AOF文件里面包含了重建redis数据库状态所需要的所有命令,因此在redis启动过程中需要加载一次AOF文件(前提是redis配置文件中使用的是aof文件),这样就可以还原之前的redis所有状态了。

redis在启动时加载AOF过程如下

 int main(int argc, char **argv) {   ...    // 如果服务器不是运行在 SENTINEL 模式,那么执行以下代码     if (!server.sentinel_mode) { //sentinel和集群只能二选1         /* Things not needed when running in Sentinel mode. */         redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);     #ifdef __linux__         linuxOvercommitMemoryWarning();     #endif         // 从 AOF 文件或者 RDB 文件中载入数据         loadDataFromDisk();         // 判断是否启动集群         if (server.cluster_enabled) { //在该函数前会先载入cluster配置nodes.conf,见initServer->clusterInit;             if (verifyClusterConfigWithData() == REDIS_ERR) {                 redisLog(REDIS_WARNING,                     "You can't have keys in a DB different than DB 0 when in "                     "Cluster mode. Exiting.");                 exit(1);             }         }         // 打印 TCP 端口         if (server.ipfd_count > 0)             redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);         // 打印本地套接字端口         if (server.sofd > 0)             redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);     } else { //sentinel和集群只能二选1         sentinelIsRunning();     }       /* Warning the user about suspicious maxmemory setting. */     // 检查不正常的 maxmemory 配置     if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {         redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);     }       // 运行事件处理器,一直到服务器关闭为止     aeSetBeforeSleepProc(server.el,beforeSleep);     aeMain(server.el);        // 服务器关闭,停止事件循环     aeDeleteEventLoop(server.el);       return 0; }

从上面代码得知,redis在loadDataFromDisk()中加载AOF文件。

 void loadDataFromDisk(void) { //loadDataFromDisk和rdbSave对应加载写入     // 记录开始时间     long long start = ustime();     // AOF 持久化是否已打开     if (server.aof_state == REDIS_AOF_ON) {         // 尝试载入 AOF 文件         if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)             // 打印载入信息,并计算载入耗时长度             redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);     // AOF 持久化未打开     } else {         // 尝试载入 RDB 文件         if (rdbLoad(server.rdb_filename) == REDIS_OK) {             // 打印载入信息,并计算载入耗时长度             redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",                 (float)(ustime()-start)/1000000);         } else if (errno != ENOENT) {             redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));             exit(1);         }     } }  

若开启使用的是AOF持久化,则调用loadAppendOnlyFile进行加载AOF文件。

 int loadAppendOnlyFile(char *filename) {      //伪客户端     struct redisClient *fakeClient;       // 打开 AOF 文件     FILE *fp = fopen(filename,"r");、     struct redis_stat sb;     int old_aof_state = server.aof_state;     long loops = 0;        // 检查文件的正确性     if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {         server.aof_current_size = 0;         fclose(fp);         return REDIS_ERR;     }      // 检查文件是否正常打开     if (fp == NULL) {         redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));         exit(1);     }     /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI      * to the same file we're about to read.       * 暂时性地关闭 AOF ,防止在执行 MULTI 时,           * EXEC 命令被传播到正在打开的 AOF 文件中。      */     server.aof_state = REDIS_AOF_OFF;     //创建一个伪客户端     fakeClient = createFakeClient();     // 设置服务器的状态为:正在载入         startLoading(fp); //startLoading 定义于 rdb.c       while(1) {         int argc, j;         unsigned long len;         robj **argv;         char buf[128];         sds argsds;         struct redisCommand *cmd;           /* Serve the clients from time to time           * 间隔性地处理客户端发送来的请求                   * 因为服务器正处于载入状态,所以能正常执行的只有 PUBSUB 等模块          */         if (!(loops++ % 1000)) {             loadingProgress(ftello(fp));             processEventsWhileBlocked();         }          // 读入文件内容到缓存         if (fgets(buf,sizeof(buf),fp) == NULL) {             if (feof(fp))                 break; // 文件已经读完,跳出             else                 goto readerr;         }           // 确认协议格式,比如 *3rn         if (buf[0] != '*') goto fmterr;         // 取出命令参数,比如 *3rn 中的 3         argc = atoi(buf+1);        // 至少要有一个参数(被调用的命令)         if (argc < 1) goto fmterr;         // 从文本中创建字符串对象:包括命令,以及命令参数                 // 例如 $3rnSETrn$3rnKEYrn$5rnVALUErn                // 将创建三个包含以下内容的字符串对象:         // SET 、 KEY 、 VALUE         argv = zmalloc(sizeof(robj*)*argc);         for (j = 0; j < argc; j++) {             if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;               if (buf[0] != '$') goto fmterr;                // 读取参数值的长度             len = strtol(buf+1,NULL,10);             // 读取参数值                         argsds = sdsnewlen(NULL,len);                         if (len && fread(argsds,len,1,fp) == 0)                 goto fmterr;                           // 为参数创建对象             argv[j] = createObject(REDIS_STRING,argsds);               if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */         }           /* Command lookup           *查找命令          */         cmd = lookupCommand(argv[0]->ptr);         if (!cmd) {             redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);             exit(1);         }           /* Run the command in the context of a fake client           * 调用伪客户端,执行命令          */         fakeClient->argc = argc;         fakeClient->argv = argv;         cmd->proc(fakeClient);           /* The fake client should not have a reply */         redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);         /* The fake client should never get blocked */         redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);           /* Clean up. Command code may have changed argv/argc so we use the          * argv/argc of the client instead of the local variables.           * 清理命令和命令参数对象          */         for (j = 0; j < fakeClient->argc; j++)             decrRefCount(fakeClient->argv[j]);         zfree(fakeClient->argv);     }       /* This point can only be reached when EOF is reached without errors.      * If the client is in the middle of a MULTI/EXEC, log error and quit.       * 如果能执行到这里,说明 AOF 文件的全部内容都可以正确地读取,           * 但是,还要检查 AOF 是否包含未正确结束的事务      */     if (fakeClient->flags & REDIS_MULTI) goto readerr;       fclose(fp);    // 释放伪客户端     freeFakeClient(fakeClient);    // 复原 AOF 状态     server.aof_state = old_aof_state;    // 停止载入     stopLoading();     // 更新服务器状态中, AOF 文件的当前大小     aofUpdateCurrentSize();     // 记录前一次重写时的大小     server.aof_rewrite_base_size = server.aof_current_size;     return REDIS_OK;   // 读入错误 readerr:     // 非预期的末尾,可能是 AOF 文件在写入的中途遭遇了停机     if (feof(fp)) {         redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");      // 文件内容出错     } else {         redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));     }     exit(1); // 内容格式错误 fmterr:     redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");     exit(1); }  

由于AOF文件中保存的都是用户可见的一条条命令,因此loadAppendOnlyFile通过创建一个伪客户端读取AOF文件获取一条条命令来恢复执行。

 

redis读取AOF文件并还原数据库的状态过程如下:

redis之AOF文件加载过程

 

以上就是服务器根据读入AOF文件进行还原数据库状态的原理。

 

从上面的介绍可以知道加载AOF文件是redis启动过程中的步骤,当业务线程启动连接redis获取或者保存数据时,若redis正在加载数据,则有可能获取不到数据:

 127.0.0.1:6379> get a (error) LOADING Redis is loading the dataset in memory

原因是当加载过程中,有些客户端发来的命令是不执行的,直接返回上述结果。只有命令带有"l"(小写的L)标志的才会执行,比如info命令等。

 

那么怎么判断AOF文件是否加载完成呢?

刚才说到加载时可以执行info命令,redis info 命令以一种易于理解和阅读的格式,返回关于 Redis 服务器的各种信息和统计数值。该统计信息中有Persistence 参数,该参数中记录着 RDB 和 AOF 的相关信息,其中有个loading字段标志这数据是否正在加载,当正在加载时为1,加载完为0,因此可以根据该字段来判断数据是否加载完毕。

 127.0.0.1:6379> info Persistence # Persistence loading:1    // 1表示正在加载数据 ...   127.0.0.1:6379> info Persistence # Persistence loading:0   // 0表示数据加载完毕 ...  

 

因此业务线程可以根据info命令获取loading进行判断数据是否加载完毕。

 public boolean isLoading(){   try{         String info = (String)redisTemplate.getRequiredConnectionFactory().getConnection()                       .info("Persistence").get("loading");         if("0".equalsIgnoreCase(info)){           return true;         }   }catch (Exception e){       error("Failed to get loading status : {}", e.getMessage());   }   return false; }

 

分享到