Improve caching, move mutex to the cache implementation
This commit is contained in:
@@ -1,11 +1,42 @@
|
|||||||
package inmemorycache
|
package inmemorycache
|
||||||
|
|
||||||
type Cache map[uint64][]string
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
func (c Cache) Get(id uint64) []string {
|
func NewCache() *Cache {
|
||||||
return c[id]
|
return &Cache{cache: map[uint64][]string{}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Cache) Append(id uint64, ip string) {
|
type Cache struct {
|
||||||
c[id] = append(c[id], ip)
|
cache map[uint64][]string
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cache) Get(id uint64) []string {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
|
||||||
|
return c.cache[id]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cache) Append(id uint64, ip string) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
if ips := c.cache[id]; len(ips) == 0 {
|
||||||
|
c.cache[id] = append(c.cache[id], ip)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, s := range c.cache[id] {
|
||||||
|
if s == ip {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.cache[id] = append(c.cache[id], ip)
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ func NewApplication(ctx context.Context, cfg Config, logger *zap.Logger) (*Appli
|
|||||||
return nil, fmt.Errorf("new db: %w", err)
|
return nil, fmt.Errorf("new db: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cache := make(inmemorycache.Cache)
|
cache := inmemorycache.NewCache()
|
||||||
|
|
||||||
repo, err := repository.NewConnLogs(ctx, db, cache, logger, cfg.UpdateInterval)
|
repo, err := repository.NewConnLogs(ctx, db, cache, logger, cfg.UpdateInterval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/uptrace/bun"
|
"github.com/uptrace/bun"
|
||||||
@@ -57,7 +56,6 @@ func NewConnLogs(ctx context.Context, db *bun.DB, cache Cache, logger *zap.Logge
|
|||||||
|
|
||||||
type ConnLogs struct {
|
type ConnLogs struct {
|
||||||
db *bun.DB
|
db *bun.DB
|
||||||
mu sync.RWMutex
|
|
||||||
cache Cache
|
cache Cache
|
||||||
lastTS time.Time
|
lastTS time.Time
|
||||||
}
|
}
|
||||||
@@ -65,9 +63,6 @@ type ConnLogs struct {
|
|||||||
func (l *ConnLogs) fillCache(ctx context.Context) error {
|
func (l *ConnLogs) fillCache(ctx context.Context) error {
|
||||||
var entity []ConnLog
|
var entity []ConnLog
|
||||||
|
|
||||||
l.mu.Lock()
|
|
||||||
defer l.mu.Unlock()
|
|
||||||
|
|
||||||
query := l.db.NewSelect().Model(&entity).
|
query := l.db.NewSelect().Model(&entity).
|
||||||
Order("ts").
|
Order("ts").
|
||||||
Group("user_id", "ip_addr").
|
Group("user_id", "ip_addr").
|
||||||
@@ -82,22 +77,8 @@ func (l *ConnLogs) fillCache(ctx context.Context) error {
|
|||||||
return fmt.Errorf("select: %w", err)
|
return fmt.Errorf("select: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
loop:
|
|
||||||
for i := range entity {
|
for i := range entity {
|
||||||
item := &entity[i]
|
l.cache.Append(entity[i].UserID, entity[i].IP)
|
||||||
|
|
||||||
if ips := l.cache.Get(item.UserID); len(ips) == 0 {
|
|
||||||
l.cache.Append(item.UserID, item.IP)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, s := range l.cache.Get(item.UserID) {
|
|
||||||
if s == item.IP {
|
|
||||||
continue loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
l.cache.Append(item.UserID, item.IP)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(entity) > 0 {
|
if len(entity) > 0 {
|
||||||
|
|||||||
@@ -70,7 +70,9 @@ func TestLogs_Get(t *testing.T) {
|
|||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
logger := zaptest.NewLogger(t)
|
logger := zaptest.NewLogger(t)
|
||||||
|
|
||||||
require.NoError(t, application.Migrate(ctx, db, logger))
|
require.NoError(t, application.Migrate(ctx, db, logger))
|
||||||
@@ -96,7 +98,7 @@ func TestLogs_Get(t *testing.T) {
|
|||||||
_, err = db.NewInsert().Model(&testData).Exec(ctx)
|
_, err = db.NewInsert().Model(&testData).Exec(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
repo, err := NewConnLogs(ctx, db, make(inmemorycache.Cache), logger, time.Millisecond*100)
|
repo, err := NewConnLogs(ctx, db, inmemorycache.NewCache(), logger, time.Millisecond*100)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
t.Run("found dup", func(t *testing.T) {
|
t.Run("found dup", func(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user