Raven Core  3.0.0
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Copyright (c) 2017-2019 The Raven Core developers
3 // Distributed under the MIT software license, see the accompanying
4 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 
6 #include "chain.h"
7 #include "chainparams.h"
8 #include "streams.h"
9 #include "zmqpublishnotifier.h"
10 #include "validation.h"
11 #include "util.h"
12 #include "rpc/server.h"
13 
14 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
15 
16 static const char *MSG_HASHBLOCK = "hashblock";
17 static const char *MSG_HASHTX = "hashtx";
18 static const char *MSG_RAWBLOCK = "rawblock";
19 static const char *MSG_RAWTX = "rawtx";
20 static const char *MSG_RAWASSETMSG = "rawmessage";
21 
22 // Internal function to send multipart message
23 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
24 {
25  va_list args;
26  va_start(args, size);
27 
28  while (1)
29  {
30  zmq_msg_t msg;
31 
32  int rc = zmq_msg_init_size(&msg, size);
33  if (rc != 0)
34  {
35  zmqError("Unable to initialize ZMQ msg");
36  va_end(args);
37  return -1;
38  }
39 
40  void *buf = zmq_msg_data(&msg);
41  memcpy(buf, data, size);
42 
43  data = va_arg(args, const void*);
44 
45  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
46  if (rc == -1)
47  {
48  zmqError("Unable to send ZMQ msg");
49  zmq_msg_close(&msg);
50  va_end(args);
51  return -1;
52  }
53 
54  zmq_msg_close(&msg);
55 
56  if (!data)
57  break;
58 
59  size = va_arg(args, size_t);
60  }
61  va_end(args);
62  return 0;
63 }
64 
66 {
67  assert(!psocket);
68 
69  // check if address is being used by other publish notifier
70  std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
71 
72  if (i==mapPublishNotifiers.end())
73  {
74  psocket = zmq_socket(pcontext, ZMQ_PUB);
75  if (!psocket)
76  {
77  zmqError("Failed to create socket");
78  return false;
79  }
80 
81  int rc = zmq_bind(psocket, address.c_str());
82  if (rc!=0)
83  {
84  zmqError("Failed to bind address");
85  zmq_close(psocket);
86  return false;
87  }
88 
89  // register this notifier for the address, so it can be reused for other publish notifier
90  mapPublishNotifiers.insert(std::make_pair(address, this));
91  return true;
92  }
93  else
94  {
95  LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
96 
97  psocket = i->second->psocket;
98  mapPublishNotifiers.insert(std::make_pair(address, this));
99 
100  return true;
101  }
102 }
103 
105 {
106  assert(psocket);
107 
108  int count = mapPublishNotifiers.count(address);
109 
110  // remove this notifier from the list of publishers using this address
111  typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
112  std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
113 
114  for (iterator it = iterpair.first; it != iterpair.second; ++it)
115  {
116  if (it->second==this)
117  {
118  mapPublishNotifiers.erase(it);
119  break;
120  }
121  }
122 
123  if (count == 1)
124  {
125  LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
126  int linger = 0;
127  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
128  zmq_close(psocket);
129  }
130 
131  psocket = nullptr;
132 }
133 
134 bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
135 {
136  assert(psocket);
137 
138  /* send three parts, command & data & a LE 4byte sequence number */
139  unsigned char msgseq[sizeof(uint32_t)];
140  WriteLE32(&msgseq[0], nSequence);
141  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
142  if (rc == -1)
143  return false;
144 
145  /* increment memory only sequence number after sending */
146  nSequence++;
147 
148  return true;
149 }
150 
152 {
153  uint256 hash = pindex->GetBlockHash();
154  LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
155  char data[32];
156  for (unsigned int i = 0; i < 32; i++)
157  data[31 - i] = hash.begin()[i];
158  return SendMessage(MSG_HASHBLOCK, data, 32);
159 }
160 
162 {
163  uint256 hash = transaction.GetHash();
164  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
165  char data[32];
166  for (unsigned int i = 0; i < 32; i++)
167  data[31 - i] = hash.begin()[i];
168  return SendMessage(MSG_HASHTX, data, 32);
169 }
170 
172 {
173  LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
174 
175  const Consensus::Params& consensusParams = Params().GetConsensus();
176  CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
177  {
178  LOCK(cs_main);
179  CBlock block;
180  if(!ReadBlockFromDisk(block, pindex, consensusParams))
181  {
182  zmqError("Can't read block from disk");
183  return false;
184  }
185 
186  ss << block;
187  }
188 
189  return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
190 }
191 
193 {
194  uint256 hash = transaction.GetHash();
195  LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
196  CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
197  ss << transaction;
198  return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
199 }
200 
202 {
203  LogPrint(BCLog::ZMQ, "zmq: Publish message %s\n", message.ToString());
204 
205  CZMQMessage zmqmessage(message);
206  std::string str = zmqmessage.createJsonString();
207  return SendMessage(MSG_RAWASSETMSG, &(*str.begin()), str.size());
208 }
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyMessage(const CMessage &message) override
std::string ToString() const
Definition: messages.h:91
uint32_t nSequence
upcounting per message sequence number
Definition: block.h:73
CCriticalSection cs_main
Global state.
Definition: validation.cpp:72
Double ended buffer combining vector and stream-like interfaces.
Definition: streams.h:147
unsigned char * begin()
Definition: uint256.h:57
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyBlock(const CBlockIndex *pindex) override
uint256 GetBlockHash() const
Definition: chain.h:294
size_type size() const
Definition: streams.h:238
#define LOCK(cs)
Definition: sync.h:176
bool SendMessage(const char *command, const void *data, size_t size)
const uint256 & GetHash() const
Definition: transaction.h:320
bool NotifyBlock(const CBlockIndex *pindex) override
Parameters that influence chain consensus.
Definition: params.h:47
#define LogPrint(category,...)
Definition: util.h:160
256-bit opaque blob.
Definition: uint256.h:123
const_iterator begin() const
Definition: streams.h:234
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: chain.h:172
const CChainParams & Params()
Return the currently selected parameters.
int RPCSerializationFlags()
Definition: server.cpp:559
void * memcpy(void *a, const void *b, size_t c)
std::string GetHex() const
Definition: uint256.cpp:22
The basic transaction that is broadcasted on the network and contained in blocks. ...
Definition: transaction.h:270
const Consensus::Params & GetConsensus() const
Definition: chainparams.h:61
bool ReadBlockFromDisk(CBlock &block, const CDiskBlockPos &pos, const Consensus::Params &consensusParams)
Functions for disk access for blocks.
void zmqError(const char *str)
bool Initialize(void *pcontext) override