Browse Source

Add comparison tool test runner, built on mininode creates a tool for running a test suite on top of the mininode p2p
framework.  It supports two types of tests: those for which we expect certain
behavior (acceptance or rejection of a block or transaction) and those for
which we are just comparing that the behavior of 2 or more nodes is the same. defines BlockStore and TxStore, which provide db-backed maps
between block/tx hashes and the corresponding block or tx. defines utility functions for creating and manipulating blocks
and transactions. is an example test in the comptool framework, which
tests the behavior of a single node when sent two different types of invalid
blocks (a block with a duplicated transaction and a block with a bad coinbase
Suhas Daftuar 6 years ago

+ 1
- 0
qa/pull-tester/ View File

@@ -31,6 +31,7 @@ testScripts=(
# ''
if [ "x${ENABLE_BITCOIND}${ENABLE_UTILS}${ENABLE_WALLET}" = "x111" ]; then
for (( i = 0; i < ${#testScripts[@]}; i++ ))

+ 127
- 0
qa/rpc-tests/ View File

@@ -0,0 +1,127 @@
# BlockStore: a helper class that keeps a map of blocks and implements
# helper functions for responding to getheaders and getdata,
# and for constructing a getheaders message

from mininode import *
import dbm

class BlockStore(object):
def __init__(self, datadir):
self.blockDB = + "/blocks", 'c')
self.currentBlock = 0L
def close(self):

def get(self, blockhash):
serialized_block = None
serialized_block = self.blockDB[repr(blockhash)]
except KeyError:
return None
f = cStringIO.StringIO(serialized_block)
ret = CBlock()
return ret

# Note: this pulls full blocks out of the database just to retrieve
# the headers -- perhaps we could keep a separate data structure
# to avoid this overhead.
def headers_for(self, locator, hash_stop, current_tip=None):
if current_tip is None:
current_tip = self.currentBlock
current_block = self.get(current_tip)
if current_block is None:
return None

response = msg_headers()
headersList = [ CBlockHeader(current_block) ]
maxheaders = 2000
while (headersList[0].sha256 not in locator.vHave):
prevBlockHash = headersList[0].hashPrevBlock
prevBlock = self.get(prevBlockHash)
if prevBlock is not None:
headersList.insert(0, CBlockHeader(prevBlock))
headersList = headersList[:maxheaders] # truncate if we have too many
hashList = [x.sha256 for x in headersList]
index = len(headersList)
if (hash_stop in hashList):
index = hashList.index(hash_stop)+1
response.headers = headersList[:index]
return response

def add_block(self, block):
self.blockDB[repr(block.sha256)] = bytes(block.serialize())
except TypeError as e:
print "Unexpected error: ", sys.exc_info()[0], e.args
self.currentBlock = block.sha256

def get_blocks(self, inv):
responses = []
for i in inv:
if (i.type == 2): # MSG_BLOCK
block = self.get(i.hash)
if block is not None:
return responses

def get_locator(self, current_tip=None):
if current_tip is None:
current_tip = self.currentBlock
r = []
counter = 0
step = 1
lastBlock = self.get(current_tip)
while lastBlock is not None:
for i in range(step):
lastBlock = self.get(lastBlock.hashPrevBlock)
if lastBlock is None:
counter += 1
if counter > 10:
step *= 2
locator = CBlockLocator()
locator.vHave = r
return locator

class TxStore(object):
def __init__(self, datadir):
self.txDB = + "/transactions", 'c')

def close(self):

def get(self, txhash):
serialized_tx = None
serialized_tx = self.txDB[repr(txhash)]
except KeyError:
return None
f = cStringIO.StringIO(serialized_tx)
ret = CTransaction()
return ret

def add_transaction(self, tx):
self.txDB[repr(tx.sha256)] = bytes(tx.serialize())
except TypeError as e:
print "Unexpected error: ", sys.exc_info()[0], e.args

def get_transactions(self, inv):
responses = []
for i in inv:
if (i.type == 1): # MSG_TX
tx = self.get(i.hash)
if tx is not None:
return responses

+ 65
- 0
qa/rpc-tests/ View File

@@ -0,0 +1,65 @@
# - utilities for manipulating blocks and transactions
# Distributed under the MIT/X11 software license, see the accompanying
# file COPYING or

from mininode import *
from script import CScript, CScriptOp

# Create a block (with regtest difficulty)
def create_block(hashprev, coinbase, nTime=None):
block = CBlock()
if nTime is None:
import time
block.nTime = int(time.time()+600)
block.nTime = nTime
block.hashPrevBlock = hashprev
block.nBits = 0x207fffff # Will break after a difficulty adjustment...
block.hashMerkleRoot = block.calc_merkle_root()
return block

def serialize_script_num(value):
r = bytearray(0)
if value == 0:
return r
neg = value < 0
absvalue = -value if neg else value
while (absvalue):
r.append(chr(absvalue & 0xff))
absvalue >>= 8
if r[-1] & 0x80:
r.append(0x80 if neg else 0)
elif neg:
r[-1] |= 0x80
return r

# Create an anyone-can-spend coinbase transaction, assuming no miner fees
def create_coinbase(heightAdjust = 0):
global counter
coinbase = CTransaction(), 0xffffffff),
ser_string(serialize_script_num(counter+heightAdjust)), 0xffffffff))
counter += 1
coinbaseoutput = CTxOut()
coinbaseoutput.nValue = 50*100000000
halvings = int((counter+heightAdjust)/150) # regtest
coinbaseoutput.nValue >>= halvings
coinbaseoutput.scriptPubKey = ""
coinbase.vout = [ coinbaseoutput ]
return coinbase

