diff --git a/.env.template b/.env.template index 0e393f5..359a7ed 100644 --- a/.env.template +++ b/.env.template @@ -1,4 +1,5 @@ OPENWEATHER_API_KEY= OPENWEATHER_LAT=37 OPENWEATHER_LON=126 -NATS_URL= \ No newline at end of file +NATS_URL= +DATABASE_DSN= \ No newline at end of file diff --git a/cmd/seven-skies/main.go b/cmd/seven-skies/main.go index 70aa556..f17e32f 100644 --- a/cmd/seven-skies/main.go +++ b/cmd/seven-skies/main.go @@ -1,8 +1,18 @@ package main import ( + "context" + "encoding/json" + "fmt" "log" + "os" "runtime/debug" + + "github.com/joho/godotenv" + "github.com/neatflowcv/seven-skies/internal/app/flow" + "github.com/neatflowcv/seven-skies/internal/pkg/broker/nats" + "github.com/neatflowcv/seven-skies/internal/pkg/domain" + "github.com/neatflowcv/seven-skies/internal/pkg/repository/gorm" ) func version() string { @@ -14,6 +24,79 @@ func version() string { return info.Main.Version } +type Config struct { + NATS_URL string + DATABASE_DSN string +} + +func NewConfig() *Config { + NATS_URL := os.Getenv("NATS_URL") + if NATS_URL == "" { + log.Panic("NATS_URL is not set") + } + + DATABASE_DSN := os.Getenv("DATABASE_DSN") + if DATABASE_DSN == "" { + log.Panic("DATABASE_DSN is not set") + } + + return &Config{ + NATS_URL: NATS_URL, + DATABASE_DSN: DATABASE_DSN, + } +} + func main() { log.Println("version", version()) + + err := godotenv.Load() + if err == nil { + log.Println("env loaded") + } + + config := NewConfig() + + ctx := context.Background() + + broker, err := nats.NewBroker(ctx, config.NATS_URL, "SEVEN_SKIES_STREAM", []string{"SEVEN_SKIES_SUBJECT.>"}) + if err != nil { + log.Panic("error in create broker", err) + } + defer broker.Close() + + repo, err := gorm.NewRepository(config.DATABASE_DSN) + if err != nil { + log.Panic("error in create repository", err) + } + + service := flow.NewFlow(repo) + + err = broker.Subscribe(ctx, "SEVEN_SKIES_DURABLE", func(topic string, message []byte) error { + var event domain.WeatherEvent + + err := json.Unmarshal(message, &event) + if err != nil { + log.Println("error in unmarshal", err) + + return fmt.Errorf("error in unmarshal: %w", err) + } + + err = service.CreateWeather(ctx, &flow.Weather{ + Source: string(event.Source), + TargetDate: event.TargetDate, + ForecastDate: event.ForecastDate, + Condition: string(event.Condition), + Temperature: float64(event.Temperature.Value), + }) + if err != nil { + return fmt.Errorf("error in create weather: %w", err) + } + + return nil + }) + if err != nil { + log.Panic("error in subscribe", err) + } + + select {} } diff --git a/go.mod b/go.mod index 0cc0b41..9d3810d 100644 --- a/go.mod +++ b/go.mod @@ -6,13 +6,22 @@ require ( github.com/go-co-op/gocron/v2 v2.18.2 github.com/joho/godotenv v1.5.1 github.com/nats-io/nats.go v1.47.0 + github.com/oklog/ulid/v2 v2.1.1 github.com/stretchr/testify v1.11.1 + gorm.io/driver/postgres v1.6.0 + gorm.io/gorm v1.31.1 resty.dev/v3 v3.0.0-beta.4 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgx/v5 v5.7.6 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect github.com/jonboulle/clockwork v0.5.0 // indirect github.com/klauspost/compress v1.18.2 // indirect github.com/kr/pretty v0.3.1 // indirect @@ -22,6 +31,8 @@ require ( github.com/robfig/cron/v3 v3.0.1 // indirect golang.org/x/crypto v0.45.0 // indirect golang.org/x/net v0.47.0 // indirect - golang.org/x/sys v0.38.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.39.0 // indirect + golang.org/x/text v0.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 5da393a..aaf6f8f 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,23 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-co-op/gocron/v2 v2.18.2 h1:+5VU41FUXPWSPKLXZQ/77SGzUiPCcakU0v7ENc2H20Q= github.com/go-co-op/gocron/v2 v2.18.2/go.mod h1:Zii6he+Zfgy5W9B+JKk/KwejFOW0kZTFvHtwIpR4aBI= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk= +github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I= @@ -21,6 +34,9 @@ github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc= github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s= +github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= +github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -28,6 +44,9 @@ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -36,12 +55,21 @@ golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= -golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= -golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/postgres v1.6.0 h1:2dxzU8xJ+ivvqTRph34QX+WrRaJlmfyPqXmoGVjMBa4= +gorm.io/driver/postgres v1.6.0/go.mod h1:vUw0mrGgrTK+uPHEhAdV4sfFELrByKVGnaVRkXDhtWo= +gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg= +gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs= resty.dev/v3 v3.0.0-beta.4 h1:2O77oFymtA4NT8AY87wAaSgSGUBk2yvvM1qno9VRXZU= resty.dev/v3 v3.0.0-beta.4/go.mod h1:NTOerrC/4T7/FE6tXIZGIysXXBdgNqwMZuKtxpea9NM= diff --git a/internal/app/flow/errors.go b/internal/app/flow/errors.go new file mode 100644 index 0000000..c3d405a --- /dev/null +++ b/internal/app/flow/errors.go @@ -0,0 +1,7 @@ +package flow + +import "errors" + +var ( + ErrWeatherAlreadyExists = errors.New("weather already exists") +) diff --git a/internal/app/flow/flow.go b/internal/app/flow/flow.go new file mode 100644 index 0000000..014ba22 --- /dev/null +++ b/internal/app/flow/flow.go @@ -0,0 +1,47 @@ +package flow + +import ( + "context" + "errors" + "fmt" + + "github.com/neatflowcv/seven-skies/internal/pkg/domain" + "github.com/neatflowcv/seven-skies/internal/pkg/repository" + "github.com/oklog/ulid/v2" +) + +type Flow struct { + repo repository.Repository +} + +func NewFlow(repo repository.Repository) *Flow { + return &Flow{ + repo: repo, + } +} + +func (f *Flow) CreateWeather(ctx context.Context, weather *Weather) error { + id := ulid.Make().String() + + domainWeather := domain.NewWeather( + id, + domain.WeatherSource(weather.Source), + weather.TargetDate, + weather.ForecastDate, + domain.WeatherCondition(weather.Condition), + domain.Temperature{ + Value: domain.Celsius(weather.Temperature), + }, + ) + + err := f.repo.CreateWeather(ctx, domainWeather) + if err != nil { + if errors.Is(err, repository.ErrWeatherAlreadyExists) { + return ErrWeatherAlreadyExists + } + + return fmt.Errorf("error in create weather: %w", err) + } + + return nil +} diff --git a/internal/app/flow/service.go b/internal/app/flow/service.go deleted file mode 100644 index 0831058..0000000 --- a/internal/app/flow/service.go +++ /dev/null @@ -1,7 +0,0 @@ -package flow - -type Service struct {} - -func NewService() *Service { - return &Service{} -} diff --git a/internal/app/flow/weather.go b/internal/app/flow/weather.go new file mode 100644 index 0000000..324e494 --- /dev/null +++ b/internal/app/flow/weather.go @@ -0,0 +1,11 @@ +package flow + +import "time" + +type Weather struct { + Source string + TargetDate time.Time + ForecastDate time.Time + Condition string + Temperature float64 // Celsius +} diff --git a/internal/pkg/domain/weather.go b/internal/pkg/domain/weather.go new file mode 100644 index 0000000..87768b3 --- /dev/null +++ b/internal/pkg/domain/weather.go @@ -0,0 +1,54 @@ +package domain + +import "time" + +type Weather struct { + id string + source WeatherSource + targetDate time.Time + forecastDate time.Time + condition WeatherCondition + temperature Temperature +} + +func NewWeather( + id string, + source WeatherSource, + targetDate time.Time, + forecastDate time.Time, + condition WeatherCondition, + temperature Temperature, +) *Weather { + return &Weather{ + id: id, + source: source, + targetDate: targetDate, + forecastDate: forecastDate, + condition: condition, + temperature: temperature, + } +} + +func (w *Weather) ID() string { + return w.id +} + +func (w *Weather) Source() WeatherSource { + return w.source +} + +func (w *Weather) TargetDate() time.Time { + return w.targetDate +} + +func (w *Weather) ForecastDate() time.Time { + return w.forecastDate +} + +func (w *Weather) Condition() WeatherCondition { + return w.condition +} + +func (w *Weather) Temperature() Temperature { + return w.temperature +} diff --git a/internal/pkg/repository/errors.go b/internal/pkg/repository/errors.go new file mode 100644 index 0000000..1e4d42f --- /dev/null +++ b/internal/pkg/repository/errors.go @@ -0,0 +1,8 @@ +package repository + +import "errors" + +var ( + ErrWeatherAlreadyExists = errors.New("weather already exists") + ErrWeatherNotFound = errors.New("weather not found") +) diff --git a/internal/pkg/repository/gorm/repository.go b/internal/pkg/repository/gorm/repository.go new file mode 100644 index 0000000..b3f1181 --- /dev/null +++ b/internal/pkg/repository/gorm/repository.go @@ -0,0 +1,44 @@ +package gorm + +import ( + "context" + "fmt" + + "github.com/neatflowcv/seven-skies/internal/pkg/domain" + "github.com/neatflowcv/seven-skies/internal/pkg/repository" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +var _ repository.Repository = (*Repository)(nil) + +type Repository struct { + db *gorm.DB +} + +func NewRepository(dsn string) (*Repository, error) { + db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) //nolint:exhaustruct + if err != nil { + return nil, fmt.Errorf("failed to open database: %w", err) + } + + err = db.AutoMigrate(&Weather{}) //nolint:exhaustruct + if err != nil { + return nil, fmt.Errorf("failed to migrate database: %w", err) + } + + return &Repository{ + db: db, + }, nil +} + +func (r *Repository) CreateWeather(ctx context.Context, weather *domain.Weather) error { + modelWeather := newModelWeather(weather) + + err := gorm.G[Weather](r.db).Create(ctx, modelWeather) + if err != nil { + return fmt.Errorf("failed to create weather: %w", err) + } + + return nil +} diff --git a/internal/pkg/repository/gorm/weather.go b/internal/pkg/repository/gorm/weather.go new file mode 100644 index 0000000..a27da21 --- /dev/null +++ b/internal/pkg/repository/gorm/weather.go @@ -0,0 +1,27 @@ +package gorm + +import ( + "time" + + "github.com/neatflowcv/seven-skies/internal/pkg/domain" +) + +type Weather struct { + ID string `gorm:"primaryKey"` + Source string + TargetDate time.Time + ForecastDate time.Time + Condition string + Temperature float64 +} + +func newModelWeather(weather *domain.Weather) *Weather { + return &Weather{ + ID: weather.ID(), + Source: string(weather.Source()), + TargetDate: weather.TargetDate(), + ForecastDate: weather.ForecastDate(), + Condition: string(weather.Condition()), + Temperature: float64(weather.Temperature().Value), + } +} diff --git a/internal/pkg/repository/repository.go b/internal/pkg/repository/repository.go new file mode 100644 index 0000000..266945f --- /dev/null +++ b/internal/pkg/repository/repository.go @@ -0,0 +1,11 @@ +package repository + +import ( + "context" + + "github.com/neatflowcv/seven-skies/internal/pkg/domain" +) + +type Repository interface { + CreateWeather(ctx context.Context, weather *domain.Weather) error +}