Initial commit

This commit is contained in:
2024-05-30 19:42:26 +03:00
commit f04111c689
13 changed files with 455 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@@ -0,0 +1,6 @@
*~
.fuse_hidden*
.directory
.Trash-*
.nfs*
.idea

13
README.md Normal file
View File

@@ -0,0 +1,13 @@
# ТЗ
Реализовать модель обработки данных в виде пайплайна, состоящего из следующих этапов
1. Подача на вход пакетов данных. Пакет данных = слайс случайных целых чисел из 10 элементов. Новый пакет подается каждые N мс (N задается в виде env переменной)
2. Обработка пакетов: нахождение 3-х наибольших чисел в пакете. Вход: слайс int из 10 элементов, выход: слайс из 3-х элементов. Обработка пакетов должна производиться M воркерами (M задается в виде env переменной)
3. Аккумулятор: суммирование чисел обработанных пакетов, полученных на предыдущем этапе, и запись в единую переменную int
4. Публикатор: вывод на консоль текущего значения аккумулятора каждые K секунд (K задается в виде env переменной)
Пример:
вход: {1, 9, 6, 4, 4, 5, 7, 8, 0, 1}
обработка: {9, 7, 8}
аккумулятор: 9+7+8=24
публикатор: 24

View File

@@ -0,0 +1,40 @@
package accumulator
import (
"context"
"sync"
"sync/atomic"
)
func NewAccumulator(in <-chan [3]int) *Accumulator {
return &Accumulator{in: in}
}
type Accumulator struct {
in <-chan [3]int
summ atomic.Int64
}
func (a *Accumulator) Start(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case m, ok := <-a.in:
if !ok {
return
}
for _, value := range m {
a.summ.Add(int64(value))
}
}
}
}
func (a *Accumulator) Res() int64 {
return a.summ.Load()
}

View File

@@ -0,0 +1,58 @@
package randomsource
import (
"context"
crand "crypto/rand"
"io"
"math/big"
"sync"
"time"
)
func NewService(ch chan [10]int, interval time.Duration, maxInt int64) *Service {
return &Service{
interval: interval,
ch: ch,
rand: crand.Reader,
maxInt: maxInt,
}
}
type Service struct {
interval time.Duration
rand io.Reader
ch chan [10]int
maxInt int64
}
func (s *Service) SourceCh() <-chan [10]int {
return s.ch
}
func (s *Service) Start(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
var res [10]int
ticker := time.NewTicker(s.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for i := 0; i < 10; i++ {
v, err := crand.Int(s.rand, big.NewInt(s.maxInt))
if err != nil {
panic(err)
}
res[i] = int(v.Int64())
}
s.ch <- res
}
}
}

View File

@@ -0,0 +1,26 @@
package stdoutpub
import (
"fmt"
"os"
)
const outPrefix = "Result: "
func NewService() Service {
return Service{
fd: os.Stdout,
}
}
type Service struct {
fd *os.File
}
func (s Service) Publish(i int64) error {
if _, err := fmt.Fprintf(s.fd, "%s%d\n", outPrefix, i); err != nil {
return fmt.Errorf("write to file: %w", err)
}
return nil
}

View File

@@ -0,0 +1,93 @@
package application
import (
"context"
"fmt"
"sync"
"time"
"marketlab/accumulator"
"marketlab/adapters/randomsource"
"marketlab/adapters/stdoutpub"
"marketlab/worker"
"go.uber.org/zap"
)
// Source produces packets of raw data for the pipeline. Implemented by
// adapters/randomsource.Service.
type Source interface {
	// Start runs the source until ctx is cancelled; wg is marked done on return.
	Start(ctx context.Context, wg *sync.WaitGroup)
	// SourceCh returns the channel the source emits packets on.
	SourceCh() <-chan [10]int
}

