精通 Filecoin:Lotus真实数据处理之Provider处理存储
接上篇,当 Client 接收到用户的存储交易,创建一个 /fil/storage/mk/1.0.1 协议的流,然后通过流发送存储交易。处理这个协议的正是 HandleDealStream 方法。这个方法直接调用自身的 receiveDeal 方法进行处理。receiveDeal 方法处理如下:- 从流中读取存储提案 - Proposal对象。- proposal, err := s.ReadDealProposal() - 这里的流对象是 - dealStream对象(storagemarket/network/deal_stream.go),这个对象对原始流对象进行了封装。
- 获取 ipld node 对象。 - proposalNd, err := cborutil.AsIpld(proposal.DealProposal) 
- 生成矿工交易对象。 - deal := &storagemarket.MinerDeal{ Client: s.RemotePeer(), Miner: p.net.ID(), ClientDealProposal: *proposal.DealProposal, ProposalCid: proposalNd.Cid(), State: storagemarket.StorageDealUnknown, Ref: proposal.Piece, }
- 调用 fsm 状态组的 - Begin的方法,生成一个状态机,并开始跟踪矿工交易对象。- err = p.deals.Begin(proposalNd.Cid(), deal) 
- 保存流对象到连接管理器中。 - err = p.conns.AddStream(proposalNd.Cid(), s) 
- 发送事件到 fsm 状态组,从而开始对交易对象进行处理。 - return p.deals.Send(proposalNd.Cid(), storagemarket.ProviderEventOpen) - 当处理机收到 - ProviderEventOpen状态事件时,因为初始状态为默认值 0,即- StorageDealUnknown,事件处理器对象经过内部处理找到对应的目的状态为- StorageDealValidating,从而调用其处理函数- ValidateDealProposal函数进行处理。
1、`ValidateDealProposal` 函数
这个函数用来验证交易提案对象。
- 调用 Lotus Provider 适配器对象的 - GetChainHead方法,获取区块链顶部 tipset key 和其高度。- tok, height, err := environment.Node().GetChainHead(ctx.Context())if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting most recent state id: %w", err)) }
- 验证客户发送的交易提案对象。如果验证不通过,则发送拒绝事件。 - if err := providerutils.VerifyProposal(ctx.Context(), deal.ClientDealProposal, tok, environment.Node().VerifySignature); err != nil { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("verifying StorageDealProposal: %w", err)) }
- 检查交易提案中指定的矿工地址是否正确。如果不正确,则发送拒绝事件。 - proposal := deal.Proposalif proposal.Provider != environment.Address() { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("incorrect provider for deal")) }
- 检查交易指定的高度是否正确。如果不正确,则发送拒绝事件。 - if height > proposal.StartEpoch-environment.DealAcceptanceBuffer() { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("deal start epoch is too soon or deal already expired")) }
- 检查费用是否OK,如果不OK,则发送拒绝事件。 - minPrice := big.Div(big.Mul(environment.Ask().Price, abi.NewTokenAmount(int64(proposal.PieceSize))), abi.NewTokenAmount(1<<30)) if proposal.StoragePricePerEpoch.LessThan(minPrice) { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("storage price per epoch less than asking price: %s < %s", proposal.StoragePricePerEpoch, minPrice)) }
- 检查交易的大小是否匹配。如果不匹配,则发送拒绝事件。 - if proposal.PieceSize < environment.Ask().MinPieceSize { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("piece size less than minimum required size: %d < %d", proposal.PieceSize, environment.Ask().MinPieceSize)) }if proposal.PieceSize > environment.Ask().MaxPieceSize { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("piece size more than maximum allowed size: %d > %d", proposal.PieceSize, environment.Ask().MaxPieceSize)) }
- 获取客户的资金。 - clientMarketBalance, err := environment.Node().GetBalance(ctx.Context(), proposal.Client, tok) if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting client market balance failed: %w", err)) }
- 如果客户可用资金小于总的交易费用,则发送拒绝事件。 - if clientMarketBalance.Available.LessThan(proposal.TotalStorageFee()) { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.New("clientMarketBalance.Available too small")) }
- 如果交易是验证过的,则进行验证。 
- fsm 上下文对象的 - Trigger方法,发送事件。- return ctx.Trigger(storagemarket.ProviderEventDealDeciding) - 当状态机收到这个事件后,经过事件处理器把状态从 - StorageDealUnknown修改为- StorageDealAcceptWait,从而调用其处理函数- DecideOnProposal确定是否接收交易。
2、`DecideOnProposal` 函数
这个函数用来决定接受或拒绝交易。
- 调用环境对象的 - RunCustomDecisionLogic方法,运行自定义逻辑来验证是不接收客户交易。- accept, reason, err := environment.RunCustomDecisionLogic(ctx.Context(), deal)if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("custom deal decision logic failed: %w", err)) }
- 如果不接收,则发送拒绝事件。 - if !accept { return ctx.Trigger(storagemarket.ProviderEventDealRejected, fmt.Errorf(reason)) }
- 调用环境对象的 - SendSignedResponse方法,发送签名的响应给客户端。- err = environment.SendSignedResponse(ctx.Context(), &network.Response{ State: storagemarket.StorageDealWaitingForData, Proposal: deal.ProposalCid, })if err != nil { return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) }- 这个方法找到对应的流,然后对响应进行签名,生成签名的响应对象,最后通过流发送响应。 
- 断开与客户端的连接。 - if err := environment.Disconnect(deal.ProposalCid); err != nil { log.Warnf("closing client connection: %+v", err) }
- 调用 fsm 上下文对象的 - Trigger方法,发送一个事件。- return ctx.Trigger(storagemarket.ProviderEventDataRequested) - 当状态机收到这个事件后,经过事件处理器把状态从 - StorageDealAcceptWait修改为- StorageDealWaitingForData,因为没有指定的处理函数,从而不会调用函数进行处理,一直等待数据传输过程发送事件。- 当数据开始传输时,数据传输组件发送 - ProviderEventDataTransferInitiated事件,经过事件处理器把状态从- StorageDealWaitingForData修改为- StorageDealTransferring,因为没有指定的处理函数,从而不会调用函数进行处理,一直等待数据传输过程发送事件。- 当数据传输完成时,数据传输组件发送 - ProviderEventDataTransferCompleted事件,经过事件处理器把状态从- StorageDealTransferring修改为- StorageDealVerifyData,从而调用其处理函数- VerifyData验证数据。
3、`VerifyData` 函数
这个函数验证接受到的数据与交易提案中的 pieceCID 相匹配。
VerifyData 函数流程如下:
- 调用环境对象的 - GeneratePieceCommitmentToFile方法,生成碎片的 CID 、碎片所在目录和元数据目录。- pieceCid, piecePath, metadataPath, err := environment.GeneratePieceCommitmentToFile(deal.Ref.Root, shared.AllSelector()) - GeneratePieceCommitmentToFile方法内容如下:
- 调用文件存储对象的 - CreateTemp方法,创建一个临时文件。- f, err := pio.store.CreateTemp() 
- 生成一个清理函数。 - cleanup := func() { f.Close() _ = pio.store.Delete(f.Path()) }
- 从底层存储对象中获取指定 CID 的内容,然后写入指定文件。 - err = pio.carIO.WriteCar(context.Background(), pio.bs, payloadCid, selector, f, userOnNewCarBlocks...) 
- 获取文件大小,即碎片大小。 - pieceSize := uint64(f.Size()) 
- 定位到文件开头位置。 - _, err = f.Seek(0, io.SeekStart) 
- 使用文件内容生成碎片 ID。 - commitment, paddedSize, err := GeneratePieceCommitment(rt, f, pieceSize) 
- 关闭文件。 - _ = f.Close() 
- 返回碎片 CID 和文件路径。 - return commitment, f.Path(), paddedSize, nil 
- 如果矿工设置了 - universalRetrievalEnabled标志,则直接调用- GeneratePieceCommitmentWithMetadata函数进行处理。- if p.p.universalRetrievalEnabled { return providerutils.GeneratePieceCommitmentWithMetadata(p.p.fs, p.p.pio.GeneratePieceCommitmentToFile, p.p.proofType, payloadCid, selector) }- universalRetrievalEnabled标志如果为真,则存储矿工会跟踪碎片中的所有 CID,因此对于所有 CID 都可以被检索,而不仅是 Root CID。
- 否则,调用 piece IO 对象的 - GeneratePieceCommitmentToFile方法进行处理。- pieceCid, piecePath, _, err := p.p.pio.GeneratePieceCommitmentToFile(p.p.proofType, payloadCid, selector) - payloadCid表示根 Root CID。- piece IO 对象的 - GeneratePieceCommitmentToFile方法处理如下:
- 返回碎片 CID 和碎片路径。 - return pieceCid, piecePath, filestore.Path(""), err
- 验证生成的碎片 CID 和矿工交易中交易提案的碎片 CID是否一致。如果不一致,则发送拒绝事件。 - if pieceCid != deal.Proposal.PieceCID { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("proposal CommP doesn't match calculated CommP")) }
3. 调用 fsm 上下文对象的 Trigger 方法,发送一个事件。
return ctx.Trigger(storagemarket.ProviderEventVerifiedData, piecePath, metadataPath)当状态机收到这个事件后,经过事件处理器把状态从 `StorageDealVerifyData` 修改为 `StorageDealEnsureProviderFunds`,从而调用其处理函数 `EnsureProviderFunds` 确定是否接收交易。同时,在调用处理函数之前,通过 `Action` 函数,修改矿工交易对象的 `PiecePath` 和 `MetadataPath` 两个属性。
4、`EnsureProviderFunds` 函数
这个函数用来确定矿工有足够的资金来处理当前交易。
- 获取 Lotus Provider 适配器。 - node := environment.Node() 
- 获取区块链顶部 tipset 对应的 key 和高度。 - tok, _, err := node.GetChainHead(ctx.Context())if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("acquiring chain head: %w", err)) }
- 获取矿工的 worker 地址。 - waddr, err := node.GetMinerWorkerAddress(ctx.Context(), deal.Proposal.Provider, tok)if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("looking up miner worker: %w", err)) }
- 调用 Lotus Provider 适配器的 - EnsureFunds方法,确保矿工有足够的资金来处理当前交易。- mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Provider, waddr, deal.Proposal.ProviderCollateral, tok)if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("ensuring funds: %w", err)) }
- 如果返回的 - mcid是空的,那么意味着已经实时确认,则调用 fsm 上下文对象的- Trigger方法,发送一个事件。- if mcid == cid.Undef { return ctx.Trigger(storagemarket.ProviderEventFunded) }
- 否则,调用 fsm 上下文对象的 - Trigger方法,发送另一个事件。- return ctx.Trigger(storagemarket.ProviderEventFundingInitiated, mcid) - 当状态机收到这个事件后,经过事件处理器把状态从 - StorageDealEnsureProviderFunds修改为- StorageDealProviderFunding,从而调用其处理函数- WaitForFunding等待产一步的消息上链。同时,在调用处理函数之前,通过- Action函数,修改矿工交易对象的- PublishCid属性。
5、`WaitForFunding` 函数
这个函数用来等待消息上链。消息上链之后,调用 fsm 上下文对象的 Trigger 方法,发送一个事件。
函数内容如下:
node := environment.Node()return node.WaitForMessage(ctx.Context(), *deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error {
    if err != nil {
        return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds errored: %w", err))
    }
    if code != exitcode.Ok {
        return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %s", code.String()))
    }
    return ctx.Trigger(storagemarket.ProviderEventFunded)
})当状态机收到 ProviderEventFunded 这个事件后,经过事件处理器把状态从 StorageDealProviderFunding 修改为 StorageDealPublish,从而调用其处理函数 PublishDeal 把交易信息上链。同时,在调用处理函数之前,通过 Action 函数,修改矿工交易对象的 PublishCid 属性。
6、`PublishDeal` 函数
这个函数主要用来提交交易信息上链。
- 生成矿工交易对象。 - smDeal := storagemarket.MinerDeal{ Client: deal.Client, ClientDealProposal: deal.ClientDealProposal, ProposalCid: deal.ProposalCid, State: deal.State, Ref: deal.Ref, }
- 调用 Lotus Provider 适配器对象的 - PublishDeals把交易信息上链。- mcid, err := environment.Node().PublishDeals(ctx.Context(), smDeal) if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("publishing deal: %w", err)) }
- 调用 fsm 上下文对象的 - Trigger方法,发送事件。- return ctx.Trigger(storagemarket.ProviderEventDealPublishInitiated, mcid) - 当状态机收到这个事件后,经过事件处理器把状态从 - StorageDealPublish修改为- StorageDealPublishing,从而调用其处理函数- WaitForPublish等待交易信息上链。
7、`WaitForPublish` 函数
这个函数用来等待交易信息上链,然后给客户端发送响应,然后断开与客户端的连接。最后调用 fsm 上下文对象的 Trigger 方法,通过事件处理生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ProviderEventDealPublished。
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealPublishing 修改为 StorageDealStaged,从而调用其处理函数 HandoffDeal 开始扇区密封处理。同时,在调用处理函数之前,通过 Action 函数,修改矿工交易对象的 ConnectionClosed 和 DealID 属性。
return environment.Node().WaitForMessage(ctx.Context(), *deal.PublishCid, func(code exitcode.ExitCode, retBytes []byte, err error) error {
    if err != nil {
        return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals errored: %w", err))
    }
    if code != exitcode.Ok {
        return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %s", code.String()))
    }
    var retval market.PublishStorageDealsReturn
    err = retval.UnmarshalCBOR(bytes.NewReader(retBytes))
    if err != nil {
        return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err))
    }    return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0])
})8、`HandoffDeal` 函数
这个函数调用 miner 的 Provide 适配器的
- 使用碎片路径生成文件对象。 - file, err := environment.FileStore().Open(deal.PiecePath)if err != nil { return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err)) }
- 使用碎片文件流生成碎片流。 - paddedReader, paddedSize := padreader.New(file, uint64(file.Size())) 
- 调用 Lotus Provider 适配器对象的 - OnDealComplete方法,通知交易已经完成,从而把碎片加入某个扇区中。- err = environment.Node().OnDealComplete( ctx.Context(), storagemarket.MinerDeal{ Client: deal.Client, ClientDealProposal: deal.ClientDealProposal, ProposalCid: deal.ProposalCid, State: deal.State, Ref: deal.Ref, DealID: deal.DealID, FastRetrieval: deal.FastRetrieval, PiecePath: filestore.Path(environment.FileStore().Filename(deal.PiecePath)), }, paddedSize, paddedReader, )if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) }
- 调用 fsm 上下文对象的 - Trigger方法,发送事件。- return ctx.Trigger(storagemarket.ProviderEventDealHandedOff) - 当状态机收到这个事件后,经过事件处理器把状态从 - StorageDealStaged修改为- StorageDealSealing,从而调用其处理函数- VerifyDealActivated等待扇区密封结果。
9、`VerifyDealActivated` 函数
- 生成回调函数。 - cb := func(err error) { if err != nil { _ = ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err) } else { _ = ctx.Trigger(storagemarket.ProviderEventDealActivated) } }- 当 Lotus Provider 适配器对象检查到交易对象变化时会调用这个回调函数,从而发送相应的事件。 - 当状态机收到这个事件后,经过事件处理器把状态从 - StorageDealSealing修改为- StorageDealActive,从而调用其处理函数- RecordPieceInfo记录相关信息。
- 调用 Lotus Provider 适配器对象的 - OnDealSectorCommitted方法,等待扇区被提交。- err := environment.Node().OnDealSectorCommitted(ctx.Context(), deal.Proposal.Provider, deal.DealID, cb)if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err) }
- 返回空。 - return nil 
9、`RecordPieceInfo` 函数
这个函数主要记录相关信息。
最后调用 fsm 上下文对象的 Trigger 方法,通过事件处理生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ProviderEventDealCompleted。
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealActive 修改为 StorageDealCompleted,最终结束状态机处理。
这里会删除碎片的临时文件。
本文链接:https://www.8btc.com/article/632253
转载请注明文章出处
声明:此文出于传递更多信息之目的,并不意味着赞同其观点或证实其描述。本网站所提供的信息,只供参考之用。
- 相关阅读
- 
                                        
                                        选择去中心化存储与Filecoin几大理由(上)2022-05-18 filecoin
- 
                                        
                                        【图说100问·Filecoin】第62问:什么是网络基线?2021-04-04 filecoin
- 
                                        
                                        Filecoin 中的信誉系统2021-02-19 filecoin
- 
                                        
                                        Filecoin上演大逃亡?投资者:冲高之后就开空2020-10-15 filecoin
- 
                                        
                                        千年鸽王 Filecoin 终于要上线!它其实是个国产项目2020-10-15 filecoin
 
                                 
                     
         
         
         币种收益
 币种收益
                               矿机收益
 矿机收益
                               显卡收益
 显卡收益
                               收益计算
 收益计算
                               矿池信息
 矿池信息
                               FIL排行榜
 FIL排行榜
                               
        




 矿机产品
矿机产品 算力挖矿
算力挖矿 
         
                             
                             
                             
                             
                            




 
                             
                             
                             
                             
                             
 