Хабрахабр

[Из песочницы] Маленькое удобство в жизни студента

Нехитрая история о том, как мне стало стыдно постоянно просить у одногруппников пропущенную информацию и я решил чуть-чуть облегчить нам жизнь.

image

При таких условиях маловероятно, что эту важную информацию увидят все. Полагаю, многим моим ровесникам знакома знакома ситуация, когда в общем чате, где довольно часто мелькает важная информация, расположилось около 30 активных собеседников, постоянно нагружающих базы данных Вконтакте своими сообщениями. Год назад было принято решение исправить это недоразумение. Так случается и со мной.

Тех, кто готов не возмущаться по поводу очередной статьи про бота, прошу под кат.

Благодаря относительно новым фичам Вконтакте (а именно личные сообщения сообществ) решение бросилось в глаза сразу. Так как являюсь студентом первого уровня, примеры будут связаны с этой тематикой.
Итак, есть задача: сделать передачу информации от старосты к студентам удобной как для старосты, так и для студентов. Бот, сидящий в группе должен принимать сообщения от старосты (старост, если на потоке много групп) и рассылать их заинтересованным лицам (студентам).

Задача поставлена, приступаем.

Нам понадобятся:

  1. библиотека vk_api для использования Vk Api
  2. peewee orm для работы с базой данных
  3. и встроенные модули python

Также перед прочтением предлагаю освежить в памяти паттерны "Наблюдатель" (хабр, вики) и "Фасад" (хабр, вики)

"Приятно познакомиться, товарищ бот." Часть 1.

Создадим класс с названием Group. Для начала следует научить нашего бота понимать себя как сообщество. В качестве аргументов пусть принимает объект сессии и объект представителя (Proxy) базы данных.

class Group(BaseCommunicateVK): def __init__(self, vksession, storage): super().__init__(vksession) self.storage = storage

BaseCommunicateVK? Что там?

Решение вынести эту функциональность в отдельный класс объясняется тем, что в будущем, возможно, кто-то из вас решит дополнить бота каким-нибудь другим функционалом Вконтакте.
Ну и чтобы разгрузить абстракцию сообщества, естественно.

class BaseCommunicateVK: longpoll = None def __init__(self, vksession): self.session = vksession self.api = vksession.get_api() if BaseCommunicateVK.longpoll is None: BaseCommunicateVK.longpoll = VkLongPoll(self.session) def get_api(self): return self.api def get_longpoll(self): return self.longpoll def method(self, func, args): return self.api.method(func, args) @staticmethod def create_session(token=None, login=None, password=None, api_v='5.85'): try: if token: session = vk_api.VkApi(token=token, api_version=api_v) elif login and password: session = vk_api.VkApi(login, password, api_version=api_v) else: raise vk_api.AuthError("Define login and password or token.") return session except vk_api.ApiError as error: logging.info(error) def get_last_message(self, user_id): return self.api.messages.getHistory( peer_id=user_id, count=1)["items"][0] @staticmethod def get_attachments(last_message): if not last_message or "attachments" not in last_message: return "" attachments = last_message["attachments"] attach_strings = [] for attach in attachments: attach_type = attach["type"] attach_info = attach[attach_type] attach_id = attach_info["id"] attach_owner_id = attach_info["owner_id"] if "access_key" in attach_info: access_key = attach_info["access_key"] attach_string = "{}_{}_{}".format(attach_type, attach_owner_id, attach_id, access_key) else: attach_string = "{}{}_{}".format(attach_type, attach_owner_id, attach_id) attach_strings.append(attach_string) return ",".join(attach_strings) @staticmethod def get_forwards(attachments, last_message): if not attachments or "fwd_count" not in attachments: return "" if len(last_message["fwd_messages"]) == int(attachments["fwd_count"]): return last_message["id"] def send(self, user_id, message, attachments=None, **kwargs): send_to = int(user_id) if "last_message" in kwargs: last_message = kwargs["last_message"] else: last_message = None p_attachments = self.get_attachments(last_message) p_forward = self.get_forwards(attachments, last_message) if message or p_attachments or p_forward: self.api.messages.send( user_id=send_to, message=message, attachment=p_attachments, forward_messages=p_forward) if destroy: accept_msg_id = self.api.messages \ .getHistory(peer_id=user_id, count=1) \ .get('items')[0].get('id') self.delete(accept_msg_id, destroy_type=destroy_type) def delete(self, msg_id, destroy_type=1): self.api.messages.delete(message_id=msg_id, delete_for_all=destroy_type)

Сразу разделим их на администраторов и участников и сохраним в бд. Создадим метод для обновления участников сообщества.

  • self.api настраивается при создании базового класса Group (BaseCommunicateVK)

def update_members(self): fields = 'domain, sex' admins = self.api.groups.getMembers(group_id=self.group_id, fields=fields, filter='managers') self.save_members(self._configure_users(admins)) members = self.api.groups.getMembers(group_id=self.group_id, fields=fields) self.save_members(self._configure_users(members)) return self def save_members(self, members): self.storage.update(members) @staticmethod
def _configure_users(items, exclude=None): if exclude is None: exclude = [] users = [] for user in items.get('items'): if user.get('id') not in exclude: member = User() member.configure(**user) users.append(member) return users

В параметрах: список адресатов, текст сообщения и приложения. Еще этот класс должен уметь рассылать сообщения адресатам, поэтому следующий метод в студию. Запускается всё это дело в отдельном потоке чтобы бот мог принимать сообщения от других участников.
Принимаются сообщения в синхронном режиме, поэтому с увеличением числа активных клиентов скорость отклика, очевидно, поубавится.

def broadcast(self, uids, message, attachments=None, **kwargs): report = BroadcastReport() def send_all(): users_ids = uids if not isinstance(users_ids, list): users_ids = list(users_ids) report.should_be_sent = len(users_ids) for user_id in users_ids: try: self.send(user_id, message, attachments, **kwargs) if message or attachments: report.sent += 1 except vk_api.VkApiError as error: report.errors.append('vk.com/id{}: {}'.format(user_id, error)) except ValueError: continue for uid in self.get_member_ids(admins=True, moders=True): self.send(uid, str(report)) broadcast_thread = Thread(target=send_all) broadcast_thread.start() broadcast_thread.join()

BroadcastReport - класс отчета

class BroadcastReport: def __init__(self): self.should_be_sent = 0 self.sent = 0 self.errors = [] def __str__(self): res = "# Отчет #" res += "\nПлан: {} сообщений ".format(self.should_be_sent) res += "\nРазослано: {} ".format(self.sent) if self.errors: res += "\nОшибки:" for i in self.errors: res += "\n- {}".format(i) return res

Со всеми участниками сообщества познакомились, теперь надо научиться их понимать. На этом, вроде бы, абстракция группы закончена.

"Пш… прием.." Часть 2.

Заставим бота слушать все сообщения от участников нашего сообщества.
Для этого создадим класс СhatHandler, который и будет этим заниматься
В параметрах:

  • group_manager это экземпляр только что нами написанного класса сообщества
  • command_observer распознает подключенные команды (но об этом в третьей части)

class ChatHandler(Handler): def __init__(self, group_manager, command_observer): super().__init__() self.longpoll = group_manager.get_longpoll() self.group = group_manager self.api = group_manager.get_api() self.command_observer = command_observer

Дальше, собственно, слушаем сообщения от пользователей и распознаем команды.

def listen(self): try: for event in self.longpoll.listen(): if event.user_id and event.type == VkEventType.MESSAGE_NEW and event.to_me: self.group.api.messages.markAsRead(peer_id=event.user_id) self.handle(event.user_id, event.text, event.attachments, message_id=event.message_id) except ConnectionError: logging.error("I HAVE BEEN DOWNED AT {}".format(datetime.datetime.today())) self.longpoll.update_longpoll_server() def handle(self, user_id, message, attachments, **kwargs): member = self.group.get_member(user_id) self.group.update_members() self.command_observer.execute(member, message, attachments, self.group, **kwargs) def run(self): self.listen()

"Что ты там написал про мою ..?" Часть 3.

Распознаванием команд занимается отдельная подсистема реализуемая через паттерн "Наблюдатель".
Внимание, CommandObserver:

class CommandObserver(AbstractObserver): def execute(self, member, message, attachments, group, **kwargs): for command in self.commands: for trigger in command.triggers: body = command.get_body(trigger, message) if body is not None: group.api.messages.setActivity(user_id=member.id, type="typing") if command.system: kwargs.update({"trigger": trigger, "commands": self.commands}) else: kwargs.update({"trigger": trigger}) return command.proceed(member, body, attachments, group, **kwargs)

AbstractObserver

Опять же, вынесение сделано для будущего возможного расширения.

class AbstractObserver(metaclass=ABCMeta): def __init__(self): self.commands = [] def add(self, *args): for arg in args: self.commands.append(arg) @abstractmethod def execute(self, *args, **kwargs): pass

