Ethereum源码阅读笔记-whisper

转载请注明出处:www.huamo.online
字节杭州 求贤若渴:

  1. https://job.toutiao.com/s/JXTdQaH
  2. https://job.toutiao.com/s/JXTMWW3
  3. https://job.toutiao.com/s/JXT1tpC
  4. https://job.toutiao.com/s/JXTdu6h

go-ethereum/whisper

先从whipserv6看起。

Whisper定义

Whisper代表了在以太坊网络中的一个隐秘(dark)通信接口,使用的是以太坊自有的P2P通信层。所谓dark,意思是没有可靠的方法可以来追踪数据包(见于Specs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// whisper/whisperv6/whisper.go

type Whisper struct {
// 协议描述和参数
protocol p2p.Protocol
// 订阅功能安装的消息过滤器
filters *Filters

// 私钥存储
privateKeys map[string]*ecdsa.PrivateKey
// 对称密钥存储
symKeys map[string][]byte
// 和密钥存储相关的互斥锁
keyMu sync.RWMutex

// 互斥地同步消息和过期池
poolMu sync.RWMutex
// 当前由此节点跟踪的信封(envelopes)池
envelopes map[common.Hash]*Envelope
// 消息过期池
expirations map[uint32]*set.SetNonTS

// 互斥地同步活跃节点集合
peerMu sync.RWMutex
// 当前活跃节点的集合
peers map[*Peer]struct{}

// 普通whisper消息的消息队列
messageQueue chan *Envelope
// 点对点消息的消息队列(不会再被做任何转发的消息)
p2pMsgQueue chan *Envelope
// 用于温和的退出的channel
quit chan struct{}

// 存储了配置项的设置信息,以便可以动态修改。
settings syncmap.Map

// 允许处理whisper相关消息的最大时长(以秒为单位)
syncAllowance int

// 指示这个节点是否是纯粹的轻客户端(不转发任何消息)
lightClient bool

// 用于保护stats
statsMu sync.Mutex
// whisper节点的统计信息
stats Statistics

// MailServer接口
mailServer MailServer
}

Whisper节点的配置Config

由于新建一个Whisper实例,需要接收Config参数,所以先来看看Config的定义,见于config.go文件中。

1
2
3
4
5
6
7
8
9
// whisper/whisperv6/config.go

// Config描述了一个whisper节点的配置状态
type Config struct {
// 最大消息长度
MaxMessageSize uint32 `toml:",omitempty"`
// 接受工作量证明的最小值
MinimumAcceptedPOW float64 `toml:",omitempty"`
}

这里面提到了一个MinimumAcceptedPOW,是关于工作量证明的,我在Proof of Work找到了这个设计的初衷。

之所以用到POW,主要是为了防止垃圾消息,也同样是为了缓解网络的压力。计算POW的成本可以理解为“你想让网络将你的消息存储一段时间(即TTL时间段),因此需要分配资源,那么你就需要为此支付价格”。所需的POW应该与消息大小和TTL成正比。

代码中还提供了一个默认配置样例DefaultConfig

1
2
3
4
5
6
// whisper/whisperv6/config.go

var DefaultConfig = Config{
MaxMessageSize: DefaultMaxMessageSize, // uint32(1024 * 1024)
MinimumAcceptedPOW: DefaultMinimumPoW, // 0.2
}

新建一个Whisper客户端

搞明白了Config,开始新建一个Whisper客户端,whisper.go中的New()方法会新建一个Whisper客户端,该客户端可以在以太坊P2P网络中进行通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// whisper/whisperv6/whisper.go

func New(cfg *Config) *Whisper {
// 如果传入的cfg是nil,则使用上面提到的DefaultConfig
if cfg == nil {
cfg = &DefaultConfig
}

// 初始化Whisper结构体
whisper := &Whisper{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
expirations: make(map[uint32]*set.SetNonTS),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
// 最长处理时间默认为10s
syncAllowance: DefaultSyncAllowance,
}

// 使用NewFilters为这个whisper新建过滤器
whisper.filters = NewFilters(whisper)

// 将<minPowIdx, cfg.MinimumAcceptedPOW>放入settings内存中
whisper.settings.Store(minPowIdx, cfg.MinimumAcceptedPOW)
// 将<maxMsgSizeIdx, cfg.MaxMessageSize>放入settings内存中
whisper.settings.Store(maxMsgSizeIdx, cfg.MaxMessageSize)
// 指示消息队列是否溢出
whisper.settings.Store(overflowIdx, false)

// p2p whisper子协议规则
whisper.protocol = p2p.Protocol{
Name: ProtocolName, // "shh"
Version: uint(ProtocolVersion), // uint(uint64(6))
Length: NumberOfMessageCodes, // 128
// 启动运行
Run: whisper.HandlePeer,
// 节点信息
NodeInfo: func() interface{} {
return map[string]interface{}{
"version": ProtocolVersionStr, // "6.0"
// 读取whisper.settings[maxMsgSizeIdx]
"maxMessageSize": whisper.MaxMessageSize(),
// 读取whisper.settings[minPowIdx]
// 如果为空或出错,则返回DefaultMinimumPoW,即0.2
"minimumPoW": whisper.MinPow(),
}
},
}

return whisper
}

总体来说,New()主要是创建了一个空的whisper{}结构体,并尽力初始化,比较重要的地方,就是初始化whisper.protocol子协议字段,这里面涉及到很多魔数。

关于版本号6wiki上有这么一段话:

Tor系统有一个协议,可以在两个节点之间建立连接,但它们互相并不知道对方在哪里,这是用于隐藏服务的Rendezvous协议。隐藏的服务(相当于TCP/IP中一个监听端口的服务)会选择随机数量的”介绍人”节点(我想,这个随机数量一般都是6)。为了做到这一点,它会和每一个介绍人都建立标准的3跳链(3-hop chain),当一个用户想和一个隐藏服务建立连接时,它们就会传播一个请求去连接与特定公钥相关的隐藏服务。

关于消息代号数量128p2p.Protocol结构体中的Lenght字段表示某一子协议中消息代号的总数量,而在whisper/whisperv6/doc.go的常量表中可以看到根据EIP-627提案,whisper协议的消息代号共有128,故为128

HandlePeer:发现新节点后做什么

在新建whisper客户端时,whisper.protocol结构体用HandlerPeer()填充了Run字段,当和一个节点协商该协议时,会在一个新的goroutine上调用Run代表的方法。在whisper这里,当whisper子协议连接被协商的时候,下面的P2P层会调用HandlePeer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// whisper/whisperv6/whisper.go

func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
// 新建一个whisper peer实例,接下来开始追踪它
whisperPeer := newPeer(whisper, peer, rw)

// 同步的将这个peer实例添加到追踪列表中
whisper.peerMu.Lock()
whisper.peers[whisperPeer] = struct{}{}
whisper.peerMu.Unlock()

// 当退出时,将该peer从追踪列表中删除
defer func() {
whisper.peerMu.Lock()
delete(whisper.peers, whisperPeer)
whisper.peerMu.Unlock()
}()

// 握手,状态更新
if err := whisperPeer.handshake(); err != nil {
return err
}
whisperPeer.start() // 启动通信
defer whisperPeer.stop() // 退出时停止通信

return whisper.runMessageLoop(whisperPeer, rw) // 开始消息循环处理
}

