Хабрахабр

[Из песочницы] Асинхронная работа с PostgreSQL в C

Сегодня захотелось написать небольшую заметку об асинхронной работе с PostgreSQL в C. Мотивы просты: для небольшой утилитки встала необходимость реализовать такой функционал, гугл на тему понятных и рабочих примеров предательски молчал (нашелся только пример в pqxx для C++ — там есть метод асинхронного соединения и pipeline-класс для запросов), а официальная документация по этому вопросу хоть и весьма подробная, но не слишком структурированная, да и сам алгоритм работы с библиотекой libpq в асинхронном режиме имеет много подводных камней. Поэтому разобравшись в вопросе хочется поделиться результатами с общественностью, на случай если кому-то это будет полезным.
Итак, будем считать, что рассказывать что такое PostgreSQL никому не нужно, и чем синхронный (блокирующий) режим работы отличается от асинхронного читатели тоже примерно понимают. Кстати, кроме первого и очевидного достоинства асинхронных вызовов (они не блокируют ввод-вывод и выполнение потока, что освобождает от необходимости создавать дополнительные треды, синхронизировать их, и т.д.), в случае с Postgre есть еще один плюс: обычный метод PQexec позволяет за один раз получить результат выполнения только одного SQL-запроса, а асинхронные функции libpq такого ограничения не имеют.

Как я уже говорил, у libpq в асинхронном режиме есть довольно много подводных камней. Бывают библиотеки, где асинхронный режим реализован красиво и завершенно (разработчик вызывает абсолютно любой асинхронный метод, назначив ему callback, а после этого достаточно просто «вращать» event loop библиотеки (бесконечно или по таймеру вызывать метод), а далее уже сама библиотека позаботится об обработке команд в нужной последовательности, отлове событий и вызове колбэков), то у PostgreSQL модель работы другая.

Имеется большое множество команд для асинхронных соединений и запросов, которые должны быть вызываны в строго определенной последовательности в зависимости от текущего состояния и результата предыдущей операции, плюс необходимо вручную проверять готовность сокетов. Достаточно в каком-то месте ошибиться и вызвать функцию не вовремя или наоборот не вызвать, или попробовать обратиться к занятому сокету, и это может привести к блокировке потока (в худшем случае — бесконечной, то есть зависанию). И еще библиотека в асинхронном режиме почти никак не контроллирует таймаут операций — обо всем нужно будет заботиться самим.

В официальной документации большая часть информации по работе в асинхронном режиме приведена в следующих двух разделах: раз и два.

Ну а мы перейдем сразу к делу.

Чтобы установить соединение с БД в асинхронном режиме, порядок действий должен быть примерно таков:

1. Выделить память под структуру соединения и начать подключение методом PQconnectStart()

2. Запомнить текущее время, чтобы можно было в дальнейшем контроллировать таймаут операции.

3. Проверить успешность подключения, вызвав PQstatus(). Если результат равен CONNECTION_BAD, значит инициализация была не успешной (Например, ошибка в строке подключения или не удалось аллоцировать память), иначе же можно продолжать

4. Проверить методом PQconnectPoll() текущий статус подключения.

Возможные результаты:

PGRES_POLLING_WRITING - ожидание завершения отправки данных из сокета
PGRES_POLLING_READING - ожидание завершения чтения данных из сокета
PGRES_POLLING_FAILED - произошла ошибка во время обмена данными с сервером
PGRES_POLLING_OK - подключение выполнено успешно

5. В случае статуса PGRES_POLLING_WRITING или PGRES_POLLING_READING необходимо получить используемый сокет подключения методом PQsocket() и системными функциями select() или poll() проверять его доступность для записи или чтения данных до тех пор пока он не освободится, после чего повторить пункт 4 до достижения результата OK или FAILED, либо до истечения таймаута (не забываем, таймаут нужно проверять вручную).

Если следующий вызов PQconnectPoll() будет _до_ освобождения сокета, поток заблокируется, и это надо иметь в виду.

После всего этого, если все прошло успешно, мы получаем установленное соединение с БД. Порядок действий для выполнения SQL-запросов будет выглядить же примерно так:

1. Подготовить запрос к отправке на сервер командой PQsendQuery().

2. Установить неблокирующий режим для отправки запроса методом PQsetnonblocking(), потому что по умолчанию в libPq асинхронно выполняется только чтение, а не запись в сокет.

3. Выполнять PQflush() до тех пор пока она не выдаст 0 (запрос отправлен успешно) или -1 (ошибка).

