Moved blocksToUpload to KV to further reduce coupling

master
Kevin 2020-07-26 19:02:39 -05:00
parent 97a5f50271
commit 10c1cd7803
8 changed files with 28 additions and 16 deletions

View File

@ -68,6 +68,7 @@ class OnionrCommunicatorDaemon:
self.kv.put('announceCache', {}) self.kv.put('announceCache', {})
self.kv.put('newPeers', []) self.kv.put('newPeers', [])
self.kv.put('dbTimestamps', {}) self.kv.put('dbTimestamps', {})
self.kv.put('blocksToUpload', [])
if config.get('general.offline_mode', False): if config.get('general.offline_mode', False):
self.isOnline = False self.isOnline = False
@ -78,8 +79,6 @@ class OnionrCommunicatorDaemon:
# initialize core with Tor socks port being 3rd argument # initialize core with Tor socks port being 3rd argument
self.proxyPort = shared_state.get(NetController).socksPort self.proxyPort = shared_state.get(NetController).socksPort
# Upload information, list of blocks to upload
self.blocksToUpload = []
self.upload_session_manager = self.shared_state.get( self.upload_session_manager = self.shared_state.get(
uploadblocks.sessionmanager.BlockUploadSessionManager) uploadblocks.sessionmanager.BlockUploadSessionManager)
self.shared_state.share_object() self.shared_state.share_object()

View File

@ -15,6 +15,7 @@ from communicatorutils import restarttor
if TYPE_CHECKING: if TYPE_CHECKING:
from toomanyobjs import TooMany from toomanyobjs import TooMany
from deadsimplekv import DeadSimpleKV
from communicator import OnionrCommunicatorDaemon from communicator import OnionrCommunicatorDaemon
from httpapi.daemoneventsapi import DaemonEventsBP from httpapi.daemoneventsapi import DaemonEventsBP
from onionrtypes import BlockHash from onionrtypes import BlockHash
@ -45,6 +46,7 @@ def daemon_event_handlers(shared_state: 'TooMany'):
comm_inst = _get_inst('OnionrCommunicatorDaemon') comm_inst = _get_inst('OnionrCommunicatorDaemon')
public_api: 'PublicAPI' = _get_inst('PublicAPI') public_api: 'PublicAPI' = _get_inst('PublicAPI')
events_api: 'DaemonEventsBP' = _get_inst('DaemonEventsBP') events_api: 'DaemonEventsBP' = _get_inst('DaemonEventsBP')
kv: 'DeadSimpleKV' = _get_inst('DeadSimpleKV')
def remove_from_insert_queue_wrapper(block_hash: 'BlockHash'): def remove_from_insert_queue_wrapper(block_hash: 'BlockHash'):
remove_from_insert_queue(comm_inst, block_hash) remove_from_insert_queue(comm_inst, block_hash)
@ -59,7 +61,7 @@ def daemon_event_handlers(shared_state: 'TooMany'):
raise ValueError raise ValueError
public_api.hideBlocks.append(block) public_api.hideBlocks.append(block)
try: try:
mixmate.block_mixer(comm_inst.blocksToUpload, block) mixmate.block_mixer(kv.get('blocksToUpload'), block)
except ValueError: except ValueError:
pass pass
return "removed" return "removed"

View File

@ -13,6 +13,7 @@ import filepaths
from onionrutils import localcommand from onionrutils import localcommand
if TYPE_CHECKING: if TYPE_CHECKING:
from communicator import OnionrCommunicatorDaemon from communicator import OnionrCommunicatorDaemon
from deadsimplekv import DeadSimpleKV
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -47,19 +48,20 @@ class UploadQueue:
self.communicator = communicator self.communicator = communicator
cache: deadsimplekv.DeadSimpleKV = deadsimplekv.DeadSimpleKV( cache: deadsimplekv.DeadSimpleKV = deadsimplekv.DeadSimpleKV(
UPLOAD_MEMORY_FILE) UPLOAD_MEMORY_FILE)
self.kv: "DeadSimpleKV" = communicator.shared_state.get_by_string("DeadSimpleKV")
self.store_obj = cache self.store_obj = cache
cache = cache.get('uploads') cache = cache.get('uploads')
if cache is None: if cache is None:
cache = [] cache = []
_add_to_hidden_blocks(cache) _add_to_hidden_blocks(cache)
self.communicator.blocksToUpload.extend(cache) self.kv.get('blocksToUpload').extend(cache)
atexit.register(self.save) atexit.register(self.save)
def save(self): def save(self):
"""Save to disk on shutdown or if called manually.""" """Save to disk on shutdown or if called manually."""
bl: deadsimplekv.DeadSimpleKV = self.communicator.blocksToUpload bl: deadsimplekv.DeadSimpleKV = self.kv.get('blocksToUpload')
if len(bl) == 0: if len(bl) == 0:
try: try:
os.remove(UPLOAD_MEMORY_FILE) os.remove(UPLOAD_MEMORY_FILE)

