Глава 7 Добавляем боту асинхронность

7.1 Что такое асинхронное программирование

По умолчанию созданные вами боты работают в параллельном, однопоточном режиме. Т.е. они выполняют заданные команды последовательно. Это не доставит никаких дополнительных трудностей если:

  • ваш бот выполняет простейшие команды длительность работы которых не превышает 1 секунды;
  • вашего бота использует всего несколько пользователей, и редко используют его одновременно.

Асинхронность в программировании — выполнение процесса в неблокирующем режиме системного вызова, что позволяет потоку программы продолжить обработку.

tproger.ru

7.2 Пример последовательного бота с поддержкой длительных команд

В этом разделе мы разберёмся с тем, как сделать нашего бота асинхронным, т.е. способным обрабатывать одновременно несколько команд, таким образом, что бы одна длительная команда, не блокировала работу боту на время её выполнения. Для демонстрации примера мы создадим бота с двумя простейшими командами:

  • fast - быстрая команда, время выполнение которой менее 1 секунды.
  • slow - команда, на выполнение которой боту требуется некоторое время, в нашем случае более 10 секунд.

Для создания бота выполните приведённый ниже код:

library(telegram.bot)
library(stringr)

updater <- Updater("Токен вашего бота")

# Функция с длительным временем вычислений
slow_fun <- function(bot, update) {
  
  # Сообщение о том, что начата работа длительного вычисления
  bot$sendMessage(
      update$message$chat_id,
      text = str_glue("Медленная функция, начало работы!\nID процесса: {Sys.getpid()}"),
      parse_mode = "Markdown"
  )
  
  # Добавляем паузу, для того, что бы исскусственно сделать функцию длительной
  Sys.sleep(10)
  
  # Сообщаем о том, что все вычисления выполнены
  bot$sendMessage(update$message$chat_id,
      text = str_glue("Медленная функция выполнена!\nID процесса: {Sys.getpid()}"),
      parse_mode = "Markdown")

}

# Функция с коротким временем вычислений
fast_fun <- function(bot, update) {
  
  # Просто отправляем сообщение
  bot$sendMessage(update$message$chat_id,
    text = str_glue("Быстрая функция, выполняется последовательный режим!\nID процесса: {Sys.getpid()}"),
    parse_mode = "Markdown")
  
}

# создаём обработчики
slow_hendler <- CommandHandler('slow', slow_fun)
fast_hendler <- CommandHandler('fast', fast_fun)

# добавляем обработчик в диспетчер
updater <- updater + slow_hendler + fast_hendler

# запускаем бота
updater$start_polling()

В многофункциональных ботах также можно разделить все команды на быстрые и медленные. Команды, которые бот выполняет мгновенно не требуют асинхронности, а вот команды реализующие длительные, дорогие вычисления, например запросы к API, лучше выполнять в параллельном, фоновом процессе не блокируя на период вычислений работу бота.

Для демонстрации проблемы давайте попробуем запустить бота, по приведённому выше примеру кода.

Изначально мы запустили медленную команду /slow, и не дожидаясь её выполнения отправили быструю команду /fast. Но, к выполнению команды /fast бот приступил только после того, как выполнил длительную команду /slow. Это видно из диалога, т.к. после того, как боту была отправлена команда /fast, он завершил работу команды /slow, сообщил нам “Медленная функция выполнена! ID процесса: 868”. Только после этого приступил к выполнению быстрой функции, сообщив “Быстрая функция, выполняется последовательный режим!ID процесса: 868”.

Представьте ситуацию, если у вас одновременно 5 пользователей отправят вперемешку быстрые и длительные команды. В качестве эксперимента давайте отправим боту очередь команд:

  1. /slow
  2. /slow
  3. /fast
  4. /slow
  5. /fast

В данном случае не важно, эти команды запустил один пользователь или 5, выполняться они будут последовательно. Несмотря на то, что 5ая команда является быстрой, пользователю, который её отправил придётся ждать выполнения всех 4ёх, предыдущих команд. Если изобразить этот процесс схематически, и допустить, что быстрая команда выполняется за 1 секунду, а медленная за 10, то получится следующее:

В последовательном режиме выполнения, несмотря на то, что 5ая по счёту команда требует всего 1 секунду на вычисления, она 31 секунду находится в ожидании, пока будут выполнены 4 предыдущие операции.

7.3 Многопоточность в R

В языке R есть множество реализаций многопоточности:

  • foreach
  • parallel
  • future

