From c469c5d871d6d1efc5ab944c965f69bbbbb941b1 Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Fri, 20 Dec 2019 02:59:27 -0600 Subject: [PATCH] made communicator init have better lint compliance --- src/communicator/__init__.py | 170 +++++++++++++++++++++++------------ 1 file changed, 115 insertions(+), 55 deletions(-) diff --git a/src/communicator/__init__.py b/src/communicator/__init__.py index 072bda84..e0be6382 100755 --- a/src/communicator/__init__.py +++ b/src/communicator/__init__.py @@ -1,9 +1,11 @@ -''' +""" Onionr - Private P2P Communication - This file contains both the OnionrCommunicate class for communcating with peers - and code to operate as a daemon, getting commands from the command queue database (see core.Core.daemonQueue) -''' + This file contains both the OnionrCommunicate class for + communcating with peers and code to operate as a daemon, + getting commands from the command queue database + (see core.Core.daemonQueue) +""" import os import time @@ -33,8 +35,8 @@ from onionrblocks import storagecounter from coredb import daemonqueue from coredb import dbfiles from netcontroller import NetController - -''' +from . import bootstrappeers +""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or @@ -47,11 +49,13 @@ from netcontroller import NetController You should have received a copy of the GNU General Public License along with this program. If not, see . -''' +""" OnionrCommunicatorTimers = onionrcommunicatortimers.OnionrCommunicatorTimers config.reload() + + class OnionrCommunicatorDaemon: def __init__(self, shared_state, developmentMode=None): if developmentMode is None: @@ -74,7 +78,8 @@ class OnionrCommunicatorDaemon: # Upload information, list of blocks to upload self.blocksToUpload = [] - self.upload_session_manager = self.shared_state.get(uploadblocks.sessionmanager.BlockUploadSessionManager) + self.upload_session_manager = self.shared_state.get( + uploadblocks.sessionmanager.BlockUploadSessionManager) self.shared_state.share_object() # loop time.sleep delay in seconds @@ -100,7 +105,8 @@ class OnionrCommunicatorDaemon: # set true when shutdown command received self.shutdown = False - # list of new blocks to download, added to when new block lists are fetched from peers + # list of new blocks to download + # added to when new block lists are fetched from peers self.blockQueue = {} # list of blocks currently downloading, avoid s @@ -109,7 +115,8 @@ class OnionrCommunicatorDaemon: # timestamp when the last online node was seen self.lastNodeSeen = None - # Dict of time stamps for peer's block list lookup times, to avoid downloading full lists all the time + # Dict of time stamps for peer's block list lookup times, + # to avoid downloading full lists all the time self.dbTimestamps = {} # Clear the daemon queue for any dead messages @@ -122,73 +129,111 @@ class OnionrCommunicatorDaemon: # time app started running for info/statistics purposes self.startTime = epoch.get_epoch() - uploadqueue.UploadQueue(self) # extends our upload list and saves our list when Onionr exits + # extends our upload list and saves our list when Onionr exits + uploadqueue.UploadQueue(self) if developmentMode: OnionrCommunicatorTimers(self, self.heartbeat, 30) # Set timers, function reference, seconds - # requires_peer True means the timer function won't fire if we have no connected peers - peerPoolTimer = OnionrCommunicatorTimers(self, onlinepeers.get_online_peers, 60, max_threads=1, my_args=[self]) + # requires_peer True means the timer function won't fire if we + # have no connected peers + peerPoolTimer = OnionrCommunicatorTimers( + self, onlinepeers.get_online_peers, 60, max_threads=1, + my_args=[self]) OnionrCommunicatorTimers(self, self.runCheck, 2, max_threads=1) # Timers to periodically lookup new blocks and download them - lookup_blocks_timer = OnionrCommunicatorTimers(self, lookupblocks.lookup_blocks_from_communicator, config.get('timers.lookupBlocks', 25), my_args=[self], requires_peer=True, max_threads=1) - # The block download timer is accessed by the block lookup function to trigger faster download starts - self.download_blocks_timer = OnionrCommunicatorTimers(self, self.getBlocks, config.get('timers.getBlocks', 10), requires_peer=True, max_threads=5) + lookup_blocks_timer = OnionrCommunicatorTimers( + self, + lookupblocks.lookup_blocks_from_communicator, + config.get('timers.lookupBlocks', 25), + my_args=[self], requires_peer=True, max_threads=1) - # Timer to reset the longest offline peer so contact can be attempted again - OnionrCommunicatorTimers(self, onlinepeers.clear_offline_peer, 58, my_args=[self]) + """The block download timer is accessed by the block lookup function + to trigger faster download starts""" + self.download_blocks_timer = OnionrCommunicatorTimers( + self, self.getBlocks, config.get('timers.getBlocks', 10), + requires_peer=True, max_threads=5) + + # Timer to reset the longest offline peer + # so contact can be attempted again + OnionrCommunicatorTimers( + self, onlinepeers.clear_offline_peer, 58, my_args=[self]) # Timer to cleanup old blocks - blockCleanupTimer = OnionrCommunicatorTimers(self, housekeeping.clean_old_blocks, 20, my_args=[self]) + blockCleanupTimer = OnionrCommunicatorTimers( + self, housekeeping.clean_old_blocks, 20, my_args=[self]) # Timer to discover new peers - OnionrCommunicatorTimers(self, lookupadders.lookup_new_peer_transports_with_communicator, 60, requires_peer=True, my_args=[self], max_threads=2) + OnionrCommunicatorTimers( + self, lookupadders.lookup_new_peer_transports_with_communicator, + 60, requires_peer=True, my_args=[self], max_threads=2) - # Timer for adjusting which peers we actively communicate to at any given time, to avoid over-using peers - OnionrCommunicatorTimers(self, cooldownpeer.cooldown_peer, 30, my_args=[self], requires_peer=True) + # Timer for adjusting which peers + # we actively communicate to at any given time, + # to avoid over-using peers + OnionrCommunicatorTimers( + self, cooldownpeer.cooldown_peer, 30, + my_args=[self], requires_peer=True) # Timer to read the upload queue and upload the entries to peers - OnionrCommunicatorTimers(self, uploadblocks.upload_blocks_from_communicator, 5, my_args=[self], requires_peer=True, max_threads=1) + OnionrCommunicatorTimers( + self, uploadblocks.upload_blocks_from_communicator, + 5, my_args=[self], requires_peer=True, max_threads=1) # Timer to process the daemon command queue - OnionrCommunicatorTimers(self, daemonqueuehandler.handle_daemon_commands, 6, my_args=[self], max_threads=3) + OnionrCommunicatorTimers( + self, daemonqueuehandler.handle_daemon_commands, + 6, my_args=[self], max_threads=3) # Setup direct connections if config.get('general.socket_servers', False): self.services = onionrservices.OnionrServices() self.active_services = [] self.service_greenlets = [] - OnionrCommunicatorTimers(self, servicecreator.service_creator, 5, max_threads=50, my_args=[self]) + OnionrCommunicatorTimers( + self, servicecreator.service_creator, 5, + max_threads=50, my_args=[self]) else: self.services = None - + # {peer_pubkey: ephemeral_address}, the address to reach them self.direct_connection_clients = {} - - # This timer creates deniable blocks, in an attempt to further obfuscate block insertion metadata + + # This timer creates deniable blocks, + # in an attempt to further obfuscate block insertion metadata if config.get('general.insert_deniable_blocks', True): - deniableBlockTimer = OnionrCommunicatorTimers(self, deniableinserts.insert_deniable_block, 180, my_args=[self], requires_peer=True, max_threads=1) + deniableBlockTimer = OnionrCommunicatorTimers( + self, deniableinserts.insert_deniable_block, + 180, my_args=[self], requires_peer=True, max_threads=1) deniableBlockTimer.count = (deniableBlockTimer.frequency - 175) - # Timer to check for connectivity, through Tor to various high-profile onion services + # Timer to check for connectivity, + # through Tor to various high-profile onion services OnionrCommunicatorTimers(self, netcheck.net_check, 500, my_args=[self], max_threads=1) - # Announce the public API server transport address to other nodes if security level allows - if config.get('general.security_level', 1) == 0 and config.get('general.announce_node', True): + # Announce the public API server transport address + # to other nodes if security level allows + if config.get('general.security_level', 1) == 0 \ + and config.get('general.announce_node', True): # Default to high security level incase config breaks - announceTimer = OnionrCommunicatorTimers(self, announcenode.announce_node, 3600, my_args=[self], requires_peer=True, max_threads=1) + announceTimer = OnionrCommunicatorTimers( + self, + announcenode.announce_node, + 3600, my_args=[self], requires_peer=True, max_threads=1) announceTimer.count = (announceTimer.frequency - 120) else: logger.debug('Will not announce node.') - - # Timer to delete malfunctioning or long-dead peers - cleanupTimer = OnionrCommunicatorTimers(self, self.peerCleanup, 300, requires_peer=True) - # Timer to cleanup dead ephemeral forward secrecy keys - forwardSecrecyTimer = OnionrCommunicatorTimers(self, housekeeping.clean_keys, 15, my_args=[self], max_threads=1) + # Timer to delete malfunctioning or long-dead peers + cleanupTimer = OnionrCommunicatorTimers( + self, self.peerCleanup, 300, requires_peer=True) + + # Timer to cleanup dead ephemeral forward secrecy keys + OnionrCommunicatorTimers( + self, housekeeping.clean_keys, 15, my_args=[self], max_threads=1) # Adjust initial timer triggers peerPoolTimer.count = (peerPoolTimer.frequency - 1) @@ -197,17 +242,21 @@ class OnionrCommunicatorDaemon: lookup_blocks_timer = (lookup_blocks_timer.frequency - 2) shared_state.add(self) - + if config.get('general.use_bootstrap', True): - bootstrappeers.add_bootstrap_list_to_peer_list(self, [], db_only=True) + bootstrappeers.add_bootstrap_list_to_peer_list( + self, [], db_only=True) if not config.get('onboarding.done', True): - logger.info('First run detected. Run openhome to get setup.', terminal=True) + logger.info( + 'First run detected. Run openhome to get setup.', + terminal=True) while not config.get('onboarding.done', True): time.sleep(5) - # Main daemon loop, mainly for calling timers, don't do any complex operations here to avoid locking + # Main daemon loop, mainly for calling timers, + # don't do any complex operations here to avoid locking try: while not self.shutdown: for i in self.timers: @@ -217,9 +266,9 @@ class OnionrCommunicatorDaemon: time.sleep(self.delay) except KeyboardInterrupt: self.shutdown = True - pass - logger.info('Goodbye. (Onionr is cleaning up, and will exit)', terminal=True) + logger.info( + 'Goodbye. (Onionr is cleaning up, and will exit)', terminal=True) try: self.service_greenlets except AttributeError: @@ -228,18 +277,21 @@ class OnionrCommunicatorDaemon: # Stop onionr direct connection services for server in self.service_greenlets: server.stop() - localcommand.local_command('shutdown') # shutdown the api + localcommand.local_command('shutdown') # shutdown the api try: time.sleep(0.5) except KeyboardInterrupt: pass def getBlocks(self): - '''download new blocks in queue''' + """Download new blocks in queue.""" downloadblocks.download_blocks_from_communicator(self) def decrementThreadCount(self, threadName): - '''Decrement amount of a thread name if more than zero, called when a function meant to be run in a thread ends''' + """Decrement amount of a thread name if more than zero. + + called when a function meant to be run in a thread ends + """ try: if self.threadCounts[threadName] > 0: self.threadCounts[threadName] -= 1 @@ -247,23 +299,27 @@ class OnionrCommunicatorDaemon: pass def connectNewPeer(self, peer='', useBootstrap=False): - '''Adds a new random online peer to self.onlinePeers''' - connectnewpeers.connect_new_peer_to_communicator(self, peer, useBootstrap) + """Adds a new random online peer to self.onlinePeers""" + connectnewpeers.connect_new_peer_to_communicator( + self, peer, useBootstrap) def peerCleanup(self): - '''This just calls onionrpeers.cleanupPeers, which removes dead or bad peers (offline too long, too slow)''' + """This just calls onionrpeers.cleanupPeers. + + Remove dead or bad peers (offline too long, too slow)""" onionrpeers.peer_cleanup() self.decrementThreadCount('peerCleanup') def getPeerProfileInstance(self, peer): - '''Gets a peer profile instance from the list of profiles, by address name''' + """Gets a peer profile instance from the list of profiles""" for i in self.peerProfiles: # if the peer's profile is already loaded, return that if i.address == peer: retData = i break else: - # if the peer's profile is not loaded, return a new one. connectNewPeer also adds it to the list on connect + # if the peer's profile is not loaded, return a new one. + # connectNewPeer also adds it to the list on connect retData = onionrpeers.PeerProfiles(peer) self.peerProfiles.append(retData) return retData @@ -272,21 +328,25 @@ class OnionrCommunicatorDaemon: return epoch.get_epoch() - self.startTime def heartbeat(self): - '''Show a heartbeat debug message''' - logger.debug('Heartbeat. Node running for %s.' % humanreadabletime.human_readable_time(self.getUptime())) + """Show a heartbeat debug message.""" + logger.debug('Heartbeat. Node running for %s.' % + humanreadabletime.human_readable_time(self.getUptime())) self.decrementThreadCount('heartbeat') def runCheck(self): + """Show message if run file exists""" if run_file_exists(self): logger.debug('Status check; looks good.') self.decrementThreadCount('runCheck') + def startCommunicator(shared_state): OnionrCommunicatorDaemon(shared_state) + def run_file_exists(daemon): if os.path.isfile(filepaths.run_check_file): os.remove(filepaths.run_check_file) return True - return False \ No newline at end of file + return False