首页 「网络编程101」事件驱动,有模有样!
文章
取消

「网络编程101」事件驱动,有模有样!

开始基于事件编程

有了pollepoll提供的事件通知机制,我们可以开始基于事件驱动编程了!基于事件驱动,我们可以构建出一个简单的网络编程框架,常见的设计模式包括Reactor模式。

Reactor模式

意会一下Reactor模式的名字,我们可以感受到这个设计模式的核心是“对事件作出反应”。例如,当有读事件(文件描述符可读)产生时,我们开始读取;有写事件产生时,开始写入。进程在没有事件发生时处于挂起状态,只有事件产生后才根据业务逻辑作出相应的反应,这就是事件驱动的Reactor模式,可以由下图表示:

-16355739363901 单线程的Reactor模式1

图中的acceptor专门用于查询连接事件的产生,在有连接事件时创建新的连接,并通知dispatcher监听该连接上的读写事件。dispatcher负责检查并分发事件,实现时可以用pollepoll查询事件产生。

小试牛刀

让我们来改写之前实现的echo_epoll_server,实现一个基于epoll的简单reactor。

回调函数

既然要对事件作出“反应”,我们就需要编写相应的函数,以便在被通知事件发生时及时处理,这样的函数成为回调函数(callback function)。对于我们的echo server,我们只需要处理两种事件即可:连接建立事件,以及读事件。在acceptor检测到有来自客户端的连接时,调用accept_callback处理新连接。在检测到客户端向我们发送数据时,调用read_callback,将数据读出并返回给客户端。

具体而言,accept_callback要与epoll_ctl交互,让epoll帮我们检查新来的连接conn_fd上是否有读写事件。read_callback中可编写业务逻辑,当检测到连接关闭时,通知epoll实例之后不要在检测这个连接上的事件了。

Channel

有了回调函数,我们可以通过epoll_wait查询发生的事件,并调用相关回调函数作出“反应”。用户可以在epoll_eventdata字段中存储fd,获取事件对应的文件描述符。然而,文件描述符多种多样,可能是listen_fd,也可能是conn_fd,如何通过文件描述符,找到与之关联的数据处理函数,并在事件到来时进行处理?

只知道文件描述符的数值是不够的,我们还需要额外信息维护文件描述符的类型及其回调函数。因此,我们引入Channel类,记录这些信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
struct Channel {
    int fd;
    function<void(int, int)> read_callback;

    void on_event(uint32_t events, int efd) {
        if ((events & EPOLLIN) && (read_callback != nullptr)) {
            read_callback(fd, efd);
        }
    }
};

void read_callback(int conn_fd, int efd) {
    TcpConnection conn{conn_fd};
    string msg;
    if (!(msg = conn.blocking_receive_line()).empty()) {
        cout << "server received: " << msg;
        conn.blocking_send(msg);
    } else {
        struct epoll_event event{};
        epoll_ctl(efd, EPOLL_CTL_DEL, conn_fd, &event);
        conn.close();
        cout << "server closed conn_fd: " << conn_fd << endl;
    }
}

void accept_callback(int listen_fd, int efd) {
    TcpListener listener{listen_fd};
    TcpConnection conn = listener.accept();
    conn.set_nonblocking();
    int conn_fd = conn.conn_fd();

    struct epoll_event event{};

    event.events = EPOLLIN | EPOLLET;
    struct Channel *channel = new Channel{conn_fd, read_callback};
    event.data.ptr = channel;
    if (epoll_ctl(efd, EPOLL_CTL_ADD, conn_fd, &event) == -1) {
        cerr << "epoll_ctl add failed: " << strerror(errno) << endl;
        exit(1);
    }
}

由于data可以存储指针,我们直接在里面存放Channel结构体的指针,这样就可以在事件返回时重新找到事件对应的Channel,然后用on_event处理事件。当然,直接操作指针是危险的,上面的程序已经有内存泄漏了,体现在read_callback中关闭连接时,并没有同时释放连接对应的Channel。在这里我们只是用偷懒的方式演示一个简单的reactor模式的实现。

main函数中的循环如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
const int MAX_EVENTS = 128;

int main() {
    int efd;

    struct epoll_event event{};
    int n;
    vector<struct epoll_event> events(MAX_EVENTS);
    TcpListener listener;
    listener.listen(8888, true);

    efd = epoll_create1(EPOLL_CLOEXEC);
    if (efd == -1) {
        cerr << "epoll create failed: " << strerror(errno) << endl;
        exit(1);
    }

    event.events = EPOLLIN | EPOLLET;
    struct Channel *acceptor = new Channel{listener.listen_fd(), accept_callback};
    event.data.ptr = acceptor;

    if (epoll_ctl(efd, EPOLL_CTL_ADD, listener.listen_fd(), &event) == -1) {
        cerr << "epoll_ctl add failed: " << strerror(errno) << endl;
        exit(1);
    }

    while (true) {
        n = epoll_wait(efd, events.data(), MAX_EVENTS, -1);
        for (int i = 0; i < n; i++) {
            if (events[i].events & (EPOLLERR | EPOLLHUP) ||
                !(events[i].events & EPOLLIN)) {
                cerr << "epoll error" << endl;
                struct Channel *channel = (Channel *) events[i].data.ptr;
                epoll_ctl(efd, EPOLL_CTL_DEL, channel->fd, &events[i]);
                close(channel->fd);
                delete channel;
            } else {
                if (events[i].events & EPOLLIN) {
                    cout << "epoll in\n";
                }
                struct Channel *channel = (Channel *) events[i].data.ptr;
                channel->on_event(events[i].events, efd);
            }
        }
    }
}