// Publisher reports the accumulator's current value to some sink.
// Implemented by adapters/stdoutpub.Service.
type Publisher interface {
	Publish(int64) error
}
// NewApplication reads configuration from the environment and wires the
// full pipeline together: source -> worker pool -> accumulator, plus a
// publisher for periodic output.
func NewApplication(ctx context.Context) (Application, error) {
	cfg, err := NewConfig(ctx)
	if err != nil {
		return Application{}, fmt.Errorf("create config: %w", err)
	}

	logger, err := zap.NewDevelopment()
	if err != nil {
		return Application{}, fmt.Errorf("create logger: %w", err)
	}

	// One buffered slot per worker so the source does not stall while
	// every worker is busy.
	input := make(chan [10]int, cfg.WorkersCount)
	pool := worker.NewPool(input, cfg.WorkersCount, logger.Named("pool"))

	return Application{
		Logger: logger,
		Cfg:    cfg,
		// 100 is the exclusive upper bound for generated values.
		Source:      randomsource.NewService(input, cfg.PacketInputInterval, 100),
		Pool:        pool,
		Accumulator: accumulator.NewAccumulator(pool.Out()),
		Publisher:   stdoutpub.NewService(),
	}, nil
}
// Application aggregates every pipeline component together with its
// configuration and logger. Fields are exported so main can reach the
// logger for fatal startup errors.
type Application struct {
	Logger      *zap.Logger
	Cfg         Config
	Source      Source
	Pool        *worker.Pool
	Accumulator *accumulator.Accumulator
	Publisher   Publisher
}
// Start launches every pipeline stage in its own goroutine. All four
// goroutines register on wg and stop when ctx is cancelled, so callers
// can wg.Wait() for a clean shutdown. Always returns nil today; the
// error result is kept for interface stability.
func (a *Application) Start(ctx context.Context, wg *sync.WaitGroup) error {
	wg.Add(4)
	go a.Source.Start(ctx, wg)
	go a.Accumulator.Start(ctx, wg)
	go a.Pool.Start(ctx, wg)
	go func() {
		// BUG FIX: wg.Done() was previously called immediately at the
		// top of this goroutine instead of being deferred, so wg.Wait()
		// never waited for the publisher and shutdown could abandon it
		// mid-publish.
		defer wg.Done()
		ticker := time.NewTicker(a.Cfg.OutputInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := a.Publisher.Publish(a.Accumulator.Res()); err != nil {
					a.Logger.Error("failed to publish result", zap.Error(err))
				}
			}
		}
	}()
	return nil
}

25
application/config.go Normal file
View File

@@ -0,0 +1,25 @@
package application
import (
"context"
"fmt"
"time"
"github.com/sethvargo/go-envconfig"
)
// Config holds the pipeline tunables, populated from the environment.
type Config struct {
	// PacketInputInterval is how often the source emits a packet (N from the spec).
	PacketInputInterval time.Duration `env:"PACKET_INPUT_INTERVAL,default=500ms"`
	// WorkersCount is the number of processing workers (M from the spec).
	WorkersCount uint `env:"WORKERS_COUNT,default=10"`
	// OutputInterval is how often the publisher prints the sum (K from the spec).
	OutputInterval time.Duration `env:"OUTPUT_INTERVAL,default=1s"`
}

// NewConfig parses Config from environment variables, applying the
// defaults declared in the struct tags.
func NewConfig(ctx context.Context) (Config, error) {
	cfg := Config{}
	err := envconfig.Process(ctx, &cfg)
	if err != nil {
		return Config{}, fmt.Errorf("failed to process env vars: %w", err)
	}
	return cfg, nil
}

30
cmd/service/main.go Normal file
View File

@@ -0,0 +1,30 @@
package main
import (
"context"
"os/signal"
"sync"
"syscall"
"marketlab/application"
"go.uber.org/zap"
)
// main boots the pipeline and blocks until a termination signal arrives
// and every stage has shut down.
func main() {
	// Cancel the root context on SIGINT/SIGTERM for graceful shutdown.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	app, err := application.NewApplication(ctx)
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	if err := app.Start(ctx, &wg); err != nil {
		app.Logger.Fatal("startup", zap.Error(err))
	}

	// Block until every pipeline goroutine has exited.
	wg.Wait()
}

10
go.mod Normal file
View File

@@ -0,0 +1,10 @@
module marketlab
go 1.22
require (
github.com/sethvargo/go-envconfig v1.0.3
go.uber.org/zap v1.27.0
)
require go.uber.org/multierr v1.10.0 // indirect

18
go.sum Normal file
View File

@@ -0,0 +1,18 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sethvargo/go-envconfig v1.0.3 h1:ZDxFGT1M7RPX0wgDOCdZMidrEB+NrayYr6fL0/+pk4I=
github.com/sethvargo/go-envconfig v1.0.3/go.mod h1:JLd0KFWQYzyENqnEPWWZ49i4vzZo/6nRidxI8YvGiHw=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

