package timer import ( "context" "encoding/json" "errors" "git.hlsq.asia/mmorpg/service-common/db/kafka" "git.hlsq.asia/mmorpg/service-common/log" "git.hlsq.asia/mmorpg/service-common/utils" "git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/model" "git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/repository" "github.com/IBM/sarama" "gorm.io/gorm" ) type TopicQuestionAnswer struct { MessageSn string `json:"messageSn"` USN string `json:"usn"` QuestionSn string `json:"questionSn"` QuestionAnswer string `json:"questionAnswer"` Answer string `json:"answer"` } func startConsumer() { kafka.NewConsumer().Consume("qgdzs.question.answer", func(ctx context.Context, msg *sarama.ConsumerMessage) error { log.Infof("Kafka topic: qgdzs.question.answer: %s", string(msg.Value)) data := &TopicQuestionAnswer{} if err := json.Unmarshal(msg.Value, &data); err != nil { return utils.ErrorsWrap(err) } if data.USN == "" || data.QuestionSn == "" { return utils.ErrorsWrap(errors.New("invalid data")) } isCorrect := int32(0) if data.QuestionAnswer == data.Answer { isCorrect = 1 } _, err := repository.NewRecordDao(context.Background()).Create(&model.Record{ Sn: data.MessageSn, UserSn: data.USN, QuestionSn: data.QuestionSn, Answer: data.Answer, IsCorrect: isCorrect, }) if errors.Is(err, gorm.ErrDuplicatedKey) { return nil } return utils.ErrorsWrap(err) }) }