From 09fef3c11314b6267bef326e7ff3a405267d2402 Mon Sep 17 00:00:00 2001 From: derfenix Date: Thu, 1 Aug 2024 23:42:30 +0300 Subject: [PATCH] initial commit --- .gitignore | 43 +++++ .idea/.gitignore | 8 + .idea/commander.iml | 9 + .idea/git_toolbox_blame.xml | 6 + .idea/git_toolbox_prj.xml | 15 ++ .idea/modules.xml | 8 + .idea/vcs.xml | 6 + cache.go | 117 +++++++++++++ examples/basic/commands.go | 66 ++++++++ examples/basic/go.mod | 25 +++ examples/basic/go.sum | 29 ++++ examples/basic/service.go | 25 +++ examples/basic/service_test.go | 134 +++++++++++++++ executor.go | 289 +++++++++++++++++++++++++++++++++ go.mod | 20 +++ go.sum | 27 +++ inmemorycache/go.mod | 5 + inmemorycache/go.sum | 2 + inmemorycache/inmemorycache.go | 30 ++++ semaphore.go | 31 ++++ semaphore_test.go | 36 ++++ 21 files changed, 931 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.gitignore create mode 100644 .idea/commander.iml create mode 100644 .idea/git_toolbox_blame.xml create mode 100644 .idea/git_toolbox_prj.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 cache.go create mode 100644 examples/basic/commands.go create mode 100644 examples/basic/go.mod create mode 100644 examples/basic/go.sum create mode 100644 examples/basic/service.go create mode 100644 examples/basic/service_test.go create mode 100644 executor.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 inmemorycache/go.mod create mode 100644 inmemorycache/go.sum create mode 100644 inmemorycache/inmemorycache.go create mode 100644 semaphore.go create mode 100644 semaphore_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..04d0201 --- /dev/null +++ b/.gitignore @@ -0,0 +1,43 @@ +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf +.idea/**/aws.xml +.idea/**/contentModel.xml +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml +.idea/**/gradle.xml +.idea/**/libraries +cmake-build-*/ +.idea/**/mongoSettings.xml +*.iws +out/ +.idea_modules/ +atlassian-ide-plugin.xml +.idea/replstate.xml +.idea/sonarlint/ +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +.idea/httpRequests +.idea/caches/build_file_checksums.ser +*~ +.fuse_hidden* +.directory +.Trash-* +.nfs* +*.exe +*.exe~ +*.dll +*.so +*.dylib +*.test +*.out +go.work diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/commander.iml b/.idea/commander.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/commander.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/git_toolbox_blame.xml b/.idea/git_toolbox_blame.xml new file mode 100644 index 0000000..7dc1249 --- /dev/null +++ b/.idea/git_toolbox_blame.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/git_toolbox_prj.xml b/.idea/git_toolbox_prj.xml new file mode 100644 index 0000000..02b915b --- /dev/null +++ b/.idea/git_toolbox_prj.xml @@ -0,0 +1,15 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..ea288c8 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/cache.go b/cache.go new file mode 100644 index 0000000..35f7a28 --- /dev/null +++ b/cache.go @@ -0,0 +1,117 @@ +package commander + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/vmihailenco/msgpack/v5" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +const cacheTTL = 5 * time.Second + +type Cache interface { + Set(key string, value []byte, ttl time.Duration) error + Get(key string) ([]byte, error) +} + +type commandCache struct { + tracer trace.Tracer + cache Cache + ttl time.Duration + + locks sync.Map +} + +func newCommandCache(cache Cache, ttl time.Duration) *commandCache { + tracer := otel.Tracer(tracerName) + + if ttl <= 0 { + ttl = cacheTTL + } + + return &commandCache{ + tracer: tracer, + cache: cache, + ttl: ttl, + } +} + +func (c *commandCache) CommandCacheGet(ctx context.Context, key string, commands ...Command) bool { + ctx, span := c.tracer.Start(ctx, "command_cache.get") + defer span.End() + + defer c.locker(ctx, key)() + + res, err := c.cache.Get(key) + if err != nil { + span.RecordError(fmt.Errorf("cache get: %w", err)) + span.SetStatus(codes.Error, err.Error()) + + return false + } + + if len(res) == 0 { + span.AddEvent("cache miss") + + return false + } + + span.AddEvent("cache hit") + + if err := c.unmarshal(res, commands); err != nil { + span.RecordError(fmt.Errorf("unmarshal commands: %w", err)) + span.SetStatus(codes.Error, err.Error()) + + return false + } + + return true +} + +func (c *commandCache) CommandCacheStore(ctx context.Context, key string, commands ...Command) { + ctx, span := c.tracer.Start(ctx, "command_cache.store") + defer span.End() + + defer c.locker(ctx, key)() + + marshaled, err := c.marshal(commands) + if err != nil { + span.RecordError(fmt.Errorf("marshal commands: %w", err)) + span.SetStatus(codes.Error, err.Error()) + + return + } + + if err := c.cache.Set(key, marshaled, c.ttl); err != nil { + span.RecordError(fmt.Errorf("set cache: %w", err)) + span.SetStatus(codes.Error, err.Error()) + } +} + +func (c *commandCache) locker(ctx context.Context, key string) (unlock func()) { + ctx, span := c.tracer.Start(ctx, "locker") + defer span.End() + + for { + if _, loaded := c.locks.LoadOrStore(key, true); !loaded { + break + } + } + + return func() { + c.locks.Delete(key) + } +} + +func (c *commandCache) marshal(commands []Command) ([]byte, error) { + return msgpack.Marshal(commands) +} + +func (c *commandCache) unmarshal(res []byte, commands []Command) error { + return msgpack.Unmarshal(res, &commands) +} diff --git a/examples/basic/commands.go b/examples/basic/commands.go new file mode 100644 index 0000000..6c606e8 --- /dev/null +++ b/examples/basic/commands.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "errors" + "time" +) + +type Entity struct { + Name string + Address string +} + +type GetName struct { + UID string + Sleep time.Duration + + Result Entity +} + +func (b *GetName) CorrelationID() string { + return b.UID +} + +func (b *GetName) Execute(context.Context) error { + b.Result = Entity{Name: "Bob"} + + if b.Sleep > 0 { + time.Sleep(b.Sleep) + } + + return nil +} + +func (b *GetName) Rollback(context.Context) error { + b.Result = Entity{} + + return nil +} + +type GetAddress struct { + Input *Entity + + Result Entity +} + +func (b *GetAddress) CorrelationID() string { + return b.Input.Name +} + +func (b *GetAddress) Execute(context.Context) error { + b.Result = *b.Input + if b.Result.Address != "" { + return errors.New("already set") + } + + b.Result.Address = "London" + + return nil +} + +func (b *GetAddress) Rollback(context.Context) error { + b.Result = Entity{} + + return nil +} diff --git a/examples/basic/go.mod b/examples/basic/go.mod new file mode 100644 index 0000000..c3471d8 --- /dev/null +++ b/examples/basic/go.mod @@ -0,0 +1,25 @@ +module git.derfenix.pro/fenix/commander/examples/basic + +go 1.22 + +replace ( + git.derfenix.pro/fenix/commander => ../../ + git.derfenix.pro/fenix/commander/inmemorycache => ../../inmemorycache +) + +require ( + git.derfenix.pro/fenix/commander v0.0.0-00010101000000-000000000000 + git.derfenix.pro/fenix/commander/inmemorycache v0.0.0-00010101000000-000000000000 + github.com/google/uuid v1.6.0 +) + +require ( + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/otel/trace v1.28.0 // indirect +) diff --git a/examples/basic/go.sum b/examples/basic/go.sum new file mode 100644 index 0000000..ba8cc66 --- /dev/null +++ b/examples/basic/go.sum @@ -0,0 +1,29 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/basic/service.go b/examples/basic/service.go new file mode 100644 index 0000000..0ce78c8 --- /dev/null +++ b/examples/basic/service.go @@ -0,0 +1,25 @@ +package main + +import ( + "context" + "fmt" + + "git.derfenix.pro/fenix/commander" + "github.com/google/uuid" +) + +func main() { + ctx := context.Background() + + cmd := commander.New(10) + + c1 := GetName{UID: uuid.NewString()} + c2 := GetAddress{Input: &c1.Result} + commands := []commander.Command{&c1, &c2} + + if err := cmd.Execute(ctx, "", commands...); err != nil { + panic(err) + } + + fmt.Println(c2.Result) +} diff --git a/examples/basic/service_test.go b/examples/basic/service_test.go new file mode 100644 index 0000000..e54a682 --- /dev/null +++ b/examples/basic/service_test.go @@ -0,0 +1,134 @@ +package main + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + + "git.derfenix.pro/fenix/commander" + "git.derfenix.pro/fenix/commander/inmemorycache" +) + +func BenchmarkServiceNoop(b *testing.B) { + ctx := context.Background() + + cmd := commander.New(10) + + c1 := GetName{UID: uuid.NewString()} + c2 := GetAddress{Input: &c1.Result} + commands := []commander.Command{&c1, &c2} + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + if err := cmd.Execute(ctx, "", commands...); err != nil { + _ = err + } + } +} + +func BenchmarkServiceWithDelay(b *testing.B) { + ctx := context.Background() + + cmd := commander.New(10) + + c1 := GetName{UID: uuid.NewString(), Sleep: time.Microsecond * 100} + c2 := GetAddress{Input: &c1.Result} + commands := []commander.Command{&c1, &c2} + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + if err := cmd.Execute(ctx, "", commands...); err != nil { + _ = err + } + } +} + +func BenchmarkServiceWithRollbackNoop(b *testing.B) { + ctx := context.Background() + + cmd := commander.New(10) + + c1 := GetName{UID: uuid.NewString()} + c2 := GetAddress{Input: &c1.Result} + commands := []commander.Command{&c1, &c2} + + commands = append(commands, &GetAddress{Input: &c2.Result}) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + if err := cmd.Execute(ctx, "", commands...); err != nil { + _ = err + } + } +} + +func BenchmarkServiceWithRollbackWithDelay(b *testing.B) { + ctx := context.Background() + + cmd := commander.New(10) + + c1 := GetName{UID: uuid.NewString(), Sleep: time.Microsecond * 100} + c2 := GetAddress{Input: &c1.Result} + commands := []commander.Command{&c1, &c2} + + commands = append(commands, &GetAddress{Input: &c2.Result}) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + if err := cmd.Execute(ctx, "", commands...); err != nil { + _ = err + } + } +} + +func BenchmarkServiceWithCacheNoop(b *testing.B) { + ctx := context.Background() + + cmd := commander.New(10) + + cmd = cmd.WithCache(inmemorycache.NewInMemoryCache()) + + c1 := GetName{UID: uuid.NewString()} + c2 := GetAddress{Input: &c1.Result} + commands := []commander.Command{&c1, &c2} + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + if err := cmd.Execute(ctx, "", commands...); err != nil { + _ = err + } + } +} + +func BenchmarkServiceWithCacheWithDelay(b *testing.B) { + ctx := context.Background() + + cmd := commander.New(10) + + cmd = cmd.WithCache(inmemorycache.NewInMemoryCache()) + + c1 := GetName{UID: uuid.NewString(), Sleep: time.Microsecond * 100} + c2 := GetAddress{Input: &c1.Result} + commands := []commander.Command{&c1, &c2} + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + if err := cmd.Execute(ctx, "", commands...); err != nil { + _ = err + } + } +} diff --git a/executor.go b/executor.go new file mode 100644 index 0000000..db7e104 --- /dev/null +++ b/executor.go @@ -0,0 +1,289 @@ +package commander + +import ( + "bytes" + "context" + "errors" + "fmt" + "sync" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" +) + +const tracerName = "git.derfenix.pro/fenix/commander" + +var ErrNoErrorChannel = errors.New("no error channel provided") + +type Command interface { + Execute(ctx context.Context) error + Rollback(ctx context.Context) error +} + +type CorrelatedCommand interface { + Command + CorrelationID() string +} + +type Eventer interface { + EmitEvent(ctx context.Context) error +} + +type CommandCache interface { + CommandCacheGet(context.Context, string, ...Command) bool + CommandCacheStore(context.Context, string, ...Command) +} + +func mustNewMetrics() metrics { + m, err := newMetrics() + if err != nil { + panic(err) + } + + return m +} + +func newMetrics() (metrics, error) { + meter := otel.GetMeterProvider().Meter("executor") + + commandsCount, err := meter.Int64Histogram("commands", metric.WithDescription("Count of executed commands (can be rolled back)")) + if err != nil { + return metrics{}, fmt.Errorf("commands count histogram: %w", err) + } + + commandsRollbackCount, err := meter.Int64Histogram("commands.rollback", metric.WithDescription("Count of commands rolled back")) + if err != nil { + return metrics{}, fmt.Errorf("commands count histogram: %w", err) + } + + commandsFailedCount, err := meter.Int64Histogram("commands.failed", metric.WithDescription("Count of failed commands")) + if err != nil { + return metrics{}, fmt.Errorf("commands failed count histogram: %w", err) + } + + commandsRollbackFailedCount, err := meter.Int64Histogram("commands.rollback.failed", metric.WithDescription("Count of commands fail to roll back")) + if err != nil { + return metrics{}, fmt.Errorf("commands count histogram: %w", err) + } + + monitorCommands, err := meter.Int64UpDownCounter("commands.running", metric.WithDescription("Command set in progress")) + if err != nil { + return metrics{}, fmt.Errorf("monitor commands counter: %w", err) + } + + cacheHit, err := meter.Int64Counter("cache.hit", metric.WithDescription("Cache hit")) + if err != nil { + return metrics{}, fmt.Errorf("cache hit: %w", err) + } + + cacheMiss, err := meter.Int64Counter("cache.miss", metric.WithDescription("Cache miss")) + if err != nil { + return metrics{}, fmt.Errorf("cache miss: %w", err) + } + + return metrics{ + commandsCount: commandsCount, + commandsRollbackCount: commandsRollbackCount, + commandsFailedCount: commandsFailedCount, + commandsRollbackFailedCount: commandsRollbackFailedCount, + monitorCommands: monitorCommands, + cacheHit: cacheHit, + cacheMiss: cacheMiss, + }, nil +} + +type metrics struct { + commandsCount metric.Int64Histogram + commandsRollbackCount metric.Int64Histogram + commandsFailedCount metric.Int64Histogram + commandsRollbackFailedCount metric.Int64Histogram + + monitorCommands metric.Int64UpDownCounter + + cacheHit metric.Int64Counter + cacheMiss metric.Int64Counter +} + +func New(commandsLimit int) *Commander { + tracer := otel.Tracer(tracerName) + + return &Commander{ + metrics: mustNewMetrics(), + tracer: tracer, + + correlationIDBuffer: sync.Pool{New: func() any { return bytes.NewBuffer(nil) }}, + + commandsSemaphore: NewSemaphore(commandsLimit), + } +} + +type Commander struct { + metrics + tracer trace.Tracer + cache CommandCache + + correlationIDBuffer sync.Pool + + commandsSemaphore Semaphore +} + +func (c *Commander) WithCache(cache Cache) *Commander { + c.cache = newCommandCache(cache, cacheTTL) + + return c +} + +func (c *Commander) WithCacheTTL(cache Cache, ttl time.Duration) *Commander { + c.cache = newCommandCache(cache, ttl) + + return c +} + +func (c *Commander) ExecuteAsync(ctx context.Context, errCh chan<- error, correlationID string, commands ...Command) { + ctx, span := c.tracer.Start(ctx, "executor.execute_async") + defer span.End() + + if errCh == nil { + span.RecordError(ErrNoErrorChannel) + } + + defer func() { + if errCh != nil { + close(errCh) + } + }() + + if err := c.Execute(ctx, correlationID, commands...); err != nil { + if errCh != nil { + errCh <- err + } + } +} + +func (c *Commander) Execute(ctx context.Context, correlationID string, commands ...Command) error { + ctx, span := c.tracer.Start( + ctx, + "executor.execute", + trace.WithAttributes( + attribute.Int("commands.count", len(commands)), + ), + ) + defer span.End() + + if c.cache != nil && correlationID == "" { + correlationID = c.tryGetCorrelationID(commands) + } + + shouldCache := c.cache != nil && correlationID != "" + + c.commandsSemaphore.Acquire() + c.monitorCommands.Add(ctx, 1) + defer func() { + c.commandsSemaphore.Release() + c.monitorCommands.Add(ctx, -1) + }() + + if shouldCache { + if c.cache.CommandCacheGet(ctx, correlationID, commands...) { + c.cacheHit.Add(ctx, 1) + + return nil + } else { + c.cacheMiss.Add(ctx, 1) + } + } + + var actionsCount int64 + defer func() { + if actionsCount > 0 { + c.commandsCount.Record(ctx, actionsCount) + } + }() + + eventEmitters := make([]Eventer, 0, len(commands)) + + for idx, command := range commands { + if err := command.Execute(ctx); err != nil { + c.commandsFailedCount.Record(ctx, 1) + + span.RecordError( + err, + trace.WithAttributes(attribute.String("step", "execute")), + trace.WithAttributes(attribute.String("command", fmt.Sprintf("%T", command))), + ) + span.SetStatus(codes.Error, "failed to execute command") + + if idx > 0 { + c.Rollback(ctx, commands[:idx]...) + } + + return fmt.Errorf("execute command %v: %w", command, err) + } + + if eventer, ok := command.(Eventer); ok { + eventEmitters = append(eventEmitters, eventer) + } + + actionsCount++ + } + + for _, emitter := range eventEmitters { + if err := emitter.EmitEvent(ctx); err != nil { + span.RecordError( + err, + trace.WithAttributes(attribute.String("step", "send events")), + trace.WithAttributes(attribute.String("command", fmt.Sprintf("%T", emitter))), + ) + } + } + + if shouldCache { + c.cache.CommandCacheStore(ctx, correlationID, commands...) + } + + return nil +} + +func (c *Commander) tryGetCorrelationID(commands []Command) string { + newCorrelationID := c.correlationIDBuffer.Get().(*bytes.Buffer) + defer func() { + newCorrelationID.Reset() + c.correlationIDBuffer.Put(newCorrelationID) + }() + + for _, command := range commands { + correlationIDer, ok := command.(CorrelatedCommand) + if !ok { + return "" + } + + newCorrelationID.WriteString(correlationIDer.CorrelationID()) + } + + return newCorrelationID.String() +} + +func (c *Commander) Rollback(ctx context.Context, commands ...Command) { + ctx, span := c.tracer.Start(ctx, "executor.rollback") + defer span.End() + + for _, command := range commands { + if err := command.Rollback(ctx); err != nil { + c.commandsRollbackFailedCount.Record(ctx, 1) + + span.RecordError( + err, + trace.WithAttributes(attribute.String("step", "rollback")), + trace.WithAttributes(attribute.String("command", fmt.Sprintf("%T", command))), + ) + + continue + } + + c.commandsRollbackCount.Record(ctx, 1) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..74cb0ef --- /dev/null +++ b/go.mod @@ -0,0 +1,20 @@ +module git.derfenix.pro/fenix/commander + +go 1.22 + +require ( + github.com/stretchr/testify v1.9.0 + github.com/vmihailenco/msgpack/v5 v5.4.1 + go.opentelemetry.io/otel v1.28.0 + go.opentelemetry.io/otel/metric v1.28.0 + go.opentelemetry.io/otel/trace v1.28.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..80a1410 --- /dev/null +++ b/go.sum @@ -0,0 +1,27 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/inmemorycache/go.mod b/inmemorycache/go.mod new file mode 100644 index 0000000..edb0443 --- /dev/null +++ b/inmemorycache/go.mod @@ -0,0 +1,5 @@ +module git.derfenix.pro/fenix/commander/inmemorycache + +go 1.22 + +require github.com/patrickmn/go-cache v2.1.0+incompatible diff --git a/inmemorycache/go.sum b/inmemorycache/go.sum new file mode 100644 index 0000000..33c1ec4 --- /dev/null +++ b/inmemorycache/go.sum @@ -0,0 +1,2 @@ +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= diff --git a/inmemorycache/inmemorycache.go b/inmemorycache/inmemorycache.go new file mode 100644 index 0000000..c07c2cc --- /dev/null +++ b/inmemorycache/inmemorycache.go @@ -0,0 +1,30 @@ +package inmemorycache + +import ( + "time" + + "github.com/patrickmn/go-cache" +) + +type InMemoryCache struct { + cache *cache.Cache +} + +func NewInMemoryCache() *InMemoryCache { + return &InMemoryCache{cache: cache.New(time.Minute, 5*time.Minute)} +} + +func (i *InMemoryCache) Set(key string, value []byte, ttl time.Duration) error { + i.cache.Set(key, value, ttl) + + return nil +} + +func (i *InMemoryCache) Get(key string) ([]byte, error) { + res, ok := i.cache.Get(key) + if !ok { + return nil, nil + } + + return res.([]byte), nil +} diff --git a/semaphore.go b/semaphore.go new file mode 100644 index 0000000..e3b3856 --- /dev/null +++ b/semaphore.go @@ -0,0 +1,31 @@ +package commander + +func NewSemaphore(len int) Semaphore { + if len == 0 { + return nil + } + + return make(Semaphore, len) +} + +type Semaphore chan struct{} + +func (s Semaphore) Acquire() { + if s == nil { + return + } + + s <- struct{}{} +} + +func (s Semaphore) Release() { + if s == nil { + return + } + + <-s +} + +func (s Semaphore) Close() { + close(s) +} diff --git a/semaphore_test.go b/semaphore_test.go new file mode 100644 index 0000000..d0e26c9 --- /dev/null +++ b/semaphore_test.go @@ -0,0 +1,36 @@ +package commander + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestNewSemaphore(t *testing.T) { + t.Parallel() + + sem := NewSemaphore(2) + + var locked atomic.Uint32 + + for i := 0; i < 3; i++ { + go func() { + sem.Acquire() + locked.Add(1) + }() + } + + time.Sleep(time.Microsecond * 10) + + require.Equal(t, uint32(2), locked.Load()) + + sem.Release() + time.Sleep(time.Microsecond) + + require.Equal(t, uint32(3), locked.Load()) + + sem.Release() + sem.Release() +}