42
worker/pool.go Normal file
View File

@@ -0,0 +1,42 @@
package worker
import (
"context"
"fmt"
"sync"
"go.uber.org/zap"
)
// NewPool builds a worker pool that reads packets from in and writes
// top-3 results to an internal output channel buffered to workersNum.
func NewPool(in <-chan [10]int, workersNum uint, log *zap.Logger) *Pool {
	return &Pool{
		in:         in,
		out:        make(chan [3]int, workersNum),
		workersNum: int(workersNum),
		log:        log,
	}
}

// Pool fans packet processing out across a fixed set of workers sharing
// one input and one output channel.
type Pool struct {
	in         <-chan [10]int
	out        chan [3]int
	workersNum int
	log        *zap.Logger
}

// Start spawns the workers and returns immediately. Each worker is
// registered on wg before this method's own deferred wg.Done fires, so
// the counter cannot reach zero prematurely.
func (p *Pool) Start(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	wg.Add(p.workersNum)
	for i := 0; i < p.workersNum; i++ {
		w := NewWorker(p.in, p.out, p.log.Named(fmt.Sprintf("worker_%d", i)))
		go w.Start(ctx, wg)
	}
}

// Out exposes the pool's result channel as receive-only.
func (p *Pool) Out() <-chan [3]int {
	return p.out
}

51
worker/worker.go Normal file
View File

@@ -0,0 +1,51 @@
package worker
import (
"context"
"sort"
"sync"
"go.uber.org/zap"
)
// NewWorker creates a worker reading 10-element packets from in and
// writing the three largest values of each packet to out.
func NewWorker(in <-chan [10]int, out chan<- [3]int, log *zap.Logger) *Worker {
	return &Worker{
		in:  in,
		out: out,
		log: log,
	}
}

// Worker extracts the three largest values from incoming packets and
// forwards them downstream.
type Worker struct {
	in  <-chan [10]int
	out chan<- [3]int
	log *zap.Logger
}

// Start processes packets until ctx is cancelled or the input channel
// is closed. wg is marked done on return.
func (w *Worker) Start(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case m, ok := <-w.in:
			if !ok {
				return
			}
			// BUG FIX: the unconditional send could block forever once
			// the accumulator stopped; respect cancellation here too so
			// shutdown cannot strand workers.
			select {
			case w.out <- w.findMax(m):
			case <-ctx.Done():
				return
			}
		}
	}
}

// findMax returns the three largest values of input in ascending order.
// input is received by value, so sorting it does not mutate the caller's
// packet.
func (w *Worker) findMax(input [10]int) [3]int {
	sort.Ints(input[:])
	res := [3]int(input[7:])
	// BUG FIX: guard the logger — the package test constructs Worker{}
	// with a nil *zap.Logger, and calling Debug on a nil logger panics.
	if w.log != nil {
		w.log.Debug("new data", zap.Ints("input", input[:]), zap.Ints("output", res[:]))
	}
	return res
}

43
worker/worker_test.go Normal file
View File

@@ -0,0 +1,43 @@
package worker
import (
"reflect"
"strconv"
"testing"
)
// TestWorker_findMax verifies that findMax returns the three largest
// values regardless of input ordering.
//
// NOTE(review): w := &Worker{} leaves w.log nil, and findMax calls
// w.log.Debug — confirm findMax nil-guards the logger, or construct the
// worker with zap.NewNop() instead.
func TestWorker_findMax(t *testing.T) {
	t.Parallel()
	tests := []struct {
		in  [10]int
		out [3]int
	}{
		{
			in:  [10]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
			out: [3]int{8, 9, 10},
		},
		{
			in:  [10]int{10, 9, 8, 7, 6, 5, 4, 3, 2, 1},
			out: [3]int{8, 9, 10},
		},
		{
			// Only nine literals here: the tenth element is the zero value.
			in:  [10]int{9, 7, 6, 10, 5, 4, 3, 8, 1},
			out: [3]int{8, 9, 10},
		},
	}
	for idx, tt := range tests {
		tt := tt // capture per-iteration copy for parallel subtests (pre-Go 1.22 idiom)
		t.Run(strconv.Itoa(idx), func(t *testing.T) {
			t.Parallel()
			w := &Worker{}
			if got := w.findMax(tt.in); !reflect.DeepEqual(got, tt.out) {
				t.Errorf("findMax() = %v, want %v", got, tt.out)
			}
		})
	}
}