mostly finished refactoring communicator into a module

master
Kevin Froman 2019-07-11 02:39:35 -05:00
parent 9bf6c76557
commit dbd154d450
8 changed files with 73 additions and 91 deletions

View File

@ -27,7 +27,7 @@ from communicatorutils import downloadblocks, lookupblocks, lookupadders
from communicatorutils import servicecreator, connectnewpeers, uploadblocks
from communicatorutils import daemonqueuehandler, announcenode, deniableinserts
from communicatorutils import cooldownpeer, housekeeping, netcheck
from onionrutils import localcommand, epoch, basicrequests
from onionrutils import localcommand, epoch
from etc import humanreadabletime
import onionrservices, onionr, onionrproofs
OnionrCommunicatorTimers = onionrcommunicatortimers.OnionrCommunicatorTimers
@ -117,13 +117,10 @@ class OnionrCommunicatorDaemon:
OnionrCommunicatorTimers(self, cooldownpeer.cooldown_peer, 30, myArgs=[self], requiresPeer=True)
# Timer to read the upload queue and upload the entries to peers
OnionrCommunicatorTimers(self, self.uploadBlock, 5, requiresPeer=True, maxThreads=1)
OnionrCommunicatorTimers(self, uploadblocks.upload_blocks_from_communicator, 5, myArgs=[self], requiresPeer=True, maxThreads=1)
# Timer to process the daemon command queue
OnionrCommunicatorTimers(self, self.daemonCommands, 6, maxThreads=3)
# Timer that kills Onionr if the API server crashes
#OnionrCommunicatorTimers(self, self.detectAPICrash, 30, maxThreads=1)
OnionrCommunicatorTimers(self, daemonqueuehandler.handle_daemon_commands, 6, myArgs=[self], maxThreads=3)
# Setup direct connections
if config.get('general.socket_servers', False):
@ -207,49 +204,15 @@ class OnionrCommunicatorDaemon:
except KeyError:
pass
def addBootstrapListToPeerList(self, peerList):
'''
Add the bootstrap list to the peer list (no duplicates)
'''
for i in self._core.bootstrapList:
if i not in peerList and i not in self.offlinePeers and i != self._core.hsAddress and len(str(i).strip()) > 0:
peerList.append(i)
self._core.addAddress(i)
def connectNewPeer(self, peer='', useBootstrap=False):
'''Adds a new random online peer to self.onlinePeers'''
connectnewpeers.connect_new_peer_to_communicator(self, peer, useBootstrap)
def removeOnlinePeer(self, peer):
'''Remove an online peer'''
try:
del self.connectTimes[peer]
except KeyError:
pass
try:
del self.dbTimestamps[peer]
except KeyError:
pass
try:
self.onlinePeers.remove(peer)
except ValueError:
pass
def peerCleanup(self):
'''This just calls onionrpeers.cleanupPeers, which removes dead or bad peers (offline too long, too slow)'''
onionrpeers.peerCleanup(self._core)
self.decrementThreadCount('peerCleanup')
def printOnlinePeers(self):
'''logs online peer list'''
if len(self.onlinePeers) == 0:
logger.warn('No online peers', terminal=True)
else:
logger.info('Online peers:', terminal=True)
for i in self.onlinePeers:
score = str(self.getPeerProfileInstance(i).score)
logger.info(i + ', score: ' + score, terminal=True)
def getPeerProfileInstance(self, peer):
'''Gets a peer profile instance from the list of profiles, by address name'''
for i in self.peerProfiles:
@ -270,35 +233,11 @@ class OnionrCommunicatorDaemon:
logger.debug('Heartbeat. Node running for %s.' % humanreadabletime.human_readable_time(self.getUptime()))
self.decrementThreadCount('heartbeat')
def daemonCommands(self):
'''
Process daemon commands from daemonQueue
'''
daemonqueuehandler.handle_daemon_commands(self)
def uploadBlock(self):
'''Upload our block to a few peers'''
uploadblocks.upload_blocks_from_communicator(self)
def announce(self, peer):
'''Announce to peers our address'''
if announcenode.announce_node(self) == False:
logger.warn('Could not introduce node.', terminal=True)
def detectAPICrash(self):
'''exit if the api server crashes/stops'''
if localcommand.local_command(self._core, 'ping', silent=False) not in ('pong', 'pong!'):
for i in range(300):
if localcommand.local_command(self._core, 'ping') in ('pong', 'pong!') or self.shutdown:
break # break for loop
time.sleep(1)
else:
# This executes if the api is NOT detected to be running
events.event('daemon_crash', onionr = self._core.onionrInst, data = {})
logger.fatal('Daemon detected API crash (or otherwise unable to reach API after long time), stopping...', terminal=True)
self.shutdown = True
self.decrementThreadCount('detectAPICrash')
def runCheck(self):
if run_file_exists(self):
logger.debug('Status check; looks good.')

