kma 구현
This commit is contained in:
251
cmd/kma/handler.go
Normal file
251
cmd/kma/handler.go
Normal file
@@ -0,0 +1,251 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/neatflowcv/seven-skies/internal/pkg/broker"
|
||||
"github.com/neatflowcv/seven-skies/internal/pkg/domain"
|
||||
"github.com/neatflowcv/seven-skies/internal/pkg/kma"
|
||||
)
|
||||
|
||||
type Handler struct {
|
||||
broker broker.Broker
|
||||
key string
|
||||
nx int
|
||||
ny int
|
||||
}
|
||||
|
||||
func NewHandler(b broker.Broker, key string, nx int, ny int) *Handler {
|
||||
return &Handler{
|
||||
broker: b,
|
||||
key: key,
|
||||
nx: nx,
|
||||
ny: ny,
|
||||
}
|
||||
}
|
||||
|
||||
type TemporalData struct {
|
||||
Temperature string
|
||||
Sky string
|
||||
Precipitation string
|
||||
ForecastDate string
|
||||
TargetDate string
|
||||
}
|
||||
|
||||
func buildTemporalData(resp *kma.ForecastResponse) []TemporalData {
|
||||
dates := map[string]*TemporalData{}
|
||||
for _, item := range resp.Response.Body.Items.Item {
|
||||
data, ok := dates[item.FcstDate+item.FcstTime]
|
||||
if !ok {
|
||||
data = &TemporalData{
|
||||
ForecastDate: item.BaseDate + item.BaseTime,
|
||||
TargetDate: item.FcstDate + item.FcstTime,
|
||||
Temperature: "",
|
||||
Sky: "",
|
||||
Precipitation: "",
|
||||
}
|
||||
}
|
||||
|
||||
switch item.Category {
|
||||
case "T1H", "TMP": // 온도
|
||||
data.Temperature = item.FcstValue
|
||||
case "SKY": // 하늘상태
|
||||
data.Sky = item.FcstValue
|
||||
case "PTY": // 강수형태
|
||||
data.Precipitation = item.FcstValue
|
||||
}
|
||||
|
||||
dates[item.FcstDate+item.FcstTime] = data
|
||||
}
|
||||
|
||||
var ret []TemporalData
|
||||
for _, data := range dates {
|
||||
ret = append(ret, *data)
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func parseAndValidateTemporalData(data TemporalData) (*domain.WeatherEvent, bool) {
|
||||
targetTime, err := parseKMAForecastDate(data.TargetDate)
|
||||
if err != nil {
|
||||
log.Println("error in parse forecast datetime", err)
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
forecastTime, err := parseKMAForecastDate(data.ForecastDate)
|
||||
if err != nil {
|
||||
log.Println("error in parse forecast datetime", err)
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
temperature, err := strconv.ParseInt(data.Temperature, 10, 64)
|
||||
if err != nil {
|
||||
log.Println("error in parse temperature", err)
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
condition := decideCondition(&data)
|
||||
|
||||
// 값 검증: 비어있는 값이 있으면 건너뛰기
|
||||
if forecastTime.IsZero() {
|
||||
log.Printf("skip weather event: forecastTime is empty (targetDate: %v)", targetTime)
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
if targetTime.IsZero() {
|
||||
log.Printf("skip weather event: targetTime is empty (forecastDate: %v)", forecastTime)
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
if condition == "" {
|
||||
log.Printf("skip weather event: condition is empty (forecastDate: %v, targetDate: %v)", forecastTime, targetTime)
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
if temperature == 0 {
|
||||
log.Printf("skip weather event: temperature is empty (forecastDate: %v, targetDate: %v)", forecastTime, targetTime)
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return &domain.WeatherEvent{
|
||||
Source: domain.WeatherSourceKMA,
|
||||
ForecastDate: forecastTime,
|
||||
TargetDate: targetTime,
|
||||
Condition: condition,
|
||||
Temperature: domain.Temperature{
|
||||
Value: domain.Celsius(temperature),
|
||||
},
|
||||
}, true
|
||||
}
|
||||
|
||||
func convertTemporalDataToWeatherEvents(datas []TemporalData) []*domain.WeatherEvent {
|
||||
var weatherEvents []*domain.WeatherEvent
|
||||
|
||||
for _, data := range datas {
|
||||
weatherEvent, ok := parseAndValidateTemporalData(data)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
weatherEvents = append(weatherEvents, weatherEvent)
|
||||
}
|
||||
|
||||
return weatherEvents
|
||||
}
|
||||
|
||||
// HandleUltraShort는 초단기 예보를 조회해 이벤트로 발행한다.
|
||||
func (h *Handler) HandleUltraShort(ctx context.Context) error {
|
||||
log.Println("get ultra short forecast")
|
||||
|
||||
now := time.Now()
|
||||
|
||||
resp, err := kma.GetUltraShortForecast(ctx, h.key, now, h.nx, h.ny)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error in get ultra short forecast: %w", err)
|
||||
}
|
||||
|
||||
datas := buildTemporalData(resp)
|
||||
weatherEvents := convertTemporalDataToWeatherEvents(datas)
|
||||
|
||||
for _, weatherEvent := range weatherEvents {
|
||||
message, err := json.Marshal(weatherEvent)
|
||||
if err != nil {
|
||||
log.Println("error in marshal weather event", err)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
err = h.broker.Publish(ctx, "SEVEN_SKIES_SUBJECT.KMA", message)
|
||||
if err != nil {
|
||||
log.Println("error in publish weather event", err)
|
||||
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func decideCondition(data *TemporalData) domain.WeatherCondition {
|
||||
if data.Precipitation == "0" && data.Sky == "1" {
|
||||
return domain.WeatherConditionClear
|
||||
}
|
||||
|
||||
switch data.Precipitation {
|
||||
case "1", "5":
|
||||
return domain.WeatherConditionRain
|
||||
case "2", "6":
|
||||
return domain.WeatherConditionRainSnow
|
||||
case "3", "7":
|
||||
return domain.WeatherConditionSnow
|
||||
}
|
||||
|
||||
switch data.Sky {
|
||||
case "3", "4":
|
||||
return domain.WeatherConditionCloudy
|
||||
}
|
||||
|
||||
return domain.WeatherConditionUnknown
|
||||
}
|
||||
|
||||
// HandleShortTerm는 단기 예보를 조회해 이벤트로 발행한다.
|
||||
func (h *Handler) HandleShortTerm(ctx context.Context) error {
|
||||
log.Println("get short term forecast")
|
||||
|
||||
now := time.Now()
|
||||
|
||||
resp, err := kma.GetShortTermForecast(ctx, h.key, now, h.nx, h.ny)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error in get short term forecast: %w", err)
|
||||
}
|
||||
|
||||
datas := buildTemporalData(resp)
|
||||
weatherEvents := convertTemporalDataToWeatherEvents(datas)
|
||||
|
||||
for _, weatherEvent := range weatherEvents {
|
||||
message, err := json.Marshal(weatherEvent)
|
||||
if err != nil {
|
||||
log.Println("error in marshal weather event", err)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
err = h.broker.Publish(ctx, "SEVEN_SKIES_SUBJECT.KMA", message)
|
||||
if err != nil {
|
||||
log.Println("error in publish weather event", err)
|
||||
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseKMAForecastDate(dateStr string) (time.Time, error) {
|
||||
layout := "200601021504"
|
||||
|
||||
loc, err := time.LoadLocation("Asia/Seoul")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
t, err := time.ParseInLocation(layout, dateStr, loc)
|
||||
if err != nil {
|
||||
return time.Time{}, fmt.Errorf("error in parse datetime: %w", err)
|
||||
}
|
||||
|
||||
return t, nil
|
||||
}
|
||||
191
cmd/kma/main.go
Normal file
191
cmd/kma/main.go
Normal file
@@ -0,0 +1,191 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
"strconv"
|
||||
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"github.com/joho/godotenv"
|
||||
"github.com/neatflowcv/seven-skies/internal/pkg/broker/nats"
|
||||
)
|
||||
|
||||
func version() string {
|
||||
info, ok := debug.ReadBuildInfo()
|
||||
if !ok {
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
return info.Main.Version
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Key string
|
||||
NX int
|
||||
NY int
|
||||
NATSURL string
|
||||
}
|
||||
|
||||
func NewConfig() *Config {
|
||||
key := os.Getenv("KMA_API_KEY")
|
||||
if key == "" {
|
||||
log.Panic("KMA_API_KEY is not set")
|
||||
}
|
||||
|
||||
nx := os.Getenv("KMA_NX")
|
||||
if nx == "" {
|
||||
log.Panic("KMA_NX is not set")
|
||||
}
|
||||
|
||||
ny := os.Getenv("KMA_NY")
|
||||
if ny == "" {
|
||||
log.Panic("KMA_NY is not set")
|
||||
}
|
||||
|
||||
NATS_URL := os.Getenv("NATS_URL")
|
||||
if NATS_URL == "" {
|
||||
log.Panic("NATS_URL is not set")
|
||||
}
|
||||
|
||||
nxInt, err := strconv.Atoi(nx)
|
||||
if err != nil {
|
||||
log.Panic("error in parse nx", err)
|
||||
}
|
||||
|
||||
nyInt, err := strconv.Atoi(ny)
|
||||
if err != nil {
|
||||
log.Panic("error in parse ny", err)
|
||||
}
|
||||
|
||||
return &Config{
|
||||
Key: key,
|
||||
NX: nxInt,
|
||||
NY: nyInt,
|
||||
NATSURL: NATS_URL,
|
||||
}
|
||||
}
|
||||
|
||||
func setupBroker(ctx context.Context, natsURL string) (*nats.Broker, error) {
|
||||
broker, err := nats.NewBroker(ctx, natsURL, "SEVEN_SKIES_STREAM", []string{"SEVEN_SKIES_SUBJECT.>"})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error in create broker: %w", err)
|
||||
}
|
||||
|
||||
return broker, nil
|
||||
}
|
||||
|
||||
//nolint:ireturn // gocron.Scheduler is an interface by design
|
||||
func setupScheduler() (gocron.Scheduler, error) {
|
||||
scheduler, err := gocron.NewScheduler()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error in create scheduler: %w", err)
|
||||
}
|
||||
|
||||
return scheduler, nil
|
||||
}
|
||||
|
||||
func runInitialTasks(ctx context.Context, handler *Handler) error {
|
||||
err := handler.HandleUltraShort(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = handler.HandleShortTerm(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//nolint:ireturn // gocron.Job is an interface by design
|
||||
func setupJobs(scheduler gocron.Scheduler, handler *Handler) (gocron.Job, gocron.Job, error) {
|
||||
ultraJob, err := scheduler.NewJob(
|
||||
gocron.CronJob("50 * * * *", false),
|
||||
gocron.NewTask(handler.HandleUltraShort),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error in create ultra short job: %w", err)
|
||||
}
|
||||
|
||||
shortJob, err := scheduler.NewJob(
|
||||
gocron.CronJob("15 2,5,8,11,14,17,20,23 * * *", false),
|
||||
gocron.NewTask(handler.HandleShortTerm),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error in create short term job: %w", err)
|
||||
}
|
||||
|
||||
return ultraJob, shortJob, nil
|
||||
}
|
||||
|
||||
func logNextRuns(ultraJob, shortJob gocron.Job) error {
|
||||
ultraNext, err := ultraJob.NextRun()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error in get ultra short next run: %w", err)
|
||||
}
|
||||
|
||||
shortNext, err := shortJob.NextRun()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error in get short term next run: %w", err)
|
||||
}
|
||||
|
||||
log.Println("ultra short next run", ultraNext)
|
||||
log.Println("short term next run", shortNext)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
log.Println("version", version())
|
||||
|
||||
err := godotenv.Load()
|
||||
if err == nil {
|
||||
log.Println("env loaded")
|
||||
}
|
||||
|
||||
config := NewConfig()
|
||||
ctx := context.Background()
|
||||
|
||||
broker, err := setupBroker(ctx, config.NATSURL)
|
||||
if err != nil {
|
||||
log.Panic("error in create broker", err)
|
||||
}
|
||||
defer broker.Close()
|
||||
|
||||
handler := NewHandler(broker, config.Key, config.NX, config.NY)
|
||||
|
||||
scheduler, err := setupScheduler()
|
||||
if err != nil {
|
||||
log.Panic("error in create scheduler", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err := scheduler.Shutdown()
|
||||
if err != nil {
|
||||
log.Println("error in shutdown scheduler", err)
|
||||
}
|
||||
}()
|
||||
|
||||
err = runInitialTasks(ctx, handler)
|
||||
if err != nil {
|
||||
log.Panic("error in initial tasks", err)
|
||||
}
|
||||
|
||||
ultraJob, shortJob, err := setupJobs(scheduler, handler)
|
||||
if err != nil {
|
||||
log.Panic("error in create jobs", err)
|
||||
}
|
||||
|
||||
scheduler.Start()
|
||||
|
||||
err = logNextRuns(ultraJob, shortJob)
|
||||
if err != nil {
|
||||
log.Panic("error in get next runs", err)
|
||||
}
|
||||
|
||||
select {}
|
||||
}
|
||||
Reference in New Issue
Block a user