Moved onlinePeers to KV to further reduce coupling
This commit is contained in:
parent
0e4e7bb050
commit
6a6718c9fd
15 changed files with 56 additions and 25 deletions
|
@ -60,8 +60,10 @@ class OnionrCommunicatorDaemon:
|
||||||
self.shared_state = shared_state # TooManyObjects module
|
self.shared_state = shared_state # TooManyObjects module
|
||||||
|
|
||||||
# populate kv values
|
# populate kv values
|
||||||
self.shared_state.get_by_string('DeadSimpleKV').put('blockQueue', {})
|
self.kv = self.shared_state.get_by_string('DeadSimpleKV')
|
||||||
self.shared_state.get_by_string('DeadSimpleKV').put('shutdown', False)
|
self.kv.put('blockQueue', {})
|
||||||
|
self.kv.put('shutdown', False)
|
||||||
|
self.kv.put('onlinePeers', [])
|
||||||
|
|
||||||
if config.get('general.offline_mode', False):
|
if config.get('general.offline_mode', False):
|
||||||
self.isOnline = False
|
self.isOnline = False
|
||||||
|
@ -82,7 +84,6 @@ class OnionrCommunicatorDaemon:
|
||||||
self.delay = 1
|
self.delay = 1
|
||||||
|
|
||||||
# lists of connected peers and peers we know we can't reach currently
|
# lists of connected peers and peers we know we can't reach currently
|
||||||
self.onlinePeers = []
|
|
||||||
self.offlinePeers = []
|
self.offlinePeers = []
|
||||||
self.cooldownPeer = {}
|
self.cooldownPeer = {}
|
||||||
self.connectTimes = {}
|
self.connectTimes = {}
|
||||||
|
|
|
@ -8,6 +8,7 @@ from typing import TYPE_CHECKING
|
||||||
from etc import humanreadabletime
|
from etc import humanreadabletime
|
||||||
import logger
|
import logger
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
from deadsimplekv import DeadSimpleKV
|
||||||
from communicator import OnionrCommunicatorDaemon
|
from communicator import OnionrCommunicatorDaemon
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
|
@ -26,7 +27,7 @@ if TYPE_CHECKING:
|
||||||
|
|
||||||
|
|
||||||
def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'):
|
def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'):
|
||||||
"""Manage the comm_inst.onlinePeers attribute list.
|
"""Manage the kv.get('onlinePeers') attribute list.
|
||||||
|
|
||||||
Connect to more peers if we have none connected
|
Connect to more peers if we have none connected
|
||||||
"""
|
"""
|
||||||
|
@ -37,7 +38,7 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'):
|
||||||
return
|
return
|
||||||
logger.debug('Refreshing peer pool...')
|
logger.debug('Refreshing peer pool...')
|
||||||
max_peers = int(config.get('peers.max_connect', 10))
|
max_peers = int(config.get('peers.max_connect', 10))
|
||||||
needed = max_peers - len(comm_inst.onlinePeers)
|
needed = max_peers - len(kv.get('onlinePeers'))
|
||||||
|
|
||||||
last_seen = 'never'
|
last_seen = 'never'
|
||||||
if not isinstance(comm_inst.lastNodeSeen, type(None)):
|
if not isinstance(comm_inst.lastNodeSeen, type(None)):
|
||||||
|
@ -45,7 +46,7 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'):
|
||||||
comm_inst.lastNodeSeen)
|
comm_inst.lastNodeSeen)
|
||||||
|
|
||||||
for _ in range(needed):
|
for _ in range(needed):
|
||||||
if len(comm_inst.onlinePeers) == 0:
|
if len(kv.get('onlinePeers')) == 0:
|
||||||
comm_inst.connectNewPeer(useBootstrap=True)
|
comm_inst.connectNewPeer(useBootstrap=True)
|
||||||
else:
|
else:
|
||||||
comm_inst.connectNewPeer()
|
comm_inst.connectNewPeer()
|
||||||
|
@ -53,7 +54,7 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'):
|
||||||
if kv.get('shutdown'):
|
if kv.get('shutdown'):
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
if len(comm_inst.onlinePeers) == 0:
|
if len(kv.get('onlinePeers')) == 0:
|
||||||
logger.debug('Couldn\'t connect to any peers.' +
|
logger.debug('Couldn\'t connect to any peers.' +
|
||||||
f' Last node seen {last_seen} ago.')
|
f' Last node seen {last_seen} ago.')
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -4,8 +4,12 @@ Onionr - Private P2P Communication.
|
||||||
pick online peers in a communicator instance
|
pick online peers in a communicator instance
|
||||||
"""
|
"""
|
||||||
import secrets
|
import secrets
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
import onionrexceptions
|
import onionrexceptions
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from deadsimplekv import DeadSimpleKV
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
it under the terms of the GNU General Public License as published by
|
it under the terms of the GNU General Public License as published by
|
||||||
|
@ -24,18 +28,19 @@ import onionrexceptions
|
||||||
|
|
||||||
def pick_online_peer(comm_inst):
|
def pick_online_peer(comm_inst):
|
||||||
"""Randomly picks peer from pool without bias (using secrets module)."""
|
"""Randomly picks peer from pool without bias (using secrets module)."""
|
||||||
|
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
|
||||||
ret_data = ''
|
ret_data = ''
|
||||||
peer_length = len(comm_inst.onlinePeers)
|
peer_length = len(kv.get('onlinePeers'))
|
||||||
if peer_length <= 0:
|
if peer_length <= 0:
|
||||||
raise onionrexceptions.OnlinePeerNeeded
|
raise onionrexceptions.OnlinePeerNeeded
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
peer_length = len(comm_inst.onlinePeers)
|
peer_length = len(kv.get('onlinePeers'))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Get a random online peer, securely.
|
# Get a random online peer, securely.
|
||||||
# May get stuck in loop if network is lost
|
# May get stuck in loop if network is lost
|
||||||
ret_data = comm_inst.onlinePeers[secrets.randbelow(peer_length)]
|
ret_data = kv.get('onlinePeers')[secrets.randbelow(peer_length)]
|
||||||
except IndexError:
|
except IndexError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -2,6 +2,10 @@
|
||||||
|
|
||||||
remove an online peer from the pool in a communicator instance
|
remove an online peer from the pool in a communicator instance
|
||||||
"""
|
"""
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from deadsimplekv import DeadSimpleKV
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
it under the terms of the GNU General Public License as published by
|
it under the terms of the GNU General Public License as published by
|
||||||
|
@ -20,6 +24,7 @@ remove an online peer from the pool in a communicator instance
|
||||||
|
|
||||||
def remove_online_peer(comm_inst, peer):
|
def remove_online_peer(comm_inst, peer):
|
||||||
"""Remove an online peer."""
|
"""Remove an online peer."""
|
||||||
|
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
|
||||||
try:
|
try:
|
||||||
del comm_inst.connectTimes[peer]
|
del comm_inst.connectTimes[peer]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
@ -29,6 +34,6 @@ def remove_online_peer(comm_inst, peer):
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
try:
|
try:
|
||||||
comm_inst.onlinePeers.remove(peer)
|
kv.get('onlinePeers').remove(peer)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -30,6 +30,7 @@ import onionrexceptions
|
||||||
def announce_node(daemon):
|
def announce_node(daemon):
|
||||||
"""Announce our node to our peers."""
|
"""Announce our node to our peers."""
|
||||||
ret_data = False
|
ret_data = False
|
||||||
|
kv: "DeadSimpleKV" = daemon.shared_state.get_by_string("DeadSimpleKV")
|
||||||
|
|
||||||
# Do not let announceCache get too large
|
# Do not let announceCache get too large
|
||||||
if len(daemon.announceCache) >= 10000:
|
if len(daemon.announceCache) >= 10000:
|
||||||
|
@ -37,7 +38,7 @@ def announce_node(daemon):
|
||||||
|
|
||||||
if daemon.config.get('general.security_level', 0) == 0:
|
if daemon.config.get('general.security_level', 0) == 0:
|
||||||
# Announce to random online peers
|
# Announce to random online peers
|
||||||
for i in daemon.onlinePeers:
|
for i in kv.get('onlinePeers'):
|
||||||
if i not in daemon.announceCache and\
|
if i not in daemon.announceCache and\
|
||||||
i not in daemon.announceProgress:
|
i not in daemon.announceProgress:
|
||||||
peer = i
|
peer = i
|
||||||
|
|
|
@ -75,7 +75,7 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False):
|
||||||
if its already been tried/connected, or if its cooled down
|
if its already been tried/connected, or if its cooled down
|
||||||
"""
|
"""
|
||||||
if len(address) == 0 or address in tried \
|
if len(address) == 0 or address in tried \
|
||||||
or address in comm_inst.onlinePeers \
|
or address in kv.get('onlinePeers') \
|
||||||
or address in comm_inst.cooldownPeer:
|
or address in comm_inst.cooldownPeer:
|
||||||
continue
|
continue
|
||||||
if kv.get('shutdown'):
|
if kv.get('shutdown'):
|
||||||
|
@ -87,9 +87,9 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False):
|
||||||
if address not in mainPeerList:
|
if address not in mainPeerList:
|
||||||
# Add a peer to our list if it isn't already since it connected
|
# Add a peer to our list if it isn't already since it connected
|
||||||
networkmerger.mergeAdders(address)
|
networkmerger.mergeAdders(address)
|
||||||
if address not in comm_inst.onlinePeers:
|
if address not in kv.get('onlinePeers'):
|
||||||
logger.info('Connected to ' + address, terminal=True)
|
logger.info('Connected to ' + address, terminal=True)
|
||||||
comm_inst.onlinePeers.append(address)
|
kv.get('onlinePeers').append(address)
|
||||||
comm_inst.connectTimes[address] = epoch.get_epoch()
|
comm_inst.connectTimes[address] = epoch.get_epoch()
|
||||||
retData = address
|
retData = address
|
||||||
|
|
||||||
|
|
|
@ -2,8 +2,13 @@
|
||||||
|
|
||||||
Select random online peer in a communicator instance and have them "cool down"
|
Select random online peer in a communicator instance and have them "cool down"
|
||||||
"""
|
"""
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
from onionrutils import epoch
|
from onionrutils import epoch
|
||||||
from communicator import onlinepeers
|
from communicator import onlinepeers
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from deadsimplekv import DeadSimpleKV
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
it under the terms of the GNU General Public License as published by
|
it under the terms of the GNU General Public License as published by
|
||||||
|
@ -22,8 +27,9 @@ from communicator import onlinepeers
|
||||||
|
|
||||||
def cooldown_peer(comm_inst):
|
def cooldown_peer(comm_inst):
|
||||||
"""Randomly add an online peer to cooldown, so we can connect a new one."""
|
"""Randomly add an online peer to cooldown, so we can connect a new one."""
|
||||||
|
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
|
||||||
config = comm_inst.config
|
config = comm_inst.config
|
||||||
online_peer_amount = len(comm_inst.onlinePeers)
|
online_peer_amount = len(kv.get('onlinePeers'))
|
||||||
minTime = 300
|
minTime = 300
|
||||||
cooldown_time = 600
|
cooldown_time = 600
|
||||||
to_cool = ''
|
to_cool = ''
|
||||||
|
|
|
@ -66,7 +66,7 @@ def lookup_blocks_from_communicator(comm_inst):
|
||||||
continue
|
continue
|
||||||
# if we've already tried all the online peers this time around, stop
|
# if we've already tried all the online peers this time around, stop
|
||||||
if peer in triedPeers:
|
if peer in triedPeers:
|
||||||
if len(comm_inst.onlinePeers) == len(triedPeers):
|
if len(kv.get('onlinePeers')) == len(triedPeers):
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -9,6 +9,11 @@ import logger
|
||||||
from utils import netutils
|
from utils import netutils
|
||||||
from onionrutils import localcommand, epoch
|
from onionrutils import localcommand, epoch
|
||||||
from . import restarttor
|
from . import restarttor
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from deadsimplekv import DeadSimpleKV
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
it under the terms of the GNU General Public License as published by
|
it under the terms of the GNU General Public License as published by
|
||||||
|
@ -33,7 +38,7 @@ def net_check(comm_inst):
|
||||||
# for detecting if we have received incoming connections recently
|
# for detecting if we have received incoming connections recently
|
||||||
rec = False
|
rec = False
|
||||||
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
|
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
|
||||||
if len(comm_inst.onlinePeers) == 0:
|
if len(kv.get('onlinePeers')) == 0:
|
||||||
try:
|
try:
|
||||||
if (epoch.get_epoch() - int(localcommand.local_command
|
if (epoch.get_epoch() - int(localcommand.local_command
|
||||||
('/lastconnect'))) <= 60:
|
('/lastconnect'))) <= 60:
|
||||||
|
|
|
@ -66,7 +66,7 @@ class OnionrCommunicatorTimers:
|
||||||
if self.count == self.frequency and not self.kv.get('shutdown'):
|
if self.count == self.frequency and not self.kv.get('shutdown'):
|
||||||
try:
|
try:
|
||||||
if self.requires_peer and \
|
if self.requires_peer and \
|
||||||
len(self.daemon_inst.onlinePeers) == 0:
|
len(self.kv.get('onlinePeers')) == 0:
|
||||||
raise onionrexceptions.OnlinePeerNeeded
|
raise onionrexceptions.OnlinePeerNeeded
|
||||||
except onionrexceptions.OnlinePeerNeeded:
|
except onionrexceptions.OnlinePeerNeeded:
|
||||||
return
|
return
|
||||||
|
|
|
@ -17,6 +17,7 @@ from onionrutils import stringvalidators, basicrequests
|
||||||
import onionrcrypto
|
import onionrcrypto
|
||||||
from communicator import onlinepeers
|
from communicator import onlinepeers
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
from deadsimplekv import DeadSimpleKV
|
||||||
from communicator import OnionrCommunicatorDaemon
|
from communicator import OnionrCommunicatorDaemon
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
|
@ -38,6 +39,7 @@ def upload_blocks_from_communicator(comm_inst: 'OnionrCommunicatorDaemon'):
|
||||||
"""Accept a communicator instance + upload blocks from its upload queue."""
|
"""Accept a communicator instance + upload blocks from its upload queue."""
|
||||||
"""when inserting a block, we try to upload
|
"""when inserting a block, we try to upload
|
||||||
it to a few peers to add some deniability & increase functionality"""
|
it to a few peers to add some deniability & increase functionality"""
|
||||||
|
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
|
||||||
TIMER_NAME = "upload_blocks_from_communicator"
|
TIMER_NAME = "upload_blocks_from_communicator"
|
||||||
|
|
||||||
session_manager: sessionmanager.BlockUploadSessionManager
|
session_manager: sessionmanager.BlockUploadSessionManager
|
||||||
|
@ -63,7 +65,7 @@ def upload_blocks_from_communicator(comm_inst: 'OnionrCommunicatorDaemon'):
|
||||||
comm_inst.decrementThreadCount(TIMER_NAME)
|
comm_inst.decrementThreadCount(TIMER_NAME)
|
||||||
return
|
return
|
||||||
session = session_manager.add_session(bl)
|
session = session_manager.add_session(bl)
|
||||||
for _ in range(min(len(comm_inst.onlinePeers), 6)):
|
for _ in range(min(len(kv.get('onlinePeers')), 6)):
|
||||||
try:
|
try:
|
||||||
peer = onlinepeers.pick_online_peer(comm_inst)
|
peer = onlinepeers.pick_online_peer(comm_inst)
|
||||||
except onionrexceptions.OnlinePeerNeeded:
|
except onionrexceptions.OnlinePeerNeeded:
|
||||||
|
|
|
@ -4,6 +4,7 @@ Manager for upload 'sessions'
|
||||||
"""
|
"""
|
||||||
from typing import List, Union, TYPE_CHECKING
|
from typing import List, Union, TYPE_CHECKING
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
from deadsimplekv import DeadSimpleKV
|
||||||
from session import UploadSession
|
from session import UploadSession
|
||||||
|
|
||||||
from onionrutils import bytesconverter
|
from onionrutils import bytesconverter
|
||||||
|
@ -84,10 +85,12 @@ class BlockUploadSessionManager:
|
||||||
comm_inst: 'OnionrCommunicatorDaemon' # type: ignore
|
comm_inst: 'OnionrCommunicatorDaemon' # type: ignore
|
||||||
comm_inst = self._too_many.get_by_string( # pylint: disable=E1101 type: ignore
|
comm_inst = self._too_many.get_by_string( # pylint: disable=E1101 type: ignore
|
||||||
"OnionrCommunicatorDaemon")
|
"OnionrCommunicatorDaemon")
|
||||||
|
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string(
|
||||||
|
"DeadSimpleKV")
|
||||||
sessions_to_delete = []
|
sessions_to_delete = []
|
||||||
if comm_inst.getUptime() < 120:
|
if comm_inst.getUptime() < 120:
|
||||||
return
|
return
|
||||||
onlinePeerCount = len(comm_inst.onlinePeers)
|
onlinePeerCount = len(kv.get('onlinePeers'))
|
||||||
|
|
||||||
# If we have no online peers right now,
|
# If we have no online peers right now,
|
||||||
if onlinePeerCount == 0:
|
if onlinePeerCount == 0:
|
||||||
|
|
|
@ -40,7 +40,7 @@ def accept_upload(request):
|
||||||
try:
|
try:
|
||||||
b_hash = blockimporter.import_block_from_data(data)
|
b_hash = blockimporter.import_block_from_data(data)
|
||||||
if b_hash:
|
if b_hash:
|
||||||
if g.too_many.get_by_string("OnionrCommunicatorDaemon").onlinePeers:
|
if g.too_many.get_by_string("DeadSimpleKV").get('onlinePeers'):
|
||||||
spawn(
|
spawn(
|
||||||
localcommand.local_command,
|
localcommand.local_command,
|
||||||
f'/daemon-event/upload_event',
|
f'/daemon-event/upload_event',
|
||||||
|
|
|
@ -28,13 +28,15 @@ from onionrutils import epoch
|
||||||
def statistics_reporter(shared_state):
|
def statistics_reporter(shared_state):
|
||||||
server = config.get('statistics.server', '')
|
server = config.get('statistics.server', '')
|
||||||
if not config.get('statistics.i_dont_want_privacy', False) or \
|
if not config.get('statistics.i_dont_want_privacy', False) or \
|
||||||
not server: return
|
not server:
|
||||||
|
return
|
||||||
|
|
||||||
def compile_data():
|
def compile_data():
|
||||||
return {
|
return {
|
||||||
'time': epoch.get_epoch(),
|
'time': epoch.get_epoch(),
|
||||||
'adders': get_transports(),
|
'adders': get_transports(),
|
||||||
'peers': shared_state.get_by_string('OnionrCommunicatorDaemon').onlinePeers
|
'peers': shared_state.get_by_string(
|
||||||
|
'DeadSimpleKV').get('onlinePeers')
|
||||||
}
|
}
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
|
|
@ -54,7 +54,7 @@ class SerializedData:
|
||||||
comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon, args=(self._too_many,))
|
comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon, args=(self._too_many,))
|
||||||
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
|
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
|
||||||
connected = []
|
connected = []
|
||||||
[connected.append(x) for x in comm_inst.onlinePeers if x not in connected]
|
[connected.append(x) for x in kv.get('onlinePeers') if x not in connected]
|
||||||
stats['uptime'] = comm_inst.getUptime()
|
stats['uptime'] = comm_inst.getUptime()
|
||||||
stats['connectedNodes'] = '\n'.join(connected)
|
stats['connectedNodes'] = '\n'.join(connected)
|
||||||
stats['blockCount'] = len(blockmetadb.get_block_list())
|
stats['blockCount'] = len(blockmetadb.get_block_list())
|
||||||
|
|
Loading…
Reference in a new issue