Ключевые слова команд определяются в переменной triggers класса команды (строка либо список строк) Но что же этот наблюдатель будет распознавать?
Вот и добрались до самого интересного — команды.
Каждая команда — независимый класс, потомок базового класса Command.
Все что требуется от команды — запустить метод proceed(), если в начале сообщения пользователя найдено его ключевое слово.

class Command(metaclass=ABCMeta): def __init__(self): self.triggers = [] self.description = "Empty description." self.system = False self.privilege = False self.activate_times = [] self.activate_days = set() self.autostart_func = self.proceed def proceed(self, member, message, attachments, group, **kwargs): raise NotImplementedError() @staticmethod def get_body(kw, message): if not isinstance(kw, list): kw = [kw, ] for i in kw: reg = '^ *(\\{}) *'.format(i) if re.search(reg, message): return re.sub(reg, '', message).strip(' ')

То есть, всё взаимодействие с участником группы ложится на плечи команды. Как видно из сигнатуры метода proceed(), каждая команда получает на вход ссылку на экземпляр участника группы, его сообщение (уже без ключевого слова), приложения и ссылку на экземпляр группы. Я считаю это наиболее верным решением, так как таким образом возможно создание шелла (Shell) для большей интерактивности.
(По правде говоря, для этого нужно будет либо вносить асинхронщину, потому что обработка проходит синхронно, либо каждое полученное сообщение обрабатывать в новом потоке, что отнюдь не выгодно)

Примеры реализации команд:

BroadcastCommand

class BroadcastCommand(Command): def __init__(self): super().__init__() self.triggers = ['.mb'] self.privilege = True self.description = "Рассылка сообщения всем участникам сообщества." def proceed(self, member, message, attachments, group, **kwargs): if member.id not in group.get_member_ids(admins=True, editors=True): group.send(member.id, "You cannot do this ^_^") return True last_message = group.get_last_message(member.id) group.broadcast(group.get_member_ids(), message, attachments, last_message=last_message, **kwargs) return True

HelpCommand

class HelpCommand(Command): def __init__(self): super().__init__() self.commands = [] self.triggers = ['.h', '.help'] self.system = True self.description = "Показ этого сообщения." def proceed(self, member, message, attachments, group, **kwargs): commands = kwargs["commands"] help = "Реализованы следующие команды:\n\n" admins = group.get_member_ids(admins=True, moders=True) i = 0 for command in commands: if command.privilege and member.id not in admins: continue help += "{}) {}\n\n".format(i + 1, command.name()) i += 1 group.send(member.id, help) return True

"Мы же одна большая команда." Часть 4.

Теперь все эти модули и обработчики нужно объединить и настроить.
Еще один класс, пожалуйста!
Создаем фасад, который настроит нашего бота.

class VKManage: def __init__(self, token=None, login=None, password=None): self.session = BaseCommunicateVK.create_session(token, login, password, api_version) self.storage = DBProxy(DatabaseORM) self.group = Group(self.session, self.storage).setup().update_members() self.chat = ChatHandler(self.group, CommandObserver.get_observer()) def start(self): self.chat.run() def get_command(self, command_name): return { "рассылка участникам": BroadcastCommand(), "рассылка админам": AdminBroadcastCommand(), "помощь": HelpCommand(), "учет прогулов": SkippedLectionsCommand(), "расписание": TopicTimetableCommand().setup_account(self.bot.api), }.get(command_name) def connect_command(self, command_name): command = self.get_command(str(command_name).lower()) if command: self.chat.command_observer.add(command) return self def connect_commands(self, command_names): for i in command_names.split(','): self.connect_command(i.strip()) return self

Всегда самый противный, потому что может вылезти какая-нибудь неожиданность. Последний этап — запуск. Не в этот раз.

  • ConfigParser импортируется из core.settings.ConfigParser. По сути просто читает конфиг.
  • project_path импортируется из модуля settings в корне проекта.

    if __name__ == '__main__':
    config = ConfigParser(project_path) VKManage(token=config['token'], login=config['login'], password=config['password'])\ .connect_commands("помощь, рассылка участникам, рассылка админам, учет прогулов")\ .start()

На этом, кажется, всё.

На данный момент эта программа принесла пользу, как минимум, трем группам и, надеюсь, вам тоже принесет.

Развернуть можно бесплатно на Heroku, но это уже другая история.

Ссылки:

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть