This repository has been archived on 2024-02-11. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
astontest/pkg/discovery/service.go
derfenix 7864272baa Initial version
Project structure, api, discovery service, docker.
2023-12-06 08:32:01 +03:00

240 lines
4.7 KiB
Go

package discovery
import (
	"context"
	"errors"
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"go.uber.org/zap"
)
// defaultBroadcastInterval is the interval between broadcast announcements
// that NewDiscovery assigns to a Discovery before applying its options.
const defaultBroadcastInterval = time.Second
// Nodes is a set of node addresses keyed by their string form.
type Nodes map[string]struct{}
// NewNodes is a receive-only stream of node addresses.
type NewNodes <-chan string
// DiscoverySet is a collection of Discovery instances; NewDiscoverySet builds
// one with an entry per selected network interface.
//
//goland:noinspection GoNameStartsWithPackageName
type DiscoverySet []*Discovery
// NewNodes fans the node streams of every Discovery in the set into a single
// buffered channel and returns it.
//
// One forwarding goroutine is started per Discovery; each one ends when the
// Discovery's own stream is closed. A forwarder panics if it finds the merged
// channel full, which signals that nobody is consuming the returned stream.
// The merged channel itself is never closed.
func (d DiscoverySet) NewNodes() NewNodes {
	const mergedBuffer = 20
	merged := make(chan string, mergedBuffer)

	// forward copies every address emitted by one source into merged.
	forward := func(source *Discovery) {
		for addr := range source.Nodes() {
			if len(merged) == cap(merged) {
				panic("discovery set nodes not reading!")
			}
			merged <- addr
		}
	}

	for i := range d {
		go forward(d[i])
	}

	return merged
}
// NewDiscoverySet creates a Discovery for every network interface that is
// running and is not a loopback interface.
//
// log, discoverPort and opts are passed unchanged to NewDiscovery for each
// selected interface. An error is returned if the interfaces cannot be listed
// or if NewDiscovery fails for any of them; in the latter case the connections
// of the discoveries created so far are closed so that no sockets are leaked.
func NewDiscoverySet(log *zap.Logger, discoverPort uint16, opts ...Option) (DiscoverySet, error) {
	iFaces, err := net.Interfaces()
	if err != nil {
		return nil, fmt.Errorf("list interfaces: %w", err)
	}

	set := make(DiscoverySet, 0, len(iFaces))
	for _, iFace := range iFaces {
		if iFace.Flags&net.FlagLoopback != 0 || iFace.Flags&net.FlagRunning == 0 {
			continue
		}

		discover, err := NewDiscovery(iFace, log, discoverPort, opts...)
		if err != nil {
			// The caller receives no set on failure, so it cannot release the
			// sockets opened for the earlier interfaces; do it here.
			for _, created := range set {
				if closeErr := created.conn.Close(); closeErr != nil {
					log.Warn("close connection", zap.Error(closeErr))
				}
			}
			return nil, fmt.Errorf("new discover for %s: %w", iFace.Name, err)
		}

		set = append(set, discover)
	}

	return set, nil
}
// NewDiscovery creates a Discovery bound to the IPv4 broadcast address of
// iFace.
//
// The first IPv4 address found on the interface is used: it becomes the
// address announced to other nodes, and its directed broadcast address (all
// host bits set) combined with discoverPort is used both for listening and as
// the destination of announcements. opts are applied, in order, to the new
// Discovery before it is returned.
//
// An error is returned if the interface addresses cannot be read, if the
// interface has no usable IPv4 address, or if the broadcast address cannot be
// resolved or listened on.
func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opts ...Option) (*Discovery, error) {
	addrs, err := iFace.Addrs()
	if err != nil {
		return nil, fmt.Errorf("get interface address: %w", err)
	}

	// broadcast stays nil until an IPv4 address is found, so the check below
	// really detects interfaces without one.
	var ownAddr, broadcast net.IP
	for _, addr := range addrs {
		ipnet, ok := addr.(*net.IPNet)
		if !ok {
			continue
		}
		ip4 := ipnet.IP.To4()
		if ip4 == nil {
			continue
		}

		// Accept a mask given in 16-byte form by using its last four bytes.
		mask := ipnet.Mask
		if len(mask) == net.IPv6len {
			mask = mask[net.IPv6len-net.IPv4len:]
		}
		if len(mask) != net.IPv4len {
			continue
		}

		broadcast = make(net.IP, net.IPv4len)
		for i := range ip4 {
			broadcast[i] = ip4[i] | ^mask[i]
		}
		ownAddr = ip4
		break
	}
	if broadcast == nil {
		return nil, fmt.Errorf("no broadcast address")
	}

	address := net.JoinHostPort(broadcast.String(), strconv.Itoa(int(discoverPort)))

	// Resolve before opening the socket so that a resolution failure cannot
	// leave an open connection behind.
	udpAddr, err := net.ResolveUDPAddr("udp4", address)
	if err != nil {
		return nil, fmt.Errorf("resolve udp address: %w", err)
	}

	conn, err := net.ListenPacket("udp4", address)
	if err != nil {
		return nil, fmt.Errorf("listen packet: %w", err)
	}

	d := &Discovery{
		log:               log.With(zap.Stringer("broadcast_address", broadcast)),
		nodes:             make(Nodes),
		newNode:           make(chan string, 20),
		ownAddr:           ownAddr,
		conn:              conn,
		broadcastAddr:     udpAddr,
		broadcastInterval: defaultBroadcastInterval,
	}
	for _, opt := range opts {
		opt(d)
	}

	return d, nil
}
// Discovery announces this node's IPv4 address via UDP broadcast on one
// network interface and collects the addresses announced by other nodes.
type Discovery struct {
// log is the logger used by all methods; NewDiscovery tags it with the
// broadcast address.
log *zap.Logger
// debug, when true, makes addNode accept this node's own address as well.
debug bool
// mu is held by addNode while it reads and updates nodes.
mu sync.Mutex
// nodes is the set of addresses that have already been reported.
nodes Nodes
// newNode carries each address the first time it is seen; it is the channel
// returned by Nodes.
newNode chan string
// ownAddr is this node's IPv4 address; it is the payload of every broadcast.
ownAddr net.IP
// conn is the packet connection used for both receiving and sending.
conn net.PacketConn
// broadcastAddr is the destination of the announcements.
broadcastAddr *net.UDPAddr
// broadcastInterval is the period between two announcements.
broadcastInterval time.Duration
}
// Start runs the discovery loop until ctx is cancelled or the listener stops.
//
// It starts a goroutine that receives announcements from other nodes and, on
// every tick of broadcastInterval, broadcasts this node's own address. On exit
// the connection is closed, the listener goroutine is awaited, and only then
// is the channel returned by Nodes closed. wg.Done is called when Start
// returns, so the caller must have added this call to wg beforehand.
//
// New-node notifications that are produced while shutting down may be
// discarded.
func (d *Discovery) Start(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	d.log.Info("start discovery", zap.Stringer("address", d.ownAddr))

	ticker := time.NewTicker(d.broadcastInterval)
	defer ticker.Stop()

	listenStop := make(chan struct{})
	go func() {
		defer close(listenStop)
		if err := d.listen(); err != nil {
			d.log.Error("listen failed", zap.Error(err))
		}
	}()

	// shutdown releases the socket and closes newNode once the listener is
	// guaranteed to have finished. Closing newNode while the listener may still
	// be sending on it would panic.
	shutdown := func() {
		if err := d.conn.Close(); err != nil {
			d.log.Warn("close connection", zap.Error(err))
		}
		for {
			// Prefer the stop signal so nothing is discarded once the
			// listener has already finished.
			select {
			case <-listenStop:
				close(d.newNode)
				return
			default:
			}
			// Drain while waiting: the listener may be blocked sending on a
			// full newNode and would otherwise never notice the closed socket.
			select {
			case <-listenStop:
			case <-d.newNode:
			}
		}
	}

	for {
		select {
		case <-ctx.Done():
			shutdown()
			return
		case <-listenStop:
			d.log.Error("listener stopped, stop discovery")
			shutdown()
			return
		case <-ticker.C:
			d.broadcast()
		}
	}
}
// listen receives announcements from the connection and registers the
// announced addresses through addNode.
//
// Every announcement must consist of exactly four bytes, the sender's IPv4
// address; datagrams of any other size are logged and ignored. A mismatch
// between the announced address and the datagram's source address is logged,
// but the announced address is still registered.
//
// listen returns nil once the connection has been closed and a wrapped error
// for any other read failure except timeouts, after which the read is retried.
func (d *Discovery) listen() error {
	// One spare byte lets an oversized datagram be told apart from a valid
	// one instead of being silently truncated to four bytes.
	buf := make([]byte, net.IPv4len+1)

	for {
		n, addr, err := d.conn.ReadFrom(buf)
		if err != nil {
			if errors.Is(err, net.ErrClosed) {
				d.log.Warn("listen connection closed")
				return nil
			}

			var netErr net.Error
			if errors.As(err, &netErr) && netErr.Timeout() {
				continue
			}

			return fmt.Errorf("read from: %w", err)
		}

		if n != net.IPv4len {
			d.log.Warn("received malformed discovery message", zap.Int("size", n))
			continue
		}

		nodeAddr := net.IP(buf[:n])
		d.log.Debug("received node address", zap.Stringer("address", nodeAddr))

		clientIP, _, _ := strings.Cut(addr.String(), ":")
		if nodeAddr.String() != clientIP {
			d.log.Warn("received addr mismatch", zap.Stringer("received", nodeAddr), zap.String("detected", clientIP))
		}

		d.addNode(nodeAddr.String())
	}
}
// broadcast announces this node by writing its own address to the broadcast
// address. A failed write is logged and otherwise ignored.
func (d *Discovery) broadcast() {
	d.log.Debug("broadcast")

	_, writeErr := d.conn.WriteTo(d.ownAddr, d.broadcastAddr)
	if writeErr == nil {
		return
	}

	d.log.Error("write broadcast message", zap.Error(writeErr))
}
// addNode records addr and publishes it on newNode the first time it is seen.
//
// Unless debug is enabled, this node's own address is ignored. Addresses that
// are already known are ignored as well. The send on newNode happens while the
// mutex is held and blocks when the channel's buffer is full.
func (d *Discovery) addNode(addr string) {
	if !d.debug {
		if addr == d.ownAddr.String() {
			return
		}
	}

	d.mu.Lock()
	defer d.mu.Unlock()

	if _, known := d.nodes[addr]; known {
		return
	}

	d.log.Info("new node address", zap.String("address", addr))
	d.nodes[addr] = struct{}{}
	d.newNode <- addr
}
// Nodes returns the receive-only view of the channel on which the addresses
// of newly seen nodes are published.
func (d *Discovery) Nodes() NewNodes {
	var stream NewNodes = d.newNode
	return stream
}