Complete loading page to pdf and base API

This commit is contained in:
2023-03-27 22:09:54 +03:00
parent 92469fa3a2
commit 91d8f676ae
24 changed files with 864 additions and 95 deletions

View File

@@ -57,9 +57,7 @@ func (p *Page) SetProcessing() {
p.Status = StatusProcessing
}
func (p *Page) Process(ctx context.Context, wg *sync.WaitGroup, processor Processor) {
defer wg.Done()
func (p *Page) Process(ctx context.Context, processor Processor) {
innerWG := sync.WaitGroup{}
innerWG.Add(len(p.Formats))
@@ -78,6 +76,8 @@ func (p *Page) Process(ctx context.Context, wg *sync.WaitGroup, processor Proces
}(format)
}
innerWG.Wait()
var hasResultWithOutErrors bool
for _, result := range p.Results.Results() {
if result.Err != nil {
@@ -94,6 +94,4 @@ func (p *Page) Process(ctx context.Context, wg *sync.WaitGroup, processor Proces
if p.Status == StatusProcessing {
p.Status = StatusDone
}
innerWG.Wait()
}

View File

@@ -22,12 +22,14 @@ func (r *Results) MarshalMsgpack() ([]byte, error) {
}
func (r *Results) UnmarshalMsgpack(b []byte) error {
return msgpack.Unmarshal(b, r.results)
return msgpack.Unmarshal(b, &r.results)
}
func (r *Results) Add(result Result) {
r.mu.Lock()
r.results = append(r.results, result)
results := r.results
results = append(results, result)
r.results = results
r.mu.Unlock()
}

66
entity/worker.go Normal file
View File

@@ -0,0 +1,66 @@
package entity
import (
"context"
"sync"
"go.uber.org/zap"
)
type Pages interface {
Save(ctx context.Context, page *Page) error
}
func NewWorker(ch chan *Page, pages Pages, processor Processor, log *zap.Logger) *Worker {
return &Worker{pages: pages, processor: processor, log: log, ch: ch}
}
type Worker struct {
ch chan *Page
pages Pages
processor Processor
log *zap.Logger
}
func (w *Worker) Start(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
w.log.Info("starting")
for {
select {
case <-ctx.Done():
return
case page, open := <-w.ch:
if !open {
w.log.Warn("channel closed")
return
}
log := w.log.With(zap.Stringer("page_id", page.ID), zap.String("page_url", page.URL))
log.Info("got new page")
wg.Add(1)
go w.do(ctx, wg, page, log)
}
}
}
func (w *Worker) do(ctx context.Context, wg *sync.WaitGroup, page *Page, log *zap.Logger) {
defer wg.Done()
page.Process(ctx, w.processor)
log.Debug("page processed")
if err := w.pages.Save(ctx, page); err != nil {
w.log.Error(
"failed to save processed page",
zap.String("page_id", page.ID.String()),
zap.String("page_url", page.URL),
zap.Error(err),
)
}
}