// Package application wires the source, worker pool, accumulator and
// publisher together and runs them as one pipeline.
package application

import (
	"context"
	"fmt"
	"sync"
	"time"

	"go.uber.org/zap"

	"marketlab/accumulator"
	"marketlab/adapters/randomsource"
	"marketlab/adapters/stdoutpub"
	"marketlab/worker"
)

// Source is the producer side of the pipeline as seen by Application.
type Source interface {
	// Start runs the source. It is launched in its own goroutine by
	// Application.Start and is handed the shared WaitGroup.
	// NOTE(review): implementations are presumably responsible for calling
	// wg.Done when they exit; confirm against the concrete source.
	Start(ctx context.Context, wg *sync.WaitGroup)
	// SourceCh exposes the channel of fixed-size packets of ten ints.
	SourceCh() <-chan [10]int
}

// Publisher receives the periodically sampled accumulator result.
type Publisher interface {
	Publish(int64) error
}

// NewApplication builds the configuration, a development zap logger and all
// pipeline components, and returns them assembled into an Application.
//
// The input channel shared by the random source and the worker pool is
// buffered with cfg.WorkersCount slots. Nothing is started here; call Start
// to run the pipeline.
func NewApplication(ctx context.Context) (Application, error) {
	cfg, err := NewConfig(ctx)
	if err != nil {
		return Application{}, fmt.Errorf("create config: %w", err)
	}

	logger, err := zap.NewDevelopment()
	if err != nil {
		return Application{}, fmt.Errorf("create logger: %w", err)
	}

	// One channel connects the source (writer) to the pool (reader).
	in := make(chan [10]int, cfg.WorkersCount)

	pool := worker.NewPool(in, cfg.WorkersCount, logger.Named("pool"))
	acc := accumulator.NewAccumulator(pool.Out())
	src := randomsource.NewService(in, cfg.PacketInputInterval, cfg.PacketMaxValues)
	pub := stdoutpub.NewService()

	return Application{
		Logger:      logger,
		Cfg:         cfg,
		Source:      src,
		Pool:        pool,
		Accumulator: acc,
		Publisher:   pub,
	}, nil
}

// Application holds every component of the pipeline together with the
// configuration and logger they were built from.
type Application struct {
	Logger      *zap.Logger
	Cfg         Config
	Source      Source
	Pool        *worker.Pool
	Accumulator *accumulator.Accumulator
	Publisher   Publisher
}

// Start launches the source, the accumulator, the worker pool and the
// periodic publisher, each in its own goroutine, and returns immediately.
//
// Four slots are added to wg: one per component plus one for the publisher
// loop. The publisher loop releases its slot only when it actually exits
// (on ctx cancellation), so wg.Wait() does not return while a Publish call
// may still be in flight.
//
// An error is returned, and nothing is started, if the configured output
// interval is not positive: time.NewTicker would otherwise panic inside the
// publisher goroutine and take the whole process down.
func (a *Application) Start(ctx context.Context, wg *sync.WaitGroup) error {
	interval := a.Cfg.OutputInterval
	if interval <= 0 {
		return fmt.Errorf("output interval must be positive, got %v", interval)
	}

	wg.Add(4)

	// NOTE(review): each component is expected to call wg.Done itself when
	// its Start returns; confirm in the respective packages.
	go a.Source.Start(ctx, wg)
	go a.Accumulator.Start(ctx, wg)
	go a.Pool.Start(ctx, wg)

	go func() {
		// Deferred so the slot is held for the goroutine's whole lifetime.
		defer wg.Done()

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// A failed publish is logged and the loop keeps running;
				// the next tick tries again with a fresh result.
				if err := a.Publisher.Publish(a.Accumulator.Res()); err != nil {
					a.Logger.Error("failed to publish result", zap.Error(err))
				}
			}
		}
	}()

	return nil
}