20.10.2010Определение, запущен ли процесс
Пролог
Ого! Уже три месяца я ничего не писал в этот блог! Лето выдалось жаркое не только на погоду. Поскольку летом погода лучше, а световой день длиннее, было много работы. Причём работы связанной с поддержкой того, что уже и так нормально функционировало в прошлом сезоне. Ничего серьёзно нового не писалось активно, а значит и захватывающих сюжетов для статей не находилось.
Но теперь у меня появилась возможность писать кое-что новое. Поэтому есть, что рассказать.
Введение
Если вы любите процессы-демоны, как люблю их я, то, возможно, перед вами уже возникала задача определить, запущен ли уже такой демон, перед тем как создавать дочерний процесс. Об этом и будет сегодняшняя статья.
Баш в помощь
Предположим, что у нас есть простейший демон. Хорошо бы имя у него было уникальное, чтобы можно его потом было отыскать. Файл uniq_name_simple_daemon:
#!/usr/bin/env ruby
pid = fork do
begin
running = true
Signal.trap("TERM") do
running = false
end
while running
sleep 0.01
end
rescue Exception => e
puts e.to_s
puts e.backtrace.join "\n"
ensure
exit!
end
end
Мы всегда можем запускать с помощью другого скрипта, например на баше (simple_daemon_runner.sh):
#!/bin/bash
if ps ax | grep uniq_name_simple_daemon | grep -vq grep
then
echo "uniq_name_simple_daemon is already running"
else
echo "starting uniq_name_simple_daemon"
./uniq_name_simple_daemon
fi
На подобной команде будут базироваться все наши последующие методы. Тут, если кто не понял, мы фильтруем вывод ps ax сначала ища там имя нашего скрипта, а затем исключая из списка сам процесс поиска (команду grep). Ключ q позволяет нам получить код выхода, не выводя ничего на экран. То есть если строчка найдена, то запускаем первый блок, если нет, то второй.
Можно сделать такой же скрипт для остановки процесса (simple_daemon_stopper.sh):
#!/bin/bash
pid=$(ps ax | grep uniq_name_simple_daemon | grep -v grep | awk '{ print $1; }')
if [[ -n $pid ]]
then
echo "stopping uniq_name_simple_daemon"
kill -TERM $pid
else
echo "nothing to stop"
fi
Конечно же, при таком раскладе всегда есть возможность запустить нашего демона без помощи скриптов. И тогда проверка делаться не будет. В таком случае полезно проверять, запущен ли процесс уже внутри самого руби, перед тем, как отпочковать дочерний процесс.
Сам себе хозяин
В данном случае задача сводится к проверке наличия в памяти ещё одного процесса с таким же именем кроме текущего. Так же нужно уметь останавливать процесс с помощью того же файла. Вот, какое решение получилось у меня (uniq_name_auto_daemon):
#!/usr/bin/env ruby
ps_ax = `ps ax | grep #{File.basename(__FILE__)} | grep -v grep`.split("\n").map{ |l| l.strip.split(/\s+/) }.reject{ |l| l[0].to_i == Process.pid }
if ps_ax.any?
case ARGV[0]
when /stop/i
ps_ax.each do |l|
system "kill -TERM #{l[0]}"
end
when /kill/i
ps_ax.each do |l|
system "kill -KILL #{l[0]}"
end
else
puts "#{File.basename(__FILE__)} is already running. If you want to stop it, run './#{File.basename(__FILE__)} stop|kill'"
end
else
pid = fork do
begin
running = true
Signal.trap("TERM") do
running = false
end
while running
sleep 0.01
end
rescue Exception => e
puts e.to_s
puts e.backtrace.join "\n"
ensure
exit!
end
end
end
Во-первых, обходимся одним файлом, который никак иначе не запустить. Во-вторых, нигде не нужно хардкодить его имя. По-моему, очень удобно.
Оффтопик
С одной стороны, когда я пишу текст, то мне удобнее писать все термины по-русски и склонять их: «демоны», «руби», «баш», но с другой стороны это не поможет тому, кто будет искать решение похожей задачи.
Внутри примеров кода — наоборот, удобнее писать комментарии и тексты по-английски, чтобы не переключать раскладку, но как-то это не очень соответствует русскоязычном блогу.
Что же делать? :)
Материалы для самостоятельного изучения
28.10.2009Удалённые вызовы через систему распределённых объектов в руби (dRuby)
Введение
Некоторое время назад я писал о создании подпроцессов на руби. В числе прочего один из вопросов был об общении между собой демона и родительского процесса. Об одном из методов пойдёт речь сегодня
Постановка задачи
Не только программисты знают, что важна цель коммуникации. :) Если цель общения между основным процессом и демоном в том, чтобы вызывать методы на объектах друг друга, до давайте на этом и сосредоточимся.
Решение: DRb
Для удалённого обращения с объектами существует стандартная руби-библиотека dRuby, в которой находится модуль DRb, который мы и будем использовать. Ничего устанавливать не нужно. Согласно документации, совершенно прозрачным образом можно вызвать методы на удалённом объекте даже на другой машине. Объекты и ссылки на них передаются в формате Marshal.
Ну, довольно теории! Перейдём к практике. Для эмуляции параллельных процессов (возможно на разных машинах (!)) мы будем использовать два окна терминала. В одном запустим server.rb:
# coding: utf-8
$KCODE = "utf-8" if RUBY_VERSION < "1.9.0"
require "drb/drb"
class RemoteObject
def remote_method_with_param(param)
puts "вызван метод на сервере с параметром #{param.inspect}"
case param.class.to_s
when "String"
puts "параметр типа строка"
param.reverse!
when "Array"
puts "параметр типа массив"
param.shift
else
puts "параметр оставшегося типа"
param.do_smth
end
end
end
$SAFE = 1 # Запретить eval() и eval-оподобные вызовы
DRb.start_service("druby://localhost:45678", RemoteObject.new)
DRb.thread.join
Здесь мы используем банальный Thread#join, чтобы при необходимости просто прервать выполнение. Но те, кто читал предыдущую статью, знают, что в это время можно делать что угодно и следить за потоком dRuby отдельно.
В другом терминале запустим клиентский код client.rb:
# coding: utf-8
$KCODE = "utf-8" if RUBY_VERSION < "1.9.0"
require "drb/drb"
class MyString
def initialize(str)
@string = str
end
def do_smth
@string.reverse!
end
def inspect
"<#{@string}>"
end
end
rem_o = DRbObject.new_with_uri("druby://localhost:45678")
["строка", ["котик", "пёсик", "слоник"], MyString.new("суперстрока")].each do |obj|
puts "Вызов метода вернул: #{rem_o.remote_method_with_param(obj).inspect}"
puts "Параметр после вызова: #{obj.inspect}"
end
Вывод в терминалы будет следующий (я использую вывод для версии руби 1.9.1, потому что он нормально переворачивает кириллическую строку без колдовства) для сервера:
вызван метод на сервере с параметром "строка"
параметр типа строка
вызван метод на сервере с параметром ["котик", "пёсик", "слоник"]
параметр типа массив
вызван метод на сервере с параметром #<DRb::DRbUnknown:0x00000001248910 @name="MyString", @buf="\x04\bo:\rMyString\x06:\f@stringI\"\e\xD1\x81\xD1\x83\xD0\xBF\xD0\xB5\xD1\x80\xD1\x81\xD1\x82\xD1\x80\xD0\xBE\xD0\xBA\xD0\xB0\x06:\rencoding\"\nUTF-8">
параметр оставшегося типа
Клиент же упадёт с ошибкой:
Вызов метода вернул: "акортс"
Параметр после вызова: "строка"
Вызов метода вернул: "котик"
Параметр после вызова: ["котик", "пёсик", "слоник"]
(druby://localhost:45678) server.rb:17:in `remote_method_with_param': undefined method `do_smth' for #<DRb::DRbUnknown:0x00000001248910> (NoMethodError)
.....
Что, безусловно, прекрасно. Прекрасно, что упал не сервер. :) Понятно, что он не знает ничего про этот объект и не знает, как с ним обращаться.
Как видно из вывода, объекты передаются в виде копий. Нашим же третьим, самодельным объектом, мы можем исследовать две возможности: таки передавать копию объекта или передавать лишь ссылку на него, чтобы вызовы выполнялись на клиентской копии. Для первой возможности достаточно вынести определение класса в общедоступное для клиента и сервера место -- common.rb:
# coding: utf-8
$KCODE = "utf-8" if RUBY_VERSION < "1.9.0"
require "drb/drb"
REM_URI = "druby://localhost:45678"
class MyStringCopied
def initialize(str)
@string = str
end
def do_smth
@string.reverse!
self
end
def inspect
"<<#{@string}>>"
end
end
class MyStringSingle
include DRb::DRbUndumped # это ключ :)
def initialize(str)
@string = str
end
def do_smth
@string.reverse!
self
end
def inspect
"<#{@string}>"
end
end
Добавим require "common.rb" в серверный код, а клиентский преобразится до такого:
# coding: utf-8
require "common"
rem_o = DRbObject.new_with_uri(REM_URI)
DRb.start_service # Это нужно для объекта, который не копируется при передаче
["строка",
["котик", "пёсик", "слоник"],
MyStringCopied.new("суперстрока"),
MyStringSingle.new("суперстрока без копий")].each do |obj|
puts "Вызов метода вернул: #{rem_o.remote_method_with_param(obj).inspect}"
puts "Параметр после вызова: #{obj.inspect}"
end
Как видно, мы сразу позаботились и о второй возможности, создав для неё ещё один класс. Секрет заключается во включении модуля DRb::DRbUndumped и старте ещё одного серверного процесса на клиенте (для вызовов методов объектов клиента удалённо) Клиентский вывод теперь выглядит так:
Вызов метода вернул: "акортс"
Параметр после вызова: "строка"
Вызов метода вернул: "котик"
Параметр после вызова: ["котик", "пёсик", "слоник"]
Вызов метода вернул: <<акортсрепус>>
Параметр после вызова: <<суперстрока>>
Вызов метода вернул: #<DRb::DRbObject:0x000000012588c8 @uri="druby://127.0.1.1:43998", @ref=9631244>
Параметр после вызова: <йипок зеб акортсрепус>
Если немножко почитать, и разобраться, какие объекты можно и нужно «маршализировать», а какие нельзя или не нужно, то получается вполне себе прекрасный инструмент. Который, повторюсь, входит в стандартную библиотеку и не требует никаких внешних зависимостей.
Материалы для самостоятельного изучения
22.10.2009Работа с потоками (Thread) в руби
Введение
Сначала я расскажу, почему на сегодняшний день я не очень много работаю с подпроцессами на базе Thread, предпочитая им Kernel.fork. А потом покажу простой способ следить за потоками при работе приложения.
На текущий момент, основная проблема потоков -- это «ненастоящее» распределение ресурсов. Все потоки руби на самом деле находятся в одном системном потоке, который по очереди передаёт им управление. Это влечёт за собой полтора следствия.
Зависание
Когда имеешь дело с внешним оборудованием, сторонними библиотеками и серийными портами, зависание потока может случиться на самом низком уровне. Это можно симулировать небольшой программой на си -- block_thread.c:
#include <ruby.h>
VALUE rb_mBlockThread;
/*
* call-seq:
* BlockThread::cycle(interval=5)
*
* Блокирует текущий поток на <code>interval</code> секунд.
*
*/
VALUE bt_cycle(int argc, VALUE *argv, VALUE self) {
int i, max;
max = 5;
if (argc == 1) {
max = FIX2INT(argv[0]);
} else if (argc > 1) {
rb_raise(rb_eArgError, "Неправильное количество аргументов (%d вместо 0 или 1)");
}
for (i=0; i<max; i++) {
sleep(1);
}
return Qnil;
}
void Init_block_thread() {
/*
* Модуль содержит методы для демонстрации работы потока
*/
rb_mBlockThread = rb_define_module("BlockThread");
rb_define_module_function(rb_mBlockThread, "cycle", bt_cycle, -1);
}
Если вы никогда не расширяли руби с помощью си, поясню, что в этой программе мы создаём модуль BlockTread, в котором создаём метода класса cycle, который указанное число раз (по умолчанию 5) в цикле ждёт одну секунду. Напишем extconf.rb:
require "mkmf"
create_makefile("block_thread")
И программу на руби, в которой будут два потока, один из которых мы заблокируем на низком уровне block_threads.rb:
# coding: utf-8
require "block_thread.so"
t1 = Thread.new do
10.times { |i| puts i; sleep 0.1 }
end
t2 = Thread.new do
puts "Блокируем"
BlockThread.cycle
puts "Разблокируем"
end
t1.join
t2.join
Скомпилируем и запустим:
ruby extconf.rb
make
ruby block_threads.rb
И что же мы видим? Мы видим, как все потоки, включая основной, блокируются на пять секунд (или любое число секунд, которое мы укажем) И даже ctrl + c не в силах нам помочь. Помогает только ctrl + z и потом killall ...
В случае же с Kernel.fork, процессы действительно равномерно делят между собой ресурсы, и один подпроцесс не способен заблокировать всё.
Синхронизация
Я говорил про полторы проблемы. Об одной уже рассказал, а вторая известна давно -- попробуйте выполнить следующий код:
# coding: utf-8
$cnt = 0
t1 = Thread.new do
100000.times { $cnt += 1 }
end
t2 = Thread.new do
100000.times { $cnt += 1 }
end
t1.join
t2.join
puts "Without sync: #{$cnt}"
Если вы не используете руби 1.9, то вы получите неожиданный и каждый раз разный результат. Всё дело в том, что переключение между потоками происходит между элементарными операциями, а += состоит из трёх элементарных операций: достать значение, прибавить к нему число, записать значение. Чтобы этого не произошло, нужно либо использовать синхронизацию с помощью Mutex, либо руби 1.9. Ссылка на полный код для этой статьи в конце, т.к. я спешу перейти к более интересной части. :)
Слежение за потоками с помощью менеджера ThreadsWait
Совершенно недавно открыл для себя интересный способ следить за статусом пакетов в блокирующей и неблокирующей манере:
# coding: utf-8
require "thwait"
t1 = Thread.new do
10.times { |i| puts "поток 1 тик #{i}"; sleep 0.5 }
end
t2 = Thread.new do
10.times { |i| puts "поток 2 тик #{i}"; sleep 0.7 }
end
tw = ThreadsWait.new t1, t2
t3 = Thread.new do
10.times { |i| puts "поток 3 тик #{i}"; sleep 0.3 }
end
run = true
tw.join_nowait t3
while run do
begin
# Неблокирующее ожидание
puts "Закончил работу #{tw.next_wait(true).inspect }"
run = false
rescue ThreadsWait::ErrNoFinishedThread
puts "Ожидаем окончания работы одного из потоков"
sleep 0.5
end
end
# Блокирующее ожидание
tw.all_waits do |t|
puts "Закончил работу #{t.inspect}"
end
По-моему, весьма удобно, если вам нужно не просто ожидать окончания работы потоков, но ещё и делать что-то при этом.
Материалы для самостоятельного изучения
10.06.2009Ruby daemon, или как сделать демона на руби
Задача
Иногда в процессе работы с разными сторонними библиотеками, которые содержат тяжелые блокирующие методы, не обойтись обычным Thread. В таком случае на помощь приходят демоны :) Вот несколько приемов, которые я освоил в работе с нимим.
Базовый механизм
Запустить демона можно с помощью метода Kernel.fork, передав ему блок. Метод возвращает pid процесса, который можно записать в файл и просто использовать в дальнейшем.
pid = fork do
puts "from daemon"
exit!(1)
end
Определение статуса
Я не нашёл удобного способа определения методами руби, запущен ли процесс. Есть возможность фильтровать вывод ps ax, ища в нём pid процесса. Но есть метод Process.waitpid, который можно использовать хитрым образом. Так же для будущих задач, упакуем наш код в класс:
require 'timeout'
class Daemon
class << self
def start
@pid = fork do
puts "from daemon"
sleep 1
exit!(1)
end
end
def running?
if @pid
begin
Timeout::timeout(0.01) do
Process.waitpid(@pid)
if $?.exited?
return false
end
end
rescue Timeout::Error
end
return true
else
return false
end
end
end
end
Daemon.start
puts "running: #{Daemon.running?}"
sleep 1.5
puts "running: #{Daemon.running?}"
Сообщение об ошибках
Предположим, что у нашего демона есть некоторый процесс инициализации, и мы хотим знать, завершился ли он или произошла ошибка до того, как покинем метод Daemon.start.
Для передачи подобных сообщений хорошо подходит IO.pipe. Для двустороннего общения нужно создавать два канала, но нам хватит и одного:
require 'timeout'
class Daemon
def initialize
puts "from daemon: initializing"
end
class << self
def start
@rd, @wr = IO.pipe
@pid = fork do
@rd.close
begin
dmn = new
@wr.write "ok"
@wr.close
sleep 1
rescue Exception => e
@wr.write e.to_s
@wr.close
ensure
exit!(1)
end
end
@wr.close
str = @rd.read
if str == "ok"
puts "daemon started ok"
else
puts "error while initializing daemon: #{str}"
end
@rd.close
end
def running?
if @pid
begin
Timeout::timeout(0.01) do
Process.waitpid(@pid)
if $?.exited?
return false
end
end
rescue Timeout::Error
end
return true
else
return false
end
end
end
end
Daemon.start
puts "running: #{Daemon.running?}"
sleep 1.5
puts "running: #{Daemon.running?}"
Остановка
У каждого демона (для того они обычно и создаются) есть циклическая часть. Нам хотелось бы запускать и останавливать процесс тогда, когда нам нужно. Если с запуском всё понятно, то для остановки потребуется ещё одна вещь. Ведь в тот момент, когда мы создали демона, он создает копии всех переменных и дальнейшее их изменение внутри и снаружи демона становится независимым. Ещё один способ общения с демоном — сигнал:
require 'timeout'
class Daemon
def initialize
puts "from daemon: initializing"
@cnt = 0
end
def main_loop
@cnt += 1
puts "from daemon: running loop ##{@cnt}"
sleep 0.1
end
class << self
def start
@rd, @wr = IO.pipe
@pid = fork do
@rd.close
running = true
Signal.trap("TERM") do
running = false
end
begin
dmn = new
@wr.write "ok"
@wr.close
while running
dmn.main_loop
end
rescue Exception => e
@wr.write e.to_s
@wr.close
ensure
exit!(1)
end
end
@wr.close
str = @rd.read
if str == "ok"
puts "daemon started ok"
else
puts "error while initializing daemon: #{str}"
end
@rd.close
end
def stop
unless @pid.nil?
Process.kill("TERM", @pid)
@pid = nil
end
end
def running?
if @pid
begin
Timeout::timeout(0.01) do
Process.waitpid(@pid)
if $?.exited?
return false
end
end
rescue Timeout::Error
end
return true
else
return false
end
end
end
end
Daemon.start
puts "running: #{Daemon.running?}"
sleep 1
Daemon.stop
puts "running: #{Daemon.running?}"
Теперь, если наследовать от этого класса свой класс и переопределить методы initialize и main_loop, получится вполне себе демон :)
Для самостоятельного изучения
Есть, конечно, ещё недостатки. Например:
- Если ошибка возникает в main_loop, то канал вывода уже закрыт. А если не закрывать канал, то метод IO#read не позволит нам выйти из метода start. Что делать?
- Если нужно периодически общаться с демоном общирными объемами информации, что делать? (я в своей задаче использовал TCPSocket, но это, ведь, не панацея)
- Хорошо бы хранить pid в pid-файле на случай нашествия зомби. И, соответственно, обрабатывать возникающие зомбо-проблемы.
Но это я оставлю на самостоятельное решение пытливым читателям.