From 7864272baa02fa62cac81e28fbbbf8699ed2d8fe Mon Sep 17 00:00:00 2001 From: derfenix Date: Wed, 6 Dec 2023 08:29:05 +0300 Subject: [PATCH] Initial version Project structure, api, discovery service, docker. --- .gitignore | 43 +++++ .idea/.gitignore | 8 + .idea/astontest.iml | 9 ++ .idea/modules.xml | 8 + .idea/vcs.xml | 6 + api/gen.go | 3 + api/server.pb.go | 149 +++++++++++++++++ api/server.proto | 12 ++ api/server_grpc.pb.go | 141 ++++++++++++++++ cmd/server/main.go | 56 +++++++ deploy/Dockerfile | 13 ++ docker-compose.yaml | 16 ++ go.mod | 23 +++ go.sum | 46 ++++++ internal/application/application.go | 42 +++++ internal/application/config.go | 25 +++ pkg/discovery/options.go | 19 +++ pkg/discovery/service.go | 239 ++++++++++++++++++++++++++++ pkg/discovery/service_test.go | 38 +++++ 19 files changed, 896 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.gitignore create mode 100644 .idea/astontest.iml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 api/gen.go create mode 100644 api/server.pb.go create mode 100644 api/server.proto create mode 100644 api/server_grpc.pb.go create mode 100644 cmd/server/main.go create mode 100644 deploy/Dockerfile create mode 100644 docker-compose.yaml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/application/application.go create mode 100644 internal/application/config.go create mode 100644 pkg/discovery/options.go create mode 100644 pkg/discovery/service.go create mode 100644 pkg/discovery/service_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e434f1c --- /dev/null +++ b/.gitignore @@ -0,0 +1,43 @@ +*~ +.fuse_hidden* +.directory +.Trash-* +.nfs* +*.exe +*.exe~ +*.dll +*.so +*.dylib +*.test +*.out +go.work +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf +.idea/**/aws.xml +.idea/**/contentModel.xml +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml +.idea/**/gradle.xml +.idea/**/libraries +cmake-build-*/ +.idea/**/mongoSettings.xml +*.iws +out/ +.idea_modules/ +atlassian-ide-plugin.xml +.idea/replstate.xml +.idea/sonarlint/ +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +.idea/httpRequests +.idea/caches/build_file_checksums.ser diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/astontest.iml b/.idea/astontest.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/astontest.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..a1d09da --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/api/gen.go b/api/gen.go new file mode 100644 index 0000000..615ad2c --- /dev/null +++ b/api/gen.go @@ -0,0 +1,3 @@ +package api + +//go:generate protoc -I . --go_out=. --go-grpc_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative server.proto diff --git a/api/server.pb.go b/api/server.pb.go new file mode 100644 index 0000000..e04d1df --- /dev/null +++ b/api/server.pb.go @@ -0,0 +1,149 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.25.1 +// source: server.proto + +package api + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type PingPong struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *PingPong) Reset() { + *x = PingPong{} + if protoimpl.UnsafeEnabled { + mi := &file_server_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PingPong) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingPong) ProtoMessage() {} + +func (x *PingPong) ProtoReflect() protoreflect.Message { + mi := &file_server_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingPong.ProtoReflect.Descriptor instead. +func (*PingPong) Descriptor() ([]byte, []int) { + return file_server_proto_rawDescGZIP(), []int{0} +} + +func (x *PingPong) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +var File_server_proto protoreflect.FileDescriptor + +var file_server_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, + 0x61, 0x73, 0x74, 0x6f, 0x6e, 0x74, 0x65, 0x73, 0x74, 0x22, 0x20, 0x0a, 0x08, 0x50, 0x69, 0x6e, + 0x67, 0x50, 0x6f, 0x6e, 0x67, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x41, 0x0a, 0x09, 0x41, + 0x73, 0x74, 0x6f, 0x6e, 0x54, 0x65, 0x73, 0x74, 0x12, 0x34, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, + 0x12, 0x13, 0x2e, 0x61, 0x73, 0x74, 0x6f, 0x6e, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x50, 0x69, 0x6e, + 0x67, 0x50, 0x6f, 0x6e, 0x67, 0x1a, 0x13, 0x2e, 0x61, 0x73, 0x74, 0x6f, 0x6e, 0x74, 0x65, 0x73, + 0x74, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x50, 0x6f, 0x6e, 0x67, 0x28, 0x01, 0x30, 0x01, 0x42, 0x26, + 0x5a, 0x24, 0x67, 0x69, 0x74, 0x2e, 0x64, 0x65, 0x72, 0x66, 0x65, 0x6e, 0x69, 0x78, 0x2e, 0x70, + 0x72, 0x6f, 0x2f, 0x66, 0x65, 0x6e, 0x69, 0x78, 0x2f, 0x61, 0x73, 0x74, 0x6f, 0x6e, 0x74, 0x65, + 0x73, 0x74, 0x2f, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_server_proto_rawDescOnce sync.Once + file_server_proto_rawDescData = file_server_proto_rawDesc +) + +func file_server_proto_rawDescGZIP() []byte { + file_server_proto_rawDescOnce.Do(func() { + file_server_proto_rawDescData = protoimpl.X.CompressGZIP(file_server_proto_rawDescData) + }) + return file_server_proto_rawDescData +} + +var file_server_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_server_proto_goTypes = []interface{}{ + (*PingPong)(nil), // 0: astontest.PingPong +} +var file_server_proto_depIdxs = []int32{ + 0, // 0: astontest.AstonTest.Ping:input_type -> astontest.PingPong + 0, // 1: astontest.AstonTest.Ping:output_type -> astontest.PingPong + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_server_proto_init() } +func file_server_proto_init() { + if File_server_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_server_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PingPong); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_server_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_server_proto_goTypes, + DependencyIndexes: file_server_proto_depIdxs, + MessageInfos: file_server_proto_msgTypes, + }.Build() + File_server_proto = out.File + file_server_proto_rawDesc = nil + file_server_proto_goTypes = nil + file_server_proto_depIdxs = nil +} diff --git a/api/server.proto b/api/server.proto new file mode 100644 index 0000000..7170a1a --- /dev/null +++ b/api/server.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package astontest; +option go_package = "git.derfenix.pro/fenix/astontest/api"; + +service AstonTest { + rpc Ping(stream PingPong) returns (stream PingPong); +} + +message PingPong { + string value = 1; +} diff --git a/api/server_grpc.pb.go b/api/server_grpc.pb.go new file mode 100644 index 0000000..fdcf7cc --- /dev/null +++ b/api/server_grpc.pb.go @@ -0,0 +1,141 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.25.1 +// source: server.proto + +package api + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + AstonTest_Ping_FullMethodName = "/astontest.AstonTest/Ping" +) + +// AstonTestClient is the client API for AstonTest service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type AstonTestClient interface { + Ping(ctx context.Context, opts ...grpc.CallOption) (AstonTest_PingClient, error) +} + +type astonTestClient struct { + cc grpc.ClientConnInterface +} + +func NewAstonTestClient(cc grpc.ClientConnInterface) AstonTestClient { + return &astonTestClient{cc} +} + +func (c *astonTestClient) Ping(ctx context.Context, opts ...grpc.CallOption) (AstonTest_PingClient, error) { + stream, err := c.cc.NewStream(ctx, &AstonTest_ServiceDesc.Streams[0], AstonTest_Ping_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &astonTestPingClient{stream} + return x, nil +} + +type AstonTest_PingClient interface { + Send(*PingPong) error + Recv() (*PingPong, error) + grpc.ClientStream +} + +type astonTestPingClient struct { + grpc.ClientStream +} + +func (x *astonTestPingClient) Send(m *PingPong) error { + return x.ClientStream.SendMsg(m) +} + +func (x *astonTestPingClient) Recv() (*PingPong, error) { + m := new(PingPong) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// AstonTestServer is the server API for AstonTest service. +// All implementations must embed UnimplementedAstonTestServer +// for forward compatibility +type AstonTestServer interface { + Ping(AstonTest_PingServer) error + mustEmbedUnimplementedAstonTestServer() +} + +// UnimplementedAstonTestServer must be embedded to have forward compatible implementations. +type UnimplementedAstonTestServer struct { +} + +func (UnimplementedAstonTestServer) Ping(AstonTest_PingServer) error { + return status.Errorf(codes.Unimplemented, "method Ping not implemented") +} +func (UnimplementedAstonTestServer) mustEmbedUnimplementedAstonTestServer() {} + +// UnsafeAstonTestServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to AstonTestServer will +// result in compilation errors. +type UnsafeAstonTestServer interface { + mustEmbedUnimplementedAstonTestServer() +} + +func RegisterAstonTestServer(s grpc.ServiceRegistrar, srv AstonTestServer) { + s.RegisterService(&AstonTest_ServiceDesc, srv) +} + +func _AstonTest_Ping_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(AstonTestServer).Ping(&astonTestPingServer{stream}) +} + +type AstonTest_PingServer interface { + Send(*PingPong) error + Recv() (*PingPong, error) + grpc.ServerStream +} + +type astonTestPingServer struct { + grpc.ServerStream +} + +func (x *astonTestPingServer) Send(m *PingPong) error { + return x.ServerStream.SendMsg(m) +} + +func (x *astonTestPingServer) Recv() (*PingPong, error) { + m := new(PingPong) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// AstonTest_ServiceDesc is the grpc.ServiceDesc for AstonTest service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var AstonTest_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "astontest.AstonTest", + HandlerType: (*AstonTestServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Ping", + Handler: _AstonTest_Ping_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "server.proto", +} diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..051a725 --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "sync" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "git.derfenix.pro/fenix/astontest/internal/application" +) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt) + defer cancel() + + cfg, err := application.NewConfig(ctx) + if err != nil { + panic(err) + } + + log, err := getLogger(cfg) + if err != nil { + panic(err) + } + + app, err := application.NewApplication(&cfg, log) + if err != nil { + log.Error("new application", zap.Error(err)) + + return + } + + wg := sync.WaitGroup{} + app.Start(ctx, &wg) + wg.Wait() +} + +func getLogger(cfg application.Config) (*zap.Logger, error) { + logCfg := zap.NewProductionConfig() + logCfg.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder + if cfg.Debug { + logCfg.DisableStacktrace = false + logCfg.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel) + } + + log, err := logCfg.Build() + if err != nil { + return nil, fmt.Errorf("build logger: %w", err) + } + + return log, nil +} diff --git a/deploy/Dockerfile b/deploy/Dockerfile new file mode 100644 index 0000000..989fbf3 --- /dev/null +++ b/deploy/Dockerfile @@ -0,0 +1,13 @@ +FROM golang:1.21 as builder +WORKDIR /project + +COPY go.* ./ +RUN go mod download + +COPY . . +RUN CGO_ENABLED=0 go build -o /project/service ./cmd/server + + +FROM alpine:latest +COPY --from=builder /project/service / +ENTRYPOINT ["/service"] diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..a20311f --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,16 @@ +--- +version: "3" + +services: + s1: + build: + dockerfile: deploy/Dockerfile + context: . + s2: + build: + dockerfile: deploy/Dockerfile + context: . + s3: + build: + dockerfile: deploy/Dockerfile + context: . diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..33a482a --- /dev/null +++ b/go.mod @@ -0,0 +1,23 @@ +module git.derfenix.pro/fenix/astontest + +go 1.21 + +require ( + github.com/sethvargo/go-envconfig v0.9.0 + github.com/stretchr/testify v1.8.1 + go.uber.org/zap v1.26.0 + google.golang.org/grpc v1.59.0 + google.golang.org/protobuf v1.31.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/net v0.14.0 // indirect + golang.org/x/sys v0.11.0 // indirect + golang.org/x/text v0.12.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..1c65a5b --- /dev/null +++ b/go.sum @@ -0,0 +1,46 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sethvargo/go-envconfig v0.9.0 h1:Q6FQ6hVEeTECULvkJZakq3dZMeBQ3JUpcKMfPQbKMDE= +github.com/sethvargo/go-envconfig v0.9.0/go.mod h1:Iz1Gy1Sf3T64TQlJSvee81qDhf7YIlt8GMUX6yyNFs0= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/application/application.go b/internal/application/application.go new file mode 100644 index 0000000..152ac3f --- /dev/null +++ b/internal/application/application.go @@ -0,0 +1,42 @@ +package application + +import ( + "context" + "fmt" + "sync" + + "go.uber.org/zap" + + "git.derfenix.pro/fenix/astontest/pkg/discovery" +) + +type Application struct { + discoverySet discovery.DiscoverySet + log *zap.Logger +} + +func NewApplication(cfg *Config, log *zap.Logger) (*Application, error) { + discoveryOpts := []discovery.Option{discovery.WithBroadcastInterval(cfg.BroadcastInterval)} + + if cfg.Debug { + discoveryOpts = append(discoveryOpts, discovery.WithDebug()) + } + + set, err := discovery.NewDiscoverySet(log.Named("discovery"), cfg.DiscoveryPort, discoveryOpts...) + if err != nil { + return nil, fmt.Errorf("new discovery set: %w", err) + + } + + return &Application{ + discoverySet: set, + log: log, + }, nil +} + +func (a *Application) Start(ctx context.Context, wg *sync.WaitGroup) { + wg.Add(len(a.discoverySet)) + for _, discover := range a.discoverySet { + go discover.Start(ctx, wg) + } +} diff --git a/internal/application/config.go b/internal/application/config.go new file mode 100644 index 0000000..46a3e90 --- /dev/null +++ b/internal/application/config.go @@ -0,0 +1,25 @@ +package application + +import ( + "context" + "fmt" + "time" + + "github.com/sethvargo/go-envconfig" +) + +type Config struct { + Debug bool `env:"DEBUG"` + DiscoveryPort uint16 `env:"DISCOVERY_PORT,default=4321"` + BroadcastInterval time.Duration `env:"BROADCAST_INTERVAL,default=5s"` +} + +func NewConfig(ctx context.Context) (Config, error) { + cfg := Config{} + + if err := envconfig.Process(ctx, &cfg); err != nil { + return Config{}, fmt.Errorf("process envs: %w", err) + } + + return cfg, nil +} diff --git a/pkg/discovery/options.go b/pkg/discovery/options.go new file mode 100644 index 0000000..d87bf48 --- /dev/null +++ b/pkg/discovery/options.go @@ -0,0 +1,19 @@ +package discovery + +import ( + "time" +) + +type Option func(*Discovery) + +func WithBroadcastInterval(v time.Duration) Option { + return func(discovery *Discovery) { + discovery.broadcastInterval = v + } +} + +func WithDebug() Option { + return func(discovery *Discovery) { + discovery.debug = true + } +} diff --git a/pkg/discovery/service.go b/pkg/discovery/service.go new file mode 100644 index 0000000..f140455 --- /dev/null +++ b/pkg/discovery/service.go @@ -0,0 +1,239 @@ +package discovery + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + "sync" + "time" + + "go.uber.org/zap" +) + +const defaultBroadcastInterval = time.Second + +type Nodes map[string]struct{} +type NewNodes <-chan string + +//goland:noinspection GoNameStartsWithPackageName +type DiscoverySet []*Discovery + +func (d DiscoverySet) NewNodes() NewNodes { + res := make(chan string, 20) + + for _, discovery := range d { + go func(v *Discovery) { + for node := range v.Nodes() { + if len(res) == cap(res) { + panic("discovery set nodes not reading!") + } + + res <- node + } + }(discovery) + } + + return res +} + +func NewDiscoverySet(log *zap.Logger, discoverPort uint16, opts ...Option) (DiscoverySet, error) { + iFaces, err := net.Interfaces() + if err != nil { + return nil, fmt.Errorf("list interfaces: %w", err) + } + + set := make(DiscoverySet, 0, len(iFaces)) + + for _, iFace := range iFaces { + if iFace.Flags&net.FlagLoopback == net.FlagLoopback { + continue + } + + if iFace.Flags&net.FlagRunning != net.FlagRunning { + continue + } + + discover, err := NewDiscovery(iFace, log, discoverPort, opts...) + if err != nil { + return nil, fmt.Errorf("new discover for %s: %w", iFace.Name, err) + } + + set = append(set, discover) + } + + return set, nil +} + +func NewDiscovery(iFace net.Interface, log *zap.Logger, discoverPort uint16, opts ...Option) (*Discovery, error) { + addrs, err := iFace.Addrs() + if err != nil { + return nil, fmt.Errorf("get interface address: %w", err) + } + + broadcast := net.IP(make([]byte, 4)) + var ownAddr net.IP + + for _, addr := range addrs { + if ipnet, ok := addr.(*net.IPNet); ok { + ip4 := ipnet.IP.To4() + if ip4 == nil { + continue + } + + for i := range ip4 { + broadcast[i] = ip4[i] | ^ipnet.Mask[i] + } + + ownAddr = ipnet.IP + + break + } + } + if broadcast.To4() == nil { + return nil, fmt.Errorf("no broadcast address") + } + + portString := strconv.Itoa(int(discoverPort)) + + conn, err := net.ListenPacket("udp4", broadcast.String()+":"+portString) + if err != nil { + return nil, fmt.Errorf("listen packet: %w", err) + } + + udpAddr, err := net.ResolveUDPAddr("udp4", broadcast.String()+":"+portString) + if err != nil { + return nil, fmt.Errorf("resolve udp address: %w", err) + } + + d := Discovery{ + log: log.With(zap.Stringer("broadcast_address", broadcast)), + nodes: make(Nodes), + newNode: make(chan string, 20), + ownAddr: ownAddr.To4(), + conn: conn, + broadcastAddr: udpAddr, + broadcastInterval: defaultBroadcastInterval, + } + + for _, opt := range opts { + opt(&d) + } + + return &d, nil +} + +type Discovery struct { + log *zap.Logger + debug bool + + mu sync.Mutex + nodes Nodes + newNode chan string + + ownAddr net.IP + conn net.PacketConn + broadcastAddr *net.UDPAddr + + broadcastInterval time.Duration +} + +func (d *Discovery) Start(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + d.log.Info("start discovery", zap.Stringer("address", &d.ownAddr)) + + timer := time.NewTicker(d.broadcastInterval) + defer timer.Stop() + + listenStop := make(chan struct{}) + + go func() { + if err := d.listen(); err != nil { + d.log.Error("listen failed", zap.Error(err)) + } + + close(listenStop) + }() + + for { + select { + case <-ctx.Done(): + if err := d.conn.Close(); err != nil { + d.log.Warn("close connection", zap.Error(err)) + } + + close(d.newNode) + + return + + case <-listenStop: + d.log.Error("listener stopped, stop discovery") + + return + + case <-timer.C: + d.broadcast() + } + } +} + +func (d *Discovery) listen() error { + buf := make([]byte, 4) + + for { + n, addr, err := d.conn.ReadFrom(buf) + if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { + d.log.Warn("listen connection closed") + + return nil + } + + if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() { + continue + } + + return fmt.Errorf("read from: %w", err) + } + + nodeAddr := net.IP(buf[:n]) + d.log.Debug("received node address", zap.Stringer("address", nodeAddr)) + + clientIP, _, _ := strings.Cut(addr.String(), ":") + if nodeAddr.String() != clientIP { + d.log.Warn("received addr mismatch", zap.Stringer("received", nodeAddr), zap.String("detected", clientIP)) + } + + d.addNode(nodeAddr.String()) + } +} + +func (d *Discovery) broadcast() { + d.log.Debug("broadcast") + + if _, err := d.conn.WriteTo(d.ownAddr, d.broadcastAddr); err != nil { + d.log.Error("write broadcast message", zap.Error(err)) + } +} + +func (d *Discovery) addNode(addr string) { + if !d.debug && addr == d.ownAddr.String() { + return + } + + d.mu.Lock() + + if _, ok := d.nodes[addr]; !ok { + d.log.Info("new node address", zap.String("address", addr)) + d.nodes[addr] = struct{}{} + d.newNode <- addr + } + + d.mu.Unlock() +} + +func (d *Discovery) Nodes() NewNodes { + return d.newNode +} diff --git a/pkg/discovery/service_test.go b/pkg/discovery/service_test.go new file mode 100644 index 0000000..49009ac --- /dev/null +++ b/pkg/discovery/service_test.go @@ -0,0 +1,38 @@ +package discovery + +import ( + "context" + "os" + "os/signal" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +func TestIface(t *testing.T) { + ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt) + t.Cleanup(cancel) + + ctx, _ = context.WithTimeout(ctx, time.Second*5) + + set, err := NewDiscoverySet(zaptest.NewLogger(t).Named("discovery"), 1234, WithDebug()) + require.NoError(t, err) + + wg := sync.WaitGroup{} + + wg.Add(1) + go set[0].Start(ctx, &wg) + + go func() { + for { + time.Sleep(time.Second) + } + }() + wg.Wait() + + assert.Len(t, set[0].Nodes(), 1) +}