renamed onionr dir and bugfixes/linting progress

This commit is contained in:
Kevin Froman 2019-11-20 04:52:50 -06:00
parent 2b996da17f
commit 720efe4fca
226 changed files with 179 additions and 142 deletions

View file

View file

@ -0,0 +1,95 @@
'''
Onionr - Private P2P Communication
Do HTTP GET or POST requests through a proxy
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import requests, streamedrequests
import logger, onionrexceptions
from etc import onionrvalues
from . import localcommand
def do_post_request(url, data={}, port=0, proxyType='tor', max_size=10000, content_type: str = ''):
'''
Do a POST request through a local tor or i2p instance
'''
if proxyType == 'tor':
if port == 0:
port = localcommand.local_command('/gettorsocks')
proxies = {'http': 'socks4a://127.0.0.1:' + str(port), 'https': 'socks4a://127.0.0.1:' + str(port)}
elif proxyType == 'i2p':
proxies = {'http': 'http://127.0.0.1:4444'}
else:
return
headers = {'User-Agent': 'PyOnionr', 'Connection':'close'}
if len(content_type) > 0:
headers['Content-Type'] = content_type
try:
proxies = {'http': 'socks4a://127.0.0.1:' + str(port), 'https': 'socks4a://127.0.0.1:' + str(port)}
#r = requests.post(url, data=data, headers=headers, proxies=proxies, allow_redirects=False, timeout=(15, 30))
r = streamedrequests.post(url, post_data=data, request_headers=headers, proxy=proxies, connect_timeout=15, stream_timeout=30, max_size=max_size, allow_redirects=False)
retData = r[1]
except KeyboardInterrupt:
raise KeyboardInterrupt
except requests.exceptions.RequestException as e:
logger.debug('Error: %s' % str(e))
retData = False
return retData
def do_get_request(url, port=0, proxyType='tor', ignoreAPI=False, returnHeaders=False, max_size=5242880):
'''
Do a get request through a local tor or i2p instance
'''
API_VERSION = onionrvalues.API_VERSION
retData = False
if proxyType == 'tor':
if port == 0:
raise onionrexceptions.MissingPort('Socks port required for Tor HTTP get request')
proxies = {'http': 'socks4a://127.0.0.1:' + str(port), 'https': 'socks4a://127.0.0.1:' + str(port)}
elif proxyType == 'i2p':
proxies = {'http': 'http://127.0.0.1:4444'}
else:
return
headers = {'User-Agent': 'PyOnionr', 'Connection':'close'}
response_headers = dict()
try:
proxies = {'http': 'socks4a://127.0.0.1:' + str(port), 'https': 'socks4a://127.0.0.1:' + str(port)}
r = streamedrequests.get(url, request_headers=headers, allow_redirects=False, proxy=proxies, connect_timeout=15, stream_timeout=120, max_size=max_size)
# Check server is using same API version as us
if not ignoreAPI:
try:
response_headers = r[0].headers
if r[0].headers['X-API'] != str(API_VERSION):
raise onionrexceptions.InvalidAPIVersion
except KeyError:
raise onionrexceptions.InvalidAPIVersion
retData = r[1]
except KeyboardInterrupt:
raise KeyboardInterrupt
except ValueError as e:
pass
except onionrexceptions.InvalidAPIVersion:
if 'X-API' in response_headers:
logger.debug('Using API version %s. Cannot communicate with node\'s API version of %s.' % (API_VERSION, response_headers['X-API']))
else:
logger.debug('Using API version %s. API version was not sent with the request.' % API_VERSION)
except requests.exceptions.RequestException as e:
if not 'ConnectTimeoutError' in str(e) and not 'Request rejected or failed' in str(e):
logger.debug('Error: %s' % str(e))
retData = False
if returnHeaders:
return (retData, response_headers)
else:
return retData

View file

@ -0,0 +1,24 @@
'''
Onionr - Private P2P Communication
Module to work with block metadata
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
from . import hasblock, fromdata, process
has_block = hasblock.has_block
process_block_metadata = process.process_block_metadata
get_block_metadata_from_data = fromdata.get_block_metadata_from_data

View file

