From cf00cfaab61dea669aed334b7dcc5b170d6ccd57 Mon Sep 17 00:00:00 2001 From: derfenix Date: Wed, 6 Dec 2023 08:56:42 +0300 Subject: [PATCH] Discovery service refactoring --- pkg/discovery/service.go | 45 ++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/pkg/discovery/service.go b/pkg/discovery/service.go index f140455..c04f7a4 100644 --- a/pkg/discovery/service.go +++ b/pkg/discovery/service.go @@ -24,10 +24,10 @@ func (d DiscoverySet) NewNodes() NewNodes { res := make(chan string, 20) for _, discovery := range d { - go func(v *Discovery) { - for node := range v.Nodes() { + go func(discovery *Discovery) { + for node := range discovery.Nodes() { if len(res) == cap(res) { - panic("discovery set nodes not reading!") + panic("nodes from discovery set not reading!") } res <- node @@ -72,8 +72,8 @@ func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opt return nil, fmt.Errorf("get interface address: %w", err) } - broadcast := net.IP(make([]byte, 4)) - var ownAddr net.IP + broadcastIP := net.IP(make([]byte, 4)) + var ownIP net.IP for _, addr := range addrs { if ipnet, ok := addr.(*net.IPNet); ok { @@ -83,35 +83,40 @@ func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opt } for i := range ip4 { - broadcast[i] = ip4[i] | ^ipnet.Mask[i] + broadcastIP[i] = ip4[i] | ^ipnet.Mask[i] } - ownAddr = ipnet.IP + ownIP = ipnet.IP break } } - if broadcast.To4() == nil { + + if broadcastIP.To4() == nil { return nil, fmt.Errorf("no broadcast address") } - portString := strconv.Itoa(int(discoverPort)) + if ownIP.To4() == nil { + return nil, fmt.Errorf("no own address") + } - conn, err := net.ListenPacket("udp4", broadcast.String()+":"+portString) + broadcastAddrString := broadcastIP.String() + ":" + strconv.Itoa(int(discoverPort)) + + conn, err := net.ListenPacket("udp4", broadcastAddrString) if err != nil { return nil, fmt.Errorf("listen packet: %w", err) } - udpAddr, err := net.ResolveUDPAddr("udp4", broadcast.String()+":"+portString) + udpAddr, err := net.ResolveUDPAddr("udp4", broadcastAddrString) if err != nil { return nil, fmt.Errorf("resolve udp address: %w", err) } d := Discovery{ - log: log.With(zap.Stringer("broadcast_address", broadcast)), + log: log.With(zap.Stringer("broadcast_address", broadcastIP)), nodes: make(Nodes), - newNode: make(chan string, 20), - ownAddr: ownAddr.To4(), + newNodesCh: make(chan string, 20), + ownAddr: ownIP.To4(), conn: conn, broadcastAddr: udpAddr, broadcastInterval: defaultBroadcastInterval, @@ -128,9 +133,9 @@ type Discovery struct { log *zap.Logger debug bool - mu sync.Mutex - nodes Nodes - newNode chan string + mu sync.Mutex + nodes Nodes + newNodesCh chan string ownAddr net.IP conn net.PacketConn @@ -164,7 +169,7 @@ func (d *Discovery) Start(ctx context.Context, wg *sync.WaitGroup) { d.log.Warn("close connection", zap.Error(err)) } - close(d.newNode) + close(d.newNodesCh) return @@ -228,12 +233,12 @@ func (d *Discovery) addNode(addr string) { if _, ok := d.nodes[addr]; !ok { d.log.Info("new node address", zap.String("address", addr)) d.nodes[addr] = struct{}{} - d.newNode <- addr + d.newNodesCh <- addr } d.mu.Unlock() } func (d *Discovery) Nodes() NewNodes { - return d.newNode + return d.newNodesCh }