Урок 6 Многопоточность в R

6.1 Описание

Давайте представим ситуацию, что вам необходимо доствить 8 адресатам посылки. Если вы будете доставлять их одним курьером, то ему придётся по очереди посетить все 8 адресов, собрать подписи в качестве подтверждения о получении посылки, и принести вам подписанные документы. но если у вас в распоряжении будет 4 курьера, то вы сможете распределить каждому курьеру всего по 2 адреса, и процесс доставки займёт в 4 раза меньше времени.

Ок, а при чём тут вообще курьеры спросите вы. Во всех предыдущих уроках мы выполняли итерирование по элементов объектов в последовательном режиме, т.е. использовали одного курьера. Это преемлемый способ итерирования, но не самый эффективный. В этом уроке мы с вами разберёмся с тем, как задействовать сразу 4ёх курьеров, т.е. выполнять итерации в параллеьном, многопоточном режиме.

Так же мы можем сделать этот процесс ещё более эффективным, если будем не рандомно раздавать курьерам адресатов, а например распредим каждому курьеру по одному району, это балансировка нагрузки, её мы тоже затронем в этом уроке.

6.2 Видео

6.3 Тайм коды

00:00 Вступление.
00:51 Что такое многопоточность.
02:20 Какие пакеты мы будем использовать в ходе урока.
03:25 Используем foreach в последовательном режиме.
07:42 Аргументы конструкции foreach.
10:05 Управление объединением результатов итераций цикла foreach.
11:05 Выполнение foreach в многопоточном режиме.
12:41 Схема реализации многопоточности.
13:52 Возвращение к последовательному выполнению и ID процесса.
14:56 Бекенды к foreach.
15:38 Оператор %dorng%.
18:10 Параллельная реализация функций семейства apply.
20:52 Список функций пакетов parallel и pbapply.
21:54 Пакет furrr.
23:10 Соответствие функций пакета purrr и furrr.
23:50 Заключение.

6.4 Код

# многопоточные циклы -----------------------------------------------------
# install.packages("doSNOW")
# library(doSNOW)
# library(doParallel)
library(doFuture)

# функция длительного выполнения
pause <- function(min = 1, max = 3) {
  ptime <- runif(1, min, max)

  Sys.sleep(ptime)

  out <- list(
    pid = Sys.getpid(),
    pause_sec = ptime
  )
}

test <- pause()

# используем foreach 
# итерируемся сразу по двум объектам
system.time (
  {test2 <- foreach(min = 1:3, max = 2:4) %do% pause(min, max)}
)

# сумма длительностей пауз
sum(sapply(test2, '[[', i = 'pause_sec'))

# меняем функцию собирающую результаты каждой итерации
test3 <- foreach(min = 1:3, max = 2:4, .combine = dplyr::bind_rows) %do% pause(min, max)

# параллельный режим выполнения
# создаём кластер из четырёх ядер
#cl <- makeCluster(4)
#registerDoSNOW(cl)

options(future.rng.onMisuse = "ignore")
registerDoFuture()
plan('multisession', workers = 3)

# выполняем тот же код но в параллельном режиме
system.time (
  {
    par_test1 <- 
      foreach(min = 1:3, max = 2:4, .combine = dplyr::bind_rows) %dopar% {
      pause(min, max)
    }
  }
)

# останавливаем кластер
plan('sequential')

par_test1


# многопоточный вариант функций apply -------------------------------------

library(pbapply)
library(parallel)

# создаём кластер из четырёх ядер
cl <- makeCluster(3)

# пример с pbapply
par_test2 <- pblapply(rep(1, 3), FUN = pause, max = 3, cl = cl)
# пример с parallel
par_test3 <- parLapply(rep(1, 3), fun = pause, max = 3, cl = cl)

# останавливаем кластер
stopCluster(cl)

# многопоточный purrr -----------------------------------------------------
library(furrr)

plan('multisession', workers = 3)

par_test4 <- future_map2(1:3, 2:4, pause)

# останавливаем кластер
plan('sequential')

6.6 Тест