work on removing communicator

master
Kevin 2020-07-30 20:15:36 -05:00
parent 0b34aa7385
commit 4cf17ffe62
11 changed files with 52 additions and 60 deletions

View File

@ -14,7 +14,5 @@ def detect_disk_access(info):
return return
if identify_home() not in info[0]: if identify_home() not in info[0]:
if 'proc' in info[0]: if 'proc' not in info[0]: # if it is, it is onionr stats
logger.warn(f'[DISK MINISTRY] {info} - probably built in Onionr stats')
else:
logger.warn(f'[DISK MINISTRY] {info}') logger.warn(f'[DISK MINISTRY] {info}')

View File

@ -86,14 +86,6 @@ class OnionrCommunicatorDaemon:
# extends our upload list and saves our list when Onionr exits # extends our upload list and saves our list when Onionr exits
uploadqueue.UploadQueue(self) uploadqueue.UploadQueue(self)
# Set timers, function reference, seconds
# requires_peer True means the timer function won't fire if we
# have no connected peers
peerPoolTimer = OnionrCommunicatorTimers(
self, onlinepeers.get_online_peers, 60, max_threads=1,
my_args=[self])
# Timers to periodically lookup new blocks and download them # Timers to periodically lookup new blocks and download them
lookup_blocks_timer = OnionrCommunicatorTimers( lookup_blocks_timer = OnionrCommunicatorTimers(
self, self,
@ -184,7 +176,6 @@ class OnionrCommunicatorDaemon:
self, housekeeping.clean_keys, 15, my_args=[self], max_threads=1) self, housekeeping.clean_keys, 15, my_args=[self], max_threads=1)
# Adjust initial timer triggers # Adjust initial timer triggers
peerPoolTimer.count = (peerPoolTimer.frequency - 1)
cleanupTimer.count = (cleanupTimer.frequency - 60) cleanupTimer.count = (cleanupTimer.frequency - 60)
blockCleanupTimer.count = (blockCleanupTimer.frequency - 2) blockCleanupTimer.count = (blockCleanupTimer.frequency - 2)
lookup_blocks_timer = (lookup_blocks_timer.frequency - 2) lookup_blocks_timer = (lookup_blocks_timer.frequency - 2)
@ -193,7 +184,7 @@ class OnionrCommunicatorDaemon:
if config.get('general.use_bootstrap_list', True): if config.get('general.use_bootstrap_list', True):
bootstrappeers.add_bootstrap_list_to_peer_list( bootstrappeers.add_bootstrap_list_to_peer_list(
self, [], db_only=True) self.kv, [], db_only=True)
daemoneventhooks.daemon_event_handlers(shared_state) daemoneventhooks.daemon_event_handlers(shared_state)
@ -257,11 +248,6 @@ class OnionrCommunicatorDaemon:
except KeyError: except KeyError:
pass pass
def connectNewPeer(self, peer='', useBootstrap=False):
"""Adds a new random online peer to self.onlinePeers"""
connectnewpeers.connect_new_peer_to_communicator(
self, peer, useBootstrap)
def peerCleanup(self): def peerCleanup(self):
"""This just calls onionrpeers.cleanupPeers. """This just calls onionrpeers.cleanupPeers.

View File

@ -4,9 +4,6 @@ add bootstrap peers to the communicator peer list
""" """
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
if TYPE_CHECKING:
from deadsimplekv import DeadSimpleKV
from utils import readstatic, gettransports from utils import readstatic, gettransports
from coredb import keydb from coredb import keydb
""" """
@ -27,9 +24,8 @@ from coredb import keydb
bootstrap_peers = readstatic.read_static('bootstrap-nodes.txt').split(',') bootstrap_peers = readstatic.read_static('bootstrap-nodes.txt').split(',')
def add_bootstrap_list_to_peer_list(comm_inst, peerList, db_only=False): def add_bootstrap_list_to_peer_list(kv, peerList, db_only=False):
"""Add the bootstrap list to the peer list (no duplicates).""" """Add the bootstrap list to the peer list (no duplicates)."""
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
for i in bootstrap_peers: for i in bootstrap_peers:
if i not in peerList and i not in kv.get('offlinePeers') \ if i not in peerList and i not in kv.get('offlinePeers') \
and i not in gettransports.get() and len(str(i).strip()) > 0: and i not in gettransports.get() and len(str(i).strip()) > 0:

View File

@ -5,11 +5,12 @@ get online peers in a communicator instance
import time import time
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import config
from etc.humanreadabletime import human_readable_time from etc.humanreadabletime import human_readable_time
from communicatorutils.connectnewpeers import connect_new_peer_to_communicator
import logger import logger
if TYPE_CHECKING: if TYPE_CHECKING:
from deadsimplekv import DeadSimpleKV from deadsimplekv import DeadSimpleKV
from communicator import OnionrCommunicatorDaemon
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -26,17 +27,15 @@ if TYPE_CHECKING:
""" """
def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'): def get_online_peers(shared_state):
"""Manage the kv.get('onlinePeers') attribute list. """Manage the kv.get('onlinePeers') attribute list.
Connect to more peers if we have none connected Connect to more peers if we have none connected
""" """
config = comm_inst.config kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV")
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
if config.get('general.offline_mode', False): if config.get('general.offline_mode', False):
comm_inst.decrementThreadCount('get_online_peers')
return return
logger.debug('Refreshing peer pool...') logger.info('Refreshing peer pool...')
max_peers = int(config.get('peers.max_connect', 10)) max_peers = int(config.get('peers.max_connect', 10))
needed = max_peers - len(kv.get('onlinePeers')) needed = max_peers - len(kv.get('onlinePeers'))
@ -46,9 +45,9 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'):
for _ in range(needed): for _ in range(needed):
if len(kv.get('onlinePeers')) == 0: if len(kv.get('onlinePeers')) == 0:
comm_inst.connectNewPeer(useBootstrap=True) connect_new_peer_to_communicator(shared_state, useBootstrap=True)
else: else:
comm_inst.connectNewPeer() connect_new_peer_to_communicator(shared_state)
if kv.get('shutdown'): if kv.get('shutdown'):
break break
@ -57,9 +56,8 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'):
logger.debug('Couldn\'t connect to any peers.' + logger.debug('Couldn\'t connect to any peers.' +
f' Last node seen {last_seen} ago.') f' Last node seen {last_seen} ago.')
try: try:
get_online_peers(comm_inst) get_online_peers(kv)
except RecursionError: except RecursionError:
pass pass
else: else:
kv.put('lastNodeSeen', time.time()) kv.put('lastNodeSeen', time.time())
comm_inst.decrementThreadCount('get_online_peers')

View File

@ -22,9 +22,8 @@ if TYPE_CHECKING:
""" """
def remove_online_peer(comm_inst, peer): def remove_online_peer(kv, peer):
"""Remove an online peer.""" """Remove an online peer."""
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
try: try:
del kv.get('connectTimes')[peer] del kv.get('connectTimes')[peer]
except KeyError: except KeyError:

View File

