118 lines
2.3 KiB
Go
118 lines
2.3 KiB
Go
package commander
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/vmihailenco/msgpack/v5"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/codes"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
// cacheTTL is the fallback time-to-live that newCommandCache applies when it
// is given a zero or negative ttl.
const cacheTTL time.Duration = 5 * time.Second
|
|
|
|
// Cache is the byte-oriented key/value store that commandCache reads from and
// writes to.
type Cache interface {
	// Get returns the bytes stored under key. commandCache treats a nil error
	// together with an empty result as a cache miss.
	Get(key string) ([]byte, error)

	// Set stores value under key with the given ttl.
	// NOTE(review): expiry is presumably enforced by the implementation —
	// confirm against the concrete cache in use.
	Set(key string, value []byte, ttl time.Duration) error
}
|
|
|
|
type commandCache struct {
|
|
tracer trace.Tracer
|
|
cache Cache
|
|
ttl time.Duration
|
|
|
|
locks sync.Map
|
|
}
|
|
|
|
func newCommandCache(cache Cache, ttl time.Duration) *commandCache {
|
|
tracer := otel.Tracer(tracerName)
|
|
|
|
if ttl <= 0 {
|
|
ttl = cacheTTL
|
|
}
|
|
|
|
return &commandCache{
|
|
tracer: tracer,
|
|
cache: cache,
|
|
ttl: ttl,
|
|
}
|
|
}
|
|
|
|
func (c *commandCache) CommandCacheGet(ctx context.Context, key string, commands ...Command) bool {
|
|
ctx, span := c.tracer.Start(ctx, "command_cache.get")
|
|
defer span.End()
|
|
|
|
defer c.locker(ctx, key)()
|
|
|
|
res, err := c.cache.Get(key)
|
|
if err != nil {
|
|
span.RecordError(fmt.Errorf("cache get: %w", err))
|
|
span.SetStatus(codes.Error, err.Error())
|
|
|
|
return false
|
|
}
|
|
|
|
if len(res) == 0 {
|
|
span.AddEvent("cache miss")
|
|
|
|
return false
|
|
}
|
|
|
|
span.AddEvent("cache hit")
|
|
|
|
if err := c.unmarshal(res, commands); err != nil {
|
|
span.RecordError(fmt.Errorf("unmarshal commands: %w", err))
|
|
span.SetStatus(codes.Error, err.Error())
|
|
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (c *commandCache) CommandCacheStore(ctx context.Context, key string, commands ...Command) {
|
|
ctx, span := c.tracer.Start(ctx, "command_cache.store")
|
|
defer span.End()
|
|
|
|
defer c.locker(ctx, key)()
|
|
|
|
marshaled, err := c.marshal(commands)
|
|
if err != nil {
|
|
span.RecordError(fmt.Errorf("marshal commands: %w", err))
|
|
span.SetStatus(codes.Error, err.Error())
|
|
|
|
return
|
|
}
|
|
|
|
if err := c.cache.Set(key, marshaled, c.ttl); err != nil {
|
|
span.RecordError(fmt.Errorf("set cache: %w", err))
|
|
span.SetStatus(codes.Error, err.Error())
|
|
}
|
|
}
|
|
|
|
func (c *commandCache) locker(ctx context.Context, key string) (unlock func()) {
|
|
ctx, span := c.tracer.Start(ctx, "locker")
|
|
defer span.End()
|
|
|
|
for {
|
|
if _, loaded := c.locks.LoadOrStore(key, true); !loaded {
|
|
break
|
|
}
|
|
}
|
|
|
|
return func() {
|
|
c.locks.Delete(key)
|
|
}
|
|
}
|
|
|
|
func (c *commandCache) marshal(commands []Command) ([]byte, error) {
|
|
return msgpack.Marshal(commands)
|
|
}
|
|
|
|
func (c *commandCache) unmarshal(res []byte, commands []Command) error {
|
|
return msgpack.Unmarshal(res, &commands)
|
|
}
|