Это далеко не полный перечень пакетов, которые позволяют вам производить вычисления в многопоточном режиме используя язык R. Для реализации многопоточности при разработке telegram ботов наиболее удобным является пакет future, о котором я подробно рассказывал в уроке “Пакет future” курса “Циклы и функционалы в языке R”. Крайне рекомендую пройти весь курс “Циклы и функционалы в языке R” для большего погружения в тему многопоточности. Т.к. в данном курсе мы не будет подробно рассматривать параллельное программирование.

Пакет future позволяет вам, выполнять вычисления как в последовательном (обычном) режиме, так и в многопоточном. При этом данный пакет поддерживает несколько различных многопоточных режима:

Изменять план выполнения вычислений можно с помощью future::plan(). Наиболее простым, и удобным для использования при построении telegram ботов многопоточный план вычислений - multisession. Данный план позволяет запускать на вашем локальном ПК параллельные R сеансы в фоновом режиме, после выполнения вычислений их результат импортируется в основной R сеанс.

Далее, после переопределения плана вычислений, запустить вычисление в многопоточном режиме можно с помощью одноимённой функции future().

7.4 Используем future для построения асинхронного бота

Хочу обратить ваше внимание, когда мы в начале этого урока запустили бота в последовательном режиме, он с помощью функции Sys.getpid() получал, и выводил в сообщении идентификатор R сеанса, в ходе которого выполнялись все вычисления бота. Во всех представленных выше сообщение идентификатор процесса был одинаковым - 868. Это связано с тем, что все вычисления производились последовательно в рамках одного R сеанса.

Ниже я приведу пример, доработанного бота, таким образом, что бы функция /slow запускалась в фоновом, параллельном R сеансе, и не блокировала работу бота. При этом функцию /fast мы оставим без изменений, т.к. она выполняется ботом достаточно быстро, и скорее всего накладные расходы на создание фонового сеанса будут больше, чем вычисление самой функции.

library(telegram.bot)
library(stringr)

# Включаем параллельный план вычислений
future::plan('multisession')

updater <- Updater("Токен вашего бота")

# Функция с длительным временем вычислений
slow_fun <- function(bot, update) {
  
  # Запускаем выполнение кода в параллельной сессии
  future::future(
    {
      # Сообщение о том, что начата работа длительного вычисления
      bot$sendMessage(update$message$chat_id,
        text = str_glue("Медленная функция, начало работы!\nID процесса: {Sys.getpid()}"),
        parse_mode = "Markdown")
      
      # Добавляем паузу, для того, что бы исскусственно сделать функцию длительной
      Sys.sleep(10)
      
      # Сообщаем о том, что все вычисления выполнены
      bot$sendMessage(update$message$chat_id,
        text = str_glue("Медленная функция выполнена!\nID процесса: {Sys.getpid()}"),
        parse_mode = "Markdown")
    }
  )
  
}

# Функция с коротким временем вычислений
fast_fun <- function(bot, update) {
  
  
  # Просто отправляем сообщение
  bot$sendMessage(update$message$chat_id,
    text = str_glue("Быстрая функция, выполняется последовательный режим!\nID процесса: {Sys.getpid()}"),
    parse_mode = "Markdown")
  
}

# создаём обработчик
slow_hendler <- CommandHandler('slow', slow_fun)
fast_hendler <- CommandHandler('fast', fast_fun)

# добавляем обработчик в диспетчер
updater <- updater + slow_hendler + fast_hendler

# запускаем бота
updater$start_polling()

Что мы изменили в коде бота: 1. В начале скрипта, командой future::plan('multisession') мы переопределили план вычислений с последовательного на многопоточный. На самом деле весь код будет выполняться последовательно, кроме кода используемого внутри функции future(). 2. Весь код внутри функции бота slow_fun() мы завернули в future::future(), таким образом, при запуске медленной функции будет запускаться параллельный фоновый R процесс, и все вычисления данной функции будут выполняться там, не блокируя основной сеанс.

Теперь давайте попробуем в параллельном режиме запустить такую же очередь команд, как и в предыдущем последовательном примере:

Обратите внимание на то, что вычисление всех долгих команд /slow выполняются в разных процессах, бот выводит в каждом сообщение информацию “ID процесса: XX”. При этом вычисление быстрой команды /fast оба раза выполнялись в корневом процессе с id 868.