@ -0,0 +1,46 @@
'''
Onionr - Private P2P Communication
Return a useful tuple of (metadata (header), meta, and data) by accepting raw block data
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import json
from onionrutils import bytesconverter
def get_block_metadata_from_data(block_data):
'''
accepts block contents as string, returns a tuple of
metadata, meta (meta being internal metadata, which will be
returned as an encrypted base64 string if it is encrypted, dict if not).
'''
meta = {}
metadata = {}
data = block_data
try:
block_data = block_data.encode()
except AttributeError:
pass
try:
metadata = json.loads(bytesconverter.bytes_to_str(block_data[:block_data.find(b'\n')]))
except json.decoder.JSONDecodeError:
pass
else:
data = block_data[block_data.find(b'\n'):]
meta = metadata['meta']
return (metadata, meta, data)

View file

@ -0,0 +1,43 @@
'''
Onionr - Private P2P Communication
Returns a bool if a block is in the block metadata db or not
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import sqlite3
from coredb import dbfiles
import onionrexceptions
from .. import stringvalidators
from etc import onionrvalues
def has_block(hash: str) -> bool:
'''
Check for new block in the block meta db
'''
conn = sqlite3.connect(dbfiles.block_meta_db, timeout=onionrvalues.DATABASE_LOCK_TIMEOUT)
c = conn.cursor()
if not stringvalidators.validate_hash(hash):
raise onionrexceptions.InvalidHexHash("Invalid hash")
for result in c.execute("SELECT COUNT() FROM hashes WHERE hash = ?", (hash,)):
if result[0] >= 1:
conn.commit()
conn.close()
return True
else:
conn.commit()
conn.close()
return False
return False

View file

@ -0,0 +1,72 @@
'''
Onionr - Private P2P Communication
Process block metadata with relevant actions
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
from etc import onionrvalues
from onionrblocks import onionrblockapi
from .. import epoch, bytesconverter
from coredb import blockmetadb
import logger
from onionrplugins import onionrevents
import onionrexceptions
from onionrusers import onionrusers
from onionrutils import updater
def process_block_metadata(blockHash: str):
'''
Read metadata from a block and cache it to the block database
blockHash -> sha3_256 hex formatted hash of Onionr block
'''
curTime = epoch.get_rounded_epoch(roundS=60)
myBlock = onionrblockapi.Block(blockHash)
if myBlock.isEncrypted:
myBlock.decrypt()
if (myBlock.isEncrypted and myBlock.decrypted) or (not myBlock.isEncrypted):
blockType = myBlock.getMetadata('type') # we would use myBlock.getType() here, but it is bugged with encrypted blocks
signer = bytesconverter.bytes_to_str(myBlock.signer)
valid = myBlock.verifySig()
if valid:
if myBlock.getMetadata('newFSKey') is not None:
try:
onionrusers.OnionrUser(signer).addForwardKey(myBlock.getMetadata('newFSKey'))
except onionrexceptions.InvalidPubkey:
logger.warn('%s has invalid forward secrecy key to add: %s' % (signer, myBlock.getMetadata('newFSKey')))
try:
if len(blockType) <= onionrvalues.MAX_BLOCK_TYPE_LENGTH:
blockmetadb.update_block_info(blockHash, 'dataType', blockType)
except TypeError:
logger.warn("Missing block information")
pass
# Set block expire time if specified
try:
expireTime = int(myBlock.getHeader('expire'))
# test that expire time is an integer of sane length (for epoch)
# doesn't matter if its too large because of the min() func below
if not len(str(expireTime)) < 20: raise ValueError('timestamp invalid')
except (ValueError, TypeError) as e:
expireTime = onionrvalues.DEFAULT_EXPIRE + curTime
finally:
expireTime = min(expireTime, curTime + onionrvalues.DEFAULT_EXPIRE)
blockmetadb.update_block_info(blockHash, 'expire', expireTime)
if blockType == 'update': updater.update_event(myBlock)
onionrevents.event('processblocks', data = {'block': myBlock, 'type': blockType, 'signer': signer, 'validSig': valid})

View file

@ -0,0 +1,14 @@
def str_to_bytes(data):
'''Converts a string to bytes with .encode()'''
try:
data = data.encode('UTF-8')
except AttributeError:
pass
return data
def bytes_to_str(data):
try:
data = data.decode('UTF-8')
except AttributeError:
pass
return data

View file

@ -0,0 +1,39 @@
'''
Onionr - Private P2P Communication
Check if the communicator is running
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import time, os
import filepaths
def is_communicator_running(timeout = 5, interval = 0.1):
try:
runcheck_file = filepaths.run_check_file
if not os.path.isfile(runcheck_file):
open(runcheck_file, 'w+').close()
starttime = time.time()
while True:
time.sleep(interval)
if not os.path.isfile(runcheck_file):
return True
elif time.time() - starttime >= timeout:
return False
except:
return False

