Messaging service, adapters, refactoring
This commit is contained in:
34
pkg/discovery/discoveryset.go
Normal file
34
pkg/discovery/discoveryset.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package discovery
|
||||
|
||||
// DiscoverySet a slice of initialized discovery services.
|
||||
//
|
||||
//goland:noinspection GoNameStartsWithPackageName
|
||||
type DiscoverySet []*Discovery
|
||||
|
||||
// NewNodes returns channel with newly discovered node ips.
|
||||
func (d DiscoverySet) NewNodes() NewNodes {
|
||||
res := make(chan string, 20)
|
||||
|
||||
for _, discovery := range d {
|
||||
go func(discovery *Discovery) {
|
||||
for node := range discovery.NewNodes() {
|
||||
if len(res) == cap(res) {
|
||||
panic("knownNodes from discovery set not reading!")
|
||||
}
|
||||
|
||||
res <- node
|
||||
}
|
||||
}(discovery)
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
// FailNodes accept a channel with ip of nodes, that was failed (lost or canceled connection).
|
||||
func (d DiscoverySet) FailNodes(ch chan string) {
|
||||
for addr := range ch {
|
||||
for _, discovery := range d {
|
||||
discovery.FailNode(addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
26
pkg/discovery/packet.go
Normal file
26
pkg/discovery/packet.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"net"
|
||||
)
|
||||
|
||||
var magicBytes = []byte{0x01, 0x02, 0x01}
|
||||
|
||||
func NewPacket() Packet {
|
||||
return make(Packet, 7) // 7 = 3 bytes of magic + 4 bytes for ipv4 address
|
||||
}
|
||||
|
||||
func NewPacketWithIP(ip net.IP) Packet {
|
||||
return append(magicBytes, []byte(ip.To4())...)
|
||||
}
|
||||
|
||||
type Packet []byte
|
||||
|
||||
func (p Packet) IP(n int) net.IP {
|
||||
return net.IP(p[3:n])
|
||||
}
|
||||
|
||||
func (p Packet) MagicOk() bool {
|
||||
return bytes.Equal(p[:3], magicBytes)
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
@@ -14,30 +15,12 @@ import (
|
||||
|
||||
const defaultBroadcastInterval = time.Second
|
||||
|
||||
type Nodes map[string]struct{}
|
||||
type NewNodes <-chan string
|
||||
|
||||
//goland:noinspection GoNameStartsWithPackageName
|
||||
type DiscoverySet []*Discovery
|
||||
|
||||
func (d DiscoverySet) NewNodes() NewNodes {
|
||||
res := make(chan string, 20)
|
||||
|
||||
for _, discovery := range d {
|
||||
go func(discovery *Discovery) {
|
||||
for node := range discovery.Nodes() {
|
||||
if len(res) == cap(res) {
|
||||
panic("nodes from discovery set not reading!")
|
||||
}
|
||||
|
||||
res <- node
|
||||
}
|
||||
}(discovery)
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
type (
|
||||
KnownNodes map[string]struct{}
|
||||
NewNodes <-chan string
|
||||
)
|
||||
|
||||
// NewDiscoverySet returns a set of discovery services for all running and non-loopback network interfaces.
|
||||
func NewDiscoverySet(log *zap.Logger, discoverPort uint16, opts ...Option) (DiscoverySet, error) {
|
||||
iFaces, err := net.Interfaces()
|
||||
if err != nil {
|
||||
@@ -45,6 +28,7 @@ func NewDiscoverySet(log *zap.Logger, discoverPort uint16, opts ...Option) (Disc
|
||||
}
|
||||
|
||||
set := make(DiscoverySet, 0, len(iFaces))
|
||||
var errs []error
|
||||
|
||||
for _, iFace := range iFaces {
|
||||
if iFace.Flags&net.FlagLoopback == net.FlagLoopback {
|
||||
@@ -57,15 +41,22 @@ func NewDiscoverySet(log *zap.Logger, discoverPort uint16, opts ...Option) (Disc
|
||||
|
||||
discover, err := NewDiscovery(iFace, log, discoverPort, opts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("new discover for %s: %w", iFace.Name, err)
|
||||
errs = append(errs, err)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
set = append(set, discover)
|
||||
}
|
||||
|
||||
if len(set) == 0 {
|
||||
return nil, errors.Join(errs...)
|
||||
}
|
||||
|
||||
return set, nil
|
||||
}
|
||||
|
||||
// NewDiscovery returns new initialized discovery service for specified network interface.
|
||||
func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opts ...Option) (*Discovery, error) {
|
||||
addrs, err := iFace.Addrs()
|
||||
if err != nil {
|
||||
@@ -86,7 +77,7 @@ func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opt
|
||||
broadcastIP[i] = ip4[i] | ^ipnet.Mask[i]
|
||||
}
|
||||
|
||||
ownIP = ipnet.IP
|
||||
ownIP = ip4
|
||||
|
||||
break
|
||||
}
|
||||
@@ -96,7 +87,7 @@ func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opt
|
||||
return nil, fmt.Errorf("no broadcast address")
|
||||
}
|
||||
|
||||
if ownIP.To4() == nil {
|
||||
if ownIP == nil {
|
||||
return nil, fmt.Errorf("no own address")
|
||||
}
|
||||
|
||||
@@ -114,9 +105,11 @@ func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opt
|
||||
|
||||
d := Discovery{
|
||||
log: log.With(zap.Stringer("broadcast_address", broadcastIP)),
|
||||
nodes: make(Nodes),
|
||||
knownNodes: make(KnownNodes),
|
||||
newNodesCh: make(chan string, 20),
|
||||
failNodesCh: make(chan string),
|
||||
ownAddr: ownIP.To4(),
|
||||
ownAddrPacket: NewPacketWithIP(ownIP),
|
||||
conn: conn,
|
||||
broadcastAddr: udpAddr,
|
||||
broadcastInterval: defaultBroadcastInterval,
|
||||
@@ -129,18 +122,21 @@ func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opt
|
||||
return &d, nil
|
||||
}
|
||||
|
||||
// Discovery a service to notify neighbours about yourself and keep track other neighbours alive.
|
||||
type Discovery struct {
|
||||
log *zap.Logger
|
||||
debug bool
|
||||
|
||||
mu sync.Mutex
|
||||
nodes Nodes
|
||||
newNodesCh chan string
|
||||
knownNodes KnownNodes
|
||||
|
||||
ownAddr net.IP
|
||||
conn net.PacketConn
|
||||
broadcastAddr *net.UDPAddr
|
||||
newNodesCh chan string
|
||||
failNodesCh chan string
|
||||
|
||||
ownAddr net.IP
|
||||
ownAddrPacket Packet
|
||||
conn net.PacketConn
|
||||
broadcastAddr *net.UDPAddr
|
||||
broadcastInterval time.Duration
|
||||
}
|
||||
|
||||
@@ -162,19 +158,24 @@ func (d *Discovery) Start(ctx context.Context, wg *sync.WaitGroup) {
|
||||
close(listenStop)
|
||||
}()
|
||||
|
||||
stop := func() {
|
||||
if err := d.conn.Close(); err != nil {
|
||||
d.log.Warn("close connection", zap.Error(err))
|
||||
}
|
||||
|
||||
close(d.newNodesCh)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if err := d.conn.Close(); err != nil {
|
||||
d.log.Warn("close connection", zap.Error(err))
|
||||
}
|
||||
|
||||
close(d.newNodesCh)
|
||||
stop()
|
||||
|
||||
return
|
||||
|
||||
case <-listenStop:
|
||||
d.log.Error("listener stopped, stop discovery")
|
||||
stop()
|
||||
|
||||
return
|
||||
|
||||
@@ -185,10 +186,10 @@ func (d *Discovery) Start(ctx context.Context, wg *sync.WaitGroup) {
|
||||
}
|
||||
|
||||
func (d *Discovery) listen() error {
|
||||
buf := make([]byte, 4)
|
||||
packet := NewPacket()
|
||||
|
||||
for {
|
||||
n, addr, err := d.conn.ReadFrom(buf)
|
||||
readSize, addr, err := d.conn.ReadFrom(packet)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "use of closed network connection") {
|
||||
d.log.Warn("listen connection closed")
|
||||
@@ -196,14 +197,20 @@ func (d *Discovery) listen() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
|
||||
if opErr, ok := err.(*net.OpError); ok && opErr.Temporary() {
|
||||
continue
|
||||
}
|
||||
|
||||
return fmt.Errorf("read from: %w", err)
|
||||
}
|
||||
|
||||
nodeAddr := net.IP(buf[:n])
|
||||
if !packet.MagicOk() {
|
||||
d.log.Warn("data without magic bytes received")
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
nodeAddr := packet.IP(readSize)
|
||||
d.log.Debug("received node address", zap.Stringer("address", nodeAddr))
|
||||
|
||||
clientIP, _, _ := strings.Cut(addr.String(), ":")
|
||||
@@ -218,7 +225,7 @@ func (d *Discovery) listen() error {
|
||||
func (d *Discovery) broadcast() {
|
||||
d.log.Debug("broadcast")
|
||||
|
||||
if _, err := d.conn.WriteTo(d.ownAddr, d.broadcastAddr); err != nil {
|
||||
if _, err := d.conn.WriteTo(d.ownAddrPacket, d.broadcastAddr); err != nil {
|
||||
d.log.Error("write broadcast message", zap.Error(err))
|
||||
}
|
||||
}
|
||||
@@ -230,15 +237,32 @@ func (d *Discovery) addNode(addr string) {
|
||||
|
||||
d.mu.Lock()
|
||||
|
||||
if _, ok := d.nodes[addr]; !ok {
|
||||
if _, ok := d.knownNodes[addr]; !ok {
|
||||
d.log.Info("new node address", zap.String("address", addr))
|
||||
d.nodes[addr] = struct{}{}
|
||||
d.knownNodes[addr] = struct{}{}
|
||||
d.newNodesCh <- addr
|
||||
}
|
||||
|
||||
d.mu.Unlock()
|
||||
}
|
||||
|
||||
func (d *Discovery) Nodes() NewNodes {
|
||||
func (d *Discovery) removeNode(addr string) {
|
||||
d.mu.Lock()
|
||||
|
||||
if _, ok := d.knownNodes[addr]; ok {
|
||||
d.log.Warn("node failed, removed", zap.String("address", addr))
|
||||
delete(d.knownNodes, addr)
|
||||
}
|
||||
|
||||
d.mu.Unlock()
|
||||
}
|
||||
|
||||
// NewNodes returns channel with new discovered node addresses.
|
||||
func (d *Discovery) NewNodes() NewNodes {
|
||||
return d.newNodesCh
|
||||
}
|
||||
|
||||
// FailNode remove address from list of known nodes. Next broadcast message from this node will be sent to the `NewNodes` channel.
|
||||
func (d *Discovery) FailNode(addr string) {
|
||||
d.removeNode(addr)
|
||||
}
|
||||
|
||||
@@ -17,7 +17,8 @@ func TestIface(t *testing.T) {
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
ctx, _ = context.WithTimeout(ctx, time.Second*5)
|
||||
ctx, cancel = context.WithTimeout(ctx, time.Second*5)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
set, err := NewDiscoverySet(zaptest.NewLogger(t).Named("discovery"), 1234, WithDebug())
|
||||
require.NoError(t, err)
|
||||
@@ -34,5 +35,5 @@ func TestIface(t *testing.T) {
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
assert.Len(t, set[0].Nodes(), 1)
|
||||
assert.Len(t, set[0].NewNodes(), 1)
|
||||
}
|
||||
|
||||
220
pkg/messenger/service.go
Normal file
220
pkg/messenger/service.go
Normal file
@@ -0,0 +1,220 @@
|
||||
package messenger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"git.derfenix.pro/fenix/astontest/api"
|
||||
)
|
||||
|
||||
type DataStorage interface {
|
||||
StoreReceived(ctx context.Context, sender, message string) error
|
||||
StoreSent(ctx context.Context, sender, message string) error
|
||||
}
|
||||
|
||||
type DataSource interface {
|
||||
NewString(ctx context.Context) string
|
||||
}
|
||||
|
||||
func NewMessenger(newNodes <-chan string, port uint16, storage DataStorage, source DataSource, pingInterval time.Duration, log *zap.Logger) *Messenger {
|
||||
return &Messenger{
|
||||
newNodes: newNodes,
|
||||
storage: storage,
|
||||
source: source,
|
||||
port: port,
|
||||
log: log,
|
||||
pingInterval: pingInterval,
|
||||
FailNodesCh: make(chan string, 1),
|
||||
}
|
||||
}
|
||||
|
||||
type Messenger struct {
|
||||
newNodes <-chan string
|
||||
storage DataStorage
|
||||
source DataSource
|
||||
port uint16
|
||||
log *zap.Logger
|
||||
pingInterval time.Duration
|
||||
|
||||
FailNodesCh chan string
|
||||
|
||||
api.UnimplementedAstonTestServer
|
||||
}
|
||||
|
||||
func (m *Messenger) Start(ctx context.Context, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case node := <-m.newNodes:
|
||||
wg.Add(1)
|
||||
go m.newStream(ctx, wg, node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Messenger) StartServer(ctx context.Context, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
srv := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
|
||||
api.RegisterAstonTestServer(srv, m)
|
||||
|
||||
lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", m.port))
|
||||
if err != nil {
|
||||
m.log.Error("failed to create listener", zap.Error(err))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
|
||||
go func() {
|
||||
time.Sleep(time.Second)
|
||||
srv.Stop()
|
||||
}()
|
||||
|
||||
srv.GracefulStop()
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
m.log.Info("server start")
|
||||
|
||||
if err := srv.Serve(lis); err != nil {
|
||||
m.log.Error("serve error", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Messenger) Ping(stream api.AstonTest_PingServer) error {
|
||||
ticker := time.NewTicker(m.pingInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
peerData, peerFound := peer.FromContext(stream.Context())
|
||||
if !peerFound {
|
||||
m.log.Warn("not peer found in context")
|
||||
peerData = &peer.Peer{
|
||||
Addr: &net.IPAddr{},
|
||||
}
|
||||
}
|
||||
|
||||
peerIP, _, _ := strings.Cut(peerData.Addr.String(), ":")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stream.Context().Done():
|
||||
return nil
|
||||
|
||||
case <-ticker.C:
|
||||
data := m.source.NewString(stream.Context())
|
||||
|
||||
if err := stream.Send(&api.PingPong{Value: data}); err != nil {
|
||||
m.log.Error("failed to send message", zap.Error(err))
|
||||
|
||||
if code := status.Convert(err).Code(); code == codes.Canceled || code == codes.Unavailable {
|
||||
return nil
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if err := m.storage.StoreSent(stream.Context(), peerIP, data); err != nil {
|
||||
m.log.Error("failed to save sent data", zap.Error(err), zap.String("data", data), zap.String("remote_addr", peerIP))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Messenger) newStream(ctx context.Context, wg *sync.WaitGroup, address string) {
|
||||
defer wg.Done()
|
||||
|
||||
defer func() {
|
||||
m.FailNodesCh <- address
|
||||
}()
|
||||
|
||||
log := m.log.With(zap.String("address", address))
|
||||
|
||||
cc, err := grpc.Dial(fmt.Sprintf("%s:%d", address, m.port), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if err != nil {
|
||||
log.Error("failed to dial", zap.Error(err))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := cc.Close(); err != nil {
|
||||
log.Error("close connection", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
client := api.NewAstonTestClient(cc)
|
||||
pingClient, err := client.Ping(ctx)
|
||||
if err != nil {
|
||||
log.Error("create ping stream", zap.Error(err))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("stream started")
|
||||
|
||||
msgCh := m.newMessagesCh(pingClient, log)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case msg, ok := <-msgCh:
|
||||
if !ok {
|
||||
log.Warn("message channel closed")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if msg.GetValue() == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug("received new message", zap.String("message", msg.GetValue()))
|
||||
|
||||
if err := m.storage.StoreReceived(ctx, address, msg.GetValue()); err != nil {
|
||||
log.Error("store message", zap.Error(err), zap.String("message", msg.GetValue()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Messenger) newMessagesCh(pingClient api.AstonTest_PingClient, log *zap.Logger) chan *api.PingPong {
|
||||
msgCh := make(chan *api.PingPong)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
msg, err := pingClient.Recv()
|
||||
if err != nil {
|
||||
if code := status.Convert(err).Code(); !(code == codes.Canceled || code == codes.Unavailable) {
|
||||
log.Error("message receive failed", zap.Error(err))
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
msgCh <- msg
|
||||
}
|
||||
close(msgCh)
|
||||
}()
|
||||
|
||||
return msgCh
|
||||
}
|
||||
Reference in New Issue
Block a user