# Create a transaction with an anyone-can-spend output, that spends the
# nth output of prevtx.
def create_transaction(prevtx, n, sig, value):
tx = CTransaction()
assert(n < len(prevtx.vout)), n), sig, 0xffffffff))
tx.vout.append(CTxOut(value, ""))
return tx

+ 330
- 0
qa/rpc-tests/ View File

@@ -0,0 +1,330 @@
#!/usr/bin/env python2
# Distributed under the MIT/X11 software license, see the accompanying
# file COPYING or

from mininode import *
from blockstore import BlockStore, TxStore
from util import p2p_port

This is a tool for comparing two or more bitcoinds to each other
using a script provided.

To use, create a class that implements get_tests(), and pass it in
as the test generator to TestManager. get_tests() should be a python
generator that returns TestInstance objects. See below for definition.

# TestNode behaves as follows:
# Configure with a BlockStore and TxStore
# on_inv: log the message but don't request
# on_headers: log the chain tip
# on_pong: update ping response map (for synchronization)
# on_getheaders: provide headers via BlockStore
# on_getdata: provide blocks via BlockStore

class TestNode(NodeConnCB):

def __init__(self, block_store, tx_store):
self.conn = None
self.bestblockhash = None
self.block_store = block_store
self.block_request_map = {}
self.tx_store = tx_store
self.tx_request_map = {}

# When the pingmap is non-empty we're waiting for
# a response
self.pingMap = {}
self.lastInv = []

def add_connection(self, conn):
self.conn = conn

def on_headers(self, conn, message):
if len(message.headers) > 0:
best_header = message.headers[-1]
self.bestblockhash = best_header.sha256

def on_getheaders(self, conn, message):
response = self.block_store.headers_for(message.locator, message.hashstop)
if response is not None:

def on_getdata(self, conn, message):
[conn.send_message(r) for r in self.block_store.get_blocks(message.inv)]
[conn.send_message(r) for r in self.tx_store.get_transactions(message.inv)]

for i in message.inv:
if i.type == 1:
self.tx_request_map[i.hash] = True
elif i.type == 2:
self.block_request_map[i.hash] = True

def on_inv(self, conn, message):
self.lastInv = [x.hash for x in message.inv]

def on_pong(self, conn, message):
del self.pingMap[message.nonce]
except KeyError:
raise AssertionError("Got pong for unknown ping [%s]" % repr(message))

def send_inv(self, obj):
mtype = 2 if isinstance(obj, CBlock) else 1
self.conn.send_message(msg_inv([CInv(mtype, obj.sha256)]))

