// ApplyTransaction attempts to apply a transaction to the given state database // and uses the input parameters for its environment. It returns the receipt // for the transaction, gas used and an error if the transaction failed, // indicating the block was invalid. //应用交易到state database中 func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config) (*types.Receipt, uint64, error) { //构造交易消息 msg, err := tx.AsMessage(types.MakeSigner(config, header.Number)) if err != nil { return nil, 0, err }
//EVM进行交易处理,返回为处理结果、消耗gas、是否失败、err,logs是在evm处理时产生的 // Create a new context to be used in the EVM environment context := NewEVMContext(msg, header, bc, author) // Create a new environment which holds all relevant information // about the transaction and calling mechanisms. vmenv := vm.NewEVM(context, statedb, config, cfg) // Apply the transaction to the current state (included in the env) _, gas, failed, err := ApplyMessage(vmenv, msg, gp) if err != nil { return nil, 0, err }
//根据处理结果构建返回数据 // Update the state with pending changes var root []byte if config.IsByzantium(header.Number) { statedb.Finalise(true) } else { root = statedb.IntermediateRoot(config.IsEIP158(header.Number)).Bytes() } *usedGas += gas
// Create a new receipt for the transaction, storing the intermediate root and gas used by the tx // based on the eip phase, we're passing whether the root touch-delete accounts. receipt := types.NewReceipt(root, failed, *usedGas) receipt.TxHash = tx.Hash() receipt.GasUsed = gas // if the transaction created a contract, store the creation address in the receipt. if msg.To() == nil { receipt.ContractAddress = crypto.CreateAddress(vmenv.Context.Origin, tx.Nonce()) } // Set the receipt logs and create a bloom for filtering receipt.Logs = statedb.GetLogs(tx.Hash()) receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
// Fail if we're trying to execute above the call depth limit if evm.depth > int(params.CallCreateDepth) { return nil, gas, ErrDepth } // Fail if we're trying to transfer more than the available balance if !evm.Context.CanTransfer(evm.StateDB, caller.Address(), value) { return nil, gas, ErrInsufficientBalance }
var ( to = AccountRef(addr) snapshot = evm.StateDB.Snapshot() ) if !evm.StateDB.Exist(addr) { precompiles := PrecompiledContractsHomestead if evm.ChainConfig().IsByzantium(evm.BlockNumber) { precompiles = PrecompiledContractsByzantium } if precompiles[addr] == nil && evm.ChainConfig().IsEIP158(evm.BlockNumber) && value.Sign() == 0 { // Calling a non existing account, don't do anything, but ping the tracer if evm.vmConfig.Debug && evm.depth == 0 { evm.vmConfig.Tracer.CaptureStart(caller.Address(), addr, false, input, gas, value) evm.vmConfig.Tracer.CaptureEnd(ret, 0, 0, nil) } return nil, gas, nil } evm.StateDB.CreateAccount(addr) } evm.Transfer(evm.StateDB, caller.Address(), to.Address(), value) // Initialise a new contract and set the code that is to be used by the EVM. // The contract is a scoped environment for this execution context only. contract := NewContract(caller, to, value, gas) contract.SetCallCode(&addr, evm.StateDB.GetCodeHash(addr), evm.StateDB.GetCode(addr))
// Even if the account has no code, we need to continue because it might be a precompile start := time.Now()
// Capture the tracer start/end events in debug mode if evm.vmConfig.Debug && evm.depth == 0 { evm.vmConfig.Tracer.CaptureStart(caller.Address(), addr, false, input, gas, value)
// When an error was returned by the EVM or when setting the creation code // above we revert to the snapshot and consume any gas remaining. Additionally // when we're in homestead this also counts for code storage gas errors. if err != nil { evm.StateDB.RevertToSnapshot(snapshot) if err != errExecutionReverted { contract.UseGas(contract.Gas) } } return ret, contract.Gas, err }
func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { // Broadcast transaction to a batch of peers not knowing about it peers := pm.peers.PeersWithoutTx(hash) //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.SendTransactions(types.Transactions{tx}) } log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers)) }
// Mined broadcast loop func (self *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe for obj := range self.minedBlockSub.Chan() { switch ev := obj.Data.(type) { case core.NewMinedBlockEvent: self.BroadcastBlock(ev.Block, true) // First propagate block to peers self.BroadcastBlock(ev.Block, false) // Only then announce to the rest } } }
// If propagation is requested, send to a subset of the peer if propagate { // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) var td *big.Int if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil { td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1)) } else { log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) return } // Send the block to a subset of our peers transfer := peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range transfer { peer.SendNewBlock(block, td) } log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) return } // Otherwise if the block is indeed in out own chain, announce it if pm.blockchain.HasBlock(hash, block.NumberU64()) { for _, peer := range peers { peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()}) } log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) } }
func (pm *ProtocolManager) syncer() { // Start and ensure cleanup of sync mechanisms pm.fetcher.Start() defer pm.fetcher.Stop() defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations forceSync := time.NewTicker(forceSyncCycle) defer forceSync.Stop()
for { select { case <-pm.newPeerCh: // Make sure we have peers to select from, then sync if pm.peers.Len() < minDesiredPeerCount { break } go pm.synchronise(pm.peers.BestPeer())
case <-forceSync.C: // Force a sync even if not enough peers are present go pm.synchronise(pm.peers.BestPeer())
// synchronise tries to sync up our local block chain with a remote peer. func (pm *ProtocolManager) synchronise(peer *peer) { // Short circuit if no peers are available if peer == nil { return } // Make sure the peer's TD is higher than our own currentBlock := pm.blockchain.CurrentBlock() td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
pHead, pTd := peer.Head() if pTd.Cmp(td) <= 0 { return } // Otherwise try to sync with the downloader mode := downloader.FullSync if atomic.LoadUint32(&pm.fastSync) == 1 { // Fast sync was explicitly requested, and explicitly granted mode = downloader.FastSync } else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 { // The database seems empty as the current block is the genesis. Yet the fast // block is ahead, so fast sync was enabled for this node at a certain point. // The only scenario where this can happen is if the user manually (or via a // bad block) rolled back a fast sync node below the sync point. In this case // however it's safe to reenable fast sync. atomic.StoreUint32(&pm.fastSync, 1) mode = downloader.FastSync } // Run the sync cycle, and disable fast sync if we've went past the pivot block if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { return } if atomic.LoadUint32(&pm.fastSync) == 1 { log.Info("Fast sync complete, auto disabling") atomic.StoreUint32(&pm.fastSync, 0) } atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 { // We've completed a sync cycle, notify all peers of new state. This path is // essential in star-topology networks where a gateway node needs to notify // all its out-of-date peers of the availability of a new block. This failure // scenario will most often crop up in private and hackathon networks with // degenerate connectivity, but it should be healthy for the mainnet too to // more reliably update peers or the local TD state. go pm.BroadcastBlock(head, false) } }
func (pm *ProtocolManager) txsyncLoop() { var ( pending = make(map[discover.NodeID]*txsync) sending = false // whether a send is active pack = new(txsync) // the pack that is being sent done = make(chan error, 1) // result of the send )
// send starts a sending a pack of transactions from the sync. send := func(s *txsync) { // Fill pack with transactions up to the target size. size := common.StorageSize(0) pack.p = s.p pack.txs = pack.txs[:0] for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { pack.txs = append(pack.txs, s.txs[i]) size += s.txs[i].Size() } // Remove the transactions that will be sent. s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])] if len(s.txs) == 0 { delete(pending, s.p.ID()) } // Send the pack in the background. s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size) sending = true go func() { done <- pack.p.SendTransactions(pack.txs) }() }
// pick chooses the next pending sync. pick := func() *txsync { if len(pending) == 0 { return nil } n := rand.Intn(len(pending)) + 1 for _, s := range pending { if n--; n == 0 { return s } } return nil }
for { select { case s := <-pm.txsyncCh: pending[s.p.ID()] = s if !sending { send(s) } case err := <-done: sending = false // Stop tracking peers that cause send failures. if err != nil { pack.p.Log().Debug("Transaction send failed", "err", err) delete(pending, pack.p.ID()) } // Schedule the next send. if s := pick(); s != nil { send(s) } case <-pm.quitSync: return } } }
case msg.Code == NewBlockHashesMsg: var announces newBlockHashesData if err := msg.Decode(&announces); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } // Mark the hashes as present at the remote node for _, block := range announces { p.MarkBlock(block.Hash) } // Schedule all the unknown hashes for retrieval unknown := make(newBlockHashesData, 0, len(announces)) for _, block := range announces { if !pm.blockchain.HasBlock(block.Hash, block.Number) { unknown = append(unknown, block) } } for _, block := range unknown { pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies) }
case <-fetchTimer.C: // At least one block's timer ran out, check for needing retrieval request := make(map[string][]common.Hash)
for hash, announces := range f.announced { if time.Since(announces[0].time) > arriveTimeout-gatherSlack { // Pick a random peer to retrieve from, reset all others announce := announces[rand.Intn(len(announces))] f.forgetHash(hash)
// If the block still didn't arrive, queue for fetching if f.getBlock(hash) == nil { request[announce.origin] = append(request[announce.origin], hash) f.fetching[hash] = announce } } } // Send out all block header requests for peer, hashes := range request { log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
// Create a closure of the fetch and schedule in on a new thread fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes go func() { if f.fetchingHook != nil { f.fetchingHook(hashes) } for _, hash := range hashes { headerFetchMeter.Mark(1) fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals } }() } // Schedule the next fetch if blocks are still pending f.rescheduleFetch(fetchTimer)
case msg.Code == BlockHeadersMsg: // A batch of headers arrived to one of our previous requests var headers []*types.Header if err := msg.Decode(&headers); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } // If no headers were received, but we're expending a DAO fork check, maybe it's that if len(headers) == 0 && p.forkDrop != nil { // Possibly an empty reply to the fork header checks, sanity check TDs verifyDAO := true
// If we already have a DAO header, we can check the peer's TD against it. If // the peer's ahead of this, it too must have a reply to the DAO check if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil { if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 { verifyDAO = false } } // If we're seemingly on the same chain, disable the drop timer if verifyDAO { p.Log().Debug("Seems to be on the same side of the DAO fork") p.forkDrop.Stop() p.forkDrop = nil return nil } } // Filter out any explicitly requested headers, deliver the rest to the downloader filter := len(headers) == 1 if filter { // If it's a potential DAO fork check, validate against the rules if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 { // Disable the fork drop timer p.forkDrop.Stop() p.forkDrop = nil
// Validate the header and either drop the peer or continue if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil { p.Log().Debug("Verified to be on the other side of the DAO fork, dropping") return err } p.Log().Debug("Verified to be on the same side of the DAO fork") return nil } // Irrelevant of the fork checks, send the header to the fetcher just in case headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) } if len(headers) > 0 || !filter { err := pm.downloader.DeliverHeaders(p.id, headers) if err != nil { log.Debug("Failed to deliver headers", "err", err) } }
// Send the filter channel to the fetcher filter := make(chan *headerFilterTask)
select { case f.headerFilter <- filter: case <-f.quit: return nil } // Request the filtering of the header list select { case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}: case <-f.quit: return nil } // Retrieve the headers remaining after filtering select { case task := <-filter: return task.headers case <-f.quit: return nil } }
case filter := <-f.headerFilter: // Headers arrived from a remote peer. Extract those that were explicitly // requested by the fetcher, and return everything else so it's delivered // to other parts of the system. var task *headerFilterTask select { case task = <-filter: case <-f.quit: return } headerFilterInMeter.Mark(int64(len(task.headers)))
// Split the batch of headers into unknown ones (to return to the caller), // known incomplete ones (requiring body retrievals) and completed blocks. unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{} for _, header := range task.headers { hash := header.Hash()
// Filter fetcher-requested headers from other synchronisation algorithms if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { // If the delivered header does not match the promised number, drop the announcer if header.Number.Uint64() != announce.number { log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number) f.dropPeer(announce.origin) f.forgetHash(hash) continue } // Only keep if not imported by other means if f.getBlock(hash) == nil { announce.header = header announce.time = task.time
// If the block is empty (header only), short circuit into the final import queue if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) { log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
case <-completeTimer.C: // At least one header's timer ran out, retrieve everything request := make(map[string][]common.Hash)
for hash, announces := range f.fetched { // Pick a random peer to retrieve from, reset all others announce := announces[rand.Intn(len(announces))] f.forgetHash(hash)
// If the block still didn't arrive, queue for completion if f.getBlock(hash) == nil { request[announce.origin] = append(request[announce.origin], hash) f.completing[hash] = announce } } // Send out all block body requests for peer, hashes := range request { log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
// Create a closure of the fetch and schedule in on a new thread if f.completingHook != nil { f.completingHook(hashes) } bodyFetchMeter.Mark(int64(len(hashes))) go f.completing[hashes[0]].fetchBodies(hashes) } // Schedule the next fetch if blocks are still pending f.rescheduleComplete(completeTimer)
// Run the import on a new thread log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) go func() { defer func() { f.done <- hash }()
// If the parent's unknown, abort insertion parent := f.getBlock(block.ParentHash()) if parent == nil { log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) return } // Quickly validate the header and propagate the block if it passes switch err := f.verifyHeader(block.Header()); err { case nil: // All ok, quickly propagate to our peers propBroadcastOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, true)
case consensus.ErrFutureBlock: // Weird future block, don't fail, but neither propagate
default: // Something went very wrong, drop the peer log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) f.dropPeer(peer) return } // Run the actual import and log any issues if _, err := f.insertChain(types.Blocks{block}); err != nil { log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) return } // If import succeeded, broadcast the block propAnnounceOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, false)
// Invoke the testing hook if needed if f.importedHook != nil { f.importedHook(block) } }() }
// Look up the sync boundaries: the common ancestor and the target block latest, err := d.fetchHeight(p) if err != nil { return err } height := latest.Number.Uint64()
// Request the advertised remote head block and wait for the response head, _ := p.peer.Head() go p.peer.RequestHeadersByHash(head, 1, 0, false)
ttl := d.requestTTL() timeout := time.After(ttl) for { select { case <-d.cancelCh: return nil, errCancelBlockFetch
case packet := <-d.headerCh: // Discard anything not from the origin peer if packet.PeerId() != p.id { log.Debug("Received headers from incorrect peer", "peer", packet.PeerId()) break } // Make sure the peer actually gave something valid headers := packet.(*headerPack).headers if len(headers) != 1 { p.log.Debug("Multiple headers for single request", "headers", len(headers)) return nil, errBadPeer } head := headers[0] p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash()) return head, nil
case <-timeout: p.log.Debug("Waiting for head header timed out", "elapsed", ttl) return nil, errTimeout
case <-d.bodyCh: case <-d.receiptCh: // Out of bounds delivery, ignore } } }
func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) { // Figure out the valid ancestor range to prevent rewrite attacks floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()
if d.mode == FullSync { ceil = d.blockchain.CurrentBlock().NumberU64() } else if d.mode == FastSync { ceil = d.blockchain.CurrentFastBlock().NumberU64() } if ceil >= MaxForkAncestry { floor = int64(ceil - MaxForkAncestry) } p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
// Request the topmost blocks to short circuit binary ancestor lookup head := ceil if head > height { head = height } from := int64(head) - int64(MaxHeaderFetch) if from < 0 { from = 0 } // Span out with 15 block gaps into the future to catch bad head reports limit := 2 * MaxHeaderFetch / 16 count := 1 + int((int64(ceil)-from)/16) if count > limit { count = limit } go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
// Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{}
ttl := d.requestTTL() timeout := time.After(ttl)
for finished := false; !finished; { select { case <-d.cancelCh: return 0, errCancelHeaderFetch
case packet := <-d.headerCh: // Discard anything not from the origin peer if packet.PeerId() != p.id { log.Debug("Received headers from incorrect peer", "peer", packet.PeerId()) break } // Make sure the peer actually gave something valid headers := packet.(*headerPack).headers if len(headers) == 0 { p.log.Warn("Empty head header set") return 0, errEmptyHeaderSet } // Make sure the peer's reply conforms to the request for i := 0; i < len(headers); i++ { if number := headers[i].Number.Int64(); number != from+int64(i)*16 { p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number) return 0, errInvalidChain } } // Check if a common ancestor was found finished = true for i := len(headers) - 1; i >= 0; i-- { // Skip any headers that underflow/overflow our requested set if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil { continue } // Otherwise check if we already know the header or not if (d.mode == FullSync && d.blockchain.HasBlock(headers[i].Hash(), headers[i].Number.Uint64())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) { number, hash = headers[i].Number.Uint64(), headers[i].Hash()
// If every header is known, even future ones, the peer straight out lied about its head if number > height && i == limit-1 { p.log.Warn("Lied about chain head", "reported", height, "found", number) return 0, errStallingPeer } break } }
case <-timeout: p.log.Debug("Waiting for head header timed out", "elapsed", ttl) return 0, errTimeout
case <-d.bodyCh: case <-d.receiptCh: // Out of bounds delivery, ignore } } // If the head fetch already found an ancestor, return if !common.EmptyHash(hash) { if int64(number) <= floor { p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor) return 0, errInvalidAncestor } p.log.Debug("Found common ancestor", "number", number, "hash", hash) return number, nil } // Ancestor not found, we need to binary search over our chain start, end := uint64(0), head if floor > 0 { start = uint64(floor) } for start+1 < end { // Split our chain interval in two, and request the hash to cross check check := (start + end) / 2
ttl := d.requestTTL() timeout := time.After(ttl)
go p.peer.RequestHeadersByNumber(check, 1, 0, false)
// Wait until a reply arrives to this request for arrived := false; !arrived; { select { case <-d.cancelCh: return 0, errCancelHeaderFetch
case packer := <-d.headerCh: // Discard anything not from the origin peer if packer.PeerId() != p.id { log.Debug("Received headers from incorrect peer", "peer", packer.PeerId()) break } // Make sure the peer actually gave something valid headers := packer.(*headerPack).headers if len(headers) != 1 { p.log.Debug("Multiple headers for single request", "headers", len(headers)) return 0, errBadPeer } arrived = true
// Modify the search interval based on the response if (d.mode == FullSync && !d.blockchain.HasBlock(headers[0].Hash(), headers[0].Number.Uint64())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) { end = check break } header := d.lightchain.GetHeaderByHash(headers[0].Hash()) // Independent of sync mode, header surely exists if header.Number.Uint64() != check { p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check) return 0, errBadPeer } start = check
case <-timeout: p.log.Debug("Waiting for search header timed out", "elapsed", ttl) return 0, errTimeout
case <-d.bodyCh: case <-d.receiptCh: // Out of bounds delivery, ignore } } } // Ensure valid ancestry and return if int64(start) <= floor { p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor) return 0, errInvalidAncestor } p.log.Debug("Found common ancestor", "number", start, "hash", hash) return start, nil }
// Create a timeout timer, and the associated header fetcher skeleton := true // Skeleton assembly phase or finishing up request := time.Now() // time of the last skeleton fetch request timeout := time.NewTimer(0) // timer to dump a non-responsive active peer <-timeout.C // timeout channel should be initially empty defer timeout.Stop()
if skeleton { p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) } else { p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false) } } // Start pulling the header chain skeleton until all is done getHeaders(from)
for { select { case <-d.cancelCh: return errCancelHeaderFetch
case packet := <-d.headerCh: // Make sure the active peer is giving us the skeleton headers if packet.PeerId() != p.id { log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId()) break } headerReqTimer.UpdateSince(request) timeout.Stop()
// If the skeleton's finished, pull any remaining head headers directly from the origin if packet.Items() == 0 && skeleton { skeleton = false getHeaders(from) continue } // If no more headers are inbound, notify the content fetchers and return if packet.Items() == 0 { // Don't abort header fetches while the pivot is downloading if atomic.LoadInt32(&d.committed) == 0 && pivot <= from { p.log.Debug("No headers, waiting for pivot commit") select { case <-time.After(fsHeaderContCheck): getHeaders(from) continue case <-d.cancelCh: return errCancelHeaderFetch } } // Pivot done (or not in fast sync) and no more headers, terminate the process p.log.Debug("No more headers available") select { case d.headerProcCh <- nil: return nil case <-d.cancelCh: return errCancelHeaderFetch } } headers := packet.(*headerPack).headers
// If we received a skeleton batch, resolve internals concurrently if skeleton { filled, proced, err := d.fillHeaderSkeleton(from, headers) if err != nil { p.log.Debug("Skeleton chain invalid", "err", err) return errInvalidChain } headers = filled[proced:] from += uint64(proced) } // Insert all the new headers and fetch the next batch if len(headers) > 0 { p.log.Trace("Scheduling new headers", "count", len(headers), "from", from) select { case d.headerProcCh <- headers: case <-d.cancelCh: return errCancelHeaderFetch } from += uint64(len(headers)) } getHeaders(from)
case <-timeout.C: if d.dropPeer == nil { // The dropPeer method is nil when `--copydb` is used for a local copy. // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored p.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", p.id) break } // Header retrieval timed out, consider the peer bad and drop p.log.Debug("Header request timed out", "elapsed", ttl) headerTimeoutMeter.Mark(1) d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: } } select { case d.headerProcCh <- nil: case <-d.cancelCh: } return errBadPeer } } }
a) 把 skeleton 的 headers 加入 queue.ScheduleSkeleton 调度队列;
b) 然后执行 d.fetchParts() 方法。d.fetchParts() 方法主要做了这几件事情:
   1. 对收到的 headers 执行 d.queue.DeliverHeaders() 方法;
   2. 如果 d.queue.PendingHeaders 有 pending 的 headers,调用 d.peers.HeaderIdlePeers 获取到 idle 的 peers;
   3. 调用 d.queue.ReserveHeaders 把 pending 的 headers 储备到 idle 的 peers 里面;
   4. 用 idle 的 peers 调用 p.FetchHeaders(req.From, MaxHeaderFetch) 去获取 headers;
c) 最后执行 d.queue.RetrieveHeaders(),获取到 filled 进去的 headers。
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { // Keep a count of uncertain headers to roll back rollback := []*types.Header{} defer func() { if len(rollback) > 0 { // Flatten the headers and roll them back hashes := make([]common.Hash, len(rollback)) for i, header := range rollback { hashes[i] = header.Hash() } lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0 if d.mode != LightSync { lastFastBlock = d.blockchain.CurrentFastBlock().Number() lastBlock = d.blockchain.CurrentBlock().Number() } d.lightchain.Rollback(hashes) curFastBlock, curBlock := common.Big0, common.Big0 if d.mode != LightSync { curFastBlock = d.blockchain.CurrentFastBlock().Number() curBlock = d.blockchain.CurrentBlock().Number() } log.Warn("Rolled back headers", "count", len(hashes), "header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number), "fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock), "block", fmt.Sprintf("%d->%d", lastBlock, curBlock)) } }()
// Wait for batches of headers to process gotHeaders := false
for { select { case <-d.cancelCh: return errCancelHeaderProcessing
case headers := <-d.headerProcCh: // Terminate header processing if we synced up if len(headers) == 0 { // Notify everyone that headers are fully processed for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: } } if d.mode != LightSync { head := d.blockchain.CurrentBlock() if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { return errStallingPeer } } if d.mode == FastSync || d.mode == LightSync { head := d.lightchain.CurrentHeader() if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { return errStallingPeer } } // Disable any rollback and return rollback = nil return nil } // Otherwise split the chunk of headers into batches and process them gotHeaders = true
for len(headers) > 0 { // Terminate if something failed in between processing chunks select { case <-d.cancelCh: return errCancelHeaderProcessing default: } // Select the next chunk of headers to import limit := maxHeadersProcess if limit > len(headers) { limit = len(headers) } chunk := headers[:limit]
// In case of header only syncing, validate the chunk immediately if d.mode == FastSync || d.mode == LightSync { // Collect the yet unknown headers to mark them as uncertain unknown := make([]*types.Header, 0, len(headers)) for _, header := range chunk { if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) { unknown = append(unknown, header) } } // If we're importing pure headers, verify based on their recentness frequency := fsHeaderCheckFrequency if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { frequency = 1 } if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil { // If some headers were inserted, add them too to the rollback list if n > 0 { rollback = append(rollback, chunk[:n]...) } log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err) return errInvalidChain } // All verifications passed, store newly found uncertain headers rollback = append(rollback, unknown...) if len(rollback) > fsHeaderSafetyNet { rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) } } // Unless we're doing light chains, schedule the headers for associated content retrieval if d.mode == FullSync || d.mode == FastSync { // If we've reached the allowed number of pending headers, stall a bit for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { select { case <-d.cancelCh: return errCancelHeaderProcessing case <-time.After(time.Second): } } // Otherwise insert the headers for content retrieval inserts := d.queue.Schedule(chunk, origin) if len(inserts) != len(chunk) { log.Debug("Stale headers") return errBadPeer } } headers = headers[limit:] origin += uint64(limit) } // Signal the content downloaders of the availablility of new tasks for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: } } } } }
func (d *Downloader) processFastSyncContent(latest *types.Header) error { // Start syncing state of the reported head block. This should get us most of // the state of the pivot block. stateSync := d.syncState(latest.Root) defer stateSync.Cancel() go func() { if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { d.queue.Close() // wake up WaitResults } }() // Figure out the ideal pivot block. Note, that this goalpost may move if the // sync takes long enough for the chain head to move significantly. pivot := uint64(0) if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) { pivot = height - uint64(fsMinFullBlocks) } // To cater for moving pivot points, track the pivot block and subsequently // accumulated download results separatey. var ( oldPivot *fetchResult // Locked in pivot block, might change eventually oldTail []*fetchResult // Downloaded content after the pivot ) for { // Wait for the next batch of downloaded data to be available, and if the pivot // block became stale, move the goalpost results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness if len(results) == 0 { // If pivot sync is done, stop if oldPivot == nil { return stateSync.Cancel() } // If sync failed, stop select { case <-d.cancelCh: return stateSync.Cancel() default: } } if d.chainInsertHook != nil { d.chainInsertHook(results) } if oldPivot != nil { results = append(append([]*fetchResult{oldPivot}, oldTail...), results...) 
} // Split around the pivot block and process the two sides via fast/full sync if atomic.LoadInt32(&d.committed) == 0 { latest = results[len(results)-1].Header if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) { log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks)) pivot = height - uint64(fsMinFullBlocks) } } P, beforeP, afterP := splitAroundPivot(pivot, results) if err := d.commitFastSyncData(beforeP, stateSync); err != nil { return err } if P != nil { // If new pivot block found, cancel old state retrieval and restart if oldPivot != P { stateSync.Cancel()
stateSync = d.syncState(P.Header.Root) defer stateSync.Cancel() go func() { if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { d.queue.Close() // wake up WaitResults } }() oldPivot = P } // Wait for completion, occasionally checking for pivot staleness select { case <-stateSync.done: if stateSync.err != nil { return stateSync.err } if err := d.commitPivotBlock(P); err != nil { return err } oldPivot = nil
case <-time.After(time.Second): oldTail = afterP continue } } // Fast sync done, pivot commit done, full import if err := d.importBlockResults(afterP); err != nil { return err } } }