@ -2,11 +2,15 @@
This file implements logic for performing requests to Onionr peers This file implements logic for performing requests to Onionr peers
""" """
from typing import TYPE_CHECKING
import streamedrequests import streamedrequests
import logger import logger
from onionrutils import epoch, basicrequests from onionrutils import epoch, basicrequests
from coredb import keydb from coredb import keydb
from . import onlinepeers from . import onlinepeers
from onionrtypes import OnionAddressString
from onionrpeers.peerprofiles import PeerProfiles
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -23,17 +27,27 @@ from . import onlinepeers
""" """
def peer_action(comm_inst, peer, action, def get_peer_profile(kv, address: OnionAddressString) -> 'PeerProfiles':
profile_inst_list = kv.get('peerProfiles')
for profile in profile_inst_list:
if profile.address == address:
return profile
p = PeerProfiles(address)
return p
def peer_action(shared_state, peer, action,
returnHeaders=False, max_resp_size=5242880): returnHeaders=False, max_resp_size=5242880):
"""Perform a get request to a peer.""" """Perform a get request to a peer."""
penalty_score = -10 penalty_score = -10
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV")
if len(peer) == 0: if len(peer) == 0:
return False return False
url = 'http://%s/%s' % (peer, action) url = 'http://%s/%s' % (peer, action)
try: try:
ret_data = basicrequests.do_get_request(url, port=comm_inst.proxyPort, ret_data = basicrequests.do_get_request(url, port=kv.get('proxyPort'),
max_size=max_resp_size) max_size=max_resp_size)
except streamedrequests.exceptions.ResponseLimitReached: except streamedrequests.exceptions.ResponseLimitReached:
logger.warn( logger.warn(
@ -44,14 +58,14 @@ def peer_action(comm_inst, peer, action,
# if request failed, (error), mark peer offline # if request failed, (error), mark peer offline
if ret_data is False: if ret_data is False:
try: try:
comm_inst.getPeerProfileInstance(peer).addScore(penalty_score) get_peer_profile(kv, peer).addScore(penalty_score)
onlinepeers.remove_online_peer(comm_inst, peer) onlinepeers.remove_online_peer(kv, peer)
keydb.transportinfo.set_address_info( keydb.transportinfo.set_address_info(
peer, 'lastConnectAttempt', epoch.get_epoch()) peer, 'lastConnectAttempt', epoch.get_epoch())
if action != 'ping' and not kv.get('shutdown'): if action != 'ping' and not kv.get('shutdown'):
logger.warn(f'Lost connection to {peer}', terminal=True) logger.warn(f'Lost connection to {peer}', terminal=True)
# Will only add a new peer to pool if needed # Will only add a new peer to pool if needed
onlinepeers.get_online_peers(comm_inst) onlinepeers.get_online_peers(kv)
except ValueError: except ValueError:
pass pass
else: else:

View File

@ -13,6 +13,7 @@ from utils import networkmerger, gettransports
from onionrutils import stringvalidators, epoch from onionrutils import stringvalidators, epoch
from communicator import peeraction, bootstrappeers from communicator import peeraction, bootstrappeers
from coredb import keydb from coredb import keydb
import config
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -29,10 +30,9 @@ from coredb import keydb
""" """
def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False): def connect_new_peer_to_communicator(shared_state, peer='', useBootstrap=False):
config = comm_inst.config
retData = False retData = False
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV")
tried = kv.get('offlinePeers') tried = kv.get('offlinePeers')
transports = gettransports.get() transports = gettransports.get()
if peer != '': if peer != '':
@ -63,7 +63,7 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False):
if len(peerList) == 0 or useBootstrap: if len(peerList) == 0 or useBootstrap:
# Avoid duplicating bootstrap addresses in peerList # Avoid duplicating bootstrap addresses in peerList
if config.get('general.use_bootstrap_list', True): if config.get('general.use_bootstrap_list', True):
bootstrappeers.add_bootstrap_list_to_peer_list(comm_inst, peerList) bootstrappeers.add_bootstrap_list_to_peer_list(kv, peerList)
for address in peerList: for address in peerList:
address = address.strip() address = address.strip()
@ -81,7 +81,7 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False):
if kv.get('shutdown'): if kv.get('shutdown'):
return return
# Ping a peer, # Ping a peer,
ret = peeraction.peer_action(comm_inst, address, 'ping') ret = peeraction.peer_action(shared_state, address, 'ping')
if ret == 'pong!': if ret == 'pong!':
time.sleep(0.1) time.sleep(0.1)
if address not in mainPeerList: if address not in mainPeerList:

View File

@ -28,12 +28,12 @@ if TYPE_CHECKING:
""" """
def lookup_new_peer_transports_with_communicator(comm_inst): def lookup_new_peer_transports_with_communicator(shared_state):
logger.info('Looking up new addresses...') logger.info('Looking up new addresses...')
tryAmount = 1 tryAmount = 1
newPeers = [] newPeers = []
transports = gettransports.get() transports = gettransports.get()
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV")
for i in range(tryAmount): for i in range(tryAmount):
# Download new peer address list from random online peers # Download new peer address list from random online peers
@ -41,7 +41,7 @@ def lookup_new_peer_transports_with_communicator(comm_inst):
# Don't get new peers if we have too many queued up # Don't get new peers if we have too many queued up
break break
try: try:
peer = onlinepeers.pick_online_peer(comm_inst) peer = onlinepeers.pick_online_peer()
newAdders = peeraction.peer_action(comm_inst, peer, action='pex') newAdders = peeraction.peer_action(comm_inst, peer, action='pex')
except onionrexceptions.OnlinePeerNeeded: except onionrexceptions.OnlinePeerNeeded:
continue continue

View File

@ -40,6 +40,7 @@ from lan.server import LANServer
from sneakernet import sneakernet_import_thread from sneakernet import sneakernet_import_thread
from onionrstatistics.devreporting import statistics_reporter from onionrstatistics.devreporting import statistics_reporter
from setupkvvars import setup_kv from setupkvvars import setup_kv
from .spawndaemonthreads import spawn_client_threads
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -139,6 +140,7 @@ def daemon():
# Initialize the quasi-global variables # Initialize the quasi-global variables
setup_kv(shared_state.get(DeadSimpleKV)) setup_kv(shared_state.get(DeadSimpleKV))
spawn_client_threads(shared_state)
shared_state.get(daemoneventsapi.DaemonEventsBP) shared_state.get(daemoneventsapi.DaemonEventsBP)
Thread(target=shared_state.get(apiservers.ClientAPI).start, Thread(target=shared_state.get(apiservers.ClientAPI).start,

View File

@ -12,7 +12,7 @@ import ujson as json
from coredb import blockmetadb from coredb import blockmetadb
from utils.sizeutils import size, human_size from utils.sizeutils import size, human_size
from utils.identifyhome import identify_home from utils.identifyhome import identify_home
import communicator from onionrutils.epoch import get_epoch
if TYPE_CHECKING: if TYPE_CHECKING:
from deadsimplekv import DeadSimpleKV from deadsimplekv import DeadSimpleKV
@ -57,13 +57,11 @@ class SerializedData:
self._too_many self._too_many
except AttributeError: except AttributeError:
sleep(1) sleep(1)
comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon, kv: "DeadSimpleKV" = self._too_many.get_by_string("DeadSimpleKV")
args=(self._too_many,))
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
connected = [] connected = []
[connected.append(x) [connected.append(x)
for x in kv.get('onlinePeers') if x not in connected] for x in kv.get('onlinePeers') if x not in connected]
stats['uptime'] = kv.get('getUptime') stats['uptime'] = get_epoch() - kv.get('startTime')
stats['connectedNodes'] = '\n'.join(connected) stats['connectedNodes'] = '\n'.join(connected)
stats['blockCount'] = len(blockmetadb.get_block_list()) stats['blockCount'] = len(blockmetadb.get_block_list())
stats['blockQueueCount'] = len(kv.get('blockQueue')) stats['blockQueueCount'] = len(kv.get('blockQueue'))

View File

@ -3,23 +3,24 @@ from typing import Iterable
from threading import Thread from threading import Thread
from utils.bettersleep import better_sleep from time import sleep
def _onionr_thread(func: Callable, args: Iterable, def _onionr_thread(func: Callable, args: Iterable,
sleep: int, initial_sleep): sleep_secs: int, initial_sleep):
better_sleep(initial_sleep) if initial_sleep:
sleep(initial_sleep)
while True: while True:
func(*args) func(*args)
better_sleep(sleep) sleep(sleep_secs)
def add_onionr_thread( def add_onionr_thread(
func: Callable, args: Iterable, func: Callable, args: Iterable,
sleep: int, initial_sleep: int = 5): sleep_secs: int, initial_sleep: int = 5):
"""Spawn a new onionr thread that exits when the main thread does. """Spawn a new onionr thread that exits when the main thread does.
Runs in an infinite loop with sleep between calls Runs in an infinite loop with sleep between calls
Passes in an interable args and sleep variables""" Passes in an interable args and sleep variables"""
Thread(target=_onionr_thread, Thread(target=_onionr_thread,
args=(func, args, sleep, initial_sleep), daemon=True).start() args=(func, args, sleep_secs, initial_sleep), daemon=True).start()