Concurrent programming - Message Queue (1)

ArticleCategory:

SoftwareDevelopment

AuthorImage:

[Leonardo]

TranslationInfo:

original in it Leonardo Giordani

it to enLeonardo Giordani

en to deJ�rgen Pohl

AboutTheAuthor:

Student an der Fakult�t f�r Telecommunications Engineering am Polytechnikum in Milano, arbeitet als Netzwerk - Administrator und besch�ftigt sich mit Programmieren ( meistens in Assembler und CC++). Arbeitet seit 1999 fast ausschliesslich mit Linux / Unix.

Abstract:

Diese Artikelserie m�chte den Leser in das Konzept des Multitasking und dessen Implementation in Linux einf�hren. Beginnend mit den theoretischen Konzepten, die dem Multitasking zugrunde liegen, werden wir zum Abschluss eine vollst�ndige Anwendung in Form eines einfachen, jedoch wirkungsvollen Protokolls schreiben, welches die Kommunikation zwischen Prozessen demonstriert.

Voraussetzungen, um diesem Artikel folgen zu k�nnen:


Es ist auch hilfreich, die ersten beiden Artikel dieser Serie gelesen zu haben, sie erschienen im LinusFocus vom November 2002 (Artikel Nr. 272 ) und Januar 2003 (Artikel Nr. 281).

ArticleIllustration:

[run in paralell]

ArticleBody:

Einf�hrung

In den vorangegangenen zwei Artikeln f�hrten wir in das Konzept des 'concurrent programming' ein und erl�uterten eine erste L�sung zum Problem der Kommunikation zwischen Prozessen: die Semaphoren. Wie wir sahen, erlaubt uns die Anwendung von Semaphoren, den Zugang zu gemeinsam genutzten Ressourcen so zu kontrollieren, dass mindestens zwei oder mehr Prozesse synchronisiert werden.

Prozesse synchronisieren bedeutet, deren Ausf�hrung zeitm�ssig festzulegen, d.h. nicht in einem absoluten Zeitsystem (das w�rde bedeuten, einen genauen Zeitpunkt f�r den Beginn des Prozessablaufs festzulegen), sondern in einer geplanten Reihenfolge zu bestimmen, welcher Prozess zuerst beginnt und welcher als zweiter.

Hierf�r Semaphoren zu benutzen, erweist sich als zu komplex und zu begrenzt: komplex, weil jeder Prozess sein Semaphore mit denen der anderen Prozesse synchronisieren m�sste. Eingeschr�nkt, da es uns nicht den Austausch von Parametern zwischen den Prozessen gestattet. Nehmen wir das Beispiel des Aufbaus eines neuen Prozesses: dieses Ereignis sollte jedem beteiligten Prozess mitgeteilt werden, die Semaphoren erlauben einem Prozess nicht solche Informationen auszutauschen.

Die 'concurrency' - Regelung des Zugriffs zu den gemeinsamen Ressourcen durch die Semaphoren kann zu einer andauernden Blockierung eines Prozesses f�hren, wenn ein weiterer beteiligter Prozess die Ressource freigibt und wieder belegt, bevor ein anderer diese benutzen kann: wie wir erfuhren, ist es in der Welt des "concurrency programming" nicht m�glich, im voraus zu wissen, welcher Prozess wann ausgef�hrt wird.

Hiermit wird klar, dass die Semaphoren ein ungeeignetes Werkzeug zur Behandlung von komplexen Synchronisationsproblemen sind. Eine elegantere L�sung f�r diese Aufgabe erhalten wir durch die Anwendung von 'Message Queues': in diesem Artikel lernen wir etwas Theorie �ber diese Vorrichtung zur Kommunikation zwischen Prozessen, ausserdem werden wir ein kleines Programm mit SysV- Primitiven schreiben.

Die Theorie der Message Queues

Jeder Prozess kann eine oder mehrere Datenstrukturen aufbauen, sie werden 'Queue' genannt: jede Datenstruktur kann eine oder mehrere Messages unterschiedlicher Art von verschiedenen Quellen enthalten. Jeder beteiligte Prozess kann Messages in die 'Queues' schicken, vorausgesetzt er kennt deren Identifikatoren. Der Prozess hat sequentiellen Zugriff (von der �ltesten, zuerst angekommenen bis zur j�ngsten, zuletzt eingetroffenen) auf die Queue, er liest die Messages in chronologischer Reihenfolge selektiv , d.h. nur Messages von einem gewissen Typ werden erwogen: diese letztere Eigenschaft erm�glicht uns eine gewisse Kontrolle �ber die Priorit�t der zu lesenden Messages.

