Download
Содержание:
Technology
The ZeroMQ API provides sockets (a kind of generalization over the traditional IP and Unix domain sockets), each of which can represent a many-to-many connection between endpoints. Operating with a message-wise granularity, they require that a messaging pattern be used, and are particularly optimized for that kind of pattern.
The basic ZeroMQ patterns are:
- Request–reply
- Connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
- Publish–subscribe
- Connects a set of publishers to a set of subscribers. This is a data distribution pattern.
- Push–pull (pipeline)
- Connects nodes in a fan-out / fan-in pattern that can have multiple steps, and loops. This is a parallel task distribution and collection pattern.
- Exclusive pair
- Connects two sockets in an exclusive pair. (This is an advanced low-level pattern for specific use cases.)
Each pattern defines a particular network topology. Request-reply defines a so-called «service bus», publish-subscribe defines a «data distribution tree», and push-pull defines «parallelised pipeline». All the patterns are deliberately designed in such a way as to be infinitely scalable and thus usable on Internet scale.
Any message through the socket is treated as an opaque blob of data. Delivery to a subscriber can be automatically filtered by the blob leading string. Available message transports include TCP, PGM (reliable multicast), inter-process communication (IPC) and inter-thread communication (ITC).
The ZeroMQ core library performs very well due to its internal threading model, and can outperform conventional TCP applications in terms of throughput by utilizing an automatic message batching technique.
ZeroMQ implements ZMTP, the ZeroMQ Message Transfer Protocol. ZMTP defines rules for backward interoperability, extensible security mechanisms, command and message framing, connection metadata, and other transport-level functionality. A growing number of projects implement ZMTP directly as an alternative to using the full ZeroMQ implementations.
Краткий обзор ZeroMQ
Если у вас есть опыт работы с другими системами обмена сообщениями (такими как RabbitMQ), понять принципы работы ZeroMQ, вероятно, будет немного сложнее. В сети Интернет существует немало статей, посвященных сравнению RabbitMQ и ZeroMQ, однако нужно понимать: эти два инструмента совершенно разные, поскольку направлены на решение разных задач.
ZeroMQ, как уже говорилось, является библиотекой (или инструментарием). Сначала ZeroMQ может показаться более примитивным решением по сравнению с другими, тем не менее, эта библиотека имеет все необходимое для быстрого внедрения простого пользовательского обмена сообщениями с большим диапазоном привязок для различных языков программирования.
Это означает, что для того, чтобы начать сборку приложения ZeroMQ, необходимо скачать и установить библиотеку, а затем загрузить дополнительные файлы в соответствии с языком программирования. В данном руководстве библиотека ZeroMQ будет установлена из исходного кода, поскольку это позволяет получить последнюю стабильную версию программы.
Компилирование исходного кода
Сначала процесс компиляции приложений из исходного кода в системах Unix может показаться очень сложным; на самом же деле, все гораздо проще. Существует немалое количество инструментов, позволяющих скомпилировать код, но в данном руководстве будет использоваться утилита GNU make – одна из самых распространенных утилит, встроенная в системы Unix почти сразу после своего появления в конце 70х.
Зачем компилировать программы из исходного кода?
Многие системные администраторы предпочитают компилировать программы из исходного кода, потому что таким образом можно устранить некоторые проблемы предварительно собранного ПО. Кроме того, такой подход позволяет настроить процесс установки, выбрав несколько версий одного приложения.
Functions¶
- ()
-
return the version of libzmq as a string
- ()
-
return the version of pyzmq as a string
- ()
-
Return the version of ZeroMQ itself as a 3-tuple of ints.
- ()
-
return the pyzmq version as a tuple of at least three numbers
If pyzmq is a development version, inf will be appended after the third integer.
- ()
-
Check for zmq capability by name (e.g. ‘ipc’, ‘curve’)
New in version libzmq-4.1.
New in version 14.1.
- (device_type, frontend, backend)
-
Start a zeromq device.
Deprecated since version libzmq-3.2: Use zmq.proxy
Parameters: - device_type ((QUEUE, FORWARDER, STREAMER)) – The type of device to start.
- frontend () – The Socket instance for the incoming traffic.
- backend () – The Socket instance for the outbound traffic.
- (frontend, backend, capture)
-
Start a zeromq proxy (replacement for device).
New in version libzmq-3.2.
New in version 13.0.
Parameters: - frontend () – The Socket instance for the incoming traffic.
- backend () – The Socket instance for the outbound traffic.
- capture ( (optional)) – The Socket instance for capturing traffic.
- (frontend, backend, capture, control)
-
Start a zeromq proxy with control flow.
New in version libzmq-4.1.
New in version 18.0.
Parameters: - frontend () – The Socket instance for the incoming traffic.
- backend () – The Socket instance for the outbound traffic.
- capture ( (optional)) – The Socket instance for capturing traffic.
- control ( (optional)) – The Socket instance for control flow.
- ()
-
Compute the public key corresponding to a secret key for use
with zmq.CURVE securityRequires libzmq (≥ 4.2) to have been built with CURVE support.
Parameters: private – The private key as a 40 byte z85-encoded bytestring Returns: The public key as a 40 byte z85-encoded bytestring. Return type: bytestring
- ()
-
generate a Z85 keypair for use with zmq.CURVE security
Requires libzmq (≥ 4.0) to have been built with CURVE support.
New in version libzmq-4.0.
New in version 14.0.
Returns: (public, secret) – The public and private keypair as 40 byte z85-encoded bytestrings. Return type: two bytestrings
- ()
-
Return a list of directories to include for linking against pyzmq with cython.
- ()
-
Return a list of directories used to link against pyzmq’s bundled libzmq.
DealerSocket
The NetMQ doesn’t do anything particularly special, but what it does offer is the ability to work in a fully asynchronous manner.
Which if you recall was not something that other socket types could do, where the / methods are blocking, and would also throw exceptions should you try to call
things in the wrong order, or more than expected.
The main selling point of a is its asynchronous abilities. Typically a would be used in conjunction with a , which is why we have decided to bundle the description of both these socket types into this documentation page.
If you want to know more details about socket combinations involving s, then as ALWAYS the guide is your friend. In particular the page of the guide may be of interest.
Exceptions¶
- class (errno=None, msg=None)
-
Wrap an errno style error.
Parameters: -
errno () – The ZMQ errno or None. If None, then is called and
used. - msg (string) – Description of the error or None.
- ()
-
Exception.with_traceback(tb) –
set self.__traceback__ to tb and return self.
-
errno () – The ZMQ errno or None. If None, then is called and
- class (min_version, msg=’Feature’)
-
Raised when a feature is not provided by the linked version of libzmq.
New in version 14.2.
- ()
-
Exception.with_traceback(tb) –
set self.__traceback__ to tb and return self.
- class (errno=’ignored’, msg=’ignored’)
-
Wrapper for zmq.EAGAIN
New in version 13.0.
- class (errno=’ignored’, msg=’ignored’)
-
Wrapper for zmq.ETERM
New in version 13.0.
- class
-
Raised when timeout is reached while waiting for 0MQ to finish with a Message
See also
- object for tracking when ZeroMQ is done
An example
Time for an example. The best way to think of this example is summarized in the bullet points below:
- There is one server. It binds a which stores the inbound request’s connection identity to work out how to route back the response message to the correct client socket.
- There are multiple clients, each in its own thread. These clients are s. The client socket will provide a fixed identity, such that the server () will be able to use the identity supplied to correctly route back messages for this client.
Ok so that is the overview. Let’s see the code:
public static void Main(string[] args) { // NOTES // 1. Use ThreadLocal<DealerSocket> where each thread has // its own client DealerSocket to talk to server // 2. Each thread can send using it own socket // 3. Each thread socket is added to poller const int delay = 3000; // millis var clientSocketPerThread = new ThreadLocal<DealerSocket>(); using (var server = new RouterSocket("@tcp://127.0.0.1:5556")) using (var poller = new NetMQPoller()) { // Start some threads, each with its own DealerSocket // to talk to the server socket. Creates lots of sockets, // but no nasty race conditions no shared state, each // thread has its own socket, happy days. for (int i = ; i < 3; i++) { Task.Factory.StartNew(state => { DealerSocket client = null; if (!clientSocketPerThread.IsValueCreated) { client = new DealerSocket(); client.Options.Identity = Encoding.Unicode.GetBytes(state.ToString()); client.Connect("tcp://127.0.0.1:5556"); client.ReceiveReady += Client_ReceiveReady; clientSocketPerThread.Value = client; poller.Add(client); } else { client = clientSocketPerThread.Value; } while (true) { var messageToServer = new NetMQMessage(); messageToServer.AppendEmptyFrame(); messageToServer.Append(state.ToString()); Console.WriteLine("======================================"); Console.WriteLine(" OUTGOING MESSAGE TO SERVER "); Console.WriteLine("======================================"); PrintFrames("Client Sending", messageToServer); client.SendMultipartMessage(messageToServer); Thread.Sleep(delay); } }, string.Format("client {0}", i), TaskCreationOptions.LongRunning); } // start the poller poller.RunAsync(); // server loop while (true) { var clientMessage = server.ReceiveMessage(); Console.WriteLine("======================================"); Console.WriteLine(" INCOMING CLIENT MESSAGE FROM CLIENT "); Console.WriteLine("======================================"); PrintFrames("Server receiving", clientMessage); if (clientMessage.FrameCount == 3) { var clientAddress = clientMessage]; var clientOriginalMessage = clientMessage2ConvertToString(); string response = string.Format("{0} back from server {1}", clientOriginalMessage, DateTime.Now.ToLongTimeString()); var messageToClient = new NetMQMessage(); messageToClient.Append(clientAddress); messageToClient.AppendEmptyFrame(); messageToClient.Append(response); server.SendMultipartMessage(messageToClient); } } } } void PrintFrames(string operationType, NetMQMessage message) { for (int i = ; i < message.FrameCount; i++) { Console.WriteLine("{0} Socket : Frame = {2}", operationType, i, messageiConvertToString()); } } void Client_ReceiveReady(object sender, NetMQSocketEventArgs e) { bool hasmore = false; e.Socket.Receive(out hasmore); if (hasmore) { string result = e.Socket.ReceiveFrameString(out hasmore); Console.WriteLine("REPLY {0}", result); } }
When run, this program should output something like this:
====================================== OUTGOING MESSAGE TO SERVER ====================================== ====================================== OUTGOING MESSAGE TO SERVER ====================================== Client Sending Socket : Frame = Client Sending Socket : Frame = client 1 Client Sending Socket : Frame = Client Sending Socket : Frame = client 0 ====================================== INCOMING CLIENT MESSAGE FROM CLIENT ====================================== Server receiving Socket : Frame = c l i e n t 1 Server receiving Socket : Frame = Server receiving Socket : Frame = client 1 ====================================== INCOMING CLIENT MESSAGE FROM CLIENT ====================================== Server receiving Socket : Frame = c l i e n t 0 Server receiving Socket : Frame = Server receiving Socket : Frame = client 0 REPLY client 1 back from server 08:05:56 REPLY client 0 back from server 08:05:56
Remember this is asynchronous code, so events may not occur in the order you logically expect.
My work on a proof of concept
Given that nature of problems 1 & 2 I set out to rewrite the internals and design a better API that is suitable for honouring high water marks and timeouts for both send & receive operations.
Because I was now effectively rewriting the entire library I also decided to address the additional improvements mentioned above. Especially the move to N-API is non-trivial and influences most of the native code.
To summarize, the goals of my experiment were:
- Design new promise-based API that solves usability issues with high water marks and timeouts.
- Attempt to use Node.js experimental N-API which offers an outlook of binary compatibility across Node.js versions once it stabilises. (This prompted several patches to the C++ node-addon-api wrapper and potential improvements to the N-API itself).
- Allow easy definition of new socket types and options by consumers of the library, without changes to the internals.
- Match performance of the current library & reduce the interdependency between JS and C++ code (in practice this boils down to having all socket & context logic in C++).
What is included and working
- Support for all socket types and socket options up to ZMQ 4.2.
- Support for ZMQ 4.0+.
- Test suite for all code written so far – covering at least all scenarios in the current test suite (except for features still missing, see below).
- Successful builds on macOS & Linux.
What is still missing
The following features have not yet been added. My plan is to implement them over the next weeks years:
A proxy classA curve keypair helper functionMonitoring sockets for eventsSupport for prebuilt binariesDraft sockets and associated (new) APIsThorough documentation of the APIWindows supportElectron supportReal world testing
Языковые привязки ZeroMQ
Привязки для Python: PyZMQ
Загрузить и установить привязки ZeroMQ для Python (под названием PyZMQ) можно при помощи менеджера пакетов pip.
Для этого запустите:
Чтобы подробнее ознакомиться с установкой и настройкой Python 2.7.x и 3.x (а также с основными инструментами и привязками Python) в системе CentOS, читайте данное руководство.
Привязки для Ruby: zmq Gem
Языковые привязки ZeroMQ для Ruby называются отдельным Gem-ом под названием zmq.
Чтобы установить эту привязку на стандартную установку ZeroMQ, используйте:
Чтобы установить zmq на нестандартную установку ZeroMQ, запустите:
Другие языковые привязки ZeroMQ
Чтобы получить информацию о других привязках ZeroMQ (включая, но не ограничиваясь PHP, C #, Erlang, Haskell, Java, Lua и т.д.), посетите сайт сообщества ZeroMQ.
CentOSCentOS 6Cloud ServerEPELPythonPyZMQRHELRubyZeroMQzmq Gem
Usage
Direct CZMQ Sock API
Example
package main import ( "log" "github.com/zeromq/goczmq" ) func main() { // Create a router socket and bind it to port 5555. router, err := goczmq.NewRouter("tcp://*:5555") if err != nil { log.Fatal(err) } defer router.Destroy() log.Println("router created and bound") // Create a dealer socket and connect it to the router. dealer, err := goczmq.NewDealer("tcp://127.0.0.1:5555") if err != nil { log.Fatal(err) } defer dealer.Destroy() log.Println("dealer created and connected") // Send a 'Hello' message from the dealer to the router. // Here we send it as a frame ([]byte), with a FlagNone // flag to indicate there are no more frames following. err = dealer.SendFrame([]byte("Hello"), goczmq.FlagNone) if err != nil { log.Fatal(err) } log.Println("dealer sent 'Hello'") // Receve the message. Here we call RecvMessage, which // will return the message as a slice of frames ([][]byte). // Since this is a router socket that support async // request / reply, the first frame of the message will // be the routing frame. request, err := router.RecvMessage() if err != nil { log.Fatal(err) } log.Printf("router received '%s' from '%v'", request, request[]) // Send a reply. First we send the routing frame, which // lets the dealer know which client to send the message. // The FlagMore flag tells the router there will be more // frames in this message. err = router.SendFrame(request[], goczmq.FlagMore) if err != nil { log.Fatal(err) } log.Printf("router sent 'World'") // Next send the reply. The FlagNone flag tells the router // that this is the last frame of the message. err = router.SendFrame([]byte("World"), goczmq.FlagNone) if err != nil { log.Fatal(err) } // Receive the reply. reply, err := dealer.RecvMessage() if err != nil { log.Fatal(err) } log.Printf("dealer received '%s'", string(reply[])) }
io.ReadWriter support
Example
package main import ( "log" "github.com/zeromq/goczmq" ) func main() { // Create a router socket and bind it to port 5555. router, err := goczmq.NewRouter("tcp://*:5555") if err != nil { log.Fatal(err) } defer router.Destroy() log.Println("router created and bound") // Create a dealer socket and connect it to the router. dealer, err := goczmq.NewDealer("tcp://127.0.0.1:5555") if err != nil { log.Fatal(err) } defer dealer.Destroy() log.Println("dealer created and connected") // Send a 'Hello' message from the dealer to the router, // using the io.Write interface n, err := dealer.Write([]byte("Hello")) if err != nil { log.Fatal(err) } log.Printf("dealer sent %d byte message 'Hello'\n", n) // Make a byte slice and pass it to the router // Read interface. When using the ReadWriter // interface with a router socket, the router // caches the routing frames internally in a // FIFO and uses them transparently when // sending replies. buf := make([]byte, 16386) n, err = router.Read(buf) if err != nil { log.Fatal(err) } log.Printf("router received '%s'\n", buf) // Send a reply. n, err = router.Write([]byte("World")) if err != nil { log.Fatal(err) } log.Printf("router sent %d byte message 'World'\n", n) // Receive the reply, reusing the previous buffer. n, err = dealer.Read(buf) if err != nil { log.Fatal(err) } log.Printf("dealer received '%s'", string(buf)) }
Example
package main import ( "log" "github.com/zeromq/goczmq" ) func main() { // Create a router channeler and bind it to port 5555. // A channeler provides a thread safe channel interface // to a *Sock router := goczmq.NewRouterChanneler("tcp://*:5555") defer router.Destroy() log.Println("router created and bound") // Create a dealer channeler and connect it to the router. dealer := goczmq.NewDealerChanneler("tcp://127.0.0.1:5555") defer dealer.Destroy() log.Println("dealer created and connected") // Send a 'Hello' message from the dealer to the router. dealer.SendChan <- [][]byte{[]byte("Hello")} log.Println("dealer sent 'Hello'") // Receve the message as a [][]byte. Since this is // a router, the first frame of the message wil // be the routing frame. request := <-router.RecvChan log.Printf("router received '%s' from '%v'", request, request[]) // Send a reply. First we send the routing frame, which // lets the dealer know which client to send the message. router.SendChan <- [][]byte{request[], []byte("World")} log.Printf("router sent 'World'") // Receive the reply. reply := <-dealer.RecvChan log.Printf("dealer received '%s'", string(reply[])) }
History
iMatix CEO Pieter Hintjens registered the zeromq.org domain in May 2007 and started the ZeroMQ project together with Martin Sustrik, who was its architect and lead developer until December 2011.
On March 30, 2010, Hintjens announced that iMatix (the original designer of Advanced Message Queuing Protocol) would leave the AMQP workgroup and did not plan to support AMQP/1.0 in favor of the significantly simpler and faster ZeroMQ.
In 2011, CERN was investigating ways to unify middleware solutions used to operate CERN accelerators. The CERN study compared two open source implementations of CORBA, Ice, Thrift, ZeroMQ, YAMI4,RTI, and Qpid (AMQP) and scored ZeroMQ highest, in part for its versatility, including its easy adaptability to the LynxOS.
At the start of 2012, two of the original developers forked ZeroMQ as Crossroads I/O. Martin Sustrik has started nanomsg, a rewrite of the ZeroMQ core library.
In August 2012, Dongmin Yu announced his pure Java conversion of ZeroMQ, JeroMQ. This has inspired further full-native ports of ZeroMQ, such as NetMQ for C#.
In March 2013, Pieter Hintjens announced a new draft of the ZMTP wire-level protocol bringing extensible security mechanisms to ZeroMQ. Martin Hurton implemented the CurveZMQ authentication and encryption mechanism in the core library shortly afterwards.
Background
We have recently taken a system into production that uses zeromq.js. This works well, but I encountered a number of issues with this library that triggered me to think about solutions. Unfortunately some problems did not appear to be fixable without a massive overhaul of the internals. I will briefly describe the main problems.
Problem 1: Sent messages are queued, but not with ZMQ
Zeromq.js uses an internal queuing mechanism for sent messages. Essentially it pushes all messages onto a JavaScript array and sends them out as soon as the socket is ready for writing. This works nicely except when it doesn’t:
- If write is not possible, messages are queued infinitely; eventually exhausting all system memory if messages continue to be sent.
- Because writes only happen when when the socket is writable, write timeouts have no effect – the socket just never enters the writable state. If the application waits for a message to be sent before continuing (by adding a callback) it will be stuck without an obvious way to recover.
Related issues:
From I understand that this queueing was built to improve performance. Which means that matching the performance of the current library should be a goal of any new version.
Problem 2: Messages that are received are read automatically
Readable sockets follow the event emitter pattern. This is common in Node.js APIs, so it makes sense for most people. There is important one drawback, though: messages are read automatically.
- Sequential processing of messages is not possible – all available messages will flow in automatically.
- Read timeouts have no effect; this may break some REQ/REP scenarios, especially.
- An application cannot reliably signal that it cannot process any more messages.
It appears that a call to offers a suitable workaround, however:
- A call to also disables writes, which may not be intentional.
- A call to will always dispatch all messages that can be read from the ZMQ queue without blocking before it actually stops reading from the socket.
- A user has to explicitly call to approximate features that are offered by libzmq by default, which is not a very friendly developer experience.
Related issues:
Other potential improvements
There were some other things we encountered while working on the library that we believed could be improved:
- This library includes many ZMQ options, but some are not (yet) added. Adding an option requires changes to JS & C++ code. It would be nice to be able to add options without having to change the library itself. This allows users to polyfill options available in their version of ZMQ, before they are added to the library.
- It would be nice to support promise instead of callbacks. Node.js has great support for promises in all supported versions.
- It would be nice to support async iterators for incoming messages. This is only natively supported with an experimental flag in Node.js version 8. It can be used with Babel or TypeScript on any supported Node.js version.
Подготовка системы и установка зависимостей ZeroMQ
Прежде чем приступить к установке библиотеки ZeroMQ, нужно подготовить систему.
Добавление репозитория EPEL
Чтобы иметь возможность загружать отдельные инструменты для сборки и использования ZeroMQ и многих других программ, нужно подключить репозиторий EPEL (Extra Packages for Enterprise Linux). В нем можно найти некоторые важные инструменты, которых нет в других репозиториях.
Чтобы добавить EPEL, используйте команды:
Загрузка дополнительных инструментов для компиляции кода
Как уже говорилось, процесс сборки ZeroMQ требует некоторых дополнительных инструментов. Установив EPEL, эти инструменты можно без проблем скачать при помощи стандартного менеджера пакетов YUM.
Итак, запустите:
What next?
I’d like to invite you to have a look at https://github.com/rolftimmermans/zeromq-ng and give your feedback.
Specifically, I’d like to discuss two scenarios:
- I publish this as a new library that exists separately from zeromq.js
- This can be the beginning of a next major (API incompatible) release of zeromq.js
I’d be completely happy with either.
In addition to the above I am looking from feedback from users that have experience with one or more of the problems mentioned above (or even other problems!). It would be nice if you can give your opinion on the API or even test the proof of concept.
I am also in the process of writing and collecting benchmarks and will post the results soon. Based on what I have tested so far I’m expecting similar performance in general.
Вопросы по реализации
-
Где и как хранятся адреса издателей и подписчиков?
В принципе адреса могут храниться у любого участника процесса. Но обычно система выступает в качестве брокера и хранит все адреса у себя. -
Подписка, что она из себя представляет?
Наверняка подписка будет включать адрес подписчика. Кроме этого, будет указан «класс сообщения«, своеобразный фильтр, который определяет, соответствует ли сообщение данной подписке или нет. Адрес издателя здесь не нужен. -
Можем ли мы добавить или убрать подписку в любой момент или должны для этого остановить систему?
Если мы добавляем/убираем подписку, как это отразится на сообщениях, которые уже опубликованы и ждут отправки? -
Важен ли формат сообщения?
Скорее всего сообщение будет сериализовано и размещено в пакете. Вместе с пакетом могут передаваться дополнительные параметры, которые размещаются в headers пакета. -
Как сообщения фильтруются по подпискам?
Обычно сообщение сразу же фильтруется по подпискам на этапе публикации. Иногда сообщение фильтруется только на этапе отправки сообщения подписчику. Иногда сообщение фильтруется уже на стороне подписчика. -
Что из себя представляют фильтры подписок?
Фильтры подписок могут представлять из себя иерархии наподобие namespaces, либо наборы ключей, либо RegEx выражения. -
Что будет, если система или подписчик не может принять сообщения?
Система или подписчик может отказать в приемке, может удалить сообщение без всякого предупреждения, а может и просто зависнуть или выйти из строя. -
Повторить ли отправку сообщения, если она завершилась неудачей?
Повторение отправки (retry) — это стандартное решение в случае, если принимающая сторона временно недоступна. Ключевое слово здесь «временно». Если принимающая сторона или канал передачи перестали работать на постоянной основе, то повторение отправки ничем не поможет, а только перегрузит систему/канал связи. -
Можно ли настраивать алгоритм повторной отправки сообщений?
При ненадежных каналах связи этот вопрос может быть одним из главных. Можем ли мы изменять количество повторных попыток, промежуток между попытками? По какому алгоритму настраивается промежуток между попытками? -
Как долго сообщения хранятся?
Если подписчик долго не забирает сообщение, что с ним надо делать? Хранить ли его неограниченное время или удалять после определенного интервала? -
Кто инициирует отправку сообщения?
Подписчик может время от времени опрашивать систему на предмет новых сообщений (poll mode) или система сама отправляет новое сообщение подписчику (push mode). В первом случае подписчик должен знать адрес системы, во втором случае система должна знать адрес подписчика. Во втором случае система более экономно расходует как свои ресурсы, так и ресурсы канала передачи данных. -
Что будет, если подписчик перегружен сообщениями?
Закрыть ли ему вход на прием новых сообщений и отсылать отправляющей стороне предупреждения или молча игнорировать сообщения? А может подписчик может увеличивать (масштабировать) свои ресурсы?