TypechoJoeTheme

鱼一的博客 ◡̈

yuyi

知不可乎骤得,托遗响于悲风
网站页面
标签搜索
c++

TCP 学习笔记

TCP in handy lib

server

#include <handy/handy.h>

using namespace std;
using namespace handy;

int main(int argc, const char *argv[]) {
    Logger::getLogger().setLogLevel(Logger::LTRACE);
    EventBase base;
    Signal::signal(SIGINT, [&] { base.exit(); });
    // 启动服务端, 绑定地址 host 和对应的端口 port。 保持监听 20
    TcpServerPtr echo = TcpServer::startServer(&base, "", 2099);
    exitif(echo == NULL, "start tcp server failed");
    // 服务端正常运行后,实现 tcp 链接回调函数
    echo->onConnCreate([] {
        TcpConnPtr con(new TcpConn);
        con->onMsg(new LengthCodec, [](const TcpConnPtr &con, Slice msg) {
            info("recv msg: %.*s", (int) msg.size(), msg.data());
            con->sendMsg(msg);
        });
        return con;
    });    // createcb = cb
    base.loop();
    info("program exited");
}

以上为使用 handy 库的简易服务端 tcp 的代码。 其中主要实现了两个关键功能:

  • 正常运行的等待 tcp 请求的服务端程序
  • 在 tcp 请求连接到服务器后的回调函数

正常运行的等待 tcp 请求的服务端程序

int TcpServer::bind(const std::string &host, unsigned short port, bool reusePort) {
    addr_ = Ip4Addr(host, port);
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    int r = net::setReuseAddr(fd);
    fatalif(r, "set socket reuse option failed");
    r = net::setReusePort(fd, reusePort);
    fatalif(r, "set socket reuse port option failed");
    r = util::addFdFlag(fd, FD_CLOEXEC);
    fatalif(r, "addFdFlag FD_CLOEXEC failed");
    // success: 0 ; failed: -1
    r = ::bind(fd, (struct sockaddr *) &addr_.getAddr(), sizeof(struct sockaddr));
    // check 套接字是否成功绑定到指定的本地地址上    非零时会触发 if 语句
    if (r) {
        close(fd);
        error("bind to %s failed %d %s", addr_.toString().c_str(), errno, strerror(errno));
        return errno;
    }
    r = listen(fd, 20);
    fatalif(r, "listen failed %d %s", errno, strerror(errno));
    info("fd %d listening at %s", fd, addr_.toString().c_str());
    listen_channel_ = new Channel(base_, fd, kReadEvent);
    // readcb_ = 
    listen_channel_->onRead([this] { handleAccept(); });
    return 0;
}

TcpServerPtr TcpServer::startServer(EventBases *bases, const std::string &host, unsigned short port, bool reusePort) {
    TcpServerPtr p(new TcpServer(bases));
    int r = p->bind(host, port, reusePort);
    if (r) {
        error("bind to %s:%d failed %d %s", host.c_str(), port, errno, strerror(errno));
    }
    return r == 0 ? p : NULL;
}

这段代码实现与原始 cpp 实现的主要不同在于,其在成功开始监听 20 端口后,创建了一个 listen_channel 事件用于方便处理回调函数。
onRead 回调函数中调用了 handleAccept() 用于处理来自客户端的 Tcp 连接请求。

