cosmos-abci
Cosmos abci
记录一下cosmos-sdk与tendermint-abci的衔接。
tendermint
rpc请求的路由在
rpc/core/routes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30var Routes = map[string]*rpc.RPCFunc{
// subscribe/unsubscribe are reserved for websocket events.
"subscribe": rpc.NewWSRPCFunc(Subscribe, "query"),
"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"),
"unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""),
// info API
"health": rpc.NewRPCFunc(Health, ""),
"status": rpc.NewRPCFunc(Status, ""),
"net_info": rpc.NewRPCFunc(NetInfo, ""),
"blockchain": rpc.NewRPCFunc(BlockchainInfo, "minHeight,maxHeight"),
"genesis": rpc.NewRPCFunc(Genesis, ""),
"block": rpc.NewRPCFunc(Block, "height"),
"block_results": rpc.NewRPCFunc(BlockResults, "height"),
"commit": rpc.NewRPCFunc(Commit, "height"),
"tx": rpc.NewRPCFunc(Tx, "hash,prove"),
"tx_search": rpc.NewRPCFunc(TxSearch, "query,prove,page,per_page"),
"validators": rpc.NewRPCFunc(Validators, "height"),
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""),
"consensus_state": rpc.NewRPCFunc(ConsensusState, ""),
"consensus_params": rpc.NewRPCFunc(ConsensusParams, "height"),
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, "limit"),
"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxs, ""),
// tx broadcast API
"broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommit, "tx"),
"broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSync, "tx"),
"broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsync, "tx"),
//...
}接收到交易会通过
CheckTx
进入交易池,在mempool.clist_mempool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
return mem.CheckTxWithInfo(tx, cb, TxInfo{SenderID: UnknownPeerID})
}
func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
mem.proxyMtx.Lock()
// use defer to unlock mutex because application (*local client*) might panic
defer mem.proxyMtx.Unlock()
var (
memSize = mem.Size()
txsBytes = mem.TxsBytes()
txSize = len(tx)
)
//...省略多行代码
//CheckTxAsync最终调用的是abci.CheckTx
reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
//设置回调来决定要不要添加到交易池中
reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb))
return nil
}
//设置的回调,即当abci.CheckTx返回时,会回调这个方法,重要的是调用resCbFirstTime
func (mem *CListMempool) reqResCb(
tx []byte,peerID uint16,peerP2PID p2p.ID, externalCb func(*abci.Response),
) func(res *abci.Response) {
return func(res *abci.Response) {
//...
mem.resCbFirstTime(tx, peerID, peerP2PID, res)
//..
}
}
//resCbFirstTime会决定交易会不会进入到交易池中
func (mem *CListMempool) resCbFirstTime(
tx []byte, peerID uint16, peerP2PID p2p.ID, res *abci.Response) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
//abci.CheckTx方法返回错误就不会添加到交易池中
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
memTx := &mempoolTx{
height: mem.height,
gasWanted: r.CheckTx.GasWanted,
tx: tx,
}
memTx.senders.Store(peerID, true)
mem.addTx(memTx)
}
//...
default:
// ignore other messages
}
}state/execution.go
中的CreateProposal
方法会将交易池中的交易取出来进行打包组成区块state/execution.go
中区块组成后,执行交易在这里execBlockOnProxyApp
方法里。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63func execBlockOnProxyApp(logger log.Logger, proxyAppConn proxy.AppConnConsensus, block *types.Block, stateDB dbm.DB) (*ABCIResponses, error) {
var validTxs, invalidTxs = 0, 0
txIndex := 0
abciResponses := NewABCIResponses(block)
//设置回调,当abci.DeliverTx执行完后,统计交易成功和失败
// Execute transactions and get hash.
proxyCb := func(req *abci.Request, res *abci.Response) {
if r, ok := res.Value.(*abci.Response_DeliverTx); ok {
// TODO: make use of res.Log
// TODO: make use of this info
// Blocks may include invalid txs.
txRes := r.DeliverTx
if txRes.Code == abci.CodeTypeOK {
validTxs++
} else {
logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
invalidTxs++
}
abciResponses.DeliverTx[txIndex] = txRes
txIndex++
}
}
proxyAppConn.SetResponseCallback(proxyCb)
commitInfo, byzVals := getBeginBlockValidatorInfo(block, stateDB)
//abci.BeginBlock在这里会被调用
// Begin block
var err error
abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
Hash: block.Hash(),
Header: types.TM2PB.Header(&block.Header),
LastCommitInfo: commitInfo,
ByzantineValidators: byzVals,
})
if err != nil {
logger.Error("Error in proxyAppConn.BeginBlock", "err", err)
return nil, err
}
//abci.DeliverTx在这里会被调用,但注意的是,就算DeliverTx结果返回错误,这里也不会影响交易入块,记录在链上的交易会包含失败的结果
// Run txs of block.
for _, tx := range block.Txs {
proxyAppConn.DeliverTxAsync(abci.RequestDeliverTx{Tx: tx})
if err := proxyAppConn.Error(); err != nil {
return nil, err
}
}
//abci.EndBlock在这里会被调用
// End block.
abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{Height: block.Height})
if err != nil {
logger.Error("Error in proxyAppConn.EndBlock", "err", err)
return nil, err
}
logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs)
return abciResponses, nil
}以上代码
proxyAppConn
的实现之一为abci/client/local_client.go
。local_client
中主要是使用以下这个接口1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16type Application interface {
// Info/Query Connection
Info(RequestInfo) ResponseInfo // Return application info
SetOption(RequestSetOption) ResponseSetOption // Set application option
Query(RequestQuery) ResponseQuery // Query for state
// Mempool Connection
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
// Consensus Connection
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain w validators/other info from TendermintCore
BeginBlock(RequestBeginBlock) ResponseBeginBlock // Signals the beginning of a block
DeliverTx(RequestDeliverTx) ResponseDeliverTx // Deliver a tx for full processing
EndBlock(RequestEndBlock) ResponseEndBlock // Signals the end of a block, returns changes to the validator set
Commit() ResponseCommit // Commit the state and return the application Merkle root hash
}这个是abci的接口了。
cosmos-sdk
实现的就实现这个接口
cosmos-sdk
在baseapp/abci.go
里是对Application
的实现。
然后交易的处理都在baseapp/baseapp.go
里。
1 | func (app *BaseApp) runTx(mode runTxMode, txBytes []byte, tx sdk.Tx) (result sdk.Result) { |
插件化里,上面牵涉到3个会调插件的地方
1 | type Msg interface { |
在x/xx/types/msg.go下会实现这个接口,在CheckTx或者DeliverTx的时候都会调用ValidateBasic
方法
其次是,anteHandler,如果外面初始化时赋值了,CheckTx或者DeliverTx时就会执行
最后是,runMsgs,runMsg里其实是根据Tx消息的路由,找到对应的Handler进行处理,在x/xx/Handler.go会有具体处理,路由是在实例化app时进行app.AddRouter进行添加进去的。如下gaia代码
1 | // RegisterRoutes registers all module routes and module querier routes |
可以看到最后一句,查询路由也是在这里添加的。所以在查询,也就是调用Query
的时候,就会走对应的路由。
另起一行
这里由更新验证者为契机,查看相关的代码,捋下结构。
更新设置验证者的方式
- genesis里进行配置
- 发送CreateValidator交易添加验证者,
staking
模块会处理这个交易,并且在abci.EndBlock
调用时,staking
模块返回需要更新的验证者集合。
genesis里配置验证者的传递
cosmos
启动时调用tendermint
的node代码,传入相关配置。
1 | //cosmos/server/start.go |
在tendermint
中,state
存储在db中的key为stateKey
,保存了链的当前相关状态。
在cosmos
的staking
中也应该存储了验证者集合,一会想得起来我就去看看怎么查询。
staking
staking
是cosmos实现用来选择验证者集合。在staking
的handler.go
中可以查看相关操作。
其中通过发送CreateValidator
交易,新建验证者,并设置相关参数。
1 | //handleMsgCreateValidator |
另外,更新验证者是abci.EndBlock
。
1 | // Called every block, update validator set |