feat 初次提交
This commit is contained in:
54
internal/grpc_server/server/server.go
Normal file
54
internal/grpc_server/server/server.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"common/db/redis"
|
||||
"common/net/http/http_resp"
|
||||
"common/proto/ss/grpc_pb"
|
||||
"common/utils"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"gorm.io/gorm"
|
||||
"user/internal/dao/model"
|
||||
"user/internal/dao/repository"
|
||||
)
|
||||
|
||||
func (s *Server) Login(ctx context.Context, req *grpc_pb.LoginReq) (*grpc_pb.LoginResp, error) {
|
||||
userDao := repository.NewUserDao(ctx, redis.GetCacheClient())
|
||||
user, err := userDao.FindByPhone(req.Phone)
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
user = &model.User{
|
||||
Phone: req.Phone,
|
||||
}
|
||||
if err := userDao.Create(user); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
user.Name = fmt.Sprintf("user_%d", user.Sn)
|
||||
_ = userDao.Updates(user)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &grpc_pb.LoginResp{
|
||||
USN: user.Sn,
|
||||
Name: user.Name,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Server) GetUserInfo(ctx context.Context, req *grpc_pb.GetUserInfoReq) (*grpc_pb.GetUserInfoResp, error) {
|
||||
if !utils.ShouldBindUsn(ctx, &req.USN) {
|
||||
return nil, http_resp.ParamError
|
||||
}
|
||||
userDao := repository.NewUserDao(ctx, redis.GetCacheClient())
|
||||
user, err := userDao.FindBySn(req.USN)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &grpc_pb.GetUserInfoResp{
|
||||
USN: user.Sn,
|
||||
Name: user.Name,
|
||||
}, nil
|
||||
}
|
||||
32
internal/grpc_server/server/server_init.go
Normal file
32
internal/grpc_server/server/server_init.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"common/discover/common"
|
||||
"common/net/grpc/service"
|
||||
"common/proto/ss/grpc_pb"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
grpc_pb.UnimplementedUserServer
|
||||
service.Base
|
||||
}
|
||||
|
||||
func NewServer(ttl int64) *Server {
|
||||
s := &Server{
|
||||
Base: service.Base{
|
||||
Target: common.KeyDiscoverUser,
|
||||
EtcdTTL: ttl,
|
||||
},
|
||||
}
|
||||
s.Base.OnInit = s.OnInit
|
||||
s.Base.OnClose = s.OnClose
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Server) OnInit(serve *grpc.Server) {
|
||||
grpc_pb.RegisterUserServer(serve, s)
|
||||
}
|
||||
|
||||
func (s *Server) OnClose() {
|
||||
}
|
||||
65
internal/grpc_server/stream_client/gateway.go
Normal file
65
internal/grpc_server/stream_client/gateway.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package stream_client
|
||||
|
||||
import (
|
||||
"common/log"
|
||||
"common/net/grpc/service"
|
||||
"context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
var gatewayServerM map[int64]map[GatewayFun]grpc.ClientStream // map[sid]map[方法名]流连接
|
||||
|
||||
type GatewayFun int
|
||||
|
||||
const (
|
||||
FunToClient GatewayFun = iota
|
||||
)
|
||||
|
||||
func init() {
|
||||
gatewayServerM = make(map[int64]map[GatewayFun]grpc.ClientStream)
|
||||
}
|
||||
|
||||
func FindGatewayBySID(sid int64, fun GatewayFun) (grpc.ClientStream, error) {
|
||||
g := gatewayServerM[sid]
|
||||
if g == nil {
|
||||
g = make(map[GatewayFun]grpc.ClientStream)
|
||||
gatewayServerM[sid] = g
|
||||
}
|
||||
gatewayLink := g[fun]
|
||||
if gatewayLink == nil {
|
||||
gatewayClient, err := service.GatewayNewClient(sid)
|
||||
if err != nil {
|
||||
log.Errorf("cannot find gatewayClient: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
var link grpc.ClientStream
|
||||
switch fun {
|
||||
case FunToClient:
|
||||
link, err = gatewayClient.ToClient(context.Background())
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorf("FindGatewayBySID %v err: %v, sid: %v", fun, err, sid)
|
||||
return nil, err
|
||||
}
|
||||
g[fun] = link
|
||||
gatewayLink = link
|
||||
}
|
||||
return gatewayLink, nil
|
||||
}
|
||||
|
||||
func SendMessageToGateway(sid int64, fun GatewayFun, msg proto.Message, re ...bool) error {
|
||||
stream, err := FindGatewayBySID(sid, fun)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = stream.SendMsg(msg); err != nil {
|
||||
if re == nil || !re[0] {
|
||||
_ = stream.CloseSend()
|
||||
delete(gatewayServerM[sid], fun)
|
||||
return SendMessageToGateway(sid, fun, msg, true)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user