View File

@ -4,6 +4,10 @@ Cleanup old Onionr blocks and forward secrecy keys using the communicator.
Ran from a communicator timer usually Ran from a communicator timer usually
""" """
import sqlite3 import sqlite3
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from deadsimplekv import DeadSimpleKV
import logger import logger
from onionrusers import onionrusers from onionrusers import onionrusers
@ -32,8 +36,9 @@ storage_counter = StorageCounter()
def __remove_from_upload(comm_inst, block_hash: str): def __remove_from_upload(comm_inst, block_hash: str):
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
try: try:
comm_inst.blocksToUpload.remove(block_hash) kv.get('blocksToUpload').remove(block_hash)
except ValueError: except ValueError:
pass pass

View File

@ -2,8 +2,13 @@
Lookup new blocks with the communicator using a random connected peer Lookup new blocks with the communicator using a random connected peer
""" """
from typing import TYPE_CHECKING
from gevent import time from gevent import time
if TYPE_CHECKING:
from deadsimplekv import DeadSimpleKV
import logger import logger
import onionrproofs import onionrproofs
from onionrutils import stringvalidators, epoch from onionrutils import stringvalidators, epoch

View File

@ -47,8 +47,8 @@ def upload_blocks_from_communicator(comm_inst: 'OnionrCommunicatorDaemon'):
sessionmanager.BlockUploadSessionManager) sessionmanager.BlockUploadSessionManager)
tried_peers: UserID = [] tried_peers: UserID = []
finishedUploads = [] finishedUploads = []
comm_inst.blocksToUpload = onionrcrypto.cryptoutils.random_shuffle( kv.put('blocksToUpload', onionrcrypto.cryptoutils.random_shuffle(
comm_inst.blocksToUpload) kv.get('blocksToUpload')))
def remove_from_hidden(bl): def remove_from_hidden(bl):
sleep(60) sleep(60)
@ -58,8 +58,8 @@ def upload_blocks_from_communicator(comm_inst: 'OnionrCommunicatorDaemon'):
except ValueError: except ValueError:
pass pass
if len(comm_inst.blocksToUpload) != 0: if len(kv.get('blocksToUpload')) != 0:
for bl in comm_inst.blocksToUpload: for bl in kv.get('blocksToUpload'):
if not stringvalidators.validate_hash(bl): if not stringvalidators.validate_hash(bl):
logger.warn('Requested to upload invalid block', terminal=True) logger.warn('Requested to upload invalid block', terminal=True)
comm_inst.decrementThreadCount(TIMER_NAME) comm_inst.decrementThreadCount(TIMER_NAME)
@ -116,7 +116,7 @@ def upload_blocks_from_communicator(comm_inst: 'OnionrCommunicatorDaemon'):
session_manager.clean_session() session_manager.clean_session()
for x in finishedUploads: for x in finishedUploads:
try: try:
comm_inst.blocksToUpload.remove(x) kv.get('blocksToUpload').remove(x)
comm_inst.shared_state.get_by_string( comm_inst.shared_state.get_by_string(
'PublicAPI').hideBlocks.remove(x) 'PublicAPI').hideBlocks.remove(x)

View File

@ -116,11 +116,11 @@ class BlockUploadSessionManager:
# Remove the blocks from the sessions, upload list, # Remove the blocks from the sessions, upload list,
# and waitforshare list # and waitforshare list
try: try:
comm_inst.blocksToUpload.remove( kv.get('blocksToUpload').remove(
reconstructhash.reconstruct_hash(sess.block_hash)) reconstructhash.reconstruct_hash(sess.block_hash))
except ValueError: except ValueError:
pass pass
try: try:
comm_inst.blocksToUpload.remove(sess.block_hash) kv.get('blocksToUpload').remove(sess.block_hash)
except ValueError: except ValueError:
pass pass

View File

@ -121,11 +121,10 @@ class Block:
try: try:
self.bcontent = onionrusers.OnionrUser(self.signer).forwardDecrypt(self.bcontent) self.bcontent = onionrusers.OnionrUser(self.signer).forwardDecrypt(self.bcontent)
except (onionrexceptions.DecryptionError, nacl.exceptions.CryptoError) as e: except (onionrexceptions.DecryptionError, nacl.exceptions.CryptoError) as e:
#logger.error(str(e)) logger.error(str(e))
pass pass
except nacl.exceptions.CryptoError: except nacl.exceptions.CryptoError:
pass logger.debug('Could not decrypt block. Either invalid key or corrupted data')
#logger.debug('Could not decrypt block. Either invalid key or corrupted data')
except onionrexceptions.ReplayAttack: except onionrexceptions.ReplayAttack:
logger.warn('%s is possibly a replay attack' % (self.hash,)) logger.warn('%s is possibly a replay attack' % (self.hash,))
else: else: