commit f04111c68921a3c1c3083ab4a0995f30f1fe529b Author: derfenix Date: Thu May 30 19:42:26 2024 +0300 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0d6503d --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +*~ +.fuse_hidden* +.directory +.Trash-* +.nfs* +.idea diff --git a/README.md b/README.md new file mode 100644 index 0000000..5d985ec --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ +# ТЗ + +Реализовать модель обработки данных в виде пайплайна, состоящего из следующих этапов +1. Подача на вход пакетов данных. Пакет данных = слайс случайных целых чисел из 10 элементов. Новый пакет подается каждые N мс (N задается в виде env переменной) +2. Обработка пакетов: нахождение 3-х наибольших чисел в пакете. Вход: слайс int из 10 элементов, выход: слайс из 3-х элементов. Обработка пакетов должна производиться M воркерами (M задается в виде env переменной) +3. Аккумулятор: суммирование чисел обработанных пакетов, полученных на предыдущем этапе, и запись в единую переменную int +4. Публикатор: вывод на консоль текущего значения аккумулятора каждые K секунд (K задается в виде env переменной) + +Пример: +вход: {1, 9, 6, 4, 4, 5, 7, 8, 0, 1} +обработка: {9, 7, 8} +аккумулятор: 9+7+8=24 +публикатор: 24 diff --git a/accumulator/accumulator.go b/accumulator/accumulator.go new file mode 100644 index 0000000..56547c9 --- /dev/null +++ b/accumulator/accumulator.go @@ -0,0 +1,40 @@ +package accumulator + +import ( + "context" + "sync" + "sync/atomic" +) + +func NewAccumulator(in <-chan [3]int) *Accumulator { + return &Accumulator{in: in} +} + +type Accumulator struct { + in <-chan [3]int + summ atomic.Int64 +} + +func (a *Accumulator) Start(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + + case m, ok := <-a.in: + if !ok { + return + } + + for _, value := range m { + a.summ.Add(int64(value)) + } + } + } +} + +func (a *Accumulator) Res() int64 { + return a.summ.Load() +} diff --git a/adapters/randomsource/service.go b/adapters/randomsource/service.go new file mode 100644 index 0000000..1123068 --- /dev/null +++ b/adapters/randomsource/service.go @@ -0,0 +1,58 @@ +package randomsource + +import ( + "context" + crand "crypto/rand" + "io" + "math/big" + "sync" + "time" +) + +func NewService(ch chan [10]int, interval time.Duration, maxInt int64) *Service { + return &Service{ + interval: interval, + ch: ch, + rand: crand.Reader, + maxInt: maxInt, + } +} + +type Service struct { + interval time.Duration + rand io.Reader + ch chan [10]int + maxInt int64 +} + +func (s *Service) SourceCh() <-chan [10]int { + return s.ch +} + +func (s *Service) Start(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + var res [10]int + + ticker := time.NewTicker(s.interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + for i := 0; i < 10; i++ { + v, err := crand.Int(s.rand, big.NewInt(s.maxInt)) + if err != nil { + panic(err) + } + + res[i] = int(v.Int64()) + } + + s.ch <- res + } + } +} diff --git a/adapters/stdoutpub/service.go b/adapters/stdoutpub/service.go new file mode 100644 index 0000000..bc63aed --- /dev/null +++ b/adapters/stdoutpub/service.go @@ -0,0 +1,26 @@ +package stdoutpub + +import ( + "fmt" + "os" +) + +const outPrefix = "Result: " + +func NewService() Service { + return Service{ + fd: os.Stdout, + } +} + +type Service struct { + fd *os.File +} + +func (s Service) Publish(i int64) error { + if _, err := fmt.Fprintf(s.fd, "%s%d\n", outPrefix, i); err != nil { + return fmt.Errorf("write to file: %w", err) + } + + return nil +} diff --git a/application/application.go b/application/application.go new file mode 100644 index 0000000..cda82dd --- /dev/null +++ b/application/application.go @@ -0,0 +1,93 @@ +package application + +import ( + "context" + "fmt" + "sync" + "time" + + "marketlab/accumulator" + "marketlab/adapters/randomsource" + "marketlab/adapters/stdoutpub" + "marketlab/worker" + + "go.uber.org/zap" +) + +type Source interface { + Start(ctx context.Context, wg *sync.WaitGroup) + SourceCh() <-chan [10]int +} + +type Publisher interface { + Publish(int64) error +} + +func NewApplication(ctx context.Context) (Application, error) { + cfg, err := NewConfig(ctx) + if err != nil { + return Application{}, fmt.Errorf("create config: %w", err) + } + + logger, err := zap.NewDevelopment() + if err != nil { + return Application{}, fmt.Errorf("create logger: %w", err) + } + + in := make(chan [10]int, cfg.WorkersCount) + + pool := worker.NewPool(in, cfg.WorkersCount, logger.Named("pool")) + + acc := accumulator.NewAccumulator(pool.Out()) + + src := randomsource.NewService(in, cfg.PacketInputInterval, 100) + + pub := stdoutpub.NewService() + + return Application{ + Logger: logger, + Cfg: cfg, + Source: src, + Pool: pool, + Accumulator: acc, + Publisher: pub, + }, nil +} + +type Application struct { + Logger *zap.Logger + Cfg Config + Source Source + Pool *worker.Pool + Accumulator *accumulator.Accumulator + Publisher Publisher +} + +func (a *Application) Start(ctx context.Context, wg *sync.WaitGroup) error { + wg.Add(4) + + go a.Source.Start(ctx, wg) + go a.Accumulator.Start(ctx, wg) + go a.Pool.Start(ctx, wg) + + go func() { + wg.Done() + + ticker := time.NewTicker(a.Cfg.OutputInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + if err := a.Publisher.Publish(a.Accumulator.Res()); err != nil { + a.Logger.Error("failed to publish result", zap.Error(err)) + } + } + } + }() + + return nil +} diff --git a/application/config.go b/application/config.go new file mode 100644 index 0000000..b5952c4 --- /dev/null +++ b/application/config.go @@ -0,0 +1,25 @@ +package application + +import ( + "context" + "fmt" + "time" + + "github.com/sethvargo/go-envconfig" +) + +type Config struct { + PacketInputInterval time.Duration `env:"PACKET_INPUT_INTERVAL,default=500ms"` + WorkersCount uint `env:"WORKERS_COUNT,default=10"` + OutputInterval time.Duration `env:"OUTPUT_INTERVAL,default=1s"` +} + +func NewConfig(ctx context.Context) (Config, error) { + var cfg Config + + if err := envconfig.Process(ctx, &cfg); err != nil { + return Config{}, fmt.Errorf("failed to process env vars: %w", err) + } + + return cfg, nil +} diff --git a/cmd/service/main.go b/cmd/service/main.go new file mode 100644 index 0000000..d368163 --- /dev/null +++ b/cmd/service/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "context" + "os/signal" + "sync" + "syscall" + + "marketlab/application" + + "go.uber.org/zap" +) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + app, err := application.NewApplication(ctx) + if err != nil { + panic(err) + } + + wg := &sync.WaitGroup{} + if err := app.Start(ctx, wg); err != nil { + app.Logger.Fatal("startup", zap.Error(err)) + } + + wg.Wait() + +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9f87629 --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module marketlab + +go 1.22 + +require ( + github.com/sethvargo/go-envconfig v1.0.3 + go.uber.org/zap v1.27.0 +) + +require go.uber.org/multierr v1.10.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2f6493a --- /dev/null +++ b/go.sum @@ -0,0 +1,18 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sethvargo/go-envconfig v1.0.3 h1:ZDxFGT1M7RPX0wgDOCdZMidrEB+NrayYr6fL0/+pk4I= +github.com/sethvargo/go-envconfig v1.0.3/go.mod h1:JLd0KFWQYzyENqnEPWWZ49i4vzZo/6nRidxI8YvGiHw= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/worker/pool.go b/worker/pool.go new file mode 100644 index 0000000..32fdd20 --- /dev/null +++ b/worker/pool.go @@ -0,0 +1,42 @@ +package worker + +import ( + "context" + "fmt" + "sync" + + "go.uber.org/zap" +) + +func NewPool(in <-chan [10]int, workersNum uint, log *zap.Logger) *Pool { + out := make(chan [3]int, workersNum) + + return &Pool{ + in: in, + out: out, + workersNum: int(workersNum), + log: log, + } +} + +type Pool struct { + in <-chan [10]int + out chan [3]int + + workersNum int + log *zap.Logger +} + +func (p *Pool) Start(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + wg.Add(p.workersNum) + + for i := 0; i < p.workersNum; i++ { + go NewWorker(p.in, p.out, p.log.Named(fmt.Sprintf("worker_%d", i))).Start(ctx, wg) + } +} + +func (p *Pool) Out() <-chan [3]int { + return p.out +} diff --git a/worker/worker.go b/worker/worker.go new file mode 100644 index 0000000..e64e927 --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,51 @@ +package worker + +import ( + "context" + "sort" + "sync" + + "go.uber.org/zap" +) + +func NewWorker(in <-chan [10]int, out chan<- [3]int, log *zap.Logger) *Worker { + return &Worker{ + in: in, + out: out, + log: log, + } +} + +type Worker struct { + in <-chan [10]int + out chan<- [3]int + log *zap.Logger +} + +func (w *Worker) Start(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + + case m, ok := <-w.in: + if !ok { + return + } + + w.out <- w.findMax(m) + } + } +} + +func (w *Worker) findMax(input [10]int) [3]int { + sort.Ints(input[:]) + + res := [3]int(input[7:]) + + w.log.Debug("new data", zap.Ints("input", input[:]), zap.Ints("output", res[:])) + + return res +} diff --git a/worker/worker_test.go b/worker/worker_test.go new file mode 100644 index 0000000..9e9eb96 --- /dev/null +++ b/worker/worker_test.go @@ -0,0 +1,43 @@ +package worker + +import ( + "reflect" + "strconv" + "testing" +) + +func TestWorker_findMax(t *testing.T) { + t.Parallel() + + tests := []struct { + in [10]int + out [3]int + }{ + { + in: [10]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + out: [3]int{8, 9, 10}, + }, + { + in: [10]int{10, 9, 8, 7, 6, 5, 4, 3, 2, 1}, + out: [3]int{8, 9, 10}, + }, + { + in: [10]int{9, 7, 6, 10, 5, 4, 3, 8, 1}, + out: [3]int{8, 9, 10}, + }, + } + + for idx, tt := range tests { + tt := tt + + t.Run(strconv.Itoa(idx), func(t *testing.T) { + t.Parallel() + + w := &Worker{} + + if got := w.findMax(tt.in); !reflect.DeepEqual(got, tt.out) { + t.Errorf("findMax() = %v, want %v", got, tt.out) + } + }) + } +}