网络组件

TCP 组件

gtcp 模块实现简便易用、轻量级的 TCPServer 服务端。

使用方式

import "github.com/camry/g/gnet/gtcp"

接口文档

https://pkg.go.dev/github.com/camry/g/gnet/gtcp

我们通过实现一个简单的 echo服务器 来演示 TCPServer 的使用:

package main

import (
    "context"
    "fmt"
    "github.com/camry/g/gnet/gtcp"
)

func main() {
    gtcp.NewServer("127.0.0.1:8999", func(conn *gtcp.Conn) {
        defer conn.Close()
        for {
            data, err := conn.Receive(-1)
            if len(data) > 0 {
                if err := conn.Send(append([]byte("> "), data...)); err != nil {
                    fmt.Println(err)
                }
            }
            if err != nil {
                break
            }
        }
    }).Run(context.Background())
}

在这个示例中我们使用了 SendReceive 来发送和接收数据。其中 Receive 方法会通过阻塞方式接收数据,直到客户端”发送完毕一条数据”(执行一次 Send,底层 Socket 通信不带缓冲实现),或者关闭链接。

执行之后我们使用telnet工具来进行测试:

camry@home:~$ telnet 127.0.0.1 8999
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
hello
> hello
hi there
> hi there

写入操作

TCP 通信写入操作由 Send 方法实现,并提供了错误重试的机制,由第二个非必需参数 retry 提供。需要注意的是 Send 方法不带任何的缓冲机制,也就是说每调用一次 Send 方法将会立即调用底层的TCP Write方法写入数据(缓冲机制依靠系统底层实现)。因此,如果想要进行输出缓冲控制,请在业务层进行处理。

在进行 TCP 写入时,可靠的通信场景下往往是一写一读,即 Send 成功之后接着便开始 Receive 获取目标的反馈结果。因此 gtcp.Conn 也提供了方便的 SendReceive 方法。

读取操作

TCP 通信读取操作由 Receive 方法实现,同时也提供了错误重试的机制,由第二个非必需参数 retry 提供。Receive 方法提供了内置的读取缓冲控制,读取数据时可以指定读取的长度(由 length 参数指定),当读取到指定长度的数据后将会立即返回。如果 length < 0 那么将会读取所有可读取的缓冲区数据并返回。当 length = 0 时表示获取一次缓冲区的数据后立即返回。

如果使用 Receive(-1) 可以读取所有缓冲区可读数据(长度不定,如果发送的数据包太长有可能会被截断),但需要注意包的解析问题,容易产生非完整包的情况。这个时候,业务层需要根据既定的数据包结构自己负责包的完整性处理。推荐使用后续介绍的简单协议通过 SendPkg/RecvPkg 来实现消息包的发送/接收。

超时处理

gtcp.ConnTCP 通信时的数据写入和读取提供了超时处理,通过方法中的 timeout 参数指定,这块比较简单,不过多阐述。

我们接下来通过通过几个例子来看看如何使用 gtcp.Conn 对象。

使用示例

示例1,简单使用

package main

import (
    "context"
    "fmt"
    "strconv"
    "time"

    "github.com/camry/g/glog"
    "github.com/camry/g/gnet/gtcp"
)

func main() {
    // Server
    go gtcp.NewServer("127.0.0.1:8999", func(conn *gtcp.Conn) {
        defer conn.Close()
        for {
            data, err := conn.Receive(-1)
            if len(data) > 0 {
                fmt.Println(string(data))
            }
            if err != nil {
                break
            }
        }
    }).Run(context.Background())

    time.Sleep(time.Second)

    // Client
    conn, err := gtcp.NewConn("127.0.0.1:8999")
    if err != nil {
        panic(err)
    }
    for i := 0; i < 10000; i++ {
        if err := conn.Send([]byte(strconv.Itoa(i))); err != nil {
            glog.Error(err)
        }
        time.Sleep(time.Second)
    }
}
  1. Server 端,接收到客户端的数据后立即打印到终端上。
  2. Client 端,使用同一个连接对象,在循环中每隔1秒向服务端发送当前递增的数字。同时,该功能也可以演示出底层 Socket 通信并没有使用缓冲实现,也就是说,执行一次 Send 即立刻向服务端发送数据。因此,客户端需要在本地自行管理好需要发送的缓冲数据。
  3. 执行结果 执行后,可以看到 Server 在终端上输出以下信息:
PS E:\project\Weition\go-example> go run main.go
0
1
2
3
4
...

示例2,回显服务

我们将之前的回显服务改进一下:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/camry/g/glog"
    "github.com/camry/g/gnet/gtcp"
)

func main() {
    // Server
    go gtcp.NewServer("127.0.0.1:8999", func(conn *gtcp.Conn) {
        defer conn.Close()
        for {
            data, err := conn.Receive(-1)
            if len(data) > 0 {
                if err := conn.Send(append([]byte("> "), data...)); err != nil {
                    fmt.Println(err)
                }
            }
            if err != nil {
                break
            }
        }
    }).Run(context.Background())

    time.Sleep(time.Second)

    // Client
    for {
        if conn, err := gtcp.NewConn("127.0.0.1:8999"); err == nil {
            if b, err := conn.SendReceive([]byte(time.Now().String()), -1); err == nil {
                fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr())
            } else {
                fmt.Println(err)
            }
            conn.Close()
        } else {
            glog.Error(err)
        }
        time.Sleep(time.Second)
    }
}

该示例程序中,Client每隔1秒钟向Server发送当前的时间信息,Server接收到之后返回原数据信息,Client接收到Server端返回信息后立即打印到终端。

