本文隶属于专题系列: Libevent源码分析

锁操作:

        在 前一篇博文可以看到很多函数在操作前都需要对这个evbuffer进行加锁。同event_base不同,如果evbuffer支持锁的话,要显式地调用函数evbuffer_enable_locking。
//buffer.c文件
int//参数可以是一个锁变量也可以是NULL
evbuffer_enable_locking(struct evbuffer *buf, void *lock)
{
#ifdef _EVENT_DISABLE_THREAD_SUPPORT
	return -1;
#else
	if (buf->lock)
		return -1;
	if (!lock) {
		//自己分配锁变量
		EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE);
		if (!lock)
			return -1;
		buf->lock = lock;
		//该evbuffer拥有锁,到时需要释放锁内存
		buf->own_lock = 1;
	} else {
		buf->lock = lock;//使用参数提供的锁
		buf->own_lock = 0;//自己没有拥有锁。不需要释放锁内存
	}
	return 0;
#endif
}

        可以看到,第二个参数可以为NULL。此时函数内部会申请一个锁。明显如果要让evbuffer能使用锁,就必须在一开始就调用evthread_use_windows_threads()或者evthread_use_pthreads(),关于这个可以参考一篇博文

        因为用户操控这个evbuffer,所以Libevent提供了加锁和解锁接口给用户使用。

//buffer.c文件
void
evbuffer_lock(struct evbuffer *buf)
{
	EVBUFFER_LOCK(buf);
}
void
evbuffer_unlock(struct evbuffer *buf)
{
	EVBUFFER_UNLOCK(buf);
}
//evbuffer-internal.h文件
#define EVBUFFER_LOCK(buffer)						\
	do {								\
		EVLOCK_LOCK((buffer)->lock, 0);				\
	} while (0)
#define EVBUFFER_UNLOCK(buffer)						\
	do {								\
		EVLOCK_UNLOCK((buffer)->lock, 0);			\
	} while (0)

        在Libevent内部,一般不会使用这个两个接口,而是直接使用EVBUFFER_LOCK(buf)和EVBUFFER_UNLOCK(buf)。

查找操作:

下图展示了evbuffer的一些查找操作以及调用关系。

        

查找结构体:

        对于一个数组或者一个文件,只需一个下标或者偏移量就可以定位查找了。但对于evbuffer来说,它的数据是由一个个的evbuffer_chain用链表连在一起的。所以在evbuffer中定位,不仅仅要有一个偏移量,还要指明是哪个evbuffer_chain,甚至是在evbuffer_chain中的偏移量。因此Libevent定义了一个查找(定位)结构体:

//buffer.h文件
struct evbuffer_ptr {
	ev_ssize_t pos;//总偏移量,相对于数据的开始位置
	/* Do not alter the values of fields. */
	struct {
		void *chain;//指明是哪个evbuffer_chain
		size_t pos_in_chain; //在evbuffer_chain中的偏移量
	} _internal;
};

        有一点要注意,pos_in_chain是从misalign这个错开空间之后计算的,也就是说其实际偏移量为:chain->buffer+ chain->misalign + pos_in_chain。

        定位结构体有一个对应的操作函数evbuffer_ptr_set,该函数就像fseek函数那样,可以设置或者移动偏移量,并且可以绝对和相对地移动。

//buffer.h文件
enum evbuffer_ptr_how {
	EVBUFFER_PTR_SET, //偏移量是一个绝对位置
	EVBUFFER_PTR_ADD //偏移量是一个相对位置
};
//buffer.c文件
//设置evbuffer_ptr。evbuffer_ptr_set(buf, &pos, 0, EVBUFFER_PTR_SET)
//将这个pos指向链表的开头
//position指明移动的偏移量,how指明该偏移量是绝对偏移量还是相对当前位置的偏移量。
int//这个函数的作用就像C语言中的fseek,设置文件指针的偏移量
evbuffer_ptr_set(struct evbuffer *buf, struct evbuffer_ptr *pos,
    size_t position, enum evbuffer_ptr_how how)
{
	size_t left = position;
	struct evbuffer_chain *chain = NULL;
	EVBUFFER_LOCK(buf);
	//这个switch的作用就是给pos设置新的总偏移量值。
	switch (how) {
	case EVBUFFER_PTR_SET://绝对位置
		chain = buf->first;//从第一个evbuffer_chain算起
		pos->pos = position; //设置总偏移量
		position = 0;
		break;
	case EVBUFFER_PTR_ADD://相对位置
		chain = pos->_internal.chain;//从当前evbuffer_chain算起
		pos->pos += position;//加上相对偏移量
		position = pos->_internal.pos_in_chain;
		break;
	}
	//这个偏移量跨了evbuffer_chain。可能不止跨一个chain。
	while (chain && position + left >= chain->off) {
		left -= chain->off - position;
		chain = chain->next;
		position = 0;
	}
	if (chain) {//设置evbuffer_chain内的偏移量
		pos->_internal.chain = chain;
		pos->_internal.pos_in_chain = position + left;
	} else {//跨过了所有的节点
		pos->_internal.chain = NULL;
		pos->pos = -1;
	}
	EVBUFFER_UNLOCK(buf);
	return chain != NULL ? 0 : -1;
}

        可以看到,该函数只考虑了向后面的chain移动定位指针,不能向。当然如果参数position小于0,并且移动时并不会跨越当前的chain,还是可以的。不过最好不要这样做。如果确实想移回头,那么可以考虑下面的操作。

