TL;DR

对于系统通道而言,创建通道交易只是普通类型的交易,不是系统通道的更新通道配置交易。

bundle.ConfigtxValidator().Sequence() 返回当前的 sequence 值;该值在 configtx.NewValidatorImpl() 中由 config.Sequence 赋值。

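系统通道初始 sequence 是 0;应用通道初始 sequence 是 1:创建应用通道的交易由系统通道处理,但新通道的 sequence 来自模板配置(NewChannelConfig 构造的 cb.Config,Sequence 取默认值 0)经 ProposeConfigUpdate 加 1,与系统通道当下的 sequence 无关(系统通道 sequence 为 0 时,"系统通道 sequence 加 1" 的说法恰好得到相同结果)。如果是应用通道更新配置的交易,由应用通道处理,新 sequence 是应用通道的旧 sequence 加 1。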

应用通道的 /Channel/Application 的初始 version 是 1。

流程分析

orderer 启动流程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
lf, _, err := createLedgerFactory(conf, metricsProvider)
    func NewProvider(conf *Conf, indexConfig *IndexConfig, metricsProvider metrics.Provider) (*BlockStoreProvider, error) {
// 得到 blockledger.Factory lf,字段 blkstorageProvider 是 BlockStoreProvider(底层是 levelDB)

// 初次启动时 lf.ChannelIDs() 为空(数子目录的个数得到通道的个数),初始化账本;非初次启动仅打印日志
initializeBootstrapChannel(bootstrapBlock, lf)
    gl, err := lf.GetOrCreate(channelID) // 走 create,返回 blockledger.ReadWriter 类型
        func (p *BlockStoreProvider) Open(ledgerid string) (*BlockStore, error) { // 创建 block store
    if err := gl.Append(genesisBlock); err != nil { // 写数据库

// 选择启动集群的区块(选系统通道的最新配置块和创世块中高度较高的)
    index, err := protoutil.GetLastConfigIndexFromBlock(lastBlock) // 创世块 LastConfig.Index 为 0

// 初始化
manager := initializeMultichannelRegistrar(
    func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
        configTx := configTx(rl) // 拿通道最新配置块的第 0 个交易(配置块只有一个交易,Envelope 类型)
        ledgerResources, err := r.newLedgerResources(configTx)
            payload, err := protoutil.UnmarshalPayload(configTx.Payload)   
            configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data) 
            bundle, err := channelconfig.NewBundle(chdr.ChannelId, configEnvelope.Config, r.bccsp)
                configtxManager, err := configtx.NewValidatorImpl(channelID, config, RootGroupKey, policyManager)
                    return &ValidatorImpl{
                        namespace:   namespace,
                        pm:          pm,
                        sequence:    config.Sequence, // !!! 更新 sequence
                        configMap:   configMap,
                        channelID:   channelID,
                        configProto: config,
                    }, nil
                return &Bundle{
                    policyManager:   policyManager,
                    channelConfig:   channelConfig,
                    configtxManager: configtxManager,
                }, nil

                func (b *Bundle) ConfigtxValidator() configtx.Validator {
                    return b.configtxManager
                }
            mutableResources: channelconfig.NewBundleSource(bundle, callbacksCopy...) // *BundleSource 类型,包了 bundle(atomic.Value)

        if _, ok := ledgerResources.ConsortiumsConfig(); ok { // 是系统通道时 ok 才为 true
            chain, err := newChainSupport(
            r.templator = msgprocessor.NewDefaultTemplator(chain, r.bccsp)
			chain.Processor = msgprocessor.NewSystemChannel(

			logger.Infof("Starting system channel '%s' with genesis block hash %x and orderer type %s, sequence %d",
			channelID, protoutil.BlockHeaderHash(genesisBlock.Header), chain.SharedConfig().ConsensusType(), chain.ConfigtxValidator().Sequence()) // sequence 是 0
            r.chains[channelID] = chain
			r.systemChannelID = channelID
			r.systemChannel = chain
            defer chain.start()
        } else {
            chain, err := newChainSupport(
            r.chains[channelID] = chain
			chain.start()
        }

// gRPC
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
if err := grpcServer.Start(); err != nil {
    // AtomicBroadcastServer is the server API for AtomicBroadcast service.
    type AtomicBroadcastServer interface {
        // broadcast receives a reply of Acknowledgement for each common.Envelope in order, indicating success or type of failure
        Broadcast(AtomicBroadcast_BroadcastServer) error
        // deliver first requires an Envelope of type DELIVER_SEEK_INFO with Payload data as a mashaled SeekInfo message, then a stream of block replies is received.
        Deliver(AtomicBroadcast_DeliverServer) error
    }

// ConfigEnvelope is designed to contain *_all_* configuration for a chain with no dependency
// on previous configuration transactions.
//
// It is generated with the following scheme:
//   1. Retrieve the existing configuration
//   2. Note the config properties (ConfigValue, ConfigPolicy, ConfigGroup) to be modified
//   3. Add any intermediate ConfigGroups to the ConfigUpdate.read_set (sparsely)
//   4. Add any additional desired dependencies to ConfigUpdate.read_set (sparsely)
//   5. Modify the config properties, incrementing each version by 1, set them in the ConfigUpdate.write_set
//      Note: any element not modified but specified should already be in the read_set, so may be specified sparsely
//   6. Create ConfigUpdate message and marshal it into ConfigUpdateEnvelope.update and encode the required signatures
//     a) Each signature is of type ConfigSignature
//     b) The ConfigSignature signature is over the concatenation of signature_header and the ConfigUpdate bytes (which includes a ChainHeader)
//   7. Submit new Config for ordering in Envelope signed by submitter
//     a) The Envelope Payload has data set to the marshaled ConfigEnvelope
//     b) The Envelope Payload has a header of type Header.Type.CONFIG_UPDATE
//
// The configuration manager will verify:
//   1. All items in the read_set exist at the read versions
//   2. All items in the write_set at a different version than, or not in, the read_set have been appropriately signed according to their mod_policy
//   3. The new configuration satisfies the ConfigSchema
type ConfigEnvelope struct {
	Config               *Config   `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"`
	LastUpdate           *Envelope `protobuf:"bytes,2,opt,name=last_update,json=lastUpdate,proto3" json:"last_update,omitempty"`
}
    type Config struct {
        Sequence             uint64       `protobuf:"varint,1,opt,name=sequence,proto3" json:"sequence,omitempty"`
        ChannelGroup         *ConfigGroup `protobuf:"bytes,2,opt,name=channel_group,json=channelGroup,proto3" json:"channel_group,omitempty"`
    }

orderer 处理创建应用通道交易

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
    return s.bh.Handle(&broadcastMsgTracer{
		AtomicBroadcast_BroadcastServer: srv,
		msgTracer: msgTracer{
			debug:    s.debug,
			function: "Broadcast",
		},
	})
// Handle runs the broadcast loop for one client stream. Each envelope received
// from srv is passed to ProcessMessage and the resulting response is sent back.
// It returns nil when the client closes the stream (io.EOF), the receive or
// send error when one occurs, and the result of Send after any response whose
// status is not SUCCESS.
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
	addr := util.ExtractRemoteAddress(srv.Context())
	logger.Debugf("Starting new broadcast loop for %s", addr)

	for {
		msg, recvErr := srv.Recv()
		switch {
		case recvErr == io.EOF:
			logger.Debugf("Received EOF from %s, hangup", addr)
			return nil
		case recvErr != nil:
			logger.Warningf("Error reading from %s: %s", addr, recvErr)
			return recvErr
		}

		resp := bh.ProcessMessage(msg, addr)
		sendErr := srv.Send(resp)
		if resp.Status != cb.Status_SUCCESS {
			// A rejected message ends the loop; the send result (possibly nil)
			// is returned without being logged.
			return sendErr
		}
		if sendErr != nil {
			logger.Warningf("Error sending to %s: %s", addr, sendErr)
			return sendErr
		}
	}
}
// ProcessMessage validates a single broadcast envelope and enqueues it for
// ordering, returning the response to send back to the client at addr.
//
// The message processor is looked up through bh.SupportRegistrar. Normal
// messages go through ProcessNormalMsg and Order; config updates go through
// ProcessConfigUpdateMsg and Configure. A lookup failure yields BAD_REQUEST,
// a validation failure yields the status chosen by ClassifyError, and a
// WaitReady/Order/Configure failure yields SERVICE_UNAVAILABLE.
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
	tracker := &MetricsTracker{
		ChannelID: "unknown",
		TxType:    "unknown",
		Metrics:   bh.Metrics,
	}
	defer func() {
		// This looks a little unnecessary, but if done directly as
		// a defer, resp gets the (always nil) current state of resp
		// and not the return value
		tracker.Record(resp)
	}()
	tracker.BeginValidate()
	chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg)
	// chdr may be non-nil even when err is set, so the tracker is filled in before the error check.
	if chdr != nil {
		tracker.ChannelID = chdr.ChannelId
		tracker.TxType = cb.HeaderType(chdr.Type).String()
	}
	if err != nil {
		logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", tracker.ChannelID, addr, err)
		return &ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()}
	}
	if !isConfig {
		logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])
		configSeq, err := processor.ProcessNormalMsg(msg)
		if err != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err)
			return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}
		}
		tracker.EndValidate()
		tracker.BeginEnqueue()
		if err = processor.WaitReady(); err != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
			return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
		}
		err = processor.Order(msg, configSeq)
		if err != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err)
			return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
		}
	} else { // isConfig
		logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)
		// [x] configSeq: when two channels with different names were created, the SystemChannel configSeq was 0 both times
		config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
		if err != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err)
			return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}
		}
		tracker.EndValidate()
		tracker.BeginEnqueue()
		if err = processor.WaitReady(); err != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
			return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
		}
		err = processor.Configure(config, configSeq) // hand the config message over to consensus
		if err != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err)
			return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
		}
	}
	logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)
	return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
}

// ProcessConfigUpdateMsg handles messages of type CONFIG_UPDATE either for the system channel itself
// or, for channel creation.  In the channel creation case, the CONFIG_UPDATE is wrapped into a resulting
// ORDERER_TRANSACTION, and in the standard CONFIG_UPDATE case, a resulting CONFIG message
func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
    if channelID == s.support.ChannelID() {
		// 处理系统通道自身的配置更新;明确指定 StandardChannel 的 ProcessConfigUpdateMsg 方法
		return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate) 

            type SystemChannel struct {
                *StandardChannel
                templator ChannelConfigTemplator
            }
	}

	logger.Debugf("Processing channel create tx for channel %s on system channel %s", channelID, s.support.ChannelID())
	// If the channel ID does not match the system channel, then this must be a channel creation transaction
	bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
    newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
	// 包装两次
    newChannelEnvConfig, err := protoutil.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
    wrappedOrdererTransaction, err := protoutil.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChannelID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
    return wrappedOrdererTransaction, s.support.Sequence(), nil
}

func (dt *DefaultTemplator) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfig.Resources, error) {
	configUpdatePayload, err := protoutil.UnmarshalPayload(envConfigUpdate.Payload)
	configUpdateEnv, err := configtx.UnmarshalConfigUpdateEnvelope(configUpdatePayload.Data)
	configUpdate, err := configtx.UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)
	if uv := configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Version; uv != 1 {
		return nil, fmt.Errorf("Config update for channel creation does not set application group version to 1, was %d", uv)
	}
	bundle, err := channelconfig.NewBundle(channelHeader.ChannelId, &cb.Config{
		ChannelGroup: channelGroup,
	}, dt.bccsp)
}

// 校验逻辑
func (vi *ValidatorImpl) ProposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) {
	return vi.proposeConfigUpdate(configtx)
}
// proposeConfigUpdate converts a config update envelope into a complete
// ConfigEnvelope. The update is authorized against the current configuration,
// the resulting config map is turned back into a channel group, and the new
// Config carries vi.sequence + 1. The original envelope is recorded as
// LastUpdate. An error is returned if the envelope cannot be converted, the
// update is not authorized, or the config map cannot be converted back.
func (vi *ValidatorImpl) proposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) {
	configUpdateEnv, err := protoutil.EnvelopeToConfigUpdate(configtx)
	if err != nil {
		return nil, errors.Errorf("error converting envelope to config update: %s", err)
	}

	// NOTE(review): the read-set check (vi.verifyReadSet) is presumably performed
	// inside authorizeUpdate — confirm against the configtx package. Calling it
	// here would need a readSet that this function does not have, and would
	// overwrite the error returned by authorizeUpdate before it is checked.
	configMap, err := vi.authorizeUpdate(configUpdateEnv)
	if err != nil {
		return nil, errors.Errorf("error authorizing update: %s", err)
	}

	channelGroup, err := configMapToConfig(configMap, vi.namespace)
	if err != nil {
		return nil, errors.Errorf("could not turn configMap back to channelGroup: %s", err)
	}

	return &cb.ConfigEnvelope{
		Config: &cb.Config{
			Sequence:     vi.sequence + 1, // the sequence is incremented here
			ChannelGroup: channelGroup,
		},
		LastUpdate: configtx,
	}, nil
}
// verifyReadSet checks that every element of readSet exists in the current
// config map at exactly the version the read set expects. It returns an error
// for the first element that is missing or whose version differs, and nil
// when all elements match.
func (vi *ValidatorImpl) verifyReadSet(readSet map[string]comparable) error {
	for key, value := range readSet {
		existing, ok := vi.configMap[key]
		if !ok {
			return errors.Errorf("existing config does not contain element for %s but was in the read set", key)
		}
		if existing.version() != value.version() {
			return errors.Errorf("proposed update requires that key %s be at version %d, but it is currently at version %d", key, value.version(), existing.version())
		}
		// Trace output for matching entries; reached only when the versions agree.
		log.Printf("proposed update requires that key %s be at version %d, currently at version %d", key, value.version(), existing.version())
	}
	return nil
}

// BroadcastChannelSupport resolves the chain that should handle a broadcast
// message. It returns the message's channel header, whether the message is a
// config update, and the chain support that will process it.
//
// An error is returned when the channel header cannot be read, when the
// channel is unknown and no system channel is defined, or when the message is
// of a type that cannot be processed directly (like CONFIG and
// ORDERER_TRANSACTION messages).
func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, *ChainSupport, error) {
	chdr, err := protoutil.ChannelHeader(msg)
	if err != nil {
		return nil, false, nil, fmt.Errorf("could not determine channel ID: %s", err)
	}

	cs := r.GetChain(chdr.ChannelId)
	if cs == nil {
		// Unknown channel: this is the channel creation path, which is served
		// by the system channel's ChainSupport.
		if cs = r.SystemChannel(); cs == nil {
			return nil, false, nil, errors.New("channel creation request not allowed because the orderer system channel is not defined")
		}
	}

	class := cs.ClassifyMsg(chdr)
	if class == msgprocessor.ConfigMsg {
		return chdr, false, nil, errors.New("message is of type that cannot be processed directly")
	}

	// ConfigUpdateMsg covers both channel creation and anchor peer updates:
	// ClassifyMsg maps HeaderType_CONFIG_UPDATE to ConfigUpdateMsg.
	return chdr, class == msgprocessor.ConfigUpdateMsg, cs, nil
}
	// ClassifyMsg maps the type in a channel header to a Classification:
	// CONFIG_UPDATE becomes ConfigUpdateMsg, ORDERER_TRANSACTION and CONFIG
	// become ConfigMsg, and every other type is a NormalMsg.
	func (s *StandardChannel) ClassifyMsg(chdr *cb.ChannelHeader) Classification {
		switch cb.HeaderType(chdr.Type) {
		case cb.HeaderType_CONFIG_UPDATE:
			return ConfigUpdateMsg
		case cb.HeaderType_ORDERER_TRANSACTION, cb.HeaderType_CONFIG:
			// In order to maintain backwards compatibility, we must classify these messages
			return ConfigMsg
		}
		return NormalMsg
	}
    // Classification represents the possible message types for the system.
    type Classification int
    const (
        // NormalMsg is the class of standard (endorser or otherwise non-config) messages.
        // Messages of this type should be processed by ProcessNormalMsg.
        NormalMsg Classification = iota
        // ConfigUpdateMsg indicates messages of type CONFIG_UPDATE.
        // Messages of this type should be processed by ProcessConfigUpdateMsg.
        ConfigUpdateMsg
        // ConfigMsg indicates message of type ORDERER_TRANSACTION or CONFIG.
        // Messages of this type should be processed by ProcessConfigMsg.
        ConfigMsg
    )

// inside func (bw *BlockWriter) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) {
switch chdr.Type {
case int32(cb.HeaderType_ORDERER_TRANSACTION):
	bw.registrar.newChain(newChannelConfig) 
	func (r *Registrar) newChain(configtx *cb.Envelope) {
		ledgerResources, err := r.newLedgerResources(configtx)
		// If we have no blocks, we need to create the genesis block ourselves.
		if ledgerResources.Height() == 0 {
			block := blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx})
			ledgerResources.Append(block)
			ledgerResources.WriteConfigBlockToSpecFile(block)
		}
		cs, err := newChainSupport(r, ledgerResources, r.consenters, r.signer, r.blockcutterMetrics, r.bccsp)
		chainID := ledgerResources.ConfigtxValidator().ChannelID()
		r.chains[chainID] = cs
		logger.Infof("Created and starting new channel %s, sequence %d, /CHANNEL/APPLICATION version: %d\n", chainID, ledgerResources.ConfigtxValidator().Sequence(), ledgerResources.ConfigtxValidator().ConfigProto().ChannelGroup.Groups["Application"].Version) // sequence 是 1,version 是 1
		cs.start()
	}

// 系统通道初始 sequence 是 0;应用通道初始 sequence 是 1(模板配置的 sequence 0 经 ProposeConfigUpdate + 1 得到,而不是系统通道的 sequence + 1)
// [x] 如果是应用通道更新配置交易,由应用通道处理;新 sequence 是应用通道的旧 sequence + 1

case int32(cb.HeaderType_CONFIG):
	configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
	err = bw.support.Validate(configEnvelope)
	bundle, err := bw.support.CreateBundle(chdr.ChannelId, configEnvelope.Config) // HeaderType_CONFIG 类型
		func (cr *configResources) CreateBundle(channelID string, config *cb.Config) (*channelconfig.Bundle, error) {
			return channelconfig.NewBundle(channelID, config, cr.bccsp)
		}
			func NewBundle(channelID string, config *cb.Config, bccsp bccsp.BCCSP) (*Bundle, error) {
				configtxManager, err := configtx.NewValidatorImpl(channelID, config, RootGroupKey, policyManager) // 这里用 config 里已经更新的 sequence 创建新的 configtxManager
	// 等待 WriteBlock 里的协程(有读取 Sequence 操作)结束后再更新 Sequence
	// Avoid Bundle update before the go-routine in WriteBlock() finished writing the previous block.
	// We do this (in particular) to prevent bw.support.Sequence() from advancing before the go-routine reads it.
	// In general, this prevents the StableBundle from changing before the go-routine in WriteBlock() finishes.
	bw.committingBlock.Lock()
	bw.committingBlock.Unlock()
	bw.support.Update(bundle)
		func (cr *configResources) Update(bndl *channelconfig.Bundle) {
			checkResourcesOrPanic(bndl)
			cr.mutableResources.Update(bndl)
		}
			// // [x] version 何时被更新
			// from 1: r.newLedgerResources(configTx), channelconfig.NewBundleSource(bundle, callbacksCopy...):用新 bundle 构造出 ChainSupport,加入 chains 的 map
			// from 2: func (bw *BlockWriter) WriteConfigBlock, bw.support.Update(bundle) 
			// Update stores newBundle as the current bundle and then invokes every
			// registered callback with it. Around the swap it logs the version of
			// the "Application" group of the old and the new configuration.
			func (bs *BundleSource) Update(newBundle *Bundle) {
				b, ok := bs.bundle.Load().(*Bundle)
				if ok && b != nil {
					cp := b.ConfigtxValidator().ConfigProto()
					// Guard the lookup the same way as the "after update" log below:
					// a config without an "Application" group (presumably the system
					// channel — verify) must not be dereferenced.
					if v, ok := cp.ChannelGroup.Groups["Application"]; ok {
						log.Printf("before update: channel name %s\n%d\n", b.ConfigtxValidator().ChannelID(), v.Version)
					}
				} else {
					log.Println(ok, b)
				}
				bs.bundle.Store(newBundle)
				for _, callback := range bs.callbacks {
					callback(newBundle)
				}
				nb := bs.bundle.Load().(*Bundle)
				cp := nb.ConfigtxValidator().ConfigProto()
				if v, ok := cp.ChannelGroup.Groups["Application"]; ok {
					log.Printf("after update: channel name %s, version %d\n", nb.ConfigtxValidator().ChannelID(), v.Version)
				}
			}
bw.WriteBlock(block, encodedMetadataValue)
	// WriteBlock should be invoked for blocks which contain normal transactions.
	// It sets the target block as the pending next block, and returns before it is committed.
	// Before returning, it acquires the committing lock, and spawns a go routine which will
	// annotate the block with metadata and signatures, and write the block to the ledger
	// then release the lock.  This allows the calling thread to begin assembling the next block
	// before the commit phase is complete.
	func (bw *BlockWriter) WriteBlock(block *cb.Block, encodedMetadataValue []byte) {
		bw.committingBlock.Lock()
		bw.lastBlock = block
		go func() {
			defer bw.committingBlock.Unlock()
			bw.commitBlock(encodedMetadataValue)
		}()
	}

// commitBlock adds the last-config and signature metadata to bw.lastBlock and
// appends the block to the ledger, panicking if the append fails.
// It must only be invoked while bw.committingBlock is held, which keeps the
// encoded config sequence numbers in sync.
func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
	bw.addLastConfig(bw.lastBlock)
	bw.addBlockSignature(bw.lastBlock, encodedMetadataValue)

	if err := bw.support.Append(bw.lastBlock); err != nil {
		logger.Panicf("[channel: %s] Could not append block: %s", bw.support.ChannelID(), err)
	}

	logger.Debugf("[channel: %s] Wrote block [%d]", bw.support.ChannelID(), bw.lastBlock.GetHeader().Number)
}
// addLastConfig writes the LAST_CONFIG metadata entry of block.
// If the channel's current config sequence is greater than the last sequence
// recorded by the writer, block's number is recorded as the new last config
// block number before the entry is written.
func (bw *BlockWriter) addLastConfig(block *cb.Block) {
	configSeq := bw.support.Sequence()
	// A sequence above the recorded one means the config changed since the last
	// recorded config block, so this block becomes the new reference.
	if configSeq > bw.lastConfigSeq {
		logger.Debugf("[channel: %s] Detected lastConfigSeq transitioning from %d to %d, setting lastConfigBlockNum from %d to %d", bw.support.ChannelID(), bw.lastConfigSeq, configSeq, bw.lastConfigBlockNum, block.Header.Number)
		bw.lastConfigBlockNum = block.Header.Number
		bw.lastConfigSeq = configSeq
	}
	lastConfigValue := protoutil.MarshalOrPanic(&cb.LastConfig{Index: bw.lastConfigBlockNum})
	logger.Debugf("[channel: %s] About to write block, setting its LAST_CONFIG to %d", bw.support.ChannelID(), bw.lastConfigBlockNum)
	// The LastConfig value is wrapped in a Metadata message before being stored.
	block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIG] = protoutil.MarshalOrPanic(&cb.Metadata{
		Value: lastConfigValue,
	})
}

创世块创建流程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// configtxgen -profile TwoOrgsOrdererGenesis -channelID system-channel -outputBlock ./system-genesis-block/genesis.block --bccsp=SW
// 通用
var profileConfig *genesisconfig.Profile
profileConfig = genesisconfig.Load(profile)
    func InitViper(v *viper.Viper, configName string) error {
	    var altPath = os.Getenv("FABRIC_CFG_PATH")

// 类别 1
if outputBlock != "" {
    if err := doOutputBlock(profileConfig, channelID, outputBlock); err != nil {

if config.Orderer == nil {
    return errors.Errorf("refusing to generate block which is missing orderer section")
}
if config.Consortiums == nil {
    logger.Warning("Genesis block does not contain a consortiums group definition.  This block cannot be used for orderer bootstrap.")
}
genesisBlock := pgen.GenesisBlockForChannel(channelID)
    // Block constructs and returns a genesis block for a given channel ID.
    func (f *factory) Block(channelID string) *cb.Block {
        // The single transaction of the genesis block is a CONFIG envelope for channelID.
        payloadChannelHeader := protoutil.MakeChannelHeader(cb.HeaderType_CONFIG, msgVersion, channelID, epoch)
        payloadSignatureHeader := protoutil.MakeSignatureHeader(nil, protoutil.CreateNonceOrPanic())
        protoutil.SetTxID(payloadChannelHeader, payloadSignatureHeader)
        payloadHeader := protoutil.MakePayloadHeader(payloadChannelHeader, payloadSignatureHeader)
        payload := &cb.Payload{Header: payloadHeader, Data: protoutil.MarshalOrPanic(&cb.ConfigEnvelope{Config: &cb.Config{ChannelGroup: f.channelGroup}})} // cb.Config.Sequence is left at its zero value, 0
        // The envelope carries no signature.
        envelope := &cb.Envelope{Payload: protoutil.MarshalOrPanic(payload), Signature: nil}
        block := protoutil.NewBlock(0, nil)
        block.Data = &cb.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(envelope)}}
        block.Header.DataHash = protoutil.BlockDataHash(block.Data)
        // Both metadata entries record index 0 as the last config block.
        block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIG] = protoutil.MarshalOrPanic(&cb.Metadata{
            Value: protoutil.MarshalOrPanic(&cb.LastConfig{Index: 0}),
        })
        block.Metadata.Metadata[cb.BlockMetadataIndex_SIGNATURES] = protoutil.MarshalOrPanic(&cb.Metadata{
            Value: protoutil.MarshalOrPanic(&cb.OrdererBlockMetadata{
                LastConfig: &cb.LastConfig{Index: 0},
            }),
        })
        return block
    }
        // OrdererBlockMetadata defines metadata that is set by the ordering service.
        type OrdererBlockMetadata struct {
            LastConfig           *LastConfig `protobuf:"bytes,1,opt,name=last_config,json=lastConfig,proto3" json:"last_config,omitempty"`
            ConsenterMetadata    []byte      `protobuf:"bytes,2,opt,name=consenter_metadata,json=consenterMetadata,proto3" json:"consenter_metadata,omitempty"`
        }
            type LastConfig struct {
                Index                uint64   `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"`
            }

应用通道 configTx 创建流程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// configtxgen -profile TwoOrgsChannel -outputCreateChannelTx ./channel-artifacts/mychannel.tx -channelID mychannel --bccsp=SW
// 类别 2
var baseProfile *genesisconfig.Profile // nil
if outputChannelCreateTx != "" {
    if err := doOutputChannelCreateTx(profileConfig, baseProfile, channelID, outputChannelCreateTx); err != nil {
        configtx, err = encoder.MakeChannelCreationTransaction(channelID, nil, conf) // conf 是 profileConfig
            template, err := DefaultConfigTemplate(conf)
            return MakeChannelCreationTransactionFromTemplate(channelID, signer, conf, template)
                return protoutil.CreateSignedEnvelope(cb.HeaderType_CONFIG_UPDATE, channelID, signer, newConfigUpdateEnv, msgVersion, epoch) // 初始为 HeaderType_CONFIG_UPDATE,func (s *SystemChannel) ProcessConfigUpdateMsg 里会重新包装消息类型
					payloadChannelHeader := MakeChannelHeader(txType, msgVersion, channelID, epoch)
		err = writeFile(outputChannelCreateTx, protoutil.MarshalOrPanic(configtx), 0640)

// NewChannelCreateConfigUpdate builds the ConfigUpdate used to create channel
// channelID. It computes the update between templateConfig and the channel
// group generated from conf, sets the channel ID on the result, and adds the
// consortium name at version 0 to both the read set and the write set.
//
// It returns an error if conf has no Application section or no Consortium
// value, if the channel group cannot be built from conf, or if the update
// cannot be computed.
func NewChannelCreateConfigUpdate(channelID string, conf *genesisconfig.Profile, templateConfig *cb.ConfigGroup) (*cb.ConfigUpdate, error) {
	if conf.Application == nil {
		return nil, errors.New("cannot define a new channel with no Application section")
	}
	if conf.Consortium == "" {
		return nil, errors.New("cannot define a new channel with no Consortium value")
	}
	newChannelGroup, err := NewChannelGroup(conf)
	if err != nil {
		return nil, errors.Wrapf(err, "could not turn parse profile into channel group")
	}
	// The update is the difference between the template (original) and the new channel group (updated).
	updt, err := update.Compute(&cb.Config{ChannelGroup: templateConfig}, &cb.Config{ChannelGroup: newChannelGroup})
	if err != nil {
		return nil, errors.Wrapf(err, "could not compute update")
	}

	// Add the consortium name to create the channel for into the write set as required.
	updt.ChannelId = channelID
	// The read set entry carries only the version; the write set entry also carries the marshaled Consortium.
	updt.ReadSet.Values[channelconfig.ConsortiumKey] = &cb.ConfigValue{Version: 0}
	updt.WriteSet.Values[channelconfig.ConsortiumKey] = &cb.ConfigValue{
		Version: 0,
		Value: protoutil.MarshalOrPanic(&cb.Consortium{
			Name: conf.Consortium,
		}),
	}
	return updt, nil
}
    type ConfigValue struct {
        Version              uint64   `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
        Value                []byte   `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
        ModPolicy            string   `protobuf:"bytes,3,opt,name=mod_policy,json=modPolicy,proto3" json:"mod_policy,omitempty"`
    }

func Compute(original, updated *cb.Config) (*cb.ConfigUpdate, error) {
	readSet, writeSet, groupUpdated := computeGroupUpdate(original.ChannelGroup, updated.ChannelGroup)
	return &cb.ConfigUpdate{
		ReadSet:  readSet,
		WriteSet: writeSet,
	}, nil
}
func computeGroupUpdate(original, updated *cb.ConfigGroup) (readSet, writeSet *cb.ConfigGroup, updatedGroup bool) {
	return &cb.ConfigGroup{
		Version:  original.Version,
		Policies: readSetPolicies,
		Values:   readSetValues,
		Groups:   readSetGroups,
	}, &cb.ConfigGroup{
		Version:   original.Version + 1, // 这里 +1
		Policies:  writeSetPolicies,
		Values:    writeSetValues,
		Groups:    writeSetGroups,
		ModPolicy: updated.ModPolicy,
	}, true
}

更新 anchor peer

1
2
3
// 类别 3
if outputAnchorPeersUpdate != "" {
    if err := doOutputAnchorPeersUpdate(profileConfig, channelID, outputAnchorPeersUpdate, asOrg); err != nil {

应用通道创建成功后得到的配置块结构

{
	-"data": {
		-"data": [
			-{
				-"payload": {
					-"data": {
						-"config": {
							-"channel_group": {
								-"groups": {
									-"Application": {
										+"groups": { … },
										"mod_policy": "Admins",
										+"policies": { … },
										+"values": { … },
										"version": "1"
									},
									+"Orderer": { … }
								},
								"mod_policy": "Admins",
								+"policies": { … },
								+"values": { … },
								"version": "0"
							},
							"sequence": "1"
						},
						+"last_update": { … }
					},
					-"header": {
							-"channel_header": {
							"channel_id": "mychannel1",
							"epoch": "0",
							"extension": null,
							"timestamp": "2022-07-20T02:35:18Z",
							"tls_cert_hash": null,
							"tx_id": "",
							"type": 1,
							"version": 0
						},
						+"signature_header": { … }
					}
				},
				"signature": "MEUCIQC+YYLNGa1pAFHCm1njE6jVcMdSNyjRvfYuGvV8PPfY+QIgETpMBTVaUDjvdYqdkxf982C0/Q1YmTjVpf6Fk4chV9c="
			}
		]
	},
	+"header": { … },
	+"metadata": { … }
}

其它

WaitReady 阻塞的原因:chain 的 select 循环正在执行 case sn := <-c.snapC: 分支(用快照追赶其它节点),此时 case s := <-submitC 得不到处理,所以 WaitReady 中向 submitC 的发送会一直阻塞。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// WaitReady blocks while the chain is catching up with other nodes using a
// snapshot; in any other case it returns right away.
//
// It returns the error from isRunning if the chain is not running, and an
// error if the chain is stopped while waiting.
func (c *Chain) WaitReady() error {
	err := c.isRunning()
	if err != nil {
		return err
	}

	// Sending nil on submitC succeeds once the chain accepts submissions.
	select {
	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	case c.submitC <- nil:
		return nil
	}
}

chain 的 Sequence 已更新,但还有属于旧 Sequence 的交易进来,此时会执行重新校验。

1
// It takes care of config messages as well as the revalidation of messages if the config sequence has advanced.