void TcpServer::handleAccept() {
    struct sockaddr_in raddr; // 定义远程地址结构体
    socklen_t rsz = sizeof(raddr); // 定义远程地址结构体的长度
    int lfd = listen_channel_->fd(); // 获取监听 socket 的文件描述符
    int cfd;
    // 循环接受客户端连接
    while (lfd >= 0 && (cfd = accept(lfd, (struct sockaddr *) &raddr, &rsz)) >= 0) { 
        sockaddr_in peer, local;
        socklen_t alen = sizeof(peer);
        int r = getpeername(cfd, (sockaddr *) &peer, &alen); // 获取客户端地址
        if (r < 0) {
            error("get peer name failed %d %s", errno, strerror(errno));
            continue;
        }
        r = getsockname(cfd, (sockaddr *) &local, &alen); // 获取本地地址
        if (r < 0) {
            error("getsockname failed %d %s", errno, strerror(errno));
            continue;
        }
        r = util::addFdFlag(cfd, FD_CLOEXEC); // 设置文件描述符标志位
        fatalif(r, "addFdFlag FD_CLOEXEC failed");
        EventBase *b = bases_->allocBase(); // 从 EventBase 对象池中获取一个 EventBase 对象
        auto addcon = [=] { // 定义一个 lambda 表达式,用于添加连接
            TcpConnPtr con = createcb_(); // 创建一个 TcpConn 对象
            con->attach(b, cfd, local, peer); // 将 TcpConn 对象与 EventBase 对象和 socket 文件描述符关联
            if (statecb_) {
                con->onState(statecb_); // 设置连接状态回调函数
            }
            if (readcb_) {
                con->onRead(readcb_); // 设置读事件回调函数
            }
            if (msgcb_) {
                con->onMsg(codec_->clone(), msgcb_); // 设置消息回调函数
            }
        };
        if (b == base_) { // 如果当前 EventBase 对象是主 EventBase 对象,则直接添加连接
            addcon();
        } else { // 否则,将添加连接的操作放到当前 EventBase 对象的任务队列中
            b->safeCall(move(addcon));
        }
    }
    if (lfd >= 0 && errno != EAGAIN && errno != EINTR) { // 如果 accept 函数出错,则输出警告信息
        warn("accept return %d  %d %s", cfd, errno, strerror(errno));
    }
}

其中 getpeernamegetsockname 为系统调用的函数,用于获取远程地址信息和本地地址信息,成功时返回 0.

在 tcp 请求连接到服务器后的回调函数

    // input: 要求为一个 TcpConnPtr 类型的 function
    void onConnCreate(const std::function<TcpConnPtr()> &cb) { createcb_ = cb; }

下面这段 Lambda 函数将作为回调函数的实现存入 createcb_,并等待时机执行

[] {
        TcpConnPtr con(new TcpConn);
        con->onMsg(new LengthCodec, [](const TcpConnPtr &con, Slice msg) {
            info("recv msg: %.*s", (int) msg.size(), msg.data());
            con->sendMsg(msg);
        });
        return con;
    }

client

#include <handy/handy.h>

using namespace std;
using namespace handy;

int main(int argc, const char *argv[]) {
    setloglevel("TRACE");
    EventBase base;
    Signal::signal(SIGINT, [&] { base.exit(); });
    // 建立 socket 套接字 fd, 创建 channel 对象,便于事件循环和通讯
    TcpConnPtr con = TcpConn::createConnection(&base, "127.0.0.1", 2099, 3000);
    // 设置重连的毫秒数 reconnectInterval_  = 3000
    con->setReconnectInterval(3000);
    // 打印输入的传递信息
    con->onMsg(new LengthCodec, [](const TcpConnPtr &con, Slice msg) { info("recv msg: %.*s", (int) msg.size(), msg.data()); });
    // 将状态相关的回调函数存入  statecb_, 当检测到成功连接的状态时,使用 sendMsg 传递信息
    con->onState([=](const TcpConnPtr &con) {
        info("onState called state: %d", con->getState());
        if (con->getState() == TcpConn::Connected) {
            con->sendMsg("hello");
        }
    });
    base.loop();
    info("program exited");
}

其主要实现了三个功能

  • 建立套接字并封装到 channel
  • 设置消息传递的回调函数
  • 设置连接状态检测的回调,并传递消息

建立套接字并封装到 channel

    //可传入连接类型,返回智能指针
    template <class C = TcpConn>
    static TcpConnPtr createConnection(EventBase *base, const std::string &host, unsigned short port, int timeout = 0, const std::string &localip = "") {
        TcpConnPtr con(new C);
        con->connect(base, host, port, timeout, localip);
        return con;
    }

调用该类函数将创建一个 TcpConn 对象,并调用该对象的 connect 函数绑定 host、port等。

