mirror of
https://github.com/derfenix/webarchive.git
synced 2026-03-11 12:41:54 +03:00
Update deps, refactoring
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
package badger
|
||||
package repository
|
||||
|
||||
import (
|
||||
"errors"
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/google/uuid"
|
||||
|
||||
"github.com/derfenix/webarchive/adapters/repository"
|
||||
|
||||
"github.com/derfenix/webarchive/entity"
|
||||
)
|
||||
|
||||
@@ -24,7 +26,9 @@ type Page struct {
|
||||
}
|
||||
|
||||
func (p *Page) GetFile(_ context.Context, pageID, fileID uuid.UUID) (*entity.File, error) {
|
||||
page := entity.Page{ID: pageID}
|
||||
page := entity.Page{}
|
||||
page.ID = pageID
|
||||
|
||||
var file *entity.File
|
||||
|
||||
err := p.db.View(func(txn *badger.Txn) error {
|
||||
@@ -66,7 +70,7 @@ func (p *Page) GetFile(_ context.Context, pageID, fileID uuid.UUID) (*entity.Fil
|
||||
|
||||
func (p *Page) Save(_ context.Context, page *entity.Page) error {
|
||||
if p.db.IsClosed() {
|
||||
return ErrDBClosed
|
||||
return repository.ErrDBClosed
|
||||
}
|
||||
|
||||
marshaled, err := marshal(page)
|
||||
@@ -88,16 +92,17 @@ func (p *Page) Save(_ context.Context, page *entity.Page) error {
|
||||
}
|
||||
|
||||
func (p *Page) Get(_ context.Context, id uuid.UUID) (*entity.Page, error) {
|
||||
site := entity.Page{ID: id}
|
||||
page := entity.Page{}
|
||||
page.ID = id
|
||||
|
||||
err := p.db.View(func(txn *badger.Txn) error {
|
||||
data, err := txn.Get(p.key(&site))
|
||||
data, err := txn.Get(p.key(&page))
|
||||
if err != nil {
|
||||
return fmt.Errorf("get data: %w", err)
|
||||
}
|
||||
|
||||
err = data.Value(func(val []byte) error {
|
||||
if err := unmarshal(val, &site); err != nil {
|
||||
if err := unmarshal(val, &page); err != nil {
|
||||
return fmt.Errorf("unmarshal data: %w", err)
|
||||
}
|
||||
|
||||
@@ -113,7 +118,7 @@ func (p *Page) Get(_ context.Context, id uuid.UUID) (*entity.Page, error) {
|
||||
return nil, fmt.Errorf("view: %w", err)
|
||||
}
|
||||
|
||||
return &site, nil
|
||||
return &page, nil
|
||||
}
|
||||
|
||||
func (p *Page) ListAll(ctx context.Context) ([]*entity.Page, error) {
|
||||
@@ -143,16 +148,7 @@ func (p *Page) ListAll(ctx context.Context) ([]*entity.Page, error) {
|
||||
return fmt.Errorf("get item: %w", err)
|
||||
}
|
||||
|
||||
pages = append(pages, &entity.Page{
|
||||
ID: page.ID,
|
||||
URL: page.URL,
|
||||
Description: page.Description,
|
||||
Created: page.Created,
|
||||
Formats: page.Formats,
|
||||
Version: page.Version,
|
||||
Status: page.Status,
|
||||
Meta: page.Meta,
|
||||
})
|
||||
pages = append(pages, &page)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -196,12 +192,11 @@ func (p *Page) ListUnprocessed(ctx context.Context) ([]entity.Page, error) {
|
||||
return fmt.Errorf("get item: %w", err)
|
||||
}
|
||||
|
||||
if page.Status != entity.StatusProcessing {
|
||||
continue
|
||||
if page.Status == entity.StatusNew {
|
||||
//goland:noinspection GoVetCopyLock
|
||||
pages = append(pages, page) //nolint:govet // didn't touch the lock here
|
||||
}
|
||||
|
||||
//goland:noinspection GoVetCopyLock
|
||||
pages = append(pages, page) //nolint:govet // didn't touch the lock here
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/derfenix/webarchive/adapters/repository"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
@@ -31,7 +32,7 @@ func TestSite(t *testing.T) {
|
||||
|
||||
log := zaptest.NewLogger(t)
|
||||
|
||||
db, err := NewBadger(tempDir, log.Named("db"))
|
||||
db, err := repository.NewBadger(tempDir, log.Named("db"))
|
||||
require.NoError(t, err)
|
||||
|
||||
siteRepo, err := NewPage(db)
|
||||
|
||||
13
adapters/repository/badgers3/marshal.go
Normal file
13
adapters/repository/badgers3/marshal.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package badgers3
|
||||
|
||||
import (
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
)
|
||||
|
||||
func marshal(v interface{}) ([]byte, error) {
|
||||
return msgpack.Marshal(v)
|
||||
}
|
||||
|
||||
func unmarshal(b []byte, v interface{}) error {
|
||||
return msgpack.Unmarshal(b, v)
|
||||
}
|
||||
119
adapters/repository/badgers3/page.go
Normal file
119
adapters/repository/badgers3/page.go
Normal file
@@ -0,0 +1,119 @@
|
||||
package badgers3
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/derfenix/webarchive/adapters/repository"
|
||||
"github.com/derfenix/webarchive/entity"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/google/uuid"
|
||||
"github.com/minio/minio-go/v7"
|
||||
)
|
||||
|
||||
func NewPage(db *badger.DB, s3 *minio.Client, bucketName string) (*Page, error) {
|
||||
return &Page{
|
||||
db: db,
|
||||
s3: s3,
|
||||
prefix: []byte("pages3:"),
|
||||
bucketName: bucketName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Page is a page repository that keeps data in a Badger database and uploads
// page files through a minio (S3) client.
type Page struct {
	// db is the Badger database handle used for reads and writes.
	db *badger.DB
	// s3 is the minio client used to upload page files.
	s3 *minio.Client
	// prefix is prepended to the page ID to form the Badger key.
	prefix []byte
	// bucketName is the bucket that receives uploaded files.
	bucketName string
}
|
||||
|
||||
// ListAll is not implemented yet for this repository: calling it panics
// with "implement me".
func (p *Page) ListAll(ctx context.Context) ([]*entity.PageBase, error) {
	// TODO implement me
	panic("implement me")
}
|
||||
|
||||
// Get is not implemented yet for this repository: calling it panics with
// "implement me".
func (p *Page) Get(ctx context.Context, id uuid.UUID) (*entity.Page, error) {
	// TODO implement me
	panic("implement me")
}
|
||||
|
||||
// GetFile is not implemented yet for this repository: calling it panics
// with "implement me".
func (p *Page) GetFile(ctx context.Context, pageID, fileID uuid.UUID) (*entity.File, error) {
	// TODO implement me
	panic("implement me")
}
|
||||
|
||||
func (p *Page) Save(ctx context.Context, page *entity.Page) error {
|
||||
if p.db.IsClosed() {
|
||||
return repository.ErrDBClosed
|
||||
}
|
||||
|
||||
marshaled, err := marshal(page.PageBase)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal data: %w", err)
|
||||
}
|
||||
|
||||
if err := p.db.Update(func(txn *badger.Txn) error {
|
||||
if err := txn.Set(p.key(page), marshaled); err != nil {
|
||||
return fmt.Errorf("put data: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return fmt.Errorf("update db: %w", err)
|
||||
}
|
||||
|
||||
snowball := make(chan minio.SnowballObject, 1)
|
||||
|
||||
go func() {
|
||||
defer close(snowball)
|
||||
|
||||
for _, result := range page.Results {
|
||||
for _, file := range result.Files {
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if len(snowball) < cap(snowball) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
snowball <- minio.SnowballObject{
|
||||
Key: file.ID.String(),
|
||||
Size: int64(len(file.Data)),
|
||||
ModTime: time.Now(),
|
||||
Content: bytes.NewReader(file.Data),
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if err = p.s3.PutObjectsSnowball(ctx, p.bucketName, minio.SnowballOptions{Compress: true}, snowball); err != nil {
|
||||
if dErr := p.db.Update(func(txn *badger.Txn) error {
|
||||
if err := txn.Delete(p.key(page)); err != nil {
|
||||
return fmt.Errorf("put data: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); dErr != nil {
|
||||
err = errors.Join(err, dErr)
|
||||
}
|
||||
|
||||
return fmt.Errorf("store files to s3: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListUnprocessed is not implemented yet for this repository: calling it
// panics with "implement me".
func (p *Page) ListUnprocessed(ctx context.Context) ([]entity.Page, error) {
	// TODO implement me
	panic("implement me")
}
|
||||
|
||||
func (p *Page) key(site *entity.Page) []byte {
|
||||
return append(p.prefix, []byte(site.ID.String())...)
|
||||
}
|
||||
Reference in New Issue
Block a user