pos.position -= 20;//移回头20个字节。
evbuffer_ptr_set(buf, &pos, 0, EVBUFFER_PTR_SET);

查找一个字符:

        有下面代码的前一个函数是可以用来查找一个字符的,第二个函数则是获取对于位置的字符。
static inline ev_ssize_t
evbuffer_strchr(struct evbuffer_ptr *it, const char chr);
static inline char//获取对应位置的字符
evbuffer_getchr(struct evbuffer_ptr *it);
static inline int
evbuffer_strspn(struct evbuffer_ptr *ptr, const char *chrset);

        函数evbuffer_strchr是从it指向的evbuffer_chain开始查找,会往后面的链表查找。it是一个值-结果参数,如果查找到了,那么it将会指明被查找字符的位置,并返回相对于evbuffer的总偏移量(即it->pos)。如果没有找到,就返回-1。由于实现都是一般的字符比较,所以就不列出代码了。函数evbuffer_getchr很容易理解也不列出代码了。

        第三个函数evbuffer_strspn的参数chrset虽然是一个字符串,其实内部也是比较字符的。该函数所做的操作和C语言标准库里面的strspn函数是一样的。这里也不多说了。关于strspn函数的理解可以查看 这里

查找一个字符串:

        字符串的查找函数有evbuffer_search_range和evbuffer_search,后者调用前者完成查找。

        在讲查找前,先看一个字符串比较函数evbuffer_ptr_memcmp。该函数是比较某一个字符串和从evbuffer中某个位置开始的字符是否相等。明显比较的时候需要考虑到跨evbuffer_chain的问题。
static int //匹配成功会返回0
evbuffer_ptr_memcmp(const struct evbuffer *buf, const struct evbuffer_ptr *pos,
    const char *mem, size_t len)
{
	struct evbuffer_chain *chain;
	size_t position;
	int r;
	//链表数据不够
	if (pos->pos + len > buf->total_len)
		return -1;
	//需要考虑这个要匹配的字符串被分散在两个evbuffer_chain中
	chain = pos->_internal.chain;
	position = pos->_internal.pos_in_chain;//从evbuffer_chain中的这个位置开始
	while (len && chain) {
		size_t n_comparable;//该evbuffer_chain中可以比较的字符数
		if (len + position > chain->off)
			n_comparable = chain->off - position;
		else
			n_comparable = len;
		r = memcmp(chain->buffer + chain->misalign + position, mem,
		    n_comparable);
		if (r)//不匹配
			return r;
		//考虑跨evbuffer_chain
		mem += n_comparable;
		len -= n_comparable;//还有这些是没有比较的
		position = 0;
		chain = chain->next;
	}
	return 0;//匹配成功
}

        该函数首先比较pos指向的当前evbuffer_chain,如果字符mem还有一些字符没有参与比较,那么就需要用下一个evbuffer_chain的数据。

        由于evbuffer的数据是由链表组成的,没办法直接用KMP查找算法或者直接调用strstr函数。有了evbuffer_ptr_memcmp函数,读者可能会想,一个字节一个字节地挪动evbuffer的数据,依次调用evbuffer_ptr_memcmp函数。但evbuffer_search_range函数也不是直接调用函数evbuffer_ptr_memcmp的。而是先用字符查找函数,找到要查找字符串中的第一个字符,然后才调用那个函数。下面是具体的代码。