Der Gebrauch von Queues ist insofern die einfache Anwendung eines Mailsystems zwischen Prozessen: jeder Prozess hat eine Adresse und er kann mit anderen Prozessen korrespondieren. Ein Prozess kann also Messages, die an ihn gesandt werden, in einer bestimmten Reihenfolge lesen und entsprechend den vorgefundenen Anforderungen handeln.

Die Synchronisation zweier Prozesse kann infolgedessen einfach durch Messages zwischen ihnen erfolgen: Ressourcen besitzen ausserdem Semaphoren, welche die Prozesse �ber ihren Status informieren, der zeitliche Ablauf zwischen den Prozessen wird jedoch direkt durchgef�hrt. Hier wird sofort verst�ndlich, dass der Gebrauch von Message Queues sehr vereinfacht, was anfangs als ein �usserst komplexes Problem erschien.

Bevor wir die Message Queues in der C-Sprache implementieren, m�ssen wir noch ein anderes Problem erw�hnen, das sich auf die Synchronisation bezieht: die Notwendigkeit eines Kommunikationsprotokolls.

Ein Protokoll zusammenstellen

Ein Protokoll ist eine Reihe von Regeln, welche die Zusammenarbeit von Elementen in einem Set behandeln. Im vorhergehenden Artikel haben wir eines der einfachsten Protokolle implementiert, indem wir ein Semaphore schufen und zwei Prozessen anordneten, gem�ss ihrem Status zu handeln. Die Anwendung von Message Queues erlaubt uns komplexere Protokolle anzuwenden: wir k�nnen uns einfach vorstellen, dass jedes Netzwerkprotokoll (TCP/IP, DNS, SMTP,...) auf einer "message exchange architektur" aufgebaut ist, selbst wenn die Kommunikation zwischen Computern und nicht innerhalb ihrer Systeme stattfindet. Der Vergleich ist zwingend: es besteht kein grundlegender Unterschied zwischen der Kommunikation von Prozessen in einer Maschine oder zwischen verschiedenen Maschinen. In einem zuk�nftigen Artikel werden wir sehen, dass die �bertragung des Konzepts - mit dem wir uns hier besch�ftigen - auf eine erweiterte Anwendung (mehrere Computer miteinander verbunden) eine ziemlich einfache Angelegenheit ist.

Hier ist das einfache Beispiel eines Protokolls, welches auf dem Austausch von Messages basiert: die Prozesse A und B werden parallel ausgef�hrt und verarbeiten verschiedene Daten - sobald sie das abgeschlossen haben, m�ssen sie das Ergebnis kombinieren. Ein einfaches Protokoll f�r diesen Austausch k�nnte folgendermassen aussehen:

PROZESS B:


PROZESS A:

Die Bestimmung, welcher Prozess die Daten kombinieren muss, ist in diesem Fall willk�rlich, normalerweise ergibt sich das aus der Funktion der beteiligten Prozesse ( z.B. Client/Server). N�heres dar�ber sollte in einem eigenen Artikel behandelt werden.

Dieses Protokoll kann einfach auf 'n' Prozesse ausgedehnt werden: jeder Prozess, ausser A, bearbeitet seine eigenen Daten und schickt eine Message an A. A antwortet und jeder Prozess sendet seine Ergebnisse: die Struktur der individuellen Prozesse - ausser A - bleibt unver�ndert.

System V Message Queues

Jetzt werden wir auf die Anwendung dieser Konzepte im Linuxsystem eingehen. Wie wir schon bemerkten, haben wir einen Set von Primitiven, der uns erlaubt, mit den Strukturen der Message Queues und mit denen, die zu den Semaphoren geh�ren, zu arbeiten. Ich setze voraus, dass die Leser mit den grunds�tzlichen Konzepten der Erstellung von Prozessen, der Anwendung von Systemaufrufen und IPC-Schl�sseln vertraut sind.

Die grundlegende Struktur des Systems, das eine Message beschreibt, ist mit msgbuf bezeichnet; ist in linux/msg.h deklariert

