feat kafka
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"git.hlsq.asia/mmorpg/service-common/db/kafka"
|
||||
"git.hlsq.asia/mmorpg/service-common/db/redis"
|
||||
"git.hlsq.asia/mmorpg/service-common/log"
|
||||
"git.hlsq.asia/mmorpg/service-common/net/http/http_resp"
|
||||
@@ -13,6 +14,7 @@ import (
|
||||
"git.hlsq.asia/mmorpg/service-qgdzs/internal/ai"
|
||||
"git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/model"
|
||||
"git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/repository"
|
||||
"git.hlsq.asia/mmorpg/service-qgdzs/internal/timer"
|
||||
"gorm.io/gorm"
|
||||
"time"
|
||||
)
|
||||
@@ -172,16 +174,15 @@ func (s *Server) AnswerQuestion(ctx context.Context, req *grpc_pb.AnswerQuestion
|
||||
}
|
||||
// 保存答题记录
|
||||
if utils.ShouldBindUsn(ctx, &req.USN) {
|
||||
isCorrect := int32(0)
|
||||
if req.Answer == question.Answer {
|
||||
isCorrect = 1
|
||||
data := &timer.TopicQuestionAnswer{
|
||||
MessageSn: utils.SnowflakeInstance().Generate().String(),
|
||||
USN: req.USN,
|
||||
QuestionSn: question.Sn,
|
||||
QuestionAnswer: question.Answer,
|
||||
Answer: req.Answer,
|
||||
}
|
||||
_, _ = repository.NewRecordDao(ctx).Create(&model.Record{
|
||||
UserSn: req.USN,
|
||||
QuestionSn: question.Sn,
|
||||
Answer: req.Answer,
|
||||
IsCorrect: isCorrect,
|
||||
})
|
||||
marshal, _ := json.Marshal(data)
|
||||
kafka.NewProducer().Produce(ctx, "qgdzs.question.answer", string(marshal))
|
||||
}
|
||||
return &grpc_pb.AnswerQuestionResp{
|
||||
Answer: question.Answer,
|
||||
|
||||
51
internal/timer/consumer.go
Normal file
51
internal/timer/consumer.go
Normal file
@@ -0,0 +1,51 @@
|
||||
package timer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"git.hlsq.asia/mmorpg/service-common/db/kafka"
|
||||
"git.hlsq.asia/mmorpg/service-common/log"
|
||||
"git.hlsq.asia/mmorpg/service-common/utils"
|
||||
"git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/model"
|
||||
"git.hlsq.asia/mmorpg/service-qgdzs/internal/dao/repository"
|
||||
"github.com/IBM/sarama"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type TopicQuestionAnswer struct {
|
||||
MessageSn string `json:"messageSn"`
|
||||
USN string `json:"usn"`
|
||||
QuestionSn string `json:"questionSn"`
|
||||
QuestionAnswer string `json:"questionAnswer"`
|
||||
Answer string `json:"answer"`
|
||||
}
|
||||
|
||||
func startConsumer() {
|
||||
kafka.NewConsumer().Consume("qgdzs.question.answer", func(ctx context.Context, msg *sarama.ConsumerMessage) error {
|
||||
log.Infof("qgdzs.question.answer: %s", string(msg.Value))
|
||||
data := &TopicQuestionAnswer{}
|
||||
if err := json.Unmarshal(msg.Value, &data); err != nil {
|
||||
return utils.ErrorsWrap(err)
|
||||
}
|
||||
if data.USN == "" || data.QuestionSn == "" {
|
||||
return utils.ErrorsWrap(errors.New("invalid data"))
|
||||
}
|
||||
|
||||
isCorrect := int32(0)
|
||||
if data.QuestionAnswer == data.Answer {
|
||||
isCorrect = 1
|
||||
}
|
||||
_, err := repository.NewRecordDao(context.Background()).Create(&model.Record{
|
||||
Sn: data.MessageSn,
|
||||
UserSn: data.USN,
|
||||
QuestionSn: data.QuestionSn,
|
||||
Answer: data.Answer,
|
||||
IsCorrect: isCorrect,
|
||||
})
|
||||
if errors.Is(err, gorm.ErrDuplicatedKey) {
|
||||
return nil
|
||||
}
|
||||
return utils.ErrorsWrap(err)
|
||||
})
|
||||
}
|
||||
19
internal/timer/timer.go
Normal file
19
internal/timer/timer.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package timer
|
||||
|
||||
import "github.com/robfig/cron/v3"
|
||||
|
||||
type Timer struct {
|
||||
c *cron.Cron
|
||||
}
|
||||
|
||||
func (t *Timer) Start() {
|
||||
t.c = cron.New()
|
||||
_, _ = t.c.AddFunc("0/5 * * * ?", func() {
|
||||
})
|
||||
t.c.Start()
|
||||
go startConsumer()
|
||||
}
|
||||
|
||||
func (t *Timer) Stop() {
|
||||
t.c.Stop()
|
||||
}
|
||||
Reference in New Issue
Block a user