//buffer.c文件
struct evbuffer_ptr
evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const struct evbuffer_ptr *start)
{
	return evbuffer_search_range(buffer, what, len, start, NULL);
}
//参数start和end指明了查找的范围
struct evbuffer_ptr
evbuffer_search_range(struct evbuffer *buffer, const char *what, size_t len, const struct evbuffer_ptr *start, const struct evbuffer_ptr *end)
{
	struct evbuffer_ptr pos;
	struct evbuffer_chain *chain, *last_chain = NULL;
	const unsigned char *p;
	char first;
	EVBUFFER_LOCK(buffer);
	//初始化pos
	if (start) {
		memcpy(&pos, start, sizeof(pos));
		chain = pos._internal.chain;
	} else {
		pos.pos = 0;
		chain = pos._internal.chain = buffer->first;
		pos._internal.pos_in_chain = 0;
	}
	if (end)
		last_chain = end->_internal.chain;
	if (!len || len > EV_SSIZE_MAX)
		goto done;
	first = what[0];
	//在本函数里面并不考虑到what的数据量比较链表的总数据量还多。
	//但在evbuffer_ptr_memcmp函数中会考虑这个问题。此时该函数直接返回-1。
	//本函数之所以没有考虑这样情况,可能是因为,在[start, end]之间有多少
	//数据是不值得统计的,时间复杂度是O(n)。不是一个简单的buffer->total_len
	//就能获取到的
	while (chain) {
		const unsigned char *start_at =
		    chain->buffer + chain->misalign +
		    pos._internal.pos_in_chain;
		//const void * memchr ( const void * ptr, int value, size_t num );
		//函数的作用是:在ptr指向的内存块中(长度为num个字节),需找字符value。
		//如果找到就返回对应的位置,找不到返回NULL
		p = memchr(start_at, first,
		    chain->off - pos._internal.pos_in_chain);
		if (p) {//找到了what[0]
			pos.pos += p - start_at;
			pos._internal.pos_in_chain += p - start_at;
			//经过上面的两个 += 后,pos指向了这个chain中出现等于what[0]字符的位置
			//但本函数是要匹配一个字符串,而非一个字符
			//evbuffer_ptr_memcmp比较整个字符串。如果有需要的话,该函数会跨
			//evbuffer_chain进行比较,但不会修改pos。如果成功匹配,那么返回0
			if (!evbuffer_ptr_memcmp(buffer, &pos, what, len)) {//匹配成功
				//虽然匹配成功了,但可能是用到了end之后的链表数据。这也等于没有找到
				if (end && pos.pos + (ev_ssize_t)len > end->pos)
					goto not_found;
				else
					goto done;
			}
			//跳过这个等于what[0]的字符
			++pos.pos;
			++pos._internal.pos_in_chain;
			//这个evbuffer_chain已经全部都对比过了。没有发现目标
			if (pos._internal.pos_in_chain == chain->off) {
				chain = pos._internal.chain = chain->next;
				pos._internal.pos_in_chain = 0;//下一个chain从0开始
			}
		} else {//这个evbuffer_chain都没有找到what[0]
			if (chain == last_chain)
				goto not_found;
			//此时直接跳过这个evbuffer_chain
			pos.pos += chain->off - pos._internal.pos_in_chain;
			chain = pos._internal.chain = chain->next;
			pos._internal.pos_in_chain = 0;//下一个chain从0开始
		}
	}
not_found:
	pos.pos = -1;
	pos._internal.chain = NULL;
done:
	EVBUFFER_UNLOCK(buffer);
	return pos;
}

        evbuffer_ptr_memcmp函数和evbuffer_search函数是有区别的。前者只会比较从pos指定位置开始的字符串,不会在另外的地方找一个字符串。而后者则会在后面另外找一个字符串进行比较。

查找换行符:

        换行符是一个比较重要的符号,例如http协议就基于行的。Libevent实现了一个简单的http服务器,因此在内部Libevent实现了一些读取一行数据函数以及与行相关的操作。

        有些系统行尾用\r\n有些则直接用\n,这些不统一给编程造成了一些麻烦。因此在Libevent中定义了一个枚举类型,专门来用表示eol(end of line)的。

  • EVBUFFER_EOL_LF:行尾是’\n’字符
  • EVBUFFER_EOL_CRLF_STRICT:行尾是”\r\n”,一个回车符一个换行符
  • EVBUFFER_EOL_CRLF:行尾是”\r\n”或者’\n’。这个是很有用的,因为可能标准的协议里面要求”\r\n”,但一些不遵循标准的用户可能使用’\n’
  • EVBUFFER_EOL_ANY:行尾是任意次序或者任意数量的’\r’或者’\n’。这种格式不是很有用,只是用来向后兼容而已

         函数evbuffer_readln是用来读取evbuffer中的一行数据(不会读取行尾符号)。

enum evbuffer_eol_style {
	EVBUFFER_EOL_ANY,
	EVBUFFER_EOL_CRLF,
	EVBUFFER_EOL_CRLF_STRICT,
	EVBUFFER_EOL_LF
};
//成功返回读取到的一行数据。否则返回NULL。该行数据会自动加上'\0'结尾
//如果n_read_out不为NULL,则被赋值为读取到的一行的字符数
char *
evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,
		enum evbuffer_eol_style eol_style)
{
	struct evbuffer_ptr it;
	char *line;
	size_t n_to_copy=0, extra_drain=0;
	char *result = NULL;
	EVBUFFER_LOCK(buffer);
	if (buffer->freeze_start) {
		goto done;
	}
	//根据eol_style行尾类型找到行尾。返回值的位置偏移量就指向那个行尾符号
	//行尾符号前面的evbuffer数据就是一行的内容。extra_drain指明这个行尾
	//有多少个字符。后面需要把这个行尾符号删除,方便以后再次读取一行
	it = evbuffer_search_eol(buffer, NULL, &extra_drain, eol_style);
	if (it.pos < 0)
		goto done;
	n_to_copy = it.pos;//并不包括换行符
	if ((line = mm_malloc(n_to_copy+1)) == NULL) {
		event_warn("%s: out of memory", __func__);
		goto done;
	}
	//复制并删除n_to_copy字节
	evbuffer_remove(buffer, line, n_to_copy);
	line[n_to_copy] = '\0';
	//extra_drain指明是行尾符号占有的字节数。现在要删除之
	evbuffer_drain(buffer, extra_drain);
	result = line;
done:
	EVBUFFER_UNLOCK(buffer);
	if (n_read_out)
		*n_read_out = result ? n_to_copy : 0;
	return result;
}

        在函数内部会申请空间,并且从evbuffer中提取出一行数据。下面看看函数evbuffer_search_eol是怎么实现的。该函数执行查找行尾的工作,它在内部区分4种不同的行尾类型。

