Message Queues. SV POSIX RabbitMQ

Academic year: 2021

(1)

Message Queues

SV

POSIX

RabbitMQ

(2)

Message Queues

Link between producer and consumer is indirect

Producers write messages to the queue

Without selecting a consumer

Consumers retrieve messages from the queue

Without selecting a producer

Writes are non-blocking

As long as resources are available

Reads are blocking

When the queue is empty

(3)

Message queues

A Message Queue is a linked list of message structures

stored inside the kernel’s memory space and accessible by multiple processes

Synchronization is provided automatically by the kernel

Preallocated message buffers

Messages with priority.

A message with a higher priority is always received first.

Send and receive functions are synchronous by default.

(4)

Programming steps

Define Message structure

Create message Queue

Connect to queue

Write Messages

Close Queue

Connect to queue

Read messages

(5)

Message Structure

Multiple processes should agree on the message structure/size

Application level

Message queues handle different sizes

Programmer should define a structure

With pre-defined size

Able to accommodate multiple sub-types

(6)

SV MQ - Message Structure

Example:

struct mymsg {

long msg_type;

char mytext[512]; /* rest of message */

int somethingelse;

};

msg_type used in reception
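
A minimal sketch of how the structure above might be filled in and sized. The helper names mymsg_init and mymsg_payload_size are hypothetical (they are not part of the System V API); the size helper relies on the rule that the size passed to msgsnd/msgrcv counts only the bytes that follow the leading long.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

struct mymsg {
    long msg_type;        /* must be the first field, and > 0 when sending */
    char mytext[512];     /* rest of message */
    int  somethingelse;
};

/* Bytes that follow the leading long: the value to pass as msgsz. */
static size_t mymsg_payload_size(void)
{
    return sizeof(struct mymsg) - sizeof(long);
}

/* Hypothetical helper: zero the message, then set type, text and extra field. */
static void mymsg_init(struct mymsg *m, long type, const char *text, int extra)
{
    assert(type > 0);                 /* System V rejects types <= 0 on send */
    memset(m, 0, sizeof *m);
    m->msg_type = type;
    strncpy(m->mytext, text, sizeof m->mytext - 1);  /* keeps a trailing NUL */
    m->somethingelse = extra;
}
```

Both producer and consumer would include the same definition, which is what "agree on the message structure/size" amounts to in practice.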

(7)

SV MQ – creation

msgget - get a System V message queue identifier

int msgget(key_t key, int msgflg);

Create Private

Key – IPC_PRIVATE

Inherited by child processes

Create Public

Key – a value not already in use (check with ipcs)

msgflg – IPC_CREAT, OR'ed with the permission bits
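
The two creation modes could be sketched as follows. The function names are hypothetical, and the 0600 permission bits and the use of IPC_EXCL (to fail if the key is already taken) are assumptions chosen for the example, not something the slide prescribes.

```c
#include <stddef.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Private queue: no key to share, so only processes that inherit or are
 * handed the returned id (for example children after fork) can use it. */
static int create_private_queue(void)
{
    return msgget(IPC_PRIVATE, 0600);
}

/* Public queue: any process that knows `key` can open it later.
 * IPC_EXCL makes the call fail (errno EEXIST) if the key is in use. */
static int create_public_queue(key_t key)
{
    return msgget(key, IPC_CREAT | IPC_EXCL | 0600);
}

/* Queues outlive the creating process, so remove them explicitly. */
static int remove_queue(int msqid)
{
    return msgctl(msqid, IPC_RMID, NULL);
}
```

Each call returns a queue id on success or -1 on failure, with errno describing the cause.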

(8)

SV MQ – opening

msgget - get a System V message queue identifier

int msgget(key_t key, int msgflg);

Open a Public MQ

Key – key of an already created MQ

msgflg – 0
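
Opening an existing public queue might look like the sketch below. DEMO_KEY is an arbitrary value made up for the example, and open_existing_queue is a hypothetical wrapper.

```c
#include <stddef.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Hypothetical key agreed on by the cooperating processes. */
#define DEMO_KEY ((key_t)0x4d515358)

/* With msgflg == 0 nothing is created: the call returns the id of the
 * queue registered under `key`, or -1 (errno ENOENT) if there is none. */
static int open_existing_queue(key_t key)
{
    return msgget(key, 0);
}
```

A process that opens the queue this way gets the same id that the creator obtained.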

(9)

SV MQ - write

int msgsnd(int msqid,

const void *msgp, size_t msgsz,

int msgflg);

Writes a message to the queue

Parameters

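msqid – queue id (returned from msgget)

Message + size (the size excludes the leading long type field)

msgflg – 0, or IPC_NOWAIT to fail instead of blocking when the queue is full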
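
A hedged sketch of a send helper built on msgsnd. The struct textmsg layout and the helper names are assumptions made for the example; the second helper uses msgctl with IPC_STAT to read how many messages are currently queued.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Example message layout (an assumption): type followed by 64 bytes of text. */
struct textmsg {
    long mtype;
    char text[64];
};

/* Returns 0 on success, -1 on failure. With IPC_NOWAIT a full queue
 * makes msgsnd fail with errno EAGAIN instead of blocking. */
static int send_text(int msqid, long type, const char *text)
{
    struct textmsg m;

    assert(type > 0);                      /* types must be positive */
    memset(&m, 0, sizeof m);
    m.mtype = type;
    strncpy(m.text, text, sizeof m.text - 1);
    return msgsnd(msqid, &m, sizeof m.text, IPC_NOWAIT);
}

/* Number of messages currently in the queue, or -1 on error. */
static long queued_messages(int msqid)
{
    struct msqid_ds ds;

    if (msgctl(msqid, IPC_STAT, &ds) == -1)
        return -1;
    return (long)ds.msg_qnum;
}
```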

(10)

SV MQ - Read

ssize_t msgrcv(int msqid,

void *msgp, size_t msgsz,

long msgtyp, int msgflg);

Reads a message from queue

Parameters

msqid – queue id (returned from msgget)

Pointer to buffer + maximum size

Type of message

msgflg – 0, or IPC_NOWAIT to fail instead of blocking when no message is available

(11)

SV MQ - Read

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

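msgtyp

0 – first message in the queue

> 0 – first message with that type

< 0 – first message with the lowest type less than or equal to the absolute value of msgtyp

msgflg

IPC_NOWAIT – return immediately if no matching message is available

MSG_COPY – returns a copy without removing the message (Linux-specific, used together with IPC_NOWAIT)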
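
The three msgtyp cases can be tried out with a sketch like this one. The struct and the helpers put_type and take_type are hypothetical; IPC_NOWAIT is used so that a receive on a queue with no matching message returns -1 rather than blocking.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct textmsg {
    long mtype;
    char text[32];
};

/* Enqueue an empty-bodied message of the given type. */
static int put_type(int msqid, long type)
{
    struct textmsg m;

    assert(type > 0);
    memset(&m, 0, sizeof m);
    m.mtype = type;
    return msgsnd(msqid, &m, sizeof m.text, IPC_NOWAIT);
}

/* Receive using `msgtyp` as the selector; returns the type of the
 * message actually delivered, or -1 if nothing matched. */
static long take_type(int msqid, long msgtyp)
{
    struct textmsg m;

    if (msgrcv(msqid, &m, sizeof m.text, msgtyp, IPC_NOWAIT) == -1)
        return -1;
    return m.mtype;
}
```

For example, after sending types 3, 1 and 2 in that order, take_type(id, 2) picks the type-2 message, take_type(id, -3) then picks type 1 (the lowest type not above 3), and take_type(id, 0) returns the remaining type-3 message.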

(12)

SV MQ - destruction

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

msqid – queue id (returned from msgget)

cmd – IPC_RMID

buf – NULL (ignored by IPC_RMID)

(13)

POSIX MQ

(14)

POSIX MQ - Message Structure

Array of bytes

Priority / message selection

API

Each message has an associated priority,

Messages are always delivered to the receiving process highest priority first.

Message priorities range

From 0 (low) to sysconf(_SC_MQ_PRIO_MAX) - 1 (high).

On Linux, sysconf(_SC_MQ_PRIO_MAX) returns 32768.
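
The priority range can be queried at run time. A small sketch, with a hypothetical helper name:

```c
#define _POSIX_C_SOURCE 200809L
#include <unistd.h>

/* Highest priority accepted by mq_send on this system,
 * or -1 if the limit cannot be determined. */
static long highest_mq_priority(void)
{
    long n = sysconf(_SC_MQ_PRIO_MAX);

    return n > 0 ? n - 1 : -1;
}
```

Portable code should not assume more than the range 0 to 31, which is the minimum POSIX requires.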

(15)

POSIX MQ – creation

mq_open - open a message queue

mqd_t mq_open(const char *name,

int oflag, mode_t mode,

struct mq_attr *attr);

name – identifier, of the form /somename

oflag – exactly one of O_RDONLY, O_WRONLY or O_RDWR, OR'ed with O_CREAT to create the queue

(16)

POSIX MQ – creation

mq_open - open a message queue

mqd_t mq_open(const char *name,

int oflag, mode_t mode,

struct mq_attr *attr);

attr

NULL – the queue is created with default attributes

struct mq_attr queue_attr; – pass &queue_attr to set mq_maxmsg and mq_msgsize
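
A sketch of creating a queue with explicit attributes and reading them back with mq_getattr. The queue name (built from the process id), the 0600 mode and the helper name are assumptions for the example; on older glibc versions the program may need to be linked with -lrt.

```c
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>

/* Create a queue with the requested limits, check that the kernel
 * applied them, then close and unlink it. Returns 0 on success. */
static int create_and_check(long maxmsg, long msgsize)
{
    struct mq_attr want = {0}, got;
    char name[64];
    mqd_t q;
    int ok;

    /* Hypothetical name; POSIX names start with a single '/'. */
    snprintf(name, sizeof name, "/mq_sketch_attr_%ld", (long)getpid());
    want.mq_maxmsg = maxmsg;
    want.mq_msgsize = msgsize;

    q = mq_open(name, O_CREAT | O_EXCL | O_RDWR, 0600, &want);
    if (q == (mqd_t)-1)
        return -1;

    ok = mq_getattr(q, &got) == 0 &&
         got.mq_maxmsg == maxmsg && got.mq_msgsize == msgsize;

    mq_close(q);        /* releases this process's descriptor */
    mq_unlink(name);    /* removes the queue name itself */
    return ok ? 0 : -1;
}
```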

(17)

POSIX MQ – opening

mq_open - open a message queue

mqd_t mq_open(const char *name, int oflag)

Default settings

name – identifier

oflag – one of O_RDONLY, O_WRONLY or O_RDWR

(18)

POSIX MQ - mq_open

Creates

O_CREAT

On Linux, the message queue appears as a file

In /dev/mqueue/ (when the mqueue file system is mounted there)

File name is used by other processes

mq_unlink

removes a message queue

mq_close

close a message queue descriptor

(19)

POSIX MQ - write

int mq_send(mqd_t mqdes,

const char *msg_ptr, size_t msg_len,

unsigned int msg_prio);

Writes a message to the queue

Parameters

mqdes – queue descriptor (returned from mq_open)

Message + size

msg_prio – priority of the message, used to order delivery in mq_receive

(20)

POSIX MQ - read

ssize_t mq_receive(mqd_t mqdes,

char *msg_ptr, size_t msg_len,

unsigned int *msg_prio);

Reads a message from the queue

mqdes – queue descriptor (returned from mq_open)

Buffer + buffer size (at least the queue's mq_msgsize)

msg_prio – where the priority of the received message is stored

(21)

POSIX MQ - read

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);

Messages are always delivered to the receiving process highest priority first.

msg_prio –

NULL – the priority is discarded

Not NULL – stores the priority of the received message
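
The delivery order can be observed with a sketch like the following, which sends three one-byte messages with priorities 1, 5 and 3 and records the priority reported by each mq_receive. The queue name, the attribute values and the helper name are assumptions for the example.

```c
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>

/* Fills out[] with the priorities in the order they are received.
 * Returns 0 on success, -1 on any failure. */
static int receive_order(unsigned int out[3])
{
    static const unsigned int prios[3] = {1, 5, 3};
    struct mq_attr attr = {0};
    char name[64], buf[64];
    int i, rc = 0;
    mqd_t q;

    attr.mq_maxmsg = 8;
    attr.mq_msgsize = sizeof buf;   /* receive buffer must be >= mq_msgsize */
    snprintf(name, sizeof name, "/mq_sketch_prio_%ld", (long)getpid());

    q = mq_open(name, O_CREAT | O_EXCL | O_RDWR, 0600, &attr);
    if (q == (mqd_t)-1)
        return -1;

    for (i = 0; i < 3 && rc == 0; i++)
        rc = mq_send(q, "x", 1, prios[i]);
    for (i = 0; i < 3 && rc == 0; i++)
        if (mq_receive(q, buf, sizeof buf, &out[i]) == -1)
            rc = -1;

    mq_close(q);
    mq_unlink(name);
    return rc;
}
```

The messages come back as 5, 3, 1 regardless of the order in which they were sent.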

(22)

Read/write

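Empty queue

Receive call blocks

mq_timedreceive

Blocks until a message arrives or the timeout expires

Full queue

Send blocks

mq_timedsend

Blocks until there is room or the timeout expires

… , const struct timespec *abs_timeout);

abs_timeout is an absolute time, not an interval

On timeout, errno is ETIMEDOUT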
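
A sketch of a timed receive on an empty queue. Because abs_timeout is an absolute CLOCK_REALTIME time, the deadline is computed as "now plus wait_ms". The queue name, attributes and helper name are assumptions for the example.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>

/* Waits up to wait_ms on a freshly created (empty) queue and returns the
 * errno left by mq_timedreceive, 0 if a message arrived, -1 if the queue
 * could not be created. */
static int timed_receive_errno(long wait_ms)
{
    struct mq_attr attr = {0};
    struct timespec deadline;
    char name[64], buf[64];
    int err = 0;
    mqd_t q;

    attr.mq_maxmsg = 4;
    attr.mq_msgsize = sizeof buf;
    snprintf(name, sizeof name, "/mq_sketch_timed_%ld", (long)getpid());

    q = mq_open(name, O_CREAT | O_EXCL | O_RDWR, 0600, &attr);
    if (q == (mqd_t)-1)
        return -1;

    /* Absolute deadline = current time + wait_ms, with nanosecond carry. */
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec += wait_ms / 1000;
    deadline.tv_nsec += (wait_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec += 1;
        deadline.tv_nsec -= 1000000000L;
    }

    if (mq_timedreceive(q, buf, sizeof buf, NULL, &deadline) == -1)
        err = errno;

    mq_close(q);
    mq_unlink(name);
    return err;
}
```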

(23)

POSIX MQ - limits

In the user program

queue_attr.mq_maxmsg = 16;

queue_attr.mq_msgsize = 128;

Values limited by the OS

/proc/sys/fs/mqueue/

Per-user limits (msgqueue) are set in:

/etc/security/limits.conf
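
On Linux the system-wide ceilings can be read from the files in /proc/sys/fs/mqueue/ (for instance msg_max and msgsize_max). A minimal sketch, with a hypothetical helper name, that returns -1 when the file is missing or unreadable:

```c
#include <stdio.h>

/* Reads one numeric limit, e.g. read_mq_limit("msg_max").
 * Returns the value, or -1 if the file cannot be read or parsed. */
static long read_mq_limit(const char *name)
{
    char path[128];
    long value = -1;
    FILE *f;

    snprintf(path, sizeof path, "/proc/sys/fs/mqueue/%s", name);
    f = fopen(path, "r");
    if (f == NULL)
        return -1;
    if (fscanf(f, "%ld", &value) != 1)
        value = -1;
    fclose(f);
    return value;
}
```

A program could compare its intended mq_maxmsg and mq_msgsize against these values before calling mq_open.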

(24)

RabbitMQ

MQ on the internet

(25)

RabbitMQ

Message-queueing software

message broker or queue manager.

queues can be defined

applications may connect to the

queue and transfer a message

onto it.

(26)

It uses the AMQP protocol

Advanced Message Queuing Protocol

First developed at JPMorgan Chase

Began as a collaborative effort and eventually involved Bank of America, Red Hat, Cisco, Credit Suisse, Goldman Sachs, Microsoft and Novell, which became known as the AMQP Working Group

Conceived for trading and risk management systems

(27)

Main objectives

"Business messaging is provided by infrastructure and not by integration experts"

Ubiquity - all the same everywhere

Safety - nobody else looks at my messages

Fidelity - trust in the delivery!

Applicability - the more you use it, the more payoff

Interoperability - easiest integration

(28)

When to use

Allows a fast response instead of forcing the caller to wait for resource-heavy procedures

Also good when you want to distribute a message to multiple recipients for consumption

Also good when you want to balance load between workers

Is good to add low coupling between the

(29)

When to use

The consumer can take a message off the queue and start processing the PDF at the same time as the producer is queueing up new messages.

The consumer can be on a totally different server than the publisher.

(30)

Concepts

Producer:

Application that sends the messages.

Consumer:

Application that receives the messages.

Queue:

Buffer that stores messages.

Message:

Information that is sent from the producer to a consumer through RabbitMQ.

Connection:

A connection is a TCP connection between your application and the RabbitMQ broker.

Channel:

A channel is a virtual connection inside a connection. When you are publishing or consuming messages from a queue - it's all done over a channel.

(31)

Basics

Producers send Messages with Routing Keys and Exchange Names to Brokers

Brokers use Exchange rules to route / filter Messages

Brokers then use Queues to store and forward Messages for Consumers

Consumers receive Messages from the Broker for known Queues

A RoutingKey (Producing) is congruent to a QueueName (Consumption)

(32)
(33)

Sender only cares about

Broker

Exchange

Message Body (& properties)

Routing Key

Receiver only cares about

Broker

Exchange

Bindings

Queue

Message Body

(34)

Binding

Binds a queue to a particular exchange

Unconditional

all messages from the exchange

Conditional fixed

routingKey must match queueName

Conditional pattern

routingKey matches some pattern in the queueName

Conditional multi-* -

routingKey must match a number of names / patterns
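
The "conditional pattern" case corresponds to AMQP topic-style matching, where keys are dot-separated words, `*` stands for exactly one word and `#` for zero or more words. The matcher below is only a sketch of those semantics written for illustration; it is not RabbitMQ's implementation, and the function names are hypothetical.

```c
#include <stddef.h>
#include <string.h>

/* Length of the word starting at s (up to the next '.' or the end). */
static size_t word_len(const char *s)
{
    return strcspn(s, ".");
}

/* Start of the following word, or NULL when s was the last word. */
static const char *next_word(const char *s)
{
    s += word_len(s);
    return *s == '.' ? s + 1 : NULL;
}

/* pat and key point at the current word; NULL means "no words left". */
static int match_words(const char *pat, const char *key)
{
    size_t pl, kl;

    if (pat == NULL)
        return key == NULL;

    pl = word_len(pat);
    if (pl == 1 && pat[0] == '#') {
        /* '#' absorbs zero or more words: try every possible remainder. */
        const char *k = key;
        for (;;) {
            if (match_words(next_word(pat), k))
                return 1;
            if (k == NULL)
                return 0;
            k = next_word(k);
        }
    }

    if (key == NULL)
        return 0;
    kl = word_len(key);
    if (!(pl == 1 && pat[0] == '*') &&
        (pl != kl || strncmp(pat, key, pl) != 0))
        return 0;                     /* literal word differs */
    return match_words(next_word(pat), next_word(key));
}

/* Returns 1 if routing_key satisfies the binding pattern, else 0. */
static int topic_matches(const char *pattern, const char *routing_key)
{
    return match_words(*pattern ? pattern : NULL,
                       *routing_key ? routing_key : NULL);
}
```

With this sketch, "kern.*" accepts "kern.critical" but not "kern.disk.full", while "logs.#" accepts both "logs" and "logs.app.db". A pattern without wildcards behaves like the "conditional fixed" case.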

(35)

Multiple interactions

Point to Point

Work Queues

Publish/Subscribe

Routing

Topics

RPC
