progress in removing core
parent
3d50dbcaac
commit
19fa128710
|
@ -17,8 +17,9 @@
|
|||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
'''
|
||||
from coredb import dbfiles
|
||||
import sqlite3, os
|
||||
from coredb import dbfiles
|
||||
import filepaths
|
||||
|
||||
def createAddressDB():
|
||||
'''
|
||||
|
@ -116,6 +117,9 @@ def createBlockDB():
|
|||
def createBlockDataDB():
|
||||
if os.path.exists(dbfiles.block_data_db):
|
||||
raise FileExistsError("Block data database already exists")
|
||||
else:
|
||||
if not os.path.exists(filepaths.block_data_location):
|
||||
os.mkdir(filepaths.block_data_location)
|
||||
conn = sqlite3.connect(dbfiles.block_data_db)
|
||||
c = conn.cursor()
|
||||
c.execute('''CREATE TABLE blockData(
|
||||
|
|
|
@ -4,6 +4,7 @@ if not home.endswith('/'): home += '/'
|
|||
|
||||
usage_file = home + 'disk-usage.txt'
|
||||
block_data_location = home + 'blocks/'
|
||||
contacts_location = home + 'contacts/'
|
||||
public_API_host_file = home + 'public-host.txt'
|
||||
private_API_host_file = home + 'private-host.txt'
|
||||
bootstrap_file_location = 'static-data/bootstrap-nodes.txt'
|
||||
|
|
|
@ -32,7 +32,7 @@ if sys.version_info[0] == 2 or sys.version_info[1] < MIN_PY_VERSION:
|
|||
|
||||
from utils import detectoptimization
|
||||
if detectoptimization.detect_optimization():
|
||||
sys.stderr.write('Error, Onionr cannot be run in optimized mode\n')
|
||||
sys.stderr.write('Error, Onionr cannot be run with an optimized Python interpreter\n')
|
||||
sys.exit(1)
|
||||
from utils import createdirs
|
||||
createdirs.create_dirs()
|
||||
|
|
|
@ -18,7 +18,8 @@
|
|||
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
'''
|
||||
|
||||
from . import generate, hashers, getourkeypair, signing, encryption
|
||||
from . import generate, hashers, getourkeypair, signing, encryption, cryptoutils
|
||||
generate_deterministic = generate.generate_deterministic
|
||||
generate = generate.generate_pub_key
|
||||
|
||||
keypair = getourkeypair.get_keypair()
|
||||
|
|
|
@ -1,6 +1,26 @@
|
|||
import nacl.signing, nacl.encoding
|
||||
import nacl.signing, nacl.encoding, nacl.pwhash
|
||||
import onionrexceptions
|
||||
from onionrutils import bytesconverter
|
||||
def generate_pub_key():
|
||||
'''Generate a Ed25519 public key pair, return tuple of base32encoded pubkey, privkey'''
|
||||
private_key = nacl.signing.SigningKey.generate()
|
||||
public_key = private_key.verify_key.encode(encoder=nacl.encoding.Base32Encoder())
|
||||
return (public_key.decode(), private_key.encode(encoder=nacl.encoding.Base32Encoder()).decode())
|
||||
return (public_key.decode(), private_key.encode(encoder=nacl.encoding.Base32Encoder()).decode())
|
||||
|
||||
def generate_deterministic(passphrase, bypassCheck=False):
|
||||
'''Generate a Ed25519 public key pair from a password'''
|
||||
passStrength = 25
|
||||
passphrase = bytesconverter.str_to_bytes(passphrase) # Convert to bytes if not already
|
||||
# Validate passphrase length
|
||||
if not bypassCheck:
|
||||
if len(passphrase) < passStrength:
|
||||
raise onionrexceptions.PasswordStrengthError("Passphase must be at least %s characters" % (passStrength,))
|
||||
# KDF values
|
||||
kdf = nacl.pwhash.argon2id.kdf
|
||||
salt = b"U81Q7llrQcdTP0Ux" # Does not need to be unique or secret, but must be 16 bytes
|
||||
ops = nacl.pwhash.argon2id.OPSLIMIT_SENSITIVE
|
||||
mem = nacl.pwhash.argon2id.MEMLIMIT_SENSITIVE
|
||||
|
||||
key = kdf(32, passphrase, salt, opslimit=ops, memlimit=mem) # Generate seed for ed25519 key
|
||||
key = nacl.signing.SigningKey(key)
|
||||
return (key.verify_key.encode(nacl.encoding.Base32Encoder).decode(), key.encode(nacl.encoding.Base32Encoder).decode())
|
|
@ -34,264 +34,6 @@ PLUGIN_VERSION = '0.0.1'
|
|||
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)))
|
||||
import sentboxdb, mailapi, loadinbox # import after path insert
|
||||
flask_blueprint = mailapi.flask_blueprint
|
||||
"""
|
||||
def draw_border(text):
|
||||
# This function taken from https://stackoverflow.com/a/20757491 by https://stackoverflow.com/users/816449/bunyk, under https://creativecommons.org/licenses/by-sa/3.0/
|
||||
lines = text.splitlines()
|
||||
width = max(len(s) for s in lines)
|
||||
res = ['┌' + '─' * width + '┐']
|
||||
for s in lines:
|
||||
res.append('│' + (s + ' ' * width)[:width] + '│')
|
||||
res.append('└' + '─' * width + '┘')
|
||||
return '\n'.join(res)
|
||||
|
||||
class MailStrings:
|
||||
def __init__(self, mailInstance):
|
||||
self.mailInstance = mailInstance
|
||||
|
||||
self.programTag = 'OnionrMail v%s' % (PLUGIN_VERSION)
|
||||
choices = ['view inbox', 'view sentbox', 'send message', 'toggle pseudonymity', 'quit']
|
||||
self.mainMenuChoices = choices
|
||||
self.mainMenu = '''-----------------
|
||||
1. %s
|
||||
2. %s
|
||||
3. %s
|
||||
4. %s
|
||||
5. %s''' % (choices[0], choices[1], choices[2], choices[3], choices[4])
|
||||
|
||||
class OnionrMail:
|
||||
def __init__(self, pluginapi):
|
||||
self.strings = MailStrings(self)
|
||||
|
||||
self.sentboxTools = sentboxdb.SentBox()
|
||||
self.sentboxList = []
|
||||
self.sentMessages = {}
|
||||
self.doSigs = True
|
||||
return
|
||||
|
||||
def inbox(self):
|
||||
blockCount = 0
|
||||
pmBlockMap = {}
|
||||
pmBlocks = {}
|
||||
logger.info('Decrypting messages...', terminal=True)
|
||||
choice = ''
|
||||
displayList = []
|
||||
subject = ''
|
||||
|
||||
# this could use a lot of memory if someone has received a lot of messages
|
||||
for blockHash in blockmetadb.get_blocks_by_type('pm'):
|
||||
pmBlocks[blockHash] = Block(blockHash, core=self.myCore)
|
||||
pmBlocks[blockHash].decrypt()
|
||||
blockCount = 0
|
||||
for blockHash in pmBlocks:
|
||||
if not pmBlocks[blockHash].decrypted:
|
||||
continue
|
||||
blockCount += 1
|
||||
pmBlockMap[blockCount] = blockHash
|
||||
|
||||
block = pmBlocks[blockHash]
|
||||
senderKey = block.signer
|
||||
try:
|
||||
senderKey = senderKey.decode()
|
||||
except AttributeError:
|
||||
pass
|
||||
senderDisplay = onionrusers.OnionrUser(self.myCore, senderKey).getName()
|
||||
if senderDisplay == 'anonymous':
|
||||
senderDisplay = senderKey
|
||||
|
||||
blockDate = pmBlocks[blockHash].getDate().strftime("%m/%d %H:%M")
|
||||
try:
|
||||
subject = pmBlocks[blockHash].bmetadata['subject']
|
||||
except KeyError:
|
||||
subject = ''
|
||||
|
||||
displayList.append('%s. %s - %s - <%s>: %s' % (blockCount, blockDate, senderDisplay[:12], subject[:10], blockHash))
|
||||
while choice not in ('-q', 'q', 'quit'):
|
||||
for i in displayList:
|
||||
logger.info(i, terminal=True)
|
||||
try:
|
||||
choice = logger.readline('Enter a block number, -r to refresh, or -q to stop: ').strip().lower()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
choice = '-q'
|
||||
|
||||
if choice in ('-q', 'q', 'quit'):
|
||||
continue
|
||||
|
||||
if choice in ('-r', 'r', 'refresh'):
|
||||
# dirty hack
|
||||
self.inbox()
|
||||
return
|
||||
|
||||
try:
|
||||
choice = int(choice)
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
pmBlockMap[choice]
|
||||
readBlock = pmBlocks[pmBlockMap[choice]]
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
cancel = ''
|
||||
readBlock.verifySig()
|
||||
senderDisplay = bytesconverter.bytes_to_str(readBlock.signer)
|
||||
if len(senderDisplay.strip()) == 0:
|
||||
senderDisplay = 'Anonymous'
|
||||
logger.info('Message received from %s' % (senderDisplay,), terminal=True)
|
||||
logger.info('Valid signature: %s' % readBlock.validSig, terminal=True)
|
||||
|
||||
if not readBlock.validSig:
|
||||
logger.warn('This message has an INVALID/NO signature. ANYONE could have sent this message.', terminal=True)
|
||||
cancel = logger.readline('Press enter to continue to message, or -q to not open the message (recommended).')
|
||||
print('')
|
||||
if cancel != '-q':
|
||||
try:
|
||||
print(draw_border(escapeansi.escape_ANSI(readBlock.bcontent.decode().strip())))
|
||||
except ValueError:
|
||||
logger.warn('Error presenting message. This is usually due to a malformed or blank message.', terminal=True)
|
||||
pass
|
||||
if readBlock.validSig:
|
||||
reply = logger.readline("Press enter to continue, or enter %s to reply" % ("-r",))
|
||||
print('')
|
||||
if reply == "-r":
|
||||
self.draft_message(bytesconverter.bytes_to_str(readBlock.signer,))
|
||||
else:
|
||||
logger.readline("Press enter to continue")
|
||||
print('')
|
||||
return
|
||||
|
||||
def sentbox(self):
|
||||
'''
|
||||
Display sent mail messages
|
||||
'''
|
||||
entering = True
|
||||
while entering:
|
||||
self.get_sent_list()
|
||||
logger.info('Enter a block number or -q to return', terminal=True)
|
||||
try:
|
||||
choice = input('>')
|
||||
except (EOFError, KeyboardInterrupt) as e:
|
||||
entering = False
|
||||
else:
|
||||
try:
|
||||
choice = int(choice) - 1
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
self.sentboxList[int(choice)]
|
||||
except (IndexError, ValueError) as e:
|
||||
logger.warn('Invalid block.', terminal=True)
|
||||
else:
|
||||
logger.info('Sent to: ' + self.sentMessages[self.sentboxList[int(choice)]][1], terminal=True)
|
||||
# Print ansi escaped sent message
|
||||
logger.info(escapeansi.escape_ANSI(self.sentMessages[self.sentboxList[int(choice)]][0]), terminal=True)
|
||||
input('Press enter to continue...')
|
||||
finally:
|
||||
if choice == '-q':
|
||||
entering = False
|
||||
return
|
||||
|
||||
def get_sent_list(self, display=True):
|
||||
count = 1
|
||||
self.sentboxList = []
|
||||
self.sentMessages = {}
|
||||
for i in self.sentboxTools.listSent():
|
||||
self.sentboxList.append(i['hash'])
|
||||
self.sentMessages[i['hash']] = (bytesconverter.bytes_to_str(i['message']), i['peer'], i['subject'])
|
||||
if display:
|
||||
logger.info('%s. %s - %s - (%s) - %s' % (count, i['hash'], i['peer'][:12], i['subject'], i['date']), terminal=True)
|
||||
count += 1
|
||||
return json.dumps(self.sentMessages)
|
||||
|
||||
def draft_message(self, recip=''):
|
||||
message = ''
|
||||
newLine = ''
|
||||
subject = ''
|
||||
entering = False
|
||||
if len(recip) == 0:
|
||||
entering = True
|
||||
while entering:
|
||||
try:
|
||||
recip = logger.readline('Enter peer address, or -q to stop:').strip()
|
||||
if recip in ('-q', 'q'):
|
||||
raise EOFError
|
||||
if not stringvalidators.validate_pub_key(recip):
|
||||
raise onionrexceptions.InvalidPubkey('Must be a valid ed25519 base32 encoded public key')
|
||||
except onionrexceptions.InvalidPubkey:
|
||||
logger.warn('Invalid public key', terminal=True)
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
entering = False
|
||||
else:
|
||||
break
|
||||
else:
|
||||
# if -q or ctrl-c/d, exit function here, otherwise we successfully got the public key
|
||||
return
|
||||
try:
|
||||
subject = logger.readline('Message subject: ')
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
pass
|
||||
|
||||
cancelEnter = False
|
||||
logger.info('Enter your message, stop by entering -q on a new line. -c to cancel', terminal=True)
|
||||
while newLine != '-q':
|
||||
try:
|
||||
newLine = input()
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
cancelEnter = True
|
||||
if newLine == '-c':
|
||||
cancelEnter = True
|
||||
break
|
||||
if newLine == '-q':
|
||||
continue
|
||||
newLine += '\n'
|
||||
message += newLine
|
||||
|
||||
if not cancelEnter:
|
||||
logger.info('Inserting encrypted message as Onionr block....', terminal=True)
|
||||
|
||||
blockID = self.myCore.insertBlock(message, header='pm', encryptType='asym', asymPeer=recip, sign=self.doSigs, meta={'subject': subject})
|
||||
|
||||
def toggle_signing(self):
|
||||
self.doSigs = not self.doSigs
|
||||
|
||||
def menu(self):
|
||||
choice = ''
|
||||
while True:
|
||||
sigMsg = 'Message Signing: %s'
|
||||
|
||||
logger.info(self.strings.programTag + '\n\nUser ID: ' + self.myCore._crypto.pubKey, terminal=True)
|
||||
if self.doSigs:
|
||||
sigMsg = sigMsg % ('enabled',)
|
||||
else:
|
||||
sigMsg = sigMsg % ('disabled (Your messages cannot be trusted)',)
|
||||
if self.doSigs:
|
||||
logger.info(sigMsg, terminal=True)
|
||||
else:
|
||||
logger.warn(sigMsg, terminal=True)
|
||||
logger.info(self.strings.mainMenu.title(), terminal=True) # print out main menu
|
||||
try:
|
||||
choice = logger.readline('Enter 1-%s:\n' % (len(self.strings.mainMenuChoices))).lower().strip()
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
choice = '5'
|
||||
|
||||
if choice in (self.strings.mainMenuChoices[0], '1'):
|
||||
self.inbox()
|
||||
elif choice in (self.strings.mainMenuChoices[1], '2'):
|
||||
self.sentbox()
|
||||
elif choice in (self.strings.mainMenuChoices[2], '3'):
|
||||
self.draft_message()
|
||||
elif choice in (self.strings.mainMenuChoices[3], '4'):
|
||||
self.toggle_signing()
|
||||
elif choice in (self.strings.mainMenuChoices[4], '5'):
|
||||
logger.info('Goodbye.', terminal=True)
|
||||
break
|
||||
elif choice == '':
|
||||
pass
|
||||
else:
|
||||
logger.warn('Invalid choice.', terminal=True)
|
||||
return """
|
||||
|
||||
def add_deleted(keyStore, bHash):
|
||||
existing = keyStore.get('deleted_mail')
|
||||
|
|
|
@ -7,13 +7,12 @@ import nacl.signing, nacl.hash, nacl.encoding
|
|||
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
|
||||
print("Test directory:", TEST_DIR)
|
||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
||||
import core, onionr
|
||||
|
||||
c = core.Core()
|
||||
|
||||
import onionrblocks
|
||||
from utils import createdirs
|
||||
createdirs.create_dirs()
|
||||
class OnionrBlockTests(unittest.TestCase):
|
||||
def test_plaintext_insert(self):
|
||||
message = 'hello world'
|
||||
c.insertBlock(message)
|
||||
onionrblocks.insert(message)
|
||||
|
||||
unittest.main()
|
|
@ -6,35 +6,34 @@ TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
|
|||
print("Test directory:", TEST_DIR)
|
||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
||||
from urllib.request import pathname2url
|
||||
import core, onionr
|
||||
|
||||
c = core.Core()
|
||||
|
||||
from coredb import keydb
|
||||
from utils import createdirs
|
||||
createdirs.create_dirs()
|
||||
class OnionrTests(unittest.TestCase):
|
||||
|
||||
def test_address_add(self):
|
||||
testAddresses = ['facebookcorewwwi.onion', '56kmnycrvepfarolhnx6t2dvmldfeyg7jdymwgjb7jjzg47u2lqw2sad.onion', '5bvb5ncnfr4dlsfriwczpzcvo65kn7fnnlnt2ln7qvhzna2xaldq.b32.i2p']
|
||||
for address in testAddresses:
|
||||
c.addAddress(address)
|
||||
dbAddresses = c.listAdders()
|
||||
keydb.addkeys.add_address(address)
|
||||
dbAddresses = keydb.listkeys.list_adders()
|
||||
for address in testAddresses:
|
||||
self.assertIn(address, dbAddresses)
|
||||
|
||||
invalidAddresses = [None, '', ' ', '\t', '\n', ' test ', 24, 'fake.onion', 'fake.b32.i2p']
|
||||
for address in invalidAddresses:
|
||||
try:
|
||||
c.addAddress(address)
|
||||
keydb.addkeys.add_address(address)
|
||||
except TypeError:
|
||||
pass
|
||||
dbAddresses = c.listAdders()
|
||||
dbAddresses = keydb.listkeys.list_adders()
|
||||
for address in invalidAddresses:
|
||||
self.assertNotIn(address, dbAddresses)
|
||||
|
||||
def test_address_info(self):
|
||||
adder = 'nytimes3xbfgragh.onion'
|
||||
c.addAddress(adder)
|
||||
self.assertNotEqual(c.getAddressInfo(adder, 'success'), 1000)
|
||||
c.setAddressInfo(adder, 'success', 1000)
|
||||
self.assertEqual(c.getAddressInfo(adder, 'success'), 1000)
|
||||
keydb.addkeys.add_address(adder)
|
||||
self.assertNotEqual(keydb.transportinfo.get_address_info(adder, 'success'), 1000)
|
||||
keydb.transportinfo.set_address_info(adder, 'success', 1000)
|
||||
self.assertEqual(keydb.transportinfo.get_address_info(adder, 'success'), 1000)
|
||||
|
||||
unittest.main()
|
|
@ -1,70 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
import sys, os
|
||||
sys.path.append(".")
|
||||
import unittest, uuid, sqlite3
|
||||
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
|
||||
print("Test directory:", TEST_DIR)
|
||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
||||
from urllib.request import pathname2url
|
||||
import core, onionr
|
||||
|
||||
core.Core()
|
||||
|
||||
class OnionrTests(unittest.TestCase):
|
||||
|
||||
def test_peer_db_creation(self):
|
||||
try:
|
||||
dburi = 'file:{}?mode=rw'.format(pathname2url(TEST_DIR + 'peers.db'))
|
||||
conn = sqlite3.connect(dburi, uri=True, timeout=30)
|
||||
cursor = conn.cursor()
|
||||
conn.close()
|
||||
except sqlite3.OperationalError:
|
||||
self.assertTrue(False)
|
||||
else:
|
||||
self.assertTrue(True)
|
||||
|
||||
def test_block_db_creation(self):
|
||||
try:
|
||||
dburi = 'file:{}?mode=rw'.format(pathname2url(TEST_DIR + 'blocks.db'))
|
||||
conn = sqlite3.connect(dburi, uri=True, timeout=30)
|
||||
cursor = conn.cursor()
|
||||
conn.close()
|
||||
except sqlite3.OperationalError:
|
||||
self.assertTrue(False)
|
||||
else:
|
||||
self.assertTrue(True)
|
||||
|
||||
def test_forward_keys_db_creation(self):
|
||||
try:
|
||||
dburi = 'file:{}?mode=rw'.format(pathname2url(TEST_DIR + 'forward-keys.db'))
|
||||
conn = sqlite3.connect(dburi, uri=True, timeout=30)
|
||||
cursor = conn.cursor()
|
||||
conn.close()
|
||||
except sqlite3.OperationalError:
|
||||
self.assertTrue(False)
|
||||
else:
|
||||
self.assertTrue(True)
|
||||
|
||||
def test_address_db_creation(self):
|
||||
try:
|
||||
dburi = 'file:{}?mode=rw'.format(pathname2url(TEST_DIR + 'address.db'))
|
||||
conn = sqlite3.connect(dburi, uri=True, timeout=30)
|
||||
cursor = conn.cursor()
|
||||
conn.close()
|
||||
except sqlite3.OperationalError:
|
||||
self.assertTrue(False)
|
||||
else:
|
||||
self.assertTrue(True)
|
||||
|
||||
def blacklist_db_creation(self):
|
||||
try:
|
||||
dburi = 'file:{}?mode=rw'.format(pathname2url(TEST_DIR + 'blacklist.db'))
|
||||
conn = sqlite3.connect(dburi, uri=True, timeout=30)
|
||||
cursor = conn.cursor()
|
||||
conn.close()
|
||||
except sqlite3.OperationalError:
|
||||
self.assertTrue(False)
|
||||
else:
|
||||
self.assertTrue(True)
|
||||
|
||||
unittest.main()
|
|
@ -4,11 +4,12 @@ sys.path.append(".")
|
|||
import unittest, uuid
|
||||
TEST_DIR_1 = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
|
||||
TEST_DIR_2 = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
|
||||
import core, onionr, time
|
||||
import onionr, time
|
||||
|
||||
import onionrexceptions
|
||||
import onionrexceptions, onionrcrypto as crypto
|
||||
from onionrusers import onionrusers
|
||||
from onionrusers import contactmanager
|
||||
from utils import createdirs
|
||||
|
||||
class OnionrForwardSecrecyTests(unittest.TestCase):
|
||||
'''
|
||||
|
@ -17,23 +18,23 @@ class OnionrForwardSecrecyTests(unittest.TestCase):
|
|||
|
||||
def test_forward_encrypt(self):
|
||||
os.environ["ONIONR_HOME"] = TEST_DIR_1
|
||||
o = onionr.Onionr()
|
||||
createdirs.create_dirs()
|
||||
|
||||
friend = o.onionrCore._crypto.generatePubKey()
|
||||
friend = crypto.generate()
|
||||
|
||||
friendUser = onionrusers.OnionrUser(o.onionrCore, friend[0], saveUser=True)
|
||||
friendUser = onionrusers.OnionrUser(friend[0], saveUser=True)
|
||||
|
||||
for x in range(5):
|
||||
message = 'hello world %s' % (random.randint(1, 1000))
|
||||
forwardKey = friendUser.generateForwardKey()
|
||||
|
||||
fakeForwardPair = o.onionrCore._crypto.generatePubKey()
|
||||
fakeForwardPair = crypto.generate()
|
||||
|
||||
self.assertTrue(friendUser.addForwardKey(fakeForwardPair[0]))
|
||||
|
||||
encrypted = friendUser.forwardEncrypt(message)
|
||||
|
||||
decrypted = o.onionrCore._crypto.pubKeyDecrypt(encrypted[0], privkey=fakeForwardPair[1], encodedData=True)
|
||||
decrypted = crypto.encryption.pub_key_decrypt(encrypted[0], privkey=fakeForwardPair[1], encodedData=True)
|
||||
self.assertEqual(decrypted, message.encode())
|
||||
return
|
||||
|
||||
|
|
|
@ -8,34 +8,32 @@ from onionrutils import stringvalidators, mnemonickeys
|
|||
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
|
||||
print("Test directory:", TEST_DIR)
|
||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
||||
import core, onionr, onionrexceptions
|
||||
import onionrcrypto as crypto, onionrexceptions
|
||||
|
||||
c = core.Core()
|
||||
crypto = c._crypto
|
||||
class OnionrCryptoTests(unittest.TestCase):
|
||||
|
||||
def test_blake2b(self):
|
||||
self.assertEqual(crypto.blake2bHash('test'), crypto.blake2bHash(b'test'))
|
||||
self.assertEqual(crypto.blake2bHash(b'test'), crypto.blake2bHash(b'test'))
|
||||
self.assertEqual(crypto.hashers.blake2b_hash('test'), crypto.hashers.blake2b_hash(b'test'))
|
||||
self.assertEqual(crypto.hashers.blake2b_hash(b'test'), crypto.hashers.blake2b_hash(b'test'))
|
||||
|
||||
self.assertNotEqual(crypto.blake2bHash(''), crypto.blake2bHash(b'test'))
|
||||
self.assertNotEqual(crypto.hashers.blake2b_hash(''), crypto.hashers.blake2b_hash(b'test'))
|
||||
try:
|
||||
crypto.blake2bHash(None)
|
||||
crypto.hashers.blake2b_hash(None)
|
||||
except nacl.exceptions.TypeError:
|
||||
pass
|
||||
else:
|
||||
self.assertTrue(False)
|
||||
|
||||
self.assertEqual(nacl.hash.blake2b(b'test'), crypto.blake2bHash(b'test'))
|
||||
self.assertEqual(nacl.hash.blake2b(b'test'), crypto.hashers.blake2b_hash(b'test'))
|
||||
|
||||
def test_sha3256(self):
|
||||
hasher = hashlib.sha3_256()
|
||||
self.assertEqual(crypto.sha3Hash('test'), crypto.sha3Hash(b'test'))
|
||||
self.assertEqual(crypto.sha3Hash(b'test'), crypto.sha3Hash(b'test'))
|
||||
self.assertEqual(crypto.hashers.sha3_hash('test'), crypto.hashers.sha3_hash(b'test'))
|
||||
self.assertEqual(crypto.hashers.sha3_hash(b'test'), crypto.hashers.sha3_hash(b'test'))
|
||||
|
||||
self.assertNotEqual(crypto.sha3Hash(''), crypto.sha3Hash(b'test'))
|
||||
self.assertNotEqual(crypto.hashers.sha3_hash(''), crypto.hashers.sha3_hash(b'test'))
|
||||
try:
|
||||
crypto.sha3Hash(None)
|
||||
crypto.hashers.sha3_hash(None)
|
||||
except TypeError:
|
||||
pass
|
||||
else:
|
||||
|
@ -43,21 +41,21 @@ class OnionrCryptoTests(unittest.TestCase):
|
|||
|
||||
hasher.update(b'test')
|
||||
normal = hasher.hexdigest()
|
||||
self.assertEqual(crypto.sha3Hash(b'test'), normal)
|
||||
self.assertEqual(crypto.hashers.sha3_hash(b'test'), normal)
|
||||
|
||||
def valid_default_id(self):
|
||||
self.assertTrue(stringvalidators.validate_pub_key(crypto.pubKey))
|
||||
self.assertTrue(stringvalidators.validate_pub_key(crypto.pub_key))
|
||||
|
||||
def test_human_readable_length(self):
|
||||
human = mnemonickeys.get_human_readable_ID(c)
|
||||
human = mnemonickeys.get_human_readable_ID()
|
||||
self.assertTrue(len(human.split(' ')) == 32)
|
||||
|
||||
def test_safe_compare(self):
|
||||
self.assertTrue(crypto.safeCompare('test', 'test'))
|
||||
self.assertTrue(crypto.safeCompare('test', b'test'))
|
||||
self.assertFalse(crypto.safeCompare('test', 'test2'))
|
||||
self.assertTrue(crypto.cryptoutils.safe_compare('test', 'test'))
|
||||
self.assertTrue(crypto.cryptoutils.safe_compare('test', b'test'))
|
||||
self.assertFalse(crypto.cryptoutils.safe_compare('test', 'test2'))
|
||||
try:
|
||||
crypto.safeCompare('test', None)
|
||||
crypto.cryptoutils.safe_compare('test', None)
|
||||
except TypeError:
|
||||
pass
|
||||
else:
|
||||
|
@ -67,82 +65,53 @@ class OnionrCryptoTests(unittest.TestCase):
|
|||
# Small chance that the randomized list will be same. Rerun test a couple times if it fails
|
||||
startList = ['cat', 'dog', 'moose', 'rabbit', 'monkey', 'crab', 'human', 'dolphin', 'whale', 'etc'] * 10
|
||||
|
||||
self.assertNotEqual(startList, list(crypto.randomShuffle(startList)))
|
||||
self.assertTrue(len(list(crypto.randomShuffle(startList))) == len(startList))
|
||||
self.assertNotEqual(startList, list(crypto.cryptoutils.random_shuffle(startList)))
|
||||
self.assertTrue(len(list(crypto.cryptoutils.random_shuffle(startList))) == len(startList))
|
||||
|
||||
def test_asymmetric(self):
|
||||
keyPair = crypto.generatePubKey()
|
||||
keyPair2 = crypto.generatePubKey()
|
||||
keyPair = crypto.generate()
|
||||
keyPair2 = crypto.generate()
|
||||
message = "hello world"
|
||||
|
||||
self.assertTrue(len(crypto.pubKeyEncrypt(message, keyPair2[0], encodedData=True)) > 0)
|
||||
encrypted = crypto.pubKeyEncrypt(message, keyPair2[0], encodedData=False)
|
||||
decrypted = crypto.pubKeyDecrypt(encrypted, privkey=keyPair2[1], encodedData=False)
|
||||
self.assertTrue(len(crypto.encryption.pub_key_encrypt(message, keyPair2[0], encodedData=True)) > 0)
|
||||
encrypted = crypto.encryption.pub_key_encrypt(message, keyPair2[0], encodedData=False)
|
||||
decrypted = crypto.encryption.pub_key_decrypt(encrypted, privkey=keyPair2[1], encodedData=False)
|
||||
|
||||
self.assertTrue(decrypted.decode() == message)
|
||||
try:
|
||||
crypto.pubKeyEncrypt(None, keyPair2[0])
|
||||
crypto.encryption.pub_key_encrypt(None, keyPair2[0])
|
||||
except TypeError:
|
||||
pass
|
||||
else:
|
||||
self.assertTrue(False)
|
||||
|
||||
blankMessage = crypto.pubKeyEncrypt('', keyPair2[0])
|
||||
self.assertTrue('' == crypto.pubKeyDecrypt(blankMessage, privkey=keyPair2[1], encodedData=False).decode())
|
||||
blankMessage = crypto.encryption.pub_key_encrypt('', keyPair2[0])
|
||||
self.assertTrue('' == crypto.encryption.pub_key_decrypt(blankMessage, privkey=keyPair2[1], encodedData=False).decode())
|
||||
# Try to encrypt arbitrary bytes
|
||||
crypto.pubKeyEncrypt(os.urandom(32), keyPair2[0])
|
||||
|
||||
def test_symmetric(self):
|
||||
dataString = "this is a secret message"
|
||||
dataBytes = dataString.encode()
|
||||
key = b"tttttttttttttttttttttttttttttttt"
|
||||
invalidKey = b'tttttttttttttttttttttttttttttttb'
|
||||
encrypted = crypto.symmetricEncrypt(dataString, key, returnEncoded=True)
|
||||
decrypted = crypto.symmetricDecrypt(encrypted, key, encodedMessage=True)
|
||||
self.assertTrue(dataString == decrypted.decode())
|
||||
|
||||
try:
|
||||
crypto.symmetricDecrypt(encrypted, invalidKey, encodedMessage=True)
|
||||
except nacl.exceptions.CryptoError:
|
||||
pass
|
||||
else:
|
||||
self.assertFalse(True)
|
||||
try:
|
||||
crypto.symmetricEncrypt(None, key, returnEncoded=True)
|
||||
except AttributeError:
|
||||
pass
|
||||
else:
|
||||
self.assertFalse(True)
|
||||
crypto.symmetricEncrypt("string", key, returnEncoded=True)
|
||||
try:
|
||||
crypto.symmetricEncrypt("string", None, returnEncoded=True)
|
||||
except nacl.exceptions.TypeError:
|
||||
pass
|
||||
else:
|
||||
self.assertFalse(True)
|
||||
crypto.encryption.pub_key_encrypt(os.urandom(32), keyPair2[0])
|
||||
|
||||
def test_deterministic(self):
|
||||
password = os.urandom(32)
|
||||
gen = crypto.generateDeterministic(password)
|
||||
gen = crypto.generate_deterministic(password)
|
||||
self.assertTrue(stringvalidators.validate_pub_key(gen[0]))
|
||||
try:
|
||||
crypto.generateDeterministic('weakpassword')
|
||||
crypto.generate_deterministic('weakpassword')
|
||||
except onionrexceptions.PasswordStrengthError:
|
||||
pass
|
||||
else:
|
||||
self.assertFalse(True)
|
||||
try:
|
||||
crypto.generateDeterministic(None)
|
||||
crypto.generate_deterministic(None)
|
||||
except TypeError:
|
||||
pass
|
||||
else:
|
||||
self.assertFalse(True)
|
||||
|
||||
gen = crypto.generateDeterministic('weakpassword', bypassCheck=True)
|
||||
gen = crypto.generate_deterministic('weakpassword', bypassCheck=True)
|
||||
|
||||
password = base64.b64encode(os.urandom(32))
|
||||
gen1 = crypto.generateDeterministic(password)
|
||||
gen2 = crypto.generateDeterministic(password)
|
||||
gen1 = crypto.generate_deterministic(password)
|
||||
gen2 = crypto.generate_deterministic(password)
|
||||
self.assertFalse(gen == gen1)
|
||||
self.assertTrue(gen1 == gen2)
|
||||
self.assertTrue(stringvalidators.validate_pub_key(gen1[0]))
|
||||
|
|
|
@ -6,37 +6,38 @@ import json
|
|||
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
|
||||
print("Test directory:", TEST_DIR)
|
||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
||||
import core, onionr
|
||||
|
||||
c = core.Core()
|
||||
import onionrexceptions
|
||||
from onionrusers import onionrusers
|
||||
from onionrusers import contactmanager
|
||||
|
||||
import onionrcrypto as crypto
|
||||
from coredb import keydb
|
||||
from utils import identifyhome, createdirs
|
||||
createdirs.create_dirs()
|
||||
class OnionrUserTests(unittest.TestCase):
|
||||
'''
|
||||
Tests both the onionrusers class and the contactmanager (which inherits it)
|
||||
'''
|
||||
|
||||
def test_users(self):
|
||||
keypair = c._crypto.generatePubKey()
|
||||
onionrusers.OnionrUser(c, keypair[0])
|
||||
keypair = crypto.generate()
|
||||
onionrusers.OnionrUser(keypair[0])
|
||||
return
|
||||
|
||||
def test_contact_init_no_save(self):
|
||||
contact = c._crypto.generatePubKey()[0]
|
||||
contact = contactmanager.ContactManager(c, contact)
|
||||
self.assertFalse(contact.publicKey in c.listPeers())
|
||||
contact = crypto.generate()[0]
|
||||
contact = contactmanager.ContactManager(contact)
|
||||
self.assertFalse(contact.publicKey in keydb.listkeys.list_peers())
|
||||
|
||||
def test_contact_create(self):
|
||||
contact = c._crypto.generatePubKey()[0]
|
||||
contact = contactmanager.ContactManager(c, contact, saveUser=True)
|
||||
self.assertTrue(contact.publicKey in c.listPeers())
|
||||
contact = crypto.generate()[0]
|
||||
contact = contactmanager.ContactManager(contact, saveUser=True)
|
||||
self.assertTrue(contact.publicKey in keydb.listkeys.list_peers())
|
||||
|
||||
def test_contact_set_info(self):
|
||||
contact = c._crypto.generatePubKey()[0]
|
||||
contact = contactmanager.ContactManager(c, contact, saveUser=True)
|
||||
fileLocation = '%s/contacts/%s.json' % (c.dataDir, contact.publicKey)
|
||||
contact = crypto.generate()[0]
|
||||
contact = contactmanager.ContactManager(contact, saveUser=True)
|
||||
fileLocation = '%s/contacts/%s.json' % (identifyhome.identify_home(), contact.publicKey)
|
||||
contact.set_info('alias', 'bob')
|
||||
self.assertTrue(os.path.exists(fileLocation))
|
||||
|
||||
|
@ -47,9 +48,9 @@ class OnionrUserTests(unittest.TestCase):
|
|||
self.assertEqual(data['alias'], 'bob')
|
||||
|
||||
def test_contact_get_info(self):
|
||||
contact = c._crypto.generatePubKey()[0]
|
||||
contact = contactmanager.ContactManager(c, contact, saveUser=True)
|
||||
fileLocation = '%s/contacts/%s.json' % (c.dataDir, contact.publicKey)
|
||||
contact = crypto.generate()[0]
|
||||
contact = contactmanager.ContactManager(contact, saveUser=True)
|
||||
fileLocation = '%s/contacts/%s.json' % (identifyhome.identify_home(), contact.publicKey)
|
||||
|
||||
with open(fileLocation, 'w') as contactFile:
|
||||
contactFile.write('{"alias": "bob"}')
|
||||
|
@ -59,16 +60,16 @@ class OnionrUserTests(unittest.TestCase):
|
|||
self.assertEqual(contact.get_info('fail'), None)
|
||||
|
||||
def test_encrypt(self):
|
||||
contactPair = c._crypto.generatePubKey()
|
||||
contact = contactmanager.ContactManager(c, contactPair[0], saveUser=True)
|
||||
contactPair = crypto.generate()
|
||||
contact = contactmanager.ContactManager(contactPair[0], saveUser=True)
|
||||
encrypted = contact.encrypt('test')
|
||||
decrypted = c._crypto.pubKeyDecrypt(encrypted, privkey=contactPair[1], encodedData=True).decode()
|
||||
decrypted = crypto.encryption.pub_key_decrypt(encrypted, privkey=contactPair[1], encodedData=True).decode()
|
||||
self.assertEqual('test', decrypted)
|
||||
|
||||
def test_delete_contact(self):
|
||||
contact = c._crypto.generatePubKey()[0]
|
||||
contact = contactmanager.ContactManager(c, contact, saveUser=True)
|
||||
fileLocation = '%s/contacts/%s.json' % (c.dataDir, contact.publicKey)
|
||||
contact = crypto.generate()[0]
|
||||
contact = contactmanager.ContactManager(contact, saveUser=True)
|
||||
fileLocation = '%s/contacts/%s.json' % (identifyhome.identify_home(), contact.publicKey)
|
||||
self.assertFalse(os.path.exists(fileLocation))
|
||||
with open(fileLocation, 'w') as contactFile:
|
||||
contactFile.write('{"alias": "test"}')
|
||||
|
|
|
@ -5,11 +5,8 @@ import unittest, uuid
|
|||
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
|
||||
print("Test directory:", TEST_DIR)
|
||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
||||
import core, onionr
|
||||
from onionrutils import stringvalidators
|
||||
|
||||
core.Core()
|
||||
|
||||
class OnionrValidations(unittest.TestCase):
|
||||
|
||||
def test_peer_validator(self):
|
||||
|
@ -50,7 +47,6 @@ class OnionrValidations(unittest.TestCase):
|
|||
# Test ed25519 public key validity
|
||||
valids = ['JZ5VE72GUS3C7BOHDRIYZX4B5U5EJMCMLKHLYCVBQQF3UKHYIRRQ====', 'JZ5VE72GUS3C7BOHDRIYZX4B5U5EJMCMLKHLYCVBQQF3UKHYIRRQ']
|
||||
invalid = [None, '', ' ', 'dfsg', '\n', 'JZ5VE72GUS3C7BOHDRIYZX4B5U5EJMCMLKHLYCVBQQF3UKHYIR$Q====']
|
||||
c = core.Core()
|
||||
|
||||
for valid in valids:
|
||||
print('testing', valid)
|
||||
|
|
|
@ -8,6 +8,8 @@ def create_dirs():
|
|||
os.mkdir(home)
|
||||
if not os.path.exists(filepaths.block_data_location):
|
||||
os.mkdir(filepaths.block_data_location)
|
||||
if not os.path.exists(filepaths.contacts_location):
|
||||
os.mkdir(filepaths.contacts_location)
|
||||
|
||||
for db in dbcreator.create_funcs:
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue