Dash Core Source Documentation (0.16.0.1)

Find detailed information regarding the Dash Core source code.

zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <chain.h>
6 #include <chainparams.h>
7 #include <streams.h>
9 #include <validation.h>
10 #include <util.h>
11 
12 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
13 
14 static const char *MSG_HASHBLOCK = "hashblock";
15 static const char *MSG_HASHCHAINLOCK = "hashchainlock";
16 static const char *MSG_HASHTX = "hashtx";
17 static const char *MSG_HASHTXLOCK = "hashtxlock";
18 static const char *MSG_HASHGVOTE = "hashgovernancevote";
19 static const char *MSG_HASHGOBJ = "hashgovernanceobject";
20 static const char *MSG_HASHISCON = "hashinstantsenddoublespend";
21 static const char *MSG_RAWBLOCK = "rawblock";
22 static const char *MSG_RAWCHAINLOCK = "rawchainlock";
23 static const char *MSG_RAWCLSIG = "rawchainlocksig";
24 static const char *MSG_RAWTX = "rawtx";
25 static const char *MSG_RAWTXLOCK = "rawtxlock";
26 static const char *MSG_RAWTXLOCKSIG = "rawtxlocksig";
27 static const char *MSG_RAWGVOTE = "rawgovernancevote";
28 static const char *MSG_RAWGOBJ = "rawgovernanceobject";
29 static const char *MSG_RAWISCON = "rawinstantsenddoublespend";
30 
31 // Internal function to send multipart message
32 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
33 {
34  va_list args;
35  va_start(args, size);
36 
37  while (1)
38  {
39  zmq_msg_t msg;
40 
41  int rc = zmq_msg_init_size(&msg, size);
42  if (rc != 0)
43  {
44  zmqError("Unable to initialize ZMQ msg");
45  va_end(args);
46  return -1;
47  }
48 
49  void *buf = zmq_msg_data(&msg);
50  memcpy(buf, data, size);
51 
52  data = va_arg(args, const void*);
53 
54  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
55  if (rc == -1)
56  {
57  zmqError("Unable to send ZMQ msg");
58  zmq_msg_close(&msg);
59  va_end(args);
60  return -1;
61  }
62 
63  zmq_msg_close(&msg);
64 
65  if (!data)
66  break;
67 
68  size = va_arg(args, size_t);
69  }
70  va_end(args);
71  return 0;
72 }
73 
75 {
76  assert(!psocket);
77 
78  // check if address is being used by other publish notifier
79  std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
80 
81  if (i==mapPublishNotifiers.end())
82  {
83  psocket = zmq_socket(pcontext, ZMQ_PUB);
84  if (!psocket)
85  {
86  zmqError("Failed to create socket");
87  return false;
88  }
89 
90  int rc = zmq_bind(psocket, address.c_str());
91  if (rc!=0)
92  {
93  zmqError("Failed to bind address");
94  zmq_close(psocket);
95  return false;
96  }
97 
98  // register this notifier for the address, so it can be reused for other publish notifier
99  mapPublishNotifiers.insert(std::make_pair(address, this));
100  return true;
101  }
102  else
103  {
104  LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
105 
106  psocket = i->second->psocket;
107  mapPublishNotifiers.insert(std::make_pair(address, this));
108 
109  return true;
110  }
111 }
112 
114 {
115  assert(psocket);
116 
117  int count = mapPublishNotifiers.count(address);
118 
119  // remove this notifier from the list of publishers using this address
120  typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
121  std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
122 
123  for (iterator it = iterpair.first; it != iterpair.second; ++it)
124  {
125  if (it->second==this)
126  {
127  mapPublishNotifiers.erase(it);
128  break;
129  }
130  }
131 
132  if (count == 1)
133  {
134  LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
135  int linger = 0;
136  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
137  zmq_close(psocket);
138  }
139 
140  psocket = nullptr;
141 }
142 
143 bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
144 {
145  assert(psocket);
146 
147  /* send three parts, command & data & a LE 4byte sequence number */
148  unsigned char msgseq[sizeof(uint32_t)];
149  WriteLE32(&msgseq[0], nSequence);
150  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
151  if (rc == -1)
152  return false;
153 
154  /* increment memory only sequence number after sending */
155  nSequence++;
156 
157  return true;
158 }
159 
161 {
162  uint256 hash = pindex->GetBlockHash();
163  LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
164  char data[32];
165  for (unsigned int i = 0; i < 32; i++)
166  data[31 - i] = hash.begin()[i];
167  return SendMessage(MSG_HASHBLOCK, data, 32);
168 }
169 
171 {
172  uint256 hash = pindex->GetBlockHash();
173  LogPrint(BCLog::ZMQ, "zmq: Publish hashchainlock %s\n", hash.GetHex());
174  char data[32];
175  for (unsigned int i = 0; i < 32; i++)
176  data[31 - i] = hash.begin()[i];
177  return SendMessage(MSG_HASHCHAINLOCK, data, 32);
178 }
179 
181 {
182  uint256 hash = transaction.GetHash();
183  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
184  char data[32];
185  for (unsigned int i = 0; i < 32; i++)
186  data[31 - i] = hash.begin()[i];
187  return SendMessage(MSG_HASHTX, data, 32);
188 }
189 
191 {
192  uint256 hash = transaction.GetHash();
193  LogPrint(BCLog::ZMQ, "zmq: Publish hashtxlock %s\n", hash.GetHex());
194  char data[32];
195  for (unsigned int i = 0; i < 32; i++)
196  data[31 - i] = hash.begin()[i];
197  return SendMessage(MSG_HASHTXLOCK, data, 32);
198 }
199 
201 {
202  uint256 hash = vote.GetHash();
203  LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernancevote %s\n", hash.GetHex());
204  char data[32];
205  for (unsigned int i = 0; i < 32; i++)
206  data[31 - i] = hash.begin()[i];
207  return SendMessage(MSG_HASHGVOTE, data, 32);
208 }
209 
211 {
212  uint256 hash = object.GetHash();
213  LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernanceobject %s\n", hash.GetHex());
214  char data[32];
215  for (unsigned int i = 0; i < 32; i++)
216  data[31 - i] = hash.begin()[i];
217  return SendMessage(MSG_HASHGOBJ, data, 32);
218 }
219 
221 {
222  uint256 currentHash = currentTx.GetHash(), previousHash = previousTx.GetHash();
223  LogPrint(BCLog::ZMQ, "zmq: Publish hashinstantsenddoublespend %s conflicts against %s\n", currentHash.ToString(), previousHash.ToString());
224  char dataCurrentHash[32], dataPreviousHash[32];
225  for (unsigned int i = 0; i < 32; i++) {
226  dataCurrentHash[31 - i] = currentHash.begin()[i];
227  dataPreviousHash[31 - i] = previousHash.begin()[i];
228  }
229  return SendMessage(MSG_HASHISCON, dataCurrentHash, 32)
230  && SendMessage(MSG_HASHISCON, dataPreviousHash, 32);
231 }
232 
233 
235 {
236  LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
237 
238  const Consensus::Params& consensusParams = Params().GetConsensus();
240  {
241  LOCK(cs_main);
242  CBlock block;
243  if(!ReadBlockFromDisk(block, pindex, consensusParams))
244  {
245  zmqError("Can't read block from disk");
246  return false;
247  }
248 
249  ss << block;
250  }
251 
252  return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
253 }
254 
256 {
257  LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlock %s\n", pindex->GetBlockHash().GetHex());
258 
259  const Consensus::Params& consensusParams = Params().GetConsensus();
261  {
262  LOCK(cs_main);
263  CBlock block;
264  if(!ReadBlockFromDisk(block, pindex, consensusParams))
265  {
266  zmqError("Can't read block from disk");
267  return false;
268  }
269 
270  ss << block;
271  }
272 
273  return SendMessage(MSG_RAWCHAINLOCK, &(*ss.begin()), ss.size());
274 }
275 
277 {
278  LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlocksig %s\n", pindex->GetBlockHash().GetHex());
279 
280  const Consensus::Params& consensusParams = Params().GetConsensus();
282  {
283  LOCK(cs_main);
284  CBlock block;
285  if(!ReadBlockFromDisk(block, pindex, consensusParams))
286  {
287  zmqError("Can't read block from disk");
288  return false;
289  }
290 
291  ss << block;
292  ss << clsig;
293  }
294 
295  return SendMessage(MSG_RAWCLSIG, &(*ss.begin()), ss.size());
296 }
297 
299 {
300  uint256 hash = transaction.GetHash();
301  LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
303  ss << transaction;
304  return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
305 }
306 
308 {
309  uint256 hash = transaction.GetHash();
310  LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlock %s\n", hash.GetHex());
312  ss << transaction;
313  return SendMessage(MSG_RAWTXLOCK, &(*ss.begin()), ss.size());
314 }
315 
317 {
318  uint256 hash = transaction.GetHash();
319  LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlocksig %s\n", hash.GetHex());
321  ss << transaction;
322  ss << islock;
323  return SendMessage(MSG_RAWTXLOCKSIG, &(*ss.begin()), ss.size());
324 }
325 
327 {
328  uint256 nHash = vote.GetHash();
329  LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s, vote = %d\n", nHash.ToString(), vote.ToString());
331  ss << vote;
332  return SendMessage(MSG_RAWGVOTE, &(*ss.begin()), ss.size());
333 }
334 
336 {
337  uint256 nHash = govobj.GetHash();
338  LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s, type = %d\n", nHash.ToString(), govobj.GetObjectType());
340  ss << govobj;
341  return SendMessage(MSG_RAWGOBJ, &(*ss.begin()), ss.size());
342 }
343 
345 {
346  LogPrint(BCLog::ZMQ, "zmq: Publish rawinstantsenddoublespend %s conflicts with %s\n", currentTx.GetHash().ToString(), previousTx.GetHash().ToString());
348  ssCurrent << currentTx;
349  ssPrevious << previousTx;
350  return SendMessage(MSG_RAWISCON, &(*ssCurrent.begin()), ssCurrent.size())
351  && SendMessage(MSG_RAWISCON, &(*ssPrevious.begin()), ssPrevious.size());
352 }
static const char * MSG_HASHISCON
static const char * MSG_HASHBLOCK
bool NotifyTransaction(const CTransaction &transaction) override
static const char * MSG_HASHGVOTE
uint256 GetHash() const
GetHash()
uint32_t nSequence
upcounting per message sequence number
Governance Object.
static const char * MSG_RAWCHAINLOCK
Definition: block.h:72
uint256 GetHash() const
static std::multimap< std::string, CZMQAbstractPublishNotifier * > mapPublishNotifiers
CCriticalSection cs_main
Definition: validation.cpp:213
static void WriteLE32(unsigned char *ptr, uint32_t x)
Definition: common.h:44
bool NotifyChainLock(const CBlockIndex *pindex, const llmq::CChainLockSig &clsig) override
Double ended buffer combining vector and stream-like interfaces.
Definition: streams.h:103
static const char * MSG_HASHTX
bool NotifyGovernanceVote(const CGovernanceVote &vote) override
unsigned char * begin()
Definition: uint256.h:57
bool NotifyTransaction(const CTransaction &transaction) override
static const char * MSG_RAWTXLOCKSIG
static const char * MSG_HASHCHAINLOCK
bool NotifyBlock(const CBlockIndex *pindex) override
uint256 GetBlockHash() const
Definition: chain.h:292
static const char * MSG_RAWCLSIG
size_type size() const
Definition: streams.h:194
int GetObjectType() const
bool NotifyGovernanceObject(const CGovernanceObject &object) override
static const char * MSG_HASHGOBJ
bool NotifyChainLock(const CBlockIndex *pindex, const llmq::CChainLockSig &clsig) override
#define LOCK(cs)
Definition: sync.h:178
bool SendMessage(const char *command, const void *data, size_t size)
const uint256 & GetHash() const
Definition: transaction.h:256
bool NotifyBlock(const CBlockIndex *pindex) override
bool NotifyGovernanceVote(const CGovernanceVote &vote) override
static const char * MSG_HASHTXLOCK
static int zmq_send_multipart(void *sock, const void *data, size_t size,...)
std::string ToString() const
Definition: uint256.cpp:62
Parameters that influence chain consensus.
Definition: params.h:130
#define LogPrint(category,...)
Definition: util.h:214
256-bit opaque blob.
Definition: uint256.h:123
static const char * MSG_RAWTX
bool NotifyGovernanceObject(const CGovernanceObject &object) override
const_iterator begin() const
Definition: streams.h:190
static const char * MSG_RAWBLOCK
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: chain.h:170
const CChainParams & Params()
Return the currently selected parameters.
void * memcpy(void *a, const void *b, size_t c)
static const int PROTOCOL_VERSION
network protocol versioning
Definition: version.h:14
bool NotifyInstantSendDoubleSpendAttempt(const CTransaction &currentTx, const CTransaction &previousTx) override
static const char * MSG_RAWTXLOCK
std::string GetHex() const
Definition: uint256.cpp:21
static int count
Definition: tests.c:45
bool NotifyTransactionLock(const CTransaction &transaction, const llmq::CInstantSendLock &islock) override
bool NotifyTransactionLock(const CTransaction &transaction, const llmq::CInstantSendLock &islock) override
static const char * MSG_RAWGVOTE
bool NotifyChainLock(const CBlockIndex *pindex, const llmq::CChainLockSig &clsig) override
The basic transaction that is broadcasted on the network and contained in blocks. ...
Definition: transaction.h:198
const Consensus::Params & GetConsensus() const
Definition: chainparams.h:54
bool ReadBlockFromDisk(CBlock &block, const CDiskBlockPos &pos, const Consensus::Params &consensusParams)
Functions for disk access for blocks.
static const char * MSG_RAWISCON
bool NotifyTransactionLock(const CTransaction &transaction, const llmq::CInstantSendLock &islock) override
void zmqError(const char *str)
bool NotifyInstantSendDoubleSpendAttempt(const CTransaction &currentTx, const CTransaction &previousTx) override
static const char * MSG_RAWGOBJ
std::string ToString() const
bool Initialize(void *pcontext) override
Released under the MIT license