package timer import ( "context" "errors" "git.hlsq.asia/mmorpg/service-common/db/kafka" "git.hlsq.asia/mmorpg/service-common/log" "git.hlsq.asia/mmorpg/service-common/utils" "git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/model" "git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/repository" "gorm.io/gorm" ) // !!!消费者必须做幂等!!! func startConsumer() { go kafka.NewConsumer().Consume([]kafka.Topic{ &TopicQuestionAnswer{}, &TopicAddPoint{}, }) } type TopicQuestionAnswer struct { MessageSn int64 `json:"messageSn"` USN int64 `json:"usn"` QuestionSn int64 `json:"questionSn"` QuestionAnswer string `json:"questionAnswer"` Answer string `json:"answer"` } func (t *TopicQuestionAnswer) Name() string { return "qgdzs.question.answer" } func (t *TopicQuestionAnswer) OnMessage(ctx context.Context) error { log.Infof("Kafka consume topic: %v: %#+v", t.Name(), t) if t.USN == 0 || t.QuestionSn == 0 { return utils.ErrorsWrap(errors.New("invalid data")) } // 答题记录 isCorrect := int32(0) if t.QuestionAnswer == t.Answer { isCorrect = 1 } _, err := repository.NewRecordDao(ctx, repository.Query()).Create(&model.Record{ Sn: t.MessageSn, UserSn: t.USN, QuestionSn: t.QuestionSn, Answer: t.Answer, IsCorrect: isCorrect, }) if err != nil { if errors.Is(err, gorm.ErrDuplicatedKey) { return nil } return utils.ErrorsWrap(err) } return nil } type TopicAddPoint struct { MessageSn int64 `json:"messageSn"` Source AddPointSource `json:"source"` USN int64 `json:"usn"` Point int64 `json:"point"` } type AddPointSource int32 const ( AddPointSourceUnKnown = 0 AddPointSourceRandom = 1 AddPointSourceCategory = 2 AddPointSourceQuickly = 3 ) func (t *TopicAddPoint) Name() string { return "qgdzs.user.point" } func (t *TopicAddPoint) OnMessage(ctx context.Context) error { log.Infof("Kafka consume topic: %v: %#+v", t.Name(), t) if t.USN == 0 || t.Point == 0 { return utils.ErrorsWrap(errors.New("invalid data")) } // 积分记录 & 加分 err := repository.NewPointRecordsDao(ctx, repository.Query()).CreateAndIncrPointCard(&model.PointRecord{ Sn: t.MessageSn, UserSn: t.USN, Source: int32(t.Source), Point: t.Point, }) if err != nil { if errors.Is(err, gorm.ErrDuplicatedKey) { return nil } return utils.ErrorsWrap(err) } return nil }