diff --git a/.gitignore b/.gitignore
index e434f1c..be8d6ac 100644
--- a/.gitignore
+++ b/.gitignore
@@ -41,3 +41,4 @@ crashlytics-build.properties
fabric.properties
.idea/httpRequests
.idea/caches/build_file_checksums.ser
+/bin/
diff --git a/.idea/git_toolbox_prj.xml b/.idea/git_toolbox_prj.xml
new file mode 100644
index 0000000..02b915b
--- /dev/null
+++ b/.idea/git_toolbox_prj.xml
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/go.imports.xml b/.idea/go.imports.xml
new file mode 100644
index 0000000..644cdf0
--- /dev/null
+++ b/.idea/go.imports.xml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/yamllint.xml b/.idea/yamllint.xml
new file mode 100644
index 0000000..e3ff02e
--- /dev/null
+++ b/.idea/yamllint.xml
@@ -0,0 +1,7 @@
+
+
+
+ true
+ /usr/bin/yamllint
+
+
\ No newline at end of file
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..a13a4b9
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,21 @@
+BIN_PATH:=./bin
+CMD_PATH:=./cmd/server/main.go
+
+LAST_COMMIT:=$(shell git rev-list --abbrev-commit --all --max-count=1)
+TAG:=$(shell git describe --abbrev=0 --tags ${LAST_COMMIT} 2>/dev/null || true)
+VERSION:=$(or $(TAG:v%=%),0.0.0)-$(LAST_COMMIT)
+
+LDFLAGS=-extldflags=-static -w -s -w -s -X main.version=${VERSION}
+
+.PHONY: build test lint
+all: lint test build
+
+build:
+ CGO_ENABLED=0 go build -ldflags='${LDFLAGS}' -o ${BIN_PATH}/service ./cmd/server/main.go
+
+test:
+ go test -v -race ./...
+
+lint:
+ @which golangci-lint || go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
+ golangci-lint run -j4 ./...
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..74dde07
--- /dev/null
+++ b/README.md
@@ -0,0 +1,85 @@
+# Задание
+
+Создайте 5 сервисов на выделенной сети Docker. С заданной частотой сервисы
+опрашивают сеть на наличие “соседей”. Кроме того, каждый сервис устанавливает
+двунаправленный поток grpc с каждым из соседей и раз в секунду отправляет и
+получает случайную строку, регистрируя то, что он послал и что он получил.
+
+Дополнительно каждая из служб регистрирует новые и отпадающие ноды.
+
+## Checklist
+
+- [x] Поиск соседей (service discovery)
+- [x] Двунаправленный поток grpc со всеми соседями
+- [x] Периодическая отправка сообщений в стрим всем соседям
+- [x] Запись полученных и отправленных сообщений
+- [x] Регистрация новых и отпадающих нод
+
+# Комментарии к реализации
+
+## Discovery
+
+Для поиска соседей используется механизм рассылки широковещательных сообщений.
+Каждая нода периодически рассылает широковещательные сообщения со своим ip адресом.
+Каждая нода так же случает широковещательные сообщения на том же порту и сохраняет
+у себя уникальный список полученных адресов. Все новые адреса отправляются в
+отдельный канал. За регистрацию отпадающих нод отвечает сервис рассылки сообщений,
+уведомляя о них discovery сервис через отдельный канал.
+
+К отправляемому IP адресу добавляется фиксированный префикс из 3 байт для детектирования
+только наших пакетов и отсечения возможных сторонних пакетов.
+
+## Рассылка сообщений
+
+Сервис рассылки сообщений получает новые адреса нод от discovery сервиса и
+создаёт стрим для каждого полученного адреса ноды. При отключении стрима отправляет
+сообщение с ip адресом отвалившейся ноды в discovery сервис.
+
+В каждый стрим периодически (период настраиваем) отправляется строка, которую
+возвращает подключённая реализация интерфейса `DataSource` (в данный момент - случайный набор
+символов).
+
+Сообщения, полученные через стрим, сохраняются с использованием реализации интерфейса
+`DataStorage` (в данный момент - просто выводятся в stdout).
+
+# Запуск
+
+```shell
+docker-compose up
+```
+
+Команда запустит 5 контейнеров с сервисом в одной подсети.
+
+Через 5 секунд после запуска (период рассылки широковещательных сообщений по умолчанию),
+все ноды узнают друг о друге и установят двунаправленные потоки между друг другом и начнут
+обмен.
+
+Можно остановить один из контейнеров
+
+```shell
+docker-compose stop s5
+```
+
+и увидеть в логе запущенных контейнеров сообщения о потере ноды. Запустив контейнер
+снова
+
+```shell
+docker-compose up s5
+```
+
+можно будет увидеть (в течение тех же 5 секунд), что новая нода снова включена в сеть,
+с ней установлены все соединения и нода снова участвует в обмене сообщениями.
+
+# Параметры конфигурации
+
+Изменение параметров работы сервиса возможно через переменные окружения:
+
+| Параметр | Описние | Значение по умолчанию |
+|-------------------------|----------------------------------------------------------------------------|-----------------------|
+| **DEBUG** | включение отладочных логов | false |
+| **DISCOVERY_PORT** | порт discovery сервиса | 4321 |
+| **BROADCAST_INTERVAL** | интервал рассылки широковещательных сообщений discovery сервиса | 5s |
+| **MESSAGING_PORT** | порт grpc сервиса | 4322 |
+| **MESSAGING_INTERVAL** | интервал отправки сообщений через стрим grpc сервиса | 1s |
+| **RANDOM_MESSAGE_SIZE** | размер случайного сообщения адаптера randomstring (интерфейс `DataSource`) | 10 |
+
diff --git a/adapters/randomstring/service.go b/adapters/randomstring/service.go
new file mode 100644
index 0000000..30a0af9
--- /dev/null
+++ b/adapters/randomstring/service.go
@@ -0,0 +1,32 @@
+package randomstring
+
+import (
+ "context"
+ "math/rand"
+)
+
+const chars = "abcdefghijklmnopqrstuvwxyz0123456789"
+
+type Service struct {
+ chars []rune
+ length int
+ size uint
+}
+
+func New(size uint) *Service {
+ return &Service{
+ chars: []rune(chars),
+ length: len([]rune(chars)),
+ size: size,
+ }
+}
+
+func (s *Service) NewString(context.Context) string {
+ res := make([]rune, s.size)
+
+ for i := uint(0); i < s.size; i++ {
+ res[i] = s.chars[rand.Intn(s.length)]
+ }
+
+ return string(res)
+}
diff --git a/adapters/stdoutstore/service.go b/adapters/stdoutstore/service.go
new file mode 100644
index 0000000..60b1eb7
--- /dev/null
+++ b/adapters/stdoutstore/service.go
@@ -0,0 +1,25 @@
+package stdoutstore
+
+import (
+ "context"
+ "fmt"
+)
+
+func New() *Service {
+ return &Service{}
+}
+
+type Service struct {
+}
+
+func (s *Service) StoreReceived(_ context.Context, sender, message string) error {
+ fmt.Printf("Received '%s' from <%s>\n", message, sender)
+
+ return nil
+}
+
+func (s *Service) StoreSent(_ context.Context, sender, message string) error {
+ fmt.Printf("Sent '%s' to <%s>\n", message, sender)
+
+ return nil
+}
diff --git a/cmd/server/main.go b/cmd/server/main.go
index 051a725..b5a1672 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -13,6 +13,8 @@ import (
"git.derfenix.pro/fenix/astontest/internal/application"
)
+var version = "dev"
+
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
defer cancel()
@@ -27,6 +29,8 @@ func main() {
panic(err)
}
+ log.Sugar().Infof("Start service version %s", version)
+
app, err := application.NewApplication(&cfg, log)
if err != nil {
log.Error("new application", zap.Error(err))
@@ -42,6 +46,8 @@ func main() {
func getLogger(cfg application.Config) (*zap.Logger, error) {
logCfg := zap.NewProductionConfig()
logCfg.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder
+ logCfg.DisableCaller = true
+
if cfg.Debug {
logCfg.DisableStacktrace = false
logCfg.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel)
diff --git a/deploy/Dockerfile b/deploy/Dockerfile
index 989fbf3..59b39fb 100644
--- a/deploy/Dockerfile
+++ b/deploy/Dockerfile
@@ -5,9 +5,9 @@ COPY go.* ./
RUN go mod download
COPY . .
-RUN CGO_ENABLED=0 go build -o /project/service ./cmd/server
+RUN make build
FROM alpine:latest
-COPY --from=builder /project/service /
+COPY --from=builder /project/bin/service /
ENTRYPOINT ["/service"]
diff --git a/docker-compose.yaml b/docker-compose.yaml
index a20311f..1e6a461 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -14,3 +14,11 @@ services:
build:
dockerfile: deploy/Dockerfile
context: .
+ s4:
+ build:
+ dockerfile: deploy/Dockerfile
+ context: .
+ s5:
+ build:
+ dockerfile: deploy/Dockerfile
+ context: .
diff --git a/internal/application/application.go b/internal/application/application.go
index 152ac3f..3ea1d58 100644
--- a/internal/application/application.go
+++ b/internal/application/application.go
@@ -7,29 +7,47 @@ import (
"go.uber.org/zap"
+ "git.derfenix.pro/fenix/astontest/adapters/randomstring"
+ "git.derfenix.pro/fenix/astontest/adapters/stdoutstore"
"git.derfenix.pro/fenix/astontest/pkg/discovery"
+ "git.derfenix.pro/fenix/astontest/pkg/messenger"
)
type Application struct {
discoverySet discovery.DiscoverySet
+ messenger *messenger.Messenger
log *zap.Logger
}
func NewApplication(cfg *Config, log *zap.Logger) (*Application, error) {
- discoveryOpts := []discovery.Option{discovery.WithBroadcastInterval(cfg.BroadcastInterval)}
+ discoveryOpts := []discovery.Option{
+ discovery.WithBroadcastInterval(cfg.BroadcastInterval),
+ }
if cfg.Debug {
discoveryOpts = append(discoveryOpts, discovery.WithDebug())
}
- set, err := discovery.NewDiscoverySet(log.Named("discovery"), cfg.DiscoveryPort, discoveryOpts...)
+ discoverySet, err := discovery.NewDiscoverySet(log.Named("discovery"), cfg.DiscoveryPort, discoveryOpts...)
if err != nil {
return nil, fmt.Errorf("new discovery set: %w", err)
}
+ messengerSrv := messenger.NewMessenger(
+ discoverySet.NewNodes(),
+ cfg.MessagingPort,
+ stdoutstore.New(),
+ randomstring.New(cfg.RandomMessageSize),
+ cfg.MessagingInterval,
+ log.Named("messenger"),
+ )
+
+ go discoverySet.FailNodes(messengerSrv.FailNodesCh)
+
return &Application{
- discoverySet: set,
+ discoverySet: discoverySet,
+ messenger: messengerSrv,
log: log,
}, nil
}
@@ -39,4 +57,35 @@ func (a *Application) Start(ctx context.Context, wg *sync.WaitGroup) {
for _, discover := range a.discoverySet {
go discover.Start(ctx, wg)
}
+
+ wg.Add(2)
+ go a.messenger.StartServer(ctx, wg)
+ go a.messenger.Start(ctx, wg)
}
+
+var _ = `
+s1-1 | Received 'apq34yod73' from <172.19.0.5>
+s1-1 | Received 'cbtfl716li' from <172.19.0.3>
+s1-1 | Received 'n84fg3mx9o' from <172.19.0.2>
+s1-1 | Received 'zb98146fqz' from <172.19.0.4>
+
+s2-1 | Received '7h1s9ruilr' from <172.19.0.2>
+s2-1 | Received 'aij179r0ck' from <172.19.0.3>
+s2-1 | Received 'm3k0snj7ma' from <172.19.0.6>
+s2-1 | Received 'tw5726fo7e' from <172.19.0.5>
+
+s3-1 | Received '32d20marhg' from <172.19.0.6>
+s3-1 | Received 'ffs9pi6o9j' from <172.19.0.3>
+s3-1 | Received 'wcso6aashe' from <172.19.0.5>
+s3-1 | Received 'wfhcy9xbdj' from <172.19.0.4>
+
+s4-1 | Received '2le3u1ikyg' from <172.19.0.2>
+s4-1 | Received '5locacst6w' from <172.19.0.4>
+s4-1 | Received 'cf47nyd2ca' from <172.19.0.6>
+s4-1 | Received 'guvfhb7wud' from <172.19.0.5>
+
+s5-1 | Received 'dsr7qt2x5l' from <172.19.0.3>
+s5-1 | Received 'l67gc5xdt3' from <172.19.0.2>
+s5-1 | Received 'mof0yp4vxt' from <172.19.0.6>
+s5-1 | Received 'slc9tw30r4' from <172.19.0.4>
+`
diff --git a/internal/application/config.go b/internal/application/config.go
index 46a3e90..9f8d468 100644
--- a/internal/application/config.go
+++ b/internal/application/config.go
@@ -9,9 +9,15 @@ import (
)
type Config struct {
- Debug bool `env:"DEBUG"`
+ Debug bool `env:"DEBUG"`
+
DiscoveryPort uint16 `env:"DISCOVERY_PORT,default=4321"`
BroadcastInterval time.Duration `env:"BROADCAST_INTERVAL,default=5s"`
+
+ MessagingPort uint16 `env:"MESSAGING_PORT,default=4322"`
+ MessagingInterval time.Duration `env:"MESSAGING_INTERVAL,default=1s"`
+
+ RandomMessageSize uint `env:"RANDOM_MESSAGE_SIZE,default=10"`
}
func NewConfig(ctx context.Context) (Config, error) {
diff --git a/pkg/discovery/discoveryset.go b/pkg/discovery/discoveryset.go
new file mode 100644
index 0000000..c89a3cd
--- /dev/null
+++ b/pkg/discovery/discoveryset.go
@@ -0,0 +1,34 @@
+package discovery
+
+// DiscoverySet a slice of initialized discovery services.
+//
+//goland:noinspection GoNameStartsWithPackageName
+type DiscoverySet []*Discovery
+
+// NewNodes returns channel with newly discovered node ips.
+func (d DiscoverySet) NewNodes() NewNodes {
+ res := make(chan string, 20)
+
+ for _, discovery := range d {
+ go func(discovery *Discovery) {
+ for node := range discovery.NewNodes() {
+ if len(res) == cap(res) {
+ panic("knownNodes from discovery set not reading!")
+ }
+
+ res <- node
+ }
+ }(discovery)
+ }
+
+ return res
+}
+
+// FailNodes accept a channel with ip of nodes, that was failed (lost or canceled connection).
+func (d DiscoverySet) FailNodes(ch chan string) {
+ for addr := range ch {
+ for _, discovery := range d {
+ discovery.FailNode(addr)
+ }
+ }
+}
diff --git a/pkg/discovery/packet.go b/pkg/discovery/packet.go
new file mode 100644
index 0000000..bfedd3c
--- /dev/null
+++ b/pkg/discovery/packet.go
@@ -0,0 +1,26 @@
+package discovery
+
+import (
+ "bytes"
+ "net"
+)
+
+var magicBytes = []byte{0x01, 0x02, 0x01}
+
+func NewPacket() Packet {
+ return make(Packet, 7) // 7 = 3 bytes of magic + 4 bytes for ipv4 address
+}
+
+func NewPacketWithIP(ip net.IP) Packet {
+ return append(magicBytes, []byte(ip.To4())...)
+}
+
+type Packet []byte
+
+func (p Packet) IP(n int) net.IP {
+ return net.IP(p[3:n])
+}
+
+func (p Packet) MagicOk() bool {
+ return bytes.Equal(p[:3], magicBytes)
+}
diff --git a/pkg/discovery/service.go b/pkg/discovery/service.go
index c04f7a4..12c0186 100644
--- a/pkg/discovery/service.go
+++ b/pkg/discovery/service.go
@@ -2,6 +2,7 @@ package discovery
import (
"context"
+ "errors"
"fmt"
"net"
"strconv"
@@ -14,30 +15,12 @@ import (
const defaultBroadcastInterval = time.Second
-type Nodes map[string]struct{}
-type NewNodes <-chan string
-
-//goland:noinspection GoNameStartsWithPackageName
-type DiscoverySet []*Discovery
-
-func (d DiscoverySet) NewNodes() NewNodes {
- res := make(chan string, 20)
-
- for _, discovery := range d {
- go func(discovery *Discovery) {
- for node := range discovery.Nodes() {
- if len(res) == cap(res) {
- panic("nodes from discovery set not reading!")
- }
-
- res <- node
- }
- }(discovery)
- }
-
- return res
-}
+type (
+ KnownNodes map[string]struct{}
+ NewNodes <-chan string
+)
+// NewDiscoverySet returns a set of discovery services for all running and non-loopback network interfaces.
func NewDiscoverySet(log *zap.Logger, discoverPort uint16, opts ...Option) (DiscoverySet, error) {
iFaces, err := net.Interfaces()
if err != nil {
@@ -45,6 +28,7 @@ func NewDiscoverySet(log *zap.Logger, discoverPort uint16, opts ...Option) (Disc
}
set := make(DiscoverySet, 0, len(iFaces))
+ var errs []error
for _, iFace := range iFaces {
if iFace.Flags&net.FlagLoopback == net.FlagLoopback {
@@ -57,15 +41,22 @@ func NewDiscoverySet(log *zap.Logger, discoverPort uint16, opts ...Option) (Disc
discover, err := NewDiscovery(iFace, log, discoverPort, opts...)
if err != nil {
- return nil, fmt.Errorf("new discover for %s: %w", iFace.Name, err)
+ errs = append(errs, err)
+
+ continue
}
set = append(set, discover)
}
+ if len(set) == 0 {
+ return nil, errors.Join(errs...)
+ }
+
return set, nil
}
+// NewDiscovery returns new initialized discovery service for specified network interface.
func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opts ...Option) (*Discovery, error) {
addrs, err := iFace.Addrs()
if err != nil {
@@ -86,7 +77,7 @@ func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opt
broadcastIP[i] = ip4[i] | ^ipnet.Mask[i]
}
- ownIP = ipnet.IP
+ ownIP = ip4
break
}
@@ -96,7 +87,7 @@ func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opt
return nil, fmt.Errorf("no broadcast address")
}
- if ownIP.To4() == nil {
+ if ownIP == nil {
return nil, fmt.Errorf("no own address")
}
@@ -114,9 +105,11 @@ func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opt
d := Discovery{
log: log.With(zap.Stringer("broadcast_address", broadcastIP)),
- nodes: make(Nodes),
+ knownNodes: make(KnownNodes),
newNodesCh: make(chan string, 20),
+ failNodesCh: make(chan string),
ownAddr: ownIP.To4(),
+ ownAddrPacket: NewPacketWithIP(ownIP),
conn: conn,
broadcastAddr: udpAddr,
broadcastInterval: defaultBroadcastInterval,
@@ -129,18 +122,21 @@ func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opt
return &d, nil
}
+// Discovery a service to notify neighbours about yourself and keep track other neighbours alive.
type Discovery struct {
log *zap.Logger
debug bool
mu sync.Mutex
- nodes Nodes
- newNodesCh chan string
+ knownNodes KnownNodes
- ownAddr net.IP
- conn net.PacketConn
- broadcastAddr *net.UDPAddr
+ newNodesCh chan string
+ failNodesCh chan string
+ ownAddr net.IP
+ ownAddrPacket Packet
+ conn net.PacketConn
+ broadcastAddr *net.UDPAddr
broadcastInterval time.Duration
}
@@ -162,19 +158,24 @@ func (d *Discovery) Start(ctx context.Context, wg *sync.WaitGroup) {
close(listenStop)
}()
+ stop := func() {
+ if err := d.conn.Close(); err != nil {
+ d.log.Warn("close connection", zap.Error(err))
+ }
+
+ close(d.newNodesCh)
+ }
+
for {
select {
case <-ctx.Done():
- if err := d.conn.Close(); err != nil {
- d.log.Warn("close connection", zap.Error(err))
- }
-
- close(d.newNodesCh)
+ stop()
return
case <-listenStop:
d.log.Error("listener stopped, stop discovery")
+ stop()
return
@@ -185,10 +186,10 @@ func (d *Discovery) Start(ctx context.Context, wg *sync.WaitGroup) {
}
func (d *Discovery) listen() error {
- buf := make([]byte, 4)
+ packet := NewPacket()
for {
- n, addr, err := d.conn.ReadFrom(buf)
+ readSize, addr, err := d.conn.ReadFrom(packet)
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
d.log.Warn("listen connection closed")
@@ -196,14 +197,20 @@ func (d *Discovery) listen() error {
return nil
}
- if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
+ if opErr, ok := err.(*net.OpError); ok && opErr.Temporary() {
continue
}
return fmt.Errorf("read from: %w", err)
}
- nodeAddr := net.IP(buf[:n])
+ if !packet.MagicOk() {
+ d.log.Warn("data without magic bytes received")
+
+ continue
+ }
+
+ nodeAddr := packet.IP(readSize)
d.log.Debug("received node address", zap.Stringer("address", nodeAddr))
clientIP, _, _ := strings.Cut(addr.String(), ":")
@@ -218,7 +225,7 @@ func (d *Discovery) listen() error {
func (d *Discovery) broadcast() {
d.log.Debug("broadcast")
- if _, err := d.conn.WriteTo(d.ownAddr, d.broadcastAddr); err != nil {
+ if _, err := d.conn.WriteTo(d.ownAddrPacket, d.broadcastAddr); err != nil {
d.log.Error("write broadcast message", zap.Error(err))
}
}
@@ -230,15 +237,32 @@ func (d *Discovery) addNode(addr string) {
d.mu.Lock()
- if _, ok := d.nodes[addr]; !ok {
+ if _, ok := d.knownNodes[addr]; !ok {
d.log.Info("new node address", zap.String("address", addr))
- d.nodes[addr] = struct{}{}
+ d.knownNodes[addr] = struct{}{}
d.newNodesCh <- addr
}
d.mu.Unlock()
}
-func (d *Discovery) Nodes() NewNodes {
+func (d *Discovery) removeNode(addr string) {
+ d.mu.Lock()
+
+ if _, ok := d.knownNodes[addr]; ok {
+ d.log.Warn("node failed, removed", zap.String("address", addr))
+ delete(d.knownNodes, addr)
+ }
+
+ d.mu.Unlock()
+}
+
+// NewNodes returns channel with new discovered node addresses.
+func (d *Discovery) NewNodes() NewNodes {
return d.newNodesCh
}
+
+// FailNode remove address from list of known nodes. Next broadcast message from this node will be sent to the `NewNodes` channel.
+func (d *Discovery) FailNode(addr string) {
+ d.removeNode(addr)
+}
diff --git a/pkg/discovery/service_test.go b/pkg/discovery/service_test.go
index 49009ac..ec2d36b 100644
--- a/pkg/discovery/service_test.go
+++ b/pkg/discovery/service_test.go
@@ -17,7 +17,8 @@ func TestIface(t *testing.T) {
ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
t.Cleanup(cancel)
- ctx, _ = context.WithTimeout(ctx, time.Second*5)
+ ctx, cancel = context.WithTimeout(ctx, time.Second*5)
+ t.Cleanup(cancel)
set, err := NewDiscoverySet(zaptest.NewLogger(t).Named("discovery"), 1234, WithDebug())
require.NoError(t, err)
@@ -34,5 +35,5 @@ func TestIface(t *testing.T) {
}()
wg.Wait()
- assert.Len(t, set[0].Nodes(), 1)
+ assert.Len(t, set[0].NewNodes(), 1)
}
diff --git a/pkg/messenger/service.go b/pkg/messenger/service.go
new file mode 100644
index 0000000..e2af73a
--- /dev/null
+++ b/pkg/messenger/service.go
@@ -0,0 +1,220 @@
+package messenger
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "strings"
+ "sync"
+ "time"
+
+ "go.uber.org/zap"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/peer"
+ "google.golang.org/grpc/status"
+
+ "git.derfenix.pro/fenix/astontest/api"
+)
+
+type DataStorage interface {
+ StoreReceived(ctx context.Context, sender, message string) error
+ StoreSent(ctx context.Context, sender, message string) error
+}
+
+type DataSource interface {
+ NewString(ctx context.Context) string
+}
+
+func NewMessenger(newNodes <-chan string, port uint16, storage DataStorage, source DataSource, pingInterval time.Duration, log *zap.Logger) *Messenger {
+ return &Messenger{
+ newNodes: newNodes,
+ storage: storage,
+ source: source,
+ port: port,
+ log: log,
+ pingInterval: pingInterval,
+ FailNodesCh: make(chan string, 1),
+ }
+}
+
+type Messenger struct {
+ newNodes <-chan string
+ storage DataStorage
+ source DataSource
+ port uint16
+ log *zap.Logger
+ pingInterval time.Duration
+
+ FailNodesCh chan string
+
+ api.UnimplementedAstonTestServer
+}
+
+func (m *Messenger) Start(ctx context.Context, wg *sync.WaitGroup) {
+ defer wg.Done()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+
+ case node := <-m.newNodes:
+ wg.Add(1)
+ go m.newStream(ctx, wg, node)
+ }
+ }
+}
+
+func (m *Messenger) StartServer(ctx context.Context, wg *sync.WaitGroup) {
+ defer wg.Done()
+
+ srv := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
+ api.RegisterAstonTestServer(srv, m)
+
+ lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", m.port))
+ if err != nil {
+ m.log.Error("failed to create listener", zap.Error(err))
+
+ return
+ }
+
+ wg.Add(1)
+ go func() {
+ <-ctx.Done()
+
+ go func() {
+ time.Sleep(time.Second)
+ srv.Stop()
+ }()
+
+ srv.GracefulStop()
+ wg.Done()
+ }()
+
+ m.log.Info("server start")
+
+ if err := srv.Serve(lis); err != nil {
+ m.log.Error("serve error", zap.Error(err))
+ }
+}
+
+func (m *Messenger) Ping(stream api.AstonTest_PingServer) error {
+ ticker := time.NewTicker(m.pingInterval)
+ defer ticker.Stop()
+
+ peerData, peerFound := peer.FromContext(stream.Context())
+ if !peerFound {
+ m.log.Warn("not peer found in context")
+ peerData = &peer.Peer{
+ Addr: &net.IPAddr{},
+ }
+ }
+
+ peerIP, _, _ := strings.Cut(peerData.Addr.String(), ":")
+
+ for {
+ select {
+ case <-stream.Context().Done():
+ return nil
+
+ case <-ticker.C:
+ data := m.source.NewString(stream.Context())
+
+ if err := stream.Send(&api.PingPong{Value: data}); err != nil {
+ m.log.Error("failed to send message", zap.Error(err))
+
+ if code := status.Convert(err).Code(); code == codes.Canceled || code == codes.Unavailable {
+ return nil
+ }
+
+ continue
+ }
+
+ if err := m.storage.StoreSent(stream.Context(), peerIP, data); err != nil {
+ m.log.Error("failed to save sent data", zap.Error(err), zap.String("data", data), zap.String("remote_addr", peerIP))
+ }
+ }
+ }
+}
+
+func (m *Messenger) newStream(ctx context.Context, wg *sync.WaitGroup, address string) {
+ defer wg.Done()
+
+ defer func() {
+ m.FailNodesCh <- address
+ }()
+
+ log := m.log.With(zap.String("address", address))
+
+ cc, err := grpc.Dial(fmt.Sprintf("%s:%d", address, m.port), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ log.Error("failed to dial", zap.Error(err))
+
+ return
+ }
+
+ defer func() {
+ if err := cc.Close(); err != nil {
+ log.Error("close connection", zap.Error(err))
+ }
+ }()
+
+ client := api.NewAstonTestClient(cc)
+ pingClient, err := client.Ping(ctx)
+ if err != nil {
+ log.Error("create ping stream", zap.Error(err))
+
+ return
+ }
+
+ log.Info("stream started")
+
+ msgCh := m.newMessagesCh(pingClient, log)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+
+ case msg, ok := <-msgCh:
+ if !ok {
+ log.Warn("message channel closed")
+
+ return
+ }
+
+ if msg.GetValue() == "" {
+ continue
+ }
+
+ log.Debug("received new message", zap.String("message", msg.GetValue()))
+
+ if err := m.storage.StoreReceived(ctx, address, msg.GetValue()); err != nil {
+ log.Error("store message", zap.Error(err), zap.String("message", msg.GetValue()))
+ }
+ }
+ }
+}
+
+func (m *Messenger) newMessagesCh(pingClient api.AstonTest_PingClient, log *zap.Logger) chan *api.PingPong {
+ msgCh := make(chan *api.PingPong)
+
+ go func() {
+ for {
+ msg, err := pingClient.Recv()
+ if err != nil {
+ if code := status.Convert(err).Code(); !(code == codes.Canceled || code == codes.Unavailable) {
+ log.Error("message receive failed", zap.Error(err))
+ }
+
+ break
+ }
+ msgCh <- msg
+ }
+ close(msgCh)
+ }()
+
+ return msgCh
+}