View File

@ -0,0 +1,27 @@
'''
Onionr - Private P2P Communication
add bootstrap peers to the communicator peer list
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
def add_bootstrap_list_to_peer_list(comm_inst, peerList):
'''
Add the bootstrap list to the peer list (no duplicates)
'''
for i in comm_inst._core.bootstrapList:
if i not in peerList and i not in comm_inst.offlinePeers and i != comm_inst._core.hsAddress and len(str(i).strip()) > 0:
peerList.append(i)
comm_inst._core.addAddress(i)

View File

@ -1,25 +1,6 @@
'''
Onionr - Private P2P Communication
interact with the peer pool in a communicator instance
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
from . import clearofflinepeer, onlinepeers, pickonlinepeers
from . import clearofflinepeer, onlinepeers, pickonlinepeers, removeonlinepeer
clear_offline_peer = clearofflinepeer.clear_offline_peer
get_online_peers = onlinepeers.get_online_peers
pick_online_peer = pickonlinepeers.pick_online_peer
remove_online_peer = removeonlinepeer.remove_online_peer

View File

@ -0,0 +1,33 @@
'''
Onionr - Private P2P Communication
remove an online peer from the pool in a communicator instance
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
def remove_online_peer(comm_inst, peer):
'''Remove an online peer'''
try:
del comm_inst.connectTimes[peer]
except KeyError:
pass
try:
del comm_inst.dbTimestamps[peer]
except KeyError:
pass
try:
comm_inst.onlinePeers.remove(peer)
except ValueError:
pass

View File

@ -34,13 +34,14 @@ def peer_action(comm_inst, peer, action, data='', returnHeaders=False, max_resp_
try:
retData = basicrequests.do_get_request(comm_inst._core, url, port=comm_inst.proxyPort, max_size=max_resp_size)
except streamedrequests.exceptions.ResponseLimitReached:
logger.warn('Request failed due to max response size being overflowed', terminal=True)
retData = False
penalty_score = -100
# if request failed, (error), mark peer offline
if retData == False:
try:
comm_inst.getPeerProfileInstance(peer).addScore(penalty_score)
comm_inst.removeOnlinePeer(peer)
onlinepeers.remove_online_peer(comm_inst, peer)
if action != 'ping' and not comm_inst.shutdown:
logger.warn('Lost connection to ' + peer, terminal=True)
onlinepeers.get_online_peers(comm_inst) # Will only add a new peer to pool if needed

View File

@ -21,7 +21,7 @@ import time, sys, secrets
import onionrexceptions, logger, onionrpeers
from utils import networkmerger
from onionrutils import stringvalidators, epoch
from communicator import peeraction
from communicator import peeraction, bootstrappeers
def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False):
config = comm_inst._core.config
@ -50,7 +50,7 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False):
if len(peerList) == 0 or useBootstrap:
# Avoid duplicating bootstrap addresses in peerList
comm_inst.addBootstrapListToPeerList(peerList)
bootstrappeers.add_bootstrap_list_to_peer_list(comm_inst, peerList)
for address in peerList:
if not config.get('tor.v3onions') and len(address) == 62:

View File

@ -18,6 +18,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
from onionrutils import epoch
from communicator import onlinepeers
def cooldown_peer(comm_inst):
'''Randomly add an online peer to cooldown, so we can connect a new one'''
onlinePeerAmount = len(comm_inst.onlinePeers)
@ -46,7 +47,7 @@ def cooldown_peer(comm_inst):
except ValueError:
break
else:
comm_inst.removeOnlinePeer(toCool)
onlinepeers.remove_online_peer(comm_inst, toCool)
comm_inst.cooldownPeer[toCool] = epoch.get_epoch()
comm_inst.decrementThreadCount('cooldown_peer')

View File

@ -35,7 +35,7 @@ def download_blocks_from_communicator(comm_inst):
blockPeers = []
removeFromQueue = True
if shoulddownload.should_download(comm_inst, blockHash):
if not shoulddownload.should_download(comm_inst, blockHash):
continue
if comm_inst.shutdown or not comm_inst.isOnline or comm_inst._core.storage_counter.isFull():