1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
| func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
return s.bh.Handle(&broadcastMsgTracer{
AtomicBroadcast_BroadcastServer: srv,
msgTracer: msgTracer{
debug: s.debug,
function: "Broadcast",
},
})
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
addr := util.ExtractRemoteAddress(srv.Context())
logger.Debugf("Starting new broadcast loop for %s", addr)
for {
msg, err := srv.Recv()
if err == io.EOF {
logger.Debugf("Received EOF from %s, hangup", addr)
return nil
}
if err != nil {
logger.Warningf("Error reading from %s: %s", addr, err)
return err
}
resp := bh.ProcessMessage(msg, addr)
err = srv.Send(resp)
if resp.Status != cb.Status_SUCCESS {
return err
}
if err != nil {
logger.Warningf("Error sending to %s: %s", addr, err)
return err
}
}
}
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
tracker := &MetricsTracker{
ChannelID: "unknown",
TxType: "unknown",
Metrics: bh.Metrics,
}
defer func() {
// This looks a little unnecessary, but if done directly as
// a defer, resp gets the (always nil) current state of resp
// and not the return value
tracker.Record(resp)
}()
tracker.BeginValidate()
chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg)
if chdr != nil {
tracker.ChannelID = chdr.ChannelId
tracker.TxType = cb.HeaderType(chdr.Type).String()
}
if err != nil {
logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", tracker.ChannelID, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()}
}
if !isConfig {
logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])
configSeq, err := processor.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}
}
tracker.EndValidate()
tracker.BeginEnqueue()
if err = processor.WaitReady(); err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
err = processor.Order(msg, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
} else { // isConfig
logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)
// [x] configSeq,创建两个不同名通道:SystemChannel 的 configSeq 都是 0
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}
}
tracker.EndValidate()
tracker.BeginEnqueue()
if err = processor.WaitReady(); err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
err = processor.Configure(config, configSeq) // 交到共识处理
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
}
logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)
return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
}
// ProcessConfigUpdateMsg handles messages of type CONFIG_UPDATE either for the system channel itself
// or, for channel creation. In the channel creation case, the CONFIG_UPDATE is wrapped into a resulting
// ORDERER_TRANSACTION, and in the standard CONFIG_UPDATE case, a resulting CONFIG message
func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
if channelID == s.support.ChannelID() {
// 处理系统通道自身的配置更新;明确指定 StandardChannel 的 ProcessConfigUpdateMsg 方法
return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
type SystemChannel struct {
*StandardChannel
templator ChannelConfigTemplator
}
}
logger.Debugf("Processing channel create tx for channel %s on system channel %s", channelID, s.support.ChannelID())
// If the channel ID does not match the system channel, then this must be a channel creation transaction
bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
// 包装两次
newChannelEnvConfig, err := protoutil.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
wrappedOrdererTransaction, err := protoutil.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChannelID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
return wrappedOrdererTransaction, s.support.Sequence(), nil
}
func (dt *DefaultTemplator) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfig.Resources, error) {
configUpdatePayload, err := protoutil.UnmarshalPayload(envConfigUpdate.Payload)
configUpdateEnv, err := configtx.UnmarshalConfigUpdateEnvelope(configUpdatePayload.Data)
configUpdate, err := configtx.UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)
if uv := configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Version; uv != 1 {
return nil, fmt.Errorf("Config update for channel creation does not set application group version to 1, was %d", uv)
}
bundle, err := channelconfig.NewBundle(channelHeader.ChannelId, &cb.Config{
ChannelGroup: channelGroup,
}, dt.bccsp)
}
// 校验逻辑
func (vi *ValidatorImpl) ProposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) {
return vi.proposeConfigUpdate(configtx)
}
func (vi *ValidatorImpl) proposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) {
configUpdateEnv, err := protoutil.EnvelopeToConfigUpdate(configtx)
if err != nil {
return nil, errors.Errorf("error converting envelope to config update: %s", err)
}
configMap, err := vi.authorizeUpdate(configUpdateEnv)
err = vi.verifyReadSet(readSet)
if err != nil {
return nil, errors.Errorf("error authorizing update: %s", err)
}
channelGroup, err := configMapToConfig(configMap, vi.namespace)
if err != nil {
return nil, errors.Errorf("could not turn configMap back to channelGroup: %s", err)
}
return &cb.ConfigEnvelope{
Config: &cb.Config{
Sequence: vi.sequence + 1, // 这里 +1
ChannelGroup: channelGroup,
},
LastUpdate: configtx,
}, nil
}
func (vi *ValidatorImpl) verifyReadSet(readSet map[string]comparable) error {
for key, value := range readSet {
existing, ok := vi.configMap[key]
if !ok {
return errors.Errorf("existing config does not contain element for %s but was in the read set", key)
}
if existing.version() != value.version() {
return errors.Errorf("proposed update requires that key %s be at version %d, but it is currently at version %d", key, value.version(), existing.version())
} else {
log.Printf("proposed update requires that key %s be at version %d, currently at version %d", key, value.version(), existing.version())
}
}
return nil
}
// BroadcastChannelSupport returns the message channel header, whether the message is a config update
// and the channel resources for a message or an error if the message is not a message which can
// be processed directly (like CONFIG and ORDERER_TRANSACTION messages)
func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, *ChainSupport, error) {
chdr, err := protoutil.ChannelHeader(msg)
if err != nil {
return nil, false, nil, fmt.Errorf("could not determine channel ID: %s", err)
}
cs := r.GetChain(chdr.ChannelId)
// New channel creation
if cs == nil {
sysChan := r.SystemChannel()
if sysChan == nil {
return nil, false, nil, errors.New("channel creation request not allowed because the orderer system channel is not defined")
}
cs = sysChan // 创建通道 path(ChainSupport 为 SystemChannel)
}
isConfig := false
switch cs.ClassifyMsg(chdr) {
case msgprocessor.ConfigUpdateMsg: // 创建通道 path;更新锚节点 path(HeaderType_CONFIG_UPDATE 被 ClassifyMsg 归类为 ConfigUpdateMsg)
isConfig = true
case msgprocessor.ConfigMsg:
return chdr, false, nil, errors.New("message is of type that cannot be processed directly")
default:
}
return chdr, isConfig, cs, nil
}
func (s *StandardChannel) ClassifyMsg(chdr *cb.ChannelHeader) Classification {
switch chdr.Type {
case int32(cb.HeaderType_CONFIG_UPDATE):
return ConfigUpdateMsg
case int32(cb.HeaderType_ORDERER_TRANSACTION):
// In order to maintain backwards compatibility, we must classify these messages
return ConfigMsg
case int32(cb.HeaderType_CONFIG):
// In order to maintain backwards compatibility, we must classify these messages
return ConfigMsg
default:
return NormalMsg
}
}
// Classification represents the possible message types for the system.
type Classification int
const (
// NormalMsg is the class of standard (endorser or otherwise non-config) messages.
// Messages of this type should be processed by ProcessNormalMsg.
NormalMsg Classification = iota
// ConfigUpdateMsg indicates messages of type CONFIG_UPDATE.
// Messages of this type should be processed by ProcessConfigUpdateMsg.
ConfigUpdateMsg
// ConfigMsg indicates message of type ORDERER_TRANSACTION or CONFIG.
// Messages of this type should be processed by ProcessConfigMsg
ConfigMsg
)
// inside func (bw *BlockWriter) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) {
switch chdr.Type {
case int32(cb.HeaderType_ORDERER_TRANSACTION):
bw.registrar.newChain(newChannelConfig)
func (r *Registrar) newChain(configtx *cb.Envelope) {
ledgerResources, err := r.newLedgerResources(configtx)
// If we have no blocks, we need to create the genesis block ourselves.
if ledgerResources.Height() == 0 {
block := blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx})
ledgerResources.Append(block)
ledgerResources.WriteConfigBlockToSpecFile(block)
}
cs, err := newChainSupport(r, ledgerResources, r.consenters, r.signer, r.blockcutterMetrics, r.bccsp)
chainID := ledgerResources.ConfigtxValidator().ChannelID()
r.chains[chainID] = cs
logger.Infof("Created and starting new channel %s, sequence %d, /CHANNEL/APPLICATION version: %d\n", chainID, ledgerResources.ConfigtxValidator().Sequence(), ledgerResources.ConfigtxValidator().ConfigProto().ChannelGroup.Groups["Application"].Version) // sequence 是 1,version 是 1
cs.start()
}
// 系统通道初始 sequence 是 0;应用通道初始 sequence 是系统通道的 sequence + 1
// [x] 如果是应用通道更新配置交易,由应用通道处理;新 sequence 是应用通道的旧 sequence + 1
case int32(cb.HeaderType_CONFIG):
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
err = bw.support.Validate(configEnvelope)
bundle, err := bw.support.CreateBundle(chdr.ChannelId, configEnvelope.Config) // HeaderType_CONFIG 类型
func (cr *configResources) CreateBundle(channelID string, config *cb.Config) (*channelconfig.Bundle, error) {
return channelconfig.NewBundle(channelID, config, cr.bccsp)
}
func NewBundle(channelID string, config *cb.Config, bccsp bccsp.BCCSP) (*Bundle, error) {
configtxManager, err := configtx.NewValidatorImpl(channelID, config, RootGroupKey, policyManager) // 这里用 config 里已经更新的 sequence 创建新的 configtxManager
// 等待 WriteBlock 里的协程(有读取 Sequence 操作)结束后再更新 Sequence
// Avoid Bundle update before the go-routine in WriteBlock() finished writing the previous block.
// We do this (in particular) to prevent bw.support.Sequence() from advancing before the go-routine reads it.
// In general, this prevents the StableBundle from changing before the go-routine in WriteBlock() finishes.
bw.committingBlock.Lock()
bw.committingBlock.Unlock()
bw.support.Update(bundle)
func (cr *configResources) Update(bndl *channelconfig.Bundle) {
checkResourcesOrPanic(bndl)
cr.mutableResources.Update(bndl)
}
// // [x] version 何时被更新
// from 1: r.newLedgerResources(configTx), channelconfig.NewBundleSource(bundle, callbacksCopy...):用新 bundle 构造出 ChainSupport,加入 chains 的 map
// from 2: func (bw *BlockWriter) WriteConfigBlock, bw.support.Update(bundle)
func (bs *BundleSource) Update(newBundle *Bundle) {
b, ok := bs.bundle.Load().(*Bundle)
if ok && b != nil {
cp := b.ConfigtxValidator().ConfigProto()
log.Printf("before update: channel name %s\n%d\n", b.ConfigtxValidator().ChannelID(), cp.ChannelGroup.Groups["Application"].Version)
} else {
log.Println(ok, b)
}
bs.bundle.Store(newBundle)
for _, callback := range bs.callbacks {
callback(newBundle)
}
nb := bs.bundle.Load().(*Bundle)
cp := nb.ConfigtxValidator().ConfigProto()
if v, ok := cp.ChannelGroup.Groups["Application"]; ok {
log.Printf("after update: channel name %s, version %d\n", nb.ConfigtxValidator().ChannelID(), v.Version)
}
}
bw.WriteBlock(block, encodedMetadataValue)
// WriteBlock should be invoked for blocks which contain normal transactions.
// It sets the target block as the pending next block, and returns before it is committed.
// Before returning, it acquires the committing lock, and spawns a go routine which will
// annotate the block with metadata and signatures, and write the block to the ledger
// then release the lock. This allows the calling thread to begin assembling the next block
// before the commit phase is complete.
func (bw *BlockWriter) WriteBlock(block *cb.Block, encodedMetadataValue []byte) {
bw.committingBlock.Lock()
bw.lastBlock = block
go func() {
defer bw.committingBlock.Unlock()
bw.commitBlock(encodedMetadataValue)
}()
}
// commitBlock should only ever be invoked with the bw.committingBlock held
// this ensures that the encoded config sequence numbers stay in sync
func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
bw.addLastConfig(bw.lastBlock)
bw.addBlockSignature(bw.lastBlock, encodedMetadataValue)
err := bw.support.Append(bw.lastBlock)
if err != nil {
logger.Panicf("[channel: %s] Could not append block: %s", bw.support.ChannelID(), err)
}
logger.Debugf("[channel: %s] Wrote block [%d]", bw.support.ChannelID(), bw.lastBlock.GetHeader().Number)
}
func (bw *BlockWriter) addLastConfig(block *cb.Block) {
configSeq := bw.support.Sequence()
if configSeq > bw.lastConfigSeq {
logger.Debugf("[channel: %s] Detected lastConfigSeq transitioning from %d to %d, setting lastConfigBlockNum from %d to %d", bw.support.ChannelID(), bw.lastConfigSeq, configSeq, bw.lastConfigBlockNum, block.Header.Number)
bw.lastConfigBlockNum = block.Header.Number
bw.lastConfigSeq = configSeq
}
lastConfigValue := protoutil.MarshalOrPanic(&cb.LastConfig{Index: bw.lastConfigBlockNum})
logger.Debugf("[channel: %s] About to write block, setting its LAST_CONFIG to %d", bw.support.ChannelID(), bw.lastConfigBlockNum)
block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIG] = protoutil.MarshalOrPanic(&cb.Metadata{
Value: lastConfigValue,
})
}
|