Moved blockQueue to DSKV singleton as part of communicator decoupling

master
Kevin 2020-07-24 14:37:01 -05:00
parent 47013431d2
commit 6ecb62356a
6 changed files with 37 additions and 25 deletions

View File

@ -59,6 +59,9 @@ class OnionrCommunicatorDaemon:
self.isOnline = True # Assume we're connected to the internet self.isOnline = True # Assume we're connected to the internet
self.shared_state = shared_state # TooManyObjects module self.shared_state = shared_state # TooManyObjects module
# populate kv values
self.shared_state.get_by_string('DeadSimpleKV').put('blockQueue', {})
if config.get('general.offline_mode', False): if config.get('general.offline_mode', False):
self.isOnline = False self.isOnline = False
@ -97,11 +100,7 @@ class OnionrCommunicatorDaemon:
# set true when shutdown command received # set true when shutdown command received
self.shutdown = False self.shutdown = False
# list of new blocks to download # list of blocks currently downloading
# added to when new block lists are fetched from peers
self.blockQueue = {}
# list of blocks currently downloading, avoid s
self.currentDownloading = [] self.currentDownloading = []
# timestamp when the last online node was seen # timestamp when the last online node was seen

View File

@ -6,6 +6,7 @@
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
from communicator import OnionrCommunicatorDaemon from communicator import OnionrCommunicatorDaemon
from deadsimplekv import DeadSimpleKV
from gevent import spawn from gevent import spawn
@ -45,15 +46,16 @@ storage_counter = storagecounter.StorageCounter()
def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
"""Use communicator instance to download blocks in the comms's queue""" """Use communicator instance to download blocks in the comms's queue"""
blacklist = onionrblacklist.OnionrBlackList() blacklist = onionrblacklist.OnionrBlackList()
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
LOG_SKIP_COUNT = 50 # for how many iterations we skip logging the counter LOG_SKIP_COUNT = 50 # for how many iterations we skip logging the counter
count: int = 0 count: int = 0
metadata_validation_result: bool = False metadata_validation_result: bool = False
# Iterate the block queue in the communicator # Iterate the block queue in the communicator
for blockHash in list(comm_inst.blockQueue): for blockHash in list(kv.get('blockQueue')):
count += 1 count += 1
try: try:
blockPeers = list(comm_inst.blockQueue[blockHash]) blockPeers = list(kv.get('blockQueue')[blockHash])
except KeyError: except KeyError:
blockPeers = [] blockPeers = []
removeFromQueue = True removeFromQueue = True
@ -61,7 +63,8 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
if not shoulddownload.should_download(comm_inst, blockHash): if not shoulddownload.should_download(comm_inst, blockHash):
continue continue
if comm_inst.shutdown or not comm_inst.isOnline or storage_counter.is_full(): if comm_inst.shutdown or not comm_inst.isOnline or \
storage_counter.is_full():
# Exit loop if shutting down or offline, or disk allocation reached # Exit loop if shutting down or offline, or disk allocation reached
break break
# Do not download blocks being downloaded # Do not download blocks being downloaded
@ -82,8 +85,12 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
peerUsed = blockPeers.pop(0) peerUsed = blockPeers.pop(0)
if not comm_inst.shutdown and peerUsed.strip() != '': if not comm_inst.shutdown and peerUsed.strip() != '':
logger.info("Attempting to download %s from %s..." % (blockHash[:12], peerUsed)) logger.info(
content = peeraction.peer_action(comm_inst, peerUsed, 'getdata/' + blockHash, max_resp_size=3000000) # block content from random peer (includes metadata) f"Attempting to download %s from {peerUsed}..." % (blockHash[:12],))
content = peeraction.peer_action(
comm_inst, peerUsed,
'getdata/' + blockHash,
max_resp_size=3000000) # block content from random peer
if content is not False and len(content) > 0: if content is not False and len(content) > 0:
try: try:
@ -151,10 +158,10 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
removeFromQueue = False # Don't remove from queue if 404 removeFromQueue = False # Don't remove from queue if 404
if removeFromQueue: if removeFromQueue:
try: try:
del comm_inst.blockQueue[blockHash] # remove from block queue both if success or false del kv.get('blockQueue')[blockHash] # remove from block queue both if success or false
if count == LOG_SKIP_COUNT: if count == LOG_SKIP_COUNT:
logger.info('%s blocks remaining in queue' % logger.info('%s blocks remaining in queue' %
[len(comm_inst.blockQueue)], terminal=True) [len(kv.get('blockQueue'))], terminal=True)
count = 0 count = 0
except KeyError: except KeyError:
pass pass

View File

@ -25,6 +25,7 @@ def should_download(comm_inst, block_hash) -> bool:
"""Return bool for if a (assumed to exist) block should be downloaded.""" """Return bool for if a (assumed to exist) block should be downloaded."""
blacklist = onionrblacklist.OnionrBlackList() blacklist = onionrblacklist.OnionrBlackList()
should = True should = True
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
if block_hash in blockmetadb.get_block_list(): if block_hash in blockmetadb.get_block_list():
# Don't download block we have # Don't download block we have
should = False should = False
@ -35,7 +36,7 @@ def should_download(comm_inst, block_hash) -> bool:
if should is False: if should is False:
# Remove block from communicator queue if it shouldn't be downloaded # Remove block from communicator queue if it shouldn't be downloaded
try: try:
del comm_inst.blockQueue[block_hash] del kv.get('blockQueue')[block_hash]
except KeyError: except KeyError:
pass pass
return should return should

View File

@ -45,10 +45,11 @@ def lookup_blocks_from_communicator(comm_inst):
maxBacklog = 1560 maxBacklog = 1560
lastLookupTime = 0 # Last time we looked up a particular peer's list lastLookupTime = 0 # Last time we looked up a particular peer's list
new_block_count = 0 new_block_count = 0
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
for i in range(tryAmount): for i in range(tryAmount):
# Defined here to reset it each time, time offset is added later # Defined here to reset it each time, time offset is added later
listLookupCommand = 'getblocklist' listLookupCommand = 'getblocklist'
if len(comm_inst.blockQueue) >= maxBacklog: if len(kv.get('blockQueue')) >= maxBacklog:
break break
if not comm_inst.isOnline: if not comm_inst.isOnline:
break break
@ -100,19 +101,19 @@ def lookup_blocks_from_communicator(comm_inst):
# if block does not exist on disk + is not already in queue # if block does not exist on disk + is not already in queue
if i not in existingBlocks: if i not in existingBlocks:
if i not in comm_inst.blockQueue: if i not in kv.get('blockQueue'):
if onionrproofs.hashMeetsDifficulty(i) and \ if onionrproofs.hashMeetsDifficulty(i) and \
not blacklist.inBlacklist(i): not blacklist.inBlacklist(i):
if len(comm_inst.blockQueue) <= 1000000: if len(kv.get('blockQueue')) <= 1000000:
# add blocks to download queue # add blocks to download queue
comm_inst.blockQueue[i] = [peer] kv.get('blockQueue')[i] = [peer]
new_block_count += 1 new_block_count += 1
comm_inst.dbTimestamps[peer] = \ comm_inst.dbTimestamps[peer] = \
epoch.get_rounded_epoch(roundS=60) epoch.get_rounded_epoch(roundS=60)
else: else:
if peer not in comm_inst.blockQueue[i]: if peer not in kv.get('blockQueue')[i]:
if len(comm_inst.blockQueue[i]) < 10: if len(kv.get('blockQueue')[i]) < 10:
comm_inst.blockQueue[i].append(peer) kv.get('blockQueue')[i].append(peer)
if new_block_count > 0: if new_block_count > 0:
block_string = "" block_string = ""
if new_block_count > 1: if new_block_count > 1:

View File

@ -10,6 +10,7 @@ from threading import Thread
from stem.connection import IncorrectPassword from stem.connection import IncorrectPassword
import toomanyobjs import toomanyobjs
import filenuke import filenuke
from deadsimplekv import DeadSimpleKV
import config import config
import onionrstatistics import onionrstatistics
@ -70,7 +71,7 @@ def _show_info_messages():
(logger.colors.underline + (logger.colors.underline +
getourkeypair.get_keypair()[0][:52])) getourkeypair.get_keypair()[0][:52]))
def _setup_online_mode(use_existing_tor: bool, def _setup_online_mode(use_existing_tor: bool,
net: NetController, net: NetController,
security_level: int): security_level: int):
if config.get('transports.tor', True): if config.get('transports.tor', True):
@ -131,6 +132,9 @@ def daemon():
shared_state = toomanyobjs.TooMany() shared_state = toomanyobjs.TooMany()
# Add DeadSimpleKV for quasi-global variables (ephemeral key-value)
shared_state.get(DeadSimpleKV)
shared_state.get(daemoneventsapi.DaemonEventsBP) shared_state.get(daemoneventsapi.DaemonEventsBP)
Thread(target=shared_state.get(apiservers.ClientAPI).start, Thread(target=shared_state.get(apiservers.ClientAPI).start,
@ -144,7 +148,7 @@ def daemon():
# Run time tests are not normally run # Run time tests are not normally run
shared_state.get(runtests.OnionrRunTestManager) shared_state.get(runtests.OnionrRunTestManager)
# Create singleton # Create singleton
shared_state.get(serializeddata.SerializedData) shared_state.get(serializeddata.SerializedData)
shared_state.share_object() # share the parent object to the threads shared_state.share_object() # share the parent object to the threads
@ -169,8 +173,7 @@ def daemon():
if not offline_mode: if not offline_mode:
# we need to setup tor for use # we need to setup tor for use
_setup_online_mode(use_existing_tor, net, security_level) _setup_online_mode(use_existing_tor, net, security_level)
_show_info_messages() _show_info_messages()
events.event('init', threaded=False) events.event('init', threaded=False)

View File

@ -52,12 +52,13 @@ class SerializedData:
except AttributeError: except AttributeError:
sleep(1) sleep(1)
comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon, args=(self._too_many,)) comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon, args=(self._too_many,))
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
connected = [] connected = []
[connected.append(x) for x in comm_inst.onlinePeers if x not in connected] [connected.append(x) for x in comm_inst.onlinePeers if x not in connected]
stats['uptime'] = comm_inst.getUptime() stats['uptime'] = comm_inst.getUptime()
stats['connectedNodes'] = '\n'.join(connected) stats['connectedNodes'] = '\n'.join(connected)
stats['blockCount'] = len(blockmetadb.get_block_list()) stats['blockCount'] = len(blockmetadb.get_block_list())
stats['blockQueueCount'] = len(comm_inst.blockQueue) stats['blockQueueCount'] = len(kv.get('blockQueue'))
stats['threads'] = proc.num_threads() stats['threads'] = proc.num_threads()
stats['ramPercent'] = proc.memory_percent() stats['ramPercent'] = proc.memory_percent()
stats['fd'] = get_open_files() stats['fd'] = get_open_files()