Moved onlinePeers and announceCache to kv for more decoupling
parent
6a6718c9fd
commit
e00d41f8a9
|
@ -64,6 +64,8 @@ class OnionrCommunicatorDaemon:
|
||||||
self.kv.put('blockQueue', {})
|
self.kv.put('blockQueue', {})
|
||||||
self.kv.put('shutdown', False)
|
self.kv.put('shutdown', False)
|
||||||
self.kv.put('onlinePeers', [])
|
self.kv.put('onlinePeers', [])
|
||||||
|
self.kv.put('currentDownloading', [])
|
||||||
|
self.kv.put('announceCache', {})
|
||||||
|
|
||||||
if config.get('general.offline_mode', False):
|
if config.get('general.offline_mode', False):
|
||||||
self.isOnline = False
|
self.isOnline = False
|
||||||
|
@ -92,16 +94,12 @@ class OnionrCommunicatorDaemon:
|
||||||
# Peers merged to us. Don't add to db until we know they're reachable
|
# Peers merged to us. Don't add to db until we know they're reachable
|
||||||
self.newPeers = []
|
self.newPeers = []
|
||||||
self.announceProgress = {}
|
self.announceProgress = {}
|
||||||
self.announceCache = {}
|
|
||||||
|
|
||||||
self.generating_blocks = []
|
self.generating_blocks = []
|
||||||
|
|
||||||
# amount of threads running by name, used to prevent too many
|
# amount of threads running by name, used to prevent too many
|
||||||
self.threadCounts = {}
|
self.threadCounts = {}
|
||||||
|
|
||||||
# list of blocks currently downloading
|
|
||||||
self.currentDownloading = []
|
|
||||||
|
|
||||||
# timestamp when the last online node was seen
|
# timestamp when the last online node was seen
|
||||||
self.lastNodeSeen = None
|
self.lastNodeSeen = None
|
||||||
|
|
||||||
|
|
|
@ -33,13 +33,13 @@ def announce_node(daemon):
|
||||||
kv: "DeadSimpleKV" = daemon.shared_state.get_by_string("DeadSimpleKV")
|
kv: "DeadSimpleKV" = daemon.shared_state.get_by_string("DeadSimpleKV")
|
||||||
|
|
||||||
# Do not let announceCache get too large
|
# Do not let announceCache get too large
|
||||||
if len(daemon.announceCache) >= 10000:
|
if len(kv.get('announceCache')) >= 10000:
|
||||||
daemon.announceCache.popitem()
|
kv.get('announceCache').popitem()
|
||||||
|
|
||||||
if daemon.config.get('general.security_level', 0) == 0:
|
if daemon.config.get('general.security_level', 0) == 0:
|
||||||
# Announce to random online peers
|
# Announce to random online peers
|
||||||
for i in kv.get('onlinePeers'):
|
for i in kv.get('onlinePeers'):
|
||||||
if i not in daemon.announceCache and\
|
if i not in kv.get('announceCache') and\
|
||||||
i not in daemon.announceProgress:
|
i not in daemon.announceProgress:
|
||||||
peer = i
|
peer = i
|
||||||
break
|
break
|
||||||
|
|
|
@ -47,7 +47,7 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
|
||||||
"""Use communicator instance to download blocks in the comms's queue"""
|
"""Use communicator instance to download blocks in the comms's queue"""
|
||||||
blacklist = onionrblacklist.OnionrBlackList()
|
blacklist = onionrblacklist.OnionrBlackList()
|
||||||
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
|
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
|
||||||
LOG_SKIP_COUNT = 50 # for how many iterations we skip logging the counter
|
LOG_SKIP_COUNT = 50 # for how many iterations we skip logging the counter
|
||||||
count: int = 0
|
count: int = 0
|
||||||
metadata_validation_result: bool = False
|
metadata_validation_result: bool = False
|
||||||
# Iterate the block queue in the communicator
|
# Iterate the block queue in the communicator
|
||||||
|
@ -68,13 +68,14 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
|
||||||
# Exit loop if shutting down or offline, or disk allocation reached
|
# Exit loop if shutting down or offline, or disk allocation reached
|
||||||
break
|
break
|
||||||
# Do not download blocks being downloaded
|
# Do not download blocks being downloaded
|
||||||
if blockHash in comm_inst.currentDownloading:
|
if blockHash in kv.get('currentDownloading'):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if len(comm_inst.onlinePeers) == 0:
|
if len(kv.get('onlinePeers')) == 0:
|
||||||
break
|
break
|
||||||
|
|
||||||
comm_inst.currentDownloading.append(blockHash) # So we can avoid concurrent downloading in other threads of same block
|
# So we can avoid concurrent downloading in other threads of same block
|
||||||
|
kv.get('currentDownloading').append(blockHash)
|
||||||
if len(blockPeers) == 0:
|
if len(blockPeers) == 0:
|
||||||
try:
|
try:
|
||||||
peerUsed = onlinepeers.pick_online_peer(comm_inst)
|
peerUsed = onlinepeers.pick_online_peer(comm_inst)
|
||||||
|
@ -165,5 +166,5 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
|
||||||
count = 0
|
count = 0
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
comm_inst.currentDownloading.remove(blockHash)
|
kv.get('currentDownloading').remove(blockHash)
|
||||||
comm_inst.decrementThreadCount('getBlocks')
|
comm_inst.decrementThreadCount('getBlocks')
|
||||||
|
|
Loading…
Reference in New Issue