Raven Core  3.0.0
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Copyright (c) 2017-2019 The Raven Core developers
3 // Distributed under the MIT software license, see the accompanying
4 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 
7 #include "zmqpublishnotifier.h"
8 
9 #include "version.h"
10 #include "validation.h"
11 #include "streams.h"
12 #include "util.h"
13 
14 void zmqError(const char *str)
15 {
16  LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
17 }
18 
20 {
21 }
22 
24 {
25  Shutdown();
26 
27  for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
28  {
29  delete *i;
30  }
31 }
32 
34 {
35  CZMQNotificationInterface* notificationInterface = nullptr;
36  std::map<std::string, CZMQNotifierFactory> factories;
37  std::list<CZMQAbstractNotifier*> notifiers;
38 
39  factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
40  factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
41  factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
42  factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
43  factories["pubrawmessage"] = CZMQAbstractNotifier::Create<CZMQPublishNewAssetMessageNotifier>;
44 
45  for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
46  {
47  std::string arg("-zmq" + i->first);
48  if (gArgs.IsArgSet(arg))
49  {
50  CZMQNotifierFactory factory = i->second;
51  std::string address = gArgs.GetArg(arg, "");
52  CZMQAbstractNotifier *notifier = factory();
53  notifier->SetType(i->first);
54  notifier->SetAddress(address);
55  notifiers.push_back(notifier);
56  }
57  }
58 
59  if (!notifiers.empty())
60  {
61  notificationInterface = new CZMQNotificationInterface();
62  notificationInterface->notifiers = notifiers;
63 
64  if (!notificationInterface->Initialize())
65  {
66  delete notificationInterface;
67  notificationInterface = nullptr;
68  }
69  }
70 
71  return notificationInterface;
72 }
73 
74 // Called at startup to conditionally set up ZMQ socket(s)
76 {
77  LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
78  assert(!pcontext);
79 
80  pcontext = zmq_init(1);
81 
82  if (!pcontext)
83  {
84  zmqError("Unable to initialize context");
85  return false;
86  }
87 
88  std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
89  for (; i!=notifiers.end(); ++i)
90  {
91  CZMQAbstractNotifier *notifier = *i;
92  if (notifier->Initialize(pcontext))
93  {
94  LogPrint(BCLog::ZMQ, " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
95  }
96  else
97  {
98  LogPrint(BCLog::ZMQ, " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
99  break;
100  }
101  }
102 
103  if (i!=notifiers.end())
104  {
105  return false;
106  }
107 
108  return true;
109 }
110 
111 // Called during shutdown sequence
113 {
114  LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
115  if (pcontext)
116  {
117  for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
118  {
119  CZMQAbstractNotifier *notifier = *i;
120  LogPrint(BCLog::ZMQ, " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
121  notifier->Shutdown();
122  }
123  zmq_ctx_destroy(pcontext);
124 
125  pcontext = nullptr;
126  }
127 }
128 
129 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
130 {
131  if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
132  return;
133 
134  for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
135  {
136  CZMQAbstractNotifier *notifier = *i;
137  if (notifier->NotifyBlock(pindexNew))
138  {
139  i++;
140  }
141  else
142  {
143  notifier->Shutdown();
144  i = notifiers.erase(i);
145  }
146  }
147 }
148 
150 {
151  for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
152  {
153  CZMQAbstractNotifier *notifier = *i;
154  if (notifier->NotifyMessage(message))
155  {
156  i++;
157  }
158  else
159  {
160  notifier->Shutdown();
161  i = notifiers.erase(i);
162  }
163  }
164 }
165 
167 {
168  // Used by BlockConnected and BlockDisconnected as well, because they're
169  // all the same external callback.
170  const CTransaction& tx = *ptx;
171 
172  for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
173  {
174  CZMQAbstractNotifier *notifier = *i;
175  if (notifier->NotifyTransaction(tx))
176  {
177  i++;
178  }
179  else
180  {
181  notifier->Shutdown();
182  i = notifiers.erase(i);
183  }
184  }
185 }
186 
187 void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected, const std::vector<CTransactionRef>& vtxConflicted)
188 {
189  for (const CTransactionRef& ptx : pblock->vtx) {
190  // Do a normal notify for each transaction added in the block
192  }
193 }
194 
195 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock)
196 {
197  for (const CTransactionRef& ptx : pblock->vtx) {
198  // Do a normal notify for each transaction removed in block disconnection
200  }
201 }
void NewAssetMessage(const CMessage &message) override
bool IsArgSet(const std::string &strArg) const
Return true if the given argument has been manually set.
Definition: util.cpp:448
void TransactionAddedToMempool(const CTransactionRef &tx) override
Notifies listeners of a transaction having been added to mempool.
virtual bool NotifyBlock(const CBlockIndex *pindex)
std::string GetAddress() const
void BlockConnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected, const std::vector< CTransactionRef > &vtxConflicted) override
Notifies listeners of a block being connected.
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:436
static CZMQNotificationInterface * Create()
virtual bool NotifyTransaction(const CTransaction &transaction)
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock) override
Notifies listeners of a block being disconnected.
virtual bool NotifyMessage(const CMessage &message)
void SetAddress(const std::string &a)
virtual void Shutdown()=0
#define LogPrint(category,...)
Definition: util.h:160
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners of updated block chain tip.
std::list< CZMQAbstractNotifier * > notifiers
ArgsManager gArgs
Definition: util.cpp:94
void zmqError(const char *str)
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: chain.h:172
std::string GetArg(const std::string &strArg, const std::string &strDefault) const
Return string argument or default value.
Definition: util.cpp:454
CZMQAbstractNotifier *(* CZMQNotifierFactory)()
virtual bool Initialize(void *pcontext)=0
The basic transaction that is broadcasted on the network and contained in blocks. ...
Definition: transaction.h:270
void SetType(const std::string &t)
std::string GetType() const