From acb9f9780641bdcfc097b221f1ec56f876ea912e Mon Sep 17 00:00:00 2001 From: derfenix Date: Thu, 24 Aug 2023 23:53:52 +0300 Subject: [PATCH] Improve caching, move mutex to the cache implementation --- adapters/inmemorycache/cache.go | 41 +++++++++++++++++++++++++---- application/application.go | 2 +- application/repository/logs.go | 21 +-------------- application/repository/logs_test.go | 6 +++-- 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/adapters/inmemorycache/cache.go b/adapters/inmemorycache/cache.go index 0693a9b..841adfd 100644 --- a/adapters/inmemorycache/cache.go +++ b/adapters/inmemorycache/cache.go @@ -1,11 +1,42 @@ package inmemorycache -type Cache map[uint64][]string +import ( + "sync" +) -func (c Cache) Get(id uint64) []string { - return c[id] +func NewCache() *Cache { + return &Cache{cache: map[uint64][]string{}} } -func (c Cache) Append(id uint64, ip string) { - c[id] = append(c[id], ip) +type Cache struct { + cache map[uint64][]string + mu sync.RWMutex +} + +func (c *Cache) Get(id uint64) []string { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.cache[id] +} + +func (c *Cache) Append(id uint64, ip string) { + c.mu.Lock() + defer c.mu.Unlock() + + if ips := c.cache[id]; len(ips) == 0 { + c.cache[id] = append(c.cache[id], ip) + + return + } + + for _, s := range c.cache[id] { + if s == ip { + return + } + } + + c.cache[id] = append(c.cache[id], ip) + + return } diff --git a/application/application.go b/application/application.go index 34270a3..3c75a76 100644 --- a/application/application.go +++ b/application/application.go @@ -20,7 +20,7 @@ func NewApplication(ctx context.Context, cfg Config, logger *zap.Logger) (*Appli return nil, fmt.Errorf("new db: %w", err) } - cache := make(inmemorycache.Cache) + cache := inmemorycache.NewCache() repo, err := repository.NewConnLogs(ctx, db, cache, logger, cfg.UpdateInterval) if err != nil { diff --git a/application/repository/logs.go b/application/repository/logs.go index 7fb3a6b..b56f620 100644 --- a/application/repository/logs.go +++ b/application/repository/logs.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "sync" "time" "github.com/uptrace/bun" @@ -57,7 +56,6 @@ func NewConnLogs(ctx context.Context, db *bun.DB, cache Cache, logger *zap.Logge type ConnLogs struct { db *bun.DB - mu sync.RWMutex cache Cache lastTS time.Time } @@ -65,9 +63,6 @@ type ConnLogs struct { func (l *ConnLogs) fillCache(ctx context.Context) error { var entity []ConnLog - l.mu.Lock() - defer l.mu.Unlock() - query := l.db.NewSelect().Model(&entity). Order("ts"). Group("user_id", "ip_addr"). @@ -82,22 +77,8 @@ func (l *ConnLogs) fillCache(ctx context.Context) error { return fmt.Errorf("select: %w", err) } -loop: for i := range entity { - item := &entity[i] - - if ips := l.cache.Get(item.UserID); len(ips) == 0 { - l.cache.Append(item.UserID, item.IP) - continue - } - - for _, s := range l.cache.Get(item.UserID) { - if s == item.IP { - continue loop - } - } - - l.cache.Append(item.UserID, item.IP) + l.cache.Append(entity[i].UserID, entity[i].IP) } if len(entity) > 0 { diff --git a/application/repository/logs_test.go b/application/repository/logs_test.go index e0ae95d..367698b 100644 --- a/application/repository/logs_test.go +++ b/application/repository/logs_test.go @@ -70,7 +70,9 @@ func TestLogs_Get(t *testing.T) { }) require.NoError(t, err) - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + logger := zaptest.NewLogger(t) require.NoError(t, application.Migrate(ctx, db, logger)) @@ -96,7 +98,7 @@ func TestLogs_Get(t *testing.T) { _, err = db.NewInsert().Model(&testData).Exec(ctx) require.NoError(t, err) - repo, err := NewConnLogs(ctx, db, make(inmemorycache.Cache), logger, time.Millisecond*100) + repo, err := NewConnLogs(ctx, db, inmemorycache.NewCache(), logger, time.Millisecond*100) require.NoError(t, err) t.Run("found dup", func(t *testing.T) {