feat kafka 改版 1

This commit is contained in:
2026-01-29 16:20:30 +08:00
parent aa42046fe9
commit defe0b8fff
2 changed files with 83 additions and 26 deletions

View File

@@ -9,7 +9,6 @@ import (
"git.hlsq.asia/mmorpg/service-common/utils" "git.hlsq.asia/mmorpg/service-common/utils"
"git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/model" "git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/model"
"git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/repository" "git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/repository"
"github.com/IBM/sarama"
"gorm.io/gorm" "gorm.io/gorm"
) )
@@ -21,31 +20,89 @@ type TopicQuestionAnswer struct {
Answer string `json:"answer"` Answer string `json:"answer"`
} }
func startConsumer() { func (t *TopicQuestionAnswer) Name() string {
kafka.NewConsumer().Consume("qgdzs.question.answer", func(ctx context.Context, msg *sarama.ConsumerMessage) error { return "qgdzs.question.answer"
log.Infof("Kafka topic: qgdzs.question.answer: %s", string(msg.Value))
data := &TopicQuestionAnswer{}
if err := json.Unmarshal(msg.Value, &data); err != nil {
return utils.ErrorsWrap(err)
} }
type TopicAddPoint struct {
MessageSn string `json:"messageSn"`
Source AddPointSource `json:"source"`
USN string `json:"usn"`
Point int64 `json:"point"`
}
type AddPointSource int32
const (
AddPointSourceUnKnown = 0
AddPointSourceRandom = 1
AddPointSourceCategory = 2
AddPointSourceQuickly = 3
)
func (t *TopicAddPoint) Name() string {
return "qgdzs.user.point"
}
// !!!消费者必须做幂等!!!
func startConsumer() {
go kafka.NewConsumer().Consume(map[kafka.Topic]kafka.Handler{
&TopicQuestionAnswer{}: func(ctx context.Context, msg []byte) error {
data := &TopicQuestionAnswer{}
if err := json.Unmarshal(msg, data); err != nil {
return err
}
log.Infof("Kafka consume topic: %v: %#+v", data.Name(), data)
if data.USN == "" || data.QuestionSn == "" { if data.USN == "" || data.QuestionSn == "" {
return utils.ErrorsWrap(errors.New("invalid data")) return utils.ErrorsWrap(errors.New("invalid data"))
} }
// 答题记录
isCorrect := int32(0) isCorrect := int32(0)
if data.QuestionAnswer == data.Answer { if data.QuestionAnswer == data.Answer {
isCorrect = 1 isCorrect = 1
} }
_, err := repository.NewRecordDao(context.Background()).Create(&model.Record{ _, err := repository.NewRecordDao(ctx, repository.Query()).Create(&model.Record{
Sn: data.MessageSn, Sn: data.MessageSn,
UserSn: data.USN, UserSn: data.USN,
QuestionSn: data.QuestionSn, QuestionSn: data.QuestionSn,
Answer: data.Answer, Answer: data.Answer,
IsCorrect: isCorrect, IsCorrect: isCorrect,
}) })
if err != nil {
if errors.Is(err, gorm.ErrDuplicatedKey) { if errors.Is(err, gorm.ErrDuplicatedKey) {
return nil return nil
} }
return utils.ErrorsWrap(err) return utils.ErrorsWrap(err)
}
return nil
},
&TopicAddPoint{}: func(ctx context.Context, msg []byte) error {
data := &TopicAddPoint{}
if err := json.Unmarshal(msg, data); err != nil {
return err
}
log.Infof("Kafka consume topic: %v: %#+v", data.Name(), data)
if data.USN == "" || data.Point == 0 {
return utils.ErrorsWrap(errors.New("invalid data"))
}
// 积分记录 & 加分
err := repository.NewPointRecordsDao(ctx, repository.Query()).CreateAndIncrPointCard(&model.PointRecord{
Sn: data.MessageSn,
UserSn: data.USN,
Source: int32(data.Source),
Point: data.Point,
})
if err != nil {
if errors.Is(err, gorm.ErrDuplicatedKey) {
return nil
}
return utils.ErrorsWrap(err)
}
return nil
},
}) })
} }

View File

@@ -11,7 +11,7 @@ func (t *Timer) Start() {
_, _ = t.c.AddFunc("0/5 * * * ?", func() { _, _ = t.c.AddFunc("0/5 * * * ?", func() {
}) })
t.c.Start() t.c.Start()
go startConsumer() startConsumer()
} }
func (t *Timer) Stop() { func (t *Timer) Stop() {