simple_epoll_reactorecho_epoll_server的主要不同在于epoll_eventdata字段存放的是Channel指针,用Channel将文件描述符与回调函数关联,用on_event判断事件类型并选择相应的回调函数执行。在这个例子中,只有读事件的回调函数。之后更复杂的reactor实现中我们还会加入写事件的回调函数。

作为拓展,有兴趣的读者可以思考一下如何实现一个基于poll的简单reactor。

Dispatcher

如果你已经思考了如何用poll实现一个reactor,你会发现它的代码和我们用epoll写的echo server很不一样。为了屏蔽pollepoll的差别,我们可以封装一个Dispatcher类,作为不同事件通知机制的统一交互接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct DispatcherEvent {
    int fd{-1};
    int revents{0};
};

class Dispatcher {
public:
    virtual bool add(const Channel *channel) = 0;
    virtual bool remove(const Channel *channel) = 0;
    virtual bool update(const Channel *channel) = 0;
    virtual std::vector<struct DispatcherEvent> dispatch() = 0;

    const int DISPATCHER_MAX_EVENTS{128};
};

class EpollDispatcher : public Dispatcher { ... };
class PollDispatcher : public Dispatcher { ... };

Dispatcher的操作即对事件的“增删改查”,其输入为Channel类,从中获取要监听的文件描述符fd以及事件(读/写);输出为DispatcherEvent类。为了屏蔽pollepollrevents中事件类型定义的不同,我们重新封装了下面的事件类型,作为DispatcherEventrevents返回:

1
2
3
#define EVENT_ERR   0x01
#define EVENT_READ  0x02
#define EVENT_WRITE 0x04

这样一来,用户就可以根据需要,在实现时自行选择使用哪种dispatcher。

控制反转

在上文简单版Reactor实现中,我们只用了一个Channel结构体同时负责listen_fdconn_fd两种套接字,并用on_event调用它们各自的回调函数。在框架实现时,Channel所需要的状态可能更加复杂。例如,我们希望管理listen_fdChannel同时保存相应的TcpListener,而管理conn_fdChannel能保存相应的TcpConnection,并新增与连接事件有关的回调函数。

对于框架而言,它所知道的只是事件发生时,要作出“反应”,除此之外它都不知道。因此,框架只会傻傻地调用on_event。我们需要在实现不同的Channel时进一步细化on_event的逻辑,这其实就是依赖反转的体现。

为了实现清晰,我们在Channel的基础上新增AcceptorChannelConnectionChannel,管理各自更为复杂的状态和功能,并使用std::bind将自己内部的函数与Channel的回调函数绑定。

至此,我们可以明确这些类的不同职责。Channel是我们的网络编程框架使用的基本类,在框架的实现中,我们规定了何时调用on_event,并且规定了on_event函数的固定流程,即判断传入的事件是否可读可写,并根据Channel实例是否有读写相关的回调函数,对这些函数进行调用。而用户可以在Channel基础上扩展,细化其要管理的状态以及回调函数的行为,以顾及基本框架照顾不到的细节。最终,系统在调用on_event时,表现出用户扩展的Channel的子类的行为,实现控制反转。

AcceptorChannel

我们在AcceptorChannel内部管理一个TcpListener,利用之前封装好的接口与网络套接字交互。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class AcceptorChannel : public Channel {
public:
    AcceptorChannel() = delete;
    explicit AcceptorChannel(int port) {
        cout << "listening port: " << port << endl;
        listener.listen(port, true);
        _fd = listener.listen_fd();
        _read_callback = std::bind(&AcceptorChannel::handle_connection_established, this);
    }

private:
    vector<ChannelOp> handle_connection_established() {
        TcpConnection conn = listener.accept();
        conn.set_nonblocking();
        vector<ChannelOp> ret;
        ret.push_back({CHANNEL_ADD, CHANNEL_CONN, conn.conn_fd()});
        cout << "AcceptorChannel handle_connection_established: new conn fd=" << conn.conn_fd() << endl;
        return ret;
    }
    TcpListener listener;
};

ConnectionChannel

ConnectionChannel中,我们加入与连接有关的回调函数connection_closed_callback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class ConnectionChannel : public Channel {
public:
    ConnectionChannel() = delete;
    explicit ConnectionChannel(int fd) {
        cout << "new connection with fd: " << fd << endl;
        conn.set_fd(fd);
        _fd = fd;
        _read_callback = bind(&ConnectionChannel::handle_connection_read, this);
        _write_callback = bind(&ConnectionChannel::handle_connection_write, this);
    }

    std::function<void(TcpConnection&)> message_callback{nullptr};
    std::function<void(TcpConnection&)> write_completed_callback{nullptr};
    std::function<void(TcpConnection&)> connection_closed_callback{nullptr};

private:
    vector<ChannelOp> handle_connection_read() {
        cout << "ConnectionChannel: handle_connection_read\n";
        if (conn.buffer_receive() > 0) {
            if (message_callback != nullptr) {
                message_callback(conn);
            }
        } else {
            conn.buffer_out.end_input();
            return handle_connection_closed();
        }
        return {};
    }

    vector<ChannelOp> handle_connection_write() {
        cout << "ConnectionChannel: handle_connection_write\n";
        conn.buffer_send();
        if (conn.buffer_in.eof()) {
            if (write_completed_callback != nullptr) {
                write_completed_callback(conn);
            }
        }
        return {};
    }

    vector<ChannelOp> handle_connection_closed() {
        cout << "ConnectionChannel: connection closed\n";
        vector<ChannelOp> ret;
        if (connection_closed_callback != nullptr) {
            connection_closed_callback(conn);
        }
        ret.push_back({CHANNEL_REMOVE, CHANNEL_CONN, _fd});
        return ret;
    }
    TcpConnection conn{-1};
};

在上文simple_epoll_reactor的实现中,我们直接在Channel的回调函数里与Dispatcher交互,实现事件的增删。为了将Channel与dispatcher解耦,我们让Channelon_event返回ChannelOp,用ChannelOp表达这次处理完成后是否要向dispatcher中新增要监听的事件或是别的操作。在main函数的循环中,对ChannelOp进行处理。

示例:使用Dispatcher

在有了多层封装后,main函数的循环进一步简化。我们新增了channel_map,用于将DispatcherEvent返回的文件描述符编号与具体的Channel对应起来。此外,针对on_event返回的ChannelOp,也要在循环中处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
void on_message(TcpConnection& conn) {
    if (!conn.buffer_out.eof()) {
        string msg = conn.buffer_read_line();
        cout << "server received: " << msg;
        conn.buffer_write(msg);
    }
}

int main() {
    unordered_map<int, unique_ptr<Channel>> channel_map;
    unique_ptr<Dispatcher> dispatcher{new EpollDispatcher()};

    unique_ptr<Channel> acceptor{new AcceptorChannel(8888)};

    dispatcher->add(acceptor.get());
    channel_map[acceptor->fd()] = move(acceptor);

    while (true) {
        vector<DispatcherEvent> events{dispatcher->dispatch()};
        for (auto &event : events) {
            if (!channel_map.count(event.fd)) {
                continue;
            }
            Channel *channel = channel_map[event.fd].get();
            vector<ChannelOp> channel_ops{channel->on_event(event.revents)};
            for (auto &op : channel_ops) {
                if (op.op == CHANNEL_ADD) {
                    if (op.channel_type == CHANNEL_CONN) {
                        ConnectionChannel *channel = new ConnectionChannel(op.fd);
                        channel->message_callback = on_message;
                        dispatcher->add(channel);
                        channel_map[op.fd] = unique_ptr<Channel>(channel);
                        cout << "new connection established: fd=" << op.fd << endl;
                    }
                } else if (op.op == CHANNEL_REMOVE) {
                    Channel *channel = channel_map[op.fd].get();
                    dispatcher->remove(channel);
                    channel_map.erase(op.fd);
                    close(op.fd);
                }
            }
        }
    }

    return 0;
}

总结

本文实现了一个简单的Reactor模式编程框架,基于事件驱动编程,可以获得较高的效率,和较好的可扩展性。我们引入了Channel层抽象,用于管理不同的文件描述符和它们的事件处理逻辑之间的关系。随后,引入了Dispatcher层抽象,用于隔离不同的事件通知机制之间的区别。为了将文件描述符与对应的Channel关联,我们引入了channel_map。最后,我们介绍了控制反转是如何影响框架的具体行为的。通过引入新的抽象层,可以使代码在实现时更清晰简洁,更具有扩展性。

完整代码仓库

「网络编程101」系列全部代码可在我的GitHub代码仓库中查看:Captor: An easy-to-understand Reactor in C++

欢迎提出各类宝贵的修改意见和issues,指出其中的错误和不足!

最后,感谢你读到这里,希望我们都有所收获!

References

本文由作者按照 CC BY 4.0 进行授权

「网络编程101」提升效率,减少等待!

「网络编程101」拼好最后一块积木,多线程Reactor框架!