新注册的用户请输入邮箱并保存,随后登录邮箱激活账号。后续可直接使用邮箱登录!

Commit 4f7df435 authored by Ramboll.Wong's avatar Ramboll.Wong

fix issues#2: Recode simple protocol mgr

parent b65e9893
......@@ -305,14 +305,11 @@ func (p *protocolExchanger) PushProtocols(pid peer.ID) error {
signalC := make(chan struct{}, 1)
_, loaded := p.pushSignalMap.LoadOrStore(pid, &signalC)
if loaded {
// pushing now , retry it until current pushing finished.
go func() {
time.Sleep(500 * time.Millisecond)
err := p.PushProtocols(pid)
if err != nil {
p.logger.Warnf("[ProtocolExchanger][PushProtocols] push failed, (remote pid: %s)", pid)
}
}()
time.Sleep(time.Second)
err := p.PushProtocols(pid)
if err != nil {
p.logger.Warnf("[ProtocolExchanger][PushProtocols] push failed, (remote pid: %s)", pid)
}
return nil
}
defer p.pushSignalMap.Delete(pid)
......@@ -336,9 +333,9 @@ var _ mgr.ProtocolManager = (*simpleProtocolMgr)(nil)
// simpleProtocolMgr is a simple implementation of mgr.ProtocolManager interface.
// ProtocolManager manages all protocol and protocol msg handler for all peers.
type simpleProtocolMgr struct {
sync.Mutex
mu sync.RWMutex
lPID peer.ID
protocolHandlers sync.Map // map[protocol.ID]host.MsgPayloadHandler
protocolHandlers map[protocol.ID]handler.MsgPayloadHandler
protocolBook store.ProtocolBook
supportedN mgr.ProtocolSupportNotifyFunc
unsupportedN mgr.ProtocolSupportNotifyFunc
......@@ -347,64 +344,80 @@ type simpleProtocolMgr struct {
// NewSimpleProtocolMgr create a new simple mgr.ProtocolManager instance.
func NewSimpleProtocolMgr(localPID peer.ID, protocolBook store.ProtocolBook) mgr.ProtocolManager {
return &simpleProtocolMgr{
mu: sync.RWMutex{},
lPID: localPID,
protocolHandlers: sync.Map{},
protocolHandlers: make(map[protocol.ID]handler.MsgPayloadHandler),
protocolBook: protocolBook,
supportedN: nil,
unsupportedN: nil,
}
}
// RegisterMsgPayloadHandler register a protocol supported by us and map a handler.MsgPayloadHandler to this protocol.
func (s *simpleProtocolMgr) RegisterMsgPayloadHandler(protocolID protocol.ID, handler handler.MsgPayloadHandler) error {
_, loaded := s.protocolHandlers.LoadOrStore(protocolID, handler)
s.mu.Lock()
defer s.mu.Unlock()
_, loaded := s.protocolHandlers[protocolID]
if loaded {
return ErrProtocolIDRegistered
}
s.protocolHandlers[protocolID] = handler
s.protocolBook.AddProtocol(s.lPID, protocolID)
return nil
}
// UnregisterMsgPayloadHandler unregister a protocol supported by us.
func (s *simpleProtocolMgr) UnregisterMsgPayloadHandler(protocolID protocol.ID) error {
_, loaded := s.protocolHandlers.LoadAndDelete(protocolID)
s.mu.Lock()
defer s.mu.Unlock()
_, loaded := s.protocolHandlers[protocolID]
if !loaded {
return ErrProtocolIDNotRegistered
}
delete(s.protocolHandlers, protocolID)
s.protocolBook.DeleteProtocol(s.lPID, protocolID)
return nil
}
// IsRegistered return whether a protocol given is supported by us.
func (s *simpleProtocolMgr) IsRegistered(protocolID protocol.ID) bool {
_, ok := s.protocolHandlers.Load(protocolID)
return ok
s.mu.RLock()
defer s.mu.RUnlock()
_, loaded := s.protocolHandlers[protocolID]
return loaded
}
// GetHandler return the handler.MsgPayloadHandler mapped to the protocol supported by us and id is the given.
// If the protocol not supported by us, return nil.
func (s *simpleProtocolMgr) GetHandler(protocolID protocol.ID) handler.MsgPayloadHandler {
h, _ := s.protocolHandlers.Load(protocolID)
return h.(handler.MsgPayloadHandler)
s.mu.RLock()
defer s.mu.RUnlock()
return s.protocolHandlers[protocolID]
}
// GetSelfSupportedProtocols return a list of protocol.ID that supported by ourself.
func (s *simpleProtocolMgr) GetSelfSupportedProtocols() []protocol.ID {
s.mu.RLock()
defer s.mu.RUnlock()
res := make([]protocol.ID, 0)
s.protocolHandlers.Range(func(key, _ interface{}) bool {
protocolID, _ := key.(protocol.ID)
for protocolID := range s.protocolHandlers {
res = append(res, protocolID)
return true
})
}
return res
}
// IsPeerSupported return whether the protocol is supported by peer which id is the given pid.
// If peer not connected to us, return false.
func (s *simpleProtocolMgr) IsPeerSupported(pid peer.ID, protocolID protocol.ID) bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.protocolBook.ContainsProtocol(pid, protocolID)
}
// GetPeerSupportedProtocols return a list of protocol.ID that supported by the peer which id is the given pid.
func (s *simpleProtocolMgr) GetPeerSupportedProtocols(pid peer.ID) []protocol.ID {
s.mu.RLock()
defer s.mu.RUnlock()
return s.protocolBook.GetProtocols(pid)
}
......@@ -416,7 +429,7 @@ func (s *simpleProtocolMgr) callNotifyIfChanged(pid peer.ID, news []protocol.ID)
if s.supportedN != nil {
for i := range news {
if !s.protocolBook.ContainsProtocol(pid, news[i]) {
go s.supportedN(news[i], pid)
s.supportedN(news[i], pid)
}
}
}
......@@ -431,7 +444,7 @@ func (s *simpleProtocolMgr) callNotifyIfChanged(pid peer.ID, news []protocol.ID)
for idx := range oldSupported {
_, notDrop := tempMap[oldSupported[idx]]
if !notDrop {
go s.unsupportedN(oldSupported[idx], pid)
s.unsupportedN(oldSupported[idx], pid)
}
}
}
......@@ -439,21 +452,29 @@ func (s *simpleProtocolMgr) callNotifyIfChanged(pid peer.ID, news []protocol.ID)
// SetPeerSupportedProtocols stores the protocols supported by the peer which id is the given pid.
func (s *simpleProtocolMgr) SetPeerSupportedProtocols(pid peer.ID, protocolIDs []protocol.ID) {
s.mu.Lock()
defer s.mu.Unlock()
s.callNotifyIfChanged(pid, protocolIDs)
s.protocolBook.SetProtocols(pid, protocolIDs)
}
// CleanPeerSupportedProtocols remove all records of protocols supported by the peer which id is the given pid.
func (s *simpleProtocolMgr) CleanPeerSupportedProtocols(pid peer.ID) {
s.mu.Lock()
defer s.mu.Unlock()
s.protocolBook.ClearProtocol(pid)
}
// SetProtocolSupportedNotifyFunc set a function for notifying peer protocol supporting.
func (s *simpleProtocolMgr) SetProtocolSupportedNotifyFunc(notifyFunc mgr.ProtocolSupportNotifyFunc) {
s.mu.Lock()
defer s.mu.Unlock()
s.supportedN = notifyFunc
}
// SetProtocolUnsupportedNotifyFunc set a function for notifying peer protocol supporting canceled.
func (s *simpleProtocolMgr) SetProtocolUnsupportedNotifyFunc(notifyFunc mgr.ProtocolSupportNotifyFunc) {
s.mu.Lock()
defer s.mu.Unlock()
s.unsupportedN = notifyFunc
}
package simple
import (
"testing"
"chainmaker.org/chainmaker/net-liquid/core/peer"
"chainmaker.org/chainmaker/net-liquid/core/protocol"
"github.com/stretchr/testify/require"
)
func TestSimpleProtocolMgr_RegisterMsgPayloadHandler(t *testing.T) {
pid := peer.ID("abc")
pb := newProtocolBook(pid)
mgr := NewSimpleProtocolMgr(pid, pb).(*simpleProtocolMgr)
err := mgr.RegisterMsgPayloadHandler("test", func(senderPID peer.ID, msgPayload []byte) {})
require.NoError(t, err)
require.True(t, pb.ContainsProtocol(pid, "test"))
err = mgr.RegisterMsgPayloadHandler("test", func(senderPID peer.ID, msgPayload []byte) {})
require.Error(t, err)
}
func TestSimpleProtocolMgr_UnregisterMsgPayloadHandler(t *testing.T) {
pid := peer.ID("abc")
pb := newProtocolBook(pid)
mgr := NewSimpleProtocolMgr(pid, pb).(*simpleProtocolMgr)
err := mgr.UnregisterMsgPayloadHandler("test")
require.Error(t, err)
err = mgr.RegisterMsgPayloadHandler("test", func(senderPID peer.ID, msgPayload []byte) {})
require.NoError(t, err)
require.True(t, pb.ContainsProtocol(pid, "test"))
err = mgr.UnregisterMsgPayloadHandler("test")
require.NoError(t, err)
}
func TestSimpleProtocolMgr_IsRegistered(t *testing.T) {
pid := peer.ID("abc")
pb := newProtocolBook(pid)
mgr := NewSimpleProtocolMgr(pid, pb).(*simpleProtocolMgr)
ok := mgr.IsRegistered("test")
require.False(t, ok)
err := mgr.RegisterMsgPayloadHandler("test", func(senderPID peer.ID, msgPayload []byte) {})
require.NoError(t, err)
require.True(t, pb.ContainsProtocol(pid, "test"))
ok = mgr.IsRegistered("test")
require.True(t, ok)
}
func TestSimpleProtocolMgr_GetHandler(t *testing.T) {
pid := peer.ID("abc")
pb := newProtocolBook(pid)
mgr := NewSimpleProtocolMgr(pid, pb).(*simpleProtocolMgr)
h := mgr.GetHandler("test")
require.Nil(t, h)
err := mgr.RegisterMsgPayloadHandler("test", func(senderPID peer.ID, msgPayload []byte) {})
require.NoError(t, err)
require.True(t, pb.ContainsProtocol(pid, "test"))
h = mgr.GetHandler("test")
require.NotNil(t, h)
}
func TestSimpleProtocolMgr_GetSelfSupportedProtocols(t *testing.T) {
pid := peer.ID("abc")
pb := newProtocolBook(pid)
mgr := NewSimpleProtocolMgr(pid, pb).(*simpleProtocolMgr)
hs := mgr.GetSelfSupportedProtocols()
require.Equal(t, 0, len(hs))
err := mgr.RegisterMsgPayloadHandler("test", func(senderPID peer.ID, msgPayload []byte) {})
require.NoError(t, err)
require.True(t, pb.ContainsProtocol(pid, "test"))
hs = mgr.GetSelfSupportedProtocols()
require.Equal(t, 1, len(hs))
require.Equal(t, hs[0], protocol.ID("test"))
err = mgr.RegisterMsgPayloadHandler("test2", func(senderPID peer.ID, msgPayload []byte) {})
require.NoError(t, err)
require.True(t, pb.ContainsProtocol(pid, "test2"))
hs = mgr.GetSelfSupportedProtocols()
require.Equal(t, 2, len(hs))
require.Equal(t, hs[0], protocol.ID("test"))
require.Equal(t, hs[1], protocol.ID("test2"))
}
func TestSimpleProtocolMgr_SetPeerSupportedProtocols(t *testing.T) {
pid := peer.ID("abc")
pb := newProtocolBook(pid)
mgr := NewSimpleProtocolMgr(pid, pb).(*simpleProtocolMgr)
mgr.SetPeerSupportedProtocols(pid, []protocol.ID{"test", "test2"})
require.False(t, pb.ContainsProtocol(pid, "test3"))
require.True(t, pb.ContainsProtocol(pid, "test"))
require.True(t, pb.ContainsProtocol(pid, "test2"))
}
func TestSimpleProtocolMgr_CleanPeerSupportedProtocols(t *testing.T) {
pid := peer.ID("abc")
pb := newProtocolBook(pid)
mgr := NewSimpleProtocolMgr(pid, pb).(*simpleProtocolMgr)
mgr.SetPeerSupportedProtocols(pid, []protocol.ID{"test", "test2"})
require.False(t, pb.ContainsProtocol(pid, "test3"))
require.True(t, pb.ContainsProtocol(pid, "test"))
require.True(t, pb.ContainsProtocol(pid, "test2"))
mgr.CleanPeerSupportedProtocols(pid)
require.False(t, pb.ContainsProtocol(pid, "test"))
require.False(t, pb.ContainsProtocol(pid, "test2"))
}
func TestSimpleProtocolMgr_NotifyFuncs(t *testing.T) {
pid := peer.ID("abc")
pb := newProtocolBook(pid)
supportedNotified := make(map[protocol.ID]peer.ID)
unsupportedNotified := make(map[protocol.ID]peer.ID)
mgr := NewSimpleProtocolMgr(pid, pb).(*simpleProtocolMgr)
mgr.SetProtocolSupportedNotifyFunc(func(protocolID protocol.ID, pid peer.ID) {
supportedNotified[protocolID] = pid
})
mgr.SetProtocolUnsupportedNotifyFunc(func(protocolID protocol.ID, pid peer.ID) {
unsupportedNotified[protocolID] = pid
})
mgr.SetPeerSupportedProtocols(pid, []protocol.ID{"test", "test2"})
_, ok := supportedNotified["test"]
require.True(t, ok)
_, ok = supportedNotified["test2"]
require.True(t, ok)
_, ok = supportedNotified["test3"]
require.False(t, ok)
require.Equal(t, 0, len(unsupportedNotified))
mgr.SetPeerSupportedProtocols(pid, []protocol.ID{"test3", "test2"})
_, ok = supportedNotified["test2"]
require.True(t, ok)
_, ok = supportedNotified["test3"]
require.True(t, ok)
require.Equal(t, 1, len(unsupportedNotified))
_, ok = unsupportedNotified["test"]
require.True(t, ok)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment