Хабрахабр

[Перевод] Пишем на Go простой балансировщик

Балансировщики нагрузки играют в веб-архитектуре ключевую роль. Они позволяют распределять нагрузку по нескольким бэкендам, тем самым улучшая масштабируемость. А поскольку у нас сконфигурировано несколько бэкендов, сервис становится высокодоступным, потому что в случае сбоя на одном сервере балансировщик может выбирать другой работающий сервер.

Написал я его на Go, это современный язык, поддерживающий полноценный параллелизм. Поигравшись с профессиональными балансировщиками наподобие NGINX, я попробовал ради веселья создать простенький балансировщик. К тому же для простоты распространения она генерирует единственный статически скомпонованный бинарник.
Стандартная библиотека в Go имеет широкие возможности и позволяет писать высокопроизводительные приложения с меньшим количеством кода.

Как работает наш балансировщик

Для распределения нагрузки по бэкендам используются разные алгоритмы. Например:

  • Round Robin — нагрузка распределяется равномерно, с учётом одинаковой вычислительной мощности серверов.
  • Weighted Round Robin — в зависимости от вычислительной мощности серверам могут присваиваться разные веса.
  • Least Connections — нагрузка распределяется по серверам с наименьшим количеством активных подключений.

В нашем балансировщике мы реализуем простейший алгоритм — Round Robin.

Выбор в Round Robin

Алгоритм Round Robin прост. Он даёт всем исполнителям одинаковые возможности по выполнению задач.

Выбор серверов в Round Robin для обработки входящих запросов.

Но мы не можем выбирать их напрямую, верно? Как показано на иллюстрации, алгоритм выбирает серверы по кругу, циклически.

Вероятно, нам не нужно отправлять на него трафик. А если сервер лежит? Нужно направлять трафик только на те серверы, которые запущены и работают. То есть сервер не может использоваться напрямую, пока мы не приведём его в нужное состояние.

Определим структуры

Нам нужно отслеживать все подробности, связанные с бэкендом. Необходимо знать, живой ли он, а также отслеживать URL. Для этого мы можем определить такую структуру:

type Backend struct { URL *url.URL Alive bool mux sync.RWMutex ReverseProxy *httputil.ReverseProxy
}

Не волнуйтесь, я поясню значения полей в Backend.

Для этого можно воспользоваться Slice и счётчиком переменных. Теперь в балансировщике нужно как-то отслеживать все бэкенды. Определим его в ServerPool:

type ServerPool struct { backends []*Backend current uint64
}

Использование ReverseProxy

Как мы уже определили, суть балансировщика в распределении трафика по разным серверам и возвращении результатов клиенту. Как сказано в документации Go:

ReverseProxy — это обработчик HTTP, который берёт входящие запросы и отправляет на другой сервер, проксируя ответы обратно клиенту.

Не надо изобретать колесо. Именно то, что нам нужно. Можно просто транслировать наши запросы через ReverseProxy.

u, _ := url.Parse("http://localhost:8080")
rp := httputil.NewSingleHostReverseProxy(u) // initialize your server and add this as handler
http.HandlerFunc(rp.ServeHTTP)

C помощью httputil.NewSingleHostReverseProxy(url) можно инициализировать ReverseProxy, который будет транслировать запросы на переданный url. В приведённом выше примере все запросы переданы на localhost:8080, а результаты отосланы клиенту.

Поэтому можно передавать его HandlerFunc в http. Если посмотреть на сигнатуру метода ServeHTTP, то в ней можно найти сигнатуру HTTP-обработчика.

Другие примеры есть в документации.

Для нашего балансировщика можно инициировать ReverseProxy с ассоциированным URL в Backend, чтобы ReverseProxy маршрутизировал запросы в URL.

Процесс выбора серверов

В ходе очередного выбора сервера нам нужно пропускать лежащие серверы. Но необходимо организовать подсчёт.

Для предотвращения этого мы можем блокировать ServerPool с помощью mutex. Многочисленные клиенты будут подключаться к балансировщику, и когда каждый из них попросит следующий узел передать трафик, может возникнуть состояние гонки. Нам лишь нужно увеличить счётчик на единицу. Но это будет избыточно, к тому же мы вообще не хотим блокировать ServerPool.

Go поддерживает его с помощью пакета atomic. Наилучшим решением, соблюдающим эти требования, будет атомарное инкрементирование.

func (s *ServerPool) NextIndex() int { return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends)))
}