执行后,输出结果为:

PS E:\project\Weition\go-example> go run main.go
> 2023-05-24 14:33:40.3639125 +0800 CST m=+1.004663301 127.0.0.1:52651 127.0.0.1:8999
> 2023-05-24 14:33:41.37646 +0800 CST m=+2.017205201 127.0.0.1:52652 127.0.0.1:8999
> 2023-05-24 14:33:42.3773981 +0800 CST m=+3.018137801 127.0.0.1:52653 127.0.0.1:8999
...

消息包处理

gtcp 提供了许多方便的原生操作连接数据的方法,但是在绝大多数的应用场景中,开发者需要自己设计数据结构,并进行封包/解包处理,由于TCP消息协议是没有消息边界保护的,因此复杂的网络通信环境中很容易出现 粘包 的情况。 因此 gtcp 也提供了简单的数据协议,方便开发者进行消息包交互,开发者不再需要担心消息包的处理细节,包括封包/解包处理,这一切复杂的逻辑 gtcp 已经帮你处理好了。

简单协议

gtcp 模块提供了简单轻量级数据交互协议,效率非常高,协议格式如下:

数据长度(16bit)|数据字段(变长)
  1. 数据长度:默认为 16位 ( 2字节 ),用于标识该消息体的数据长度,单位为字节,不包含自身的2字节;
  2. 数据字段:变长,根据数据长度可以知道,数据最大长度不能超过 0xFFFF = 65535 bytes = 64 KB

简单协议由 gtcp 封装实现,如果开发者客户端和服务端如果都使用 gtcp 模块来通信则无需关心协议实现,专注 数据 字段封装/解析实现即可。如果涉及和其他开发语言对接,则需要按照该协议实现对接即可,由于简单协议非常简单轻量级,因此对接成本很低。

数据字段也可以为空,即没有任何长度。

操作方法

https://pkg.go.dev/github.com/camry/g/gnet/gtcp

消息包方法命名是在原有的基本连接操作方法中加上了 Pkg 关键词便于区分。

其中,请求参数中的 PkgOption 数据结构如下,用于定义消息包接收策略:

// 数据读取选项
type PkgOption struct {
	HeaderSize  int   // 自定义头大小(默认为2字节,最大不能超过4字节)
	MaxDataSize int   // (byte)数据读取的最大包大小,默认最大不能超过2字节(65535 byte)
	Retry       Retry // 失败重试策略
}

UDP 组件

UDP (User Datagram Protocol) 一种无连接的传输层协议,提供面向事务的简单不可靠信息传送服务。UDP 服务端通过 gudp.Server 实现,客户端通过 gudp.Conn 对象或者工具方法实现。

gudp.Conn 的操作绝大部分类似于 gtcp 的操作方式(大部分的方法名称也相同),但由于 UDP 是面向非连接的协议,因此 gudp.Conn(底层通信端口)也只能完成最多一次数据写入和读取,客户端下一次再与目标服务端进行通信的时候,将需要创建新的 Conn 对象进行通信。

使用方式

import "github.com/camry/g/gnet/gudp"

来一个简单的示例:

package main

import (
    "context"
    "fmt"

    "github.com/camry/g/gnet/gudp"
)

func main() {
    gudp.NewServer("127.0.0.1:8999", func(conn *gudp.Conn) {
        defer conn.Close()
        for {
            if data, _ := conn.Receive(-1); len(data) > 0 {
                fmt.Println(string(data))
            }
        }
    }).Run(context.Background())
}

UDPServer 是阻塞运行的,用户可以在自定义的回调函数中根据读取内容进行并发处理。

Linux 下可以使用以下命令向服务端发送 UDP 数据进行测试,随后查看服务端端是否有输出:

echo "hello" > /dev/udp/127.0.0.1/8999

接口文档

https://pkg.go.dev/github.com/camry/g/gnet/gudp

使用实例

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/camry/g/glog"
    "github.com/camry/g/gnet/gudp"
)

func main() {
    // Server
    go gudp.NewServer("127.0.0.1:8999", func(conn *gudp.Conn) {
        defer conn.Close()
        for {
            data, err := conn.Receive(-1)
            if len(data) > 0 {
                if err := conn.Send(append([]byte("> "), data...)); err != nil {
                    glog.Error(err)
                }
            }
            if err != nil {
                glog.Error(err)
            }
        }
    }).Run(context.Background())

    time.Sleep(time.Second)

    // Client
    for {
        if conn, err := gudp.NewConn("127.0.0.1:8999"); err == nil {
            if b, err := conn.SendReceive([]byte(time.Now().String()), -1); err == nil {
                fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr())
            } else {
                glog.Error(err)
            }
            conn.Close()
        } else {
            glog.Error(err)
        }
        time.Sleep(time.Second)
    }
}

该示例与 gtcp.Conn 中的通信示例类似,不同的是,客户端与服务端无法保持连接,每次通信都需要创建的新的连接对象进行通信。

执行后,输出结果如下:

PS E:\project\Weition\go-example> go run main.go
> 2023-05-24 14:55:10.2945209 +0800 CST m=+1.004033701 127.0.0.1:52086 127.0.0.1:8999
> 2023-05-24 14:55:11.3077593 +0800 CST m=+2.017267301 127.0.0.1:52087 127.0.0.1:8999
> 2023-05-24 14:55:12.3084505 +0800 CST m=+3.017953701 127.0.0.1:52088 127.0.0.1:8999
...