void TcpConn::connect(EventBase *base, const string &host, unsigned short port, int timeout, const string &localip) {
    fatalif(state_ != State::Invalid && state_ != State::Closed && state_ != State::Failed, "current state is bad state to connect. state: %d", state_);
    destHost_ = host;
    destPort_ = port;
    connectTimeout_ = timeout;
    connectedTime_ = util::timeMilli();
    localIp_ = localip;
    Ip4Addr addr(host, port);
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    fatalif(fd < 0, "socket failed %d %s", errno, strerror(errno));
    net::setNonBlock(fd);
    int t = util::addFdFlag(fd, FD_CLOEXEC);
    fatalif(t, "addFdFlag FD_CLOEXEC failed %d %s", t, strerror(t));
    int r = 0;
    if (localip.size()) {
        Ip4Addr addr(localip, 0);
        r = ::bind(fd, (struct sockaddr *) &addr.getAddr(), sizeof(struct sockaddr));
        error("bind to %s failed error %d %s", addr.toString().c_str(), errno, strerror(errno));
    }
    if (r == 0) {
        r = ::connect(fd, (sockaddr *) &addr.getAddr(), sizeof(sockaddr_in));
        if (r != 0 && errno != EINPROGRESS) {
            error("connect to %s error %d %s", addr.toString().c_str(), errno, strerror(errno));
        }
    }

    sockaddr_in local;
    socklen_t alen = sizeof(local);
    if (r == 0) {
        r = getsockname(fd, (sockaddr *) &local, &alen);
        if (r < 0) {
            error("getsockname failed %d %s", errno, strerror(errno));
        }
    }
    // 创建套接字,将目标 host 和 port 传入 Ip4Addr 对象保存
    state_ = State::Handshaking;
    // 将 tcp 连接的准备工作封装进 事件 中
    attach(base, fd, Ip4Addr(local), addr);
    if (timeout) {
        TcpConnPtr con = shared_from_this();
        timeoutId_ = base->runAfter(timeout, [con] {
            if (con->getState() == Handshaking) {
                con->channel_->close();
            }
        });
    }
}

常规的 socket 套接字创建,保存参数中的 host 和 port 到变量中.

void TcpConn::attach(EventBase *base, int fd, Ip4Addr local, Ip4Addr peer) {
    fatalif((destPort_ <= 0 && state_ != State::Invalid) || (destPort_ >= 0 && state_ != State::Handshaking),
            "you should use a new TcpConn to attach. state: %d", state_);
    base_ = base;
    state_ = State::Handshaking;
    local_ = local;
    peer_ = peer;
    delete channel_;
    channel_ = new Channel(base, fd, kWriteEvent | kReadEvent);
    trace("tcp constructed %s - %s fd: %d", local_.toString().c_str(), peer_.toString().c_str(), fd);
    TcpConnPtr con = shared_from_this();
    con->channel_->onRead([=] { con->handleRead(con); });
    con->channel_->onWrite([=] { con->handleWrite(con); });
}

使用 channel 的 read、write 事件处理器进行监听. channel 的参数类型为 std::function<void()> void onRead(const Task &readcb) { readcb_ = readcb; } 其将当前 TcpConn 的handleRead 方法存入 channel 的 readcb_ 成员变量中。

void TcpConn::handleRead(const TcpConnPtr &con) {
    if (state_ == State::Handshaking && handleHandshake(con)) {
        return;
    }
    while (state_ == State::Connected) {
        input_.makeRoom();
        int rd = 0;
        // 套接字返回值正常时
        if (channel_->fd() >= 0) {
            rd = readImp(channel_->fd(), input_.end(), input_.space());
            trace("channel %lld fd %d readed %d bytes", (long long) channel_->id(), channel_->fd(), rd);
        }
        if (rd == -1 && errno == EINTR) {
            continue;
        } else if (rd == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            for (auto &idle : idleIds_) {
                handyUpdateIdle(getBase(), idle);
            }
            if (readcb_ && input_.size()) {
                readcb_(con);
            }
            break;
        } else if (channel_->fd() == -1 || rd == 0 || rd == -1) {
            cleanup(con);
            break;
        // rd 读取未完成时
        } else {  // rd > 0
            input_.addSize(rd);
        }
    }
}

当 state 为连接时,通过 readImp 不断读取套接字中的 input_ 数据。input_.makeRoominput_.addSize(rd) 来进行控制更新读取内容。

readImp 函数实现virtual int readImp(int fd, void *buf, size_t bytes) { return ::read(fd, buf, bytes); } ,其中 bufbytesinput_.end()input_.space()
〉::read(fd, buf, bytes) 函数的返回结果是读取的字节数。该函数用于从文件描述符 fd 指定的文件中读取最多 bytes 个字节的数据,并将数据存储到 buf 指向的缓冲区中。
如果读取成功,::read 函数返回实际读取的字节数,可能小于请求的字节数 bytes。如果读取失败,::read 函数返回 -1,并设置 errno 变量表示错误类型。
总的来说,::read(fd, buf, bytes) 函数的返回结果是读取的字节数,可能小于请求的字节数 bytes。如果读取失败,返回 -1,并设置 errno 变量表示错误类型。