30
src/onionrutils/epoch.py Normal file
View file

@ -0,0 +1,30 @@
'''
Onionr - Private P2P Communication
Get floored epoch, or rounded epoch
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import math, time
def get_rounded_epoch(roundS=60):
'''
Returns the epoch, rounded down to given seconds (Default 60)
'''
epoch = get_epoch()
return epoch - (epoch % roundS)
def get_epoch():
'''returns epoch'''
return math.floor(time.time())

View file

@ -0,0 +1,10 @@
import re
def escape_ANSI(line):
'''
Remove ANSI escape codes from a string with regex
adapted from: https://stackoverflow.com/a/38662876 by user https://stackoverflow.com/users/802365/%c3%89douard-lopez
cc-by-sa-3 license https://creativecommons.org/licenses/by-sa/3.0/
'''
ansi_escape = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]')
return ansi_escape.sub('', line)

View file

@ -0,0 +1,37 @@
'''
Onionr - Private P2P Communication
Return the client api server address and port, which is usually random
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import filepaths
import config
def get_client_API_server():
config.reload()
retData = ''
getconf = lambda: config.get('client.client.port')
port = getconf()
if port is None:
config.reload()
port = getconf()
try:
with open(filepaths.private_API_host_file, 'r') as host:
hostname = host.read()
except FileNotFoundError:
raise FileNotFoundError
else:
retData += '%s:%s' % (hostname, port)
return retData

View file

@ -0,0 +1,51 @@
'''
Onionr - Private P2P Communication
import new blocks from disk, providing transport agnosticism
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import glob
import logger
from onionrutils import blockmetadata
from coredb import blockmetadb
import filepaths
import onionrcrypto as crypto
def import_new_blocks(scanDir=''):
'''
This function is intended to scan for new blocks ON THE DISK and import them
'''
blockList = blockmetadb.get_block_list()
exist = False
if scanDir == '':
scanDir = filepaths.block_data_location
if not scanDir.endswith('/'):
scanDir += '/'
for block in glob.glob(scanDir + "*.dat"):
if block.replace(scanDir, '').replace('.dat', '') not in blockList:
exist = True
logger.info('Found new block on dist %s' % block, terminal=True)
with open(block, 'rb') as newBlock:
block = block.replace(scanDir, '').replace('.dat', '')
if crypto.hashers.sha3_hash(newBlock.read()) == block.replace('.dat', ''):
blockmetadb.add_to_block_DB(block.replace('.dat', ''), dataSaved=True)
logger.info('Imported block %s.' % block, terminal=True)
blockmetadata.process_block_metadata(block)
else:
logger.warn('Failed to verify hash for %s' % block, terminal=True)
if not exist:
logger.info('No blocks found to import', terminal=True)
import_new_blocks.onionr_help = f"Scans the Onionr data directory under {filepaths.block_data_location} for new block files (.dat, .db not supported) to import"

View file

@ -0,0 +1,70 @@
'''
Onionr - Private P2P Communication
send a command to the local API server
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import urllib, requests, time
import logger, config, deadsimplekv
from . import getclientapiserver
import filepaths
config.reload()
cache = deadsimplekv.DeadSimpleKV(filepaths.cached_storage, refresh_seconds=1000)
def get_hostname():
hostname = ''
waited = 0
maxWait = 3
while True:
if cache.get('client_api') is None:
try:
hostname = getclientapiserver.get_client_API_server()
except FileNotFoundError:
hostname = False
else:
cache.put('hostname', hostname)
cache.flush()
else:
hostname = cache.get('hostname')
if hostname == '' or hostname is None:
time.sleep(1)
if waited == maxWait:
return False
else:
return hostname
def local_command(command, data='', silent = True, post=False, postData = {}, maxWait=20):
'''
Send a command to the local http API server, securely. Intended for local clients, DO NOT USE for remote peers.
'''
# TODO: URL encode parameters, just as an extra measure. May not be needed, but should be added regardless.
hostname = get_hostname()
if hostname == False: return False
if data != '':
data = '&data=' + urllib.parse.quote_plus(data)
payload = 'http://%s/%s%s' % (hostname, command, data)
try:
if post:
ret_data = requests.post(payload, data=postData, headers={'token': config.get('client.webpassword'), 'Connection':'close'}, timeout=(maxWait, maxWait)).text
else:
ret_data = requests.get(payload, headers={'token': config.get('client.webpassword'), 'Connection':'close'}, timeout=(maxWait, maxWait)).text
except Exception as error:
if not silent:
logger.error('Failed to make local request (command: %s):%s' % (command, error), terminal=True)
ret_data = False
return ret_data

