22.10.2009Работа с потоками (Thread) в руби
Введение
Сначала я расскажу, почему на сегодняшний день я не очень много работаю с подпроцессами на базе Thread, предпочитая им Kernel.fork. А потом покажу простой способ следить за потоками при работе приложения.
На текущий момент, основная проблема потоков -- это «ненастоящее» распределение ресурсов. Все потоки руби на самом деле находятся в одном системном потоке, который по очереди передаёт им управление. Это влечёт за собой полтора следствия.
Зависание
Когда имеешь дело с внешним оборудованием, сторонними библиотеками и серийными портами, зависание потока может случиться на самом низком уровне. Это можно симулировать небольшой программой на си -- block_thread.c:
#include <ruby.h>
VALUE rb_mBlockThread;
/*
* call-seq:
* BlockThread::cycle(interval=5)
*
* Блокирует текущий поток на <code>interval</code> секунд.
*
*/
VALUE bt_cycle(int argc, VALUE *argv, VALUE self) {
int i, max;
max = 5;
if (argc == 1) {
max = FIX2INT(argv[0]);
} else if (argc > 1) {
rb_raise(rb_eArgError, "Неправильное количество аргументов (%d вместо 0 или 1)");
}
for (i=0; i<max; i++) {
sleep(1);
}
return Qnil;
}
void Init_block_thread() {
/*
* Модуль содержит методы для демонстрации работы потока
*/
rb_mBlockThread = rb_define_module("BlockThread");
rb_define_module_function(rb_mBlockThread, "cycle", bt_cycle, -1);
}
Если вы никогда не расширяли руби с помощью си, поясню, что в этой программе мы создаём модуль BlockTread, в котором создаём метода класса cycle, который указанное число раз (по умолчанию 5) в цикле ждёт одну секунду. Напишем extconf.rb:
require "mkmf"
create_makefile("block_thread")
И программу на руби, в которой будут два потока, один из которых мы заблокируем на низком уровне block_threads.rb:
# coding: utf-8
require "block_thread.so"
t1 = Thread.new do
10.times { |i| puts i; sleep 0.1 }
end
t2 = Thread.new do
puts "Блокируем"
BlockThread.cycle
puts "Разблокируем"
end
t1.join
t2.join
Скомпилируем и запустим:
ruby extconf.rb
make
ruby block_threads.rb
И что же мы видим? Мы видим, как все потоки, включая основной, блокируются на пять секунд (или любое число секунд, которое мы укажем) И даже ctrl + c не в силах нам помочь. Помогает только ctrl + z и потом killall ...
В случае же с Kernel.fork, процессы действительно равномерно делят между собой ресурсы, и один подпроцесс не способен заблокировать всё.
Синхронизация
Я говорил про полторы проблемы. Об одной уже рассказал, а вторая известна давно -- попробуйте выполнить следующий код:
# coding: utf-8
$cnt = 0
t1 = Thread.new do
100000.times { $cnt += 1 }
end
t2 = Thread.new do
100000.times { $cnt += 1 }
end
t1.join
t2.join
puts "Without sync: #{$cnt}"
Если вы не используете руби 1.9, то вы получите неожиданный и каждый раз разный результат. Всё дело в том, что переключение между потоками происходит между элементарными операциями, а += состоит из трёх элементарных операций: достать значение, прибавить к нему число, записать значение. Чтобы этого не произошло, нужно либо использовать синхронизацию с помощью Mutex, либо руби 1.9. Ссылка на полный код для этой статьи в конце, т.к. я спешу перейти к более интересной части. :)
Слежение за потоками с помощью менеджера ThreadsWait
Совершенно недавно открыл для себя интересный способ следить за статусом пакетов в блокирующей и неблокирующей манере:
# coding: utf-8
require "thwait"
t1 = Thread.new do
10.times { |i| puts "поток 1 тик #{i}"; sleep 0.5 }
end
t2 = Thread.new do
10.times { |i| puts "поток 2 тик #{i}"; sleep 0.7 }
end
tw = ThreadsWait.new t1, t2
t3 = Thread.new do
10.times { |i| puts "поток 3 тик #{i}"; sleep 0.3 }
end
run = true
tw.join_nowait t3
while run do
begin
# Неблокирующее ожидание
puts "Закончил работу #{tw.next_wait(true).inspect }"
run = false
rescue ThreadsWait::ErrNoFinishedThread
puts "Ожидаем окончания работы одного из потоков"
sleep 0.5
end
end
# Блокирующее ожидание
tw.all_waits do |t|
puts "Закончил работу #{t.inspect}"
end
По-моему, весьма удобно, если вам нужно не просто ожидать окончания работы потоков, но ещё и делать что-то при этом.