Files
service-qgdzs/internal/timer/consumer.go

104 lines
2.4 KiB
Go

package timer
import (
"context"
"errors"
"git.hlsq.asia/mmorpg/service-common/db/kafka"
"git.hlsq.asia/mmorpg/service-common/log"
"git.hlsq.asia/mmorpg/service-common/utils"
"git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/model"
"git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/repository"
"gorm.io/gorm"
)
// startConsumer builds a Kafka consumer and starts it, on a background
// goroutine, for the topics handled by this package.
//
// IMPORTANT: every consumer registered here MUST be idempotent.
func startConsumer() {
	consumer := kafka.NewConsumer()
	topics := []kafka.Topic{
		&TopicQuestionAnswer{},
		&TopicAddPoint{},
	}
	// Fire-and-forget: nothing in this function waits for or stops the goroutine.
	go consumer.Consume(topics)
}
// TopicQuestionAnswer is the message type handled for the topic returned by
// its Name method. Fields map to JSON keys through their struct tags.
type TopicQuestionAnswer struct {
	// MessageSn is written to the answer record as its Sn.
	MessageSn int64 `json:"messageSn"`
	// USN is written to the answer record as UserSn; OnMessage rejects 0.
	USN int64 `json:"usn"`
	// QuestionSn is written to the answer record; OnMessage rejects 0.
	QuestionSn int64 `json:"questionSn"`
	// QuestionAnswer is compared with Answer to decide correctness; it is
	// not itself copied into the record.
	QuestionAnswer string `json:"questionAnswer"`
	// Answer is written to the answer record.
	Answer string `json:"answer"`
}
// Name returns the Kafka topic name for question-answer messages.
func (t *TopicQuestionAnswer) Name() string {
	const topic = "qgdzs.question.answer"
	return topic
}
// OnMessage handles one question-answer message: it logs the payload,
// validates it, and stores an answer record.
//
// It returns a wrapped "invalid data" error when USN or QuestionSn is 0, and
// a wrapped error for any storage failure other than a duplicate key.
// A duplicate-key failure is reported as success (nil).
//
// NOTE(review): MessageSn is not validated here. If Sn is the unique key that
// de-duplication relies on, a zero MessageSn may defeat it — confirm against
// the model and the producers.
func (t *TopicQuestionAnswer) OnMessage(ctx context.Context) error {
	log.Infof("Kafka consume topic: %v: %#+v", t.Name(), t)

	if t.USN == 0 || t.QuestionSn == 0 {
		return utils.ErrorsWrap(errors.New("invalid data"))
	}

	// Answer record: 1 when the submitted answer equals the expected one,
	// otherwise 0.
	var correct int32
	if t.Answer == t.QuestionAnswer {
		correct = 1
	}

	dao := repository.NewRecordDao(ctx, repository.Query())
	record := &model.Record{
		Sn:         t.MessageSn,
		UserSn:     t.USN,
		QuestionSn: t.QuestionSn,
		Answer:     t.Answer,
		IsCorrect:  correct,
	}

	_, createErr := dao.Create(record)
	// A duplicate key is swallowed on purpose so that handling the same
	// message again is not an error.
	if createErr == nil || errors.Is(createErr, gorm.ErrDuplicatedKey) {
		return nil
	}
	return utils.ErrorsWrap(createErr)
}
// TopicAddPoint is the message type handled for the topic returned by its
// Name method. Fields map to JSON keys through their struct tags.
type TopicAddPoint struct {
	// MessageSn is written to the point record as its Sn.
	MessageSn int64 `json:"messageSn"`
	// Source is converted to int32 and written to the point record.
	Source AddPointSource `json:"source"`
	// USN is written to the point record as UserSn; OnMessage rejects 0.
	USN int64 `json:"usn"`
	// Point is written to the point record; OnMessage rejects 0.
	Point int64 `json:"point"`
}
// AddPointSource is the type of TopicAddPoint.Source.
// NOTE(review): presumably it names what awarded the points — confirm with
// the producers.
type AddPointSource int32

// Known AddPointSource values. The constants are deliberately left untyped,
// as in the original declaration, so they remain assignable to any integer
// type. Do not reorder them: the numeric values are what the JSON "source"
// field carries.
const (
	AddPointSourceUnKnown  = iota // 0
	AddPointSourceRandom          // 1
	AddPointSourceCategory        // 2
	AddPointSourceQuickly         // 3
)
// Name returns the Kafka topic name for add-point messages.
func (t *TopicAddPoint) Name() string {
	const topic = "qgdzs.user.point"
	return topic
}
// OnMessage handles one add-point message: it logs the payload, validates
// it, and hands a point record to CreateAndIncrPointCard.
//
// It returns a wrapped "invalid data" error when USN or Point is 0 (negative
// Point values are not rejected), and a wrapped error for any storage failure
// other than a duplicate key. A duplicate-key failure is reported as
// success (nil).
//
// NOTE(review): MessageSn is not validated here. If Sn is the unique key that
// de-duplication relies on, a zero MessageSn may defeat it — confirm against
// the model and the producers.
func (t *TopicAddPoint) OnMessage(ctx context.Context) error {
	log.Infof("Kafka consume topic: %v: %#+v", t.Name(), t)

	if t.USN == 0 || t.Point == 0 {
		return utils.ErrorsWrap(errors.New("invalid data"))
	}

	// Point record & point increment.
	dao := repository.NewPointRecordsDao(ctx, repository.Query())
	entry := &model.PointRecord{
		Sn:     t.MessageSn,
		UserSn: t.USN,
		Source: int32(t.Source),
		Point:  t.Point,
	}

	saveErr := dao.CreateAndIncrPointCard(entry)
	// A duplicate key is swallowed on purpose so that handling the same
	// message again is not an error.
	if saveErr == nil || errors.Is(saveErr, gorm.ErrDuplicatedKey) {
		return nil
	}
	return utils.ErrorsWrap(saveErr)
}