diff --git a/.golangci.yaml b/.golangci.yaml index 2147eec..040b0c5 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -6,3 +6,5 @@ linters: - depguard - tagliatelle - wsl # deprecated + build-tags: + - integration \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..a8f8a8e --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "go.buildTags": "integration" +} \ No newline at end of file diff --git a/go.mod b/go.mod index d9c7b2d..0bc9d00 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,19 @@ module github.com/neatflowcv/seven-skies go 1.25.5 + +require ( + github.com/nats-io/nats.go v1.47.0 + github.com/stretchr/testify v1.11.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect + github.com/nats-io/nkeys v0.4.12 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/crypto v0.45.0 // indirect + golang.org/x/sys v0.38.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..26ce701 --- /dev/null +++ b/go.sum @@ -0,0 +1,22 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM= +github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc= +github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= +golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= +golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/pkg/broker/broker.go b/internal/pkg/broker/broker.go new file mode 100644 index 0000000..8f130b9 --- /dev/null +++ b/internal/pkg/broker/broker.go @@ -0,0 +1,8 @@ +package broker + +import "context" + +type Broker interface { + Publish(ctx context.Context, topic string, message []byte) error + Subscribe(ctx context.Context, durable string, fn func(topic string, message []byte) error) error +} diff --git a/internal/pkg/broker/nats/broker.go b/internal/pkg/broker/nats/broker.go new file mode 100644 index 0000000..9060fc9 --- /dev/null +++ b/internal/pkg/broker/nats/broker.go @@ -0,0 +1,101 @@ +package nats + +import ( + "context" + "fmt" + "log" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/neatflowcv/seven-skies/internal/pkg/broker" +) + +var _ broker.Broker = (*Broker)(nil) + +type Broker struct { + jet jetstream.JetStream + stream jetstream.Stream + consumerContexts []jetstream.ConsumeContext +} + +// NewBroker url example: nats://127.0.0.1:4222 +func NewBroker(ctx context.Context, url string, streamName string, topics []string) (*Broker, error) { + conn, err := nats.Connect(url) + if err != nil { + return nil, fmt.Errorf("error in connect: %w", err) + } + + jet, err := jetstream.New(conn) + if err != nil { + return nil, fmt.Errorf("error in create jetstream: %w", err) + } + + // 같은 이름에 설정이 같으면, 이미 존재하는 스트림을 반환한다. error를 반환하지 않음! + stream, err := jet.CreateStream(ctx, jetstream.StreamConfig{ //nolint:exhaustruct + Name: streamName, + Subjects: topics, + }) + if err != nil { + return nil, fmt.Errorf("error in create stream: %w", err) + } + + return &Broker{ + jet: jet, + stream: stream, + consumerContexts: nil, + }, nil +} + +func (b *Broker) Close() { + for _, consumerContext := range b.consumerContexts { + consumerContext.Stop() + } +} + +func (b *Broker) Publish(ctx context.Context, topic string, message []byte) error { + _, err := b.jet.Publish(ctx, topic, message) + if err != nil { + return fmt.Errorf("error in publish: %w", err) + } + + return nil +} + +func (b *Broker) Subscribe(ctx context.Context, durable string, fn func(topic string, message []byte) error) error { + consumer, err := b.stream.CreateConsumer(ctx, jetstream.ConsumerConfig{ //nolint:exhaustruct + Durable: durable, + AckPolicy: jetstream.AckExplicitPolicy, + }) + if err != nil { + return fmt.Errorf("error in create consumer: %w", err) + } + + consumeContext, err := consumer.Consume(func(msg jetstream.Msg) { + topic := msg.Subject() + content := msg.Data() + + err := fn(topic, content) + if err != nil { + log.Println("error in callback", err) + + inErr := msg.Nak() + if inErr != nil { + log.Println("error in nak", inErr) + } + + return + } + + inErr := msg.Ack() + if inErr != nil { + log.Println("error in ack", inErr) + } + }) + if err != nil { + return fmt.Errorf("error in subscribe: %w", err) + } + + b.consumerContexts = append(b.consumerContexts, consumeContext) + + return nil +} diff --git a/internal/pkg/broker/nats/broker_test.go b/internal/pkg/broker/nats/broker_test.go new file mode 100644 index 0000000..ec84fc3 --- /dev/null +++ b/internal/pkg/broker/nats/broker_test.go @@ -0,0 +1,38 @@ +//go:build integration + +package nats_test + +import ( + "fmt" + "testing" + "time" + + "github.com/neatflowcv/seven-skies/internal/pkg/broker/nats" + "github.com/stretchr/testify/require" +) + +func TestNewBroker(t *testing.T) { + broker, err := nats.NewBroker( + t.Context(), + "nats://127.0.0.1:4222", + "SEVEN_SKIES_TEST_STREAM", + []string{ + "SEVEN_SKIES_TEST_SUBJECT.>", + }, + ) + require.NoError(t, err) + t.Cleanup(func() { + broker.Close() + }) + + err = broker.Subscribe(t.Context(), "SEVEN_SKIES_TEST_DURABLE", func(topic string, message []byte) error { + fmt.Println(topic, string(message)) + return nil + }) + require.NoError(t, err) + + err = broker.Publish(t.Context(), "SEVEN_SKIES_TEST_SUBJECT.data", []byte("hello")) + require.NoError(t, err) + + time.Sleep(2 * time.Second) +}