这里主要是在发现一个新的节点时,whisper协议所做的操作,大部分操作都是和Peer有关。下面就来看看whisperPeer

whisper中的Peer

peer.go中有关于Peer的定义,Peer代表了在whisper协议中的一个节点连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// whisper/whisperv6/peer.go

type Peer struct {
// 本地whisper节点
host *Whisper
// 远程whisper节点
peer *p2p.Peer
// 消息读写句柄
ws p2p.MsgReadWriter

// 远程节点是否可信
trusted bool
// 远程节点要求的POW值
powRequirement float64
// 布隆过滤器锁
bloomMu sync.Mutex
// 布隆过滤器
bloomFilter []byte
// 是否是全节点过滤器
fullNode bool

// 存储该节点已知的消息,以避免浪费带宽
known *set.Set // Messages already known by the peer to avoid wasting bandwidth

// 优雅的退出连接
quit chan struct{}
}

新建一个远程节点的本地代理Peer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// whisper/whisperv6/peer.go

// 新建一个whisper协议下的peer对象,但这个方法并不会去握手。
func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
return &Peer{
host: host,
peer: remote,
ws: rw,
trusted: false, // 默认不可信
powRequirement: 0.0, // 初始化为0
known: set.New(),
quit: make(chan struct{}),
bloomFilter: MakeFullNodeBloom(), //创建一个长度为64的布隆过滤器,并且所有位初始化为0xFF
fullNode: true, // 已经初始化为全节点过滤器
}
}

