222 lines
4.5 KiB
Go
222 lines
4.5 KiB
Go
package messenger
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/peer"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"git.derfenix.pro/fenix/astontest/api"
|
|
)
|
|
|
|
type DataStorage interface {
|
|
StoreReceived(ctx context.Context, sender, message string) error
|
|
StoreSent(ctx context.Context, sender, message string) error
|
|
}
|
|
|
|
type DataSource interface {
|
|
NewString(ctx context.Context) string
|
|
}
|
|
|
|
func NewMessenger(newNodes <-chan string, port uint16, storage DataStorage, source DataSource, pingInterval time.Duration, log *zap.Logger) *Messenger {
|
|
return &Messenger{
|
|
newNodes: newNodes,
|
|
storage: storage,
|
|
source: source,
|
|
port: port,
|
|
log: log,
|
|
pingInterval: pingInterval,
|
|
FailNodesCh: make(chan string, 1),
|
|
}
|
|
}
|
|
|
|
type Messenger struct {
|
|
newNodes <-chan string
|
|
storage DataStorage
|
|
source DataSource
|
|
port uint16
|
|
log *zap.Logger
|
|
pingInterval time.Duration
|
|
|
|
FailNodesCh chan string
|
|
|
|
api.UnimplementedAstonTestServer
|
|
}
|
|
|
|
func (m *Messenger) Start(ctx context.Context, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
defer close(m.FailNodesCh)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case node := <-m.newNodes:
|
|
wg.Add(1)
|
|
go m.newStream(ctx, wg, node)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Messenger) StartServer(ctx context.Context, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
|
|
srv := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
|
|
api.RegisterAstonTestServer(srv, m)
|
|
|
|
lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", m.port))
|
|
if err != nil {
|
|
m.log.Error("failed to create listener", zap.Error(err))
|
|
|
|
return
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
<-ctx.Done()
|
|
|
|
go func() {
|
|
time.Sleep(time.Second)
|
|
srv.Stop()
|
|
}()
|
|
|
|
srv.GracefulStop()
|
|
wg.Done()
|
|
}()
|
|
|
|
m.log.Info("server start")
|
|
|
|
if err := srv.Serve(lis); err != nil {
|
|
m.log.Error("serve error", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (m *Messenger) Ping(stream api.AstonTest_PingServer) error {
|
|
ticker := time.NewTicker(m.pingInterval)
|
|
defer ticker.Stop()
|
|
|
|
peerData, peerFound := peer.FromContext(stream.Context())
|
|
if !peerFound {
|
|
m.log.Warn("not peer found in context")
|
|
peerData = &peer.Peer{
|
|
Addr: &net.IPAddr{},
|
|
}
|
|
}
|
|
|
|
peerIP, _, _ := strings.Cut(peerData.Addr.String(), ":")
|
|
|
|
for {
|
|
select {
|
|
case <-stream.Context().Done():
|
|
return nil
|
|
|
|
case <-ticker.C:
|
|
data := m.source.NewString(stream.Context())
|
|
|
|
if err := stream.Send(&api.PingPong{Value: data}); err != nil {
|
|
m.log.Error("failed to send message", zap.Error(err))
|
|
|
|
if code := status.Convert(err).Code(); code == codes.Canceled || code == codes.Unavailable {
|
|
return nil
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
if err := m.storage.StoreSent(stream.Context(), peerIP, data); err != nil {
|
|
m.log.Error("failed to save sent data", zap.Error(err), zap.String("data", data), zap.String("remote_addr", peerIP))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Messenger) newStream(ctx context.Context, wg *sync.WaitGroup, address string) {
|
|
defer wg.Done()
|
|
|
|
defer func() {
|
|
m.FailNodesCh <- address
|
|
}()
|
|
|
|
log := m.log.With(zap.String("address", address))
|
|
|
|
cc, err := grpc.Dial(fmt.Sprintf("%s:%d", address, m.port), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
log.Error("failed to dial", zap.Error(err))
|
|
|
|
return
|
|
}
|
|
|
|
defer func() {
|
|
if err := cc.Close(); err != nil {
|
|
log.Error("close connection", zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
client := api.NewAstonTestClient(cc)
|
|
pingClient, err := client.Ping(ctx)
|
|
if err != nil {
|
|
log.Error("create ping stream", zap.Error(err))
|
|
|
|
return
|
|
}
|
|
|
|
log.Info("stream started")
|
|
|
|
msgCh := m.newMessagesCh(pingClient, log)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case msg, ok := <-msgCh:
|
|
if !ok {
|
|
log.Warn("message channel closed")
|
|
|
|
return
|
|
}
|
|
|
|
if msg.GetValue() == "" {
|
|
continue
|
|
}
|
|
|
|
log.Debug("received new message", zap.String("message", msg.GetValue()))
|
|
|
|
if err := m.storage.StoreReceived(ctx, address, msg.GetValue()); err != nil {
|
|
log.Error("store message", zap.Error(err), zap.String("message", msg.GetValue()))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Messenger) newMessagesCh(pingClient api.AstonTest_PingClient, log *zap.Logger) chan *api.PingPong {
|
|
msgCh := make(chan *api.PingPong)
|
|
|
|
go func() {
|
|
for {
|
|
msg, err := pingClient.Recv()
|
|
if err != nil {
|
|
if code := status.Convert(err).Code(); !(code == codes.Canceled || code == codes.Unavailable) {
|
|
log.Error("message receive failed", zap.Error(err))
|
|
}
|
|
|
|
break
|
|
}
|
|
msgCh <- msg
|
|
}
|
|
close(msgCh)
|
|
}()
|
|
|
|
return msgCh
|
|
}
|