Note:本文实际绑定的版本是branch5.0(2018-7-25)。
对象基础
对象是什么
上一篇文章中,介绍了redis的各种基础数据结构,实际上,那些数据结构不是直接对应到那些redis指令的,而是加了一层object层,每种object可以用特定的1种或多种基础数据结构来实现。
对象类型
总共有7种:
/* The actual Redis Object */
#define OBJ_STRING 0 /* String object. */
#define OBJ_LIST 1 /* List object. */
#define OBJ_SET 2 /* Set object. */
#define OBJ_ZSET 3 /* Sorted set object. */
#define OBJ_HASH 4 /* Hash object. */
#define OBJ_MODULE 5 /* Module object. */
#define OBJ_STREAM 6 /* Stream object. */
对象编码
#define OBJ_ENCODING_RAW 0 /* 什么数据结构都没用到 */
#define OBJ_ENCODING_INT 1 /* 整数 */
#define OBJ_ENCODING_HT 2 /* 散列表 */
#define OBJ_ENCODING_ZIPMAP 3 /* zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* 已废弃 */
#define OBJ_ENCODING_ZIPLIST 5 /* ziplist */
#define OBJ_ENCODING_INTSET 6 /* intset */
#define OBJ_ENCODING_SKIPLIST 7 /* skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* 嵌入式sds */
#define OBJ_ENCODING_QUICKLIST 9 /* 由多个ziplist组成的链表(linked list) */
#define OBJ_ENCODING_STREAM 10 /* radix tree of listpacks */
对象结构体
typedef struct redisObject {
unsigned type:4; // 上文已介绍
unsigned encoding:4; // 上文已介绍
unsigned lru:LRU_BITS; // 计时的
int refcount;
void *ptr;
} robj;
createObject
这个是最基本的创建对象接口:
robj *createObject(int type, void *ptr) {
robj *o = zmalloc(sizeof(*o));
o->type = type;
o->encoding = OBJ_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; // 根据访问频率的
} else {
o->lru = LRU_CLOCK(); // 根据最近访问时间淘汰的,精确到分钟
}
return o;
}
蛮简单的,没啥好说。
对象释放
每个对象的释放都是自动的,在decrRefCount里执行,会自动根据o->type来执行不同的释放动作。
对象的编码情况
根据decrRefCount可以知道type和encoding的对应关系:
- OBJ_STRING: OBJ_ENCODING_INT / OBJ_ENCODING_RAW / OBJ_ENCODING_EMBSTR
- OBJ_LIST: OBJ_ENCODING_QUICKLIST(OBJ_ENCODING_ZIPLIST的list实际上没有被用到)
- OBJ_SET: OBJ_ENCODING_HT / OBJ_ENCODING_INTSET
- OBJ_ZSET: OBJ_ENCODING_SKIPLIST / OBJ_ENCODING_ZIPLIST
- OBJ_HASH: OBJ_ENCODING_HT / OBJ_ENCODING_ZIPLIST
- OBJ_MODULE: 无
- OBJ_STREAM: 无
P.S.,并不是所有的redis功能指令都会创建object的,例如pfadd。
redisCommand
结构
struct redisCommand {
char *name; // 即暴露给客户端的指令名
redisCommandProc *proc; // 接口
int arity; // 参数数量
char *sflags; // 字符串形式的flags
int flags; // sflags的数值表示
redisGetKeysProc *getkeys_proc; // 貌似还在施工的
int firstkey;
int lastkey;
int keystep;
long long microseconds, calls;
};
server.c里面定义了各种指令。
flags解释
/*
* w: 写指令
* r: 读指令
* m: 会增加内存使用量的指令(如果out of memory了就不要使用)
* a: 管理员指令(SAVE / SHUTDOWN / ···)
* p: 发布订阅相关指令
* f: 没有指令用到这个flag
* s: 不能在脚本中使用的指令
* R: 带有随机性的指令(SPOP / RANDOMKEY / ···)
* S: 软指令,会输出数组,有确定性(hkeys/hvals/smembers/sdiff/sunion/sinter/keys)
* l: 在加载数据库过程中允许使用的指令(select/shutdown/···)
* t: 和主从的数据同步状态有关
* M: 开启了MONITOR时不需要列入监控的指令(exec)
* k: 和cluster模式有关,只有restore-asking用到
* F: 高速指令,O(1)或O(log(N))。如果会触发del(del可能会拖时间),那么不是高速指令(如set不是高速指令,而get是)
*/
t_string
特点
- 大小不超过512MB
- 二进制安全
- t_string.c都是业务逻辑,主要还是看sds的实现
createRawStringObject
robj *createRawStringObject(const char *ptr, size_t len) {
return createObject(OBJ_STRING, sdsnewlen(ptr,len));
}
需要2次内存分配,返回一个type为OBJ_STRING、encoding为OBJ_ENCODING_RAW的sds对象。
createEmbeddedStringObject
这个和createRawStringObject的最大区别是,只做一次内存分配:
robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr8)+len+1);
encoding是OBJ_ENCODING_EMBSTR。
t_list
特点
- t_list就只用了quicklist,入口是robj *createQuicklistObject(void);
和list有关的配置项
list-max-ziplist-size:(就是quicklist的fill)
是用来控制quicklist里的节点(ziplist)的大小的,大小有2种指标:entry数或者ziplist字节总数。
按字节总数:
- -5: max size: 64 Kb <-- not recommended for normal workloads
- -4: max size: 32 Kb <-- not recommended
- -3: max size: 16 Kb <-- probably not recommended
- -2: max size: 8 Kb <-- good(默认值)
- -1: max size: 4 Kb <-- good
按entry数:
- 要填大于等于0的数
- 每个节点最多存这么多个entries
- 每个节点单独计数
list-compress-depth
- 默认0,表示关闭压缩功能
- depth表示头尾各有多少个节点不压缩
- 头尾节点必然不压缩
- 1表示只有head和tail不压缩,其他节点都压缩
- 2表示head、head->next、tail->prev、tail不压缩,其他节点都压缩;>2的情况以此类推
这2个全局参数是通过调用这个函数,填入quicklist对象的:
void quicklistSetOptions(quicklist *quicklist, int fill, int depth);
t_set
特点
- 如果sadd第一个元素是整数,那么创建的是intset,否则是set(dict,即hashtable)
- 元素都是sds,意味着要么是整数,要么是二进制安全字符串
当满足一些条件时,t_set会从intset升级到set:
- 添加的元素不是整数时
- 包含元素过多时
// set-max-intset-entries 默认 512
if (intsetLen(subject->ptr) > server.set_max_intset_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
交集并集运算
void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
robj *dstkey, int op) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *dstset = NULL;
sds ele;
int j, cardinality = 0;
int diff_algo = 1; // diff有2种算法,自动选最佳的
// 初始化sets
for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ?
lookupKeyWrite(c->db,setkeys[j]) :
lookupKeyRead(c->db,setkeys[j]);
// 这个kv可能不存在,跳过
if (!setobj) {
sets[j] = NULL;
continue;
}
// 检查这个kv是不是set
if (checkType(c,setobj,OBJ_SET)) {
zfree(sets);
return;
}
sets[j] = setobj;
}
// 用algo_one_work, algo_two_work算出,用哪个DIFF算法比较好,
// 一个O(N*M),N=是第一个set的元素数量,M是set数量
// 一个 O(N),N=所有set的元素数量之和
if (op == SET_OP_DIFF && sets[0]) {
long long algo_one_work = 0, algo_two_work = 0;
for (j = 0; j < setnum; j++) {
if (sets[j] == NULL) continue;
algo_one_work += setTypeSize(sets[0]);
algo_two_work += setTypeSize(sets[j]);
}
algo_one_work /= 2; // 0.5的常数
diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2; // 比较并选择其一
if (diff_algo == 1 && setnum > 1) {
// 按照包含元素数量 降序排序 方便尽快找到重复元素
qsort(sets+1,setnum-1,sizeof(robj*),
qsortCompareSetsByRevCardinality);
}
}
dstset = createIntsetObject(); // 存结果的
if (op == SET_OP_UNION) { // 最简单的,往dstset怼元素就行了
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (setTypeAdd(dstset,ele)) cardinality++;
sdsfree(ele);
}
setTypeReleaseIterator(si);
}
} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 1) {
// DIFF Algorithm 1:
// 遍历第一个集合每个元素,与其他集合做setTypeIsMember操作,
// 只在第一个集合的元素就添加到dstset里
si = setTypeInitIterator(sets[0]);
while((ele = setTypeNextObject(si)) != NULL) {
for (j = 1; j < setnum; j++) {
if (!sets[j]) continue; /* no key is an empty set. */
if (sets[j] == sets[0]) break; /* same set! */
if (setTypeIsMember(sets[j],ele)) break;
}
if (j == setnum) {
/* There is no other set with this element. Add it. */
setTypeAdd(dstset,ele);
cardinality++;
}
sdsfree(ele);
}
setTypeReleaseIterator(si);
} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 2) {
// DIFF Algorithm 2:
// 第一个集合的元素都add到dstset,然后对于其他集合的元素,执行
// setTypeRemove(dstset,ele),如果移除成功,说明重复了,计数-1
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (j == 0) {
if (setTypeAdd(dstset,ele)) cardinality++;
} else {
if (setTypeRemove(dstset,ele)) cardinality--;
}
sdsfree(ele);
}
setTypeReleaseIterator(si);
if (cardinality == 0) break;
}
}
// 已经得到结果后
if (!dstkey) {
// 直接reply
addReplyMultiBulkLen(c,cardinality);
si = setTypeInitIterator(dstset);
while((ele = setTypeNextObject(si)) != NULL) {
addReplyBulkCBuffer(c,ele,sdslen(ele));
sdsfree(ele);
}
setTypeReleaseIterator(si);
decrRefCount(dstset);
} else {
int deleted = dbDelete(c->db,dstkey);
if (setTypeSize(dstset) > 0) {
dbAdd(c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent(NOTIFY_SET,
op == SET_OP_UNION ? "sunionstore" : "sdiffstore",
dstkey,c->db->id);
} else {
decrRefCount(dstset);
addReply(c,shared.czero);
if (deleted)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
dstkey,c->db->id);
}
signalModifiedKey(c->db,dstkey);
server.dirty++;
}
zfree(sets);
}
t_hash
特点
- 默认用OBJ_ENCODING_ZIPLIST
- 用一个hashTypeLookupWriteOrCreate来获取一个hash对象,如果hash对象不存在会即时创建一个
和list有关的配置项
hash-max-ziplist-entries 512
用ziplist时,最多存512个元素
hash-max-ziplist-value 64
单个元素超过64字节时,就得转成hashtable
t_zset
特点
- 有升级转换也有降级转换
- 如果是skiplist编码,那么其实还用了dict,用来判断key是不是已经在zset里了
- dict的key是引用skiplist里的,所以不会有2个key副本,没有冗余
和zset有关的配置项
用这2个配置项来控制什么时候升级什么时候降级:
zset-max-ziplist-entries 128
用ziplist时,最多存128个元素
zset-max-ziplist-value 64
单个元素超过64字节时,就得转成hashtable
写作不易,您的支持是我写作的动力!