progress removing communicator timers

master
Kevin Froman 2020-11-15 18:26:25 +00:00
parent bbd76da333
commit ecd2cc54da
5 changed files with 21 additions and 30 deletions

View File

@ -90,19 +90,15 @@ class OnionrCommunicatorDaemon:
lookupblocks.lookup_blocks_from_communicator, lookupblocks.lookup_blocks_from_communicator,
[self.shared_state], 25, 3) [self.shared_state], 25, 3)
add_onionr_thread(
"""The block download timer is accessed by the block lookup function downloadblocks.download_blocks_from_communicator,
to trigger faster download starts""" [self.shared_state],
self.download_blocks_timer = OnionrCommunicatorTimers( config.get('timers.getBlocks', 10), 1)
self, self.getBlocks, config.get('timers.getBlocks', 10),
requires_peer=True, max_threads=5)
add_onionr_thread(onlinepeers.clear_offline_peer, [self.kv], 58) add_onionr_thread(onlinepeers.clear_offline_peer, [self.kv], 58)
# Timer to cleanup old blocks add_onionr_thread(
blockCleanupTimer = OnionrCommunicatorTimers( housekeeping.clean_old_blocks, self.shared_state, 20, 1)
self, housekeeping.clean_old_blocks, 20, my_args=[self],
max_threads=1)
# Timer to discover new peers # Timer to discover new peers
OnionrCommunicatorTimers( OnionrCommunicatorTimers(
@ -173,7 +169,6 @@ class OnionrCommunicatorDaemon:
# Adjust initial timer triggers # Adjust initial timer triggers
cleanupTimer.count = (cleanupTimer.frequency - 60) cleanupTimer.count = (cleanupTimer.frequency - 60)
blockCleanupTimer.count = (blockCleanupTimer.frequency - 2)
shared_state.add(self) shared_state.add(self)
@ -228,10 +223,6 @@ class OnionrCommunicatorDaemon:
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
def getBlocks(self):
"""Download new blocks in queue."""
downloadblocks.download_blocks_from_communicator(self)
def decrementThreadCount(self, threadName): def decrementThreadCount(self, threadName):
"""Decrement amount of a thread name if more than zero. """Decrement amount of a thread name if more than zero.

View File

@ -44,10 +44,10 @@ from . import shoulddownload
storage_counter = storagecounter.StorageCounter() storage_counter = storagecounter.StorageCounter()
def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): def download_blocks_from_communicator(shared_state: "TooMany"):
"""Use communicator instance to download blocks in the comms's queue""" """Use communicator instance to download blocks in the comms's queue"""
blacklist = onionrblacklist.OnionrBlackList() blacklist = onionrblacklist.OnionrBlackList()
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV")
LOG_SKIP_COUNT = 50 # for how many iterations we skip logging the counter LOG_SKIP_COUNT = 50 # for how many iterations we skip logging the counter
count: int = 0 count: int = 0
metadata_validation_result: bool = False metadata_validation_result: bool = False
@ -61,7 +61,7 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
blockPeers = [] blockPeers = []
removeFromQueue = True removeFromQueue = True
if not shoulddownload.should_download(comm_inst, blockHash): if not shoulddownload.should_download(shared_state, blockHash):
continue continue
if kv.get('shutdown') or not kv.get('isOnline') or \ if kv.get('shutdown') or not kv.get('isOnline') or \
@ -90,7 +90,7 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
logger.info( logger.info(
f"Attempting to download %s from {peerUsed}..." % (blockHash[:12],)) f"Attempting to download %s from {peerUsed}..." % (blockHash[:12],))
content = peeraction.peer_action( content = peeraction.peer_action(
comm_inst.shared_state, peerUsed, shared_state, peerUsed,
'getdata/' + blockHash, 'getdata/' + blockHash,
max_resp_size=3000000) # block content from random peer max_resp_size=3000000) # block content from random peer
@ -171,4 +171,3 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
except KeyError: except KeyError:
pass pass
kv.get('currentDownloading').remove(blockHash) kv.get('currentDownloading').remove(blockHash)
comm_inst.decrementThreadCount('getBlocks')

View File

@ -21,11 +21,11 @@ from onionrblocks import onionrblacklist
""" """
def should_download(comm_inst, block_hash) -> bool: def should_download(shared_state, block_hash) -> bool:
"""Return bool for if a (assumed to exist) block should be downloaded.""" """Return bool for if a (assumed to exist) block should be downloaded."""
blacklist = onionrblacklist.OnionrBlackList() blacklist = onionrblacklist.OnionrBlackList()
should = True should = True
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV")
if block_hash in blockmetadb.get_block_list(): if block_hash in blockmetadb.get_block_list():
# Don't download block we have # Don't download block we have
should = False should = False

View File

@ -36,15 +36,15 @@ from etc.onionrvalues import DATABASE_LOCK_TIMEOUT
storage_counter = StorageCounter() storage_counter = StorageCounter()
def __remove_from_upload(comm_inst, block_hash: str): def __remove_from_upload(shared_state, block_hash: str):
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV")
try: try:
kv.get('blocksToUpload').remove(block_hash) kv.get('blocksToUpload').remove(block_hash)
except ValueError: except ValueError:
pass pass
def clean_old_blocks(comm_inst): def clean_old_blocks(shared_state):
"""Delete expired blocks + old blocks if disk allocation is near full""" """Delete expired blocks + old blocks if disk allocation is near full"""
blacklist = onionrblacklist.OnionrBlackList() blacklist = onionrblacklist.OnionrBlackList()
# Delete expired blocks # Delete expired blocks
@ -52,7 +52,7 @@ def clean_old_blocks(comm_inst):
blacklist.addToDB(bHash) blacklist.addToDB(bHash)
removeblock.remove_block(bHash) removeblock.remove_block(bHash)
onionrstorage.deleteBlock(bHash) onionrstorage.deleteBlock(bHash)
__remove_from_upload(comm_inst, bHash) __remove_from_upload(shared_state, bHash)
logger.info('Deleted block: %s' % (bHash,)) logger.info('Deleted block: %s' % (bHash,))
while storage_counter.is_full(): while storage_counter.is_full():
@ -64,11 +64,9 @@ def clean_old_blocks(comm_inst):
blacklist.addToDB(oldest) blacklist.addToDB(oldest)
removeblock.remove_block(oldest) removeblock.remove_block(oldest)
onionrstorage.deleteBlock(oldest) onionrstorage.deleteBlock(oldest)
__remove_from_upload(comm_inst, oldest) __remove_from_upload(shared_state, oldest)
logger.info('Deleted block: %s' % (oldest,)) logger.info('Deleted block: %s' % (oldest,))
comm_inst.decrementThreadCount('clean_old_blocks')
def clean_keys(comm_inst): def clean_keys(comm_inst):
"""Delete expired forward secrecy keys""" """Delete expired forward secrecy keys"""

View File

@ -3,6 +3,7 @@ from typing import Iterable
import traceback import traceback
from threading import Thread from threading import Thread
from uuid import uuid4
from time import sleep from time import sleep
@ -11,6 +12,7 @@ import logger
def _onionr_thread(func: Callable, args: Iterable, def _onionr_thread(func: Callable, args: Iterable,
sleep_secs: int, initial_sleep): sleep_secs: int, initial_sleep):
thread_id = str(uuid4())
if initial_sleep: if initial_sleep:
sleep(initial_sleep) sleep(initial_sleep)
while True: while True:
@ -18,7 +20,8 @@ def _onionr_thread(func: Callable, args: Iterable,
func(*args) func(*args)
except Exception as _: # noqa except Exception as _: # noqa
logger.warn( logger.warn(
"Onionr thread exception \n" + traceback.format_exc(), f"Onionr thread exception in {thread_id} \n" +
traceback.format_exc(),
terminal=True) terminal=True)
sleep(sleep_secs) sleep(sleep_secs)