Мы атомарно увеличиваем текущее значение на единицу и возвращаем индекс, изменяя длину массива. Это означает, что значение всегда должно лежать в диапазоне от 0 до длины массива. В конце нас будет интересовать конкретный индекс, а не весь счётчик.

Выбор живого сервера

Мы уже знаем, что наши запросы циклически ротируются по всем серверам. И нам нужно лишь пропускать неработающие.

В любой момент мы можем получить следующий узел, и если он неактивен, нужно в рамках цикла искать дальше по массиву. GetNext() всегда возвращает значение, лежащее в диапазоне от 0 до длины массива.

Циклически проходим по массиву.

Это можно сделать с помощью next + length. Как показано на иллюстрации, мы хотим пройти от следующего узла до конца списка. Это легко можно сделать с помощью операции модифицирования. Но для выбора индекса нужно ограничить его рамками длины массива.

После того, как мы в ходе поиска нашли работающий сервер, его нужно пометить как текущий:

// GetNextPeer returns next active peer to take a connection
func (s *ServerPool) GetNextPeer() *Backend return s.backends[idx] } } return nil
}

Избегаем состояния гонки в структуре Backend

Здесь нужно помнить о важной проблеме. Структура Backend содержит переменную, которую могут изменять или запрашивать одновременно несколько горутин.

Поэтому для сериализации доступа к Alive мы выбрали RWMutex. Мы знаем, что читать переменную будет больше горутин, чем записывать в неё.

// SetAlive for this backend
func (b *Backend) SetAlive(alive bool) { b.mux.Lock() b.Alive = alive b.mux.Unlock()
} // IsAlive returns true when backend is alive
func (b *Backend) IsAlive() (alive bool) { b.mux.RLock() alive = b.Alive b.mux.RUnlock() return
}

Балансируем запросы

Теперь можно сформулировать простой метод для балансировки наших запросов. Он будет сбоить лишь в том случае, если упадут все серверы.

// lb load balances the incoming request
func lb(w http.ResponseWriter, r *http.Request) { peer := serverPool.GetNextPeer() if peer != nil { peer.ReverseProxy.ServeHTTP(w, r) return } http.Error(w, "Service not available", http.StatusServiceUnavailable)
}

Этот метод можно передать HTTP-серверу просто в виде HandlerFunc.

server := http.Server{ Addr: fmt.Sprintf(":%d", port), Handler: http.HandlerFunc(lb),
}

Маршрутизируем трафик только на работающие серверы

У нашего балансировщика серьёзная проблема. Мы не знаем, работает ли сервер. Чтобы узнать это, нужно проверить сервер. Сделать это можно двумя способами:

  • Активный: выполняя текущий запрос, мы обнаруживаем, что выбранный сервер не отвечает, и помечаем его как нерабочий.
  • Пассивный: можно пинговать серверы с каким-то интервалом и проверять статус.

Активно проверяем работающие серверы

При любой ошибке ReverseProxy инициирует функцию обратного вызова ErrorHandler. Это можно применять для обнаружения сбоев:

proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) { log.Printf("[%s] %s\n", serverUrl.Host, e.Error()) retries := GetRetryFromContext(request) if retries < 3 { select { case <-time.After(10 * time.Millisecond): ctx := context.WithValue(request.Context(), Retry, retries+1) proxy.ServeHTTP(writer, request.WithContext(ctx)) } return } // after 3 retries, mark this backend as down serverPool.MarkBackendStatus(serverUrl, false) // if the same request routing for few attempts with different backends, increase the count attempts := GetAttemptsFromContext(request) log.Printf("%s(%s) Attempting retry %d\n", request.RemoteAddr, request.URL.Path, attempts) ctx := context.WithValue(request.Context(), Attempts, attempts+1) lb(writer, request.WithContext(ctx))
}

При разработке этого обработчика ошибок мы использовали возможности замыканий. Это позволяет нам захватывать в наш метод такие внешние переменные, как серверный URL. Обработчик проверяет счётчик повторов, и если он меньше 3, то мы снова отправляем тот же запрос тому же серверу. Это делается потому, что из-за временных ошибок сервер может отбрасывать наши запросы, но вскоре он становится доступен (возможно, у сервера не было свободных сокетов для новых клиентов). Так что нужно настроить таймер задержки для новой попытки примерно через 10 мс. С каждым запросом мы увеличиваем счётчик попыток.

