Discovery service refactoring

This commit is contained in:
2023-12-06 08:56:42 +03:00
parent 7864272baa
commit cf00cfaab6

View File

@@ -24,10 +24,10 @@ func (d DiscoverySet) NewNodes() NewNodes {
res := make(chan string, 20) res := make(chan string, 20)
for _, discovery := range d { for _, discovery := range d {
go func(v *Discovery) { go func(discovery *Discovery) {
for node := range v.Nodes() { for node := range discovery.Nodes() {
if len(res) == cap(res) { if len(res) == cap(res) {
panic("discovery set nodes not reading!") panic("nodes from discovery set not reading!")
} }
res <- node res <- node
@@ -72,8 +72,8 @@ func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opt
return nil, fmt.Errorf("get interface address: %w", err) return nil, fmt.Errorf("get interface address: %w", err)
} }
broadcast := net.IP(make([]byte, 4)) broadcastIP := net.IP(make([]byte, 4))
var ownAddr net.IP var ownIP net.IP
for _, addr := range addrs { for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok { if ipnet, ok := addr.(*net.IPNet); ok {
@@ -83,35 +83,40 @@ func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opt
} }
for i := range ip4 { for i := range ip4 {
broadcast[i] = ip4[i] | ^ipnet.Mask[i] broadcastIP[i] = ip4[i] | ^ipnet.Mask[i]
} }
ownAddr = ipnet.IP ownIP = ipnet.IP
break break
} }
} }
if broadcast.To4() == nil {
if broadcastIP.To4() == nil {
return nil, fmt.Errorf("no broadcast address") return nil, fmt.Errorf("no broadcast address")
} }
portString := strconv.Itoa(int(discoverPort)) if ownIP.To4() == nil {
return nil, fmt.Errorf("no own address")
}
conn, err := net.ListenPacket("udp4", broadcast.String()+":"+portString) broadcastAddrString := broadcastIP.String() + ":" + strconv.Itoa(int(discoverPort))
conn, err := net.ListenPacket("udp4", broadcastAddrString)
if err != nil { if err != nil {
return nil, fmt.Errorf("listen packet: %w", err) return nil, fmt.Errorf("listen packet: %w", err)
} }
udpAddr, err := net.ResolveUDPAddr("udp4", broadcast.String()+":"+portString) udpAddr, err := net.ResolveUDPAddr("udp4", broadcastAddrString)
if err != nil { if err != nil {
return nil, fmt.Errorf("resolve udp address: %w", err) return nil, fmt.Errorf("resolve udp address: %w", err)
} }
d := Discovery{ d := Discovery{
log: log.With(zap.Stringer("broadcast_address", broadcast)), log: log.With(zap.Stringer("broadcast_address", broadcastIP)),
nodes: make(Nodes), nodes: make(Nodes),
newNode: make(chan string, 20), newNodesCh: make(chan string, 20),
ownAddr: ownAddr.To4(), ownAddr: ownIP.To4(),
conn: conn, conn: conn,
broadcastAddr: udpAddr, broadcastAddr: udpAddr,
broadcastInterval: defaultBroadcastInterval, broadcastInterval: defaultBroadcastInterval,
@@ -130,7 +135,7 @@ type Discovery struct {
mu sync.Mutex mu sync.Mutex
nodes Nodes nodes Nodes
newNode chan string newNodesCh chan string
ownAddr net.IP ownAddr net.IP
conn net.PacketConn conn net.PacketConn
@@ -164,7 +169,7 @@ func (d *Discovery) Start(ctx context.Context, wg *sync.WaitGroup) {
d.log.Warn("close connection", zap.Error(err)) d.log.Warn("close connection", zap.Error(err))
} }
close(d.newNode) close(d.newNodesCh)
return return
@@ -228,12 +233,12 @@ func (d *Discovery) addNode(addr string) {
if _, ok := d.nodes[addr]; !ok { if _, ok := d.nodes[addr]; !ok {
d.log.Info("new node address", zap.String("address", addr)) d.log.Info("new node address", zap.String("address", addr))
d.nodes[addr] = struct{}{} d.nodes[addr] = struct{}{}
d.newNode <- addr d.newNodesCh <- addr
} }
d.mu.Unlock() d.mu.Unlock()
} }
func (d *Discovery) Nodes() NewNodes { func (d *Discovery) Nodes() NewNodes {
return d.newNode return d.newNodesCh
} }