|
| 1 | +#!/usr/bin/env python3 |
| 2 | +# Copyright (c) 2017-2019 The Bitcoin Core developers |
| 3 | +# Distributed under the MIT software license, see the accompanying |
| 4 | +# file COPYING or http://www.opensource.org/licenses/mit-license.php. |
| 5 | +""" |
| 6 | +A test for RPC users with restricted permissions |
| 7 | +""" |
| 8 | +from test_framework.test_framework import BitcoinTestFramework |
| 9 | +import os |
| 10 | +from test_framework.util import ( |
| 11 | + get_datadir_path, |
| 12 | + assert_equal, |
| 13 | + str_to_b64str |
| 14 | +) |
| 15 | +import http.client |
| 16 | +import urllib.parse |
| 17 | + |
| 18 | +def rpccall(node, user, method): |
| 19 | + url = urllib.parse.urlparse(node.url) |
| 20 | + headers = {"Authorization": "Basic " + str_to_b64str('{}:{}'.format(user[0], user[3]))} |
| 21 | + conn = http.client.HTTPConnection(url.hostname, url.port) |
| 22 | + conn.connect() |
| 23 | + conn.request('POST', '/', '{"method": "' + method + '"}', headers) |
| 24 | + resp = conn.getresponse() |
| 25 | + conn.close() |
| 26 | + return resp |
| 27 | + |
| 28 | + |
| 29 | +class RPCWhitelistTest(BitcoinTestFramework): |
| 30 | + def set_test_params(self): |
| 31 | + self.num_nodes = 1 |
| 32 | + |
| 33 | + def setup_chain(self): |
| 34 | + super().setup_chain() |
| 35 | + # 0 => Username |
| 36 | + # 1 => Password (Hashed) |
| 37 | + # 2 => Permissions |
| 38 | + # 3 => Password Plaintext |
| 39 | + self.users = [ |
| 40 | + ["user1", "50358aa884c841648e0700b073c32b2e$b73e95fff0748cc0b517859d2ca47d9bac1aa78231f3e48fa9222b612bd2083e", "getbestblockhash,getblockcount,", "12345"], |
| 41 | + ["user2", "8650ba41296f62092377a38547f361de$4620db7ba063ef4e2f7249853e9f3c5c3592a9619a759e3e6f1c63f2e22f1d21", "getblockcount", "54321"] |
| 42 | + ] |
| 43 | + # For exceptions |
| 44 | + self.strange_users = [ |
| 45 | + # Test empty |
| 46 | + ["strangedude", "62d67dffec03836edd698314f1b2be62$c2fb4be29bb0e3646298661123cf2d8629640979cabc268ef05ea613ab54068d", ":", "s7R4nG3R7H1nGZ"], |
| 47 | + ["strangedude2", "575c012c7fe4b1e83b9d809412da3ef7$09f448d0acfc19924dd62ecb96004d3c2d4b91f471030dfe43c6ea64a8f658c1", "", "s7R4nG3R7H1nGZ"], |
| 48 | + # Test trailing comma |
| 49 | + ["strangedude3", "23189c561b5975a56f4cf94030495d61$3a2f6aac26351e2257428550a553c4c1979594e36675bbd3db692442387728c0", ":getblockcount,", "s7R4nG3R7H1nGZ"], |
| 50 | + # Test overwrite |
| 51 | + ["strangedude4", "990c895760a70df83949e8278665e19a$8f0906f20431ff24cb9e7f5b5041e4943bdf2a5c02a19ef4960dcf45e72cde1c", ":getblockcount, getbestblockhash", "s7R4nG3R7H1nGZ"], |
| 52 | + ["strangedude4", "990c895760a70df83949e8278665e19a$8f0906f20431ff24cb9e7f5b5041e4943bdf2a5c02a19ef4960dcf45e72cde1c", ":getblockcount", "s7R4nG3R7H1nGZ"], |
| 53 | + # Testing the same permission twice |
| 54 | + ["strangedude5", "d12c6e962d47a454f962eb41225e6ec8$2dd39635b155536d3c1a2e95d05feff87d5ba55f2d5ff975e6e997a836b717c9", ":getblockcount,getblockcount", "s7R4nG3R7H1nGZ"] |
| 55 | + ] |
| 56 | + # These commands shouldn't be allowed for any user to test failures |
| 57 | + self.never_allowed = ["getnetworkinfo"] |
| 58 | + with open(os.path.join(get_datadir_path(self.options.tmpdir, 0), "bitcoin.conf"), 'a', encoding='utf8') as f: |
| 59 | + f.write("\nrpcwhitelistdefault=0\n") |
| 60 | + for user in self.users: |
| 61 | + f.write("rpcauth=" + user[0] + ":" + user[1] + "\n") |
| 62 | + f.write("rpcwhitelist=" + user[0] + ":" + user[2] + "\n") |
| 63 | + # Special cases |
| 64 | + for strangedude in self.strange_users: |
| 65 | + f.write("rpcauth=" + strangedude[0] + ":" + strangedude[1] + "\n") |
| 66 | + f.write("rpcwhitelist=" + strangedude[0] + strangedude[2] + "\n") |
| 67 | + |
| 68 | + |
| 69 | + def run_test(self): |
| 70 | + for user in self.users: |
| 71 | + permissions = user[2].replace(" ", "").split(",") |
| 72 | + # Pop all empty items |
| 73 | + i = 0 |
| 74 | + while i < len(permissions): |
| 75 | + if permissions[i] == '': |
| 76 | + permissions.pop(i) |
| 77 | + |
| 78 | + i += 1 |
| 79 | + for permission in permissions: |
| 80 | + self.log.info("[" + user[0] + "]: Testing a permitted permission (" + permission + ")") |
| 81 | + assert_equal(200, rpccall(self.nodes[0], user, permission).status) |
| 82 | + for permission in self.never_allowed: |
| 83 | + self.log.info("[" + user[0] + "]: Testing a non permitted permission (" + permission + ")") |
| 84 | + assert_equal(403, rpccall(self.nodes[0], user, permission).status) |
| 85 | + # Now test the strange users |
| 86 | + for permission in self.never_allowed: |
| 87 | + self.log.info("Strange test 1") |
| 88 | + assert_equal(403, rpccall(self.nodes[0], self.strange_users[0], permission).status) |
| 89 | + for permission in self.never_allowed: |
| 90 | + self.log.info("Strange test 2") |
| 91 | + assert_equal(403, rpccall(self.nodes[0], self.strange_users[1], permission).status) |
| 92 | + self.log.info("Strange test 3") |
| 93 | + assert_equal(200, rpccall(self.nodes[0], self.strange_users[2], "getblockcount").status) |
| 94 | + self.log.info("Strange test 4") |
| 95 | + assert_equal(403, rpccall(self.nodes[0], self.strange_users[3], "getbestblockhash").status) |
| 96 | + self.log.info("Strange test 5") |
| 97 | + assert_equal(200, rpccall(self.nodes[0], self.strange_users[4], "getblockcount").status) |
| 98 | + |
| 99 | +if __name__ == "__main__": |
| 100 | + RPCWhitelistTest().main() |
0 commit comments