View file

@ -0,0 +1,49 @@
'''
Onionr - Private P2P Communication
convert a base32 string (intended for ed25519 user ids) to pgp word list
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import base64
import niceware
import unpaddedbase32
import onionrcrypto
from etc import onionrvalues
DELIMITER = '-'
def get_human_readable_ID(pub=''):
'''gets a human readable ID from a public key'''
if pub == '':
pub = onionrcrypto.pub_key
if not len(pub) == onionrvalues.MAIN_PUBLIC_KEY_SIZE:
pub = base64.b32decode(pub)
return DELIMITER.join(niceware.bytes_to_passphrase(pub))
#return niceware.bytes_to_passphrase(pub).replace(' ', DELIMITER)
def get_base32(words):
'''converts mnemonic to base32'''
if DELIMITER not in words and not type(words) in (type(list), type(tuple)): return words
try:
return unpaddedbase32.b32encode(niceware.passphrase_to_bytes(words.split(DELIMITER)))
except AttributeError:
ret = unpaddedbase32.b32encode(niceware.passphrase_to_bytes(words))
return ret

View file

@ -0,0 +1,119 @@
'''
Onionr - Private P2P Communication
validate various string data types
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import base64, string
import unpaddedbase32, nacl.signing, nacl.encoding
from onionrutils import bytesconverter
def validate_hash(data, length=64):
'''
Validate if a string is a valid hash hex digest (does not compare, just checks length and charset)
Length is only invalid if its *more* than the specified
'''
retVal = True
if data == False or data == True:
return False
data = data.strip()
if len(data) > length:
retVal = False
else:
try:
int(data, 16)
except ValueError:
retVal = False
return retVal
def validate_pub_key(key):
'''
Validate if a string is a valid base32 encoded Ed25519 key
'''
if type(key) is type(None):
return False
# Accept keys that have no = padding
key = unpaddedbase32.repad(bytesconverter.str_to_bytes(key))
retVal = False
try:
nacl.signing.SigningKey(seed=key, encoder=nacl.encoding.Base32Encoder)
except nacl.exceptions.ValueError:
pass
except base64.binascii.Error as err:
pass
else:
retVal = True
return retVal
def validate_transport(id):
try:
idLength = len(id)
retVal = True
idNoDomain = ''
peerType = ''
# i2p b32 addresses are 60 characters long (including .b32.i2p)
if idLength == 60:
peerType = 'i2p'
if not id.endswith('.b32.i2p'):
retVal = False
else:
idNoDomain = id.split('.b32.i2p')[0]
# Onion v2's are 22 (including .onion), v3's are 62 with .onion
elif idLength == 22 or idLength == 62:
peerType = 'onion'
if not id.endswith('.onion'):
retVal = False
else:
idNoDomain = id.split('.onion')[0]
else:
retVal = False
if retVal:
if peerType == 'i2p':
try:
id.split('.b32.i2p')[2]
except IndexError:
pass
else:
retVal = False
elif peerType == 'onion':
try:
id.split('.onion')[2]
except IndexError:
pass
else:
retVal = False
if not idNoDomain.isalnum():
retVal = False
# Validate address is valid base32 (when capitalized and minus extension); v2/v3 onions and .b32.i2p use base32
for x in idNoDomain.upper():
if x not in string.ascii_uppercase and x not in '234567':
retVal = False
return retVal
except Exception as e:
return False
def is_integer_string(data):
'''Check if a string is a valid base10 integer (also returns true if already an int)'''
try:
int(data)
except (ValueError, TypeError) as e:
return False
else:
return True

