feat kafka 改版 2
This commit is contained in:
@@ -2,7 +2,6 @@ package timer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"git.hlsq.asia/mmorpg/service-common/db/kafka"
|
||||
"git.hlsq.asia/mmorpg/service-common/log"
|
||||
@@ -12,6 +11,14 @@ import (
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// !!!消费者必须做幂等!!!
|
||||
func startConsumer() {
|
||||
go kafka.NewConsumer().Consume([]kafka.Topic{
|
||||
&TopicQuestionAnswer{},
|
||||
&TopicAddPoint{},
|
||||
})
|
||||
}
|
||||
|
||||
type TopicQuestionAnswer struct {
|
||||
MessageSn string `json:"messageSn"`
|
||||
USN string `json:"usn"`
|
||||
@@ -24,6 +31,34 @@ func (t *TopicQuestionAnswer) Name() string {
|
||||
return "qgdzs.question.answer"
|
||||
}
|
||||
|
||||
func (t *TopicQuestionAnswer) OnMessage(ctx context.Context) error {
|
||||
log.Infof("Kafka consume topic: %v: %#+v", t.Name(), t)
|
||||
if t.USN == "" || t.QuestionSn == "" {
|
||||
return utils.ErrorsWrap(errors.New("invalid data"))
|
||||
}
|
||||
|
||||
// 答题记录
|
||||
isCorrect := int32(0)
|
||||
if t.QuestionAnswer == t.Answer {
|
||||
isCorrect = 1
|
||||
}
|
||||
_, err := repository.NewRecordDao(ctx, repository.Query()).Create(&model.Record{
|
||||
Sn: t.MessageSn,
|
||||
UserSn: t.USN,
|
||||
QuestionSn: t.QuestionSn,
|
||||
Answer: t.Answer,
|
||||
IsCorrect: isCorrect,
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrDuplicatedKey) {
|
||||
return nil
|
||||
}
|
||||
return utils.ErrorsWrap(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type TopicAddPoint struct {
|
||||
MessageSn string `json:"messageSn"`
|
||||
Source AddPointSource `json:"source"`
|
||||
@@ -44,65 +79,25 @@ func (t *TopicAddPoint) Name() string {
|
||||
return "qgdzs.user.point"
|
||||
}
|
||||
|
||||
// !!!消费者必须做幂等!!!
|
||||
func startConsumer() {
|
||||
go kafka.NewConsumer().Consume(map[kafka.Topic]kafka.Handler{
|
||||
&TopicQuestionAnswer{}: func(ctx context.Context, msg []byte) error {
|
||||
data := &TopicQuestionAnswer{}
|
||||
if err := json.Unmarshal(msg, data); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Kafka consume topic: %v: %#+v", data.Name(), data)
|
||||
if data.USN == "" || data.QuestionSn == "" {
|
||||
return utils.ErrorsWrap(errors.New("invalid data"))
|
||||
}
|
||||
func (t *TopicAddPoint) OnMessage(ctx context.Context) error {
|
||||
log.Infof("Kafka consume topic: %v: %#+v", t.Name(), t)
|
||||
if t.USN == "" || t.Point == 0 {
|
||||
return utils.ErrorsWrap(errors.New("invalid data"))
|
||||
}
|
||||
|
||||
// 答题记录
|
||||
isCorrect := int32(0)
|
||||
if data.QuestionAnswer == data.Answer {
|
||||
isCorrect = 1
|
||||
}
|
||||
_, err := repository.NewRecordDao(ctx, repository.Query()).Create(&model.Record{
|
||||
Sn: data.MessageSn,
|
||||
UserSn: data.USN,
|
||||
QuestionSn: data.QuestionSn,
|
||||
Answer: data.Answer,
|
||||
IsCorrect: isCorrect,
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrDuplicatedKey) {
|
||||
return nil
|
||||
}
|
||||
return utils.ErrorsWrap(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
&TopicAddPoint{}: func(ctx context.Context, msg []byte) error {
|
||||
data := &TopicAddPoint{}
|
||||
if err := json.Unmarshal(msg, data); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Kafka consume topic: %v: %#+v", data.Name(), data)
|
||||
if data.USN == "" || data.Point == 0 {
|
||||
return utils.ErrorsWrap(errors.New("invalid data"))
|
||||
}
|
||||
|
||||
// 积分记录 & 加分
|
||||
err := repository.NewPointRecordsDao(ctx, repository.Query()).CreateAndIncrPointCard(&model.PointRecord{
|
||||
Sn: data.MessageSn,
|
||||
UserSn: data.USN,
|
||||
Source: int32(data.Source),
|
||||
Point: data.Point,
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrDuplicatedKey) {
|
||||
return nil
|
||||
}
|
||||
return utils.ErrorsWrap(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
// 积分记录 & 加分
|
||||
err := repository.NewPointRecordsDao(ctx, repository.Query()).CreateAndIncrPointCard(&model.PointRecord{
|
||||
Sn: t.MessageSn,
|
||||
UserSn: t.USN,
|
||||
Source: int32(t.Source),
|
||||
Point: t.Point,
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrDuplicatedKey) {
|
||||
return nil
|
||||
}
|
||||
return utils.ErrorsWrap(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user