You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

zmqpublishnotifier.cpp 4.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. // Copyright (c) 2015 The Bitcoin Core developers
  2. // Distributed under the MIT software license, see the accompanying
  3. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4. #include "chainparams.h"
  5. #include "zmqpublishnotifier.h"
  6. #include "main.h"
  7. #include "util.h"
  8. static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
  9. // Internal function to send multipart message
  10. static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
  11. {
  12. va_list args;
  13. va_start(args, size);
  14. while (1)
  15. {
  16. zmq_msg_t msg;
  17. int rc = zmq_msg_init_size(&msg, size);
  18. if (rc != 0)
  19. {
  20. zmqError("Unable to initialize ZMQ msg");
  21. return -1;
  22. }
  23. void *buf = zmq_msg_data(&msg);
  24. memcpy(buf, data, size);
  25. data = va_arg(args, const void*);
  26. rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
  27. if (rc == -1)
  28. {
  29. zmqError("Unable to send ZMQ msg");
  30. zmq_msg_close(&msg);
  31. return -1;
  32. }
  33. zmq_msg_close(&msg);
  34. if (!data)
  35. break;
  36. size = va_arg(args, size_t);
  37. }
  38. return 0;
  39. }
  40. bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
  41. {
  42. assert(!psocket);
  43. // check if address is being used by other publish notifier
  44. std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
  45. if (i==mapPublishNotifiers.end())
  46. {
  47. psocket = zmq_socket(pcontext, ZMQ_PUB);
  48. if (!psocket)
  49. {
  50. zmqError("Failed to create socket");
  51. return false;
  52. }
  53. int rc = zmq_bind(psocket, address.c_str());
  54. if (rc!=0)
  55. {
  56. zmqError("Failed to bind address");
  57. zmq_close(psocket);
  58. return false;
  59. }
  60. // register this notifier for the address, so it can be reused for other publish notifier
  61. mapPublishNotifiers.insert(std::make_pair(address, this));
  62. return true;
  63. }
  64. else
  65. {
  66. LogPrint("zmq", "zmq: Reusing socket for address %s\n", address);
  67. psocket = i->second->psocket;
  68. mapPublishNotifiers.insert(std::make_pair(address, this));
  69. return true;
  70. }
  71. }
  72. void CZMQAbstractPublishNotifier::Shutdown()
  73. {
  74. assert(psocket);
  75. int count = mapPublishNotifiers.count(address);
  76. // remove this notifier from the list of publishers using this address
  77. typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
  78. std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
  79. for (iterator it = iterpair.first; it != iterpair.second; ++it)
  80. {
  81. if (it->second==this)
  82. {
  83. mapPublishNotifiers.erase(it);
  84. break;
  85. }
  86. }
  87. if (count == 1)
  88. {
  89. LogPrint("zmq", "Close socket at address %s\n", address);
  90. int linger = 0;
  91. zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
  92. zmq_close(psocket);
  93. }
  94. psocket = 0;
  95. }
  96. bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
  97. {
  98. uint256 hash = pindex->GetBlockHash();
  99. LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex());
  100. char data[32];
  101. for (unsigned int i = 0; i < 32; i++)
  102. data[31 - i] = hash.begin()[i];
  103. int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0);
  104. return rc == 0;
  105. }
  106. bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
  107. {
  108. uint256 hash = transaction.GetHash();
  109. LogPrint("zmq", "zmq: Publish hashtx %s\n", hash.GetHex());
  110. char data[32];
  111. for (unsigned int i = 0; i < 32; i++)
  112. data[31 - i] = hash.begin()[i];
  113. int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0);
  114. return rc == 0;
  115. }
  116. bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
  117. {
  118. LogPrint("zmq", "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
  119. const Consensus::Params& consensusParams = Params().GetConsensus();
  120. CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
  121. {
  122. LOCK(cs_main);
  123. CBlock block;
  124. if(!ReadBlockFromDisk(block, pindex, consensusParams))
  125. {
  126. zmqError("Can't read block from disk");
  127. return false;
  128. }
  129. ss << block;
  130. }
  131. int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0);
  132. return rc == 0;
  133. }
  134. bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
  135. {
  136. uint256 hash = transaction.GetHash();
  137. LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex());
  138. CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
  139. ss << transaction;
  140. int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0);
  141. return rc == 0;
  142. }