struct Buffer {
    Buffer() : buf_(NULL), b_(0), e_(0), cap_(0), exp_(512) {}
   private:
    char *buf_;
    size_t b_, e_, cap_, exp_;

    char *end() const { return buf_ + e_; }


    size_t space() const { return cap_ - e_; }

可见其初始值皆为 0。

    void makeRoom() {
        if (space() < exp_)
            expand(0);
    }

    void Buffer::expand(size_t len) {
        size_t ncap = std::max(exp_, std::max(2 * cap_, size() + len));
        char *p = new char[ncap];
        std::copy(begin(), end(), p);
        e_ -= b_;
        b_ = 0;
        delete[] buf_;
        buf_ = p;
        cap_ = ncap;
    }

然而,在使用 makeRoom 函数后,其调用 expand 扩充 cap_

void addSize(size_t len) { e_ += len; } 调整 e_ 控制读取。

设置消息传递的回调函数

    con->onMsg(new LengthCodec, [](const TcpConnPtr &con, Slice msg) { info("recv msg: %.*s",(int) msg.size(), msg.data()); }); 

该 lambda 函数将作为 cb 传入到下面的onMsg 函数中。
onMsg 函数具体实现如下

void TcpConn::onMsg(CodecBase *codec, const MsgCallBack &cb) {
    assert(!readcb_);
    // 将 codec_ 指向的对象释放,并将其指向 codec 所指向的对象
    codec_.reset(codec);
    // 将 lambda 函数赋值给 readcb_
    onRead([cb](const TcpConnPtr &con) {
        int r = 1;
        while (r) {
            Slice msg;
            r = con->codec_->tryDecode(con->getInput(), msg);
            if (r < 0) {
                con->channel_->close();
                break;
            } else if (r > 0) {
                trace("a msg decoded. origin len %d msg len %ld", r, msg.size());
                cb(con, msg);
                con->getInput().consume(r);
            }
        }
    });
}

当触发到 readcb_ 时,调用该 lambda 函数。第一步使用传入 codec 的 tryDecode 函数将当前连接对象中 input_ 读取到 msg 中。第二步,使用前面提到的 cb 函数在终端打印信息。

int LineCodec::tryDecode(Slice data, Slice &msg) {
    if (data.size() == 1 && data[0] == 0x04) {
        msg = data;
        return 1;
    }
    for (size_t i = 0; i < data.size(); i++) {
        if (data[i] == '\n') {
            if (i > 0 && data[i - 1] == '\r') {
                msg = Slice(data.data(), i - 1);
                return static_cast<int>(i + 1);
            } else {
                msg = Slice(data.data(), i);
                return static_cast<int>(i + 1); 
            }
        }
    }
    return 0;
}

设置连接状态检测的回调,并传递消息

前面的内容提到,关于消息传递的回调函数是从当前连接的 input_ 成员变量中读取消息信息的。那么这个变量里的内容是从哪儿来的呢。

    con->onState([=](const TcpConnPtr &con) {
        info("onState called state: %d", con->getState());
        if (con->getState() == TcpConn::Connected) {
            con->sendMsg("hello");
        }
    });
void TcpConn::sendMsg(Slice msg) {
    codec_->encode(msg, getOutput());
    sendOutput();
}

msg 内的信息通过 Buffer &getOutput() { return output_; } 存储到 output_

void LineCodec::encode(Slice msg, Buffer &buf) {
    buf.append(msg).append("\r\n");
}

接着使用 void sendOutput() { send(output_); } 调用 send 函数

void TcpConn::send(Buffer &buf) {
    if (channel_) {
        // 检验是否可写
        if (channel_->writeEnabled()) {  // just full
            output_.absorb(buf);
        }
        if (buf.size()) {
            // 写入
            ssize_t sended = isend(buf.begin(), buf.size());
            // 消耗缓冲区已被写入的数据
            buf.consume(sended);
        }
        // 检测是数据是否写入完成
        if (buf.size()) {
            output_.absorb(buf);
            if (!channel_->writeEnabled()) {
                channel_->enableWrite(true);
            }
        }
    } else {
        warn("connection %s - %s closed, but still writing %lu bytes", local_.toString().c_str(), peer_.toString().c_str(), buf.size());
    }
}

这里给出检验是否可写的代码 bool Channel::writeEnabled() {return events_ & kWriteEvent;}

Buffer &Buffer::absorb(Buffer &buf) {
    if (&buf != this) {
        if (size() == 0) {
            char b[sizeof buf];
            memcpy(b, this, sizeof b);
            memcpy(this, &buf, sizeof b);
            memcpy(&buf, b, sizeof b);
            std::swap(exp_, buf.exp_);  // keep the origin exp_
        } else {
            append(buf.begin(), buf.size());
            buf.clear();
        }
    }
    return *this;
}

这段代码实现了 Buffer 类中的 absorb 函数,用于将另一个 Buffer 对象中的数据合并到当前对象中。

具体来说,absorb 函数的参数是一个 Buffer 类型的引用 buf,表示要合并的另一个 Buffer 对象。在函数中,首先判断 buf 是否与当前对象相同,如果相同则不进行任何操作,直接返回当前对象的引用。如果不同,则根据当前对象是否为空进行不同的操作。

如果当前对象为空,即 size() 返回值为 0,说明当前对象中没有数据。此时,将当前对象和 buf 中的数据进行交换,即将当前对象的数据存储到临时变量 b 中,然后将 buf 中的数据存储到当前对象中,最后将 b 中的数据存储到 buf 中。这样,buf 中的数据就被合并到了当前对象中。同时,使用 std::swap 函数交换了当前对象和 buf 中的 exp_ 成员变量,以保留当前对象的 exp_ 值。

如果当前对象不为空,即 size() 返回值不为 0,说明当前对象中已经有数据。此时,使用 append 函数将 buf 中的数据追加到当前对象的末尾,然后使用 clear 函数清空 buf 中的数据。这样,buf 中的数据就被合并到了当前对象中。

最后,函数返回当前对象的引用 *this,以便支持链式调用。

总的来说,这段代码实现了 Buffer 类中的 absorb 函数,用于将另一个 Buffer 对象中的数据合并到当前对象中。

合并后,下一步进行写入操作。

ssize_t TcpConn::isend(const char *buf, size_t len) {
    size_t sended = 0;
    while (len > sended) {
        // virtual int writeImp(int fd, const void *buf, size_t bytes) { return ::write(fd, buf, bytes); }
        ssize_t wd = writeImp(channel_->fd(), buf + sended, len - sended);
        trace("channel %lld fd %d write %ld bytes", (long long) channel_->id(), channel_->fd(), wd);
        if (wd > 0) {
            sended += wd;
            continue;
        } else if (wd == -1 && errno == EINTR) {
            continue;
        } else if (wd == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            if (!channel_->writeEnabled()) {
                channel_->enableWrite(true);
            }
            break;
        } else {
            error("write error: channel %lld fd %d wd %ld %d %s", (long long) channel_->id(), channel_->fd(), wd, errno, strerror(errno));
            break;
        }
    }
    return sended;
}

写入完成后会进一步对缓冲区进行检测,并消耗指定长度的数据buf.consume(sended);

    Buffer &consume(size_t len) {
        b_ += len;
        if (size() == 0)
            clear();
        return *this;
    }

具体来说,consume 函数的参数是一个 size_t 类型的整数 len,表示要消耗的数据长度。在函数中,将 b_ 成员变量的值增加 len,表示从缓冲区中消耗了 len 个字节的数据。然后,判断当前缓冲区中是否还有数据,即 size() 返回值是否为 0。如果当前缓冲区中没有数据,即 size() 返回值为 0,说明已经消耗了所有数据,此时调用 clear 函数清空缓冲区。最后,函数返回当前对象的引用 *this,以便支持链式调用。

赞(0)
版权属于:

鱼一的博客 ◡̈

本文链接:

https://yuyi.monster/archives/203/(转载时请注明本文出处及文章链接)

评论 (0)

More Info for me 📱

IP信息

人生倒计时

今日已经过去小时
这周已经过去
本月已经过去
今年已经过去个月