From defe0b8fff2bbebdadb1db5fbaa05f81499b2bcf Mon Sep 17 00:00:00 2001 From: "DESKTOP-V763RJ7\\Administrator" <835606593@qq.com> Date: Thu, 29 Jan 2026 16:20:30 +0800 Subject: [PATCH] =?UTF-8?q?feat=20kafka=20=E6=94=B9=E7=89=88=201?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/timer/consumer.go | 107 ++++++++++++++++++++++++++++--------- internal/timer/timer.go | 2 +- 2 files changed, 83 insertions(+), 26 deletions(-) diff --git a/internal/timer/consumer.go b/internal/timer/consumer.go index 1ecbd23..01d7242 100644 --- a/internal/timer/consumer.go +++ b/internal/timer/consumer.go @@ -9,7 +9,6 @@ import ( "git.hlsq.asia/mmorpg/service-common/utils" "git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/model" "git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/repository" - "github.com/IBM/sarama" "gorm.io/gorm" ) @@ -21,31 +20,89 @@ type TopicQuestionAnswer struct { Answer string `json:"answer"` } -func startConsumer() { - kafka.NewConsumer().Consume("qgdzs.question.answer", func(ctx context.Context, msg *sarama.ConsumerMessage) error { - log.Infof("Kafka topic: qgdzs.question.answer: %s", string(msg.Value)) - data := &TopicQuestionAnswer{} - if err := json.Unmarshal(msg.Value, &data); err != nil { - return utils.ErrorsWrap(err) - } - if data.USN == "" || data.QuestionSn == "" { - return utils.ErrorsWrap(errors.New("invalid data")) - } +func (t *TopicQuestionAnswer) Name() string { + return "qgdzs.question.answer" +} + +type TopicAddPoint struct { + MessageSn string `json:"messageSn"` + Source AddPointSource `json:"source"` + USN string `json:"usn"` + Point int64 `json:"point"` +} + +type AddPointSource int32 + +const ( + AddPointSourceUnKnown = 0 + AddPointSourceRandom = 1 + AddPointSourceCategory = 2 + AddPointSourceQuickly = 3 +) + +func (t *TopicAddPoint) Name() string { + return "qgdzs.user.point" +} + +// !!!消费者必须做幂等!!! +func startConsumer() { + go kafka.NewConsumer().Consume(map[kafka.Topic]kafka.Handler{ + &TopicQuestionAnswer{}: func(ctx context.Context, msg []byte) error { + data := &TopicQuestionAnswer{} + if err := json.Unmarshal(msg, data); err != nil { + return err + } + log.Infof("Kafka consume topic: %v: %#+v", data.Name(), data) + if data.USN == "" || data.QuestionSn == "" { + return utils.ErrorsWrap(errors.New("invalid data")) + } + + // 答题记录 + isCorrect := int32(0) + if data.QuestionAnswer == data.Answer { + isCorrect = 1 + } + _, err := repository.NewRecordDao(ctx, repository.Query()).Create(&model.Record{ + Sn: data.MessageSn, + UserSn: data.USN, + QuestionSn: data.QuestionSn, + Answer: data.Answer, + IsCorrect: isCorrect, + }) + if err != nil { + if errors.Is(err, gorm.ErrDuplicatedKey) { + return nil + } + return utils.ErrorsWrap(err) + } - isCorrect := int32(0) - if data.QuestionAnswer == data.Answer { - isCorrect = 1 - } - _, err := repository.NewRecordDao(context.Background()).Create(&model.Record{ - Sn: data.MessageSn, - UserSn: data.USN, - QuestionSn: data.QuestionSn, - Answer: data.Answer, - IsCorrect: isCorrect, - }) - if errors.Is(err, gorm.ErrDuplicatedKey) { return nil - } - return utils.ErrorsWrap(err) + }, + &TopicAddPoint{}: func(ctx context.Context, msg []byte) error { + data := &TopicAddPoint{} + if err := json.Unmarshal(msg, data); err != nil { + return err + } + log.Infof("Kafka consume topic: %v: %#+v", data.Name(), data) + if data.USN == "" || data.Point == 0 { + return utils.ErrorsWrap(errors.New("invalid data")) + } + + // 积分记录 & 加分 + err := repository.NewPointRecordsDao(ctx, repository.Query()).CreateAndIncrPointCard(&model.PointRecord{ + Sn: data.MessageSn, + UserSn: data.USN, + Source: int32(data.Source), + Point: data.Point, + }) + if err != nil { + if errors.Is(err, gorm.ErrDuplicatedKey) { + return nil + } + return utils.ErrorsWrap(err) + } + + return nil + }, }) } diff --git a/internal/timer/timer.go b/internal/timer/timer.go index d4e9432..b56a1aa 100644 --- a/internal/timer/timer.go +++ b/internal/timer/timer.go @@ -11,7 +11,7 @@ func (t *Timer) Start() { _, _ = t.c.AddFunc("0/5 * * * ?", func() { }) t.c.Start() - go startConsumer() + startConsumer() } func (t *Timer) Stop() {