我已经编写了一个gRPC Round Robin Load Balancer示例,它将附加到gRPC客户端 . 我想知道以下代码片段是否适用于其中一台服务器发生故障的情况 . 我已经检查了以下可以正常工作的方案

  • 当其中一台服务器从启动时关闭时

  • 其中一台服务器在交易过程中出现故障

  • Server重新上线

我的客户端具有重试选项,因此当其中一个服务器在事务中间发生故障时,客户端将重试与另一个服务器相同的操作 .

我的查询是,以下实现是正确的还是实现gRPC负载均衡的不正确方法 . 有关实现客户端负载 balancer 的更好方法的任何评论都会有所帮助 . 我尝试用HAProxy做同样的事情但在那种情况下我无法创建 Singleton Channel .

Note: 我正在使用Channel的状态来查明服务器是否已启动 .

using Grpc.Core;
    using System;
    using System.Collections.Generic;
    using System.Configuration;

    namespace GRPCLoadBalancer

    {   
    static class CustomGRPCLoadBalancerWithSingleton
    {
        private static Queue<Channel> m_WorkingQueue;
        private static readonly int m_waitLimit;
        private static Channel m_currentChosenChannel=null;
        private static object m_reachableQueueLockObject = new object();

        static string serverList = ConfigurationManager.AppSettings["GrpcArticleServerList"];

        static CustomGRPCLoadBalancerWithSingleton()
        {

            m_WorkingQueue = new Queue<Channel>();

            m_waitLimit = Convert.ToInt32(ConfigurationManager.AppSettings["GrpcArticleServerWaitLimitMilliSecond"]); //100           

            string[] allServers = serverList.Split(';');
            for (int i = 0, j = allServers.Length; i < j; i++)
            {
                //singleton objects for all servers            
                var ch = new Channel(allServers[i], ChannelCredentials.Insecure);
                CheckIfConnectionIsWorking(ch);
                m_WorkingQueue.Enqueue(ch);               
            }

        }

        internal static Channel GetWorkingChannel()
        {

            lock (m_reachableQueueLockObject)
            {
                for (int i = 0; i < m_WorkingQueue.Count; i++)
                {
                    m_currentChosenChannel = m_WorkingQueue.Dequeue();
                    m_WorkingQueue.Enqueue(m_currentChosenChannel);
                    if (m_currentChosenChannel.State == ChannelState.Idle || m_currentChosenChannel.State == ChannelState.Ready)
                    {                        
                        return m_currentChosenChannel;
                    }


                }
            }
            return null;
        }

        static bool CheckIfConnectionIsWorking(Channel serverChannel)
        {

            if (serverChannel != null)
            {
                try
                {

                    //send the heartbit
                    var client = new EditCMSWindowsService.Messages.EditCMSGrpcService.EditCMSGrpcServiceClient(serverChannel);
                    client.GetContentDetails
                                (new EditCMSWindowsService.Messages.GrpcArticleContentURI()
                                {
                                    BasicId = 1
                                },
                                 null, GetForwardTime(2000));
                }
                catch (Exception ex)
                {

                    return false;
                }
            }
            return false;
        }


        static DateTime GetForwardTime(int incrementMillisecond)
        {
            return DateTime.Now.AddMilliseconds(incrementMillisecond).ToUniversalTime();
        }


        public static void DisposeChannels()
        {
            lock (m_reachableQueueLockObject)
            {
                int count = m_WorkingQueue.Count;
                for (int i = 0; i < m_WorkingQueue.Count; i++)
                {
                    m_currentChosenChannel = m_WorkingQueue.Dequeue();
                    m_currentChosenChannel.ShutdownAsync();
                }
            }
        }

    }
}

我更新了上面的代码,不使用锁定并使用数组而不是队列,这给了我3倍的时间更快的结果 . 还添加了连接池,以便在需要时为服务创建多个通道 . 以下是代码:

static class CustomGRPCLoadBalancerWithSingleton
{
    static Channel[] _WorkingQueue;
    static readonly ILog _log = LogManager.GetLogger(typeof(CustomGRPCLoadBalancerWithSingleton));
    static int _poolSize;
    static string serverList = ConfigurationManager.AppSettings["GrpcArticleServerList"];
    static int _WorkingQueueSize;
    static int _currentIndex =0;

    static CustomGRPCLoadBalancerWithSingleton()
    {

        Int32.TryParse(ConfigurationManager.AppSettings["GrpcPoolSize"], out _poolSize);
        if (_poolSize <= 0)
            _poolSize = 1;

        string[] allServers = serverList.Split(new char[] { ';' },StringSplitOptions.RemoveEmptyEntries);
        if (allServers.Length > 0)
        {
            _WorkingQueueSize= allServers.Length * _poolSize;
            _WorkingQueue = new Channel[_WorkingQueueSize];
            int idx = 0;
            for (int j = 0; j < _poolSize; j++)
            {
                for (int i = 0; i < allServers.Length; i++)
                {
                    //singleton objects for all servers            
                    var ch = new Channel(allServers[i], ChannelCredentials.Insecure);
                    _WorkingQueue[idx] = ch;
                    idx++;
                }
            }

        }

    }

    internal static Channel GetWorkingChannel()
    {
        Channel currentChosenChannel = null;
        int curId = _currentIndex % _WorkingQueueSize;
        int idx = 0;
        while(idx<_WorkingQueueSize)
        {
            currentChosenChannel = _WorkingQueue[curId];
            if (currentChosenChannel.State == ChannelState.Idle || currentChosenChannel.State == ChannelState.Ready)
            {
                //ThreadContext.Properties["ChannelChosen"] = _currentIndex;
                //_log.Error(currentChosenChannel.ResolvedTarget+" " + _currentIndex);
                _currentIndex = ++curId;                    
                return currentChosenChannel;
            }
            curId++;
            curId = curId % _WorkingQueueSize;
            idx++;
        }

        return null;
    }


    public static void DisposeChannels()
    {
        for (int i = 0; i < _WorkingQueueSize; i++)
        {
            _WorkingQueue[i].ShutdownAsync();
        }
    }