implement broker

This commit is contained in:
2025-12-04 18:00:43 +09:00
parent cb958834cb
commit 13d37881b9
7 changed files with 190 additions and 0 deletions

View File

@@ -0,0 +1,101 @@
package nats
import (
"context"
"fmt"
"log"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/neatflowcv/seven-skies/internal/pkg/broker"
)
var _ broker.Broker = (*Broker)(nil)
type Broker struct {
jet jetstream.JetStream
stream jetstream.Stream
consumerContexts []jetstream.ConsumeContext
}
// NewBroker url example: nats://127.0.0.1:4222
func NewBroker(ctx context.Context, url string, streamName string, topics []string) (*Broker, error) {
conn, err := nats.Connect(url)
if err != nil {
return nil, fmt.Errorf("error in connect: %w", err)
}
jet, err := jetstream.New(conn)
if err != nil {
return nil, fmt.Errorf("error in create jetstream: %w", err)
}
// 같은 이름에 설정이 같으면, 이미 존재하는 스트림을 반환한다. error를 반환하지 않음!
stream, err := jet.CreateStream(ctx, jetstream.StreamConfig{ //nolint:exhaustruct
Name: streamName,
Subjects: topics,
})
if err != nil {
return nil, fmt.Errorf("error in create stream: %w", err)
}
return &Broker{
jet: jet,
stream: stream,
consumerContexts: nil,
}, nil
}
func (b *Broker) Close() {
for _, consumerContext := range b.consumerContexts {
consumerContext.Stop()
}
}
func (b *Broker) Publish(ctx context.Context, topic string, message []byte) error {
_, err := b.jet.Publish(ctx, topic, message)
if err != nil {
return fmt.Errorf("error in publish: %w", err)
}
return nil
}
func (b *Broker) Subscribe(ctx context.Context, durable string, fn func(topic string, message []byte) error) error {
consumer, err := b.stream.CreateConsumer(ctx, jetstream.ConsumerConfig{ //nolint:exhaustruct
Durable: durable,
AckPolicy: jetstream.AckExplicitPolicy,
})
if err != nil {
return fmt.Errorf("error in create consumer: %w", err)
}
consumeContext, err := consumer.Consume(func(msg jetstream.Msg) {
topic := msg.Subject()
content := msg.Data()
err := fn(topic, content)
if err != nil {
log.Println("error in callback", err)
inErr := msg.Nak()
if inErr != nil {
log.Println("error in nak", inErr)
}
return
}
inErr := msg.Ack()
if inErr != nil {
log.Println("error in ack", inErr)
}
})
if err != nil {
return fmt.Errorf("error in subscribe: %w", err)
}
b.consumerContexts = append(b.consumerContexts, consumeContext)
return nil
}

View File

@@ -0,0 +1,38 @@
//go:build integration
package nats_test
import (
"fmt"
"testing"
"time"
"github.com/neatflowcv/seven-skies/internal/pkg/broker/nats"
"github.com/stretchr/testify/require"
)
func TestNewBroker(t *testing.T) {
broker, err := nats.NewBroker(
t.Context(),
"nats://127.0.0.1:4222",
"SEVEN_SKIES_TEST_STREAM",
[]string{
"SEVEN_SKIES_TEST_SUBJECT.>",
},
)
require.NoError(t, err)
t.Cleanup(func() {
broker.Close()
})
err = broker.Subscribe(t.Context(), "SEVEN_SKIES_TEST_DURABLE", func(topic string, message []byte) error {
fmt.Println(topic, string(message))
return nil
})
require.NoError(t, err)
err = broker.Publish(t.Context(), "SEVEN_SKIES_TEST_SUBJECT.data", []byte("hello"))
require.NoError(t, err)
time.Sleep(2 * time.Second)
}