struct evbuffer_ptr
evbuffer_search_eol(struct evbuffer *buffer,
    struct evbuffer_ptr *start, size_t *eol_len_out,
    enum evbuffer_eol_style eol_style)
{
	struct evbuffer_ptr it, it2;
	size_t extra_drain = 0;
	int ok = 0;
	EVBUFFER_LOCK(buffer);//加锁
	if (start) {
		memcpy(&it, start, sizeof(it));
	} else {//从头开始找
		it.pos = 0;
		it._internal.chain = buffer->first;
		it._internal.pos_in_chain = 0;
	}
	switch (eol_style) {
	case EVBUFFER_EOL_ANY:
		if (evbuffer_find_eol_char(&it) < 0)
			goto done;
		memcpy(&it2, &it, sizeof(it));
		//该case的就是寻找在最前面的任意数量的\r和\n。
		//evbuffer_strspn返回第一个不是\r或者\n的下标。
		//此时extra_drain前面的都是\r或者\n。直接删除即可
		extra_drain = evbuffer_strspn(&it2, "\r\n");
		break;
	case EVBUFFER_EOL_CRLF_STRICT: {//\r\n
		it = evbuffer_search(buffer, "\r\n", 2, &it);
		if (it.pos < 0)//没有找到
			goto done;
		extra_drain = 2;
		break;
	}
	case EVBUFFER_EOL_CRLF://\n或者\r\n
		while (1) {//这个循环的一个chain一个chain地检查的
			//从it指向的evbuffer_chain开始往链表后面查找\n和\r。
			//找到两个中的一个即可。两个都有就优先查找\n。
			//会修改it的内容,使得it指向查找到的位置。查找失败返回-1。
			//如果一个evbuffer_chain有\n或者\r就马上返回。
			if (evbuffer_find_eol_char(&it) < 0)
				goto done;
			if (evbuffer_getchr(&it) == '\n') {//获取对应位置的字符
				extra_drain = 1;
				break;
			} else if (!evbuffer_ptr_memcmp(
				    buffer, &it, "\r\n", 2)) {//如果刚才找到的是\r就再测测是不是\r\n
				extra_drain = 2;
				break;
			} else {
				//\r的后面不是\n,此时跳过\r,继续查找
				if (evbuffer_ptr_set(buffer, &it, 1,
					EVBUFFER_PTR_ADD)<0)
					goto done;
			}
		}
		break;
	case EVBUFFER_EOL_LF://\n
		if (evbuffer_strchr(&it, '\n') < 0)//没有找到
			goto done;
		extra_drain = 1;
		break;
	default:
		goto done;
	}
	ok = 1;
done:
	EVBUFFER_UNLOCK(buffer);
	if (!ok) {
		it.pos = -1;
	}
	if (eol_len_out)
		*eol_len_out = extra_drain;
	return it;
}

回调函数:

        evbuffer有一个回调函数队列成员callbacks,向evbuffer删除或者添加数据时,就会调用这些回调函数。之所以是回调函数队列,是因为一个evbuffer是可以添加多个回调函数的,而且同一个回调函数可以被添加多次。

        使用回调函数时有一点要注意:因为当evbuffer被添加或者删除数据时,就会调用这些回调函数,所以在回调函数里面不要添加或者删除数据,不然将导致递归,死循环。

        evbuffer的回调函数对bufferevent来说是非常重要的,bufferevent的一些重要功能都是基于evbuffer的回调函数完成的。

回调相关结构体:

//buffer.h文件
struct evbuffer_cb_info {
	//添加或者删除数据之前的evbuffer有多少字节的数据
	size_t orig_size;
	size_t n_added;//添加了多少数据
	size_t n_deleted;//删除了多少数据
	//因为每次删除或者添加数据都会调用回调函数,所以上面的三个成员只能记录从上一次
	//回调函数被调用后,到本次回调函数被调用这段时间的情况。
};
//两个回调函数类型
typedef void (*evbuffer_cb_func)(struct evbuffer *buffer, const struct evbuffer_cb_info *info, void *arg);
//buffer_compat.h文件。这个类型的回调函数已经不被推荐使用了
typedef void (*evbuffer_cb)(struct evbuffer *buffer, size_t old_len, size_t new_len, void *arg);
//evbuffer-internal.h文件
//内部结构体,结构体成员对用户透明
struct evbuffer_cb_entry {
	/** Structures to implement a doubly-linked queue of callbacks */
	TAILQ_ENTRY(evbuffer_cb_entry) next;
	/** The callback function to invoke when this callback is called.
	    If EVBUFFER_CB_OBSOLETE is set in flags, the cb_obsolete field is
	    valid; otherwise, cb_func is valid. */
	union {//哪个回调类型。一般都是evbuffer_cb_func
		evbuffer_cb_func cb_func;
		evbuffer_cb cb_obsolete;
	} cb;
	void *cbarg;//回调函数的参数
	ev_uint32_t flags;//该回调的标志
};
struct evbuffer {
	...
	//可以添加多个回调函数。所以需要一个队列存储
	TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks;
};

设置回调函数:

        下面看一下怎么设置回调函数。