4. Получить активный сокет и проверить его на готовность к чтению через select() или poll(), до тех пор пока он не будет готов к операции.

5. Выполнить PQconsumeInput(). Если функция вернула 0, то произошла ошибка.

6. Выполнить PQisBusy(). Если функция вернула 1, значит обработка запроса или чтения ответа сервера еще не завершено и нужно заново повторить алгоритм начиная с пункта 4.
Ну и не забываем контроллировать таймауты, само собой.

После выполнения всех вышеперечисленных операций, работать с результатами запроса можно как обычно — PQgetResult(), PQgetvalue(), и т.д.

А теперь перейдем к практике. Код на C, однако если захочется обернуть его в класс для использования в программе на C++, то as you wish, всё очень просто.

// Компилировать будем как-то так: gcc pgtest4.c -I/usr/include/postgresql -lpq
#include <libpq-fe.h> //< Си библиотека для работы PostgreSQL
#include <sys/socket.h> //< setsockopt() и некоторые константы
#include <sys/select.h> //< select()
#include <sys/time.h> //< gettimeoftheday()
#include <unistd.h> //< usleep() тоже может пригодиться #define SOCK_POLL_TIMEOUT 100 // таймаут ожидания освобождения сокета (на сколько можно максимально блокировать основной поток?) в мс typedef enum { DISCONNECTED = 0, CONN_POLLING, CONN_READING, CONN_WRITING, READY, QUERY_SENT, QUERY_FLUSHING, QUERY_BUSY, QUERY_READING, CLOSING, ERROR } pq_state; typedef enum { NO_ERROR = 0, ALLOCATION_FAIL, POLLING_FAIL, READING_FAIL, WRITING_FAIL, TIMEOUT_FAIL
} pq_error; struct pqconn_s{ pq_state state; //< текущее действие PGconn* conn; //< указатель на структуру с данными о соединении unsigned long start; //< время начала текущей операции (для таймаута) long timeout; //< таймаут текущей операции pq_error error; //< если случится что-то не то, сюда прилетит код ошибки
}; /** * @brief получить текущеем время * @return время в мс */
unsigned long time_ms(void)
{ struct timespec tp; // gettimeoftheday() тут использовать нельзя, оно может плавать clock_gettime(CLOCK_MONOTONIC, &tp); return (tp.tv_sec * 1000 + tp.tv_nsec / 1000000);
} /** * @brief проверить готовность (свободность) сокета к записи/чтению * @param socket_fd - дескриптор интересующего сокета * @param rw - 0 если проверяем на чтение, 1 если на запись * @return как и select(): -1 = ошибка, 0 - свободен (готов), 1 - занят */
int try_socket(int socket_fd, int rw)
{ fd_set fset; struct timeval sock_timeout; sock_timeout.tv_sec = 0; sock_timeout.tv_usec = SOCK_POLL_TIMEOUT; FD_ZERO(&fset); FD_SET(socket_fd, &fset); setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&sock_timeout, sizeof(struct timeval)); //здесь кстати возможно не помешает еще выставить SO_SNDTIMEO. экспериментируйте. return select(socket_fd + 1, ((!rw) ? &fset : NULL), ((rw) ? &fset : NULL), NULL, &sock_timeout); } /** * @brief начать процесс подключения к серверу БД * @param conninfo - строка подключения к БД * @param s - указатель на структуру pqconn_s с данными о подключении и текущем состоянии * @param timeout - таймаут операции в мс * @return 0 - ошибка (можно узнать ее код в s->error), 1 - успех */
int pgsql_connection_start(const char* conninfo, struct pqconn_s* s, long timeout)
{ if (!s) return 0; if (!conninfo) { s->error = ALLOCATION_FAIL; return 0; } s->conn = PQconnectStart(conninfo); s->state = CONN_POLLING; s->start = time_ms(); s->timeout = timeout; s->error = NO_ERROR; ConnStatusType status; status = PQstatus(s->conn); if (status == CONNECTION_BAD) { s->state = ERROR; s->error = POLLING_FAIL; return 0; } return 1;
} /** * @brief начать отправку запроса на сервер БД и получение ответа * @param command - SQL-запрос * @param s - указатель на структуру pqconn_s с данными о подключении и текущем состоянии * @param timeout - таймаут операции в мс * @return 0 - ошибка, 1 - успех */
int pgsql_send_query(struct pqconn_s* s, const char *command, long timeout)
{ if (s->state != READY) { return 0; } if (!PQsendQuery(s->conn, command)) { return 0; } PQsetnonblocking(s->conn, 0); s->state = QUERY_FLUSHING; s->start = time_ms(); s->timeout = timeout; s->error = NO_ERROR; return 1;
} /** * @brief основной цикл, метод должен вызываться периодично * @param s - указатель на структуру pqconn_s с данными о подключении и текущем состоянии */
void pgsql_event_loop(struct pqconn_s* s)
{ if ((s->state == DISCONNECTED) || (s->state == READY)) return; if ((time_ms() - s->start) > s->timeout) { s->state = CLOSING; s->error = TIMEOUT_FAIL; } if (s->state == CONN_POLLING) { PostgresPollingStatusType poll_result; poll_result = PQconnectPoll(s->conn); if (poll_result == PGRES_POLLING_WRITING) s->state = CONN_WRITING; if (poll_result == PGRES_POLLING_READING) s->state = CONN_READING; if (poll_result == PGRES_POLLING_FAILED) { s->state = ERROR; s->error = POLLING_FAIL; } if (poll_result == PGRES_POLLING_OK) s->state = READY; } if (s->state == CONN_READING) { int sock_state = try_socket(PQsocket(s->conn), 0); if (sock_state == -1) { s->error = READING_FAIL; s->state = CLOSING; } if (sock_state > 0) s->state = CONN_POLLING; } if (s->state == CONN_WRITING) { int sock_state = try_socket(PQsocket(s->conn), 1); if (sock_state == -1) { s->error = WRITING_FAIL; s->state = CLOSING; } if (sock_state > 0) s->state = CONN_POLLING; } if (s->state == CLOSING) { PQfinish(s->conn); s->state = ERROR; } if (s->state == QUERY_FLUSHING) { int flush_res = PQflush(s->conn); if (0 == flush_res) s->state = QUERY_READING; if (-1 == flush_res) { s->error = WRITING_FAIL; s->state = CLOSING; } } if (s->state == QUERY_READING) { int sock_state = try_socket(PQsocket(s->conn), 0); if (sock_state == -1) { s->error = READING_FAIL; s->state = CLOSING; } if (sock_state > 0) s->state = QUERY_BUSY; } if (s->state == QUERY_BUSY) { if (!PQconsumeInput(s->conn)) { s->error = READING_FAIL; s->state = CLOSING; } if (PQisBusy(s->conn)) s->state = QUERY_READING; else s->state = READY; }
}