def send_getheaders(self):
# We ask for headers from their last tip.
m = msg_getheaders()
m.locator = self.block_store.get_locator(self.bestblockhash)

# This assumes BIP31
def send_ping(self, nonce):
self.pingMap[nonce] = True

def received_ping_response(self, nonce):
return nonce not in self.pingMap

def send_mempool(self):
self.lastInv = []

# TestInstance:
# Instances of these are generated by the test generator, and fed into the
# comptool.
# "blocks_and_transactions" should be an array of [obj, True/False/None]:
# - obj is either a CBlock or a CTransaction, and
# - the second value indicates whether the object should be accepted
# into the blockchain or mempool (for tests where we expect a certain
# answer), or "None" if we don't expect a certain answer and are just
# comparing the behavior of the nodes being tested.
# sync_every_block: if True, then each block will be inv'ed, synced, and
# nodes will be tested based on the outcome for the block. If False,
# then inv's accumulate until all blocks are processed (or max inv size
# is reached) and then sent out in one inv message. Then the final block
# will be synced across all connections, and the outcome of the final
# block will be tested.
# sync_every_tx: analagous to behavior for sync_every_block, except if outcome
# on the final tx is None, then contents of entire mempool are compared
# across all connections. (If outcome of final tx is specified as true
# or false, then only the last tx is tested against outcome.)

class TestInstance(object):
def __init__(self, objects=[], sync_every_block=True, sync_every_tx=False):
self.blocks_and_transactions = objects
self.sync_every_block = sync_every_block
self.sync_every_tx = sync_every_tx

class TestManager(object):

def __init__(self, testgen, datadir):
self.test_generator = testgen
self.connections = []
self.block_store = BlockStore(datadir)
self.tx_store = TxStore(datadir)
self.ping_counter = 1

def add_all_connections(self, nodes):
for i in range(len(nodes)):
# Create a p2p connection to each node
self.connections.append(NodeConn('', p2p_port(i),
nodes[i], TestNode(self.block_store, self.tx_store)))
# Make sure the TestNode (callback class) has a reference to its
# associated NodeConn

def wait_for_verack(self):
sleep_time = 0.05
max_tries = 10 / sleep_time # Wait at most 10 seconds
while max_tries > 0:
done = True
for c in self.connections:
if c.cb.verack_received is False:
done = False
if done:

def wait_for_pings(self, counter):
received_pongs = False
while received_pongs is not True:
received_pongs = True
for c in self.connections:
if c.cb.received_ping_response(counter) is not True:
received_pongs = False

# sync_blocks: Wait for all connections to request the blockhash given
# then send get_headers to find out the tip of each node, and synchronize
# the response by using a ping (and waiting for pong with same nonce).
def sync_blocks(self, blockhash, num_blocks):
# Wait for nodes to request block (50ms sleep * 20 tries * num_blocks)
max_tries = 20*num_blocks
while max_tries > 0:
results = [ blockhash in c.cb.block_request_map and
c.cb.block_request_map[blockhash] for c in self.connections ]
if False not in results:
max_tries -= 1

# --> error if not requested
if max_tries == 0:
# print [ c.cb.block_request_map for c in self.connections ]
raise AssertionError("Not all nodes requested block")
# --> Answer request (we did this inline!)

# Send getheaders message
[ c.cb.send_getheaders() for c in self.connections ]

# Send ping and wait for response -- synchronization hack
[ c.cb.send_ping(self.ping_counter) for c in self.connections ]
self.ping_counter += 1

# Analogous to sync_block (see above)
def sync_transaction(self, txhash, num_events):
# Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
max_tries = 20*num_events
while max_tries > 0:
results = [ txhash in c.cb.tx_request_map and
c.cb.tx_request_map[txhash] for c in self.connections ]
if False not in results:
max_tries -= 1

# --> error if not requested
if max_tries == 0:
# print [ c.cb.tx_request_map for c in self.connections ]
raise AssertionError("Not all nodes requested transaction")
# --> Answer request (we did this inline!)

# Get the mempool
[ c.cb.send_mempool() for c in self.connections ]

# Send ping and wait for response -- synchronization hack
[ c.cb.send_ping(self.ping_counter) for c in self.connections ]
self.ping_counter += 1

# Sort inv responses from each node
[ c.cb.lastInv.sort() for c in self.connections ]

# Verify that the tip of each connection all agree with each other, and
# with the expected outcome (if given)
def check_results(self, blockhash, outcome):
for c in self.connections:
if outcome is None:
if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
return False
elif ((c.cb.bestblockhash == blockhash) != outcome):
# print c.cb.bestblockhash, blockhash, outcome
return False
return True

# Either check that the mempools all agree with each other, or that
# txhash's presence in the mempool matches the outcome specified.
# This is somewhat of a strange comparison, in that we're either comparing
# a particular tx to an outcome, or the entire mempools altogether;
# perhaps it would be useful to add the ability to check explicitly that
# a particular tx's existence in the mempool is the same across all nodes.
def check_mempool(self, txhash, outcome):
for c in self.connections:
if outcome is None:
# Make sure the mempools agree with each other
if c.cb.lastInv != self.connections[0].cb.lastInv:
# print c.rpc.getrawmempool()
return False
elif ((txhash in c.cb.lastInv) != outcome):
# print c.rpc.getrawmempool(), c.cb.lastInv
return False
return True

def run(self):
# Wait until verack is received

test_number = 1
for test_instance in self.test_generator.get_tests():
# We use these variables to keep track of the last block
# and last transaction in the tests, which are used
# if we're not syncing on every block or every tx.
[ block, block_outcome ] = [ None, None ]
[ tx, tx_outcome ] = [ None, None ]
invqueue = []

for b_or_t, outcome in test_instance.blocks_and_transactions:
# Determine if we're dealing with a block or tx
if isinstance(b_or_t, CBlock): # Block test runner
block = b_or_t
block_outcome = outcome
# Add to shared block_store, set as current block
for c in self.connections:
c.cb.block_request_map[block.sha256] = False
# Either send inv's to each node and sync, or add
# to invqueue for later inv'ing.
if (test_instance.sync_every_block):
[ c.cb.send_inv(block) for c in self.connections ]
self.sync_blocks(block.sha256, 1)
if (not self.check_results(block.sha256, outcome)):
raise AssertionError("Test failed at test %d" % test_number)
invqueue.append(CInv(2, block.sha256))
else: # Tx test runner
assert(isinstance(b_or_t, CTransaction))
tx = b_or_t
tx_outcome = outcome
# Add to shared tx store
for c in self.connections:
c.cb.tx_request_map[tx.sha256] = False
# Again, either inv to all nodes or save for later
if (test_instance.sync_every_tx):
[ c.cb.send_inv(tx) for c in self.connections ]
self.sync_transaction(tx.sha256, 1)
if (not self.check_mempool(tx.sha256, outcome)):
raise AssertionError("Test failed at test %d" % test_number)
invqueue.append(CInv(1, tx.sha256))
# Ensure we're not overflowing the inv queue
if len(invqueue) == MAX_INV_SZ:
[ for c in self.connections ]
invqueue = []

# Do final sync if we weren't syncing on every block or every tx.
if (not test_instance.sync_every_block and block is not None):
if len(invqueue) > 0:
[ c.send_message(msg_inv(invqueue)) for c in self.connections ]
invqueue = []
if (not self.check_results(block.sha256, block_outcome)):
raise AssertionError("Block test failed at test %d" % test_number)
if (not test_instance.sync_every_tx and tx is not None):
if len(invqueue) > 0:
[ c.send_message(msg_inv(invqueue)) for c in self.connections ]
invqueue = []
self.sync_transaction(tx.sha256, len(test_instance.blocks_and_transactions))
if (not self.check_mempool(tx.sha256, tx_outcome)):
raise AssertionError("Mempool test failed at test %d" % test_number)

print "Test %d: PASS" % test_number, [ c.rpc.getblockcount() for c in self.connections ]
test_number += 1

[ c.disconnect_node() for c in self.connections ]

+ 115
- 0
qa/rpc-tests/ View File

@@ -0,0 +1,115 @@
#!/usr/bin/env python2
# Distributed under the MIT/X11 software license, see the accompanying
# file COPYING or