struct evbuffer_cb_entry *
evbuffer_add_cb(struct evbuffer *buffer, evbuffer_cb_func cb, void *cbarg)
{
	struct evbuffer_cb_entry *e;
	if (! (e = mm_calloc(1, sizeof(struct evbuffer_cb_entry))))
		return NULL;
	EVBUFFER_LOCK(buffer);//加锁
	e->cb.cb_func = cb;
	e->cbarg = cbarg;
	e->flags = EVBUFFER_CB_ENABLED;//标志位, 允许回调
	TAILQ_INSERT_HEAD(&buffer->callbacks, e, next);
	EVBUFFER_UNLOCK(buffer);//解锁
	return e;
}

        参数cbarg就是回调函数被调用时的那个arg参数,这点对于熟悉Libevent的读者应该不难理解。上面这个函数是被一个evbuffer_cb_entry结构体指针插入到callbacks队列的前面,有关TAILQ_HEAD队列和相关的插入操作可以参考博文《TAILQ_QUEUE队列》。

        上面函数返回一个evbuffer_cb_entry结构体指针。用户可以利用这个返回的结构体作一些处理,因为这个结构体已经和添加的回调函数绑定了。比如可以设置这个回调函数的标志值。或者利用这个结构体指针作为标识,从队列中找到这个回调函数并删除之。如下面代码所示:

int //设置标志
evbuffer_cb_set_flags(struct evbuffer *buffer,
		      struct evbuffer_cb_entry *cb, ev_uint32_t flags)
{
	/* the user isn't allowed to mess with these. */
	flags &= ~EVBUFFER_CB_INTERNAL_FLAGS;
	EVBUFFER_LOCK(buffer);
	cb->flags |= flags;
	EVBUFFER_UNLOCK(buffer);
	return 0;
}
int //清除某个标志
evbuffer_cb_clear_flags(struct evbuffer *buffer,
		      struct evbuffer_cb_entry *cb, ev_uint32_t flags)
{
	/* the user isn't allowed to mess with these. */
	flags &= ~EVBUFFER_CB_INTERNAL_FLAGS;
	EVBUFFER_LOCK(buffer);
	cb->flags &= ~flags;
	EVBUFFER_UNLOCK(buffer);
	return 0;
}
int //从队列中删除这个回调函数
evbuffer_remove_cb_entry(struct evbuffer *buffer,
			 struct evbuffer_cb_entry *ent)
{
	EVBUFFER_LOCK(buffer);
	TAILQ_REMOVE(&buffer->callbacks, ent, next);
	EVBUFFER_UNLOCK(buffer);
	mm_free(ent);
	return 0;
}
int //根据用户设置的回调函数和回调参数这两个量 从队列中删除
evbuffer_remove_cb(struct evbuffer *buffer, evbuffer_cb_func cb, void *cbarg)
{
	struct evbuffer_cb_entry *cbent;
	int result = -1;
	EVBUFFER_LOCK(buffer);
	TAILQ_FOREACH(cbent, &buffer->callbacks, next) {
		if (cb == cbent->cb.cb_func && cbarg == cbent->cbarg) {
			result = evbuffer_remove_cb_entry(buffer, cbent);
			goto done;
		}
	}
done:
	EVBUFFER_UNLOCK(buffer);
	return result;
}
//Libevent还是提供了一个删除所有回调函数的接口
static void
evbuffer_remove_all_callbacks(struct evbuffer *buffer)
{
	struct evbuffer_cb_entry *cbent;
	while ((cbent = TAILQ_FIRST(&buffer->callbacks))) {
	    TAILQ_REMOVE(&buffer->callbacks, cbent, next);
	    mm_free(cbent);
	}
}

        前面的代码展示了两个回调函数的类型,分别是evbuffer_cb_func和evbuffer_cb。后者在回调的时候可以得知删除或者添加数据之前的数据量和之后的数据量。但这两个数值都可以通过evbuffer_cb_info获取。所以evbuffer_cb回调类型的优势没有了。此外,还有一个问题。一般是通过evbuffer_setcb函数设置evbuffer_cb类型的回调函数。而这个函数会先删除之前添加的所有回调函数。

void
evbuffer_setcb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg)
{
	EVBUFFER_LOCK(buffer);
	if (!TAILQ_EMPTY(&buffer->callbacks))
		evbuffer_remove_all_callbacks(buffer);//清空之前的回调函数
	if (cb) {
		struct evbuffer_cb_entry *ent =
		    evbuffer_add_cb(buffer, NULL, cbarg);
		ent->cb.cb_obsolete = cb;
		ent->flags |= EVBUFFER_CB_OBSOLETE;
	}
	EVBUFFER_UNLOCK(buffer);
}

        可以看到,evbuffer_setcb为标志位flags加上了EVBUFFER_CB_OBSOLETE属性。从名字可以看到这是一个已经过时的属性。其实evbuffer_setcb已经被推荐使用了。

Libevent如何调用回调函数:

        下面看一下是怎么调用回调函数的。其实现也简单,直接遍历回调队列,然后依次调用回调函数。

static void //在evbuffer_add中调用的该函数,running_deferred为0
evbuffer_run_callbacks(struct evbuffer *buffer, int running_deferred)
{
	struct evbuffer_cb_entry *cbent, *next;
	struct evbuffer_cb_info info;
	size_t new_size;
	ev_uint32_t mask, masked_val;
	int clear = 1;
	if (running_deferred) {
		mask = EVBUFFER_CB_NODEFER|EVBUFFER_CB_ENABLED;
		masked_val = EVBUFFER_CB_ENABLED;
	} else if (buffer->deferred_cbs) {
		mask = EVBUFFER_CB_NODEFER|EVBUFFER_CB_ENABLED;
		masked_val = EVBUFFER_CB_NODEFER|EVBUFFER_CB_ENABLED;
		/* Don't zero-out n_add/n_del, since the deferred callbacks
		   will want to see them. */
		clear = 0;
	} else { //一般都是这种情况
		mask = EVBUFFER_CB_ENABLED;
		masked_val = EVBUFFER_CB_ENABLED;
	}
	if (TAILQ_EMPTY(&buffer->callbacks)) {//用户没有设置回调函数
		//清零
		buffer->n_add_for_cb = buffer->n_del_for_cb = 0;
		return;
	}
	//没有添加或者删除数据
	if (buffer->n_add_for_cb == 0 && buffer->n_del_for_cb == 0)
		return;
	new_size = buffer->total_len;
	info.orig_size = new_size + buffer->n_del_for_cb - buffer->n_add_for_cb;
	info.n_added = buffer->n_add_for_cb;
	info.n_deleted = buffer->n_del_for_cb;
	if (clear) {//清零,为下次计算做准备
		buffer->n_add_for_cb = 0;
		buffer->n_del_for_cb = 0;
	}
	//遍历回调函数队列,调用回调函数
	for (cbent = TAILQ_FIRST(&buffer->callbacks);
	     cbent != TAILQ_END(&buffer->callbacks);
	     cbent = next) {
		next = TAILQ_NEXT(cbent, next);
		//该回调函数没有enable
		if ((cbent->flags & mask) != masked_val)
			continue;
		if ((cbent->flags & EVBUFFER_CB_OBSOLETE))//已经不被推荐使用了
			cbent->cb.cb_obsolete(buffer,
			    info.orig_size, new_size, cbent->cbarg);
		else
			cbent->cb.cb_func(buffer, &info, cbent->cbarg);//调用用户设置的回调函数
	}
}

        无论是删除数据还是添加数据的函数,例如evbuffer_add和evbuffer_drain函数,都是会调用evbuffer_invoke_callbacks函数的。而这个函数会调用evbuffer_run_callbacks函数。

evbuffer与网络IO:

从Socket中读取数据:

        Libevent通过evbuffer_read函数从一个socket中读取数据到evbuffer中。在读取socket数据之前,Libevent会调用ioctl函数来获取这个socket的读缓冲区中有多少字节,进而确定本次要读多少字节到evbuffer中。Libevent会根据要读取的字节数,在真正read之前会先把evbuffer扩容,免得在read的时候缓冲区不够。

        在扩容的时候,如果所在的系统是支持类似readv这样的可以把数据读取到像一个iovec结构体函数,那么Libevent就会选择在n个evbuffer_chain中找到足够的空闲空间(往往通过申请堆空间),因为这样可以使用类似Linux的iovec结构体。把链表的各个evbuffer_chain的空闲空间的地址赋值给iovec数组(如下图所示),然后调用readv函数直接读取,readv会把数据读取到相应的chain中。

        

        上图中,有一点是和Libevent的实际情况不符合的。在evbuffer_read中,最后的那个几个evbuffer_chain一般是没有数据的,只有空闲的区域。上图为了好看,就加上了data这区域。

        将链表的各个evbuffer_chain的空闲空间的地址赋值给iovec数组,这个操作是由函数_evbuffer_read_setup_vecs完成的。

//让vecs数组的指针指向evbuffer中的可用chain.标明哪个chain可用并且从chain的哪里开始,以及可用的字节数
//howmuch是要扩容的大小。vecs、n_vecs_avail分别是iovec数组和数组的大小
//chainp是值-结果参数,它最后指向第一个有可用空间的chain
int 
_evbuffer_read_setup_vecs(struct evbuffer *buf, ev_ssize_t howmuch,
    struct evbuffer_iovec *vecs, int n_vecs_avail,
    struct evbuffer_chain ***chainp, int exact)
{
	struct evbuffer_chain *chain;
	struct evbuffer_chain **firstchainp;
	size_t so_far;
	int i;
	if (howmuch < 0)
		return -1;
	so_far = 0;
	//因为找的是evbuffer链表中的空闲空间,所以从最后一个有数据的chain中开始找
	firstchainp = buf->last_with_datap;
	if (CHAIN_SPACE_LEN(*firstchainp) == 0) {//这个chain已经没有空间了
		firstchainp = &(*firstchainp)->next;//那么只能下一个chain了
	}
	//因为Libevent在调用本函数之前,一般会调用_evbuffer_expand_fast来扩大
	//evbuffer的可用空间。所以下面的循环中并没有判断chain是否为NULL,就直接
	//chain->next
	chain = *firstchainp;
	for (i = 0; i < n_vecs_avail && so_far < (size_t)howmuch; ++i) {
		size_t avail = (size_t) CHAIN_SPACE_LEN(chain);
		//如果exact为真,那么即使这个chain有更多的可用空间,也不会使用。只会
		//要自己正需要的空间
		if (avail > (howmuch - so_far) && exact)
			avail = howmuch - so_far;
		vecs[i].iov_base = CHAIN_SPACE_PTR(chain);//这个chain的可用空间的开始位置
		vecs[i].iov_len = avail;//可用长度
		so_far += avail;
		chain = chain->next;
	}
	*chainp = firstchainp; //指向第一个有可用空间的chain
	return i;//返回需要多少个chain才能有howmuch这么多的空闲空间
}

        在Windows系统,虽然没有readv函数,但它有WSARecv函数,可以把数据读取到一个类似iovec的结构体中,所以在Windows系统中,Libevent还是选择在n个evbuffer_chain中找到足够的空闲空间。所以在Libevent中有下面的宏定义:

//buffer.c文件
	//sys/uio.h文件定义了readv函数
#if defined(_EVENT_HAVE_SYS_UIO_H) || defined(WIN32)
#define USE_IOVEC_IMPL //该宏标志所在的系统支持类似readv的函数
#endif
//Windows系统定义了下面结构体
	typedef struct __WSABUF {
  		u_long   len;
  		char FAR *buf;
} WSABUF, *LPWSABUF;

        如果所在的系统不支持类似readv这样的函数,那么Libevent就只能在一个evbuffer_chain申请一个足够大的空间,然后直接调用read函数了。

        前面说到的扩容,分别是由函数_evbuffer_expand_fast和函数evbuffer_expand_singlechain完成的。在《evbuffer结构与基本操作》一文中已经有对这两个函数的介绍,这里就不多说了。

        由于存在是否支持类似readv函数 这两种情况,所以evbuffer_read在实现上也出现了两种实现。

        上面说了这么多,还是来看一下evbuffer_read的具体实现吧。
//buffer.c文件
//返回读取到的字节数。错误返回-1,断开了连接返回0
int //howmuch指出此时evbuffer可以使用的空间大小
evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
	struct evbuffer_chain **chainp;
	int n;
	int result;
#ifdef USE_IOVEC_IMPL //所在的系统支持iovec或者是Windows操作系统
	int nvecs, i, remaining;
#else
	struct evbuffer_chain *chain;
	unsigned char *p;
#endif
	EVBUFFER_LOCK(buf);//加锁
	//冻结缓冲区尾部,禁止在尾部追加数据
	if (buf->freeze_end) {
		result = -1;
		goto done;
	}
	//获取这个socket接收缓冲区里面有多少字节可读.通过ioctl实现
	n = get_n_bytes_readable_on_socket(fd);
	if (n <= 0 || n > EVBUFFER_MAX_READ)//每次只读EVBUFFER_MAX_READ(4096)个字符
		n = EVBUFFER_MAX_READ;
	if (howmuch < 0 || howmuch > n)
		howmuch = n;
#ifdef USE_IOVEC_IMPL //所在的系统支持iovec或者是Windows操作系统
	 //NUM_READ_IOVEC等于4
	//扩大evbuffer,使得其有howmuch字节的空闲空间
	//在NUM_READ_IOVEC个evbuffer_chain中扩容足够的空闲空间
	if (_evbuffer_expand_fast(buf, howmuch, NUM_READ_IOVEC) == -1) {
		result = -1;
		goto done;
	} else {
		//在posix中IOV_TYPE为iovec,在Windows中为WSABUF
		IOV_TYPE vecs[NUM_READ_IOVEC];
#ifdef _EVBUFFER_IOVEC_IS_NATIVE //所在的系统支持iovec结构体
		nvecs = _evbuffer_read_setup_vecs(buf, howmuch, vecs,
		    NUM_READ_IOVEC, &chainp, 1);
#else //Windows系统。因为没有native的 iovec
		//在Windows系统中,evbuffer_iovec定义得和posix中的iovec一样.
		//因为_evbuffer_read_setup_vecs函数只接受像iovec那样结构体的参数
		struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];
		nvecs = _evbuffer_read_setup_vecs(buf, howmuch, ev_vecs, 2,
		    &chainp, 1);
		//因为在Windows中,需要使用WSABUF结构体读取数据,所以要从evbuffer_iovec
		//中将一些值提出出来,放到vecs中
		for (i=0; i < nvecs; ++i)
			WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]);
#endif
//完成把数据从socket fd中读取出来
#ifdef WIN32
		{
			DWORD bytesRead;
			DWORD flags=0;
			//虽然Windows支持类似readv的函数,但Windows没有readv函数,只有下面的函数
			if (WSARecv(fd, vecs, nvecs, &bytesRead, &flags, NULL, NULL)) {
				if (WSAGetLastError() == WSAECONNABORTED)
					n = 0;
				else
					n = -1;
			} else
				n = bytesRead;
		}
#else
		n = readv(fd, vecs, nvecs);//POSIX
#endif
	}
//如果所在的系统不支持 iovec并且不是Windows系统。也就是说不支持类似
//readv这样的函数。那么只能把所有的数据都读到一个chain中
#else /*!USE_IOVEC_IMPL*/ 
	//把一个chain扩大得可以有howmuch字节的空闲空间
	if ((chain = evbuffer_expand_singlechain(buf, howmuch)) == NULL) {
		result = -1;
		goto done;
	}
	p = chain->buffer + chain->misalign + chain->off;
//读取数据
#ifndef WIN32
	n = read(fd, p, howmuch);
#else
	n = recv(fd, p, howmuch, 0);
#endif
#endif /* USE_IOVEC_IMPL *///终止前面的if宏
	if (n == -1) {//错误
		result = -1;
		goto done;
	}
	if (n == 0) {//断开了连接
		result = 0;
		goto done;
	}