/* message buffer for msgsnd and msgrcv calls */
struct msgbuf {
        long mtype;         /* type of message */
        char mtext[1];      /* message text */
};

Das Feld mtype repr�sentiert die Art der Message und ist eine streng positive Zahl: der Zusammenhang zwischen den Zahlen und den Arten der Messages muss im voraus eingestellt werden, das ist Teil der Protokolldefinition. Das zweite Feld repr�sentiert den Inhalt der Message, welcher jedoch in der Deklaration nicht aufgef�hrt werden muss. Die Struktur msgbuf kann so umdefiniert werden, dass sie komplexe Daten enth�lt; es ist zum Beispiel m�glich zu schreiben
struct message {
        long mtype;         /* message type */
        long sender;        /* sender id */
        long receiver;      /* receiver id */
        struct info data;   /* message content */
        ...
};

Bevor wir uns mit den Argumenten besch�ftigen, die sich direkt auf die Theorie der 'concurrency ' beziehen, m�ssen wir die Erstellung des Prototyp einer Message mit der fixierten Gr�sse von maximal 4056 Bytes planen. Es ist nat�rlich jederzeit m�glich, den Kernel zu rekompilieren und diese Gr�sse zu erh�hen, das macht die Anwendung jedoch nicht portabel (ausserdem wird die Leistung nicht vorteilhaft beeinflusst, deshalb bleiben wir lieber bei der definierten Gr�sse).

Um eine neue Queue zu erstellen, sollte ein Prozess einen Aufruf an die msgget() Funktion machen

int msgget(key_t key, int msgflg)
welche als Argumente einen IPC-Schl�ssel und einige Flags erh�lt, die nun auf
IPC_CREAT | 0660
eingestellt werden k�nnen (erzeuge die Queue, falls sie noch nicht existiert , erlaube Zugriff f�r den Besitzer und die Gruppenmitglieder), was den Identifikator f�r die Queue zur�ckgibt.

Wie in den vorangegangenen Artikeln nehmen wir an, dass keine Fehler auftreten, so k�nnen wir den Code vereinfachen - in einem zuk�nftigen Artikel werden wir uns mit sicherem IPC-Code besch�ftigen.

Um eine Message an eine Queue mit bekanntem Identifikator zu schicken, benutzen wir das msgsnd() Primitivum

int msgsnd(int msqid, struct msgbuf *msgp, int msgsz, int msgflg)

wobei msqid der Identifikator der Queue und msgp ein Zeiger auf die Message, die wir schicken wollen, ist (deren Typ hier mit struct msgbuf identifiziert wird - deren Typ wir jedoch umdefiniert haben), msgsz enth�lt die Dimension der Message (ausschliesslich der L�nge des mtype Feldes, die 4 Byte betr�gt) und msgflg -ein Flag, das sich auf die Wait-Richtlinien bezieht. Die L�nge der Message kann einfach festgestellt werden durch
length = sizeof(struct message) - sizeof(long);

Die Wait-Richtlinie bezieht sich auf den Fall einer vollen Queue: falls msgflg auf IPC_NOWAIT eingestellt ist, wartet der Absender nicht auf freiwerdenden Platz, sondern er beendigt mit einem Fehlercode - wir werden weiter darauf eingehen, wenn wir �ber Fehlermanagement sprechen.

Um die Messages in der Queue zu lesen, benutzen wir den msgrcv() -System - Aufruf

int msgrcv(int msqid, struct msgbuf *msgp, int msgsz, long mtype, 
int msgflg)

wobei der msgp - Zeiger den Puffer identifiziert, in welchen wir die Messages kopieren, die aus der Queue gelesen werden und mtype identifiziert eine Untermenge der Messages, die wir betrachten wollen.

Eine Queue kann man entfernen mit dem Primitivum msgctl() bei Verwendung des Flag IPC_RMID

msgctl(qid, IPC_RMID, 0)

Wir k�nnen das mit einem einfachen Programm testen, es erzeugt eine Message Queue, sendet und liest diese - auf diese Weise werden wir das korrekte Verhalten des Systems pr�fen.
#include <stdio.h>
#include <stdlib.h>
#include <linux/ipc.h>
#include <linux/msg.h>

/* Redefines the struct msgbuf */
typedef struct mymsgbuf
{
  long mtype;
  int int_num;
  float float_num;
  char ch;
} mess_t;

