• ГЛАВА 4 Именованные и неименованные каналы
  • 4.1. Введение
  • 4.2. Приложение типа клиент-сервер
  • 4.3. Программные каналы
  • Пример
  • 4.4. Двусторонние каналы
  • 4.5. Функции popen и pclose
  • Пример
  • 4.6. Именованные каналы (FIFO)
  • Пример
  • Пример: неродственные клиент и сервер
  • 4.7. Некоторые свойства именованных и неименованных каналов
  • 4.8. Один сервер, несколько клиентов
  • Открытие файла и отправка его в FIFO клиента
  • Атомарность записи в FIFO
  • FIFO и NFS
  • 4.9. Последовательные и параллельные серверы
  • Атака типа «отказ в обслуживании»
  • 4.10. Потоки и сообщения
  • 4.11. Ограничения программных каналов и FIFO
  • 4.12. Резюме
  • Упражнения
  • ГЛАВА 5 Очереди сообщений Posix
  • 5.1. Введение
  • 5.2. Функции mq_open, mq_close, mq_unlink
  • Пример: программа mqcreate1
  • Пример: программа mqunlink
  • 5.3. Функции mq_getattr и mq_setattr
  • Пример: программа mqgetattr
  • Пример: программа mqcreate
  • 5.4. Функции mqsend и mqreceive
  • Пример: программа mqsend
  • Пример: программа mqreceive
  • 5.5. Ограничения очередей сообщений
  • Пример: программа mqsysconf
  • 5.6. Функция mq_notify
  • Пример: простая программа с уведомлением
  • Сигналы Posix: функции типа Async-Signal-Safe
  • Пример: уведомление сигналом
  • Пример: уведомление сигналом с отключением блокировки
  • Пример: уведомление с использованием sigwait вместо обработчика
  • Пример: очереди сообщений Posix и функция select
  • Пример: запуск нового потока
  • 5.7. Сигналы реального времени Posix
  • Пример
  • Функция signal_rt
  • 5.8. Реализация с использованием отображения в память
  • Функция mq_open
  • Функция mq_close
  • Функция mq_unlink
  • Функция mq_getattr
  • Функция mq_setattr
  • Функция mq_notify
  • Функция mq_send
  • Функция mq_receive
  • 5.9. Резюме
  • Упражнения
  • ГЛАВА 6 Очереди сообщений System V
  • 6.1. Введениеы
  • 6.2. Функция msgget
  • 6.3. Функция msgsnd
  • 6.4. Функция msgrcv
  • 6.5. Функция msgctl
  • Пример
  • 6.6. Простые примеры
  • Программа msgcreate
  • Программа msgsnd
  • Программа msgrcv
  • Программа msgrmid
  • Примеры
  • Программа msgrcvid
  • 6.7. Пример программы клиент-сервер
  • 6.8. Мультиплексирование сообщений
  • Пример: одна очередь на приложение
  • Пример: одна очередь для каждого клиента
  • 6.9. Использование select и poll с очередями сообщений
  • 6.10. Ограничения, накладываемые на очереди сообщений
  • Пример
  • 6.11.Резюме
  • Упражнения
  • ЧАСТЬ 2

    ОБМЕН СООБЩЕНИЯМИ

    ГЛАВА 4

    Именованные и неименованные каналы

    4.1. Введение

    Неименованные каналы — это самая первая форма IPC в Unix, появившаяся еще в 1973 году в третьей версии (Third Edition [17]). Несмотря на полезность во многих случаях, главным недостатком неименованных каналов является отсутствие имени, вследствие чего они могут использоваться для взаимодействия только родственными процессами. Это было исправлено в Unix System III (1982) добавлением каналов FIFO, которые иногда называются именованными каналами. Доступ и к именованным каналам, и к неименованным организуется с помощью обычных функций read и write.

    ПРИМЕЧАНИЕ

    Программные (неименованные) каналы в принципе могут использоваться неродственными процессами, если предоставить им возможность передавать друг другу дескрипторы (см. раздел 15.8 этой книги или раздел 13.7 [24]). Однако на практике эти каналы обычно используются для осуществления взаимодействия между процессами, у которых есть общий предок.

    В этой главе описываются детали, касающиеся создания и использования программных каналов и каналов FIFO. Мы рассмотрим пример простейшего сервера файлов, а также обратим внимание на некоторые детали модели клиент-сервер, в частности постараемся определить количество требуемых каналов IPC, сравним последовательные серверы с параллельными и неструктурированные потоки байтов с сообщениями.

    4.2. Приложение типа клиент-сервер

    Пример приложения модели клиент-сервер приведен на рис. 4.1. Именно на него мы будем ссылаться в тексте этой главы и главы 6 при необходимости проиллюстрировать использование программных каналов, FIFO и очередей сообщений System V.

    Клиент считывает полное имя (файла) из стандартного потока ввода и записывает его в канал IPC. Сервер считывает это имя из канала IPC и производит попытку открытия файла на чтение. Если попытка оказывается успешной, сервер считывает файл и записывает его в канал IPC. В противном случае сервер возвращает клиенту сообщение об ошибке. Клиент считывает данные из канала IPC и записывает их в стандартный поток вывода. Если сервер не может считать файл, из канала будет считано сообщение об ошибке. В противном случае будет принято содержимое файла. Две штриховые линии между клиентом и сервером на рис. 4.1 представляют собой канал IPC. 

    Рис. 4.1. Пример приложения типа клиент-сервер

    4.3. Программные каналы

    Программные каналы имеются во всех существующих реализациях и версиях Unix. Канал создается вызовом pipe и предоставляет возможность однонаправленной (односторонней) передачи данных:

    #include <unistd.h>

    int pipe(int fd[2]);

    /* возвращает 0 в случае успешного завершения. –1 – в случае ошибки:*/

    Функция возвращает два файловых дескриптора: fd[0] и fd[1], причем первый открыт для чтения, а второй — для записи.

    ПРИМЕЧАНИЕ

    Некоторые версии Unix, в частности SVR4, поддерживают двусторонние каналы (full-duplex pipes). В этом случае канал открыт на запись и чтение с обоих концов. Другой способ создания двустороннего канала IPC заключается в вызове функции socketpair, описанной в разделе 14.3 [24]. Его можно использовать в большинстве современных версий Unix. Однако чаще всего каналы используются при работе с интерпретатором команд, где уместно использование именно односторонних каналов.

    Стандарты Posix.1 и Unix 98 требуют только односторонних каналов, и мы будем исходить из этого.

    Для определения типа дескриптора (файла, программного канала или FIFO) можно использовать макрос S_ISFIFO. Он принимает единственный аргумент: поле st_mode структуры stat и возвращает значение «истина» (ненулевое значение) или «ложь» (ноль). Структуру stat для канала возвращает функция fstat. Для FIFO структура возвращается функциями fstat, lstat и stat.

    На рис. 4.2 изображен канал при использовании его единственным процессом.

    Рис. 4.2. Канал в одиночном процессе


    Хотя канал создается одним процессом, он редко используется только этим процессом (пример канала в одиночном процессе приведен в листинге 5.12). Каналы обычно используются для связи между двумя процессами (родительским и дочерним) следующим образом: процесс создает канал, а затем вызывает fork, создавая свою копию — дочерний процесс (рис. 4.3). Затем родительский процесс закрывает открытый для чтения конец канала, а дочерний, в свою очередь, — открытый на запись конец канала. Это обеспечивает одностороннюю передачу данных между процессами, как показано на рис. 4.4.

    Рис. 4.3. Канал после вызова fork

    Рис. 4.4. Канал между двумя процессами 


    При вводе команды наподобие

    who|sort|lp

    в интерпретаторе команд Unix интерпретатор выполняет вышеописанные действия для создания трех процессов с двумя каналами между ними. Интерпретатор также подключает открытый для чтения конец каждого канала к стандартному потоку ввода, а открытый на запись — к стандартному потоку вывода. Созданный таким образом канал изображен на рис. 4.5.

    Рис. 4.5. Каналы между тремя процессами при конвейерной обработке 


    Все рассмотренные выше каналы были однонаправленными (односторонними), то есть позволяли передавать данные только в одну сторону. При необходимости передачи данных в обе стороны нужно создавать пару каналов и использовать каждый из них для передачи данных в одну сторону. Этапы создания двунаправленного канала IPC следующие:

    1. Создаются каналы 1 (fd1[0] и fd1[1]) и 2 (fd2[0] и fd2[1]).

    2. Вызов fork.

    3. Родительский процесс закрывает доступный для чтения конец канала 1 (fd1[0]).

    4. Родительский процесс закрывает доступный для записи конец канала 2 (fd2[1]).

    5. Дочерний процесс закрывает доступный для записи конец канала 1 (fd1[1]).

    6. Дочерний процесс закрывает доступный для чтения конец канала 2 (fd2[0]).

    Текст программы, выполняющей эти действия, приведен в листинге 4.1. При этом создается структура каналов, изображенная на рис. 4.6.

    Рис. 4.6. Двусторонняя передача данных по двум каналам

    Пример

    Давайте напишем программу, описанную в разделе 4.2, с использованием каналов. Функция main создает два канала и вызывает fork для создания копии процесса. Родительский процесс становится клиентом, а дочерний — сервером. Первый канал используется для передачи полного имени от клиента серверу, а второй — для передачи содержимого файла (или сообщения об ошибке) от сервера клиенту. Таким образом мы получаем структуру, изображенную на рис. 4.7. 

    Рис. 4.7. Реализация рис. 4.1 с использованием двух каналов


    Обратите внимание на то, что мы изображаем на рис. 4.7 два канала, соединяющих сервер с клиентом, но оба канала проходят через ядро, поэтому каждый передаваемый байт пересекает интерфейс ядра дважды: при записи в канал и при считывании из него.

    В листинге 4.1[1] приведена функция main для данного примера.

    Листинг 4.1. Функция main для приложения клиент-сервер, использующего два канала

    //pipe/mainpipe.c

    1  #include "unpipc.h"

    2  void client(int, int), server(int, int);

    3  int

    4  main(int argc, char **argv)

    5  {

    6   int pipe1[2], pipe2[2]:

    7   pid_t childpid;

    8   Pipe(pipe1); /* создание двух каналов */

    9   Pipe(pipe2);

    10  if ((childpid = Fork()) == 0) { /* child */

    11   Close(pipe1[1]);

    12   Close(pipe2[0]);

    13   server(pipe1[0], pipe2[1]);

    14   exit(0);

    15  }

    16  /* родитель */

    17  Close(pipel[0]);

    18  Close(pipe2[1]);

    19  client(pipe2[0], pipel[1]);

    20  Waitpid(childpid, NULL, 0); /* ожидание завершения дочернего процесса */

    21  exit(0);

    22 }

    Создание каналов, вызов fork

    8-19 Создаются два канала и выполняются шесть шагов, уже упоминавшиеся в отношении рис. 4.6. Родительский процесс вызывает функцию client (листинг 4.2), а дочерний — функцию server (листинг 4.3).

    Использование waitpid дочерним процессом

    20 Процесс-сервер (дочерний процесс) завершает свою работу первым, вызывая функцию exit после завершения записи данных в канал. После этого он становится процессом-зомби. Процессом-зомби называется дочерний процесс, завершивший свою работу, родитель которого еще функционирует, но не получил сигнал о завершении работы дочернего процесса. При завершении работы дочернего процесса ядро посылает его родителю сигнал SIGCHLD, но родитель его не принимает и этот сигнал по умолчанию игнорируется. После этого функция client родительского процесса возвращает управление функции main, закончив Считывание данных из канала. Затем родительский процесс вызывает waitpid для получения информации о статусе дочернего процесса (зомби). Если родительский процесс не вызовет waitpid, а просто завершит работу, клиент будет унаследован процессом init, которому будет послан еще один сигнал SIGCHLD.

    Функция client приведена в листинге 4.2.

    Листинг 4.2. Функция client для приложения типа клиент-сервер с двумя каналами

    //pipe/client.с

    1  #include "unpipc.h"

    2  void

    3  client(int readfd, int writefd)

    4  {

    5   size_t len;

    6   ssize_t n;

    7   char buff[MAXLINE];

    8   /* получение полного имени файла */

    9   Fgets(buff, MAXLINE, stdin);

    10  len = strlen(buff); /* fgets() гарантирует завершающий нулевой байт */

    11  if (buff[Len-l] == ' \n' )

    12   len--; /* удаление перевода строки из fgets() */

    13  /* запись полного имени в канал IPC */

    14  Write(writefd, buff, len);

    15  /* считывание из канала, вывод в stdout */

    16  while ((n = Read(readfd, buff, MAXLINE)) > 0)

    17   Write(STDOUT_FILENO, buff, n);

    18 }

    Считывание полного имени из стандартного потока ввода

    8-14 Полное имя файла считывается из стандартного потока ввода и записывается в канал после удаления завершающего символа перевода строки, возвращаемого функцией fgets.

    Копирование из канала в стандартный поток вывода

    15-17 Затем клиент считывает все, что сервер направляет в канал, и записывает эти данные в стандартный поток вывода. Ожидается, что это будет содержимое файла, но в случае его отсутствия будет принято и записано в стандартный поток вывода сообщение об ошибке.

    В листинге 4.3 приведена функция server.

    Листинг 4.3. Функция server для приложения клиент-сервер с двумя каналами

    //pipe/server.c

    1  #include "unpipc.h"

    2  void

    3  server(int readfd, int writefd)

    4  {

    5   int fd;

    6   ssize_t n;

    7   char buff[MAXLINE+1];

    8   /* получение полного имени из канала IPC */

    9   if ((n = Read(readfd, buff, MAXLINE)) == 0)

    10   err_quit("end-of-file while reading pathname"):

    11  buff[n] = '\0'; /* полное имя завершается 0 */

    12  if ((fd = open(buff, O_RDONLY)) < 0) {

    13   /* 4error: must tell client */

    14   snprintf(buff + n, sizeof(buff) – n, ": can't open. %s\n".

    15   strerror(errno)):

    16   n = strlen(buff);

    17   Write(writefd, buff, n);

    18  } else {

    19   /* файл успешно открыт и копируется в канал */

    20   while ( (n = Read(fd, buff, MAXLINE)) > 0)

    21    Write(writefd, buff, n);

    22   Close(fd);

    23  }

    24 }

    Считывание полного имени файла из канала

    8-11 Записанное в канал клиентом имя файла считывается сервером и дополняется завершающим символом с кодом 0 (null-terminated). Обратите внимание, что функция read возвращает данные, как только они помещаются в поток, не ожидая накопления некоторого их количества (MAXLINE в данном примере).

    Открытие файла, обработка возможной ошибки

    12-17 Файл открывается для чтения и при возникновении ошибки сообщение о ней возвращается клиенту с помощью канала. Для получения строки с соответствующим значению переменной errno сообщением об ошибке вызывается функция strerror (в книге [24, с. 690-691] вы найдете более подробный рассказ об этой функции).

    Копирование из файла в канал

    18-23 При успешном завершении работы функции open содержимое файла копируется в канал.

    Ниже приведен результат работы программы в случае наличия файла с указанным полным именем и в случае возникновения ошибок:

    solaris % mainpipe /etc/inet/ntp.conf файл, состоящий из двух строк

    multicastclient 224.0.1.1

    driftfile /etc/inet/ntp.drift

    solaris % mainpipe /etc/shadow          фaйл, на чтение которого нет разрешения

    /etc/shadow: can't open. Permission denied

    solaris % mainpipe /no/such/file        несуществующий файл

    /no/such/file: can't open. No such file or directory

    4.4. Двусторонние каналы

    В предыдущем разделе мы отметили, что во многих системах реализованы двусторонние каналы. В Unix SVR4 это обеспечивается самой функцией pipe, а во многих других ядрах — функцией socketpair. Но что в действительности представляет собой двусторонний канал? Представим себе сначала однонаправленный канал, изображенный на рис. 4.8.

    Рис. 4.8. Односторонний канал


    Двусторонний канал мог бы быть реализован так, как это изображено на рис. 4.9. В этом случае неявно предполагается существование единственного буфера, в который помещается все, что записывается в канал (с любого конца, то есть дескриптора), и при чтении из канала данные просто считываются из буфера.

    Рис. 4.9. Одна из возможных реализаций двустороннего канала (неправильная)


    Такая реализация вызовет проблемы, например, в программе листинга А.14. Здесь требуется двусторонняя передача информации, причем потоки данных должны быть независимы. В противном случае некоторый процесс, записав данные в канал и перейдя затем в режим чтения из этого же канала, рискует считать обратно те же данные, которые были им только что туда записаны.

    На рис. 4.10 изображена правильная реализация двустороннего канала.

    Рис. 4.10. Правильная реализация двустороннего канала


    Здесь двусторонний канал получается из объединения двух односторонних. Все данные, записываемые в fd[1], будут доступны для чтения из fd[0], а данные, записываемые в fd[0], будут доступны для чтения из fd[1].

    Программа в листинге 4.4 иллюстрирует использование одного двустороннего канала для двусторонней передачи информации.

    Листинг 4.4. Двусторонняя связь через двусторонний канал

    //pipe/fduplex.c

    1  #include "unpipc.h"

    2  int

    3  main(int argc, char **argv)

    4  {

    5   int fd[2], n;

    6   char c;

    7   pid_t childpid;

    8   Pipe(fd); /* предполагается двусторонний канал (напр., SVR4) */

    9   if ((childpid = Fork()) == 0) { /* child */

    10   sleep(3):

    11   if ((n = Read(fd[0], &c, 1)) != 1)

    12    err_quit("child: read returned %d", n);

    13   printf("child read %c\n", c):

    14   Write(fd[0], "c", 1);

    15   exit(0);

    16  }

    17  /* родитель */

    18  Write(fd[1], "p", 1);

    19  if ((n = Read(fd[1], &c, 1)) != 1)

    20   err_quit("parent: read returned %d", n):

    21  printf("parent read %c\n", c);

    22  exit(0);

    23 }

    В этой программе сначала создается двусторонний канал, затем делается системный вызов fork. Породивший процесс записывает символ р в канал, а затем считывает из канала данные. Дочерний процесс ждет три секунды, считывает символ из канала, а потом записывает туда символ с. Задержка чтения для дочернего процесса позволяет породившему процессу вызвать read первым — таким образом мы можем узнать, не будет ли им считан обратно только что записанный символ.

    При запуске этой программы в Solaris 2.6, в которой организована поддержка двусторонних каналов, мы получим ожидаемый результат:

    solaris % fduplex

    child read p

    parent read с

    Символ р передается по одному из двух односторонних каналов, изображенных на рис. 4.10, а именно по верхнему каналу. Символ с передается по нижнему одностороннему каналу. Родительский процесс не считывает обратно записанный им в канал символ р (что и требуется).

    При запуске этой программы в Digital Unix 4.0B, в которой по умолчанию создаются односторонние каналы (двусторонние каналы — как в SVR4 — будут создаваться в том случае, если при компиляции указать специальные параметры), мы увидим результат, ожидаемый для одностороннего канала:

    alpha % fduplex

    read error: Bad file number

    alpha % child read p

    write error: Bad file number

    Родительский процесс записывает символ р, который успешно считывается дочерним процессом, однако при попытке считывания из канала (дескриптор fd[l]) родительский процесс прерывается с ошибкой, как и дочерний процесс, при попытке записи в канал (дескриптор fd[0]). Вспомните рис. 4.8. Функция read возвращает код ошибки EBADF, означающий, что дескриптор не открыт для чтения. Аналогично write возвращает тот же код ошибки, если дескриптор не был открыт на запись.

    4.5. Функции popen и pclose

    Другим примером использования каналов является имеющаяся в стандартной библиотеке ввода-вывода функция popen, которая создает канал и запускает другой процесс, записывающий данные в этот канал или считывающий их из него:

    #include <stdio.h>

    FILE *popen(const char *соmmаnd, const char *tуре);

    /* Возвращает указатель FILE * в случае успешного выполнения, NULL – в случае ошибки */

    int pclose(FILE *strеаm);

    /* Возвращает код завершения команды интерпретатора или –1 – в случае ошибки */

    Аргумент command представляет собой команду интерпретатора. Он обрабатывается программой sh (обычно это интерпретатор Bourne shell), поэтому для поиска исполняемого файла, вызываемого командой command, используется переменная PATH. Канал создается между вызывающим процессом и указанной командой. Возвращаемое функцией popen значение представляет собой обычный указатель на тип FILE, который может использоваться для ввода или для вывода в зависимости от содержимого строки type:

    ■ если type имеет значение r, вызывающий процесс считывает данные, направляемые командой command в стандартный поток вывода;

    ■ если type имеет значение w, вызывающий процесс записывает данные в стандартный поток ввода команды command.

    Функция pclose закрывает стандартный поток ввода-вывода stream, созданный командой popen, ждет завершения работы программы и возвращает код завершения, принимаемый от интерпретатора.

    ПРИМЕЧАНИЕ

    Информацию о реализациях popen и pclose можно найти в разделе 14.3 [21].

    Пример

    В листинге 4.5 изображено еще одно решение задачи с клиентом и сервером, использующее функцию popen и программу (утилиту Unix) cat.

    Листинг 4.5. Клиент-сервер с использованием popen

    //pipe/mainpopen.c

    1  #include "unpipc.h"

    2  int

    3  main(int argc, char **argv)

    4  {

    5   size_t n;

    6   char buff[MAXLINE], command[MAXLINE];

    7   FILE *fp;

    8   /* считывание полного имени */

    9   Fgets(buff, MAXLINE, stdin);

    10  n = strlen(buff); /* fgets() гарантирует завершающий ноль */

    11  if (buff[n-1] == '\n')

    12   n--; /* удаление перевода строки из возврата fgets() */

    13  snprintf(command, sizeof(command), "cat %s", buff);

    14  fp = Popen(command, "r");

    15  /* копирование из канала в стандартный вывод */

    16  while(Fgets(buff, MAXLINE, fp) != NULL)

    17   Fputs(buff, stdout);

    18  Pclose(fp);

    19  exit(0);

    20 }

    8-17 Полное имя файла считывается из стандартного потока ввода, как и в программе в листинге 4.2. Формируется командная строка, которая передается popen. Вывод интерпретатора команд или команды cat копируется в стандартный поток вывода.

    Одним из отличий этой реализации от приведенной в листинге 4.1 является отсутствие возможности формировать собственные сообщения об ошибках. Теперь мы целиком зависим от программы cat, а выводимые ею сообщения не всегда адекватны. Например, в системе Solaris 2.6 при попытке считать данные из файла, доступ на чтение к которому для нас запрещен, будет выведена следующая ошибка:

    solaris % cat/etc/shadow

    cat: cannot open /etc/shadow
     

    А в BSD/OS 3.1 мы получим более информативное сообщение в аналогичной ситуации:

    bsdi % cat /etc/master.passwd

    cat: /etc/master.passwd: cannot open [Permission denied]

    Обратите также внимание на тот факт, что вызов popen в данном случае оказывается успешным, однако при первом же вызове fgets будет возвращен символ конца файла (EOF). Программа cat записывает сообщение об ошибке в стандартный поток сообщений об ошибках (stderr), а popen с этим потоком не связывается — к создаваемому каналу подключается только стандартный поток вывода.

    4.6. Именованные каналы (FIFO)

    Программные каналы не имеют имен, и их главным недостатком является невозможность передачи информации между неродственными процессами. Два неродственных процесса не могут создать канал для связи между собой (если не передавать дескриптор).

    Аббревиатура FIFO расшифровывается как «first in, first out» — «первым вошел, первым вышел», то есть эти каналы работают как очереди. Именованные каналы в Unix функционируют подобно неименованным — они позволяют передавать данные только в одну сторону. Однако в отличие от программных каналов каждому каналу FIFO сопоставляется полное имя в файловой системе, что позволяет двум неродственным процессам обратиться к одному и тому же FIFO.

    FIFO создается функцией mkfifо:

    #include <sys/types.h>

    #include <sys/stat.h>

    int mkfifo(const char *раthnаme, mode_t mоdе);

    /* Возвращает 0 при успешном выполнении, –1 – при возникновении ошибок */

    Здесь pathname — обычное для Unix полное имя файла, которое и будет именем FIFO.

    Аргумент mode указывает битовую маску разрешений доступа к файлу, аналогично второму аргументу команды open. В табл. 2.3 приведены шесть констант, определенных в заголовке <sys/stat.h>. Эти константы могут использоваться для задания разрешений доступа и к FIFO.

    Функция mkfifo действует как open, вызванная с аргументом O_CREAT | O_EXCL. Это означает, что создается новый канал FIFO или возвращается ошибка EEXIST, в случае если канал с заданным полным именем уже существует. Если не требуется создавать новый канал, вызывайте open вместо mkfifo. Для открытия существующего канала или создания нового в том случае, если его еще не существует, вызовите mkfifo, проверьте, не возвращена ли ошибка EEXIST, и если такое случится, вызовите функцию open.

    Команда mkfifо также создает канал FIFO. Ею можно пользоваться в сценариях интерпретатора или из командной строки.

    После создания канал FIFO должен быть открыт на чтение или запись с помощью либо функции open, либо одной из стандартных функций открытия файлов из библиотеки ввода-вывода (например, fopen). FIFO может быть открыт либо только на чтение, либо только на запись. Нельзя открывать канал на чтение и запись, поскольку именованные каналы могут быть только односторонними. 

    При записи в программный канал или канал FIFO вызовом write данные всегда добавляются к уже имеющимся, а вызов read считывает данные, помещенные в программный канал или FIFO первыми. При вызове функции lseek для программного канала или FIFO будет возвращена ошибка ESPIPE.

    Пример

    Переделаем программу, приведенную в листинге 4.1, таким образом, чтобы использовать два канала FIFO вместо двух программных каналов. Функции client и server останутся прежними; отличия появятся только в функции main, новый текст которой приведен в листинге 4.6.

    Листинг 4.6. Функция main приложения клиент-сервер, использующего две очереди

    //pipe/mainfifo.c

    1  #include "unpipc.h"

    2  #define FIFO1 "/tmp/fifo.1"

    3  #define FIFO2 "/tmp/fifo.2"

    4  void client(int, int), server(int. int);

    5  int

    6  main(int argc, char **argv)

    7  {

    8   int readfd, writefd;

    9   pid_t childpid;

    10  /* создание двух FIFO, если существуют – OK */

    11  if ((mkfifo(FIF01, FILE_MODE) < 0) && (errno != EEXIST))

    12   err_sys("can't create %s", FIF01);

    13  if ((mkfifo(FIF02, FILE_MODE) < 0) && (errno != EEXIST)) {

    14   unlink(FIF01);

    15   err_sys("can't create %s", FIF02);

    16  }

    17  if ((childpid = Fork()) == 0) { /* child */

    18   readfd = Open(FIF01, O_RDONLY, 0);

    19   writefd = Open(FIF02, O_WRONLY, 0);

    20   server(readfd, writefd);

    21   exit(0);

    22  }

    23  /* родительский процесс */

    24  writefd = Open(FIF01, O_WRONLY, 0);

    25  readfd = Open(FIF02, O_RDONLY, 0);

    26  client(readfd, writefd);

    27  waitpid(childpid, NULL, 0); /* ожидание завершения дочернего процесса */

    28  Close(readfd):

    29  Close(writefd);

    30  Unlink(FIF01);

    31  Unlink(FIF02);

    32  exit(0);

    33 }

    Создание двух FIFO

    10-16 В файловой системе в каталоге /tmp создается два канала. Если какой-либо из них уже существует — ничего страшного. Константа FILE_MODE определена в нашем заголовке unpiрс.h (листинг В.1) как

    #define FILEMODE(S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)

    /* разрешения по умолчанию для вновь создаваемых файлов */

    При этом владельцу файла разрешается чтение и запись в него, а группе и прочим пользователям — только чтение. Эти биты разрешений накладываются на маску режима доступа создаваемых файлов (file mode creation mask) процесса.

    17-27 Далее происходит вызов fork, дочерний процесс вызывает функцию server (листинг 4.3), а родительский процесс вызывает функцию client (листинг 4.2). Перед вызовом этих функций родительский процесс открывает первый канал на запись, а второй на чтение, в то время как дочерний процесс открывает первый канал на чтение, а второй — на запись. Картина аналогична примеру с каналами и иллюстрируется рис. 4.11. 

    Рис. 4.11. Приложение клиент-сервер, использующее две очереди


    Изменения по сравнению с примером, в которым использовались программные каналы, следующие:

    ■ Для создания и открытия программного канала требуется только один вызов — pipe. Для создания и открытия FIFO требуется вызов mkfifo и последующий вызов open.

    ■ Программный канал автоматически исчезает после того, как будет закрыт последним использующим его процессом. Канал FIFO удаляется из файловой системы только после вызова unlink. Польза от лишнего вызова, необходимого для создания FIFO, следующая: канал FIFO получает имя в файловой системе, что позволяет одному процессу создать такой канал, а другому открыть его, даже если последний не является родственным первому. С программными каналами это неосуществимо.

    В программах, некорректно использующих каналы FIFO, могут возникать неочевидные проблемы. Рассмотрим, например, листинг 4.6: если поменять порядок двух вызовов функции open в породившем процессе, программа перестанет работать. Причина в том, что чтение из FIFO блокирует процесс, если канал еще не открыт на запись каким-либо другим процессом. Действительно, если мы меняем порядок вызовов open в породившем процессе, и породивший, и порожденный процессы открывают канал на чтение, притом что на запись он еще не открыт, так что оба процесса блокируются. Такая ситуация называется блокированием, или зависанием (deadlock). Она будет рассмотрена подробно в следующем разделе.

    Пример: неродственные клиент и сервер

    В листинге 4.6 клиент и сервер все еще являлись родственными процессами. Переделаем этот пример так, чтобы родство между ними отсутствовало. В листинге 4.7 приведен текст программы-сервера. Текст практически идентичен той части программы из листинга 4.6, которая относилась к серверу.

    Содержимое заголовка fifо.h приведено в листинге 4.8. Этот файл определяет имена двух FIFO, которые должны быть известны как клиенту, так и серверу.

    В листинге 4.9 приведен текст программы-клиента, которая не слишком отличается от части программы из листинга 4.6, относящейся к клиенту. Обратите внимание, что именно клиент, а не сервер удаляет канал FIFO по завершении работы, потому что последние операции с этим каналом выполняются им.

    Листинг 4.7. Функция main независимого сервера

    //pipe/server_main.c

    1  #include "fifo.h"

    2  void server(int, int);

    3  int

    4  main(int argc, char **argv)

    5  {

    6   int readfd, writefd;

    7   /* создание двух FIFO. OK, если они существуют */

    8   if ((mkfifo(FIF01, FILE_MODE) < 0) && (errno != EEXIST))

    9    err_sys("can't create %s", FIF01);

    10  if ((mkfifo(FIF02, FILE MODE) < 0) && (errno != EEXIST)) {

    11   unlink(FIF01);

    12   err_sys("can't create %s", FIF02);

    13  }

    14  readfd = Open(FIF01, O_RDONLY, 0);

    15  writefd = Open(FIFO2, O_WRONLY, 0);

    16  server(readfd, writefd);

    17  exit(0);

    18 }
     

    Листинг 4.8. Заголовочный файл fifo.h, используемый и клиентом, и сервером

    //pipe/fifo.h

    1 #include "unpipc.h"

    2 #define FIFO1 "/tmp/fifo.1"

    3 #define FIFO2 "/tmp/fifo.2"

    Листинг 4.9. Функция main независимого клиента

    //pipe/client_main.c

    1  #include "fifo.h"

    2  void client(int, int);

    3  int

    4  main(int argc, char **argv)

    5  {

    6   int readfd, writefd;

    7   writefd = Open(FIFO1, O_WRONLY, 0);

    8   readfd = Open(FIFO2, O_RDONLY, 0);

    9   client(readfd, writefd);

    10  Close(readfd);

    11  Close(writefd);

    12  Unlink(FIFO1);

    13  UnLink(FIFO2);

    14  exit(0);

    15 }

    ПРИМЕЧАНИЕ

    Для программных каналов и каналов FIFO ядро ведет подсчет числа открытых дескрипторов, относящихся к ним, поэтому безразлично, кто именно вызовет unlink — клиент или сервер. Хотя эта функция и удаляет файл из файловой системы, она не влияет на открытые в момент ее выполнения дескрипторы. Однако для других форм IPC, таких как очереди сообщений стандарта System V, счетчик отсутствует, и если сервер удалит очередь после записи в нее последнего сообщения, она может быть удалена еще до того, как клиент это сообщение считает.

    Для запуска клиента и сервера запустите сервер в фоновом режиме:

    % server_fifo &

    а затем запустите клиент. Можно было сделать и по-другому: запускать только программу-клиент, которая запускала бы сервер с помощью fork и exec. Клиент мог бы передавать серверу имена FIFO в качестве аргументов командной строки в команде exec, вместо того чтобы обе программы считывали их из заголовка. Но в этом случае сервер являлся бы дочерним процессом и проще было бы обойтись программным каналом.

    4.7. Некоторые свойства именованных и неименованных каналов

    Некоторые свойства именованных и неименованных каналов, относящиеся к их открытию, а также чтению и записи данных, заслуживают более пристального внимания. Прежде всего можно сделать дескриптор неблокируемым двумя способами. 

    1. При вызове open указать флаг O_NONBLOCK. Например, первый вызов open в листинге 4.9 мог бы выглядеть так:

    writefd = Open(FIFO1, O_WRONLY | O_NONBLOCK, 0);

    2. Если дескриптор уже открыт, можно использовать fcntl для включения флага O_NONBLOCK. Этот прием нужно применять для программных каналов, поскольку для них не вызывается функция open и нет возможности указать флаг O_NONBLOCK при ее вызове. Используя fcntl, мы сначала получаем текущий статус файла с помощью F_GETFL, затем добавляем к нему с помощью побитового логического сложения (OR) флаг O_NONBLOCK и записываем новый статус с помощью команды F_SETFL:

    int flags;

    if ((flags = fcntl(fd, F_GETFL, 0)) < 0) err_sys("F_GETFL error");

    flags |= O_NONBLOCK;

    if (fcntl(fd, F_SETFL, flags) < 0) err_sys("F_SETFL error");

    Будьте аккуратны с программами, которые просто устанавливают требуемый флаг, поскольку при этом сбрасываются все прочие флаги состояния:

    /* Неправильное отключение блокировки */

    if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) err_sys("F_SETFL error");

    Таблица 4.1 иллюстрирует действие флага, отключающего блокировку, при открытии очереди и при чтении данных из пустого программного канала или канала FIFO.


    Таблица 4.1. Действие флага O_NONBLOCK на именованные и неименованные каналы 

    Операция Наличие открытых каналов Блокировка включена (по умолчанию) Флаг O_NONBLOCK установлен
    Открытие (open) FIFO только для чтения FIFO открыт на запись Возвращается код успешного завершения операции Возвращается код успешного завершения операции
    Открытие (open) FIFO только для чтения FIFO не открыт на запись Процесс блокируется, пока FIFO не будет открыт на запись Возвращается код успешного завершения операции
    Открытие (open) FIFO только для записи FIFO открыт на чтение Возвращает код успешного завершения операции Возвращает код успешного завершения операции
    Открытие (open) FIFO только для записи FIFO не открыт на чтение Блокируется до тех пор, пока FIFO не будет открыт на чтение Возвращает ошибку с кодом ENXIO
    Чтение (read) из пустого программного канала или FIFO Программный канал или FIFO открыт на запись Блокируется до тех пор, пока в программный канал или FIFO не будут помещены данные или они не будут закрыты всеми процессами, которыми они были открыты на запись Возвращает ошибку с кодом EAGAIN
    Чтение (read) из пустого программного канала или FIFO Программный канал или FIFO не открыт на запись read возвращает 0 (конец файла) read возвращает 0 (конец файла)
    Запись (write) в программный канал или FIFO Программный канал или FIFO открыт на чтение (См. в тексте) (См. в тексте)
    Запись (write) в программный канал или FIFO Программный канал или FIFO не открыт на чтение Программному потоку посылается сигнал SIGPIPE Программному потоку посылается сигнал SIGPIPE 

    Запомните несколько дополнительных правил, действующих при чтении и записи данных в программные каналы и FIFO.

    ■ При попытке считать больше данных, чем в данный момент содержится в программном канале или FIFO, возвращается только имеющийся объем данных. Нужно предусмотреть обработку ситуации, в которой функция read возвращает меньше данных, чем было запрошено.

    ■ Если количество байтов, направленных на запись функции write, не превышает значения PIPE_BUF (ограничение, устанавливаемое стандартом Posix, о котором более подробно рассказывается в разделе 4.11), то ядро гарантирует атомарность операции записи. Это означает, что если два процесса запишут данные в программный канал или FIFO приблизительно одновременно, то в буфер будут помещены сначала все данные от первого процесса, а затем от второго, либо наоборот. Данные от двух процессов при этом не будут смешиваться. Однако если количество байтов превышает значение PIPEBUF, атомарность операции записи не гарантируется.

    ПРИМЕЧАНИЕ

    Posix.1 требует, чтобы значение PIPE_BUF равнялось по меньшей мере 512. Характерные значения, встречающиеся на практике, лежат в диапазоне от 1024 (BSD/OS 3.1) до 5120 байт (Solaris 2.6). В разделе 4.11 приведен текст программы, выводящей значение этой константы.

    ■ Установка флага O_NONBLOCK не влияет на атомарность операции записи в про-грaммный канал или FIFO — она определяется исключительно объемом посылаемых данных в сравнении с величиной PIPE_BUF. Однако если для прогрaммнoгo канала или FIFO отключена блокировка, возвращаемое функцией write значение зависит от количества байтов, отправленных на запись, и наличия свободного места в пpoгрaммнoм канале или FIFO. Если количество байтов не превышает величины PIPE_BUF, то:

     □ Если в канале достаточно места для записи требуемого количества данных, они будут переданы все сразу.

     □ Если места в пpoгрaммнoм канале или FIFO недостаточно для записи требуемого объема данных, происходит немедленное завершение работы функции с возвратом ошибки EAGAIN. Поскольку установлен флаг O_NONBLOCK, процесс не может быть заблокирован, но в то же время ядро не может принять лишь часть данных, так как при этом невозможно гарантировать атомарность операции записи. Поэтому ядро возвращает ошибку, сообщающую процессу о необходимости попытаться произвести запись еще раз.

    ■ Если количество байтов превышает значение PIPE_BUF, то:

     □ Если в программном канале или FIFO есть место хотя бы для одного байта, ядро передает в буфер ровно столько данных, сколько туда может поместиться, и это переданное количество возвращается функцией write.

     □ Если в программном канале или FIFO свободное место отсутствует, происходит немедленное завершение работы с возвратом ошибки EAGAIN.

    ■ При записи в программный канал или FIFO, не открытый для чтения, ядро посылает сигнал SIGPIPE:

     □ Если процесс не принимает (catch) и не игнорирует SIGPIPE, выполняется действие по умолчанию — завершение работы процесса.

     □ Если процесс игнорирует сигнал SIGPIPE или перехватывает его и возвращается из подпрограммы его обработки, write возвращает ошибку с кодом EPIPE.

    ПРИМЕЧАНИЕ

    SIGPIPE считается синхронным сигналом, что означает, что он привязан к конкретному программному потоку, а именно тому, который вызвал функцию write. Простейшим способом обработки сигнала является его игнорирование (установка SIG_IGN) и предоставление функции write возможности вернуть ошибку с кодом EPIPE. В приложении всегда должна быть предусмотрена обработка ошибок, возвращаемых функцией write, а вот определить, что процесс был завершен сигналом SIGPIPE, сложнее. Если сигнал не перехватывается, придется посмотреть на статус завершения работы процесса (termination status) из интерпретатора команд, чтобы узнать, что процесс был принудительно завершен сигналом и каким именно сигналом. В разделе 5.13 [24] о сигнале SIGPIPE рассказывается более подробно.

    4.8. Один сервер, несколько клиентов

    Преимущества канала FIFO проявляются более явно в том случае, когда сервер представляет собой некоторый длительно функционирующий процесс (например, демон, наподобие описанного в главе 12 [24]), не являющийся родственным клиенту. Демон создает именованный канал с вполне определенным известным именем, открывает его на чтение, а запускаемые впоследствии клиенты открывают его на запись и отправляют демону команды и необходимые данные. Односторонняя связь в этом направлении (от клиента к серверу) легко реализуется с помощью FIFO, однако необходимость отправки данных в обратную сторону (от сервера к клиенту) усложняет задачу. Рисунок 4.12 иллюстрирует прием, применяемый в этом случае. 

    Рис. 4.12. Один сервер, несколько клиентов


    Сервер создает канал с известным полным именем, в данном случае /tmp/fifо.serv. Из этого канала он считывает запросы клиентов. Каждый клиент при запуске создает свой собственный канал, полное имя которого определяется его идентификатором процесса. Клиент отправляет свой запрос в канал сервера с известным именем, причем запрос этот содержит идентификатор процесса клиента и имя файла, отправку которого клиент запрашивает у сервера. В листинге 4.10 приведен текст программы сервера.

    Листинг 4.10. Сервер, обслуживающий несколько клиентов с помощью канала FIFO

    //fifocliserv/mainserver.с

    1  #include "fifo.h"

    2  void server(int, int);

    3  int

    4  main(int argc, char **argv)

    5  {

    6   int readfifo, writefifo, dummyfd, fd;

    7   char *ptr, buff[MAXLINE], fifoname[MAXLINE];

    8   pid_t pid;

    9   ssize_t n;

    10  /* создание FIFO сервера с известным именем. ОК, если уже существует */

    11  if ((mkfifo(SERV_FIFO, FILE_MODE) < 0) && (errno != EEXIST))

    12   err_sys("can't create %s", SERV_FIFO);

    13  /* открытие FIFO-cepвepa на чтение */

    14  readfifo = Open(SERV_FIFO, O_RDONLY, 0);

    15  dummyfd = Open(SERV_FIFO, O_WRONLY, 0); /* не используется */

    16  while ((n = Readline(readfifo, buff, MAXLINE)) > 0) {

    17   if (buff[n-1] == '\n')

    18    n--; /* delete newline from readline() */

    19   buff[n] = '\0'; /* полное имя, завершаемое 0 */

    20   if ((ptr = strchr(buff, ' ')) == NULL) {

    21    err_msg("bogus request: ls", buff);

    22    continue;

    23   }

    24   *ptr++ = 0; /* идентификатор процесса, указатель на имя файла */

    25   pid = atol(buff);

    26   snprintf(fifoname, sizeof(fifoname), "/tmp/fifo.%ld", (long) pid);

    27   if ( (writefifo = open(fifoname, O_WRONLY, 0)) < 0) {

    28    err_msg("cannot open: ls", fifoname);

    29    continue;

    30   }

    31   if ((fd = open(ptr, O_RDONLY)) < 0) {

    32    /* ошибка, нужно сообщить клиенту */

    33    snprintf(buff + n, sizeof(buff) – n, ": can't open, %s\n",

    34     strerror(errno));

    35    n = strlen(ptr);

    36    Write(writefifo, ptr, n);

    37    Close(writefifo);

    38

    39   } else {

    40    /* успешное открытие, копируем файл */

    41    while ((n = Read(fd, buff, MAXLINE)) > 0)

    42     Write(writefifo, buff, n);

    43    Close(fd);

    44    Close(writefifo);

    45   }

    46  }

    47 }

    Создание канала и открытие его только для записи и только для чтения

    10-15 Сервер создает канал FIFO с известным именем, обрабатывая ситуацию, когда такой канал уже существует. Затем этот канал открывается дважды: один раз только для чтения, а второй — только для записи. Дескриптор readfifo используется для приема запросов от клиентов, а дескриптор dummyfd не используется вовсе. Причина, по которой нужно открыть канал для записи, видна из табл. 4.1. Если канал не открыть на запись, то при завершении работы очередного клиента этот канал будет опустошаться и сервер будет считывать 0, означающий конец файла. Пришлось бы каждый раз закрывать канал вызовом close, а затем заново открывать его с флагом O_RDONLY, что приводило бы к блокированию демона до подключения следующего клиента. Мы же всегда будем иметь дескриптор, открытый на запись, поэтому функция read не будет возвращать 0, означающий конец файла, при отсутствии клиентов. Вместо этого сервер просто будет блокироваться при вызове read, ожидая подключения следующего клиента. Этот трюк упрощает код программы-сервера и уменьшает количество вызовов open для канала сервера.

    При запуске сервера первый вызов open (с флагом O_RDONLY) приводит к блокированию процесса до появления первого клиента, открывающего канал сервера на запись (см. табл. 4.1). Второй вызов open (с флагом O_WRONLY) не приводит к блокированию, поскольку канал уже открыт на запись. 

    Считывание запроса от клиента

    16 Каждый запрос, принимаемый от клиента, представляет собой одну строку, состоящую из идентификатора процесса, пробела и полного имени требуемого файла. Эта строка считывается функцией readline (приведенной в [24, с.79]).

    Анализ запроса клиента

    17-26 Символ перевода строки, возвращаемый функцией readline, удаляется. Этот символ может отсутствовать только в том случае, если буфер был заполнен, прежде чем был обнаружен символ перевода строки, либо если последняя введенная строка не была завершена этим символом. Функция strchr возвращает указатель на первый пробел в этой строке, который затем увеличивается на единицу, чтобы он указывал на первый символ полного имени файла, следующего за пробелом. Полное имя канала клиента формируется из его идентификатора процесса, и этот канал открывается сервером на запись.

    Открытие файла и отправка его в FIFO клиента

    27-44 Оставшаяся часть кода пpoгрaммы-cepвepa аналогична функции server из листинга 4.3. Программа открывает файл; если при этом возникает ошибка — клиенту отсылается сообщение о ней. Если открытие файла завершается успешно, его содержимое копируется в канал клиента. После завершения копирования открытый сервером «конец» (дескриптор) канала клиента должен быть закрыт с помощью функции close, чтобы функция read вернула пpoгрaммe-клиeнтy значение 0 (конец файла). Сервер не удаляет канал клиента; клиент должен самостоятельно позаботиться об этом после приема от сервера символа конца файла. Текст пpoгрaммы-клиeнтa приведен в листинге 4.11.

    Листинг 4.11. Клиент, связывающийся с сервером (листинг 4.10) с помощью канала FIFO

    //fifocliserv/mainclient.с

    1  #include "fifo.h"

    2  int

    3  main(int argc, char **argv)

    4  {

    5   int readfifo, writefifo;

    6   size_t len;

    7   ssize_t n;

    8   char *ptr, fifoname[MAXLINE], buff[MAXLINE];

    9   pid_t pid;

    10  /* создание FIFO с включением в его имя PID */

    11  pid = getpid();

    12  snprintf(fifoname, sizeof(fifoname), "/tmp/fifo,%ld", (long) pid):

    13  if ((mkfifo(fifoname, FILE_MODE) < 0) && (errno != EEXIST))

    14   err_sys("can't create %s", fifoname);

    15  /* инициализация буфера PID и пробелом */

    16  snprintf(buff, sizeof(buff), "%ld ", (long) pid);

    17  len = strlen(buff);

    18  ptr = buff + len;

    19  /* считывание полного имени */

    20  Fgets(ptr, MAXLINE – len, stdin);

    21  len = strlen(buff); /* fgets() гарантирует завершающий 0 */

    22  /* открытие FIFO сервера и запись в него полного имени и PID */

    23  writefifo = Open(SERV_FIFO, O_WRONLY, 0);

    24  Write(writefifo, buff, len);

    25  /* открытие созданного FIFO; блокирование до открытия его сервером */

    26  readfifo = Open(fifoname, O_RDONLY; 0);

    27  /* считывание из канала IPC, запись в stdout */

    28  while ((n = Read(readfifo, buff, MAXLINE)) > 0)

    29   Write(STDOUT_FILENO, buff, n);

    30  Close(readfifo);

    31  Unlink(fifoname);

    32  exit(0);

    33 }

    Создание канала

    10-14 Идентификатор процесса клиента содержится в имени создаваемого им канала.

    Формирование строки запроса

    15-21 Запрос клиента состоит из его идентификатора процесса, одного пробела, полного имени запрашиваемого им файла и символа перевода строки. Строка запроса формируется в массиве buff, причем имя файла считывается из стандартного потока ввода.

    Открытие канала сервера и отправка запроса

    22-24 Клиент открывает канал сервера и записывает в него строку запроса. Если клиент окажется первым с момента запуска сервера, вызов open разблокирует сервер, заблокированный после сделанного им вызова open (с флагом O_RDONLY).

    Прием содержимого файла или сообщения об ошибке от сервера

    25-31 Ответ сервера считывается из канала и записывается в стандартный поток вывода, после чего канал клиента закрывается и* удаляется.

    Сервер может быть запущен в одном из окон, а клиент — в другом, и программа будет работать так, как мы и рассчитывали. Ниже мы приводим только текст, выводимый клиентом:

    solaris % mainclient /etc/shadow          файл, который нам нельзя читать

    /etc/shadow: can't open. Permission denied

    solaris % mainclient /etc/inet/ntp.conf  файл из двух строк

    multicastclient 224.0.1.1

    driftfile /etc/inet/ntp.drift
     

    Мы можем также связаться с сервером из интерпретатора команд, поскольку каналы FIFO обладают именами в файловой системе.

    solaris % Pid=$$

    solaris % mkfifo /tmp/fifo.$Pid

    solaris % echo "$Pid /etc/inet/ntp.conf" > /tmp/fifo.serv

    solaris % cat < /tmp/fifo.$Pid

    multicastclient 224.0.1.1

    driftfile /etc/inet/ntp.drift

    solaris % rm /tmp/fifo.$Pid

    Мы отсылаем серверу идентификатор процесса текущей копии интерпретатора и полное имя файла одной командой интерпретатора (echo) и считываем из канала сервера результат с помощью другой команды (cat). Между выполнением этих двух команд может пройти произвольный промежуток времени. Таким образом, сервер помещает содержимое файла в канал, а клиент затем запускает команду cat, чтобы считать оттуда данные. Может показаться, что данные каким-то образом хранятся в канале, хотя он не открыт ни одним процессом. На самом деле все не так. После закрытия пpoгрaммнoгo канала или FIFO последним процессом с помощью команды close все данные, в нем находящиеся, теряются. В нашем примере сервер, считав строку запроса от клиента, блокируется при попытке открыть канал клиента, потому что клиент (наша копия интерпретатора) еще не открыл его на чтение (вспомним табл. 4.1). Только после вызова cat некоторое время спустя канал будет открыт на чтение, и тогда сервер разблокируется. Кстати, таким образом осуществляется атака типа «отказ в обслуживании» (denial-of-service attack), которую мы обсудим в следующем разделе.

    Использование интерпретатора позволяет провести простейшую проверку способности сервера обрабатывать ошибки. Мы можем отправить серверу строку без идeнтификaтopa процесса или отослать ему такой идентификатор, которому не соответствует никакой канал FIFO в каталоге /tmp. Например, если мы запустим сервер и введем нижеследующие строки:

    solaris % cat > /tmp/fifo.serv /no/process/id

    999999 /invalid/process/id

    то сервер выдаст текст:

    solaris % server

    bogus request: /no/process/id

    cannot open: /tmp/fifo.999999

    Атомарность записи в FIFO

    Наша простейшая пара клиент-сервер позволяет наглядно показать важность наличия свойства атомарности записи в пpoгрaммныe каналы и FIFO. Предположим, что два клиента посылают серверу запрос приблизительно в один и тот же момент. Первый клиент отправляет следующую строку:

    1234 /etc/inet/ntp.conf

    второй:

    9876 /etc/passwd

    Предполагая, что каждый клиент помещает данные в FIFO за один вызов write и кaждая строка имеет размер, не превышающий величины PIPE_BUF (что чаще всего заведомо выполняется, поскольку PIPE_BUF обычно лежит в диапазоне 1024-5120, а длина полного имени обычно oгрaничeнa 1024 байт), мы можем гарантировать, что в FIFO данные будут иметь следующий вид:

    1234 /etc/inet/ntp.conf

    9876 /etc/passwd

    либо

    9876 /etc/passwd

    1234 /etc/inet/ntp.conf

    Данные в канале не могут смешаться в «кашу», наподобие:

    1234 /etc/inet9876 /etc/passwd

    /ntp.conf

    FIFO и NFS

    Каналы FIFO представляют собой вид IPC, который может использоваться только в пределах одного узла. Хотя FIFO и обладают именами в файловой системе, они могут применяться только в локальных файловых системах, но не в присоединенных сетевых (NFS).

    solaris % mkfifo /nsf/bsdi/usr/rstevens/fifo.temp

    mkfifo: I/O error

    В этом примере файловая система /nfs/bsdi/usr — это файловая система /usr нa yзлe bsdi.

    Некоторые системы (например, BSD/OS) позволяют создавать FIFO в присоединенных файловых системах, но по ним нельзя передавать данные между узлами. В этом случае такой канал может использоваться лишь как «точка рандеву» в файловой системе между клиентами и серверами на одном и том же узле. Процесс, выполняемый на одном узле, нe мoжem послать данные через FIFO процессу, выполняемому на другом узле, даже если оба процесса смогут открыть этот канал, доступный обоим узлам через сетевую файловую систему.

    4.9. Последовательные и параллельные серверы

    Сервер в нашем простом примере из предыдущего раздела являлся последовательным сервером (iterative server). Он последовательно обрабатывал запросы клиентов, переходя к следующему только после полного завершения работы с предыдущим. Например, если два клиента пошлют запрос такому серверу приблизительно одновременно, причем один из них запросит 10-мегабайтный файл, отправка которого займет, например, 10 секунд, а второй — 10-байтный файл, то второму придется ждать по меньшей мере 10 секунд, пока не будет обслужен первый клиент.

    Альтернативой является параллельный сервер (concurrent server). Наиболее часто встречаемый в Unix вид такого сервера называется one-child-per-client (каждому клиенту — один дочерний процесс). Сервер вызывает fork для создания нового процесса каждый раз, когда появляется новый клиент. Дочерний процесс полностью обрабатывает запрос клиента, а поддержка многозадачности в Unix обеспечивает параллельность выполнения всех этих процессов. Однако существуют и другие методы решения задачи, подробно описанные в главе 27 [24]:

    ■ создание пула дочерних процессов и передача нового клиента свободному дочернему процессу;

    ■ создание одного пpoгрaммнoгo потока для каждого клиента;

    ■ создание пула потоков и передача нового клиента свободному потоку.

    Хотя в [24] обсуждаются проблемы создания сетевых серверов, те же методы применимы и к серверам межпроцессного взаимодействия (IPC server), клиенты которых находятся на одном узле.

    Атака типа «отказ в обслуживании»

    Один из недостатков последовательных серверов был уже отмечен выше — некоторым клиентам приходится ждать дольше чем нужно, потому что их запросы приходят после запросов других клиентов, запрашивающих большие файлы. Существует и другая проблема. Вспомним наш пример с интерпретатором команд, приведенный после листинга 4.11, и относящееся к нему обсуждение того, что сервер блокируется при вызове open для FIFO клиента, если клиент еще не открыл этот канал (чего не происходит до выполнения cat). Это дает возможность злоумышленнику «подвесить» сервер, послав ему запрос, не открывая канала. Этот тип атаки называется «отказ в обслуживании» (Denial of Service — DoS). Чтобы исключить возможность такой атаки, нужно быть аккуратным при написании последовательной части любого сервера, учитывая возможность и потенциальную продолжительность его блокирования. Одним из методов решения проблемы является установка максимального времени ожидания для некоторых операций, однако обычно проще сделать сервер параллельным, а не последовательным, поскольку в данном случае атака будет действовать лишь на один из дочерних процессов, а не на весь сервер. Однако даже параллельный сервер не защищен от атаки полностью: злоумышленник все еще может послать множество запросов, что приведет к превышению предела количества порожденных сервером процессов и невозможности выполнения последующих вызовов fork.

    4.10. Потоки и сообщения

    Приведенные примеры пpoгрaммныx каналов и каналов FIFO использовали потоковую модель ввода-вывода, что естественно для Unix. При этом отсутствуют грaницы записей — данные при операциях чтения и записи не проверяются вовсе. Процесс, считывающий 100 байт из FIFO, не может определить, записал ли другой процесс в FIFO все 100 байт за 1 раз, или за 5 раз по 20 байт, или в любой другой комбинации общим объемом 100 байт. Возможно, один процесс записал в FIFO 55 байт, а потом другой — 45. Данные представляют собой просто поток байтов, никак не интерпретируемых системой. Если же требуется какая-либо интерпретация данных, пишущий и читающий процессы должны заранее «договориться» о ее правилах и выполнять всю работу самостоятельно.

    Иногда приложению может потребоваться передавать данные, обладающие некоторой внутренней структурой. Это могут быть, например, сообщения переменной длины: в этом случае читающий процесс должен знать, где заканчивается одно сообщение и начинается следующее. Для разграничения сообщений широко используются три метода:

    1. Специальная внутриполосная завершающая последовательность: множество приложений под Unix используют в качестве разделителя сообщений символ перевода строки. Пишущий процесс добавляет к каждому сообщению этот символ, а считывающий процесс производит построчное считывание. Так работают клиент и сервер из листингов 4.10 и 4.11, чтобы разделить запросы клиентов. Этот метод требует исключения символа-разделителя из самих передаваемых данных (в случае необходимости его передать он должен предваряться другим специальным символом).

    2. Явное указание длины: каждой записи предшествует информация об ее длине. Мы вскоре воспользуемся этим методом. Он также применяется в Sun RPC при использовании совместно с TCP. Одним из преимуществ этого метода является отсутствие необходимости исключать разделитель из передаваемых данных, поскольку получатель не проверяет все данные, а переходит сразу к концу очередной записи, чтобы узнать длину следующей.

    3. Одна запись за подключение: приложение закрывает подключение к партнеру (подключение TCP для сетевых приложений либо просто подключение IPC), обозначая конец записи. Это требует повторного подключения для передачи следующей записи, однако используется в стандарте HTTP 1.0.

    Стандартная библиотека ввода-вывода также может использоваться для считывания и записи данных в пpoгрaммный канал или FIFO. Поскольку канал может быть открыт только функцией piре, возвращающей открытый дескриптор, для создания нового стандартного потока, связанного с этим дескриптором, можно использовать стандартную функцию fdopen. Канал FIFO обладает именем, поэтому он может быть открыт с помощью функции fopen.

    Можно создавать и более структурированные сообщения — эта возможность предоставляется очередями сообщений и в Posix, и в System V. Мы вскоре узнаем, что каждое сообщение обладает длиной и приоритетом (типом в System V). Длина и приоритет указываются отправителем и возвращаются получателю после считывания сообщения. Каждое сообщение представляет собой запись, аналогично дeйтaгрaммaм UDP ([24]).

    Мы можем структурировать данные, передаваемые по программному каналу или FIFO, самостоятельно. Определим сообщение в нашем заголовочном файле mesg.h, как показано в листинге 4.12.

    Листинг 4.12. Структура mymesg и сопутствующие определения

    //pipemesg/mesg.h

    1  #include "unpipc.h"

    2  /* Наши собственные "сообщения", которые могут использоваться с каналами, FIFO и очередями сообщений */

    3  /* Мы хотим, чтобы sizeof(struct mymesg) <= PIPE_BUF */

    4  #define MAXMESGDATA (PIPE_BUF – 2*sizeof(long))

    5  /* Длина mesg_len и mesg_type */

    6  #define MESGHDRSIZE (sizeof(struct mymesg) – MAXMESGDATA)

    7  struct mymesg {

    8   long mesg_len; //количество байтов в mesg_data, может быть О

    9   long mesg_type;//тип сообщения, должен быть > 0

    10  char mesg_data[MAXMESGDATA];

    11 };

    12 ssize_t mesg_send(int, struct mymesg *);

    13 void Mesg_send(int, struct mymesg *);

    14 ssize_t mesg_recv(int, struct mymesg *);

    15 ssize_t Mesg_recv(int, struct mymesg *);

    Каждое сообщение содержит в себе информацию о своем типе (mesg_type), причем значение этой переменной должно быть больше нуля. Пока мы будем игнорировать это поле в записи, но вернемся к нему в главе 6, где описываются очереди сообщений System V. Каждое сообщение также обладает длиной, кoтopая может быть и нулевой. Структура mymesg позволяет предварить каждое сообщение информацией о его типе и длине вместо использования символа перевода строки для сигнализации конца сообщения. Ранее мы отметили два преимущества этого подхода: получатель не должен сканировать все принятые байты в поисках конца сообщения и отсутствует необходимость исключать появление разделителя в самих данных.

    На рис. 4.13 изображен вид структуры mymesg и ее использование с каналами, FIFO и очередями сообщений System V. 

    Рис. 4.13. Структура mymesg


    Мы определяем две функции для отправки и приема сообщений. В листинге 4.13 приведен текст функции mesg_send, а в листинге 4.14 — функции mesg_recv.

    Листинг 4.13. Функция mesg_send

    //pipemesg/mesg_send.c

    1 #include "mesg.h"

    2 ssize_t

    3 mesg_send(int fd, struct mymesg *mptr)

    4 {

    5  return(write(fd, mptr, MESGHDRSIZE + mptr->mesg_len));

    6 }

    Листинг 4.14. Функция mesg_recv

    //pipemesg/mesg_recv.c

    1  #include "mesg.h"

    2  ssize_t

    3  mesg_recv(int fd, struct mymesg *mptr)

    4  {

    5   size_t len;

    6   ssize_t n;

    8   /* считывание заголовка сообщения для определения его длины */

    9   if ((n = Read(fd, mptr, MESGHDRSIZE)) == 0)

    10   return(0); /* end of file */

    11  else if (n != MESGHDRSIZE)

    12   err_quit("message header: expected %d, got %d". MESGHDRSIZE, n);

    13  if ((len = mptr->mesg_len) > 0)

    14   if ((n = Read(fd, mptr->mesg_data, len)) != len)

    15    err_quit("message data: expected %d, got %d", len, n);

    16  return(len);

    17 }

    Теперь для каждого сообщения функция read вызывается дважды: один раз для считывания длины, а другой — для считывания самого сообщения (если его длина больше 0).

    ПРИМЕЧАНИЕ

    Внимательные читатели могли заметить, что функция mesg_recv проверяет наличие всех возможных ошибок и прекращает работу при их обнаружении. Однако мы все же определили функцию-обертку Mesg_recv и вызываем из наших программ именно ее — для единообразия.

    Изменим теперь функции client и server, чтобы воспользоваться новыми функциями mesg_send и mesg_recv. В листинге 4.15 приведен текст функции-клиента.

    Листинг 4.15. Функция client с использованием сообщений

    //pipemesg/client.c

    1  #include "mesg.h"

    2  void

    3  client(int readfd, int writefd)

    4  {

    5   size_t len;

    6   ssize_t n;

    7   struct mymesg mesg;

    8   /* считывание полного имени */

    9   Fgets(mesg.mesg_data, MAXMESGDATA, stdin);

    10  len = strlen(mesg.mesg_data);

    11  if (mesg.mesg_data[len-1] == '\n')

    12   len--; /* удаление перевода строки из fgets() */

    13  mesg.mesg_len = len;

    14  mesg.mesg_type = 1;

    15  /* запись полного имени в канал IPC */

    16  Mesg_send(writefd, &mesg);

    17  /* считывание из канала IPC. запись в stdout */

    18  while ( (n = Mesg_recv(readfd, &mesg)) > 0)

    19   Write(STDOUT_FILENO, mesg.mesg_data, n);

    20 }

    Считывание имени файла и отправка его серверу

    8-16 Полное имя считывается из стандартного потока ввода и затем отправляется на сервер с помощью функции mesg_send.

    Считывание содержимого файла или сообщения об ошибке от сервера

    17-19 Клиент вызывает функцию mesg_recv в цикле, считывая все приходящие от сервера сообщения. По соглашению, когда mesg_recv возвращает нулевую длину сообщения, это означает конец передаваемых сервером данных. Мы увидим, что сервер добавляет символ перевода строки к каждому сообщению, отправляемому клиенту, поэтому пустая строка будет иметь длину сообщения 1. В листинге 4.16 приведен текст функции-сервера.

    Листинг 4.16. Функция server, использующая сообщения

    //pipemesg/server.c

    1  #include "mesg.h"

    2  void

    3  server(int readfd, int writefd)

    4  {

    5   FILE *fp;

    6   ssize_t n;

    7   struct mymesg mesg;

    8   /* считывание полного имени из канала */

    9   mesg.mesg_type = 1;

    10  if ((n = Mesg_recv(readfd, &mesg)) == 0)

    11   err_quit("pathname missing");

    12  mesg.mesg_data[n] = '\0'; /* полное имя, завершающееся 0 */

    13  if ((fp = fopen(mesg.mesg_data, "r")) == NULL) {

    14   /* ошибка, нужно сообщить клиенту */

    15   snprintf(mesg.mesg_data + n, sizeof(mesg.mesg_data) – n,

    16    ": can't open, %s\n", strerror(errno));

    17   mesg.mesg_len = strlen(mesg.mesg_data);

    18   Mesg_send(writefd, &mesg);

    19  } else {

    20   /* файл успешно открыт, передача данных */

    21   while (Fgets(mesg.mesg_data, MAXMESGDATA, fp) != NULL) {

    22    mesg.mesg_len = strlen(mesg.mesg_data);

    23    Mesg_send(writefd, &mesg);

    24   }

    25   Fclose(fp);

    26  }

    27  /* отправка сообщения нулевой длины для обозначения конца связи */

    28  mesg.mesg_len = 0;

    29  Mesg_send(writefd, &mesg);

    30 }

    Считывание имени файла из канала IPC, открытие файла

    8-18 Сервер принимает от клиента имя файла. Хотя значение mesg_type, равное 1, нигде не используется (оно затирается функцией mesg_recv из листинга 4.14), мы будем использовать ту же функцию при работе с очередями сообщений System V (листинг 6.8), а в данном случае в этом значении уже возникает потребность (см., например, листинг 6.11). Стандартная функция ввода-вывода fopen открывает файл, что отличается от листинга 4.3, где вызывалась функция open для получения дескриптора файла. Причина, по которой мы воспользовались fopen, заключается в том, что в этой пpoгрaммe мы пользуемся библиотечной функцией fgets для считывания содержимого файла построчно и затем отправляем клиенту строку за строкой.

    Отправка файла клиенту

    19-26 Если вызов fopen оказывается успешным, содержимое файла считывается с помощью функции fgets и затем отправляется клиенту построчно. Сообщение с нулевой длиной означает конец файла.

    При использовании пpoгрaммныx каналов или FIFO мы могли бы также закрыть канал IPC, чтобы дать клиенту знать о том, что передача файла завершена. Однако мы используем передачу сообщения нулевой длины, потому что другие типы IPC не поддерживают концепцию конца файла.

    Функции main, вызывающие новые функции client и server, вообще не претерпели никаких изменений. Мы можем использовать либо версию для работы с каналами (листинг 4.1), либо версию для работы с FIFO (листинг 4.6).

    4.11. Ограничения программных каналов и FIFO

    На программные каналы и каналы FIFO системой накладываются всего два ограничения:

    ■ OPEN_MAX — максимальное количество дескрипторов, которые могут быть одновременно открыты некоторым процессом (Posix устанавливает для этой величины ограничение снизу — 16);

    ■ PIPE_BUF — максимальное количество данных, для которого гарантируется атомарность операции записи (описано в разделе 4.7; Posix требует по меньшей мере 512 байт).

    Значение OPEN_MAX можно узнать, вызвав функцию sysconf, как мы вскоре покажем. Обычно его можно изменить из интерпретатора команд с помощью команды ulimit (в Bourne shell и KornShell, как мы вскоре покажем) или с помощью команды limit (в С shell). Оно может быть изменено и самим процессом с помощью вызова функции setrlimit (подробно описана в разделе 7.11 [21]).

    Значение PIPE_BUF обычно определено в заголовочном файле <limits.h>, но с точки зрения стандарта Posix оно представляет собой переменную, зависимую от полного имени файла. Это означает, что ее значение может меняться в зависимости от указываемого имени файла (для FIFO, поскольку каналы имен не имеют), поскольку разные имена могут относиться к разным файловым системам и эти файловые системы могут иметь различные характеристики. Это значение можно получить в момент выполнения пpoгрaммы, вызвав либо pathconf, либо fpathconf. В листинге 4.17 приведен пример, выводящий текущее значение этих двух oгрaничeний.

    Листинг 4.17. Определение значений PIPE_BUF и OPEN_MAX во время выполнения

    //pipe/pipeconf.c

    1  #include "unpipc.h"

    2  int

    3  main(int argc, char **argv)

    4  {

    5   if (argc != 2)

    6   err_quit("usage: pipeconf <pathname>");

    7   printf("PIPE_BUF = %ld. OPEN_MAX = %ld\n",

    8   Pathconf(argv[1], _PC_PIPE_BUF), Sysconf(_SC_OPEN_MAX));

    9   exit(0);

    10 }

    Вот несколько примеров, в которых указываются имена файлов, относящиеся к различным файловым системам:

    solaris % pipeconf / значения по умолчанию в Solaris 2.6

    PIPE_BUF = 5120, OPEN_MAX = 64

    solaris % pipeconf /home

    PIPE_BUF = 5120, OPEN_MAX = 64

    solaris % pipeconf /tmp

    PIPE_BUF = 5120, OPEN_MAX = 64

    alpha % pipeconf /   значения по умолчанию в Digital Unix 4.0B

    PIPE_BUF = 4096, OPEN_MAX = 4096

    alpha % pipeconf /usr

    PIPE_BUF = 4096, OPEN_MAX = 4096

    Покажем теперь, как изменить значение OPEN_MAX в Solaris, используя интерпретатор KornShell:

    solaris % ulimit –nS     отображение максимального количества дескрипторов, мягкоео граничение

    64

    solaris % ulimit –Nh     отображение максимального количества дескрипторов, жесткое ограничение

    1024

    solaris % ulimit –nS 512 установка мягкого ограничения в 512

    solaris % pipeconf /     проверка внесенных изменений

    PIPE_BUF = 5120, OPEN_MAX = 512
     

    ПРИМЕЧАНИЕ

    Хотя значение PIPE_BUF для FIFO, в принципе, может меняться в зависимости от файловой системы, к которой относится файл, на самом деле это очень редкий случай.

    В главе 2 [21] описаны функции fpathconf, pathconf и sysconf, которые предоставляют информацию о некоторых ограничениях ядра во время выполнения программы. Стандарт Posix.1 определяет 12 констант, начинающихся с _РС_, и 52, начинающихся с _SC_. Системы Digital Unix 4.0B и Solaris 2.6 расширяют последнее ограничение, определяя около 100 констант, значения которых могут быть получены в момент выполнения программы с помощью sysconf. 

    Команда getconf определяется стандартом Posix.2 и выводит значения большинства этих ограничений. Например:

    alpha % getconf OPEN_MAX

    4096

    alpha % getconf PIPE_BUF /

    4096

    4.12. Резюме

    Именованные и неименованные каналы представляют собой базовые строительные блоки для множества приложений. Программные каналы (неименованные) обычно используются в интерпретаторе команд, а также внутри программ — часто для передачи информации от дочернего процесса к родительскому. Можно исключить часть кода, относящегося к использованию каналов (piре, fork, close, exec и waitpid), используя функции popen и pclose, которые берут на себя все тонкости и запускают интерпретатор команд.

    Каналы FIFO похожи на программные каналы, но создаются вызовом mkfifo и затем могут быть открыты с помощью функции open. При открытии FIFO следует быть аккуратным, поскольку процесс может быть заблокирован, а зависит это от множества условий (см. табл. 4.1).

    Используя программные каналы и FIFO, мы создали несколько вариантов приложении типа клиент-сервер: один сервер с несколькими клиентами, последовательный и параллельный серверы. Последовательный сервер единовременно обрабатывает запрос только от одного клиента; такие серверы обычно уязвимы для атак типа «отказ в обслуживании». Параллельный сервер запускает отдельный процесс или поток для обработки запроса нового клиента.

    Одним из свойств программных каналов и FIFO является то, что данные по ним передаются в виде потоков байтов, аналогично соединению TCP. Деление этого потока на самостоятельные записи целиком предоставляется приложению. Мы увидим в следующих двух главах, что очереди сообщений автоматически расставляют границы между записями, аналогично тому, как это делается в дейтаграммах UDP.

    Упражнения

    1. При переходе от рис. 4.3 к рис. 4.4: что могло бы произойти, если бы дочерний процесс не закрывал дескриптор (close(fd[1]))? 

    2. Описывая mkfifo в разделе 4.6, мы сказали, что для открытия существующего FIFO или создания нового, если его не существует, следует вызвать mkfifо, проверить, не возвращается ли ошибка EEXIST, и вызвать open, если это происходит. Что если изменить логику и вызвать сначала open, а затем mkfifо, если FIFO не существует?

    3. Что происходит при вызове popen в листинге 4.5, если в интерпретаторе возникает ошибка?

    4. Удалите вызов open для FIFO сервера в листинге 4.10 и проверьте, приведет ли это к завершению работы сервера после отключения последнего клиента.

    5. К листингу 4.10: мы отметили, что при запуске сервера его работа блокируется при вызове первой функции open, пока FIFO не будет открыт на запись первым клиентом. Как можно обойти это таким образом, чтобы обе функции open завершали работу немедленно, а блокирование происходило при первом вызове readline?

    6. Что произойдет с клиентом в листинге 4.11, если поменять порядок вызовов open?

    7. Почему сигнал отправляется процессу, в котором канал FIFO открыт на запись, после отключения последнего читающего клиента, а не читающему клиенту после отключения последнего пишущего?

    8. Напишите небольшую тестирующую программу для определения того, возвращает ли fstat количество байтов в FIFO в качестве поля st_size структуры stat.

    9. Напишите небольшую тестирующую программу для определения того, что возвращает функция select при проверке возможности записи в дескриптор канала, у которого закрыт второй конец. 

    ГЛАВА 5

    Очереди сообщений Posix

    5.1. Введение

    Очередь сообщений можно рассматривать как связный список сообщений. Программные потоки с соответствующими разрешениями могут помещать сообщения в очередь, а потоки с другими соответствующими разрешениями могут извлекать их оттуда. Каждое сообщение представляет собой запись (вспомните сравнение потоков и сообщений в разделе 4.10), и каждому сообщению его отправителем присваивается приоритет. Для записи сообщения в очередь не требуется наличия ожидающего его процесса. Это отличает очереди сообщений от программных каналов и FIFO, в которые нельзя произвести запись, пока не появится считывающий данные процесс.

    Процесс может записать в очередь какие-то сообщения, после чего они могут быть получены другим процессом в любое время, даже если первый завершит свою работу. Мы говорим, что очереди сообщений обладают живучестью ядра (kernel persistence, раздел 1.3). Это также отличает их от программных каналов и FIFO. В главе 4 говорится о том, что данные, остающиеся в именованном или неименованном канале, сбрасываются, после того как все процессы закроют его.

    В этой главе рассматриваются очереди сообщений стандарта Posix, а в главе 6 — стандарта System V. Функции для работы с ними во многом схожи, а главные отличия заключаются в следующем:

    ■ операция считывания из очереди сообщений Posix всегда возвращает самое старое сообщение с наивысшим приоритетом, тогда как из очереди System V можно считать сообщение с произвольно указанным приоритетом;

    ■ очереди сообщений Posix позволяют отправить сигнал или запустить программный поток при помещении сообщения в пустую очередь, тогда как для очередей System V ничего подобного не предусматривается.

    Каждое сообщение в очереди состоит из следующих частей:

    ■ приоритет (беззнаковое целое, Posix) либо тип сообщения (целое типа long, System V);

    ■ длина полезной части сообщения, которая может быть нулевой;

    ■ собственно данные (если длина сообщения отлична от 0).

    Этим очереди сообщений отличаются от программных каналов и FIFO. Последние две части сообщения представляют собой байтовые потоки, в которых отсутствуют границы между сообщениями и никак не указывается их тип. Мы обсуждали этот вопрос в разделе 4.10 и добавили свой собственный интерфейс для пересылки сообщений по программным каналам и FIFO. На рис. 5.1 показан возможный вид очереди сообщений.

    Рис. 5.1. Очередь сообщений Posix, содержащая три сообщения


    Мы предполагаем реализацию через связный список, причем его заголовок содержит два атрибута очереди: максимально допустимое количество сообщений в ней и максимальный размер сообщения. Об этих атрибутах мы расскажем более подробно в разделе 5.3.

    В этой главе мы используем метод, к которому будем прибегать и в дальнейшем, рассматривая очереди сообщений, семафоры и разделяемую память. Поскольку все эти объекты IPC обладают по крайней мере живучестью ядра (вспомните раздел 1.3), мы можем писать небольшие программы, использующие эти методы для экспериментирования с ними и получения большей информации о том, как они работают. Например, мы можем написать программу, создающую очередь сообщений Posix, а потом написать другую программу, которая помещает сообщение в такую очередь, а потом еще одну, которая будет считывать сообщения из очереди. Помещая в очередь сообщения с различным приоритетом, мы увидим, в каком порядке они будут возвращаться функцией mq_receive.

    5.2. Функции mq_open, mq_close, mq_unlink

    Функция mq_open создает новую очередь сообщений либо открывает существующую:

    #include <mqueue.h>

    mqd_t mq_open(const char *name, int oflag, …

    /* mode_t mode, struct mq_attr *attr*/ );

    /* Возвращает дескриптор очереди в случае успешного завершения;

     –1 – в противном случае. */

    Требования к аргументу пате описаны в разделе 2.2.

    Аргумент oflag может принимать одно из следующих значений: O_RDONLY, O_WRONLY, O_RDWR в сочетании (логическое сложение) с O_CREAT, O_EXCL, O_NONBLOCK. Все эти флаги описаны в разделе 2.3.

    При создании новой очереди (указан флаг O_CREAT и очередь сообщений еще не существует) требуется указание аргументов mode и attr. Возможные значения аргумента mode приведены в табл. 2.3. Аргумент attr позволяет задать некоторые атрибуты очереди. Если в качестве этого аргумента задать нулевой указатель, очередь будет создана с атрибутами по умолчанию. Эти атрибуты описаны в разделе 5.3.

    Возвращаемое функцией mq_open значение называется дескриптором очереди сообщений, но оно не обязательно должно быть (и, скорее всего, не является) небольшим целым числом, как дескриптор файла или программного сокета. Это значение используется в качестве первого аргумента оставшихся семи функций для работы с очередями сообщений.

    ПРИМЕЧАНИЕ

    В системе Solaris 2.6 тип mqd_t определен как void*, а в Digital Unix 4.0B — как int. В нашем примере в разделе 5.8 эти дескрипторы трактуются как указатели на структуру. Название «дескриптор» было дано им по ошибке. 

    Открытая очередь сообщений закрывается функцией mq_close:

    #include <mqueue.h>

    int mq_close(mqd_t mqdes);

    /*Возвращает 0 в случае успешного завершения. –1 в случае ошибки */

    По действию эта функция аналогична close для открытого файла: вызвавший функцию процесс больше не может использовать дескриптор, но очередь сообщений из системы не удаляется. При завершении процесса все открытые очереди сообщений закрываются, как если бы для каждой был сделан вызов mq_close.

    Для удаления из системы имени (пате), которое использовалось в качестве аргумента при вызове mq_open, нужно использовать функцию mq_unlink:

    #include <mqueue.h>

    int mq_unlink(const char *name);

    /* Возвращает 0 в случае успешного завершения. –1 в случае ошибки */

    Для очереди сообщений (как и для файла) ведется подсчет числа процессов, в которых она открыта в данный момент, и по действию эта функция аналогична unlink для файла: имя (пате) может быть удалено из системы, даже пока число подключений к очереди отлично от нуля, но удаление очереди (в отличие от удаления имени из системы) не будет осуществлено до того, как очередь будет закрыта последним использовавшим ее процессом.

    Очереди сообщений Posix обладают по меньшей мере живучестью ядра (раздел 1.3), то есть они продолжают существовать, храня все имеющиеся в них сообщения, даже если нет процессов, в которых они были бы открыты. Очередь существует, пока она не будет удалена явно с помощью mq_unlink.

    ПРИМЕЧАНИЕ

    Мы увидим, что если очередь сообщений реализована через отображаемые в память файлы (раздел 12.2), она может обладать живучестью файловой системы, но это не является обязательным и рассчитывать на это нельзя.

    Пример: программа mqcreate1

    Поскольку очереди сообщений Posix обладают по крайней мере живучестью ядра, можно написать набор небольших программ для работы с ними — с этими программами будет проще экспериментировать. Программа из листинга 5.1[1] создает очередь сообщений, имя которой принимается в качестве аргумента командной строки.

    Листинг 5.1. Создание очереди сообщений (указан флаг O_EXCL)

    //pxmsg/mqcreate1.с

    1  #include "unpipc.h"

    2  int

    3  main(int argc, char **argv)

    4  {

    5   int с flags:

    6   mqd_t mqd;

    7   flags = O_RDWR | O_CREAT;

    8   while ((c = Getopt(argc, argv, "e")) != –1) {

    9    switch (c) {

    10   case 'e':

    11    flags |= O_EXCL;

    12    break;

    13   }

    14  }

    15  if (optind != argc – 1)

    16   err_quit("usage: mqcreate [ –e ] <name>");

    17  mqd = Mq_open(argv[optind], flags, FILE_MODE, NULL);

    18  Mq_close(mqd);

    19  exit(0);

    20 }

    В командной строке можно указать параметр –е, управляющий исключающим созданием очереди. (О функции getopt и нашей обертке Getopt рассказано более подробно в комментарии к листингу 5.5.) При возвращении функция getopt сохраняет в переменной optind индекс следующего аргумента, подлежащего обработке.

    Мы вызываем функцию mq_open, указывая ей в качестве имени IPC полученный из командной строки параметр, не обращаясь к рассмотренной нами в разделе 2.2 функции px_ipc_name. Это даст нам возможность узнать, как в данной реализации обрабатываются имена Posix IPC (мы используем для этого наши маленькие тестовые программы на протяжении всей книги).

    Ниже приведен результат работы программы в Solaris 2.6:

    solaris % mqcreate1 /temp.1234    очередь успешно создается

    solaris % ls -l /tmp/.*1234

    -rw-rw-rw– 1 rstevens other1 132632 Oct 23 17:08 /tmp/.MQDtemp.1234

    -rw-rw-rw– 1 rstevens other1      0 Oct 23 17:08 /tmp/.MQLtemp.1234

    -rw-r--r-- 1 rstevens other1      0 Oct 23 17:08 /tmp/.MQPDtemp.1234

    solaris % mqcreate1 –e /temp.1234 очередь уже создана

    mq_open error for /temp.1234: File exists

    Мы назвали эту версию программы mqcreate1, поскольку она будет улучшена в листинге 5.4, после того как мы обсудим использование атрибутов очереди. Разрешения на доступ к третьему файлу определяются константой FILE_MODE (чтение и запись для пользователя, только чтение для группы и прочих пользователей), но у двух первых файлов разрешения отличаются. Можно предположить, что в файле с буквой D в имени хранятся данные; файл с буквой L представляет собой какую-то блокировку, а в файле с буквой Р хранятся разрешения.

    В Digital Unix 4.0B мы указываем действительное имя создаваемого файла:

    alpha % mqcreate1 /tmp/myq.1234    очередь успешно создается

    alpha % ls –l /tmp/myq.1234

    -rw-r--r-- 1 rstevens system 11976 Oct 23 17:04 /tmp/myq.1234

    alpha % mqcreate1 –e /tmp/myq.1234 очередь уже создана

    mq_open error for /tmp/myq.1234: File exists

    Пример: программа mqunlink

    В листинге 5.2 приведена программа mqunlink, удаляющая из системы очередь сообщений.

    Листинг 5.2. Удаление очереди из системы: mqunlink

    //pxmsg/mqunlink.c

    1 #include "unpipc.h"

    2 int

    3 main(int argc, char **argv)

    4 {

    5  if (argc != 2)

    6   err_quit("usage: mqunlink <name>");

    7  Mq_unlink(argv[1]);

    8  exit(0);

    9 }

    С помощью этой программы мы можем удалить очередь сообщений, созданную программой mqcreate1:

    solaris % mqunlink /temp.1234

    При этом будут удалены все три файла из каталога /tmp, которые относятся к этой очереди.

    5.3. Функции mq_getattr и mq_setattr

    У каждой очереди сообщений имеются четыре атрибута, которые могут быть получены функцией mq_getattr и установлены (по отдельности) функцией mq_setattr:

    #include <mqueue.h>

    int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

    int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *oattr);

    /* Обе функции возвращают 0 в случае успешного завершения; –1 – в случае возникновения ошибок */

    Структура mq_attr хранит в себе эти четыре атрибута:

    struct mq_attr {

     long mq_flags;   /* флаг очереди: 0, O_NONBLOCK */

     long mq_maxmsg;  /* максимальное количество сообщений в очереди */

     long mq_msgsize; /* максимальный размер сообщения (в байтах) */

     long mq_curmsgs; // текущее количество сообщений в очереди

    }

    Указатель на такую структуру может быть передан в качестве четвертого аргумента mq_open, что дает возможность установить параметры mq_maxmsg и mq_msgsize в момент создания очереди. Другие два поля структуры функцией mq_open игнорируются.

    Функция mq_getattr присваивает полям структуры, на которую указывает attr, текущие значения атрибутов очереди.

    Функция mq_setattr устанавливает атрибуты очереди, но фактически используется только поле mqflags той структуры, на которую указывает attr, что дает возможность сбрасывать или устанавливать флаг запрета блокировки. Другие три поля структуры игнорируются: максимальное количество сообщений в очереди и максимальный размер сообщения могут быть установлены только в момент создания очереди, а количество сообщений в очереди можно только считать, но не изменить.

    Кроме того, если указатель oattr ненулевой, возвращаются предыдущие значения атрибутов очереди (mq_flags, mq_maxmsg, mq_msgsize) и текущий статус очереди (mq_curmsgs).

    Пример: программа mqgetattr

    Программа из листинга 5.3 открывает указанную очередь сообщений и выводит значения ее атрибутов.

    Листинг 5.3. Получение и вывод значений атрибутов очереди сообщений

    //pxmsg/mqgetattr.c

    1  #include "unpipc.h"

    2  int

    3  main(int argc, char **argv)

    4  {

    5   mqd_t mqd;

    6   struct mq_attr attr;

    7   if (argc != 2)

    8    err_quit("usage: mqgetattr <name>");

    9   mqd = Mq_open(argv[1], O_RDONLY);

    10  Mq_getattr(mqd, &attr);

    11  printf ("max #msgs = %ld, max #bytes/msg = %ld, "

    12   "#currently on queue = %ld\n",

    13   attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

    14  Mq_close(mqd);

    15  exit(0);

    16 }

    Мы можем создать очередь сообщений и вывести значения ее атрибутов, устанавливаемые по умолчанию:

    solaris % mqcreate1 /hello.world

    solaris % mqgetattr /hello.world

    max #msgs = 128, max #bytes/msg = 1024, #currently on queue = 0

    Вспомним размер одного из файлов очереди, созданной с использованием устанавливаемых по умолчанию значений атрибутов. Он был выведен командой ls в примере после листинга 5.1. Это значение можно получить как 128×1024+1560 = 132632.

    Добавочные 1560 байт представляют собой, скорее всего, дополнительную информацию: 8 байт на сообщение плюс добавочные 536 байт.

    Пример: программа mqcreate

    Мы можем изменить программу из листинга 5.1 таким образом, чтобы при создании очереди иметь возможность указывать максимальное количество сообщений и максимальный размер сообщения. Мы не можем указать только один из этих параметров; нужно обязательно задать оба (см., впрочем, упражнение 5.1). В листинге 5.4 приведен текст новой программы.

    Листинг 5.4. Усовершенствованная программа mqcreate

    //pxmsg/mqcreate.c

    1  #include "unpipc.h"

    2  struct mq_attr attr; /* mq_maxmsg и mq_msgsize инициализируются О */

    3  int

    4  main(int argc, char **argv)

    5  {

    6   int с flags;

    7   mqd_t mqd;

    8   flags = O_RDWR | O_CREAT;

    9   while ((c = Getopt(argc, argv, "em:z:")) != –1) {

    10   switch (c) {

    11   case 'e':

    12    flags |= O_EXCL;

    13    break;

    14   case 'm':

    15    attr.mq_maxmsg = atol(optarg);

    16    break;

    17   case 'z':

    18    attr.mq_msgsize = atol(optarg);

    19    break;

    20   }

    21  }

    22  if (optind != argc – 1)

    23   err_quit("usage: mqcreate [ –е ] [ –m maxmsg –z msgsize ] <name>");

    24  if ((attr.mq_maxmsg != 0 && attr.mq_msgsize ==0) ||

    25   (attr.mq_maxmsg == 0 && attr.mq_msgsize != 0))

    26   err_quit("must specify both –m maxmsg and –z msgsize");

    27  mqd = Mq_open(argv[optind], flags, FILE_MODE,

    28   (attr.mq_maxmsg != 0) ? &attr : NULL);

    29  Mq_close(mqd);

    30  exit(0);

    31 }

    Параметр командной строки, требующий аргумента, указывается с помощью двоеточия (после параметров m и z в вызове getopt). В момент обработки символа параметр optarg указывает на аргумент.

    ПРИМЕЧАНИЕ

    Наша обертка Getopt вызывает стандартную библиотечную функцию getopt и завершает выполнение процесса в случае возникновения ошибок в ее работе: при появлении параметра, не указанного в третьем аргументе при вызове функции, или при наличии параметра без необходимого числового аргумента (потребность в нем указывается с помощью двоеточия после буквы параметра в третьем аргументе функции getopt). В любом случае, getopt помещает сообщение об ошибке в стандартный поток сообщений об ошибках и возвращает ошибку, что приводит к завершению работы оберткой Getopt. В двух приведенных ниже примерах ошибка обнаруживается функцией getopt:

    solaris %mqcreate –z

    mqcreate: option requires an argument – z

    solaris %mqcreate –q

    mqcreate: illegal option – q

    В следующем примере ошибка (не указан необходимый аргумент — имя очереди) обнаруживается самой программой:

    solaris %mqcreate

    usage: mqcreate [ –e ] [ –m maxmsg –z msgsize ] <name>

    Если не указан ни один из двух новых параметров, мы должны передать функции mq_open пустой указатель в качестве последнего аргумента. В противном случае мы передаем указатель на нашу структуру attr.

    Запустим теперь новую версию нашей программы в системе Solaris 2.6, указав максимальное количество сообщений 1024 и максимальный размер сообщения 8192 байт:

    solaris % mqcreate –e –m 1024 -z 8192 /foobar

    solaris % ls –al /tmp/.*foobar

    -rw-rw-rw– 1 rstevens other1 8397336 Oct 25 11:29 /tmp/.MQDfoobar

    –rw-rw-rw– 1 rstevens other1       0 Oct 25 11:29 /tmp/.MQLfoobar

    –rw-r--r-- 1 rstevens other1       0 Oct 25 11:29 /tmp/.MQPfoobar

    Размер файла, содержащего данные этой очереди, соответствует максимальному количеству сообщений в очереди и максимальному размеру сообщения (1024×8192 = 8388608), а оставшиеся 8728 байт предусматривают 8 байт информации на каждое сообщение (8×1024) плюс дополнительные 536 байт. 

    При выполнении той же программы в Digital Unix 4.0B получим:

    alpha % mqcreate –m 256 -z 2048 /tmp/bigq

    alpha % ls-l/tmp/bigq

    -rw-r--r-- 1 rstevens system 537288 Oct 25 15:38 /tmp/bigq

    В этой реализации размер очереди соответствует максимальному количеству сообщений и максимальному размеру сообщения (256×2048 = 524288), а оставшиеся 13000 байт дают возможность хранить 48 байт добавочной информации для каждого сообщения (48×256) и еще 712 байт.

    5.4. Функции mqsend и mqreceive

    Эти две функции предназначены для помещения сообщений в очередь и получения их оттуда. Каждое сообщение имеет свой приоритет, который представляет собой беззнаковое целое, не превышающее MQ_PRIO_MAX. Стандарт Posix требует, чтобы эта величина была не меньше 32.

    ПРИМЕЧАНИЕ

    В Solaris 2.6 значение MQ_PRIO_MAX равняется именно 32, но в Digital Unix 4.0B этот предел равен уже 256. В листинге 5.7 мы покажем, как получить эти значения.

    Функция mq_receive всегда возвращает старейшее в указанной очереди сообщение с максимальным приоритетом, и приоритет может быть получен вместе с содержимым сообщения и его длиной.

    ПРИМЕЧАНИЕ

    Действие mq_receive отличается от действия msgrcv в System V (раздел 6.4). Сообщения System V имеют поле type, аналогичное по смыслу приоритету, но для функции msgrcv можно указать три различных алгоритма возвращения сообщений: старейшее сообщение в очереди, старейшее сообщение с указанным типом или старейшее сообщение с типом, не превышающим указанного значения.

    #include <mqueue.h>

    int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);

    /* Возвращает 0 в случае успешного завершения, –1 – в случае возникновения ошибок */

    ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);

    /* Возвращает количество байтов в сообщении в случае успешного завершения. –1 – в случае ошибки */

    Первые три аргумента обеих функций аналогичны первым трем аргументам функций write и read соответственно.

    ПРИМЕЧАНИЕ

    Объявление указателя на буфер как char* кажется ошибкой — тип void* больше соответствовал бы по духу прочим функциям Posix.1. 

    Значение аргумента len функции mq_receive должно быть по крайней мере не меньше максимального размера сообщения, которое может быть помещено в очередь, то есть значения поля mq_msgsize структуры mq_attr для этой очереди. Если len оказывается меньше этой величины, немедленно возвращается ошибка EMSGSIZE. 

    ПРИМЕЧАНИЕ

    Это означает, что большинству приложений, использующих очереди сообщений Posix, придется вызывать mq_getattr после открытия очереди для определения максимального размера сообщения, а затем выделять память под один или несколько буферов чтения этого размера. Требование, чтобы буфер был больше по размеру, чем максимально возможное сообщение, позволяет функции mq_receive не возвращать уведомление о том, что размер письма превышает объем буфера. Сравните это, например, с флагом MSG_NOERROR и ошибкой E2BIG для очередей сообщений System V (раздел 6.4) и флагом MSG_TRUNC для функции recvmsg, используемой с дейтаграммами UDP (раздел 13.5 [24]). 

    Аргумент prio устанавливает приоритет сообщения для mq_send, его значение должно быть меньше MQ_PRIO_MAX. Если при вызове mq_receive priop является ненулевым указателем, в нем сохраняется приоритет возвращаемого сообщения. Если приложению не требуется использование различных приоритетов сообщений, можно указывать его равным нулю для mq_send и передавать mq_receive нулевой указатель в качестве последнего аргумента.

    ПРИМЕЧАНИЕ

    Разрешена передача сообщений нулевой длины. Это тот случай, когда важно не то, о чем говорится в стандарте (Posix.1), а то, о чем в нем не говорится: нигде не запрещена передача сообщений нулевой длины. Функция mq_receive возвращает количество байтов в сообщении (в случае успешного завершения работы) или –1 в случае возникновения ошибок, так что 0 обозначает сообщение нулевой длины. 

    Очередям сообщений Posix и System V не хватает полезной функции: получатель не может определить отправителя сообщения. Эта информация могла бы пригодиться многим приложениям. К сожалению, большинство механизмов передачи сообщений IPC не позволяют определить отправителя сообщений. В разделе 15.5 мы расскажем, как эта возможность обеспечивается для дверей. В разделе 14.8 [24] описано, как эта возможность обеспечивается в BSD/OS для доменных сокетов Unix. В разделе 15.3.1 [21] описано, как SVR4 передает информацию об отправителе по каналу при передаче по нему дескриптора. В настоящее время методы BSD/OS широко используются, и хотя реализация SVR4 является частью стандарта Unix 98, она требует передачи дескриптора по каналу, что обычно является более дорогостоящей операцией, чем просто передача данных. Мы не можем предоставить отправителю возможность передать информацию о себе (например, эффективный идентификатор пользователя) в самом сообщении, поскольку мы не можем быть уверены, что эта информация окажется истинной. Хотя разрешения доступа к очереди сообщений определяют, имеет ли право отправитель помещать в нее сообщения, это все равно не дает однозначности. Существует возможность создавать одну очередь для каждого отправителя (о которой рассказывается в связи с очередями System V в разделе 6.8), но это плохо подходит для больших приложений. Наконец, если функции для работы с очередями сообщений реализованы как пользовательские функции (как мы показываем в разделе 5.8), а не как часть ядра, мы не можем доверять никакой информации об отправителе, передаваемой с сообщением, так как ее легко подделать. 

    Пример: программа mqsend

    В листинге 5.5 приведен текст программы, помещающей сообщение в очередь.

    Листинг 5.5. Программа mqsend

    //pxmsg/mqsend.c

    1  #include "unpipc.h"

    2  int

    3  main(int argc, char **argv)

    4  {

    5   mqd_t mqd;

    6   void *ptr;

    7   size_t len;

    8   uint_t prio;

    9   if (argc != 4)

    10   err_quit("usage: mqsend <name> <#bytes> <priority>");

    11  len = atoi(argv[2]);

    12  prio = atoi(argv[3]);

    13  mqd = Mq_open(argv[1], O_WRONLY);

    14  ptr = Calloc(len, sizeof (char));

    15  Mq_send(mqd, ptr, len, prio);

    16  exit(0);

    17 }

    И размер сообщения, и его приоритет являются обязательными аргументами командной строки. Буфер под сообщение выделяется функцией callос, которая инициализирует его нулем.

    Пример: программа mqreceive

    Программа в листинге 5.6 считывает сообщение из очереди.

    Листинг 5.6. Программа mqreceive

    //pxmsg/mqreceive.с

    1  #include "unpipc.h"

    2  int

    3  main(int argc, char **argv)

    4  {

    5   int с flags;

    6   mqd_t mqd;

    7   ssize_t n;

    8   uint_t prio;

    9   void *buff;

    10  struct mq_attr attr;

    11  flags = O_RDONLY;

    12  while ((c = Getopt(argc, argv, "n")) != –1) {

    13   switch (c) {

    14   case 'n':

    15    flags |= O_NONBLOCK;

    16    break;

    17   }

    18  }

    19  if (optind != argc – 1)

    20   err_quit("usage: mqreceive [ –n ] <name>");

    21  mqd = Mq_open(argv[optind], flags);

    22  Mq_getattr(mqd, &attr);

    23  buff = Malloc(attr.mqjnsgsize);

    24  n = Mq_receive(raqd, buff, attr.mq_msgsize, &prio);

    25  printf("read %ld bytes, priority = %u\n", (long) n, prio);

    26  exit(0);

    27 }

    Параметр -n запрещает блокировку

    14-17 Параметр командной строки –n отключает блокировку. При этом программа возвращает сообщение об ошибке, если в очереди нет сообщений.

    Открытие очереди и получение атрибутов

    21-25 Мы открываем очередь и получаем ее атрибуты, вызвав mq_getattr. Нам обязательно нужно определить максимальный размер сообщения, потому что мы должны выделить буфер подходящего размера, чтобы вызвать mq_receive. Программа выводит размер считываемого сообщения и его приоритет.

    ПРИМЕЧАНИЕ

    Поскольку n имеет тип size_t и мы не знаем, int это или long, мы преобразуем эту величину к типу long и используем строку формата %ld. В 64-разрядной реализации int будет 32-разрядным целым, a long и size_t будут 64-разрядными целыми.

    Воспользуемся обеими программами, чтобы проиллюстрировать использование поля приоритета.

    solaris % mqcreate /test1

    solaris % mqgetattr /test1        создаем очередь и смотрим на ее атрибуты

    max #msgs = 128, max #bytes/msg = 1024, #currently on queue = 0

    solaris % mqsend /test1 100 99999 отправка с некорректным значением приоритета

    mq_send error: Invalid argument

    solaris % mqsend /test1 100 6     100 байт, приоритет 6

    solaris % mqsend /test1 50 18     50 байт, приоритет 18

    solaris % mqsend /test1 33 18     33 байт, приоритет 18

    solaris % mqreceive /test1

    read 50 bytes, priority = 18         возвращается старейшее сообщение с

    solaris % mqreceive /test1        наивысшим приоритетом

    read 33 bytes, priority = 18

    Solaris % mqreceive /test1

    read 100 bytes, priority = 6

    Solaris % mqreceive –n /test1     отключаем блокировку и убеждаемся, что очередь пуста

    mq_receive error: Resource temporarily unavailable

    Мы видим, что mq_receive действительно возвращает старейшее сообщение с наивысшим приоритетом. 

    5.5. Ограничения очередей сообщений

    Мы уже сталкивались с двумя ограничениями, устанавливаемыми для любой очереди в момент ее создания:

    ■ mq_maxmsg — максимальное количество сообщений в очереди;

    ■ mq_msgsize — максимальный размер сообщения.

    Не существует каких-либо ограничений на эти значения, хотя в рассматриваемых реализациях необходимо наличие в файловой системе места для файла требуемого размера. Кроме того, ограничения на эти величины могут накладываться реализацией виртуальной памяти (см. упражнение 5.5).

    Другие два ограничения определяются реализацией:

    ■ MQ_OPEN_MAX — максимальное количество очередей сообщений, которые могут быть одновременно открыты каким-либо процессом (Posix требует, чтобы эта величина была не меньше 8);

    ■ MQ_PRIO_MAX — максимальное значение приоритета плюс один (Posix требует, чтобы эта величина была не меньше 32).

    Эти две константы часто определяются в заголовочном файле <unistd.h> и могут быть получены во время выполнения программы вызовом функции sysconf, как мы покажем далее.

    Пример: программа mqsysconf

    Программа в листинге 5.7 вызывает функцию sysconf и выводит два ограничения на очереди сообщений, определяемые реализацией.

    Листинг 5.7. Получение ограничений очередей с помощью sysconf

    //pxmsg/mqsysconf.с

    1 #include "unpipc.h"

    2 int

    3 main(int argc, char **argv)

    4 {

    5  printf("MQ_OPEN_MAX = %ld, MQ_PRIO_MAX = %ld\n",

    6  Sysconf(_SC_MQ_OPEN_MAX), Sysconf(_SC_MQ_PRIO_MAX));

    7  exit(0);

    8 }

    Запустив эту программу в наших двух операционных системах, получим:

    solaris % mqsysconf

    MQ_OPEN_MAX = 32, MQ_PRIO_MAX = 32

    alpha % mqsysconf

    MQ_OPEN_MAX = 64, MQ_PRIO_MAX = 256

    5.6. Функция mq_notify

    Один из недостатков очередей сообщений System V, как мы увидим в главе 6, заключается в невозможности уведомить процесс о том, что в очередь было помещено сообщение. Мы можем заблокировать процесс при вызове msgrcv, но тогда мы не сможем выполнять другие действия во время ожидания сообщения. Если мы укажем флаг отключения блокировки при вызове msgrcv (IPC_NOWAIT), процесс не будет заблокирован, но нам придется регулярно вызывать эту функцию, чтобы получить сообщение, когда оно будет отправлено. Мы уже говорили, что такая процедура называется опросом и на нее тратится лишнее время. Нужно, чтобы система сама уведомляла процесс о том, что в пустую очередь было помещено новое сообщение.

    ПРИМЕЧАНИЕ

    В этом и всех последующих разделах данной главы обсуждаются более сложные вопросы, которые могут быть пропущены при первом чтении. 

    Очереди сообщений Posix допускают асинхронное уведомление о событии, когда сообщение помещается в очередь. Это уведомление может быть реализовано либо отправкой сигнала, либо созданием программного потока для выполнения указанной функции.

    Мы включаем режим уведомления с помощью функции mq_notify:

    #include <mqueue.h>

    int mq_notify(mqd_t mqdes, const struct sigevent *notification);

    /* Возвращает 0 в случае успешного выполнения, –1 – в случае ошибки */

    Эта функция включает и выключает асинхронное уведомление о событии для указанной очереди. Структура sigevent впервые появилась в стандарте Posix.1 для сигналов реального времени, о которых более подробно рассказано в следующем разделе. Эта структура и все новые константы, относящиеся к сигналам, определены в заголовочном файле <signal.h>:

    union sigval {

     int sival_int; /* целое значение */

     void *sival_ptr; /* указатель */

    };


    struct sigevent {

     int sigev_notify; /* SIGEV_{NONE,SIGNAL,THREAD} */

     int sigev_signo; /* номер сигнала, если SIGEV_SIGNAL */

     union sigval sigev_value; /* передается обработчику сигнала или потоку */

    /* Следующие два поля определены для SIGEV_THREAD */

    void (*sigev_notify_function) (union sigval);

    pthread_attr_t *sigev_notify_attributes;

    Мы вскоре приведем несколько примеров различных вариантов использования уведомления, но о правилах, действующих для этой функции всегда, можно упомянуть уже сейчас.

    1. Если аргумент notification ненулевой, процесс ожидает уведомления при поступлении нового сообщения в указанную очередь, пустую на момент его поступления. Мы говорим, что процесс регистрируется на уведомление для данной очереди.

    2. Если аргумент notification представляет собой нулевой указатель и процесс уже зарегистрирован на уведомление для данной очереди, то уведомление для него отключается.

    3. Только один процесс может быть зарегистрирован на уведомление для любой данной очереди в любой момент.

    4.  При помещении сообщения в пустую очередь, для которой имеется зарегистрированный на уведомление процесс, оно будет отправлено только в том случае, если нет заблокированных в вызове mq_receive для этой очереди процессов. Таким образом, блокировка в вызове mq_receive имеет приоритет перед любой регистрацией на уведомление.

    5. При отправке уведомления зарегистрированному процессу регистрация снимается. Процесс должен зарегистрироваться снова (если в этом есть необходимость), вызвав mq_notify еще раз.

    ПРИМЕЧАНИЕ

    С сигналами в Unix всегда была связана одна проблема: действие сигнала сбрасывалось на установленное по умолчанию каждый раз при отправке сигнала (раздел 10.4 [21]). Обычно первой функцией, вызываемой обработчиком сигнала, была signal, переопределявшая обработчик. Это создавало небольшой временной промежуток между отправкой сигнала и переопределением обработчика, в который процесс мог быть завершен при повторном появлении того же сигнала. На первый взгляд может показаться, что та же проблема должна возникать и при использовании mq_notify, поскольку процесс должен перерегистрироваться каждый раз после появления уведомления. Однако очереди сообщений отличаются по своим свойствам от сигналов, поскольку необходимость отправки уведомления не может возникнуть, пока очередь не будет пуста. Следовательно, необходимо аккуратно перерегистрироваться на получение уведомления до считывания пришедшего сообщения из очереди.

    Пример: простая программа с уведомлением

    Прежде чем углубляться в тонкости сигналов реального времени и потоков Posix, мы напишем простейшую программу, включающую отправку сигнала SI6USR1 при помещении сообщения в пустую очередь. Эта программа приведена в листинге 5.8, и мы отметим, что она содержит ошибку, о которой мы вскоре поговорим подробно.

    Листинг 5.8. Отправка sigusr1 при помещении сообщения в пустую очередь (неправильная версия программы)

    //pxmsg/mqnotifysigl.c

    1  #include "unpipc.h"

    2  mqd_t mqd;

    3  void *buff;

    4  struct mq_attr attr;

    5  struct sigevent sigev;

    6  static void sig_usrl(int);


    7  int

    8  main(int argc, char **argv)

    9  {

    10  if (argc != 2)

    11   err_quit("usage: mqnotifysig1 <name>");

    12  /* открываем очередь, получаем атрибуты, выделяем буфер */

    13  mqd = Mq_open(argv[1], O_RDONLY);

    14  Mq_getattr(mqd, &attr);

    15  buff = Malloc(attr.mq_msgsize);

    16  /* устанавливаем обработчик, включаем уведомление */

    17  Signal(SIGUSR1, sig_usr1);

    18  sigev.sigev_notify = SIGEV_SIGNAL;

    19  sigev.sigev_signo = SIGUSR1;

    20  Mq_notify(mqd, &sigev);

    21  for (;;)

    22   pause(); /* все делает обработчик */

    23  exit(0);

    24 }


    25 static void

    26 sig_usr1(int signo)

    27 {

    28  ssize_t n;

    29  Mq_notify(mqd, &sigev); /* сначала перерегистрируемся */

    30  n = Mq_receive(mqd, buff, attr.mq_msgsize, NULL);

    31  printf("SIGUSR1 received, read %ld bytes\n", (long) n);

    32  return;

    33 }

    Объявление глобальных переменных

    2-6 Мы объявляем несколько глобальных переменных, используемых совместно функцией main и нашим обработчиком сигнала (sig_usr1).

    Открытие очереди, получение атрибутов, выделение буфера чтения

    12-15 Мы открываем очередь сообщений, получаем ее атрибуты и выделяем буфер считывания соответствующего размера.

    Установка обработчика сигнала, включение уведомления

    16-20 Сначала мы устанавливаем свой обработчик для сигнала SIGUSR1. Мы присваиваем полю sigev_notify структуры sigevent значение SIGEV_SIGNAL, что говорит системе о необходимости отправки сигнала, когда очередь из пустой становится непустой. Полю sigev_signo присваивается значение, соответствующее тому сигналу, который мы хотим получить. Затем вызывается функция mq_notify.

    Бесконечный цикл

    Функция main после этого зацикливается, и процесс приостанавливается при вызове pause, возвращающей –1 при получении сигнала.

    Получение сигнала, считывание сообщения

    Обработчик сигнала вызывает mq_notify для перерегистрации, считывает сообщение и выводит его длину. В этой программе мы игнорируем приоритет полученного сообщения. 

    ПРИМЕЧАНИЕ

    Оператор return в конце sig_usr1 не требуется, поскольку возвращаемое значение отсутствует, а конец текста функции неявно предусматривает возвращение в вызвавшую программу. Тем не менее автор всегда записывает return явно, чтобы указать, что возвращение из этой функции может происходит с особенностями. Например, может произойти преждевременный возврат (с ошибкой EINTR) в потоке, обрабатывающем сигнал. 

    Запустим теперь эту программу в одном из окон

    solaris % mqcreate /test1

    solaris % mqnotifysig1 /test1

    и затем выполним следующую команду в другом окне

    solaris % mqsend /test1 50 16

    Как и ожидалось, программа mqnotifysig1 выведет сообщение: SIGUSR1 received, read 50 bytes.

    Мы можем проверить, что только один процесс может быть зарегистрирован на получение уведомления в любой момент, запустив копию пpoгрaммы в другом окне:

    solaris % mqnotifysig1 /test1

    mq_notify error: Device busy

    Это сообщение соответствует коду ошибки EBUSY.

    Сигналы Posix: функции типа Async-Signal-Safe

    Недостаток пpoгрaммы из листинга 5.8 в том, что она вызывает mq_notify, mq_receive и printf из обработчика сигнала. Ни одну из этих функций вызывать оттуда не следует.

    Функции, которые могут быть вызваны из обработчика сигнала, относятся к группе, называемой, согласно Posix, async-signal-safe functions (функции, обеспечивающие безопасную обработку асинхронных сигналов). В табл. 5.1 приведены эти функции по стандарту Posix вместе с некоторыми дополнительными, появившимися только в Unix 98.

    Функции, которых нет в этом списке, не должны вызываться из обработчика сигнала. Обратите внимание, что в списке отсутствуют стандартные функции библиотеки ввода-вывода и функции pthread_XXX для работы с потоками. Из всех функций IPC, рассматриваемых в этой книге, в список попали только sem_post, read и write (подразумевается, что последние две используются с программными каналами и FIFO).

    ПРИМЕЧАНИЕ

    Стандарт ANSI С указывает четыре функции, которые могут быть вызваны из обработчика сигналов: abort, exit, longjmp, signal. Первые три отсутствуют в списке функций async-signal-safe стандарта Unix 98. 


    Таблица 5.1. Функции, относящиеся к группе async-signal-safe

    access        fpathconf rename      sysconf

    aio_return    fstat     rmdir       tcdrain

    aio_suspend   fsync     sem_post    tcflow 

    alarm         getegid   setgid      tcflush

    cfgetispeed   geteuid   setpgid     tcgetattr

    cfgetospeed   getgid    setsid      tcgetgrp

    cfsetispeed   getgroups setuid      tcsendbreak

    cfsetospeed   getpgrp   sigaction   tcsetattr

    chdir         getpid    sigaddset   tcsetpgrp

    chmod         getppid   sigdelset   time

    chown         getuid    sigemptyset timer_getoverrun

    clock_gettime kill      sigfillset  timer_gettime

    close         link      sigismember timer_settime

    creat         lseek     signal      times

    dup           mkdir     sigpause    umask

    dup2          mkfifo    sigpending  uname

    execle        open      sigprocmask unlink

    execve        pathconf  sigqueue    utime

    _exit         pause     sigset      wait

    fcntl         pipe      sigsuspend  waitpid

    fdatasync     raise     sleep       write

    fork          read      stat

    Пример: уведомление сигналом

    Одним из способов исключения вызова каких-либо функций из обработчика сигнала является установка этим обработчиком глобального флага, который проверяется программным потоком для получения информации о приходе сообщения. В листинге 5.9 иллюстрируется этот метод, хотя новая программа также содержит ошибку, но уже другую, о которой мы вскоре поговорим подробнее.

    Глобальная переменная

    2 Поскольку единственное действие, выполняемое обработчиком сигнала, заключается в присваивании ненулевого значения флагу mqflag, глобальным переменным из листинга 5.8 уже не нужно являться таковыми. Уменьшение количества глобальных переменных — это всегда благо, особенно при использовании программных потоков.

    Открытие очереди сообщений

    15-18 Мы открываем очередь сообщений, получаем ее атрибуты и выделяем буфер считывания.

    Инициализация наборов сигналов

    19-22 Мы инициализируем три набора сигналов и устанавливаем бит для сигнала SIGUSR1 в наборе newmask.

    Установка обработчика сигнала, включение уведомления

    23-27 Мы устанавливаем обработчик сигнала для SIGUSR1, присваиваем значения полям структуры sigevent и вызываем mq_notify. 

    Листинг 5.9. Обработчик сигнала устанавливает флаг для главного потока (неправильная версия)

    //pxmsg/mqnotifysig2.c

    1  #include "unpipc.h"

    2  volatile sig_atomic_t mqflag; /* ненулевое значение устанавливается обработчиком */

    3  static void sig_usrl(int);


    4  int

    5  main(int argc, char **argv)

    6  {

    7   mqd_t mqd;

    8   void *buff;

    9   ssize_t n;

    10  sigset_t zeromask, newmask, oldmask;

    11  struct mq_attr attr;

    12  struct sigevent sigev;

    13  if (argc != 2)

    14   err_quit("usage: mqnotifysig2 <name>");

    15  /* открытие очереди, получение атрибутов, выделение буфера */

    16  mqd = Mq_open(argv[1], O_RDONLY);

    17  Mq_getattr(mqd, &attr);

    18  buff = Malloc(attr.mq_msgsize);

    19  Sigemptyset(&zeromask); /* сигналы не блокируются */

    20  Sigemptyset(&newmask);

    21  Sigemptyset(&oldmask);

    22  Sigaddset(&newmask, SIGUSR1);

    23  /* установка обработчика, включение уведомления */

    24  Signal(SIGUSR1, sig_usr1);

    25  sigev.sigev_notify = SIGEV_SIGNAL;

    26  sigev.sigev_signo = SIGUSR1;

    27  Mq_notify(mqd, &sigev);

    28  for (;;) {

    29   Sigprocmask(SIG_BLOCK, &newmask, &oldmask); /* блокируем SIGUSR1 */

    30   while (mqflag == 0)

    31    sigsuspend(&zeromask);

    32   mqflag = 0; /* сброс флага */

    33   Mq_notify(mqd, &sigev); /* перерегистрируемся */

    34   n = Mq_receive(mqd, buff, attr.mq_msgsize, NULL);

    35   printf("read %ld bytes\n", (long) n);

    36   Sigprocmask(SIG_UNBLOCK, &newmask, NULL); /* разблокируем SIGUSR1 */

    37  }

    38  exit(0);

    39 }


    40 static void

    41 sig_usr1(int signo)

    42 {

    43  mqflag = 1;

    44  return;

    45 }
     

    Ожидание установки флага обработчиком

    28-32 Мы вызываем sigprocmask, чтобы заблокировать SIGUSR1, сохраняя текущую маску сигналов в oldmask. Затем мы в цикле проверяем значение глобального флага mqflag, ожидая, когда обработчик сигнала установит его в ненулевое значение. Пока значение этого флага равно нулю, мы вызываем sigsuspend, что автоматически приостанавливает вызывающий поток и устанавливает его маску в zeromask (сигналы не блокируются). Раздел 10.16 [21] рассказывает о функции sigsuspend более подробно. Также там объясняются причины, по которым мы должны проверять значение переменной mqflag только при заблокированном сигнале SIGUSR1. Каждый раз при выходе из sigsuspend сигнал SIGUSR1 блокируется.

    Перерегистрация и считывание сообщения

    33-36 Когда флаг mqflag принимает ненулевое значение, мы регистрируемся на получение уведомления заново и считываем сообщение из очереди. Затем мы разблокируем сигнал SIGUSR1 и возвращаемся к началу цикла.

    Мы уже говорили, что в этой версии программы также присутствует ошибка. Посмотрим, что произойдет, если в очередь попадут два сообщения, прежде чем будет считано первое из них. Мы можем имитировать это, добавив sleep перед вызовом mq_notify. Проблема тут в том, что уведомление отсылается только в том случае, когда сообщение помещается в пустую очередь. Если в очередь поступают два сообщения, прежде чем первое будет считано, то отсылается только одно уведомление. Тогда мы считываем первое сообщение и вызываем sigsuspend, ожидая поступления еще одного. А в это время в очереди уже имеется сообщение, которое мы должны прочитать, но которое мы никогда не прочтем.

    Пример: уведомление сигналом с отключением блокировки

    Исправить описанную выше ошибку можно, отключив блокировку операции считывания сообщений. Листинг 5.10 содержит измененную версию программы из листинга 5.9. Новая программа считывает сообщения в неблокируемом режиме.

    Листинг 5.10. Использование уведомления с помощью сигнала для считывания сообщения из очереди сообщений Posix

    //pxmsg/mqnotifysig3.с

    1  #include "unpipc.h"

    2  volatile sig_atomic_t mqflag; /* ненулевое значение устанавливается обработчиком сигнала */

    3  static void sig_usr1(int);


    4  int

    5  main(int argc, char **argv)

    6  {

    7   mqd_t mqd;

    8   void *buff;

    9   ssize_t n;

    10  sigset_t zeromask, newmask, oldmask;

    11  struct mq_attr attr;

    12  struct sigevent sigev;

    13  if (argc != 2)

    14   err_quit("usage: mqnotifysig3 <name>");

    15  /* открытие очереди, получение атрибутов, выделение буфера */

    16  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);

    17  Mq_getattr(mqd, &attr);

    18  buff = Malloc(attr.mq_msgsize);

    19  Sigemptyset(&zeromask); /* сигналы не блокируются */

    20  Sigemptyset(&newmask);

    21  Sigemptyset(&oldmask);

    22  Sigaddset(&newmask, SIGUSR1);

    23  /* установка обработчика, включение уведомления */

    24  Signal(SIGUSR1, sig_usr1);

    25  sigev.sigev_notify = SIGEV_SIGNAL;

    26  sigev.sigev_signo = SIGUSR1;

    27  Mq_notify(mqd, &sigev);

    28  for (;;) {

    29   Sigprocmask(SIG_BLOCK, &newmask, &oldmask); /* блокируем SIGUSR1 */

    30   while (mqflag == 0)

    31    sigsuspend(&zeromask);

    32   mqflag = 0; /* сброс флага */

    33   Mq_notify(mqd, &sigev); /* перерегистрируемся */

    34   while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {

    35    printf("read $ld bytes\n", (long) n);

    36   }

    37   if (errno != EAGAIN)

    38    err_sys("mq_receive error");

    39   Sigprocmask(SIG_UNBLOCK, &newmask, NULL); /* разблокируем SIGUSR1 */

    40  }

    41  exit(0);

    42 }


    43 static void

    44 sig_usr1(int signo)

    45 {

    46  mqflag = 1;

    47  return;

    48 }

    Открытие очереди сообщений в режиме отключенной блокировки

    15-18 Первое изменение в программе: при открытии очереди сообщений указывается флаг O_NONBLOCK.

    Считывание всех сообщений из очереди

    34-38 Другое изменение: mq_receive вызывается в цикле, считывая все сообщения в очереди, пока не будет возвращена ошибка с кодом EAGAIN, означающая отсутствие сообщений в очереди. 

    Пример: уведомление с использованием sigwait вместо обработчика

    Хотя программа из предыдущего примера работает правильно, можно повысить ее эффективность. Программа использует sigsuspend для блокировки в ожидании прихода сообщения. При помещении сообщения в пустую очередь вызывается сигнал, основной поток останавливается, запускается обработчик, который устанавливает флаг mqflag, затем снова запускается главный поток, он обнаруживает, что значение mqflag отлично от нуля, и считывает сообщение. Более простой и эффективный подход заключается в блокировании в функции, ожидающей получения сигнала, что не требует вызова обработчика только для установки флага. Эта возможность предоставляется функцией sigwait:

    #include <signal.h>

    int sigwait(const sigset_t *set, int *sig);

    /* Возвращает 0 в случае успешного завершения, –1 – в случае ошибки */

    Перед вызовом sigwait мы блокируем некоторые сигналы. Набор блокируемых сигналов указывается в качестве аргумента set. Функция sigwait блокируется, пока не придет по крайней мере один из этих сигналов. Когда он будет получен, функция возвратит его. Значение этого сигнала сохраняется в указателе sig, а функция возвращает значение 0. Это называется синхронным ожиданием асинхронного события: мы используем сигнал, но не пользуемся асинхронным обработчиком сигнала.

    В листинге 5.11 приведен текст программы, использующей mq_notifу и sigwait.

    Листинг 5.11. Использование mq_notify совместно с sigwait

    //pxmsg/mqnotifysig4.c

    1  #include "unpipc.h"


    2  int

    3  main(int argc, char **argv)

    4  {

    5   int signo;

    6   mqd_t mqd;

    7   void *buff;

    8   ssize_t n;

    9   sigset_t newmask;

    10  struct mq_attr attr;

    11  struct sigevent sigev;

    12  if (argc != 2)

    13   err_quit("usage: mqnotifysig4 <name>");

    14  /* открытие очереди, получение атрибутов, выделение буфера */

    15  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);

    16  Mq_getattr(mqd, &attr);

    17  buff = Malloc(attr.mq_msgsize);

    18  Sigemptyset(&newmask);

    19  Sigaddset(&newmask, SIGUSR1);

    20  Sigprocmask(SIG_BLOCK, &newmask, NULL); /* блокируем SIGUSR1 */

    21  /* установка обработчика, включение уведомления */

    22  sigev.sigev_notify = SIGEV_SIGNAL;

    23  sigev.sigev_signo = SIGUSR1;

    24  Mq_notify(mqd, &sigev);

    25  for (;;) {

    26   Sigwait(&newmask, &signo);

    27   if (signo == SIGUSR1) {

    28    Mq_notify(mqd, &sigev); /* перерегистрируемся */

    29    while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {

    30     printf("read %ld bytes\n", (long) n);

    31    }

    32    if (errno != EAGAIN)

    33     err_sys("mq_receive error");

    34   }

    35  }

    36  exit(0);

    37 }

    Инициализация набора сигналов и блокировка SIGUSR1

    18-20 Инициализируется один набор сигналов, содержащий только SIGUSR1, а затем этот сигнал блокируется sigprocmask.

    Ожидание сигнала

    26-34 Мы блокируем выполнение программы и ждем прихода сигнала, вызвав sigwait. При получении сигнала SIGUSR1 мы перерегистрируемся на уведомление и считываем все доступные сообщения.

    ПРИМЕЧАНИЕ

    Функция sigwait часто используется в многопоточных процессах. Действительно, глядя на прототип функции, мы можем заметить, что возвращаемое значение будет 0 или одной из ошибок Еххх, что весьма похоже на функции Pthread. Однако в многопоточном процессе нельзя пользоваться sigprocmask — вместо нее следует вызывать pthread_ sigmask, которая изменяет маску сигналов только для вызвавшего ее потока. Аргументы pthread_sigmask совпадают с аргументами sigprocmask.

    Существуют два варианта функции sigwait: sigwaitinfo возвращает структуру siginfo_t (которая будет определена в следующем разделе) и предназначена для использования с надежными сигналами; функция sigtimedwait также возвращает структуру siginfo_t и позволяет вызывающему процессу установить ограничение по времени на ожидание.

    Большая часть книг о многопоточном программировании, таких как [3], рекомендуют пользоваться sigwait для обработки всех сигналов в многопоточном процессе и не использовать асинхронные обработчики. 

    Пример: очереди сообщений Posix и функция select

    Дескриптор очереди сообщений (переменная типа mqd_t) не является «обычным» дескриптором и не может использоваться с функциями select и poll (глава 6 [24]). Тем не менее их можно использовать вместе с каналом и функцией mq_notify. (Аналогичный метод применен в разделе 6.9 для очередей System V, где создается дочерний процесс и канал связи.) Прежде всего обратите внимание, что, согласно табл. 5.1, функция write принадлежит к группе async-signal-safe, поэтому она может вызываться из обработчика сигналов. Программа приведена в листинге 5.12.

    Листинг 5.12. Использование уведомления с помощью сигнала и канала

    //pxmsg/mqnotifysig5.c

    1  #include "unpipc.h"

    2  int pipefd[2];

    3  static void sig_usr1(int);


    4  int

    5  main(int argc, char **argv)

    6  {

    7   int nfds;

    8   char c;

    9   fd_set rset;

    10  mqd_t mqd;

    11  void *buff;

    12  ssize_t n;

    13  struct mq_attr attr;

    14  struct sigevent sigev;

    15  if (argc != 2)

    16   err_quit("usage: mqnotifysig5 <name>");

    17  /* открытие очереди, получение атрибутов, выделение буфера */

    18  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);

    19  Mq_getattr(mqd, &attr);

    20  buff = Malloc(attr.mq_msgsize);

    21  Pipe(pipefd);

    22  /* установка обработчика, включение уведомления */

    23  Signal(SIGUSR1, sig_usr1);

    24  sigev.sigev_notify = SIGEV_SIGNAL;

    25  sigev.sigev_signo = SIGUSR1;

    26  Mq_notify(mqd, &sigev);

    27  FD_ZERO(&rset);

    28  for (;;) {

    29   FD_SET(pipefd[0], &rset);

    30   nfds = Select(pipefd[0] + 1, &rset, NULL, NULL, NULL);

    31   if (FD_ISSET(pipefd[0], &rset)) {

    32    Read(pipefd[0], &c, 1);

    33    Mq_notify(mqd, &sigev); /* перерегистрируемся */

    34    while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {

    35     printf("read %ld bytes\n", (long) n);

    36    }

    37    if (errno != EAGAIN)

    38     err_sys("mq_receive error");

    39   }

    40  }

    41  exit(0);

    42 }


    43 static void

    44 sig_usr1(int signo)

    45 {

    46  Write(pipefd[1], "", 1); /* один байт – 0 */

    47  return;

    48 }

    Создание канала

    21 Мы создаем канал, в который обработчик сигнала произведет запись, когда будет получено уведомление о поступлении сообщения в очередь. Это пример использования канала внутри одного процесса.

    Вызов select

    27-40 Мы инициализируем набор дескрипторов rset и при каждом проходе цикла включаем бит, соответствующий дескриптору pipefd[0] (открытый на считывание конец канала). Затем мы вызываем функцию select, ожидая получения единственного дескриптора, хотя в типичном приложении именно здесь осуществлялось бы размножение дескрипторов одного из концов канала. Когда появляется возможность читать из канала, мы перерегистрируемся на уведомление и считываем все доступные сообщения.

    Обработчик сигнала

    43-48 Единственное, что делает обработчик сигнала, — записывает в канал 1 байт. Как мы уже отмечали, эта операция относится к разрешенным для асинхронных обработчиков.

    Пример: запуск нового потока

    Альтернативой снятию блокировки сигналом является присваивание sigev_notify значения SIGEV_THREAD, что приводит к созданию нового потока. Функция, указанная в sigev_notify_function, вызывается с параметром sigev_value. Атрибуты нового канала указываются переменной sigev_notify_attributes, которая может быть и нулевым указателем, если нас устраивают устанавливаемые по умолчанию атрибуты. Текст программы приведен в листинге 5.13.

    Листинг 5.13. Функция mq_notify, запускающая новый программный поток

    //pxmsg/mqnotifythread1.с

    1  #include "unpipc.h"

    2  mqd_t mqd;

    3  struct mq_attr attr;

    4  struct sigevent sigev;

    5  static void notify_thread(union sigval); /* наш поток */


    6  int

    7  main(int argc, char **argv)

    8  {

    9   if (argc != 2)

    10   err_quit("usage: mqnotifythread1 <name>");

    11  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);

    12  Mq_getattr(mqd, &attr);

    13  sigev.sigev_notify = SIGEV_THREAD;

    14  sigev.sigev_value.sival_ptr = NULL;

    15  sigev.sigev_notify_function = notify_thread;

    16  sigev.sigev_notify_attributes = NULL;

    17  Mq_notify(mqd, &sigev);

    18  for (;;)

    19   pause(); /* новый поток делает все */

    20  exit(0);

    21 }


    22 static void

    23 notify_thread(union sigval arg)

    24 {

    25  ssize_t n;

    26  void *buff;

    27  printf("notify_thread started\n");

    28  buff = Malloc(attr.mq_msgsize);

    29  Mq_notify(mqd, &sigev); /* перерегистрируемся */

    30  while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {

    31   printf("read %ld bytes\n", (long) n);

    32  }

    33  if (errno != EAGAIN)

    34   err_sys("mq_receive error");

    35  free(buff);

    36  pthread_exit(NULL);

    37 }

    Мы задаем нулевой указатель в качестве аргумента нового потока (sigev_value), поэтому функции start нового потока ничего не передается. Мы могли бы передать указатель на дескриптор, вместо того чтобы декларировать его как глобальный, но новому потоку все равно нужно получать атрибуты очереди сообщений и структуру sigev (для перерегистрации). Мы также указываем нулевой указатель в качестве атрибутов нового потока, поэтому используются установки по умолчанию. Новые потоки создаются как неприсоединенные (detached threads).

    ПРИМЕЧАНИЕ

    К сожалению, ни одна из использовавшихся для проверки примеров систем (Solaris 2.6 и Digital Unix 4.0B) не поддерживает SIGEV_THREAD. Обе они допускают только два значения sigev_notify: SIGEV_NONE и SIGEV_SIGNAL.

    5.7. Сигналы реального времени Posix

    За прошедшие годы сигналы в Unix много раз претерпевали революционные изменения.

    1. Модель сигналов, предлагавшаяся в Unix Version 7 (1978), была ненадежной. Сигналы могли быть потеряны, и процессу было трудно отключить отдельные сигналы при выполнении отдельных участков кода.

    2. В версии 4.3BSD (1986) надежные сигналы были добавлены.

    3. Версия System V Release 3.0 (1986) также добавила надежные сигналы, хотя и иначе, чем BSD.

    4. Стандарт Posix.1 (1990) увековечил модель надежных сигналов BSD, и эта модель подробно описана в главе 10 [21].

    5. Posix.1 (1996) добавил к модели Posix сигналы реального времени. Это произросло из расширений реального времени Posix.1b (которые были названы Posix.4).

    Почти все системы Unix в настоящее время поддерживают надежные сигналы, а новейшие системы предоставляют также и сигналы реального времени стандарта Posix. (Следует различать надежные сигналы и сигналы реального времени.) О сигналах реального времени следует поговорить подробнее, поскольку мы уже столкнулись с некоторыми структурами, определяемыми этим расширением стандарта, в предыдущем разделе (структуры sigval и sigevent).

    Сигналы могут быть отнесены к двум группам:

    1. Сигналы реального времени, которые могут принимать значения между SIGRTMIN и SIGRTMAX включительно. Posix требует, чтобы предоставлялось по крайней мере RTSIG_MAX сигналов, и минимальное значение этой константы равно 8.

    2. Все прочие сигналы: SIGALRM, SIGINT, SIGKILL и пр.

    ПРИМЕЧАНИЕ

    В Solaris 2.6 обычные сигналы Unix нумеруются с 1 по 37, а 8 сигналов реального времени имеют номера с 38 по 45. В Digital Unix 4.0B обычные сигналы нумеруются с 1 по 32, а 16 сигналов реального времени имеют номера с 33 по 48. Обе реализации определяют SIGRTMIN и SIGRTMAX как макросы, вызывающие sysconf, что позволяет изменять их значения.

    Далее все зависит от того, установлен ли процессом, получившим сигнал, флаг SA_SIGINFO при вызове sigaction. В итоге получаются четыре возможных сценария, приведенных в табл. 5.2.


    Таблица 5.2. Поведение сигналов Posix в реальном времени в зависимости от SA_SIGINFO 

    Сигнал Флаг SA_SIGINFO указан Флаг SA_SIGINFO не указан
    От SIGRTMIN до SIGRTMAX Гарантируются характеристики реального времени Характеристики реального времени не обязательны
    Все прочие сигналы Характеристики реального времени не обязательны Характеристики реального времени не обязательны

    Смысл фразы «характеристики реального времени не обязательны» следующий: некоторые реализации могут обрабатывать эти сигналы как сигналы реального времени, но это не обязательно. Если мы хотим, чтобы сигналы обрабатывались как сигналы реального времени, мы должны использовать сигналы с номерами от SIGRTMIN до SIGRTMAX и должны указать флаг SA_SIGINFO при вызове sigaction при установке обработчика сигнала.

    Термин «характеристики реального времени» подразумевает следующее:

    ■ Сигналы помещаются в очередь. Если сигнал будет порожден трижды, он будет трижды получен адресатом. Более того, повторения одного и того же сигнала доставляются в порядке очереди (FIFO). Мы вскоре покажем пример очереди сигналов. Если же сигналы в очередь не помещаются, трижды порожденный сигнал будет получен лишь один раз.

    ■ Когда в очередь помещается множество неблокируемых сигналов в диапазоне SIGRTMIN—SIGRTMAX, сигналы с меньшими номерами доставляются раньше сигналов с большими номерами. То есть сигнал с номером SIGRTMIN имеет «больший приоритет», чем сигнал с номером SIGRTMIN+1, и т.д.

    ■ При отправке сигнала, не обрабатываемого как сигнал реального времени, единственным аргументом обработчика является номер сигнала. Сигналы реального времени несут больше информации, чем прочие сигналы. Обработчик для сигнала реального времени, устанавливаемый с флагом SA_SIGINFO, объявляется как

    void func(int signo, siginfo_t *info, void *context);

    где signo— номер сигнала, a siginfo_t — структура, определяемая как

    typedef struct {

     int si_signo; /* то же, что и signo */

     int si_code; /* SI_{USER,QUEUE,TIMER,ASYNCIO,MESGQ} */

     union sigval si_value; /* целое или указатель от отправителя */

    } siginfo_t;

    На что указывает context — зависит от реализации.

    ПРИМЕЧАНИЕ

    Обработчик сигналов, не являющихся сигналами реального времени, вызывается с единственным аргументом. Во многих системах существует старое соглашение о вызове обработчиков сигналов с тремя аргументами, которое предшествовало стандарту реального времени Posix.

    Тип siginfo_t является единственной структурой Posix, определяемой оператором typedef с именем, оканчивающимся на _t. В листинге 5.14 мы объявляем указатели на эти структуры как siginfo_t * без слова struct. 

    ■ Для работы с сигналами реального времени добавлено несколько новых функций. Например, для отправки сигнала какому-либо процессу используется функция sigqueue вместо kill. Новая функция позволяет отправить вместе с сигналом структуру sigval.

    Сигналы реального времени порождаются нижеследующими функциями Posix.1, определяемыми значением si_code, которое хранится в структуре siginfo_t, передаваемой обработчику сигнала.

    ■ SI_ASYNCIO — сигнал был порожден по завершении асинхронного запроса на ввод или вывод одной из функций Posix aio_XXX, которые мы не рассматриваем;

    ■ SI_MESGQ — сигнал был порожден при помещении сообщения в пустую очередь сообщений (как в разделе 5.6); 

    ■ SI_QUEUE — сигнал был отправлен функцией sigqueue. Пример будет вскоре приведен;

    ■ SI_TIMER — сигнал был порожден по истечении установленного функцией timer_settime времени. Эту функцию мы не описываем;

    ■ SI_USER — сигнал был отправлен функцией kill.

    Если сигнал был порожден каким-либо другим событием, si_code будет иметь значение, отличающееся от приведенных выше. Значение поля si_value структуры siginfo_t актуально только в том случае, если si_code имеет одно из следующих значений: SI_ASYNCIO, SI_MESGQ, SI_QUEUE и SI_TIMER.

    Пример

    В листинге 5.14 приведен пример программы, демонстрирующей использование сигналов реального времени. Программа вызывает fork, дочерний процесс блокирует три сигнала реального времени, родительский процесс отправляет девять сигналов (три раза отсылается каждый из заблокированных сигналов), затем дочерний процесс разблокирует сигналы и мы смотрим, сколько раз будет получен каждый из них и в каком порядке они придут.

    Листинг 5.14. Тестовая программа, иллюстрирующая работу с сигналами реального времени

    //rtsignals/test1.c

    1  #include "unpipc.h"

    2  static void sig_rt(int, siginfo_t *, void *);


    3  int

    4  main(int argc, char **argv)

    5  {

    6   int i, j;

    7   pid_t pid;

    8   sigset_t newset;

    9   union sigval val;

    10  printf("SIGRTMIN = %d, SIGRTMAX = %d\n", (int) SIGRTMIN, (int) SIGRTMAX);

    11  if ((pid = Fork()) == 0) {

    12   /* дочерний процесс блокирует 3 сигнала */

    13   Sigemptyset(&newset);

    14   Sigaddset(&newset, SIGRTMAX);

    15   Sigaddset(&newset, SIGRTMAX – 1);

    16   Sigaddset(&newset, SIGRTMAX – 2);

    17   Sigprocmask(SIG_BLOCK, &newset, NULL);

    18   /* установка обработчика с флагом SA_SIGINFO */

    19   Signal_rt(SIGRTMAX, sig_rt);

    20   Signal_rt(SIGRTMAX – 1, sig_rt);

    21   Signal_rt(SIGRTMAX – 2, sig_rt);

    22   sleep(6); /* родительский процесс посылает все сигналы */

    23   Sigprocmask(SIG UNBLOCK, &newset, NULL); /* разблокируемся */

    24   sleep(3); /* получаем сигналы */

    25   exit(O);

    26  }

    27  /* родительский процесс отправляет сигналы */

    28  sleep(3); /* дочерний процесс блокирует сигналы */

    29  for (i = SIGRTMAX; i >= SIGRTMAX – 2; i--) {

    30   for (j = 0; j <= 2; j++) {

    31    val.sival_int = j;

    32    Sigqueue(pid, i, val);

    33    printf("sent signal %d, val = %d\n", i, j);

    34   }

    35  }

    36  exit(0);

    37 }


    38 static void

    39 sig_rt(int signo, siginfo_t *info, void *context)

    40 {

    41  printf(received signal #%d, code = %d, ival = %d\n",

    42   signo.info->si_code, info->si_value.sival_int);

    43 }

    Вывод номеров сигналов реального времени

    10 Мы печатаем наибольший и наименьший номера сигналов реального времени, чтобы узнать, сколько их предоставляется в данной реализации. Мы преобразуем обе константы к типу integer, поскольку в некоторых реализациях они определяются как макросы, требующие вызова sysconf, например:

    #define SIGRTMAX (sysconf(_SC_RTSIG_MAX))

    и функция sysconf возвращает целое типа long (см. упражнение 5.4).

    Вызов fork и блокирование трех сигналов реального времени

    11-17 Запускается дочерний процесс, который вызывает sigprocmask для блокировки трех используемых сигналов реального времени: SIGRTMAX, SIGRTMAX-1 и SIGRTMAX-2.

    Установка обработчика сигнала

    18-21 Мы вызываем функцию signal_rt (приведенную в листинге 5.15) для установки функции sig_rt в качестве обработчика трех указанных выше сигналов реального времени. Функция устанавливает флаг SA_SIGINFO, и поскольку эти три сигнала являются сигналами реального времени, мы можем ожидать, что они будут обрабатываться соответствующим образом. Эта функция также устанавливает маску сигналов, блокируемых на время выполнения обработчика.

    Ожидание порождения сигналов родительским процессом, разблокирование сигналов

    22-25 Дочерний процесс ждет 6 секунд, пока родительский породит девять сигналов. Затем вызывается sigprocmask для разблокирования трех сигналов реального времени. Это позволяет всем помещенным в очередь сигналам достичь адресата. Затем делается пауза еще на три секунды, чтобы обработчик успел вызвать printf девять раз, после чего дочерний процесс завершает свою работу.

    Родительский процесс отправляет девять сигналов

    27-36 Родительский процесс ждет три секунды, пока дочерний не заблокирует все требуемые сигналы. Затем родительский процесс порождает три экземпляра каждого из трех сигналов реального времени: i принимает 3 значения, a j принимает значения 0, 1 и 2 для каждого из значений i. Мы преднамеренно порождаем сигналы начиная с наибольшего номера, поскольку ожидаем, что они будут получены начиная с наименьшего. Мы также отсылаем с каждым из сигналов новое значение sigval_int, чтобы проверить, что копии одного и того же сигнала доставляются в том же порядке, в каком они были отправлены, то есть очередь действительно является очередью.

    Обработчик сигнала

    38-43 Обработчик сигнала просто выводит информацию о полученном сигнале.

    ПРИМЕЧАНИЕ

    Из табл. 5.1 следует, что функция printf не относится к функциям типа async-signal-safe и не должна вызываться из обработчика сигналов. Здесь мы используем ее исключительно в качестве проверочного средства в маленькой тестовой программе. 

    Запустим эту программу в Solaris 2.6. Результат будет не тем, которого мы ожидали:

    solaris % test1

    SIGRTMIN = 38, SIGRTMAX = 45 8 сигналов реального времени

                                 трехсекундная пауза

    sent signal 45, val = 0

    sent signal 45, val = 1

    sent signal 45, val = 2

    sent signal 44, val = 0

    sent signal 44, val = 1

    sent signal 44, val = 2

    sent signal 43, val = 0

    sent signal 43, val = 1

    sent signal 43, val = 2

    solaris % родительский процесс завершил работу, пауза 3 секунды,

              пока дочерний процесс не разблокирует сигналы

    received signal #45, code = –2, ival = 2 дочерний процесс получает сигналы

    received signal #45, code = –2, ival = 1

    received signal #45, code = –2, ival = 0

    received signal #44, code = –2, ival = 2

    received signal #44, code = –2, ival = 1

    received signal #44, code = –2, ival = 0

    received signal #43, code = –2, ival = 2

    received signal #43, code = –2, ival = 1

    received signal #43, code = –2, ival = 0

    В очередь помещаются девять сигналов, но первыми принимаются сигналы с большими номерами (а мы ожидали получить сигналы с меньшими номерами). 

    Кроме того, сигналы с одинаковым номером приходят в порядке LIFO, а не FIFO. Код si_code = –2 соответствует SI_QUEUE.

    Запустив программу в Digital Unix 4.0B, мы получим именно тот результат, которого ожидали:

    alpha % test1

    SIGRTMIN = 33, SIGRTMAX = 48 16 сигналов реального времени

                                 трех секундная пауза

    sent signal 48, val = 0

    sent signal 48, val = 1

    sent signal 48, val = 2

    sent signal 47, val = 0

    sent signal 47, val = 1

    sent signal 47, val = 2

    sent signal 46, val = 0

    sent signal 46, val = 1

    sent signal 46, val = 2

    alpha % родительский процесс завершил работу, пауза 3 секунды.

            пока дочерний процесс не разблокируетсигналы

    received signal #46, code – –1, ival = 0 дочерний процесс получает сигналы

    received signal #46, code = –1, ival = 1

    received signal #46, code = –1, ival = 2

    received signal #47, code – –1, ival = 0

    received signal #47, code = –1, ival = 1

    received signal #47, code = –1, ival = 2

    received signal #48, code = –1, ival = 0

    received signal #48, code = –1, ival = 1

    received signal #48, code = –1, ival = 2

    Девять сигналов помещаются в очередь и получаются адресатом в ожидаемом порядке: первым приходит сигнал с меньшим номером, а копии сигнала приходят в порядке FIFO.

    ПРИМЕЧАНИЕ

    Похоже, что в реализации Solaris 2.6 есть ошибка.

    Функция signal_rt

    В книге [24, с. 120] мы привели пример собственной функции signal, вызывавшей функцию sigaction стандарта Posix для установки обработчика сигнала, обеспечивающего надежную семантику Posix. Изменим эту функцию, чтобы обеспечить поддержку реального времени. Новую функцию мы назовем signal_rt; ее текст приведен в листинге 5.15.

    Листинг 5.15. Функция signal_rt с поддержкой реального времени

    //lib/signal_rt.c

    1  #include "unpipc.h"

    2  Sigfunc_rt *


    3  signal_rt(int signo, Sigfunc_rt *func)

    4  {

    5   struct sigaction act, oact;

    6   act.sa_sigaction = func; /* сохраняем адрес функции */

    7   sigemptyset(&act.sa_mask);

    8   act.sa_flags = SA_SIGINFO; /* гарантирует режим реального времени */

    9   if (signo == SIGALRM) {

    10 #ifdef SA_INTERRUPT

    11   act.sa_flags |= SA_INTERRUPT; /* SunOS 4.x */

    12 #endif

    13  } else {

    14 #ifdef SA_RESTART

    15   act.sa_flags |= SA_RESTART; /* SVR4, 44BSD */

    16 #endif

    17  }

    18  if (sigaction(signo, &act, &oact) < 0)

    19   return((Sigfunc_rt *) SIG_ERR);

    20  return(oact.sa_sigaction);

    21 }

    Упрощение прототипа функции с использованием typedef

    1-3 В нашем заголовочном файле unpiрс.h (листинг В.1) мы определяем Sigfunc_rt как

    typedef void Sigfunc_rt(int, siginfo_t*, void*);

    Ранее в этом разделе мы говорили о том, что это прототип функции для обработчика сигнала, устанавливаемого с флагом SA_SIGINFO.

    Указание функции-обработчика

    Структура sigaction претерпела изменения с добавлением поддержки сигна-5-7 лов реального времени: к ней было добавлено новое поле sa_sigaction:

    struct sigaction {

     void (*sa_handler)(); /* SIG_DFL, SIG_IGN или адрес обработчика сигнала */

     sigset_t sa_mask; /* дополнительные блокируемые сигналы */

     int sa_flags; /* параметры сигналов: SA_XXX */

     void (*sa_sigaction)(int, siginfo_t, void *);

    };

    Правила действуют следующие:

    ■ Если в поле sa_flags установлен флаг SA_SIGINFO, поле sa_sigaction указывает адрес функции-обработчика сигнала.

    ■ Если флаг SA_SIGINFO не установлен, поле sa_handler указывает адрес функции-обработчика сигнала.

    ■ Чтобы сопоставить сигналу действие по умолчанию или игнорировать его, следует установить sa_handler равным либо SIG_DFL, либо SIG_IGN и не устанавливать флаг SA_SIGINFO.

    Установка SA_SIGINFO

    8-17 Мы всегда устанавливаем флаг SA_SIGINFO и указываем флаг SA_RESTART, если перехвачен какой-либо другой сигнал, кроме SIGALRM.

    5.8. Реализация с использованием отображения в память

    Теперь рассмотрим реализацию очередей сообщений Posix с использованием отображения в память, взаимных исключений и условных переменных Posix. 

    ПРИМЕЧАНИЕ

    Взаимные исключения и условные переменные описаны в главе 7, а ввод-вывод с отображением в память — в главах 12 и 13. Вы можете отложить данный раздел до ознакомления с этими главами.

    На рис. 5.2 приведена схема структур данных, которыми мы пользуемся для реализации очереди сообщений Posix. Изображенная очередь может содержать до четырех сообщений по 7 байт каждое.

    В листинге 5.16 приведен заголовочный файл mqueue.h, определяющий основные структуры, используемые в этой реализации.

    Тип mqd_t

    Дескриптор нашей очереди сообщений является просто указателем на структуру mq_infо. Каждый вызов mq_open выделяет память под одну такую структуру, а вызвавшему возвращается указатель на нее. Повторим, что дескриптор очереди сообщений не обязательно является небольшим целым числом, как дескриптор файла — единственное ограничение, накладываемое Posix, заключается в том, что этот тип данных не может быть массивом.

    Листинг 5.16. Заголовочный файл mqueue.h

    //my_pxmsg_mmap/mqueue.h

    1  typedef struct mymq_info *mymqd_t;


    2  struct mymq_attr {

    3   long mq_flags; /* флаг очереди : O_NONBLOCK */

    4   long mq_maxmsg; /* максимальное количество сообщений в очереди */

    5   long mq_msgsize; /* максимальный размер сообщения в байтах */

    6   long mq_curmsgs; /* количество сообщений в очереди */

    7  };


    8  /* одна структура mymq_hdr{} на очередь, в начале отображаемого файла */

    9  struct mymq_hdr {

    10  struct mymq_attr mqh_attr; /* атрибуты очереди */

    11  long mqh_head; /* индекс первого сообщения*/

    12  long mqh_free; /* индекс первого пустого сообщения */

    13  long mqh_nwait; /* количество заблокированных mq_receive() потоков */

    14  pid_t mqh_pid; /* ненулевой PID. если включено уведомление */

    15  struct sigevent mqh_event; /* для mq_notify() */

    16  pthread_mutex_t mqh_lock; /* блокировка: mutex*/

    17  pthread_cond_t mqh_wait; /* и условная переменная */

    18 };


    19 /* один mymsg_hdr{} в начале каждого сообщения */

    20 struct mymsg_hdr {

    21  long msg_next; /* индекс следующего сообщения в списке */

    22                 /* msg_next должно быть первым полем в структуре */

    23  ssize_t msg_len; /* реальная длина */

    24  unsigned int msg_prio; /* приоритет */

    25 };


    26 /* одна mymq_info{} выделяется при каждом mq_open() */

    27 struct mymq_info {

    28  struct mymq_hdr *mqi_hdr; /* начало отображаемого файла */

    29  long mqi_magic; /* магическое значение после инициализации */

    30  int mqi_flags; /* флаги для данного процесса */

    31 };

    32 #define MQI_MAGIC 0x98765432

    33 /* размер сообщения округляется для подгонки */

    34 #define MSGSIZE(i) ((((i) + sizeof(long)-1) / sizeof(long)) * sizeof(long))
     

    Рис. 5.2. Схема структур данных, используемых при реализации очередей сообщений posix через отображаемый в память файл 


    Структура mq_hdr

    8-18 Эта структура хранится в самом начале отображаемого файла и содержит всю информацию об очереди. Поле mq_flags структуры mqh_attr не используется, поскольку флаги (единственный определенный флаг используется для отключения блокировки) должны обрабатываться для каждого открывающего очередь процесса в отдельности, а не для очереди в целом. Поэтому флаги хранятся в структуре mq_info. О прочих полях этой структуры мы будем говорить в связи с их использованием различными функциями.

    Обратите внимание, что все переменные, называемые нами индексными (поля этой структуры mqh_head и mqh_free, а также поле msg_next следующей структуры), содержат индексы байтов относительно начала отображаемого в память файла. Например, размер структуры mq_hdr в системе Solaris 2.6 — 96 байт, поэтому индекс первого сообщения, располагающегося сразу за заголовком, имеет значение 96. Каждое сообщение на рис. 5.2 занимает 20 байт (12 байт на структуру msg_hdr и 8 байт на данные), поэтому индексы следующих трех сообщений имеют значения 116, 136 и 156, а размер отображаемого в память файла — 176 байт. Индексы используются для обработки двух связных списков, хранящихся в этом файле: в одном из списков (mqh_head) хранятся все сообщения, имеющиеся в данный момент в очереди, а в другом (mqh_free) — все незаполненные сообщения. Мы не можем использовать настоящие указатели на области памяти (адреса) при работе со списком, поскольку отображаемый файл может находиться в произвольной области памяти для каждого из процессов, работающих с ним (как показано в листинге 13.5).

    Структура msg_hdr

    19-25 Эта структура располагается в начале каждого сообщения в отображаемом файле. Любое сообщение может принадлежать либо к списку заполненных, либо к списку свободных сообщений, и поле msg_next содержит индекс следующего сообщения в этом списке (или 0, если сообщение является в этом списке последним). Переменная msg_len хранит реальную длину данных в сообщении, которая в нашем примере с рис. 5.2 может иметь значение от 0 до 7 байт включительно. В переменную msg_prio отправителем помещается значение приоритета сообщения.

    Структура mq_info

    26-32 Экземпляр такой структуры динамически создается функцией mq_open при открытии очереди и удаляется mq_close. Поле mqi_hdr указывает на отображаемый файл (адрес начала файла возвращается mmap). Указатель на эту структуру имеет основной в нашей реализации тип mqd_t, он принимает значение, возвращаемое mq_open.

    Поле mqi_magiс принимает значение MQI_MAGIC в момент инициализации структуры. Это значение проверяется всеми функциями, которым передается указатель типа mqd_t, что дает им возможность удостовериться, что указатель действительно указывает на структуру типа mq_infо. mqi_flags содержит флаг отключения блокировки для открывшего очередь процесса.

    Макрос MSGSIZE

    33-34 В целях выравнивания содержимого файла (alignment) мы располагаем начало каждого сообщения так, чтобы его индекс был кратен размеру длинного целого. Следовательно, если максимальный размер сообщения не допускает такого выравнивания, мы добавляем к нему от 1 до 3 байт, как показано на рис. 5.2. При этом предполагается, что размер длинного целого — 4 байт (что верно для Solaris 2.6). Если размер длинного целого 8 байт (в Digital Unix 4.0B), нам придется добавлять к каждому сообщению от 1 до 7 байт.

    Функция mq_open

    В листинге 5.17 приведен текст первой части функции mq_open, создающей новую очередь сообщений или открывающей существующую.

    Листинг 5.17. Функция mq_open: первая часть

    //my_pxmsg._mmap/mq_open. с

    1  #include "unpipc.h"

    2  #include "mqueue.h"

    3  #include <stdarg.h>

    4  #define MAX_TRIES 10

    5  struct mymq_attr defattr =

    6   { 0, 128, 1024, 0 };


    7  mymqd_t

    8  mymq_open(const char *pathname, int oflag, …)

    9  {

    10  int i, fd, nonblock, created, save_errno;

    11  long msgsize, filesize, index;

    12  va_list ap;

    13  mode_t mode;

    14  int8_t *mptr;

    15  struct stat statbuff;

    16  struct mymq_hdr *mqhdr;

    17  struct mymsg_hdr *msghdr;

    18  struct mymq_attr *attr;

    19  struct mymq_info *mqinfo;

    20  pthread_mutexattr_t mattr;

    21  pthread_condattr_t cattr;

    22  created = 0;

    23  nonblock = oflag & O_NONBLOCK;

    24  oflag &= ~O_NONBLOCK;

    25  mptr = (int8_t *) MAP_FAILED;

    26  mqinfo = NULL;

    27 again:

    28  if (oflag & O_CREAT) {

    29   va_start(ap, oflag); /* ар инициализируется последним аргументом */

    30   mode = va_arg(ap, va_mode_t) & ~S_IXUSR;

    31   attr = va_arg(ap, struct mymq_attr *);

    32   va_end(ap);

    33   /* открытие с установкой бита user-execute */

    34   fd = open (pathname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);

    35   if (fd < 0) {

    36    if (errno == EEXIST && (oflag & O_EXCL) == 0)

    37     goto exists; /* уже существует, OK */

    38    else

    39     return((mymqd_t) –1);

    40   }

    41   created = 1;

    42   /* при создании файла он инициализируется */

    43   if (attr == NULL)

    44    attr = &defattr;

    45   else {

    46    if (attr->mq_maxmsg <– 0 || attr->mq_msgsize <= 0) {

    47     errno = EINVAL;

    48     goto err;

    49    }

    50   }

    Обработка списка аргументов переменного размера

    29-32 Функция может быть вызвана либо с двумя, либо с четырьмя аргументами в зависимости от того, указан ли флаг O_CREAT. Если флаг указан, третий аргумент имеет тип mode_t, а это простой системный тип, являющийся одним из целых типов. При этом мы столкнемся с проблемой в BSD/OS, где этот тип данных определен как unsigned short (16 бит). Поскольку целое в этой реализации занимает 32 бита, компилятор С увеличивает аргумент этого типа с 16 до 32 бит, потому что все короткие целые в списке аргументов увеличиваются до обычных целых. Но если мы укажем mode_t при вызове va_arg, он пропустит 16 бит аргумента в стеке, если этот аргумент был увеличен до 32 бит. Следовательно, мы должны определить свой собственный тип данных, va_mode_t, который будет целым в BSD/OS и типом mode_t в других системах. Эту проблему с переносимостью решают приведенные ниже строки нашего заголовка unpipc.h (листинг В.1):

    #ifdef __bsdi__

    #define va_mode_t int

    #else

    #define va_mode_t mode_t

    #endif

    30 Мы сбрасываем бит user-execute (S_IXUSR) в переменной mode по причинам, которые будут вскоре раскрыты.

    Создание новой очереди сообщений

    33-34 Создается обычный файл с именем, указанным при вызове функции, и устанавливается бит user-execute. 

    Обработка потенциальной ситуации гонок

    35-40 Если бы при указании флага O_CREAT мы просто открыли файл, отобразили его содержимое в память и проинициализировали отображенный файл (как будет описано ниже), у нас возникла бы ситуация гонок. Очередь сообщений инициализируется mq_open только в том случае, если вызывающий процесс указывает флаг O_CREAT и очередь сообщений еще не существует. Это означает, что нам нужно каким-то образом определять, существует она или нет. Для этого при открытии файла для последующего отображения в память мы всегда указываем флаг O_EXCL. Возвращение ошибки EEXIST функцией open является ошибкой для mq_open только в том случае, если при вызове был указан флаг O_EXCL. В противном случае при возвращении функцией open ошибки EEXIST мы делаем вывод, что файл уже существует, и переходим к листингу 5.19, как если бы флаг O_CREAT вовсе не был указан.

    Ситуация гонок может возникнуть потому, что использование отображаемого в память файла для реализации очереди сообщений требует двух шагов при инициализации очереди: сначала файл должен быть создан функцией open, а затем его содержимое должно быть проинициализировано. Проблема возникает, если два потока (одного или различных процессов) вызывают mq_open приблизительно одновременно. Один из потоков может создать файл, после чего управление будет передано системой второму потоку, прежде чем первый завершит инициализацию файла. Второй поток обнаружит, что файл уже существует (вызвав open с флагом O_EXCL), и приступит к использованию очереди сообщений.

    Мы используем бит user-execute для указания того, был ли проинициализирован файл с очередью сообщений. Этот бит устанавливается только тем потоком, который создает этот файл (флаг O_EXCL позволяет определить этот поток); этот поток инициализирует файл с очередью сообщений, а затем сбрасывает бит user-execute.

    Аналогичная ситуация может возникнуть в листингах 10.28 и 10.37.

    Проверка атрибутов

    42-50 Если при вызове в качестве последнего аргумента передан нулевой указатель, очередь сообщений инициализируется со значениями атрибутов по умолчанию: 128 сообщений в очереди и 1024 байта на сообщение. Если атрибуты указаны явно, мы проверяем, что mq_maxmsg и mq_msgsize имеют положительные значения.

    Вторая часть функции mq_open приведена в листинге 5.18. Она завершает инициализацию новой очереди сообщений.

    Листинг 5.18. Вторая часть функции mq_open: инициализация новой очереди

    //my_pxmsg_mmap/mq_open.с

    51    /* вычисление и установка размера файла */

    52    msgsize = MSGSIZE(attr->mq_msgsize);

    53    filesize = sizeof(struct mymq_hdr) + (attr->mq_maxmsg *

    54     (sizeof(struct mymsg_hdr) + msgsize));

    55    if (lseek(fd, filesize – 1, SEEK_SET) == –1)

    56     goto err;

    57    if (write(fd, "", 1) == –1)

    58     goto err;

    59    /* отображение файла в память */

    60    mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE,

    61     MAP_SHARED, fd, 0);

    62    if (mptr == MAP_FAILED)

    63     goto err;

    64    /* выделение структуры mymq_info{} для очереди */

    65    if ((mqinfo = mallос (sizeof (struct mymq_info))) == NULL)

    66     goto err;

    67    mqinfo->mqi_hdr = mqhdr = (struct mymq_hdr *) mptr;

    68    mqinfo->mqi_magic = MQI_MAGIC;

    69    mqinfo->mqi_flags = nonblock;

    70    /* инициализация заголовка в начале файла */

    71    /* создание списка пустых сообщений */

    72    mqhdr->mqh_attr.mq_flags = 0;

    73    mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;

    74    mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;

    75    mqhdr->mqh_attr.mq_curmsgs = 0;

    76    mqhdr->mqh_nwait = 0;

    77    mqhdr->mqh_pid = 0;

    78    mqhdr->mqh_head = 0;

    79    index = sizeof(struct mymq_hdr);

    80    mqhdr->mqh_free = index;

    81    for (i = 0; i < attr->mq_maxmsg – 1; i++) {

    82     msghdr = (struct mymsg_hdr *) &mptr[index];

    83     index += sizeof(struct mymsg_hdr) + msgsize;

    84     msghdr->msg_next = index;

    85    }

    86    msghdr = (struct mymsg_hdr *) &mptr[index];

    87    msghdr->msg_next = 0; /* конец списка пустых сообщений */

    88    /* инициализация взаимного исключения и условной переменной */

    89    if ((i = pthread_mutexattr_init(&mattr)) != 0)

    90     goto pthreaderr;

    91    pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);

    92    i = pthread_mutex_init(&mqhdr->mqh_lock, &mattr);

    93    pthread_mutexattr_destroy(&mattr); /* обязательно нужно удалить */

    94    if (i != 0)

    95     goto pthreaderr:

    96    if ((i = pthread_condattr_init(&cattr)) != 0)

    97     goto pthreaderr;

    98    pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);

    99    i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);

    100   pthread_condattr_destroy(&cattr); /* обязательно нужно удалить */

    101   if (i != 0)

    102    goto pthreaderr;

    103   /* инициализация завершена, снимаем бит user-execute */

    104   if (fchmod(fd, mode) == –1)

    105    goto err;

    106   close(fd);

    107   return((mymqd_t) mqinfo);

    108  }

    Установка размера файла

    51-58 Вычисляется размер сообщения, который затем округляется до кратного размеру длинного целого. Также в файле отводится место для структуры mq_hdr в начале файла и msghdr в начале каждого сообщения (рис. 5.2). Размер вновь созданного файла устанавливается функцией lseek и записью одного байта со значением 0. Проще было бы вызвать ftruncate (раздел 13.3), но у нас нет гарантий, что это сработало бы для увеличения размера файла.

    Отображение файла в память

    59-63 Файл отображается в память функцией mmap.

    Выделение памяти под структуру mq_info

    64-66 При каждом вызове mq_open создается отдельный экземпляр mq_infо. Эта структура после создания инициализируется.

    Инициализация структуры mq_hdr

    67-87 Инициализируется структура mq_hdr. Заголовок связного списка сообщений (mqh_head) инициализируется нулем, а все сообщения в очереди добавляются к списку свободных (mqh_frее).

    Инициализация взаимного исключения и условной переменной

    88-102 Поскольку очереди сообщений Posix могут использоваться совместно произвольным количеством процессов, которые знают имя очереди и имеют соответствующие разрешения, нам нужно инициализировать взаимное исключение и условную переменную с атрибутом PTHREAD_PROCESS_SHARED. Для этого мы сначала инициализируем атрибуты вызовом pthread_mutexattr_init, а затем устанавливаем значение атрибута совместного использования процессами, вызвав pthread_mutexattr_setpshared. После этого взаимное исключение инициализируется вызовом pthread_mutex_init. Аналогичные действия выполняются для условной переменной. Мы должны аккуратно удалить взаимное исключение и условную переменную даже при возникновении ошибки, поскольку вызовы pthread_ mutexattr_init и pthread_condattr_init выделяют под них память (упражнение 7.3).

    Сброс бита user-execute

    103-107 После инициализации очереди сообщений мы сбрасываем бит user-execute. Это говорит другим процессам о том, что очередь была проинициализирована. Мы также закрываем файл вызовом close, поскольку он был успешно отображен в память и держать его открытым больше нет необходимости.

    В листинге 5.19 приведен конец функции mq_open, в котором осуществляется открытие существующей очереди сообщений.

    Листинг 5.19. Третья часть функции mq_open: открытие существующей очереди сообщений

    //my_pxmsg_mmap/mq_open.с

    109 exists:

    110  /* открытие файла и отображение его в память */

    111  if ((fd = open(pathname, O_RDWR)) < 0) {

    112   if (errno == ENOENT && (oflag & O_CREAT))

    113    goto again;

    114  goto err;

    115  }

    116  /* проверяем, что инициализация завершена */

    117  for (i = 0; i < MAX TRIES; i++) {

    118   if (stat(pathname, &statbuff) == –1) {

    119    if (errno == ENOENT && (oflag & O_CREAT)) {

    120     close(fd);

    121     goto again;

    122    }

    123    goto err;

    124   }

    125   if ((statbuff.st_mode & S_IXUSR) == 0)

    126   break;

    127   sleep(1);

    128  }

    129  if (i == MAX_TRIES) {

    130   errno = ETIMEDOUT;

    131   goto err;

    132  }

    133  filesize = statbuff.st_size;

    134  mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

    135  if (mptr == MAP_FAILED)

    136   goto err;

    137  close(fd);

    138  /* выделяем одну mymq_info{} для каждого вызова open */

    139  if ((mqinfo = malloc(sizeof(struct mymq_info))) == NULL)

    140   goto err;

    141  rnqinfo->mqi_hdr = (struct mymq_hdr *) mptr;

    142  mqinfo->mqi_magic = MQI_MAGIC;

    143  mqinfo->mqi_flags = nonblock;

    144  return((mymqd_t) mqinfo);

    145 pthreaderr:

    146  errno = i;

    147 err:

    148  /* не даем следующим вызовам изменить errno */

    149  save_errno = errno;

    150  if (created)

    151   unlink(pathname);

    152  if (mptr != MAP_FAILED)

    153   munmap(mptr, filesize);

    154  if (mqinfo != NULL)

    155   free(mqinfo);

    156  close(fd);

    157  errno = save_errno;

    158  return((mymqd_t) –1);

    159 }

    Открытие существующей очереди сообщений

    109-115 Здесь мы завершаем работу, если флаг O_CREAT не был указан или если он был указан, но очередь уже существовала. В любом случае, мы открываем существующую очередь сообщений. Для этого мы открываем для чтения и записи файл, в котором она содержится, функцией open и отображаем его содержимое в адресное пpocтрaнcтвo процесса (mmap).

    ПРИМЕЧАНИЕ

    Наша реализация сильно упрощена в том, что касается режима открытия файла. Даже если вызвавший процесс указывает флаг O_RDONLY, мы должны дать возможность доступа для чтения и записи при открытии файла командой open и при отображении его в память командой mmap, поскольку невозможно считать сообщение из очереди, не изменив содержимое файла. Аналогично невозможно записать сообщение в очередь, не имея доступа на чтение. Обойти эту проблему можно, сохранив режим открытия (O_RDONLY, O_WRONLY, O_RDWR) в структуре mq_info и проверяя этот режим в каждой из функций. Например, mq_receive должна возвращать ошибку, если в mq_info хранится значение O_WRONLY.

    Проверка готовности очереди

    116-132 Нам необходимо дождаться, когда очередь будет проинициализирована (в случае, если несколько потоков сделают попытку открыть ее приблизительно одновременно). Для этого мы вызываем stat и проверяем разрешения доступа к файлу (поле st_mode структуры stat). Если бит user-execute сброшен, очередь уже проинициализирована.

    Этот участок кода обрабатывает другую возможную ситуацию гонок. Предположим, что два потока разных процессов попытаются открыть очередь приблизительно одновременно. Первый поток создает файл и блокируется при вызове lseek в листинге 5.18. Второй поток обнаруживает, что файл уже существует, и переходит к метке exists, где он вновь открывает файл функцией open и при этом блокируется. Затем продолжается выполнение первого потока, но его вызов mmap в листинге 5.18 не срабатывает (возможно, он превысил лимит использования памяти), поэтому он переходит на метку err и удаляет созданный файл вызовом unlink. Продолжается выполнение второго потока, но если бы мы вызывали fstat вместо stat, он бы вышел по тайм-ауту в цикле for, ожидая инициализации файла. Вместо этого мы вызываем stat, которая возвращает ошибку, если файл не существует, и, если флаг O_CREAT был указан при вызове mq_open, мы переходим на метку again (листинг 5.17) для повторного создания файла. Эта ситуация гонок заставляет нас также проверять, не возвращается ли при вызове open ошибка ENOENT.

    Отображение файла в память; создание и инициализация структуры mq_info

    133-144 Файл отображается в память, после чего его дескриптор может быть закрыт. Затем мы выделяем место под структуру mq_infо и инициализируем ее. Возвращаемое значение представляет собой указатель на эту структуру.

    Обработка ошибок

    145-148 При возникновении ошибок происходит переход к метке err, а переменной errno присваивается значение, которое должно быть возвращено функцией mq_open. Мы аккуратно вызываем функции для очистки памяти от выделенных объектов, чтобы переменная errno не изменила свое значение в случае возникновения ошибки в этих функциях.

    Функция mq_close

    В листинге 5.20 приведен текст нашей функции mq_close.

    Листинг 5.20. Функция mq_close

    //my_pxmsg_mmap/mq_close.с

    1  #include "unpipc.h"

    2  #include "mqueue.h"


    3  int

    4  mymq_close(mymqd_t mqd)

    5  {

    6   long msgsize, filesize:

    7   struct mymq_hdr *mqhdr;

    8   struct mymq_attr *attr;

    9   struct mymq_info *mqinfo;

    10  mqinfo = mqd;

    11  if (mqinfo->mqi_magic != MQI_MAGIC) {

    12   errno = EBADF;

    13   return(-1);

    14  }

    15  mqhdr = mqinfo->mqi_hdr;

    16  attr = &mqhdr->mqh_attr;

    17  if (mymq_notify(mqd, NULL) != 0) /* снятие вызвавшего процесса с регистрации */

    18   return(-1);

    19  msgsize = MSGSIZE(attr->mq_msgsize);

    20  filesize = sizeof(struct mymq_hdr) + (attr->mq_maxmsg *

    21   (sizeof(struct mymsg_hdr) + msgsize));

    22  if (munmap(mqinfo->mqi_hdr, filesize) == –1)

    23   return(-1);

    24  mqinfo->mqi_magic = 0; /* на всякий случай */

    25  free(mqinfo);

    26  return(0);

    27 }

    Получение указателей на структуры

    10-16 Проверяется правильность переданных аргументов, после чего получаются указатели на область, занятую отображенным в память файлом (mqhdr), и атрибуты (в структуре mq_hdr).

    Сброс регистрации вызвавшего процесса

    17-18 Для сброса регистрации на уведомление вызвавшего процесса мы вызываем mq_notify. Если процесс был зарегистрирован, он будет снят с уведомления, но если нет — ошибка не возвращается.

    Отключение отображения файла и освобождение памяти

    19-25 Мы вычисляем размер файла для вызова munmap и освобождаем память, используемую структурой mqinfo. На случай, если вызвавший процесс будет продолжать использовать дескриптор очереди сообщений, до того как область памяти будет вновь задействована вызовом malloc, мы устанавливаем значение mq_magiс в ноль, чтобы наши функции для работы с очередью сообщений обнаруживали ошибку.

    Обратите внимание, что если процесс завершает работу без вызова mq_close, эти же операции выполняются автоматически: отключается отображение в память, а память освобождается.

    Функция mq_unlink

    Текст функции mqunlink приведен в листинге 5.21. Она удаляет файл, связанный с очередью сообщений, вызывая функцию unlink.

    Листинг 5.21. Функция mq_unlink

    //my_pxmsg_mmap/mq_unlink.с

    1 #include "unpipc.h"

    2 #include "mqueue.h"


    3 int

    4 mymq_unlink(const char *pathname)

    5 {

    6  if (unlink(pathname) == –1)

    7   return(-1);

    8  return(0);

    9 }

    Функция mq_getattr

    В листинге 5.22 приведен текст функции mq_getattr, которая возвращает текущее значение атрибутов очереди.

    Листинг 5.22. Функция mq_getattr

    //my_pxmsg_mmap/mq_getattr.с

    1  #include "unpipc.h"

    2  #include "mqueue.h"


    3  int

    4  mymq_getattr(mymqd_t mqd, struct mymq_attr *mqstat)

    5  {

    6   int n;

    7   struct mymq_hdr *mqhdr;

    8   struct mymq_attr *attr;

    9   struct mymq_info *mqinfo;

    10  mqinfo = mqd;

    11  if (mqinfo->mqi_magic != MQI_MAGIC) {

    12   errno = EBADF;

    13   return(-1);

    14  }

    15  mqhdr = mqinfo->mqi_hdr;

    16  attr = &mqhdr->mqh_attr;

    17  if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {

    18   errno = n;

    19   return (-1);

    20  }

    21  mqstat->mq_flags = mqinfo->mqi_flags; /* для каждого open */

    22  mqstat->mq_maxmsg = attr->mq_maxmsg; /* оставшиеся три – для очереди */

    23  mqstat->mq_msgsize = attr->mq_msgsize;

    24  mqstat->mq_curmsgs = attr->mq_curmsgs;

    25  pthread_mutex_unlock(&mqhdr->mqh_lock);

    26  return(0);

    27 }

    Блокирование взаимного исключения

    17-20 Мы должны заблокировать соответствующее взаимное исключение для работы с очередью, в частности для получения атрибутов, поскольку какой-либо другой поток может в это время их изменить.

    Функция mq_setattr

    В листинге 5.23 приведен текст функции mq_setattr, которая устанавливает значение атрибутов очереди.

    Считывание текущих атрибутов

    22-27 Если третий аргумент представляет собой ненулевой указатель, мы возвращаем предыдущее значение атрибутов перед внесением каких-либо изменений.

    Изменение mq_flags

    28-31 Единственный атрибут, который можно менять с помощью нашей функции, — mq_flags, хранящийся в структуре mq_infо.

    Листинг 5.23. Функция mq_setattr

    //my_pxmsg_mniap/mq_setattr.с

    1  #include "unpipc.h"

    2  #include "mqueue.h"


    3  int

    4  mymq_setattr(mymqd_t mqd. const struct mymq_attr *mqstat,

    5   struct mymq attr *omqstat)

    6  {

    7   int n;

    8   struct mymq_hdr *mqhdr;

    9   struct mymq_attr *attr;

    10  struct mymq_info *mqinfo;

    11  mqinfo = mqd;

    12  if (mqinfo->mqi_magic != MQI_MAGIC) {

    13   errno = EBADF;

    14   return(-1);

    15  }

    16  mqhdr = mqinfo->mqi_hdr;

    17  attr = &mqhdr->mqh_attr;

    18  if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) ! = 0) {

    19   errno = n;

    20   return(-1);

    21  }

    22  if (omqstat != NULL) {

    23   omqstat->mq_flags = mqinfo->mqi_flags; /* исходные атрибуты */

    24   omqstat->mq_maxmsg = attr->mq_maxmsg;

    25   omqstat->mq_msgsize = attr->mq_msgsize;

    26   omqstat->mq_curmsgs = attr->mq_curmsgs; /* текущий статус */

    27  }

    28  if (mqstat->mq_flags & O_NONBLOCK)

    29   mqinfo->mqi flags |= O_NONBLOCK;

    30  else

    31   mqinfo->ntqi_flags &= ~O_NONBLOCK;

    32  pthread_mutex_unlock(&mqhdr->mqh_lock);

    33  return(0);

    34 }

    Функция mq_notify

    Функция mq_notify, текст которой приведен в листинге 5.24, позволяет регистрировать процесс на уведомление для текущей очереди и снимать его с регистрации. Информация о зарегистрированных процессах (их идентификаторы) хранится в поле mqh_pid структуры mq_hdr. Только один процесс может быть зарегистрирован на уведомление в любой момент времени. При регистрации процесса мы сохраняем его структуру sigevent в структуре mqh_event.

    Листинг 5.24. Функция mq_notify

    //my_pxmsg_mmap/mq_notify.с

    1  #include "unpipc.h"

    2  #include "mqueue.h"


    3  int

    4  mymq_notify(mymqd_t mqd, const struct sigevent *notification)

    5  {

    6   int n;

    7   pid_t pid;

    8   struct mymq_hdr *mqhdr;

    9   struct mymq_info *mqinfo;

    10  mqinfo = mqd;

    11  if (mqinfo->mqi magic != MQI_MAGIC) {

    12   errno = EBADF;

    13   return(-1);

    14  }

    15  mqhdr = mqinfo->mqi_hdr;

    16  if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {

    17   errno = n;

    18   return(-1);

    19  }

    20  pid = getpid();

    21  if (notification == NULL) {

    22   if (mqhdr->mqh_pid == pid) {

    23    mqhdr->mqh_pid = 0; /* снятие вызвавшего процесса с регистрации */

    24   } /* если вызвавший процесс не зарегистрирован – 61К */

    25  } else {

    26   if (mqhdr->mqh_pid != 0) {

    27    if (kill(mqhdr->mqh_pid, 0) != –1 || errno != ESRCH) {

    28     errno = EBUSY;

    29     goto err;

    30    }

    31   }

    32   mqhdr->mqh_pid = pid;

    33   mqhdr->mqh_event = *notification;

    34  }

    35  pthread_mutex_unlock(&mqhdr->mqh_lock);

    36  return(0);

    37 err:

    38  pthread_mutex_unlock(&mqhdr->mqh_lock);

    39  return(-1);

    40 }

    Снятие процесса с регистрации

    20-24 Если второй аргумент представляет собой нулевой указатель, вызвавший процесс снимается с регистрации. Если он не зарегистрирован, никакой ошибки не возвращается.

    Регистрация вызвавшего процесса

    25-34 Если какой-либо процесс уже зарегистрирован, мы проверяем, существует ли он, отправкой ему сигнала с кодом 0 (называемого нулевым сигналом — null signal). Это обычная проверка на возможность ошибки, на самом деле при этом никакого сигнала процессу не отправляется, но при его отсутствии возвращается ошибка с кодом ESRCH. Если какой-либо процесс уже зарегистрирован на уведомление, функция возвращает ошибку EBUSY. В противном случае сохраняется идентификатор процесса вместе с его структурой sigevent.

    ПРИМЕЧАНИЕ

    Наш метод проверки существования вызвавшего процесса не идеален. Процесс мог завершить работу, а его идентификатор мог быть использован другим процессом.

    Функция mq_send

    В листинге 5.25 приведен текст первой половины нашей функции mqsend.

    Инициализация

    14-29 Мы получаем указатели на используемые структуры и блокируем взаимное исключение для данной очереди. Проверяем, не превышает ли размер сообщения максимально допустимый для данной очереди.

    Проверка очереди на пустоту и отправка уведомления

    30-38 Если мы помещаем первое сообщение в пустую очередь, нужно проверить, не зарегистрирован ли какой-нибудь процесс на уведомление об этом событии и нет ли потоков, заблокированных в вызове mq_receive. Для проверки второго условия мы воспользуемся сохраняемым функцией mq_receive счетчиком mqh_nwait, содержащим количество потоков, заблокированных в вызове mq_receive. Если этот счетчик имеет ненулевое значение, мы не отправляем уведомление зарегистрированному процессу. Для отправки сигнала SIGEV_SIGNAL используется функция sigqueue. Затем процесс снимается с регистрации.

    ПРИМЕЧАНИЕ

    Вызов sigqueue для отправки сигнала приводит к передаче сигнала SI_QUEUE обработчику сигнала в структуре типа siginfo_t (раздел 5.7), что неправильно. Отправка правильного значения si_code (а именно SI_MESGQ) из пользовательского процесса осуществляется в зависимости от реализации. На с. 433 стандарта IEEE 1996 [8] отмечается, что для отправки этого сигнала из пользовательской библиотеки необходимо воспользоваться скрытым интерфейсом механизма отправки сигналов. 

    Проверка заполненности очереди

    39-48 Если очередь переполнена и установлен флаг O_NONBLOCK, мы возвращаем ошибку с кодом EAGAIN. В противном случае мы ожидаем сигнала по условной переменной mqh_wait, который, как мы увидим, отправляется функцией mq_receive при считывании сообщения из переполненной очереди.

    ПРИМЕЧАНИЕ

    Наша реализация упрощает ситуацию с возвращением ошибки EINTR при прерывании вызова mq_send сигналом, перехватываемым вызвавшим процессом. Проблема в том, что функция pthread_cond_wait не возвращает ошибки при возврате из обработчика сигнала: она может вернуть либо 0 (что рассматривается как ложное пробуждение), либо вообще не завершить работу. Все эти проблемы можно обойти, но это непросто. 

    В листинге 5.26 приведена вторая половина функции mq_send. К моменту ее выполнения мы уже знаем о наличии в очереди свободного места для нашего сообщения.

    Листинг 5.25. Функция mq_send: первая половина

    //my_pxmsg_mmap/mq_send.с

    1  #include "unpipc.h"

    2  #include "mqueue.h"


    3  int

    4  mymq_send(mymqd_t mqd, const char *ptr, size_t len, unsigned int prio)

    5  {

    6   int n;

    7   long index, freeindex;

    8   int8_t *mptr;

    9   struct sigevent *sigev;

    10  struct mymq_hdr *mqhdr;

    11  struct mymq_attr *attr;

    12  struct mymsg_hdr *msghdr, *nmsghdr, *pmsghdr;

    13  struct mymq_info *mqinfo;

    14  mqinfo = mqd;

    15  if (mqinfo->mqi_magic != MQI_MAGIC) {

    16   errno = EBADF;

    17   return(-1);

    18  }

    19  mqhdr = mqinfo->mqi_hdr; /* указатель типа struct */

    20  mptr = (int8_t *) mqhdr; /* указатель на байт */

    21  attr = &mqhdr->mqh_attr;

    22  if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {

    23   errno = n;

    24   return(-1);

    25  }

    26  if (len > attr->mq_msgsize) {

    27   errno = EMSGSIZE;

    28   goto err;

    29  }

    30  if (attr->mq_curmsgs == 0) {

    31   if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0) {

    32    sigev = &mqhdr->mqh_event;

    33    if (sigev->sigev_notify == SIGEV_SIGNAL) {

    34     sigqueue(mqhdr->mqh_pid, sigev->sigev_signo,

    35      sigev->sigev_value);

    36    }

    37    mqhdr->mqh_pid = 0; /* снятие с регистрации */

    38   }

    39  } else if (attr->mq_curmsgs >= attr->mq_maxmsg) {

    40   /* 4queue is full */

    41   if (mqinfo->mqi_flags & O_NONBLOCK) {

    32    errno = EAGAIN;

    43    goto err;

    44   }

    45   /* ожидание освобождения места в очереди */

    46   while (attr->mq_curmsgs >= attr->mq_maxmsg)

    47    pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);

    48  }

    Листинг 5.25. Функция mq_send: вторая половина

    //my_pxmsg_mmap/mq_send.с

    49  /* nmsghdr будет указывать на новое сообщение*/

    50  if ((freeindex = mqhdr->mqh_free) == 0)

    51   err_dump("mymq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);

    52  nmsghdr = (struct mymsg_hdr *) &mptr[freeindex];

    53  nmsghdr->msg_prio = prio;

    54  nmsghdr->msg_len = len;

    55  memcpy(nmsghdr + 1, ptr, len); /* копирование сообщения в очередь */

    56  mqhdr->mqh_free = nmsghdr->msg_next; /* новое начало списка пустых сообщений */

    57  /* поиск места в списке для нового сообщения */

    58  index = mqhdr->mqh_head;

    59  pmsghdr = (struct mymsg_hdr *) &(mqhdr->mqh_head);

    60  while (index != 0) {

    61   msghdr = (struct mymsg_hdr *) &mptr[index];

    62   if (prio > msghdr->msg_prio) {

    63    nmsghdr->msg_next = index;

    64    pmsghdr->msg_next = freeindex;

    65    break;

    66   }

    67   index = msghdr->msg_next;

    68   pmsghdr = msghdr;

    69  }

    70  if (index == 0) {

    71   /* очередь была пуста или новое письмо добавлено к концу списка */

    72   pmsghdr->msg_next = freeindex;

    73   nmsghdr->msg_next = 0;

    74  }

    75  /* запускаем любой из процессов, заблокированных в mq_receive */

    76  if (attr->mq_curmsgs == 0)

    77   pthread_cond_signal(&mqhdr->mqh_wait);

    78  attr->mq_curmsgs++;

    79  pthread_mutex_unlock(&mqhdr->mqh_lock);

    80  return(0);

    81 err:

    82  pthread_mutex_unlock(&mqhdr->mqh lock);

    83  return(-1);

    84 }

    Получение индекса свободного блока

    50-52 Поскольку количество свободных сообщений при создании очереди равно mq_maxmsg, ситуация, в которой mq_curmsgs будет меньше mq_maxmsg для пустого списка свободных сообщений, возникнуть не может.

    Копирование сообщения

    53-56 Указатель nmsghdr хранит адрес области памяти, в которую помещается сообщение. Приоритет и длина сообщения сохраняются в структуре msg_hdr, а затем в память копируется содержимое сообщения, переданного вызвавшим процессом.

    Помещение нового сообщения в соответствующее место связного списка

    57-74 Порядок сообщений в нашем списке зависит от их приоритета: они расположены в порядке его убывания. При добавлении нового сообщения мы проверяем, существуют ли сообщения с тем же приоритетом; в этом случае сообщение добавляется после последнего из них. Используя такой метод упорядочения, мы гарантируем, что mq_receive всегда будет возвращать старейшее сообщение с наивысшим приоритетом. По мере продвижения по списку мы сохраняем в pmsghdr адрес предыдущего сообщения, поскольку именно это сообщение будет хранить индекс нового сообщения в поле msg_next.

    ПРИМЕЧАНИЕ

    Наша схема может оказаться медленной в случае наличия в очереди большого количества сообщений, поскольку каждый раз при добавлении нового придется просматривать их значительную часть. Можно хранить отдельно индексы последних сообщений со всеми имеющимися значениями приоритета. 

    Пробуждение любого процесса, заблокированного в вызове mq_receive

    75-77 Если очередь была пуста в момент помещения в нее нового сообщения, мы вызываем pthread_cond_signal, чтобы разблокировать любой из процессов, ожидающих сообщения.

    78 Увеличиваем на единицу количество сообщений в очереди mq_curmsgs.

    Функция mq_receive

    В листинге 5.27 приведен текст первой половины функции mq_receive, которая получает необходимые указатели, блокирует взаимное исключение и проверяет объем буфера вызвавшего процесса, который должен быть достаточным для помещения туда сообщения максимально возможной длины.

    Проверка полноты очереди

    30-40 Если очередь пуста и установлен флаг O_NONBLOCK, возвращается ошибка с кодом EAGAIN. В противном случае увеличивается значение счетчика mqh_nwait, который проверяется функцией mq_send (листинг 5.25) в случае, если очередь пуста и есть процессы, ожидающие уведомления. Затем мы ожидаем сигнала по условной переменной, который будет передан функцией mq_send (листинг 5.26).

    ПРИМЕЧАНИЕ

    Наша реализация mq_receive, как и реализация mq_send, упрощает ситуацию с ошибкой EINTR, возвращаемой при прерывании ожидания сигналом, перехватываемым вызвавшим процессом.

    В листинге 5.28 приведен текст второй половины функции mq_receive. Мы уже знаем, что в очереди есть сообщение, которое можно будет возвратить вызвавшему процессу.

    Листинг 5.27.Функция mq_receive: первая половина

    //my_pxmsg_mmap/mq_receive.с

    1  #include "unpipc.h"

    2  #include "mqueue.h"


    3  ssize_t

    4  mymq_receive(mymqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)

    5  {

    6   int n;

    7   long index;

    8   int8_t *mptr;

    9   ssize_t len;

    10  struct mymq_hdr *mqhdr;

    11  struct mymq_attr *attr;

    12  struct mymsg_hdr *msghdr;

    13  struct mymq_info *mqinfo;

    14  mqinfo = mqd;

    15  if (mqinfo->mqi_magic != MQI_MAGIC) {

    16   errno = EBADF;

    17   return(-1);

    18  }

    19  mqhdr = mqinfo->mqi_hdr; /* указатель struct */

    20  mptr = (int8_t *) mqhdr; /* указатель на байт */

    21  attr = &mqhdr->mqh_attr;

    22  if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {

    23   errno = n;

    24   return(-1);

    25  }

    26  if (maxlen < attr->mq_msgsize) {

    27   errno = EMSGSIZE;

    28   goto err;

    29  }

    30  if (attr->mq_curmsgs = 0) { /* очередь пуста */

    31   if (mqinfo->mqi_flags & O_NONBLOCK) {

    32    errno = EAGAIN;

    33    goto err;

    34   }

    35   /* ожидаем помещения сообщения в очередь */

    36   mqhdr->mqh_nwait++;

    37   while (attr->mq_curmsgs == 0)

    38    pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);

    39   mqhdr->mqh_nwait--;

    40  }

    Листинг 5.28. Функция mq_receive: вторая половина

    //my_pxmsg_mmap/mq_receive.c

    41 
    if ((index = mqhdr->mqh_head) == 0)

    42   err_dump("mymq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);

    43  msghdr = (struct mymsg_hdr *) &mptr[index];

    44  mqhdr->mqh_head = msghdr->msg_next; /* новое начало списка */

    45  len = msghdr->msg_len;

    46  memcpy(ptr, msghdr + 1, len); /* копирование самого сообщения */

    47  if (priop != NULL)

    48   *priop = msghdr->msg_prio;

    49  /* только что считанное сообщение становится первым в списке пустых */

    50  msghdr->msg_next = mqhdr->mqr_free;

    51  mqhdr->mqh_free = index;

    52  /* запуск любого процесса, заблокированного в вызове mq_send */

    53  if (attr->mq_curmsgs == attr->mq_maxmsg)

    54   pthread_cond_signal(&mqhdr->mqh_wait);

    55  attr->mq_curmsgs--;

    56  pthread_mutex_unlock(&mqhdr->mqh_lock);

    57  return(len);

    58 err:

    59  pthread_mutex_unlock(&mqhdr->mqh_lock);

    60  return(-1);

    61 }

    Возвращение сообщения вызвавшему процессу

    43-51 msghdr указывает на msg_hdr первого сообщения в очереди, которое мы и возвратим. Освободившееся сообщение становится первым в списке свободных. 

    Разблокирование процесса, заблокированного в вызове mq_send

    52-54 Если очередь была полной в момент считывания сообщения, мы вызываем pthread_cond_signal для отправки сообщения любому из процессов, заблокированных в вызове mq_send.

    5.9. Резюме

    Очереди сообщений Posix просты в использовании: новая очередь создается (или существующая открывается) функцией mq_open; закрываются очереди вызовом mq_close, а удаляются mq_unlink. Поместить сообщение в очередь можно функцией mq_send, а считать его оттуда можно с помощью mq_receive. Атрибуты очереди можно считать и установить с помощью функций mq_getattr и mq_setattr, а функция mq_notify позволяет зарегистрировать процесс на уведомление о помещении нового сообщения в пустую очередь. Каждое сообщение в очереди обладает приоритетом (небольшое целое число), и функция mq_receive всегда возвращает старейшее сообщение с наивысшим приоритетом.

    Изучая mq_notify, мы познакомились с сигналами реального времени стандарта Posix, которые обладают номерами от SIGMIN до SIGMAX. При установке обработчика для этих сигналов с флагом SA_SIGINFO они будут помещаться в очередь, доставляться в порядке очереди и сопровождаться двумя дополнительными аргументами (при вызове обработчика).

    Наконец, мы реализовали большую часть возможностей очереди сообщений Posix в приблизительно 500 строках кода на языке С, используя отображаемые в память файлы, взаимные исключения и условные переменные Posix. Эта реализация иллюстрирует обработку ситуации гонок при создании новой очереди; еще раз нам придется столкнуться с такой ситуацией в главе 10 при реализации семафоров Posix.

    Упражнения

    1.  Говоря о листинге 5.4, мы отметили, что атрибут attr функции mq_open при создании новой очереди является ненулевым; следует указать оба поля: mq_maxmsg и mq_msgsize. Как можно было бы указать только одно из них, не указывая второе, для которого использовать значения атрибутов по умолчанию?

    2. Измените листинг 5.8 так, чтобы при получении сигнала не вызывалась функция mq_notify. Затем поместите в очередь два сообщения и убедитесь, что для второго из них сигнал порожден не будет. Почему?

    3. Измените листинг 5.8 так, чтобы сообщение из очереди при получении сигнала не считывалось. Вместо этого просто вызовите mq_notify и напечатайте сообщение о получении сигнала. Затем отправьте два сообщения и убедитесь, что для второго из них сигнал не порождается. Почему?

    4. Что произойдет, если мы уберем преобразование двух констант к целому типу в первом вызове printf в листинге 5.14? 

    5. Измените листинг 5.4 следующим образом: перед вызовом mq_open напечатайте сообщение и подождите 30 секунд (sleep). После возвращения из mq_open выведите еще одно сообщение и подождите еще 30 секунд, а затем вызовите mq_close. Откомпилируйте программу и запустите ее, указав большое количество сообщений (несколько сотен тысяч) и максимальный размер сообщения, скажем, в 10 байт. Задача заключается в том, чтобы создать большую очередь и проверить, используются ли в реализации отображаемые в память файлы. В течение 30-секундной паузы запустите программу типа ps и посмотрите на занимаемый программой объем памяти. Сделайте это еще раз после возвращения из mq_open. Можете ли вы объяснить происходящее?

    6. Что произойдет при вызове memcpy в листинге 5.26, если вызвавший процесс укажет нулевую длину сообщения?

    7. Сравните очередь сообщений с двусторонними каналами, описанными в разделе 4.4. Сколько очередей нужно для двусторонней связи между родительским и дочерним процессами?

    8. Почему мы не удаляем взаимное исключение и условную переменную в листинге 5.20?

    9. Стандарт Posix утверждает, что дескриптор очереди сообщений не может иметь тип массива. Почему? 

    10. В каком состоянии проводит большую часть времени функция main из листинга 5.12? Что происходит каждый раз при получении сигнала? Как мы обрабатываем эту ситуацию?

    11. Не все реализации поддерживают атрибут PTHREAD_PROCESS_SHARED для взаимных исключений и условных переменных. Переделайте реализацию очередей сообщений из раздела 5.8 так, чтобы использовать семафоры Posix (глава 10) вместо взаимных исключений и условных переменных.

    12. Расширьте реализацию очередей сообщений Posix из раздела 5.8 так, чтобы она поддерживала SIGEV_THREAD. 

    ГЛАВА 6

    Очереди сообщений System V

    6.1. Введениеы

    Каждой очереди сообщений System V сопоставляется свой идентификатор очереди сообщений. Любой процесс с соответствующими привилегиями (раздел 3.5) может поместить сообщение в очередь, и любой процесс с другими соответствующими привилегиями может сообщение из очереди считать. Как и для очередей сообщений Posix, для помещения сообщения в очередь System V не требуется наличия подключенного к ней на считывание процесса.

    Ядро хранит информацию о каждой очереди сообщений в виде структуры, определенной в заголовочном файле <sys/msg.h>:

    struct msqid_ds {

     struct ipc_perm msg_perm; /* Разрешения чтения и записи: раздел 3.3 */

     struct msg *msg_first; /* указатель на первое сообщение в очереди */

     struct msg *msg_last; /* указатель на последнее сообщение в очереди */

     msglen_t msg_cbytes; /* размер очереди в байтах */

     msgqnum_t msg_qnum;  /* количество сообщений в очереди */

     msglen_t msg_qbytes; /* максимальный размер очереди в байтах */

     pid_t msg_lspid;  /* идентификатор (pid) последнего процесса, вызвавшего msgsnd(); */

     pid_t msg_lrpid;  /* pid последнего msgrcv(); */

     time_t msg_stime; /* время отправки последнего сообщения */

     time_t msg_rtime; /* время последнего считывания сообщения */

     time_t msg_ctime; /* время последнего вызова msgctl(), изменившего одно из полей структуры */

    };

    ПРИМЕЧАНИЕ

    Unix 98 не требует наличия полей msg_first, msg_last и msg_cbytes. Тем не менее они имеются в большинстве существующих реализаций, производных от System V. Естественно, ничто не заставляет реализовывать очередь сообщений через связный список, который неявно предполагается при наличии полей msg_first и msg_last. Эти два указателя обычно указывают на участки памяти, принадлежащие ядру, и практически бесполезны для приложения.

    Мы можем изобразить конкретную очередь сообщений, хранимую ядром как связный список, — рис. 6.1. В этой очереди три сообщения длиной 1, 2 и 3 байта с типами 100, 200 и 300 соответственно.

    В этой главе мы рассмотрим функции, используемые для работы с очередями сообщений System V, и реализуем наш пример файлового сервера из раздела 4.2 с использованием очередей сообщений. 

    Рис. 6.1. Структура очереди system V в ядре

    6.2. Функция msgget

    Создать новую очередь сообщений или получить доступ к существующей можно с помощью функции msgget:

    #include <sys/msg.h>

    int msgget(key_t key, int oflag);

    /* Возвращает неотрицательный идентификатор в случае успешного завершения, –1 в случае ошибки */

    Возвращаемое значение представляет собой целочисленный идентификатор, используемый тремя другими функциями msg для обращения к данной очереди. Идентификатор вычисляется на основе указанного ключа, который может быть получен с помощью функции ftok или может представлять собой константу IPC_PRIVATE, как показано на рис. 3.1.

    Флаг oflag представляет собой комбинацию разрешений чтения-записи, показанную в табл. 3.3. К разрешениям можно добавить флаги IPC_CREAT или IPC_CREAT | IPC_EXCL с помощью логического сложения, как уже говорилось в связи с рис. 3.2.

    При создании новой очереди сообщений инициализируются следующие поля структуры msqid_ds:

    ■ полям uid и cuid структуры msg_perm присваивается значение действующего идентификатора пользователя вызвавшего процесса, а полям gid и cgid — действующего идентификатора группы;

    ■ разрешения чтения-записи, указанные в oflag, помещаются в msg_perm.mode;

    ■ значения msg_qnum, msg_lspid, msg_lrpid, msg_stime и msg_rtime устанавливаются в 0;

    ■ в msg_ctime записывается текущее время;

    ■ в msg_qbytes помещается системное ограничение на размер очереди.

    6.3. Функция msgsnd

    После открытия очереди сообщений с помощью функции msgget можно помещать сообщения в эту очередь с помощью msgsnd.

    #include <sys/msg.h>

    int msgsnd(int msqid, const void *ptr, size_t length, int flag); 

    /* Возвращает 0 в случае успешного завершения; –1 – в случае ошибки */

    Здесь msqid представляет собой идентификатор очереди, возвращаемый msgget. Указатель ptr указывает на структуру следующего шаблона, определенного в <sys/ msg.h>:

    struct msgbuf {

     long mtype; /* тип сообщения, должен быть > 0 */

     char mtext[1]; /* данные */

    };

    Тип сообщения должен быть больше нуля, поскольку неположительные типы используются в качестве специальной команды функции msgrcv, о чем рассказывается в следующем разделе.

    Название mtext в структуре msgbuf употреблено не вполне правильно; данные в сообщении совсем не обязательно должны быть текстом. Разрешена передача любых типов данных как в двоичном, так и в текстовом формате. Ядро никак не интерпретирует содержимое сообщения.

    Для описания структуры мы используем термин «шаблон», поскольку ptr указывает на целое типа long, представляющее собой тип сообщения, за которым непосредственно следует само сообщение (если его длина больше 0 байт). Большинство приложений не пользуются этим определением структуры msgbuf, поскольку установленного в ней количества данных (1 байт) обычно недостаточно для прикладных задач. На количество данных в сообщении никаких ограничений при компиляции не накладывается (как правило, оно может быть изменено системным администратором), поэтому вместо объявления структуры с большим объемом данных (большим, чем поддерживается текущей реализацией) определяется этот шаблон. Большинство приложений затем определяют собственную структуру сообщений, в которой передаваемые данные зависят от нужд этих приложений.

    Например, если приложению нужно передавать сообщения, состоящие из 16-разрядного целого, за которым следует 8-байтовый массив символов, оно может определить свою собственную структуру так:

    #define MY_DATA 8

    typedef struct my_msgbuf {

     long mtype; /* тип сообщения */

     int16_t mshort; /* начало данных */

     char mchar[MY_DATA];

    } Message;

    Аргумент length функции msgsnd указывает длину сообщения в байтах. Это длина пользовательских данных, следующих за типом сообщения (целое типа long). Длина может быть и 0. В указанном выше примере длина может быть вычислена как sizeof(Message) – sizeof(long).

    Аргумент flag может быть либо 0, либо IPC_NOWAIT. В последнем случае он отключает блокировку для msgsnd: если для нового сообщения недостаточно места в очереди, возврат из функции происходит немедленно. Это может произойти, если:

    ■ в данной очереди уже имеется слишком много данных (значение msg_qbytes в структуре msqid_ds);

    ■ во всей системе имеется слишком много сообщений.

    Если верно одно из этих условий и установлен флаг IPC_NOWAIT, функция msgsnd возвращает ошибку с кодом EAGAIN. Если флаг IPC_NOWAIT не указан, а одно из этих условий выполняется, поток приостанавливается до тех пор, пока не произойдет одно из следующего:

    ■ для сообщения освободится достаточно места;

    ■ очередь с идентификатором msqid будет удалена из системы (в этом случае возвращается ошибка с кодом EIDRM);

    ■ вызвавший функцию поток будет прерван перехватываемым сигналом (в этом случае возвращается ошибка с кодом EINTR).

    6.4. Функция msgrcv

    Сообщение может быть считано из очереди с помощью функции msgrcv.

    #include <sys/msg.h>

    ssize_t msgrcv(int msqid, void *ptr, size_t length, long type, int flag);

    /* Возвращает количество данных в сообщении, –1 – в случае ошибки */

    Аргумент ptr указывает, куда следует помещать принимаемые данные. Как и для msgsnd, он указывает на поле данных типа long (рис. 4.13), которое непосредственно предшествует полезным данным.

    Аргумент length задает размер относящейся к полезным данным части буфера, на который указывает ptr. Это максимальное количество данных, которое может быть возвращено функцией. Поле типа long не входит в эту длину.

    Аргумент type определяет тип сообщения, которое нужно считать из очереди:

    ■ если значение type равно 0, возвращается первое сообщение в очереди (то есть при указании типа 0 возвращается старейшее сообщение);

    ■ если тип больше 0, возвращается первое сообщение, тип которого равен указанному;

    ■ если тип меньше нуля, возвращается первое сообщение с наименьшим типом, значение которого меньше либо равно модулю аргумента type.

    Рассмотрим пример очереди сообщений, изображенный на рис. 6.1. В этой очереди имеются три сообщения:

    ■ первое сообщение имеет тип 100 и длину 1;

    ■ второе сообщение имеет тип 200 и длину 2;

    ■ третье сообщение имеет тип 300 и длину 3.

    Таблица 6.1 показывает, какое сообщение будет возвращено при различных значениях аргумента type.


    Таблица 6.1. Возвращаемое сообщение в зависимости от аргумента type 

    type Тип возвращаемого сообщения
    0 100
    100 100
    200 200
    300 300
    -100 100
    -200 100
    -300 100 

    Аргумент flag указывает, что делать, если в очереди нет сообщения с запрошенным типом. Если установлен бит IPC_NOWAIT, происходит немедленный возврат из функции msgrcv с кодом ошибки ENOMSG. В противном случае вызвавший процесс блокируется до тех пор, пока не произойдет одно из следующего:

    ■ появится сообщение с запрошенным типом;

    ■ очередь с идентификатором msqid будет удалена из системы (в этом случае будет возвращена ошибка с кодом EIDRM);

    ■ вызвавший поток будет прерван перехватываемым сигналом (в этом случае возвращается ошибка EINTR).

    В аргументе flag можно указать дополнительный бит MSG_NOERROR. При установке этого бита данные, превышающие объем буфера (аргумент length), будут просто обрезаться до его размера без возвращения кода ошибки. Если этот флаг не указать, при превышении объемом сообщения аргумента length будет возвращена ошибка E2BIG.

    В случае успешного завершения работы msgrcv возвращает количество байтов в принятом сообщении. Оно не включает байты, нужные для хранения типа сообщения (long), который также возвращается через указатель ptr.

    6.5. Функция msgctl

    Функция msgctl позволяет управлять очередями сообщений:

    #include <sys/msg.h>

    int msgctl(int msqid, int cmd, struct msqid_ds *buff);

    /* Возвращает 0 в случае успешного завершения, –1 в случае ошибки */

    Команд (аргумент cmd) может быть три:

    ■ IPC_RMID — удаление очереди с идентификатором msqidиз системы. Все сообщения, имеющиеся в этой очереди, будут утеряны. Мы уже видели пример действия этой функции в листинге 3.2. Для этой команды третий аргумент функции игнорируется.

    ■ IPC_SET — установка значений четырех полей структуры msqid_ds данной очереди равными значениям соответствующих полей структуры, на которую указывает аргумент buff: msg_perm.uid, msg_perm.gid, msg_perm.mode, msg_qbytes.

    ■ IPC_STAT — возвращает вызвавшему процессу (через аргумент buff) текущее содержимое структуры msqid_ds для указанной очереди.

    Пример

    Программа в листинге 6.1 создает очередь сообщений, помещает в нее сообщение с 1 байтом информации, вызывает функцию msgctl с командой IPC_STAT, выполняет команду ipcs, используя функцию system, а затем удаляет очередь, вызвав функцию msgctl с командой IPC_RMID.

    Листинг 6.1.[1] Пример использования функции msgctl с командой IPC_STAT

    //svmsg/ctl.с

    1  #include "unpipc.h"


    2  int

    3  main(int argc, char **argv)

    4  {

    5   int msqid;

    6   struct msqid_ds info;

    7   struct msgbuf buf;

    8   msqid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);

    9   buf.mtype = 1;

    10  buf.mtext[0] = 1;

    11  Msgsnd(msqid, &buf, 1, 0);

    12  Msgctl(msqid, IPC_STAT, &info);

    13  printf("read-write: *03o, cbytes = %lu, qnum = %lu, qbytes = %lu\n",

    14   info.msg_perm.mode & 0777, (ulong_t) info.msg_cbytes,

    15   (ulong_t) info.msg_qnum, (ulong_t) info.msg_qbytes);

    16  system("ipcs –q");

    17  Msgctl(msqid, IPC_RMID, NULL);

    18  exit(0);

    19 }

    Мы собираемся отправить сообщение размером 1 байт, поэтому можно просто воспользоваться стандартным определением структуры msgbuf из <sys/msg.h>. Выполнение этой программы приведет к следующему результату:

    solaris %ctl

    read-write: 664, cbytes = 1, qnum = 1, qbytes = 4096

    IPC status from <running system> as of MOn Oct 20 15:36:49 1997

    T ID   Key      MODE       OWNER    GROUP

    Message Queues:

    q 1150 00000000 –rw-rw-r-- rstevens other1

    Выведенные значения соответствуют ожидаемым. Нулевое значение ключа обычно соответствует IPC_PRIVATE, как мы отмечали в разделе 3.2. В этой системе на очередь сообщений накладывается ограничение по объему в 4096 байт. Поскольку мы записали сообщение с 1 байтом данных и msg_cbytes имеет значение 1, это ограничение накладывается на объем полезных данных и не включает тип сообщения (long), указываемый для каждого сообщения.

    6.6. Простые примеры

    Поскольку очереди сообщений System V обладают живучестью ядра, мы можем написать несколько отдельных программ для работы с этими очередями и изучить их действие.

    Программа msgcreate

    В листинге 6.2 приведена программа msgcreate, создающая очередь сообщений.

    9-12 Параметр командной строки –e позволяет указать флаг IPC_EXCL.

    16 Полное имя файла, являющееся обязательным аргументом командной строки, передается функции ftok. Получаемый ключ преобразуется в идентификатор функцией msgget.

    Листинг 6.2. Создание очереди сообщений System V

    //svmsg/msgcreate.c

    1  #include "unpipc.h"


    2  int

    3  main(int argc, char **argv)

    4  {

    5   int c, oflag, mqid;

    6   oflag = SVMSG_MODE | IPC_CREAT;

    7   while ((c = Getopt(argc, argv, "e")) != –1) {

    8    switch (c) {

    9    case 'e':

    10    oflag |= IPC_EXCL;

    11    break;

    12   }

    13  }

    14  if (optind != argc – 1)

    15   err_quit("usage: msgcreate [ –e ] <pathname>");

    16  mqid = Msgget(Ftok(argv[optind], 0), oflag);

    17  exit(0);

    18 }

    Программа msgsnd

    Программа msgsnd приведена в листинге 6.3. Она помещает в очередь одно сообщение заданной длины и типа.

    Мы создаем указатель на структуру msgbuf общего вида, а затем выделяем место под реальную структуру (буфер записи) соответствующего размера, вызвав calloc. Эта функция инициализирует буфер нулем.

    Листинг 6.3. Помещение сообщения в очередь System V

    //svmsg/msgsnd.c

    1  #include "unpipc.h"


    2  int

    3  main(int argc, char **argv)

    4  {

    5   int mqid;

    6   size_t len;

    7   long type;

    8   struct msgbuf *ptr;

    9   if (argc != 4)

    10   err_quit("usage: msgsnd <pathname> <#bytes> <type>");

    11  len = atoi(argv[2]);

    12  type = atoi(argv[3]);

    13  mqid = Msgget(Ftok(argv[1], 0), MSG_W);

    14  ptr = Calloc(sizeof(long) + len, sizeof(char));

    15  ptr->mtype = type;

    16  Msgsnd(mqid, ptr, len, 0);

    17  exit(0);

    18 }

    Программа msgrcv

    В листинге 6.4 приведен текст программы msgrcv, считывающей сообщение из очереди. В командной строке может быть указан параметр –n, отключающий блокировку, а параметр –t может быть использован для указания типа сообщения в функции msgrcv.

    2 Не существует простого способа определить максимальный размер сообщения (об этом и других ограничениях мы поговорим в разделе 6.10), поэтому мы определим свою собственную константу.

    Листинг 6.4. Считывание сообщения из очереди System V

    //svmsg/msgrcv.c

    1  #include "unpipc.h"

    2  #define MAXMSG (8192 + sizeof(long))


    3  int

    4  main(int argc, char **argv)

    5  {

    6   int c, flag, mqid;

    7   long type;

    8   ssize_t n;

    9   struct msgbuf *buff;

    10  type = flag = 0;

    11  while ((c = Getopt(argc, argv, "nt:")) != –1) {

    12   switch (c) {

    13   case 'n':

    14    flag |= IPC_NOWAIT;

    15    break;

    16   case 't':

    17    type = atol(optarg);

    18    break;

    19   }

    20  }

    21  if (optind != argc – 1)

    22   err_quit("usage: msgrcv [ –n ] [ –t type ] <pathname>");

    23  mqid = Msgget(Ftok(argv[optind], 0), MSG_R);

    24  buff = Malloc(MAXMSG);

    25  n = Msgrcv(mqid, buff, MAXMSG, type, flag);

    26  printf("read %d bytes, type = %ld\n", n, buff->mtype);

    27  exit(0);

    28 }

    Программа msgrmid

    Для удаления очереди сообщений мы вызываем функцию msgctl с командой IPC_RMID, как показано в листинге 6.5.

    Листинг 6.5. Удаление очереди сообщений System V

    //svmsg/msgrmid.c

    1  #include "unpipc.h"


    2  int

    3  main(int argc, char **argv)

    4  {

    5   int mqid;

    6   if (argc != 2)

    7    err_quit("usage: msgrmid <pathname>");

    8   mqid = Msgget(Ftok(argv[1], 0), 0);

    9   Msgctl(mqid, IPC_RMID, NULL);

    10  exit(0);

    11 }

    Примеры

    Теперь воспользуемся четырьмя только что написанными программами. Создадим очередь и поместим в нее три сообщения:

    solaris % msgcreate /tmp/no/such/file

    ftok error for pathname "tmp/no/such/file" and id 0: No such file or directory

    solaris % touch /trap/test1

    solaris % msgcreate /tmp/test1

    solaris % msgsnd /tmp/test1 1 100

    solaris % msgsnd /tmp/test1 2 200

    solaris % msgsnd /tmp/test1 3 300

    solaris % ipcs –qo

    IPC status from <running system> as of Sat Jan 10 11:25:45 1998

    T ID  KEY        MODE       OWNER    GROUP  CBYTES QNUM

    Message Queues:

    q 100 0х0000113e –rw-r--r-- rstevens other1 6      3

    Сначала мы пытаемся создать очередь, используя имя несуществующего файла. Пример показывает, что файл, указываемый в качестве аргумента ftok, обязательно должен существовать. Затем мы создаем файл /tmp/test1 и используем его имя при создании очереди сообщений. После этого в очередь помещаются три сообщения длиной 1, 2 и 3 байта со значениями типа 100, 200 и 300 (вспомните рис. 6.1). Программа ipcs показывает, что в очереди находятся 3 сообщения общим объемом 6 байт.

    Теперь продемонстрируем использование аргумента type при вызове msgrcv для считывания сообщений в произвольном порядке:

    solaris % msgrcv –t 200 /tmp/test1

    read 2 bytes, type = 200

    solaris % msgrcv –t -300 /tmp/test1

    read 1 bytes, type = 100

    solaris % msgrcv /tmp/test1

    read 3 bytes, type = 300

    solaris % msgrcv –n /tmp/test1

    msgrcv error: No message of desired type

    В первом примере запрашивается сообщение с типом 200, во втором примере — сообщение с наименьшим значением типа, не превышающим 300, а в третьем — первое сообщение в очереди. Последний запуск msgrcv иллюстрирует действие флага IPC_NOWAIT.

    Что произойдет, если мы укажем положительное значение типа, а сообщений с таким типом в очереди не обнаружится?

    solaris % ipcs –qo

    IPC status from <running system> as of Sat Jan 10 11:37:01 1998

    T ID  KEY        MODE       OWNER    GROUP  CBYTES QNUM

    Message Queues:

    q 100 0x0000113e –rw-r--r-- rstevens other1 0      0

    solaris % msgsnd /tmp/test1 1 100

    solaris % msgrcv –t 999 /temp/test1

    ^? нажали клавишу прерывания выполнения программы

    solaris % msgrcv –n –t999/tmp/test1

    msgrcv error: No message of desired type

    solaris % grep desired /usr/include/sys/errno.h

    #define ENOMSG 35 /* No message of desired type */

    solaris % msgrmid /tmp/test1

    Сначала мы вызываем ipcs, чтобы убедиться, что очередь пуста, а затем помещаем в нее сообщение длиной 1 байт с типом 100. Затем мы запрашиваем сообщение с типом 999, и программа блокируется (при вызове msgrcv), ожидая помещения в очередь сообщения с указанным типом. Мы прерываем ожидание нажатием клавиши. Затем мы запускаем программу с флагом –n, предотвращающим блокировку, и видим, что в этом случае возвращается ошибка с кодом ENOMSG. После этого мы удаляем очередь с помощью программы msgrmid. Мы могли бы удалить очередь и с помощью системной команды

    solaris % ipcrm –q 100

    в которой указывается идентификатор очереди, или с помощью той же команды в другом формате

    solaris % ipcrm –Q 0x113e

    где указывается ключ очереди сообщений.

    Программа msgrcvid

    Покажем теперь, что для получения доступа к очереди сообщений System V не обязательно вызывать msgget: все, что нужно, — это знать идентификатор очереди сообщений, который легко получить с помощью ipcs, и считать разрешения доступа для очереди. В листинге 6.6 приведен упрощенный вариант программы msgrcv из листинга 6.4.

    Здесь мы уже не используем msgget. Вместо этого используется идентификатор очереди сообщений, являющийся обязательным аргументом командной строки.

    Листинг 6.6. Считывание из очереди сообщений System V с известным идентификатором

    //svmsg/msgrcvid.c

    1  #include "unpipc.h"

    2  #define MAXMSG (8192 + sizeof(long))


    3  int

    4  main(int argc, char **argv)

    5  {

    6   int mqid;

    7   ssize_t n;

    8   struct msgbuf *buff;

    9   if (argc != 2)

    10   err_quit("usage: msgrcvid <mqid>");

    11  mqid = atoi(argv[1]);

    12  buff = Maloc(MAXMSG);

    13  n = Msgrcv(mqid, buff, MAXMSG, 0, 0);

    14  printf("read %d bytes, type = %ld\n", n, buff->mtype);

    15  exit(0);

    16 }

    Вот пример использования этой программы:

    solaris % touch /tmp/testid

    solaris % msgcreate /tmp/testid

    solaris % msgsnd /tmp/testid4 400

    solaris % ipcs –qo

    IPC status from <running system> as of Wed Mar 25 09:48:28 1998

    T ID  KEY        MODE       OWNER    GROUP  CBYTES QNUM

    Message Queues:

    q 150 0x0000118a –rw-r--r-- rstevens other1 4      1

    solaris % msgrcvid 150

    read 4 bytes, type = 400

    Идентификатор очереди (150) мы узнали с помощью ipcs, его мы и предоставляем программе msgrcvid в качестве аргумента командной строки.

    Этот же метод можно использовать для семафоров System V (упражнение 11.1) и разделяемой памяти System V (упражнение 14.1).

    6.7. Пример программы клиент-сервер

    Перепишем наш пример программы типа клиент-сервер из раздела 4.2 с использованием двух очередей сообщений. Одна из очередей предназначена для передачи сообщений от клиента серверу, а другая — в обратную сторону.

    Заголовочный файл svmsg.h приведен в листинге 6.7. Мы подключаем наш стандартный заголовочный файл и определяем ключи для каждой из очередей сообщений.

    Листинг 6.7. Заголовочный файл svmsg.h для программы клиент-сервер, использующей очереди сообщений

    //svmsgcliserv/svmsg.h

    1 #include "unpipc.h"

    2 #define MQ_KEY1 1234L

    3 #define MQ_KEY2 2345L

    Функция main для сервера приведена в листинге 6.8. Программа создает обе очереди сообщений, и не беда, если какая-нибудь из них уже существует, потому что мы не указываем флаг IPC_EXCL. Функция server дана в листинге 4.16. Она вызывает наши собственные функции mesgsend и mesgrecv, новые версии которых будут приведены ниже.

    Листинг 6.8. Функция main программы-сервера, использующей очереди сообщений

    //svmsgcliserv/server_main.с

    1  #include "svmsg.h"


    2  void server(int, int);

    3  int

    4  main(int argc, char **argv)

    5  {

    6   int readid, writeid;

    7   readid = Msgget(MQ_KEY1, SVMSG_MODE | IPC_CREAT);

    8   writeid = Msgget(MQ_KEY2, SVMSG_MODE | IPC_CREAT);

    9   server(readid, writeid);

    10  exit(0);

    11 }

    Листинг 6.9. Функция main программы-клиента, использующей очереди сообщений

    //svmsgcliserv/client_main.с

    1  #include "svmsg.h"

    2  void client(int, int);


    3  int

    4  main(int argc, char **argv)

    5  {

    6   int readid, writeid;

    7   /* assumes server has created the queues */

    8   writeid = Msgget(MQ_KEY1, 0);

    9   readid = Msgget(MQ_KEY2, 0);

    10  client(readid, writeid);

    11  /* now we can delete the queues */

    12  Msgctl(readid, IPC_RMID. NULL);

    13  Msgctl(writeid, IPC_RMID, NULL);

    14  exit(0);

    15 }

    В листинге 6.9 приведен текст функции main программы-клиента. Программа открывает две очереди сообщений и вызывает функцию client из листинга 4.15. Эта функция использует две другие: mesg_send и mesg_recv, которые будут приведены ниже.

    И функция client, и функция server используют формат сообщений, изображенный в листинге 4.12. Для передачи и приема сообщений они используют функции mesg_send и mesg_recv. Старые версии этих функций, приведенные в листингах 4.13 и 4.14, вызывали write и read и работали с программными каналами и FIFO, так что нам придется переписать их для использования очередей сообщений. В листингах 6.10 и 6.11 приведены новые версии этих функций. Обратите внимание, что аргументы функций не изменились, поскольку первый целочисленный аргумент может содержать как целочисленный дескриптор программного канала или FIFO, так и целочисленный дескриптор очереди сообщений.

    Листинг 6.10. Функция mesg_send, работающая с очередью сообщений System V

    //svmsgcliserv/mesg_send.с

    1 #include "mesg.h"


    2 ssize_t

    3 mesg_send(int id, struct mymesg *mptr)

    4 {

    5  return(msgsnd(id, &(mptr->mesg_type), mptr->mesg_len, 0));

    6 }

    Листинг 6.11. Функция mesg_recv, работающая с очередью сообщений System V

    //svmsgcliserv/mesg_recv.с

    1 #include "mesg.h"


    2 ssize_t

    3 mesg_recv(int id, struct mymesg *mptr)

    4 {

    5  ssize_t n;

    6  n = msgrcv(id, &(mptr->mesg_type), MAXMESGDATA, mptr->mesg_type, 0);

    7  mptr->mesg_len = n; /* количество возвращаемых данных */

    8  return(n); /* –1 в случае ошибки, 0 – конец файла, иначе – >0 */

    9 }

    6.8. Мультиплексирование сообщений

    Наличие поля type у каждого сообщения в очереди предоставляет две интересные возможности:

    1. Поле type может использоваться для идентификации сообщений, позволяя нескольким процессам мультиплексировать сообщения в одной очереди. Например, все сообщения от клиентов серверу имеют одно и то же значение типа, тогда как сообщения сервера клиентам имеют различные значения типов, уникальные для каждого клиента. Естественно, в качестве значения типа сообщения, гарантированно уникального для каждого клиента, можно использовать идентификатор процесса клиента.

    2. Поле type может использоваться для установки приоритета сообщений. Это позволяет получателю считывать сообщения в порядке, отличном от обычного для очередей (FIFO). В программных каналах и FIFO данные могли приниматься только в том порядке, в котором они были отправлены. Очереди System V позволяют считывать сообщения в произвольном порядке в зависимости от значений типа сообщений. Более того, можно вызывать msgrcv с флагом IPC_NOWAIT для считывания сообщений с конкретным типом и немедленного возвращения управления процессу в случае отсутствия таких сообщений.

    Пример: одна очередь на приложение

    Вспомните наш простой пример с одним процессом-сервером и одним процессом-клиентом. Если применять программные каналы или FIFO, необходимо наличие двух каналов IPC для передачи данных в обоих направлениях, поскольку эти типы IPC являются однонаправленными. Очереди сообщений позволяют передавать данные в обоих направлениях, причем поле type может использоваться для указания адресата (клиента или сервера).

    Рис. 6.2. Мультиплексирование сообщений между несколькими клиентами и одним сервером


    Рассмотрим усложненный вариант: один сервер и несколько клиентов. В этом случае можно использовать значение типа 1, например, для обозначения сообщений от любого клиента серверу. Если клиент передаст серверу свой идентификатор процесса в качестве части сообщения, сервер сможет отсылать клиенту сообщения, используя его идентификатор в качестве значения типа сообщения. Каждый клиент будет использовать свой PID в качестве аргумента type при вызове msgrcv. На рис. 6.2 приведен пример использования очереди для мультиплексирования этих сообщений между несколькими клиентами и одним сервером.

    ПРИМЕЧАНИЕ

    При использовании одного канала IPC одновременно клиентами и сервером всегда существует потенциальная возможность зависания (deadlock). Клиенты могут (в этом примере) заполнить очередь своими сообщениями, не давая серверу возможности отправить ответ. В этому случае клиенты заблокируются при вызове msgsnd, как и сервер. Одно из соглашений, исключающих возможность такой взаимной блокировки, заключается в том, что сервер должен всегда отключать блокировку записи в очередь сообщений.

    Теперь мы можем переделать наш пример с клиентом и сервером, используя одну очередь сообщений с различными типами для разных адресатов. Эти программы используют следующее соглашение: сообщения с типом 1 адресованы серверу, а все остальные сообщения имеют тип, соответствующий идентификатору процесса адресата. При этом запрос клиента должен содержать его PID вместе с полным именем запрашиваемого файла, аналогично программе в разделе 4.8.

    В листинге 6.12 приведен текст функции main сервера. Заголовочный файл svmsg.h был приведен в листинге 6.7. Создается единственная очередь сообщений (если она существует, ошибки не возникнет). Идентификатор этой очереди сообщений используется в качестве обоих аргументов при вызове функции server.

    Листинг 6.12. Функция main сервера

    //svmsgmpx1q/server_main.с

    1  #include "svmsg.h"

    2  void server(int, int);


    3  int

    4  main(int argc, char **argv)

    5  {

    6   int msqid;

    7   msqid = Msgget(MQ_KEY1, SVMSG_MODE | IPC_CREAT);

    8   server(msqid, msqid); /* одна очередь в обе стороны */

    9   exit(0);

    10 }

    Функция server обеспечивает работу сервера. Ее текст приведен в листинге 6.13. Эта функция представляет собой комбинацию листинга 4.10 — нашего сервера FIFO, считывавшего команды, состоявшие из идентификатора процесса и полного имени файла, — и листинга 4.16, в котором использовались функции mesg_send и mesg_recv. Обратите внимание, что идентификатор процесса, отправляемый клиентом, используется в качестве типа для всех сообщений, отправляемых сервером этому клиенту. Эта функция представляет собой бесконечный цикл, в котором считываются запросы клиентов и отсылаются запрошенные файлы. Этот сервер является последовательным (см. раздел 4.9).

    В листинге 6.14 приведен текст функции main клиента. Клиент открывает очередь сообщений, которая должна была быть создана сервером заранее.

    Функция client, текст которой дан в листинге 6.15, обеспечивает всю обработку со стороны клиента. Эта функция представляет собой комбинацию программ из листингов 4.11 и 4.15. В первой программе клиент отсылал свой идентификатор и полное имя файла, а во второй программе использовались функции mesg_send и mesg_recv. Обратите внимание, что тип сообщений, запрашиваемых функцией mesg_recv, совпадает с идентификатором процесса клиента.

    Функции client и server используют функции mesg_send и mesg_recv из листингов 6.9 и 6.11.

    Листинг 6.13. Функция server

    //svmsgmpx1q/server.c

    1  #include "mesg.h"


    2  void

    3  server(int readfd, int writefd)

    4  {

    5   FILE *fp;

    6   char *ptr;

    7   pid_t pid;

    8   ssize_t n;

    9   struct mymesg mesg;

    10  for (;;) {

    11   /* считывание полного имени из канала IPC */

    12   mesg.mesg_type = 1:

    13   if ((n = Mesg_recv(readfd, &mesg)) == 0) {

    14    err_msg("pathname missing");

    15    continue;

    16   }

    17   mesg.mesg_data[n] = '\0'; /* полное имя */

    18   if ((ptr = strchr(mesg.mesg_data, ' ')) == NULL) {

    19    err_msg("bogus request: %s", mesg.mesg_data);

    20    continue;

    21   }

    22   *ptr++ =0; /* ptr = полное имя */

    23   pid = atol(mesg.mesg_data);

    24   mesg.mesg_type = pid; /* для обратных сообщений */

    25   if ((fp = fopen(ptr, "r")) == NULL) {

    26    /* 4error: must tell client */

    27    snprintf(mesg.mesg_data + n, sizeof(mesg.mesg_data) – n,

    28     ": can't open. %s\n", strerror(errno));

    29    mesg.mesg_len – strlen(ptr);

    30    memmove(mesg.mesg_data, ptr, mesg.mesg_len);

    31    Mesg_send(writefd, &mesg);

    32   } else {

    33    /* файл открыт, копируем клиенту */

    34    while (Fgets(mesg.mesg_data, MAXMESGDATA, fp) != NULL) {

    35     mesg.mesg_len = strlen(mesg.mesg_data);

    36     Mesg_send(writefd, &mesg);

    37    }

    38    Fclose(fp);

    39   }

    40   /* сообщение нулевой длины заканчивает связь */

    41   mesg.mesg_len = 0;

    42   Mesg_send(writefd, &mesg);

    43  }

    44 }

    Листинг 6.14. Функция main клиента

    //svmsgmpx1q/client_main.c

    1  #include "svmsg.h"

    2  void client(int, int);


    3  int

    4  main(int argc, char **argv)

    5  {

    6   int msqid;

    7   /* сервер должен был создать очередь */

    8   msqid = Msgget(MQ_KEY1, 0);

    9   client(msqid, msqid); /* одна очередь в обе стороны */

    10  exit(0);

    11 }

    Листинг 6.15. Функция client

    //svmsgmpx1q/client.с

    1  #include "mesg.h"


    2  void

    3  client(int readfd, int writefd)

    4  {

    5   size_t len;

    6   ssize_t n;

    7   char *ptr;

    8   struct mymesg mesg;

    9   /* инициализируем буфер идентификатором процесса и пробелом */

    10  snprintf(mesg.mesg_data, MAXMESGDATA. "%ld ", (long) getpid());

    11  len = strlen(mesg.mesg_data);

    12  ptr = mesg.mesg_data + len;

    13  /* считываем полное имя файла */

    14  Fgets(ptr, MAXMESGDATA – len, stdin);

    15  len = strlen(mesg.mesg_data);

    16  if (mesg.mesg_data[len-1] == '\n')

    17   len--; /* удаляем перевод строки fgets() */

    18  mesg.mesg_len = len;

    19  mesg.mesg_type = 1;

    20  /* записываем PID и имя файла в канал IPC */

    21  Mesg_send(writefd, &mesg);

    22  /* считываем из канала IPC, записываем в stdout */

    23  mesg.mesg_type = getpid();

    24  while ((n = Mesg_recv(readfd, &mesg)) > 0)

    25   Write(STDOUT_FILENO, mesg.mesg_data, n);

    26 }
     

    Пример: одна очередь для каждого клиента

    Изменим теперь предыдущий пример таким образом, чтобы все запросы клиентов передавались по одной очереди, но для отправки ответов использовалась бы отдельная очередь для каждого клиента. На рис. 6.3 изображена схема такого приложения. 

    Рис. 6.3. Одна очередь для сервера и по одной для каждого клиента


    Ключ очереди сервера должен быть известен клиентам, а сами клиенты создают свои очереди с ключом IPC_PRIVATE. Вместо передачи серверу идентификатора процесса клиенты сообщают ему идентификатор своей очереди, в которую сервер направляет свой ответ. Этот сервер является параллельным: для каждого нового клиента порождается отдельный процесс.

    ПРИМЕЧАНИЕ

    При такой схеме может возникнуть проблема в случае «гибели» клиента, потому что тогда сообщения останутся в его очереди навсегда (по крайней мере до перезагрузки ядра или явного удаления очереди другим процессом).

    Нижеследующие заголовочные файлы и функции не претерпевают изменений по сравнению с предыдущими версиями:

    ■ mesg.h (листинг 4.12);

    ■ svmsg.h (листинг 6.7);

    ■ функция main сервера (листинг 6.12);

    ■ функция mesg_send (листинг 4.13).

    Функция main клиента приведена в листинге 6.16; она слегка изменилась по сравнению с листингом 6.14. Мы открываем очередь сервера с известным ключом (MQ_KEY1) и создаем нашу собственную очередь с ключом IPC_PRIVATE. Два идентификатора этих очередей становятся аргументами функции client (листинг 6.17). После завершения работы клиента его персональная очередь удаляется.

    Листинг 6.16. Функция main клиента

    //svmsgmpxnq/client_main.с

    1  #include "svmsg.h"


    2  void client(int, int);

    3  int

    4  main(int argc, char **argv)

    5  {

    6   int readid, writeid;

    7   /* сервер должен создать свою очередь */

    8   writeid = Msgget(MQ_KEY1, 0);

    9   /* мы создаем свою собственную очередь */

    10  readid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);

    11  client(readid, writeid);

    12  /* и удаляем нашу собственную очередь */

    13  Msgctl(readid, IPC_RMID, NULL);

    14  exit(0);

    15 }

    Листинг 6.17. Функция client

    //svmsgmpxnq/client.с

    1  #include "mesg.h"


    2  void

    3  client(int readid, int writeid)

    4  {

    5   size_t len;

    6   ssize_t n;

    7   char *ptr;

    8   struct mymesg mesg;

    9   /* инициализируем буфер идентификатором очереди и пробелом */

    10  snprintf(mesg.mesg_data, MAXMESGDATA, "%d ", readid);

    11  len = strlen(mesg.mesg_data);

    12  ptr = mesg.mesg_data + len;

    13  /* считываем имя файла */

    14  Fgets(ptr, MAXMESGDATA – len, stdin);

    15  len = strlen(mesg.mesg_data);

    16  if (mesg.mesg_data[len-1] == '\n')

    17   len--; /* удаляем перевод строки fgets() */

    18  mesg.mesg_len = len;

    19  mesg.mesg_type = 1;

    20  /* отправляем идентификатор очереди и имя файла серверу */

    21  Mesg_send(writeid, &mesg);

    22  /* считываем ответ из нашей очереди и записываем его в stdout */

    23  while ((n = Mesg_recv(readid, &mesg)) > 0)

    24   Write(STDOUT_FILENO, mesg.mesg_data, n);

    25 }

    В листинге 6.17 приведен текст функции client. Эта функция практически идентична функции из листинга 6.15, но вместо передачи идентификатора процесса клиента на сервер направляется идентификатор очереди клиента. Тип сообщения в структуре mesg остается равным 1, поскольку это значение устанавливается для сообщений, передаваемых в обе стороны.

    В листинге 6.19 приведена функция server. Главное отличие от листинга 6.13 в том, что эта функция представляет собой бесконечный цикл, в котором для каждого нового клиента вызывается fork.

    Установка обработчика сигнала для SIGCHLD

    10 Поскольку для каждого клиента порождается отдельный процесс, нужно позаботиться о процессах-зомби. В разделах 5.9 и 5.10 [24] об этом говорится подробно. Здесь мы просто установим обработчик для сигнала SIGCHLD, и наша функция sig_chld (листинг 6.18) будет вызываться при завершении работы дочернего процесса.

    12-18 Породивший процесс сервера блокируется в вызове mesg_recv, ожидая появления сообщения от очередного клиента.

    25-45 Вызовом fork порождается новый процесс, который производит попытку открыть запрошенный файл и отправляет клиенту либо сообщение об ошибке, либо содержимое файла. Мы преднамеренно поместили вызов fopen в дочерний процесс, а не в родительский, поскольку если файл находится в удаленной файловой системе, его открытие может занять довольно много времени в случае наличия проблем с сетью.

    Функция-обработчик для SIGCHLD приведена в листинге 6.18. Она скопирована с листинга 5.11 [24].

    Листинг 6.18. Обработчик сигнала SIGCHLD, вызывающий waitpid

    //svmsgmpxnq/sigchldwaitpid.с

    1 #include "unpipc.h"


    2 void

    3 sig_chld(int signo)

    4 {

    5  pid_t pid;

    6  int stat;

    7  while ((pid = waitpid(-1, &stat, WNOHANG)) > 0);

    8  return;

    9 }

    Каждый раз при вызове обработчика происходит циклический вызов waitpid для получения статуса завершения работы всех дочерних процессов, которые могли завершить работу. Затем происходит возврат из обработчика сигнала. При этом может возникнуть проблема, поскольку родительский процесс проводит большую часть времени в заблокированном состоянии (при вызове mesg_recv, листинг 6.9). При возвращении из обработчика этот вызов msgrcv прерывается. Функция возвращает ошибку с кодом EINTR, как рассказывается в разделе 5.9 [24]. 

    Нам нужно обработать такой возврат из вызванной функции, поэтому мы пишем новую функцию-обертку Mesg_recv, приведенную в листинге 6.20. Эта программа допускает возвращение ошибки с кодом EINTR функцией mesg_recv (которая просто вызывает msgrcv), и, если это происходит, мы просто еще раз вызываем mesg_recv.

    Листинг 6.19. Функция server

    //svmsgmpxnq/server.c

    1  #include "mesg.h"


    2  void

    3  server(int readid, int writeid)

    4  {

    5   FILE *fp;

    6   char *ptr;

    7   ssize_t n;

    8   struct mymesg mesg;

    9   void sig_chld(int);

    10  Signal(SIGCHLD, sig_chld);

    11  for (;;) {

    12   /* считывание имени файла из очереди */

    13   mesg.mesg_type = 1;

    14   if ((n = Mesg_recv(readid, &mesg)) == 0) {

    15    err_msg("pathname missing");

    16    continue;

    17   }

    18   mesg.mesg_data[n] = 40'; /* имя файла */

    19   if ((ptr = strchr(mesg.mesg_data, ' ')) = NULL) {

    20    err_msg("bogus request: %s", mesg.mesg_data);

    21    continue;

    22   }

    23   *ptr++ = 0; /* ptr = имя файла */

    24   writeid = atoi(mesg.mesg_data);

    25   if (Fork() == 0) { /* дочерний процесс */

    26    if ((fp = fopen(ptr, "r")) == NULL) {

    27     /* ошибка: нужно сообщить клиенту */

    28     snprintf(mesg.mesg_data + n, sizeof(mesg.mesg_data) – n,

    29      ": can't open, %s\n", strerror(errno));

    30     mesg.mesg_len = strlen(ptr);

    31     memmove(mesg.mesg_data, ptr, mesg.mesg_len);

    32     Mesg_send(writeid, &mesg);

    33    } else {

    34     /* файл открыт, копируем клиенту */

    35     while (Fgets(mesg.mesg_data, MAXMESGDATA, fp) != NULL) {

    36      mesg.mesg_len = strlen(mesg.mesg_data);

    37      Mesg_send(writeid, &mesg);

    38     }

    39     Fclose(fp);

    40    }

    41    /* отправка сообщения нулевой длины, указывающего конец файла */

    42    mesg.mesg_len = 0;

    43    Mesg_send(writeid, &mesg);

    44    exit(0); /* завершение дочернего процесса */

    45   }

    46   /* родительский процесс просто зациклен */

    47  }

    48 }

    Листинг 6.20. Функция-обертка Mesg_recv, обрабатывающая прерванный системный вызов

    //svmsgmpxnq/mesg_recv.с

    10 ssize_t

    11 Mesg_recv(int id, struct mymesg *mptr)

    12 {

    13  ssize_t n;

    14  do {

    15   n = mesg_recv(id, mptr);

    16  } while (n == –1 && errno == EINTR);

    17  if (n == –1)

    18   err_sys("mesg_recv error");

    19  return(n);

    20 }

    6.9. Использование select и poll с очередями сообщений

    Одним из недостатков очередей сообщений System V является то, что они идентифицируются не дескрипторами, а идентификаторами. Поэтому с ними нельзя использовать функции select и poll (глава 6 [24]).

    ПРИМЕЧАНИЕ

    На самом деле одна из версий Unix, а именно AIX (созданная IBM), позволяет использовать select с очередями сообщений System V, а не только с дескрипторами. Но эта возможность имеется только в AIX.

    Этот недостаток часто всплывает, когда возникает необходимость написать сервер, работающий одновременно с сетевыми соединениями и с IPC. Сетевые соединения с использованием интерфейса сокетов или XTI ([24]) используют дескрипторы, что позволяет вызывать select или poll. Программные каналы и FIFO также идентифицируются дескрипторами, поэтому для них тоже допустимо использование этих функций.

    Одним из решений этой проблемы является следующее: сервер должен создать канал и породить процесс, который будет заблокирован при вызове msgrcv. При получении сообщения произойдет возврат из msgrcv, дочерний процесс получит это сообщение из очереди и запишет его в канал. Затем родительский процесс может использовать функцию select для канала совместно с сетевыми соединениями. Недостаток этого подхода в том, что сообщения обрабатываются трижды: при считывании дочерним процессом с помощью msgrcv, при отправке в канал и при считывании из канала родительским процессом. Для ускорения обработки порожденный процесс может создать сегмент совместно используемой с породившим процессом памяти, а канал использовать как флаг (упражнение 12.5).

    ПРИМЕЧАНИЕ

    В листинге 5.12 мы привели решение с использованием очередей сообщений Posix, которое не требовало вызова fork. Для очередей сообщений Posix можно было обойтись одним процессом, поскольку они предусматривают уведомление о появлении нового сообщения с помощью сигнала. Для очередей System V такая возможность не предусмотрена, поэтому приходится порождать процесс, который будет блокироваться при вызове msgrcv.

    Другим недостатком очередей сообщений System V по сравнению с сетевым интерфейсом является невозможность считывания сообщений из оперативной памяти (возможность, предоставляемая флагом MSG_PEEK для функций recv, recvfrom, recvmsg [24, с. 356]). Если бы такая возможность имелась, в предложенной только что схеме клиент-сервер (для обхода проблемы с select) можно было бы сделать работу более эффективной, указав флаг peek при вызове msgrcv дочерним процессом и записав 1 байт в канал при приходе сообщения, а родительский процесс тогда просто считывал бы сообщение из очереди.

    6.10. Ограничения, накладываемые на очереди сообщений

    Как отмечалось в разделе 3.8, на очереди сообщений часто накладываются системные oгрaничeния. В табл. 6.2 приведены значения этих oгрaничeний для двух конкретных реализаций. Первая колонка представляет собой традиционное имя System V для переменной ядра, хранящей это ограничение.


    Таблица 6.2. Характерные значения ограничений для очередей сообщений

    Имя Описание DUnix 4.0B Solaris 2.6
    msgmax Максимальное количество байтов в сообщении 8192 2048
    msgmnb Максимальное количество байтов в очереди сообщений 16384 4096
    msgmni Максимальное количество очередей сообщений в системе 64 50
    msgtlq Максимальное количество сообщений в системе 40 40 

    В этом разделе мы хотели показать типичные значения ограничений, чтобы помочь в планировании переносимых программ. При выполнении приложений, активно использующих очереди сообщений, обычно требуется настройка этих (или аналогичных) параметров ядра (что описано в разделе 3.8).

    Пример

    В листинге 6.21 приведен текст программы, которая определяет четыре ограничения, показанные в табл. 6.2.

    Листинг 6.21. Определение системных ограничений для очередей сообщений System V

    //svmsg/limits.c

    1  #include "unpipc.h"

    2  #define MAX_DATA 64*1024

    3  #define MAX_NMESG 4096

    4  #define MAX_NIDS 4096

    5  int max_mesg;


    6  struct mymesg {

    7   long type;

    8   char data[MAX_DATA];

    9  } mesg;


    10 int

    11 main(int argc, char **argv)

    12 {

    13  int i, j, msqid, qid[MAX_NIDS];

    14  /* определение максимального размера сообщения */

    15  msqid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);

    16  mesg.type = 1;

    17  for (i = MAX_DATA; i > 0; i –= 128) {

    18   if (msgsnd(msqid, &mesg, i, 0) == 0) {

    19    printf("maximum amount of data per message = %d\n", i);

    20    max_mesg = i;

    21    break;

    22   }

    23   if (errno != EINVAL)

    24    err_sys("msgsnd error for length %d", i);

    25  }

    26  if (i == 0)

    27   err_quit("i == 0");

    28  Msgct(lmsqid, IPC_RMID, NULL);

    29  /* количество сообщений в очереди */

    30  mesg.type = 1;

    31  for (i = 8; i <= max_mesg; i *= 2) {

    32   msqid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);

    33   for (j = 0; j < MAX_NMESG; j++) {

    34    if (msgsnd(msqid, &mesg, i, IPC_NOWAIT) != 0) {

    35     if (errno == EAGAIN)

    36      break;

    37     err_sys("msgsnd error, i = %d, j = %d", i, j);

    38     break;

    39    }

    40   }

    41   printf("%d %d-byte messages were placed onto queue,", j, i);

    42   printf(" %d bytes total\n". i*j);

    43   Msgctl(msqid, IPC_RMID, NULL);

    44  }

    45  /* максимальное количество идентификаторов */

    46  mesg.type = 1;

    47  for (i = 0; i <= MAX_NIDS; i++) {

    48   if ((qid[i] = msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT)) == –1) {

    49    printf("%d identifiers open at once\n", i);

    50    break;

    51   }

    52  }

    53  for (j = 0; j < i; j++)

    54   Msgctl(qid[j], IPC_RMID, NULL);

    55  exit(0);

    56 }

    Определение максимального размера сообщения

    14-28 Для определения максимально возможного размера сообщения мы пытаемся послать сообщение, в котором будет 65 536 байт данных, и если эта попытка оказывается неудачной, уменьшаем этот объем до 65 408, и т.д., пока вызов msgsnd не окажется успешным.

    Сколько сообщений различного размера может быть помещено в очередь?

    29-44 Теперь мы начинаем с 8-байтовых сообщений и смотрим, сколько их поместится в очередь. После определения этого ограничения мы удаляем очередь (сбрасывая все эти сообщения) и повторяем процедуру с 16-байтовыми сообщениями. Мы повторяем это до тех пор, пока не будет достигнут максимальный размер сообщения из первого пункта. Ожидается, что небольшие сообщения будут превышать ограничение по количеству сообщений в очереди, а большие — ограничение по количеству байтов.

    Сколько идентификаторов может быть открыто одновременно?

    45-54 Обычно есть системное ограничение на количество одновременно открытых идентификаторов. Оно определяется непосредственно созданием очередей до тех пор, пока не произойдет ошибка при вызове msgget.

    Запустим эту программу сначала в Solaris 2.6, а затем в Digital Unix 4.0B, и результаты подтвердят приведенные в табл. 6.2 величины:

    solaris % limits

    maximum amount of data per message = 2048

    40 8-byte messages were placed on queue, 320 bytes total

    40 16-byte messages were placed on queue, 640 bytes total

    40 32-byte messages were placed on queue, 1280 bytes total
     

    40 64-byte messages were placed on queue, 2560 bytes total

    32 128-byte messages were placed on queue, 4096 bytes total

    16 256-byte messages were placed on queue, 4096 bytes total

    8 512-byte messages were placed on queue, 4096 bytes total

    4 1024-byte messages were placed on queue, 4096 bytes total

    2 2048-byte messages were placed on queue, 4096 bytes total

    50 identifiers open at once


    alpha % limits

    maximum amount of data per message = 8192

    40 8-byte messages were placed on queue, 320 bytes total

    40 16-byte messages were placed on queue, 640 bytes total

    40 32-byte messages were placed on queue, 1280 bytes total

    40 64-byte messages were placed on queue, 2560 bytes total

    40 128-byte messages were placed on queue, 5120 bytes total

    40 256-byte messages were placed on queue, 10240 bytes total

    32 512-byte messages were placed on queue, 16384 bytes total

    16 1024-byte messages were placed on queue, 16384 bytes total

    8 2048-byte messages were placed on queue, 16384 bytes total

    4 4096-byte messages were placed on queue, 16384 bytes total

    2 8192-byte messages were placed on queue, 16384 bytes total

    63 identifiers at once

    Причина, по которой в Digital Unix 4.0В получился результат 63 идентификатора, а не 64, как в табл. 6.2, заключается в том, что один идентификатор всегда используется системным демоном.

    6.11.Резюме

    Очереди сообщений System V аналогичны очередям сообщений Posix. При создании новых приложений следует рассмотреть возможность использования очередей сообщений Posix, но большое количество существующих программ использует очереди сообщений System V. Тем не менее переписать их для использования очередей Posix вместо очередей System V не должно быть слишком сложно. Главный недостаток очередей Posix заключается в невозможности считывания сообщений с произвольным приоритетом. Ни один из типов очередей не использует обычные дескрипторы, что делает сложным применение функций select и poll для очередей сообщений.

    Упражнения

    1. Почему на рис. 6.2 для сообщений, передаваемых серверу, используется тип 1?

    2. Что произойдет с программой с рис. 6.2, если злоумышленник отправит на сервер множество сообщений, но не будет считывать ответы? Что в такой же ситуации произойдет с программой с рис. 6.3?

    3. Переделайте реализацию очередей сообщений Posix из раздела 5.8 для использования очередей сообщений System V вместо отображения в память. 


    Примечания:



    1

    Все исходные тексты, опубликованные в этой книге, вы можете найти по адресу http://www.piter.com/download.








    Главная | В избранное | Наш E-MAIL | Прислать материал | Нашёл ошибку | Наверх