从 rpc 包的 Server 端 和 Client 端入手,学习 Server/Client 的源码实现。并以一个例子作为总结。最后总结了rpc实现的几个学习的要点。
Golang net/rpc 包学习
golang 提供了一个开箱即用的RPC服务,实现方式简约而不简单。
RPC 简单介绍
远程过程调用 (Remote Procedure Call,RPC) 是一种计算机通信协议。允许运行再一台计算机的程序调用另一个地址空间的子程序(一般是开放网络种的一台计算机),而程序员就像调用调用本地程序一样,无需额外的做交互编程。RPC 是一种 CS (Client-Server) 架构的模式,通过发送请求-接收响应的方式进行信息的交互。
有很多广泛使用的RPC框架,例如 gRPC, Thrift, Dubbo, brpc 等。这里的RPC 框架有的实现了跨语言调用,有的实现了服务注册发现等。比我们今天介绍的官方提供的 rpc 包要使用广泛的多。但是,通过对net/rpc
的学习,可以使我们对一个rpc框架做一个最基本的了解。
Golang 的实现
rpc 是 cs 架构,所以既有客户端,又有服务端。下面,我们先分析通信的编码,之后从服务端、客户端角度分析RPC的实现。
通信编码
golang 在rpc 实现中,抽象了协议层,我们可以自定义协议实现我们自己的接口。如下是协议的接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 服务端
type ServerCodec interface {
ReadRequestHeader ( * Request ) error
ReadRequestBody ( interface {}) error
WriteResponse ( * Response , interface {}) error
// Close can be called multiple times and must be idempotent.
Close () error
}
// 客户端
type ClientCodec interface {
WriteRequest ( * Request , interface {}) error
ReadResponseHeader ( * Response ) error
ReadResponseBody ( interface {}) error
Close () error
}
而包中提供了基于gob 二进制编码的编解码实现。当然我们也可以实现自己想要的编解码方式。
Server 端实现
结构定义
1
2
3
4
5
6
7
type Server struct {
serviceMap sync . Map // 保存Service
reqLock sync . Mutex // 读请求的锁
freeReq * Request
respLock sync . Mutex // 写响应的锁
freeResp * Response
}
server端通过互斥锁的方式支持了并发执行。由于每个请求和响应都需要定义Request/Response 对象,为了减少内存的分配,这里使用了一个freeReq/freeResp 链表实现了两个对象池。
当需要Request 对象时,从 freeReq 链表中获取,当使用完毕后,再放回链表中。
服务的注册
service保存在 Server 的 serviceMap 中,每个Service 的信息如下:
1
2
3
4
5
6
type service struct {
name string // 服务名
rcvr reflect . Value // 服务对象
typ reflect . Type // 服务类型
method map [ string ] * methodType // 注册方法
}
从上面可以看到,一个类型以及该类型的多个方法可以被注册为一个Service。在注册服务时,通过下面的方法将服务保存在serviceMap 中。
1
2
3
4
// 默认使用对象方法名
func ( server * Server ) Register ( rcvr interface {}) error {}
// 指定方法名
func ( server * Server ) RegisterName ( name string , rcvr interface {}) error {}
服务的调用
首先,是rpc 服务的启动。和大部分的网络应用一致,在accept一个连接后,会启动一个协程做消息处理,代码如下:
1
2
3
4
5
6
7
8
for {
conn , err := lis . Accept ()
if err != nil {
log . Print ( "rpc.Serve: accept:" , err . Error ())
return
}
go server . ServeConn ( conn )
}
其次,对于每一个连接,服务端会不断获取请求,并异步发送响应。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
for {
// 读取请求
service , mtype , req , argv , replyv , keepReading , err := server . readRequest ( codec )
if err != nil {
if debugLog && err != io . EOF {
log . Println ( "rpc:" , err )
}
if ! keepReading {
break
}
if req != nil {
// 发送请求
server . sendResponse ( sending , req , invalidRequest , codec , err . Error ())
server . freeRequest ( req ) // 释放 req 对象
}
continue
}
wg . Add ( 1 )
// 并发处理每个请求
go service . call ( server , sending , wg , mtype , req , argv , replyv , codec )
}
最后,由于异步发送请求,所以请求的顺序和响应顺序不一定一致。所以,在响应报文中,会携带请求报文的seq (序列号),保证消息的一致性。
除此之外,为了兼容http 服务,net/rpc
包还通过http包实现的 Hijack 方式,将 http 协议转换为 rpc 协议。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func ( server * Server ) ServeHTTP ( w http . ResponseWriter , req * http . Request ) {
// 客户端通过 CONNECT 方法连接
// 通过Hijack 拿到tcp 连接
conn , _ , err := w .( http . Hijacker ). Hijack ()
if err != nil {
log . Print ( "rpc hijacking " , req . RemoteAddr , ": " , err . Error ())
return
}
// 发送客户端,支持 RPC 协议
io . WriteString ( conn , "HTTP/1.0 " + connected + "\n\n" )
// 开始 RPC 的请求响应
server . ServeConn ( conn )
}
Client 端实现
客户端的连接相较于服务端是比较简单的。我们从发起连接、发送请求、读取响应三个角度学习。
RPC 的连接
由于该RPC支持HTTP协议做连接升级,因此,有几种连接方式。
直接使用 tcp 协议。
1
func Dial ( network , address string ) ( * Client , error ) {}
使用 http 协议。 http 协议可以指定路径,或者使用默认的rpc 路径。
1
2
3
4
// 默认路径 "/_goRPC_"
func DialHTTP ( network , address string ) ( * Client , error ) {}
// 使用默认的路径
func DialHTTPPath ( network , address , path string ) ( * Client , error ) {}
请求的发送
RPC 请求的发送,提供了同步和异步的接口调用,方式如下:
1
2
3
4
// 异步
func ( client * Client ) Go ( serviceMethod string , args interface {}, reply interface {}, done chan * Call ) * Call {}
// 同步
func ( client * Client ) Call ( serviceMethod string , args interface {}, reply interface {}) error {}
从内部实现可以知道,都是通过Go 异步的方式拿到返回数据。
下面,我们看内部如何实现请求的发送:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func ( client * Client ) send ( call * Call ) {
// 客户端正常的情况下
seq := client . seq
client . seq ++ // 请求的序列号
client . pending [ seq ] = call
// 对请求进行编码,包括请求方法、参数。
// Encode and send the request.
client . request . Seq = seq
client . request . ServiceMethod = call . ServiceMethod
// client 可以并发 发起 Request, 然后异步等待 Done
err := client . codec . WriteRequest ( & client . request , call . Args )
// 是否有发送失败,如果发送成功,则保存在pending map中,等待请求结果。
if err != nil {
client . mutex . Lock ()
call = client . pending [ seq ]
delete ( client . pending , seq )
client . mutex . Unlock ()
if call != nil {
call . Error = err
call . done ()
}
}
}
从上面可以看出,对于一个客户端,可以同时发送多条请求,然后异步等待响应。
读取响应
在rpc 连接成功后,会建立一个连接,专门用于做响应的读取。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for err == nil {
response = Response {}
err = client . codec . ReadResponseHeader ( & response )
if err != nil {
break
}
seq := response . Seq
client . mutex . Lock ()
call := client . pending [ seq ] // 从 pending 列表中删除
delete ( client . pending , seq )
client . mutex . Unlock ()
// 解码body
// 此处有多种判断,判断是否有异常
client . codec . ReadResponseBody ( nil )
// 最后通知异步等待的请求,调用完成
call . done ()
}
通过循环读取响应头,响应body,并将读取结果通知调用rpc 的异步请求,完成一次响应的读取。
简单例子
下面我们官方提供的一个简单例子,对rpc包学习做个总结。
服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type Args struct { // 请求参数
A , B int
}
type Quotient struct { // 一个响应的类型
Quo , Rem int
}
type Arith int
// 定义了乘法和除法
func ( t * Arith ) Multiply ( args * Args , reply * int ) error {
* reply = args . A * args . B
return nil
}
func ( t * Arith ) Divide ( args * Args , quo * Quotient ) error {
if args . B == 0 {
return errors . New ( "divide by zero" )
}
quo . Quo = args . A / args . B
quo . Rem = args . A % args . B
return nil
}
func main () {
serv := rpc . NewServer ()
arith := new ( Arith )
serv . Register ( arith ) // 服务注册
// 通过http 监听,到时做协议转换
http . ListenAndServe ( "0.0.0.0:3000" , serv )
}
客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func main () {
client , err := rpc . DialHTTP ( "tcp" , "127.0.0.1:3000" )
if err != nil {
log . Fatal ( "dialing:" , err )
}
dones := make ([] chan * rpc . Call , 0 , 10 )
// 先同步发起请求
for i := 0 ; i < 10 ; i ++ {
quotient := new ( Quotient )
args := & Args { i + 10 , i }
divCall := client . Go ( "Arith.Divide" , args , quotient , nil )
dones = append ( dones , divCall . Done )
log . Print ( "send" , i )
}
log . Print ( "---------------" )
// 之后异步读取
for idx , done := range dones {
replyCall := <- done // will be equal to divCall
args := replyCall . Args .( * Args )
reply := replyCall . Reply .( * Quotient )
log . Printf ( "%d / %d = %d, %d %% %d = %d\n" , args . A , args . B , reply . Quo ,
args . A , args . B , reply . Rem )
log . Print ( "recv" , idx )
}
}
我们可以学到什么
最后,做一个学习的总结。
对统一连接上的不同请求实现异步操作,通过请求、响应需要保证数据的一致性。
链表方式实现一个对象池
对 http 包中实现的Hijack 方式的一次简单实践,通过http协议升级为rpc协议。劫持了原有http协议的tcp连接,转为rpc使用。
rpc 的实现,通过gob编码,应该是不支持与其他语言通信的。需要自己实现编解码方式。
rpc 的实现,也不支持服务的注册和发现,需要我们自己去维护服务方。