from test_framework import ComparisonTestFramework
from util import *
from comptool import TestManager, TestInstance
from mininode import *
from blocktools import *
import logging
import copy
import time

In this test we connect to one node over p2p, and test block requests:
1) Valid blocks should be requested and become chain tip.
2) Invalid block with duplicated transaction should be re-requested.
3) Invalid block with bad coinbase value should be rejected and not

# Use the ComparisonTestFramework with 1 node: only use --testbinary.
class InvalidBlockRequestTest(ComparisonTestFramework):

''' Can either run this test as 1 node with expected answers, or two and compare them.
Change the "outcome" variable from each TestInstance object to only do the comparison. '''
def __init__(self):
self.num_nodes = 1

def run_test(self):
test = TestManager(self, self.options.tmpdir)
self.tip = None
self.block_time = None
NetworkThread().start() # Start up network handling in another thread

def get_tests(self):
if self.tip is None:
self.tip = int ("0x" + self.nodes[0].getbestblockhash() + "L", 0)
self.block_time = int(time.time())+1

Create a new block with an anyone-can-spend coinbase
block = create_block(self.tip, create_coinbase(), self.block_time)
self.block_time += 1
# Save the coinbase for later
self.block1 = block
self.tip = block.sha256
yield TestInstance([[block, True]])

Now we need that block to mature so we can spend the coinbase.
test = TestInstance(sync_every_block=False)
for i in xrange(100):
block = create_block(self.tip, create_coinbase(), self.block_time)
self.tip = block.sha256
self.block_time += 1
test.blocks_and_transactions.append([block, True])
yield test

Now we use merkle-root malleability to generate an invalid block with
same blockheader.
Manufacture a block with 3 transactions (coinbase, spend of prior
coinbase, spend of that spend). Duplicate the 3rd transaction to
leave merkle root and blockheader unchanged but invalidate the block.
block2 = create_block(self.tip, create_coinbase(), self.block_time)
self.block_time += 1

# chr(81) is OP_TRUE
tx1 = create_transaction(self.block1.vtx[0], 0, chr(81), 50*100000000)
tx2 = create_transaction(tx1, 0, chr(81), 50*100000000)

block2.vtx.extend([tx1, tx2])
block2.hashMerkleRoot = block2.calc_merkle_root()
orig_hash = block2.sha256
block2_orig = copy.deepcopy(block2)

# Mutate block 2
assert_equal(block2.hashMerkleRoot, block2.calc_merkle_root())
assert_equal(orig_hash, block2.rehash())
assert(block2_orig.vtx != block2.vtx)

self.tip = block2.sha256
yield TestInstance([[block2, False], [block2_orig, True]])

Make sure that a totally screwed up block is not valid.
block3 = create_block(self.tip, create_coinbase(), self.block_time)
self.block_time += 1
block3.vtx[0].vout[0].nValue = 100*100000000 # Too high!
block3.hashMerkleRoot = block3.calc_merkle_root()

yield TestInstance([[block3, False]])

if __name__ == '__main__':

+ 29
- 0
qa/rpc-tests/ View File

@@ -140,3 +140,32 @@ class BitcoinTestFramework(object):

# Test framework for doing p2p comparison testing, which sets up some bitcoind
# binaries:
# 1 binary: test binary
# 2 binaries: 1 test binary, 1 ref binary
# n>2 binaries: 1 test binary, n-1 ref binaries

class ComparisonTestFramework(BitcoinTestFramework):

# Can override the num_nodes variable to indicate how many nodes to run.
def __init__(self):
self.num_nodes = 2

def add_options(self, parser):
parser.add_option("--testbinary", dest="testbinary", default="bitcoind",
help="bitcoind binary to test")
parser.add_option("--refbinary", dest="refbinary", default="bitcoind",
help="bitcoind binary to use for reference nodes (if any)")

def setup_chain(self):
print "Initializing test directory "+self.options.tmpdir
initialize_chain_clean(self.options.tmpdir, self.num_nodes)

def setup_network(self):
self.nodes = start_nodes(self.num_nodes, self.options.tmpdir,
extra_args=[['-debug', '-whitelist=']] * self.num_nodes,
binary=[self.options.testbinary] +