int main()
{
  int qid;
  key_t msgkey;

  mess_t sent;
  mess_t received;

  int length;

  /* Initializes the seed of the pseudo-random number generator */
  srand (time (0));

  /* Length of the message */
  length = sizeof(mess_t) - sizeof(long);

  msgkey = ftok(".",'m');

  /* Creates the Queue*/
  qid = msgget(msgkey, IPC_CREAT | 0660);

  printf("QID = %d\n", qid);

  /* Builds a message */
  sent.mtype = 1;
  sent.int_num = rand();
  sent.float_num = (float)(rand())/3;
  sent.carattere = 'f';

  /* Sends the message */
  msgsnd(qid, &sent, length, 0);
  printf("MESSAGE SENT...\n");

  /* Receives the message */
  msgrcv(qid, &received, length, sent.mtype, 0);
  printf("MESSAGE RECEIVED...\n");

  /* Controls that received and sent messages are equal */
  printf("Interger number = %d (sent %d) -- ", received.int_num, 
         sent.int_num);
  if(received.int_num == sent.int_num) printf(" OK\n");
  else printf("ERROR\n");

  printf("Float numero = %f (sent %f) -- ", received.float_num, 
         sent.float_num);
  if(received.float_num == sent.float_num) printf(" OK\n");
  else printf("ERROR\n");

  printf("Char = %c (sent %c) -- ", received.ch, sent.ch);
  if(received.ch == sent.ch) printf(" OK\n");
  else printf("ERROR\n");

  /* Destroys the Queue */
  msgctl(qid, IPC_RMID, 0);
}

Jetzt k�nnen wir zwei Prozesse anlegen und diese miteinder durch eine Queue kommunizieren lassen; erinnern wir uns an das Prozess-Forking-Konzept: der Wert aller Variablen, der durch den Vater-Prozess zugewiesen wurde, wird auf den Sohn-Prozess �bertragen (memory copy). Daraus folgt, wir sollten die Queue vor dem Father-fork-Prozess und bevor der Sohn den Identifikator der Queue erf�hrt und diesen anspricht, einrichten.

Mein Code erzeugt eine Queue, die vom Sohn-Prozess benutzt wird, um Daten zum Vater zu schicken: der Sohn erzeugt Zufallszahlen, �bertr�gt diese zum Vater und beide drucken diese als Standard-Ausgabe.

#include <stdio.h>
#include <stdlib.h>
#include <linux/ipc.h>
#include <linux/msg.h>
#include <sys/types.h>

/* Redefines the message structure */
typedef struct mymsgbuf
{
  long mtype;
  int num;
} mess_t;

int main()
{
  int qid;
  key_t msgkey;
  pid_t pid;

  mess_t buf;

  int length;
  int cont;

  length = sizeof(mess_t) - sizeof(long);

  msgkey = ftok(".",'m');

  qid = msgget(msgkey, IPC_CREAT | 0660);

  if(!(pid = fork())){
    printf("FATHER - QID = %d\n", qid);

    srand (time (0));

    for(cont = 0; cont < 10; cont++){
      sleep (rand()%4);
      buf.mtype = 1;
      buf.num = rand()%100;
      msgsnd(qid, &buf, length, 0);
      printf("SON - MESSAGE NUMBER %d: %d\n", cont+1, buf.num);
    }

    return 0;
  }

  printf("FATHER - QID = %d\n", qid);

  for(cont = 0; cont < 10; cont++){
    sleep (rand()%4);
    msgrcv(qid, &buf, length, 1, 0);
    printf("FATHER - MESSAGGE NUMBER %d: %d\n", cont+1, buf.num);
  }

  msgctl(qid, IPC_RMID, 0);

  return 0;
}

Wir erzeugten also zwei Prozesse, die auf elementarer Basis �ber ein Message-Exchange-System zusammenarbeiten k�nnen. Wir ben�tigten kein formales Protokoll, weil die ausgef�hrten Operationen sehr einfach waren. Im n�chsten Artikel werden wir uns wieder mit Message Queues und den verschiedenen Messagetypen besch�ftigen. Wir werden auch am Kommunikationsprotokoll arbeiten, um mit dem Erstellen unseres grossen IPC-Projekts zu beginnen ( ein Telefon-Switch-Simulator).

Empfohlene Literatur