Схематически весь процесс обработки команд, даже при одновременном их запуске, теперь выглядит так:

В последовательном режиме выполнение всех команд заняло 32 секунды (10 + 10 + 1 + 10 + 1), в многопоточном всего 10 секунд. При этом даже в течении этих 10 секунд основной сеанс практически не был заблокирован, только на первые две секунды, когда в нём происходили вычисления быстрых команд /fast в последовательном режиме.

7.5 Управление количеством потоков

По умолчанию функция future::plan() при изменении плана с последовательного на многопоточный автоматически определяет оптимальное количество потоков, т.е. фоновых процессов, которые будут доступны в фоновом режиме. По умолчанию будет создано столько процессов, сколько ядер доступно в процессоре вашего ПК. Программно можно посмотреть количество доступных ядер следующим образом:

future::availableCores()
## system 
##      8

В моём случае одновременно будет доступно 8 фоновых R сеансов. Для большинства задач этого будет достаточно, но в функции future::plan() доступен аргумент workers, который позволяет самостоятельно задать необходимое количество фоновых процессов.

future::plan('multisession', workers = 4)

Приведённый выше код демонстрирует сокращение количества доступных процессов до 4ёх.

7.6 Функция promises::future_promise()

Пакет promises часто используется в связке с future органично дополняя его.

В приведённых выше практических примерах нам было достаточно количества созданных фоновых потоков. Но всегда есть вероятность того, что все потоки будут заняты. Например, мы включили мультисессионый режим вычислений с двумя потоками (workers = 4) и бот получил практически одновременно 3 команды /slow. В таком случае первые две команды уйдут выполняться в фоновые процессы, а третья, и последующие встанут в очередь ожидания свободного процесса, заняв при этом основной процесс. В такой ситуации до тех пор, пока не появится свободный процесс, основной процесс будет заблокирован, и даже при попытке отправить быструю функцию /fast, она будет также поставлена в очередь.

Решить эту проблему можно с помощью функции promises::future_promise(). Преимущество promises::future_promise() перед future::future(), заключается в том, что даже если нет свободных потоков, созданная очередь не будет блокировать основной поток, она будет создана так же в фоновом потоке. Для доработки приведенного ранее примера достаточно просто заменить в коде функции slow() функцию future::future() на promises::future_promise().

library(telegram.bot)
library(stringr)

# Включаем параллельный план вычислений
future::plan('multisession')

updater <- Updater("Токен вашего бота")

# Функция с длительным временем вычислений
slow_fun <- function(bot, update) {
  
  # Запускаем выполнение кода в параллельной сессии
  promises::future_promise(
    {
      # Сообщение о том, что начата работа длительного вычисления
      bot$sendMessage(update$message$chat_id,
        text = str_glue("Медленная функция, начало работы!\nID процесса: {Sys.getpid()}"),
        parse_mode = "Markdown")
      
      # Добавляем паузу, для того, что бы исскусственно сделать функцию длительной
      Sys.sleep(10)
      
      # Сообщаем о том, что все вычисления выполнены
      bot$sendMessage(update$message$chat_id,
        text = str_glue("Медленная функция выполнена!\nID процесса: {Sys.getpid()}"),
        parse_mode = "Markdown")
    }
  )
  
}

# Функция с коротким временем вычислений
fast_fun <- function(bot, update) {
  
  
  # Просто отправляем сообщение
  bot$sendMessage(update$message$chat_id,
    text = str_glue("Быстрая функция, выполняется последовательный режим!\nID процесса: {Sys.getpid()}"),
    parse_mode = "Markdown")
  
}

# создаём обработчик
slow_hendler <- CommandHandler('slow', slow_fun)
fast_hendler <- CommandHandler('fast', fast_fun)

# добаляем добавляем в диспетчер
updater <- updater + slow_hendler + fast_hendler

# запускаем бота
updater$start_polling()

7.7 Заключение

Итак, для того, что бы ваш бот умел одновременно обрабатывать входящие команды необходимо:

  1. Выявить список команд, требующих длительных вычислений.
  2. В начале скрипта добавить команду future::plan('multisession'), для того, что бы у вас была возможность запускать вычисление длительных операций в фоновых, параллельных R сеансах.
  3. Код методов бота, которые требуют длительных вычислений заворачиваем в future::future().
  4. Улучшить многопоточность бота можно заменив функцию future::future() на promises::future_promise(), которая оставляет свободным основной поток R, даже если все фоновые потоки заняты.