博客
关于我
Golang 实现 Redis(6): 实现 pipeline 模式的 redis 客户端
阅读量:403 次
发布时间:2019-03-05

本文共 3746 字,大约阅读时间需要 12 分钟。

本文将介绍如何实现一个 Redis 客户端,采用 Pipeline 模式来提升吞吐量。Pipeline 模式允许客户端在等待服务端响应之前继续发送下一个请求,从而减少等待网络传输时间。

TCP 作为全双工协议,能够同时进行上行和下行通信。客户端和服务端可以同时发送和接收数据,不会出现冲突。为了确保请求和响应的正确对应,我们为每个 TCP 连接分配了一个 goroutine。服务端接收到的请求会按照先进先出的顺序处理,并返回响应。客户端则通过 channel 向后台协程发送请求,并通过 waitgroup 等待异步处理完成。

Client 结构

type Client struct {    conn        net.Conn    sendingReqs chan *Request    waitingReqs chan *Request    ticker      *time.Ticker    addr        string    ctx         context.Context    cancelFunc  context.CancelFunc    writing     *sync.WaitGroup}type Request struct {    id        uint64    args      [][]byte    reply     redis.Reply    heartbeat bool    waiting   *wait.Wait    err       error}

发送请求

调用者将请求发送给后台协程,并通过 waitgroup 等待异步处理完成:

func (client *Client) Send(args [][]byte) redis.Reply {    request := &Request{        args:      args,        heartbeat: false,        waiting:   &wait.Wait{},    }    request.waiting.Add(1)    client.sendingReqs <- request    timeout := request.waiting.WaitWithTimeout(maxWait)    if timeout {        return reply.MakeErrReply("server time out")    }    if request.err != nil {        return reply.MakeErrReply("request failed: " + err.Error())    }    return request.reply}

核心逻辑

写协程

// 写协程入口func (client *Client) handleWrite() {loop:    for {        select {        case req := <-client.sendingReqs:            client.writing.Add(1)            client.doRequest(req)        case <-client.ctx.Done():            break loop        }    }}

发送请求

func (client *Client) doRequest(req *Request) {    bytes := reply.MakeMultiBulkReply(req.args).ToBytes()    _, err := client.conn.Write(bytes)    i := 0    for err != nil && i < 3 {        err = client.handleConnectionError(err)        if err == nil {            _, err = client.conn.Write(bytes)        }        i++    }    if err == nil {        client.waitingReqs <- req    } else {        req.err = err        req.waiting.Done()        client.writing.Done()    }}

读协程

// 收到服务端的响应func (client *Client) finishRequest(reply redis.Reply) {    request := <-client.waitingReqs    request.reply = reply    if request.waiting != nil {        request.waiting.Done()    }    client.writing.Done()}

启动 client

func MakeClient(addr string) (*Client, error) {    conn, err := net.Dial("tcp", addr)    if err != nil {        return nil, err    }    ctx, cancel := context.WithCancel(context.Background())    return &Client{        addr:        addr,        conn:        conn,        sendingReqs: make(chan *Request, chanSize),        waitingReqs: make(chan *Request, chanSize),        ctx:         ctx,        cancelFunc:  cancel,        writing:     &sync.WaitGroup{},    }, nil}func (client *Client) Start() {    client.ticker = time.NewTicker(10 * time.Second)    go client.handleWrite()    go func() {        err := client.handleRead()        logger.Warn(err)    }()    go client.heartbeat()}

关闭 client

func (client *Client) Close() {    close(client.sendingReqs)    client.writing.Wait()    _ = client.conn.Close()    client.cancelFunc()    close(client.waitingReqs)}

测试

func TestClient(t *testing.T) {    client, err := MakeClient("localhost:6379")    if err != nil {        t.Error(err)    }    client.Start()    result := client.Send([][]byte{        []byte("SET"),        []byte("a"),        []byte("a"),    })    if statusRet, ok := result.(*reply.StatusReply); ok {        if statusRet.Status != "OK" {            t.Error("`set` failed, result: " + statusRet.Status)        }    }    result := client.Send([][]byte{        []byte("GET"),        []byte("a"),    })    if bulkRet, ok := result.(*reply.BulkReply); ok {        if string(bulkRet.Arg) != "a" {            t.Error("`get` failed, result: " + string(bulkRet.Arg))        }    }}

通过这种 Pipeline 模式,客户端可以在等待响应之前继续发送更多请求,大幅提高吞吐量。

转载地址:http://nxqzz.baihongyu.com/

你可能感兴趣的文章
nacos集群搭建
查看>>
Nessus漏洞扫描教程之配置Nessus
查看>>
Nest.js 6.0.0 正式版发布,基于 TypeScript 的 Node.js 框架
查看>>
Netpas:不一样的SD-WAN+ 保障网络通讯品质
查看>>
Netty WebSocket客户端
查看>>
Netty工作笔记0011---Channel应用案例2
查看>>
Netty工作笔记0014---Buffer类型化和只读
查看>>
Netty工作笔记0050---Netty核心模块1
查看>>
Netty工作笔记0084---通过自定义协议解决粘包拆包问题2
查看>>
Netty常见组件二
查看>>
netty底层源码探究:启动流程;EventLoop中的selector、线程、任务队列;监听处理accept、read事件流程;
查看>>
Netty核心模块组件
查看>>
Netty框架的服务端开发中创建EventLoopGroup对象时线程数量源码解析
查看>>
Netty源码—2.Reactor线程模型一
查看>>
Netty源码—4.客户端接入流程一
查看>>
Netty源码—4.客户端接入流程二
查看>>
Netty源码—5.Pipeline和Handler一
查看>>
Netty源码—6.ByteBuf原理二
查看>>
Netty源码—7.ByteBuf原理三
查看>>
Netty源码—7.ByteBuf原理四
查看>>