В начале описываем все необходимые нам состояния и возможные ошибки, и объявляем структуру в которой будут храниться данные подключения и выполняемого действия — указатель на структур PGconn необходимую библиотеке для работы с сервером, состояния автомата, код ошибок (если они будут) и время начала текущей операции (для контроля тайм-аута).

Две маленькие функции time_ms() и try_socket() представляют собой обертки над функциями стандартной библиотеки для получения текущего времени в миллисекундах и проверки сокета на занятость соответственно.

Использовать же все это можно как-то примерно так:

int main(void)
{ struct pqconn_s s; pgsql_connection_start("dbname=db1 user=user1 password=password1 hostaddr=10.0.0.1 port=5432", &s, 15000); while ((s.state != ERROR) && (s.state != READY)) { pgsql_event_loop(&s); } if (s.state == ERROR) { perror("DB connection failed \n"); return 1; } pgsql_send_query(&s, "SELECT * FROM history;", 50000); while ((s.state != ERROR) && (s.state != READY)) { pgsql_event_loop(&s); } if (s.state == ERROR) { perror("DB query failed \n"); return 1; } PGresult *res; int rec_count; int row; int col; res = PQgetResult(s.conn); if (PQresultStatus(res) != PGRES_TUPLES_OK) { perror("We did not get any data!\n"); return 1; } rec_count = PQntuples(res); printf("Received %d records.\n", rec_count); for (row=0; row<rec_count; row++) { for (col=0; col<3; col++) { printf("%s\t", PQgetvalue(res, row, col)); } puts(""); } PQclear(res);
}

Понятно дело, что приведенный пример работает по факту все-таки в блокирующем режиме (т.к. происходит принудительно ожидание установки поля state структуры в состояние ERROR или READY), однако как можно догадаться, дело осталось за малым: нужно вместо этого добавить в pgsql_event_loop() вызов callback'ов в случае успешного соединения, получения данных или возникновения ошибки, а event loop крутить вместе с остальными действиями в основном цикле программы или вызывать его по таймеру, и тогда работа с базой будет идти по-настоящему асинхронно.

Искренне надеюсь, что вышеописанное окажется кому-нибудь полезным.

Теги
Показать больше

Похожие статьи

Кнопка «Наверх»
Закрыть