#ifdef USE_IOVEC_IMPL//使用了iovec结构体读取数据。需要做一些额外的处理
	//chainp是由_evbuffer_read_setup_vecs函数调用得到。它指向未从fd读取数据时
	//第一个有空闲空间位置的chain
	remaining = n;//n等于读取到的字节数
	//使用iovec读取数据时,只是把数据往chain中填充,并没有修改evbuffer_chain
	//的成员,比如off偏移量成员。此时就需要把这个off修改到正确值
	for (i=0; i < nvecs; ++i) {
		//CHAIN_SPACE_LEN(*chainp)返回的是填充数据前的空闲空间。
		//除了最后那个chain外,其他的chain都会被填满的。所以对于非last
		//chain,直接把off加上这个space即可。
		ev_ssize_t space = (ev_ssize_t) CHAIN_SPACE_LEN(*chainp);
		if (space < remaining) {//前面的chain
			(*chainp)->off += space;
			remaining -= (int)space;
		} else {//最后那个chain
			(*chainp)->off += remaining;
			buf->last_with_datap = chainp;//指向最后一个有数据的chain
			break;
		}
		chainp = &(*chainp)->next;
	}
#else
	chain->off += n;
	//调整last_with_datap,使得*last_with_datap指向最后一个有数据的chain
	advance_last_with_data(buf);
#endif
	buf->total_len += n;
	buf->n_add_for_cb += n;//添加了n字节
	/* Tell someone about changes in this buffer */
	evbuffer_invoke_callbacks(buf);//evbuffer添加了数据,就需要调用回调函数
	result = n;
done:
	EVBUFFER_UNLOCK(buf);
	return result;
}

往socket写入数据:

        因为evbuffer是用链表的形式存放数据,所以要把这些链表上的数据写入socket,那么使用writev这个函数是十分有效的。同前面一样,使用iovec结构体数组,就需要设置数组元素的指针。这个工作由evbuffer_write_iovec函数完成。

        正如前面的从socket读出数据,可能所在的系统并不支持writev这样的函数。此时就只能使用一般的write函数了,但这个函数要求数据放在一个连续的空间。所以Libevent有一个函数evbuffer_pullup,用来把链表内存拉直,即把一定数量的数据从链表中copy到一个连续的内存空间。这个连续的空间也是由某个evbuffer_chain的buffer指针指向,并且这个evbuffer_chain会被插入到链表中。这个时候就可以直接使用write或者send函数发送这特定数量的数据了。

        不同于读,写操作还有第三种可能。那就是sendfile。如果所在的系统支持sendfile,并且用户是通过evbuffer_add_file添加数据的,那么此时Libevent就是所在系统的sendfile函数发送数据。

        Libevent内部一般通过evbuffer_write函数把数据写入到socket fd中。下面是具体的实现。

//buffer.c文件
int
evbuffer_write(struct evbuffer *buffer, evutil_socket_t fd)
{
	//把evbuffer的所有数据都写入到fd中
	return evbuffer_write_atmost(buffer, fd, -1);
}
int//howmuch是要写的字节数。如果小于0,那么就把buffer里的所有数据都写入fd
evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd,
    ev_ssize_t howmuch)
{
	int n = -1;
	EVBUFFER_LOCK(buffer);
	//冻结了链表头,无法往fd写数据。因为写之后,还要把数据从evbuffer中删除
	if (buffer->freeze_start) {
		goto done;
	}
	if (howmuch < 0 || (size_t)howmuch > buffer->total_len)
		howmuch = buffer->total_len;
	if (howmuch > 0) {
#ifdef USE_SENDFILE //所在的系统支持sendfile
		struct evbuffer_chain *chain = buffer->first;
		//需通过evbuffer_add_file添加数据,才会使用sendfile
		if (chain != NULL && (chain->flags & EVBUFFER_SENDFILE)) //并且要求使用sendfile
			n = evbuffer_write_sendfile(buffer, fd, howmuch);
		else {
#endif
#ifdef USE_IOVEC_IMPL //所在的系统支持writev这类函数
		//函数内部会设置数组元素的成员指针,以及长度成员
		n = evbuffer_write_iovec(buffer, fd, howmuch);
#elif defined(WIN32)
		/* XXX(nickm) Don't disable this code until we know if
		 * the WSARecv code above works. */
		//把evbuffer前面的howmuch字节拉直。使得这howmuch字节都放在一个chain里面
		//也就是放在一个连续的空间,不再是之前的多个链表节点。这样就能直接用
		//send函数发送了。
		void *p = evbuffer_pullup(buffer, howmuch);
		n = send(fd, p, howmuch, 0);
#else
		void *p = evbuffer_pullup(buffer, howmuch);
		n = write(fd, p, howmuch);
#endif
#ifdef USE_SENDFILE
		}
#endif
	}
	if (n > 0)
		evbuffer_drain(buffer, n);//从链表中删除已经写入到socket的n个字节
done:
	EVBUFFER_UNLOCK(buffer);
	return (n);
}

参考:

        http://www.wangafu.net/~nickm/libevent-book/Ref7_evbuffer.html

你可能感兴趣的内容
0条评论

dexcoder

这家伙太懒了 <( ̄ ﹌  ̄)>
Owner