和远程节点握手:handshake()

handshake()向远程节点发送协议初始化状态信息,同时也会验证远程节点的状态。状态消息有以下几点要素:

  1. 消息代号为statusCode,即0
  2. 消息的payload是一个列表:[whisper协议版本号(即6), 本地节点需要的最小POW值, 本地节点感兴趣的消息过滤器]
  3. 只有版本号是强制验证的,后续2个参数都是可选的,即本地节点要求的最小POW本地节点感兴趣的消息过滤器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// whisper/whisperv6/peer.go

func (peer *Peer) handshake() error {

// 异步地发送握手状态消息
errc := make(chan error, 1) // error channel
go func() {
pow := peer.host.MinPow() // 获取本地节点(自己)的POW最小需求值
powConverted := math.Float64bits(pow) // 将float64转化为uint64
// BloomFilter()为本地节点所有感兴趣的话题返回一个集成的布隆过滤器
// 要求远程节点只能发送被通告的布隆过滤器中匹配的消息
// 如果不匹配,则被认为是垃圾消息,并将与该远程节点断开连接。
bloom := peer.host.BloomFilter()
// 通过读写句柄向远程节点发送状态消息(状态码为0,表示这是一个状态消息)
// 这个消息包含了[whisper的协议版本号,POW值,消息相关的布隆过滤器]
errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom)
}()

// 获取远程节点状态数据,并且验证协议是否匹配
packet, err := peer.ws.ReadMsg() // 从读写句柄中读取远程消息
if err != nil {
return err
}
// 正常情况下,第一个数据包的代号应该是状态码,否则就报错
if packet.Code != statusCode {
return fmt.Errorf("peer [%x] sent packet %x before status packet", peer.ID(), packet.Code)
}
// 数据包是rlp序列化格式,需要进行解码
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
_, err = s.List()
if err != nil {
return fmt.Errorf("peer [%x] sent bad status message: %v", peer.ID(), err)
}
// 读取前8个字节,作为协议版本号
//(上面也看到了,状态消息的格式为一个列表:[whisper的协议版本号,POW值,消息相关的布隆过滤器])
peerVersion, err := s.Uint()
if err != nil {
return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", peer.ID(), err)
}
// 如果获取到的协议版本号不是uint64(6),那么说明协议不匹配。
if peerVersion != ProtocolVersion {
return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", peer.ID(), peerVersion, ProtocolVersion)
}

// 只有版本号是强制要求的,后续的参数都是可选的
powRaw, err := s.Uint() // 继续读取8个字节,作为POW值
// 如果没出错,则验证pow和bloomfilter;如果出错,也没什么,继续往下走,即后续参数是可选的
if err == nil {
pow := math.Float64frombits(powRaw) // uint64 -> float64
// pow无穷大,不是数字,小于0,都会报错
if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 {
return fmt.Errorf("peer [%x] sent bad status message: invalid pow", peer.ID())
}
// 用pow更新peer.powRequirement
peer.powRequirement = pow

var bloom []byte
err = s.Decode(&bloom) //将剩余的部分全部解码,并存至bloom[]中
if err == nil {
sz := len(bloom)
// 初始化时,布隆过滤器的长度都是BloomFilterSize,即64,否则就验证报错。
if sz != BloomFilterSize && sz != 0 {
return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz)
}
// 用解码后的bloom更新peer.bloomFilter,以及peer.fullNode
peer.setBloomFilter(bloom)
}
}

// 阻塞等待通道是否返回错误,如果出错,则报错。
if err := <-errc; err != nil {
return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
}
return nil
}

开始通信:start()

start()初始化

参考资料

转载请注明出处:www.huamo.online