Files
marketlabtest/application/application.go
2024-05-30 19:49:06 +03:00

94 lines
1.7 KiB
Go

package application
import (
"context"
"fmt"
"sync"
"time"
"marketlab/accumulator"
"marketlab/adapters/randomsource"
"marketlab/adapters/stdoutpub"
"marketlab/worker"
"go.uber.org/zap"
)
type Source interface {
Start(ctx context.Context, wg *sync.WaitGroup)
SourceCh() <-chan [10]int
}
type Publisher interface {
Publish(int64) error
}
func NewApplication(ctx context.Context) (Application, error) {
cfg, err := NewConfig(ctx)
if err != nil {
return Application{}, fmt.Errorf("create config: %w", err)
}
logger, err := zap.NewDevelopment()
if err != nil {
return Application{}, fmt.Errorf("create logger: %w", err)
}
in := make(chan [10]int, cfg.WorkersCount)
pool := worker.NewPool(in, cfg.WorkersCount, logger.Named("pool"))
acc := accumulator.NewAccumulator(pool.Out())
src := randomsource.NewService(in, cfg.PacketInputInterval, cfg.PacketMaxValues)
pub := stdoutpub.NewService()
return Application{
Logger: logger,
Cfg: cfg,
Source: src,
Pool: pool,
Accumulator: acc,
Publisher: pub,
}, nil
}
type Application struct {
Logger *zap.Logger
Cfg Config
Source Source
Pool *worker.Pool
Accumulator *accumulator.Accumulator
Publisher Publisher
}
func (a *Application) Start(ctx context.Context, wg *sync.WaitGroup) error {
wg.Add(4)
go a.Source.Start(ctx, wg)
go a.Accumulator.Start(ctx, wg)
go a.Pool.Start(ctx, wg)
go func() {
wg.Done()
ticker := time.NewTicker(a.Cfg.OutputInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := a.Publisher.Publish(a.Accumulator.Res()); err != nil {
a.Logger.Error("failed to publish result", zap.Error(err))
}
}
}
}()
return nil
}