You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

comptool.py 18KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. #!/usr/bin/env python3
  2. # Copyright (c) 2015-2016 The Bitcoin Core developers
  3. # Distributed under the MIT software license, see the accompanying
  4. # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5. """Compare two or more bitcoinds to each other.
  6. To use, create a class that implements get_tests(), and pass it in
  7. as the test generator to TestManager. get_tests() should be a python
  8. generator that returns TestInstance objects. See below for definition.
  9. TestNode behaves as follows:
  10. Configure with a BlockStore and TxStore
  11. on_inv: log the message but don't request
  12. on_headers: log the chain tip
  13. on_pong: update ping response map (for synchronization)
  14. on_getheaders: provide headers via BlockStore
  15. on_getdata: provide blocks via BlockStore
  16. """
  17. from .mininode import *
  18. from .blockstore import BlockStore, TxStore
  19. from .util import p2p_port, wait_until
  20. import logging
  21. logger=logging.getLogger("TestFramework.comptool")
  22. global mininode_lock
  23. class RejectResult(object):
  24. """Outcome that expects rejection of a transaction or block."""
  25. def __init__(self, code, reason=b''):
  26. self.code = code
  27. self.reason = reason
  28. def match(self, other):
  29. if self.code != other.code:
  30. return False
  31. return other.reason.startswith(self.reason)
  32. def __repr__(self):
  33. return '%i:%s' % (self.code,self.reason or '*')
  34. class TestNode(NodeConnCB):
  35. def __init__(self, block_store, tx_store):
  36. super().__init__()
  37. self.conn = None
  38. self.bestblockhash = None
  39. self.block_store = block_store
  40. self.block_request_map = {}
  41. self.tx_store = tx_store
  42. self.tx_request_map = {}
  43. self.block_reject_map = {}
  44. self.tx_reject_map = {}
  45. # When the pingmap is non-empty we're waiting for
  46. # a response
  47. self.pingMap = {}
  48. self.lastInv = []
  49. self.closed = False
  50. def on_close(self, conn):
  51. self.closed = True
  52. def add_connection(self, conn):
  53. self.conn = conn
  54. def on_headers(self, conn, message):
  55. if len(message.headers) > 0:
  56. best_header = message.headers[-1]
  57. best_header.calc_sha256()
  58. self.bestblockhash = best_header.sha256
  59. def on_getheaders(self, conn, message):
  60. response = self.block_store.headers_for(message.locator, message.hashstop)
  61. if response is not None:
  62. conn.send_message(response)
  63. def on_getdata(self, conn, message):
  64. [conn.send_message(r) for r in self.block_store.get_blocks(message.inv)]
  65. [conn.send_message(r) for r in self.tx_store.get_transactions(message.inv)]
  66. for i in message.inv:
  67. if i.type == 1:
  68. self.tx_request_map[i.hash] = True
  69. elif i.type == 2:
  70. self.block_request_map[i.hash] = True
  71. def on_inv(self, conn, message):
  72. self.lastInv = [x.hash for x in message.inv]
  73. def on_pong(self, conn, message):
  74. try:
  75. del self.pingMap[message.nonce]
  76. except KeyError:
  77. raise AssertionError("Got pong for unknown ping [%s]" % repr(message))
  78. def on_reject(self, conn, message):
  79. if message.message == b'tx':
  80. self.tx_reject_map[message.data] = RejectResult(message.code, message.reason)
  81. if message.message == b'block':
  82. self.block_reject_map[message.data] = RejectResult(message.code, message.reason)
  83. def send_inv(self, obj):
  84. mtype = 2 if isinstance(obj, CBlock) else 1
  85. self.conn.send_message(msg_inv([CInv(mtype, obj.sha256)]))
  86. def send_getheaders(self):
  87. # We ask for headers from their last tip.
  88. m = msg_getheaders()
  89. m.locator = self.block_store.get_locator(self.bestblockhash)
  90. self.conn.send_message(m)
  91. def send_header(self, header):
  92. m = msg_headers()
  93. m.headers.append(header)
  94. self.conn.send_message(m)
  95. # This assumes BIP31
  96. def send_ping(self, nonce):
  97. self.pingMap[nonce] = True
  98. self.conn.send_message(msg_ping(nonce))
  99. def received_ping_response(self, nonce):
  100. return nonce not in self.pingMap
  101. def send_mempool(self):
  102. self.lastInv = []
  103. self.conn.send_message(msg_mempool())
  104. # TestInstance:
  105. #
  106. # Instances of these are generated by the test generator, and fed into the
  107. # comptool.
  108. #
  109. # "blocks_and_transactions" should be an array of
  110. # [obj, True/False/None, hash/None]:
  111. # - obj is either a CBlock, CBlockHeader, or a CTransaction, and
  112. # - the second value indicates whether the object should be accepted
  113. # into the blockchain or mempool (for tests where we expect a certain
  114. # answer), or "None" if we don't expect a certain answer and are just
  115. # comparing the behavior of the nodes being tested.
  116. # - the third value is the hash to test the tip against (if None or omitted,
  117. # use the hash of the block)
  118. # - NOTE: if a block header, no test is performed; instead the header is
  119. # just added to the block_store. This is to facilitate block delivery
  120. # when communicating with headers-first clients (when withholding an
  121. # intermediate block).
  122. # sync_every_block: if True, then each block will be inv'ed, synced, and
  123. # nodes will be tested based on the outcome for the block. If False,
  124. # then inv's accumulate until all blocks are processed (or max inv size
  125. # is reached) and then sent out in one inv message. Then the final block
  126. # will be synced across all connections, and the outcome of the final
  127. # block will be tested.
  128. # sync_every_tx: analogous to behavior for sync_every_block, except if outcome
  129. # on the final tx is None, then contents of entire mempool are compared
  130. # across all connections. (If outcome of final tx is specified as true
  131. # or false, then only the last tx is tested against outcome.)
  132. class TestInstance(object):
  133. def __init__(self, objects=None, sync_every_block=True, sync_every_tx=False):
  134. self.blocks_and_transactions = objects if objects else []
  135. self.sync_every_block = sync_every_block
  136. self.sync_every_tx = sync_every_tx
  137. class TestManager(object):
  138. def __init__(self, testgen, datadir):
  139. self.test_generator = testgen
  140. self.connections = []
  141. self.test_nodes = []
  142. self.block_store = BlockStore(datadir)
  143. self.tx_store = TxStore(datadir)
  144. self.ping_counter = 1
  145. def add_all_connections(self, nodes):
  146. for i in range(len(nodes)):
  147. # Create a p2p connection to each node
  148. test_node = TestNode(self.block_store, self.tx_store)
  149. self.test_nodes.append(test_node)
  150. self.connections.append(NodeConn('127.0.0.1', p2p_port(i), nodes[i], test_node))
  151. # Make sure the TestNode (callback class) has a reference to its
  152. # associated NodeConn
  153. test_node.add_connection(self.connections[-1])
  154. def clear_all_connections(self):
  155. self.connections = []
  156. self.test_nodes = []
  157. def wait_for_disconnections(self):
  158. def disconnected():
  159. return all(node.closed for node in self.test_nodes)
  160. wait_until(disconnected, timeout=10, lock=mininode_lock)
  161. def wait_for_verack(self):
  162. return all(node.wait_for_verack() for node in self.test_nodes)
  163. def wait_for_pings(self, counter):
  164. def received_pongs():
  165. return all(node.received_ping_response(counter) for node in self.test_nodes)
  166. wait_until(received_pongs, lock=mininode_lock)
  167. # sync_blocks: Wait for all connections to request the blockhash given
  168. # then send get_headers to find out the tip of each node, and synchronize
  169. # the response by using a ping (and waiting for pong with same nonce).
  170. def sync_blocks(self, blockhash, num_blocks):
  171. def blocks_requested():
  172. return all(
  173. blockhash in node.block_request_map and node.block_request_map[blockhash]
  174. for node in self.test_nodes
  175. )
  176. # --> error if not requested
  177. wait_until(blocks_requested, attempts=20*num_blocks, lock=mininode_lock)
  178. # Send getheaders message
  179. [ c.cb.send_getheaders() for c in self.connections ]
  180. # Send ping and wait for response -- synchronization hack
  181. [ c.cb.send_ping(self.ping_counter) for c in self.connections ]
  182. self.wait_for_pings(self.ping_counter)
  183. self.ping_counter += 1
  184. # Analogous to sync_block (see above)
  185. def sync_transaction(self, txhash, num_events):
  186. # Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
  187. def transaction_requested():
  188. return all(
  189. txhash in node.tx_request_map and node.tx_request_map[txhash]
  190. for node in self.test_nodes
  191. )
  192. # --> error if not requested
  193. wait_until(transaction_requested, attempts=20*num_events, lock=mininode_lock)
  194. # Get the mempool
  195. [ c.cb.send_mempool() for c in self.connections ]
  196. # Send ping and wait for response -- synchronization hack
  197. [ c.cb.send_ping(self.ping_counter) for c in self.connections ]
  198. self.wait_for_pings(self.ping_counter)
  199. self.ping_counter += 1
  200. # Sort inv responses from each node
  201. with mininode_lock:
  202. [ c.cb.lastInv.sort() for c in self.connections ]
  203. # Verify that the tip of each connection all agree with each other, and
  204. # with the expected outcome (if given)
  205. def check_results(self, blockhash, outcome):
  206. with mininode_lock:
  207. for c in self.connections:
  208. if outcome is None:
  209. if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
  210. return False
  211. elif isinstance(outcome, RejectResult): # Check that block was rejected w/ code
  212. if c.cb.bestblockhash == blockhash:
  213. return False
  214. if blockhash not in c.cb.block_reject_map:
  215. logger.error('Block not in reject map: %064x' % (blockhash))
  216. return False
  217. if not outcome.match(c.cb.block_reject_map[blockhash]):
  218. logger.error('Block rejected with %s instead of expected %s: %064x' % (c.cb.block_reject_map[blockhash], outcome, blockhash))
  219. return False
  220. elif ((c.cb.bestblockhash == blockhash) != outcome):
  221. return False
  222. return True
  223. # Either check that the mempools all agree with each other, or that
  224. # txhash's presence in the mempool matches the outcome specified.
  225. # This is somewhat of a strange comparison, in that we're either comparing
  226. # a particular tx to an outcome, or the entire mempools altogether;
  227. # perhaps it would be useful to add the ability to check explicitly that
  228. # a particular tx's existence in the mempool is the same across all nodes.
  229. def check_mempool(self, txhash, outcome):
  230. with mininode_lock:
  231. for c in self.connections:
  232. if outcome is None:
  233. # Make sure the mempools agree with each other
  234. if c.cb.lastInv != self.connections[0].cb.lastInv:
  235. return False
  236. elif isinstance(outcome, RejectResult): # Check that tx was rejected w/ code
  237. if txhash in c.cb.lastInv:
  238. return False
  239. if txhash not in c.cb.tx_reject_map:
  240. logger.error('Tx not in reject map: %064x' % (txhash))
  241. return False
  242. if not outcome.match(c.cb.tx_reject_map[txhash]):
  243. logger.error('Tx rejected with %s instead of expected %s: %064x' % (c.cb.tx_reject_map[txhash], outcome, txhash))
  244. return False
  245. elif ((txhash in c.cb.lastInv) != outcome):
  246. return False
  247. return True
  248. def run(self):
  249. # Wait until verack is received
  250. self.wait_for_verack()
  251. test_number = 1
  252. for test_instance in self.test_generator.get_tests():
  253. # We use these variables to keep track of the last block
  254. # and last transaction in the tests, which are used
  255. # if we're not syncing on every block or every tx.
  256. [ block, block_outcome, tip ] = [ None, None, None ]
  257. [ tx, tx_outcome ] = [ None, None ]
  258. invqueue = []
  259. for test_obj in test_instance.blocks_and_transactions:
  260. b_or_t = test_obj[0]
  261. outcome = test_obj[1]
  262. # Determine if we're dealing with a block or tx
  263. if isinstance(b_or_t, CBlock): # Block test runner
  264. block = b_or_t
  265. block_outcome = outcome
  266. tip = block.sha256
  267. # each test_obj can have an optional third argument
  268. # to specify the tip we should compare with
  269. # (default is to use the block being tested)
  270. if len(test_obj) >= 3:
  271. tip = test_obj[2]
  272. # Add to shared block_store, set as current block
  273. # If there was an open getdata request for the block
  274. # previously, and we didn't have an entry in the
  275. # block_store, then immediately deliver, because the
  276. # node wouldn't send another getdata request while
  277. # the earlier one is outstanding.
  278. first_block_with_hash = True
  279. if self.block_store.get(block.sha256) is not None:
  280. first_block_with_hash = False
  281. with mininode_lock:
  282. self.block_store.add_block(block)
  283. for c in self.connections:
  284. if first_block_with_hash and block.sha256 in c.cb.block_request_map and c.cb.block_request_map[block.sha256] == True:
  285. # There was a previous request for this block hash
  286. # Most likely, we delivered a header for this block
  287. # but never had the block to respond to the getdata
  288. c.send_message(msg_block(block))
  289. else:
  290. c.cb.block_request_map[block.sha256] = False
  291. # Either send inv's to each node and sync, or add
  292. # to invqueue for later inv'ing.
  293. if (test_instance.sync_every_block):
  294. # if we expect success, send inv and sync every block
  295. # if we expect failure, just push the block and see what happens.
  296. if outcome == True:
  297. [ c.cb.send_inv(block) for c in self.connections ]
  298. self.sync_blocks(block.sha256, 1)
  299. else:
  300. [ c.send_message(msg_block(block)) for c in self.connections ]
  301. [ c.cb.send_ping(self.ping_counter) for c in self.connections ]
  302. self.wait_for_pings(self.ping_counter)
  303. self.ping_counter += 1
  304. if (not self.check_results(tip, outcome)):
  305. raise AssertionError("Test failed at test %d" % test_number)
  306. else:
  307. invqueue.append(CInv(2, block.sha256))
  308. elif isinstance(b_or_t, CBlockHeader):
  309. block_header = b_or_t
  310. self.block_store.add_header(block_header)
  311. [ c.cb.send_header(block_header) for c in self.connections ]
  312. else: # Tx test runner
  313. assert(isinstance(b_or_t, CTransaction))
  314. tx = b_or_t
  315. tx_outcome = outcome
  316. # Add to shared tx store and clear map entry
  317. with mininode_lock:
  318. self.tx_store.add_transaction(tx)
  319. for c in self.connections:
  320. c.cb.tx_request_map[tx.sha256] = False
  321. # Again, either inv to all nodes or save for later
  322. if (test_instance.sync_every_tx):
  323. [ c.cb.send_inv(tx) for c in self.connections ]
  324. self.sync_transaction(tx.sha256, 1)
  325. if (not self.check_mempool(tx.sha256, outcome)):
  326. raise AssertionError("Test failed at test %d" % test_number)
  327. else:
  328. invqueue.append(CInv(1, tx.sha256))
  329. # Ensure we're not overflowing the inv queue
  330. if len(invqueue) == MAX_INV_SZ:
  331. [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
  332. invqueue = []
  333. # Do final sync if we weren't syncing on every block or every tx.
  334. if (not test_instance.sync_every_block and block is not None):
  335. if len(invqueue) > 0:
  336. [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
  337. invqueue = []
  338. self.sync_blocks(block.sha256, len(test_instance.blocks_and_transactions))
  339. if (not self.check_results(tip, block_outcome)):
  340. raise AssertionError("Block test failed at test %d" % test_number)
  341. if (not test_instance.sync_every_tx and tx is not None):
  342. if len(invqueue) > 0:
  343. [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
  344. invqueue = []
  345. self.sync_transaction(tx.sha256, len(test_instance.blocks_and_transactions))
  346. if (not self.check_mempool(tx.sha256, tx_outcome)):
  347. raise AssertionError("Mempool test failed at test %d" % test_number)
  348. logger.info("Test %d: PASS" % test_number)
  349. test_number += 1
  350. [ c.disconnect_node() for c in self.connections ]
  351. self.wait_for_disconnections()
  352. self.block_store.close()
  353. self.tx_store.close()