View file

@ -0,0 +1,7 @@
import notifier
def update_event(bl):
"""Show update notification if available, return bool of if update happened"""
if not bl.isSigner(onionrvalues.UPDATE_SIGN_KEY): raise onionrexceptions.InvalidUpdate
onionr.notifier.notify(message="A new Onionr update is available. Stay updated to remain secure.")

View file

@ -0,0 +1,119 @@
"""
Onionr - Private P2P Communication
validate new block's metadata
"""
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import json
import logger, onionrexceptions
from etc import onionrvalues
from . import stringvalidators, epoch, bytesconverter
import config, filepaths, onionrcrypto
def validate_metadata(metadata, block_data) -> bool:
"""Validate metadata meets onionr spec (does not validate proof value computation), take in either dictionary or json string"""
ret_data = False
max_clock_difference = onionrvalues.MAX_BLOCK_CLOCK_SKEW
# convert to dict if it is json string
if type(metadata) is str:
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
pass
# Validate metadata dict for invalid keys to sizes that are too large
maxAge = min(config.get("general.max_block_age", onionrvalues.DEFAULT_EXPIRE), onionrvalues.DEFAULT_EXPIRE)
if type(metadata) is dict:
for i in metadata:
try:
onionrvalues.BLOCK_METADATA_LENGTHS[i]
except KeyError:
logger.warn('Block has invalid metadata key ' + i)
break
else:
testData = metadata[i]
try:
testData = len(testData)
except (TypeError, AttributeError) as e:
testData = len(str(testData))
if onionrvalues.BLOCK_METADATA_LENGTHS[i] < testData:
logger.warn('Block metadata key ' + i + ' exceeded maximum size')
break
if i == 'time':
if not stringvalidators.is_integer_string(metadata[i]):
logger.warn('Block metadata time stamp is not integer string or int')
break
isFuture = (metadata[i] - epoch.get_epoch())
if isFuture > max_clock_difference:
logger.warn('Block timestamp is skewed to the future over the max %s: %s' (max_clock_difference, isFuture))
break
if (epoch.get_epoch() - metadata[i]) > maxAge:
logger.warn('Block is outdated: %s' % (metadata[i],))
break
elif i == 'expire':
try:
if not int(metadata[i]) > epoch.get_epoch(): raise ValueError
except ValueError:
logger.warn('Block is expired: %s less than %s' % (metadata[i], epoch.get_epoch()))
break
elif i == 'encryptType':
try:
if not metadata[i] in ('asym', 'sym', ''): raise ValueError
if not config.get('general.store_plaintext_blocks', True):
break
except ValueError:
logger.warn('Invalid encryption mode')
break
elif i == 'sig':
try:
metadata['encryptType']
except KeyError:
signer = metadata['signer']
sig = metadata['sig']
encodedMeta = bytesconverter.str_to_bytes(metadata['meta'])
encodedBlock = bytesconverter.str_to_bytes(block_data)
if not onionrcrypto.signing.ed_verify(encodedMeta + encodedBlock[1:], signer, sig):
break
else:
# if metadata loop gets no errors, it does not break, therefore metadata is valid
# make sure we do not have another block with the same data content (prevent data duplication and replay attacks)
# Make sure time is set (validity was checked above if it is)
try:
metadata['time']
except KeyError:
return False
nonce = bytesconverter.bytes_to_str(onionrcrypto.hashers.sha3_hash(block_data))
try:
with open(filepaths.data_nonce_file, 'r') as nonceFile:
if nonce in nonceFile.read():
# we've seen that nonce before, so we can't pass metadata
raise onionrexceptions.DataExists
except FileNotFoundError:
ret_data = True
except onionrexceptions.DataExists:
# do not set ret_data to True, because data has been seen before
logger.warn(f'{nonce} seen before')
raise onionrexceptions.DataExists
else:
ret_data = True
else:
logger.warn('In call to utils.validateMetadata, metadata must be JSON string or a dictionary object')
return ret_data