路由教程
路由功能介绍
源码
代码所在位置: rpc/plugins.go
func (c *CustomSelector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) (selected string) {
	if sc, ok := ctx.(*share.Context); ok {
		size := c.servers.Len()
		switch size {
		case 0:
			return ""
		default:
			reqMetaData := sc.Value(share.ReqMetaDataKey).(map[string]string)
			selected = reqMetaData[servicePath]
			//用户级别的请求
			uid := reqMetaData[tgf.ContextKeyUserId]
			rpcTip := reqMetaData[tgf.ContextKeyRPCType]
			broadcasts := make([]string, 1)
			if uid != "" {
				broadcasts[0] = uid
			}
			var bindNode bool
			if rpcTip == tgf.RPCBroadcastTip {
				ids := reqMetaData[tgf.ContextKeyBroadcastUserIds]
				broadcasts = strings.Split(ids, ",")
				if !c.checkServerAlive(selected) {
					key := client2.HashString(fmt.Sprintf("%v", time.Now().UnixNano()))
					selected, _ = c.h.Get(key).(string)
				}
				bindNode = true
			}
			if len(broadcasts) > 0 && broadcasts[0] != "" {
				for _, uid := range broadcasts {
					var key uint64
					//先判断携带节点信息是否存活
					if c.checkServerAlive(selected) {
						if bindNode {
							c.processNode(ctx, uid, selected, reqMetaData, servicePath)
						}
						continue
					}
					//从本地缓存中获取用户的节点数据
					selected, _ = c.cacheManager.Get(uid)
					if c.checkServerAlive(selected) {
						continue
					}
					//如果上面的用户节点获取,没有命中,那么取当前请求模式
					//如果是rpc推送请求,
					//if rpcTip == tgf.RPCTip {
					//从数据缓存中获取用户的节点数据
					reqMetaDataKey := fmt.Sprintf(tgf.RedisKeyUserNodeMeta, uid)
					reqMetaCacheData, suc := db.GetMap[string, string](reqMetaDataKey)
					if !suc {
						reqMetaCacheData = make(map[string]string)
					}
					selected = reqMetaCacheData[servicePath]
					if c.checkServerAlive(selected) {
						//将节点数据,放入本地缓存
						if reqMetaData[tgf.ContextKeyCloseLocalCache] == "" {
							c.cacheManager.Set(uid, selected)
						}
						continue
					} else {
						//通过一致性hash的方式,命中一个活跃的业务节点
						key = client2.HashString(uid)
						selected, _ = c.h.Get(key).(string)
						reqMetaData[servicePath] = selected
						c.processNode(ctx, uid, selected, reqMetaData, servicePath)
					}
				}
			} else {
				if c.checkServerAlive(selected) {
					//key := client2.HashString(fmt.Sprintf("%v", time.Now().Unix()))
					//selected, _ = c.h.Get(key).(string)
					return
				}
				key := client2.HashString(fmt.Sprintf("%v", time.Now().UnixNano()))
				selected, _ = c.h.Get(key).(string)
			}
			return
		}
	}
	return ""
}
func (c *CustomSelector) processNode(ctx context.Context, uid string, selected string, reqMetaData map[string]string, servicePath string) {
	reqMetaDataKeyTemp := fmt.Sprintf(tgf.RedisKeyUserNodeMeta, uid)
	db.PutMap(reqMetaDataKeyTemp, servicePath, selected, reqMetaDataTimeout)
	if reqMetaData[tgf.ContextKeyCloseLocalCache] == "" {
		c.cacheManager.Set(uid, selected)
	}
	if UploadUserNodeInfo.ModuleName != servicePath {
		util.Go(func() {
			if _, err := SendRPCMessage(ctx, UploadUserNodeInfo.New(&UploadUserNodeInfoReq{
				UserId:      uid,
				NodeId:      selected,
				ServicePath: servicePath,
			}, &UploadUserNodeInfoRes{ErrorCode: 0})); err != nil {
				log.Warn("[rpc] 节点更新异常 %v", err)
			}
		})
	}
}
 Last modified: 02 March 2024