向eventrpc 客户端加入异步/同步操作

之前的实现中,使用一个单独的Dispatcher类,用于分发网络事件,但是这个类也是在主线程中的,这样带来的问题,诚如上一篇文章说的那样, 不能支持异步事件的通知.对客户端而言,当发出一个请求时,必须阻塞等待回复的返回.

于是,今天抽出时间,将Dispatcher类实现为与线程绑定,它运行在副线程中.
由于Dispatcher类中,实际上封装的是epoll相关的操作,那么紧跟着的问题就是,如何在类似epoll/select之类的操作中加入对多线程的支持?比如当前线程正在进行poll 事件操作,如何向其中加入新的事件.

其实这样的问题之前遇到过,比如在这篇文章中提到的那样.这种做法也是比较通用的做法.
然而,我想了一想,这样做也有问题:
1) 实际上并没有将这个添加新事件的操作变成lock-free操作.也就是说,无论如何,添加新事件的时候加锁总是少不了的.
2) 每次添加新的事件之后,都需要向notify fd进行写操作,而实际上,这样的操作是一个系统调用,代价还是不小的.
所以,既然加锁不可避免,为何还要多此一举的对notify fd写一个数据,用于通知有新的事件添加进来,而不是简单的使用类似标志位的做法呢?
另外,我还做了另一个改进.使用两个队列,然后使用两个指针,一个是保存已经添加到epoll的事件,另一个保存还未添加epoll的事件,分别指向两个队列.
这两个指针的名字分别是:current_operate_events_, waiting_operate_events_
当往Dispatcher中添加新的事件时,一律往waiting_operate_events_中写入,而当走到poll主循环的之后,需要将waiting_operate_events_中的事件放入到current_operate_events_中,这个操作看上去是一个复制操作,而实际上可以通过切换指针指向解决:

int Dispatcher::OperateEvents() {
  {
    SpinMutexLock lock(&spin_mutex_);
    EventVector *tmp_event_vector = current_operate_events_;
    current_operate_events_ = waiting_operate_events_;
    waiting_operate_events_ = tmp_event_vector;
  }

  EventEntry *event_entry;
  Event *event;
  for (EventVector::iterator iter = current_operate_events_->begin();
       iter != current_operate_events_->end(); ++iter) {
    event_entry = *iter;
    event = event_entry->event;
    switch (event_entry->event_operation_type) {
      case EVENT_OPERATION_ADD:
        if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD,
                  event->fd_, &(event_entry->epoll_ev)) != 0) {
          LOG_ERROR() << "epoll_ctl for fd " << event->fd_ << " error";
        }
        break;
      case EVENT_OPERATION_DELETE:
        retired_events_.push_back(event_entry);
        break;
      case EVENT_OPERATION_MODIFY:
        if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD,
                      event->fd_, &(event_entry->epoll_ev)) != 0) {
          LOG_ERROR() << "epoll_ctl for fd " << event->fd_ << " error";
        }
        break;
      default:
        break;
    }
  }
  current_operate_events_->clear();
  return 0;
}

这样就迅速的完成了操作新事件的操作,而不需要复制队列操作,仅仅是切换指针的代价,效率提高不少.

实际上这个算法并不新鲜,Linux内核中管理进程时就用到了类似的算法.

解决了Dispatcher使用线程来实现的问题之后,回到最开始的问题:如何使客户端的请求响应异步化?
比如Echo这个service, 产生的代码是:

class EchoService_Stub : public EchoService {
//...
  void Echo(::google::protobuf::RpcController* controller,
                       const ::echo::EchoRequest* request,
                       ::echo::EchoResponse* response,
                       ::google::protobuf::Closure* done);
};

其中的第一个参数:RpcController, 按照官方文档的说法,是可以用来做异步化操作的.但是看了一下提供的虚函数接口,感觉不是这么回事儿.于是在我的实现中,这个参数一律被无视值为NULL.
这一次引入了一个称为Monitor的类:

class Monitor {
 public:
  Monitor();

  ~Monitor();

  void TimeWait(int64 timeout_ms) const;

  void Wait() const;

  void Notify() const;

  void NotifyAll() const;

 private:
  mutable pthread_mutex_t pthread_mutex_;
  mutable pthread_cond_t pthread_cond_;
};

因此,如果要将操作同步化,可以在回调函数中传入Monitor类对象,在回调函数中调用Notify通知事件已经处理完成,而在外部调用Wait操作等待被Notify,比如:

void echo_done(echo::EchoResponse* resp,
               Monitor *monitor) {
  printf("response: %s\n", resp->response().c_str());
  monitor->Notify();
}

int main() {
  Dispatcher dispatcher;
  RpcChannel channel("127.0.0.1", 21118, &dispatcher);
  dispatcher.Start();
  if (!channel.Connect()) {
    printf("connect to server failed, abort\n");
    exit(-1);
  }
  echo::EchoService::Stub stub(&channel);
  echo::EchoRequest request;
  echo::EchoResponse response;
  request.set_message("hello");
  Monitor monitor;
  stub.Echo(NULL, &request, &response,
            gpb::NewCallback(::echo_done, &response, &monitor));
  monitor.Wait();
  channel.Close();
  dispatcher.Stop();

  return 0;
}

而如果需要异步化,就可以不使用Monitor了,但是这样,需要程序员自己去保证数据的同步,就不是这里讨论的范围了.

写下这篇文章时,对应代码的svn revision是143

4 Comments

  1. 站长工具 说:

    博主,兔年快乐!

    [回复]

  2. 匿名 说:

    你的代码风格实在不敢恭维,那大括号写在末尾,实在。。。

    [回复]

  3. 匿名 说:

    @匿名
    大括号写在末尾??
    这个有问题吗?

    [回复]

  4. 匿名 说:

    代码怎么编译呢?

    [回复]

Leave a Reply