После сбоя каждой попытки мы помечаем сервер как нерабочий.

Делать мы это будем с помощью счётчика попыток, использующего пакет context. Теперь нужно назначить для того же запроса новый сервер. После увеличения счётчика попыток мы передаём его в lb для выбора нового сервера для обработки запроса.

Мы не можем делать это бесконечно, так что будем проверять в lb, не достигнуто ли максимальное количество попыток, прежде чем продолжить обработку запроса.

Можно просто получить счётчик попыток из запроса, если он достиг максимума, то мы прерываем запрос.

// lb load balances the incoming request
func lb(w http.ResponseWriter, r *http.Request) { attempts := GetAttemptsFromContext(r) if attempts > 3 { log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path) http.Error(w, "Service not available", http.StatusServiceUnavailable) return } peer := serverPool.GetNextPeer() if peer != nil { peer.ReverseProxy.ServeHTTP(w, r) return } http.Error(w, "Service not available", http.StatusServiceUnavailable)
}

Это рекурсивная реализация.

Использование пакета context

Пакет context позволяет сохранять полезные данные в HTTP-запросах. Мы будем активно это использовать для отслеживания данных, относящихся к запросам — счётчиков Attempt и Retry.

Рекомендуется использовать не строковые, а уникальные числовые значения. Во-первых, нужно задать ключи для контекста. Это прекрасное решение для определения числовых ключей. В Go есть ключевое слова iota для инкрементальной реализации констант, каждая из которых содержит уникальное значение.

const ( Attempts int = iota Retry
)

Затем можно извлечь значение, как мы обычно делаем с помощью HashMap. Возвращаемое по умолчанию значение может зависеть от текущей ситуации.

// GetAttemptsFromContext returns the attempts for request
func GetRetryFromContext(r *http.Request) int { if retry, ok := r.Context().Value(Retry).(int); ok { return retry } return 0
}

Пассивная проверка серверов

Пассивные проверки позволяют идентифицировать и восстанавливать упавшие серверы. Мы пингуем их с определённым интервалом, чтобы определить их статус.

Если сервер отвечает, мы помечаем его рабочим. Для пингования попробуем установить TCP-соединение. Удостоверьтесь, что закрыли подключение после его создания, чтобы уменьшить дополнительную нагрузку на сервер. Этот метод можно адаптировать для вызова специфических конечных точек наподобие /status. Иначе он будет пытаться поддерживать это подключение и в конце концов исчерпает свои ресурсы.

// isAlive checks whether a backend is Alive by establishing a TCP connection
func isBackendAlive(u *url.URL) bool { timeout := 2 * time.Second conn, err := net.DialTimeout("tcp", u.Host, timeout) if err != nil { log.Println("Site unreachable, error: ", err) return false } _ = conn.Close() // close it, we dont need to maintain this connection return true
}

Теперь можно итерировать серверы и отмечать их статусы:

// HealthCheck pings the backends and update the status
func (s *ServerPool) HealthCheck() { for _, b := range s.backends { status := "up" alive := isBackendAlive(b.URL) b.SetAlive(alive) if !alive { status = "down" } log.Printf("%s [%s]\n", b.URL, status) }
}

Для периодического запуска этого кода можно запустить в Go таймер. Он позволит слушать события в канале.

// healthCheck runs a routine for check status of the backends every 2 mins
func healthCheck() { t := time.NewTicker(time.Second * 20) for { select { case <-t.C: log.Println("Starting health check...") serverPool.HealthCheck() log.Println("Health check completed") } }
}

В этом коде канал <-t.C будет возвращать значение каждые 20 секунд. select позволяет определять это событие. При отсутствии ситуации default он ждёт, пока хотя бы один case может быть выполнен.

Теперь запускаем код в отдельной горутине:

go healthCheck()

Заключение

В этой статье мы рассмотрели много вопросов:

  • Алгоритм Round Robin
  • ReverseProxy из стандартной библиотеки
  • Мьютексы
  • Атомарные операции
  • Замыкания
  • Обратные вызовы
  • Операция выбора

Есть ещё много способов улучшить наш балансировщик. Например:

  • Использовать кучу для сортировки живых серверов, чтобы уменьшить область поиска.
  • Собирать статистику.
  • Реализовать алгоритм weighted round-robin c наименьшим количеством коннектов.
  • Добавить поддержку конфигурационных файлов.

И так далее.

Исходный код лежит здесь.

Показать больше

Похожие публикации

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»