From ab49e3eaf66606b3e561356e913f3bc0b289483e Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Thu, 12 Sep 2019 14:50:06 -0500 Subject: [PATCH] improved typing for communicatortimes, give api threads a name --- onionr/communicator/__init__.py | 34 +++++----- .../onionrcommunicatortimers.py | 63 ++++++++++++------- onionr/onionrcommands/daemonlaunch.py | 13 ++-- 3 files changed, 64 insertions(+), 46 deletions(-) diff --git a/onionr/communicator/__init__.py b/onionr/communicator/__init__.py index db169f5c..0a853f3a 100755 --- a/onionr/communicator/__init__.py +++ b/onionr/communicator/__init__.py @@ -101,39 +101,39 @@ class OnionrCommunicatorDaemon: OnionrCommunicatorTimers(self, self.heartbeat, 30) # Set timers, function reference, seconds - # requiresPeer True means the timer function won't fire if we have no connected peers - peerPoolTimer = OnionrCommunicatorTimers(self, onlinepeers.get_online_peers, 60, maxThreads=1, myArgs=[self]) - OnionrCommunicatorTimers(self, self.runCheck, 2, maxThreads=1) + # requires_peer True means the timer function won't fire if we have no connected peers + peerPoolTimer = OnionrCommunicatorTimers(self, onlinepeers.get_online_peers, 60, max_threads=1, my_args=[self]) + OnionrCommunicatorTimers(self, self.runCheck, 2, max_threads=1) # Timers to periodically lookup new blocks and download them - lookup_blocks_timer = OnionrCommunicatorTimers(self, lookupblocks.lookup_blocks_from_communicator, config.get('timers.lookupBlocks', 25), myArgs=[self], requiresPeer=True, maxThreads=1) + lookup_blocks_timer = OnionrCommunicatorTimers(self, lookupblocks.lookup_blocks_from_communicator, config.get('timers.lookupBlocks', 25), my_args=[self], requires_peer=True, max_threads=1) # The block download timer is accessed by the block lookup function to trigger faster download starts - self.download_blocks_timer = OnionrCommunicatorTimers(self, self.getBlocks, config.get('timers.getBlocks', 10), requiresPeer=True, maxThreads=5) + self.download_blocks_timer = OnionrCommunicatorTimers(self, self.getBlocks, config.get('timers.getBlocks', 10), requires_peer=True, max_threads=5) # Timer to reset the longest offline peer so contact can be attempted again - OnionrCommunicatorTimers(self, onlinepeers.clear_offline_peer, 58, myArgs=[self]) + OnionrCommunicatorTimers(self, onlinepeers.clear_offline_peer, 58, my_args=[self]) # Timer to cleanup old blocks - blockCleanupTimer = OnionrCommunicatorTimers(self, housekeeping.clean_old_blocks, 20, myArgs=[self]) + blockCleanupTimer = OnionrCommunicatorTimers(self, housekeeping.clean_old_blocks, 20, my_args=[self]) # Timer to discover new peers - OnionrCommunicatorTimers(self, lookupadders.lookup_new_peer_transports_with_communicator, 60, requiresPeer=True, myArgs=[self], maxThreads=2) + OnionrCommunicatorTimers(self, lookupadders.lookup_new_peer_transports_with_communicator, 60, requires_peer=True, my_args=[self], max_threads=2) # Timer for adjusting which peers we actively communicate to at any given time, to avoid over-using peers - OnionrCommunicatorTimers(self, cooldownpeer.cooldown_peer, 30, myArgs=[self], requiresPeer=True) + OnionrCommunicatorTimers(self, cooldownpeer.cooldown_peer, 30, my_args=[self], requires_peer=True) # Timer to read the upload queue and upload the entries to peers - OnionrCommunicatorTimers(self, uploadblocks.upload_blocks_from_communicator, 5, myArgs=[self], requiresPeer=True, maxThreads=1) + OnionrCommunicatorTimers(self, uploadblocks.upload_blocks_from_communicator, 5, my_args=[self], requires_peer=True, max_threads=1) # Timer to process the daemon command queue - OnionrCommunicatorTimers(self, daemonqueuehandler.handle_daemon_commands, 6, myArgs=[self], maxThreads=3) + OnionrCommunicatorTimers(self, daemonqueuehandler.handle_daemon_commands, 6, my_args=[self], max_threads=3) # Setup direct connections if config.get('general.socket_servers', False): self.services = onionrservices.OnionrServices() self.active_services = [] self.service_greenlets = [] - OnionrCommunicatorTimers(self, servicecreator.service_creator, 5, maxThreads=50, myArgs=[self]) + OnionrCommunicatorTimers(self, servicecreator.service_creator, 5, max_threads=50, my_args=[self]) else: self.services = None @@ -142,25 +142,25 @@ class OnionrCommunicatorDaemon: # This timer creates deniable blocks, in an attempt to further obfuscate block insertion metadata if config.get('general.insert_deniable_blocks', True): - deniableBlockTimer = OnionrCommunicatorTimers(self, deniableinserts.insert_deniable_block, 180, myArgs=[self], requiresPeer=True, maxThreads=1) + deniableBlockTimer = OnionrCommunicatorTimers(self, deniableinserts.insert_deniable_block, 180, my_args=[self], requires_peer=True, max_threads=1) deniableBlockTimer.count = (deniableBlockTimer.frequency - 175) # Timer to check for connectivity, through Tor to various high-profile onion services - netCheckTimer = OnionrCommunicatorTimers(self, netcheck.net_check, 500, myArgs=[self], maxThreads=1) + netCheckTimer = OnionrCommunicatorTimers(self, netcheck.net_check, 500, my_args=[self], max_threads=1) # Announce the public API server transport address to other nodes if security level allows if config.get('general.security_level', 1) == 0 and config.get('general.announce_node', True): # Default to high security level incase config breaks - announceTimer = OnionrCommunicatorTimers(self, announcenode.announce_node, 3600, myArgs=[self], requiresPeer=True, maxThreads=1) + announceTimer = OnionrCommunicatorTimers(self, announcenode.announce_node, 3600, my_args=[self], requires_peer=True, max_threads=1) announceTimer.count = (announceTimer.frequency - 120) else: logger.debug('Will not announce node.') # Timer to delete malfunctioning or long-dead peers - cleanupTimer = OnionrCommunicatorTimers(self, self.peerCleanup, 300, requiresPeer=True) + cleanupTimer = OnionrCommunicatorTimers(self, self.peerCleanup, 300, requires_peer=True) # Timer to cleanup dead ephemeral forward secrecy keys - forwardSecrecyTimer = OnionrCommunicatorTimers(self, housekeeping.clean_keys, 15, myArgs=[self], maxThreads=1) + forwardSecrecyTimer = OnionrCommunicatorTimers(self, housekeeping.clean_keys, 15, my_args=[self], max_threads=1) # Adjust initial timer triggers peerPoolTimer.count = (peerPoolTimer.frequency - 1) diff --git a/onionr/communicatorutils/onionrcommunicatortimers.py b/onionr/communicatorutils/onionrcommunicatortimers.py index bbdb9c95..058fb8eb 100755 --- a/onionr/communicatorutils/onionrcommunicatortimers.py +++ b/onionr/communicatorutils/onionrcommunicatortimers.py @@ -3,6 +3,7 @@ This file contains timer control for the communicator ''' +from __future__ import annotations # thank you python, very cool ''' This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -17,46 +18,62 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . ''' -import threading, onionrexceptions, logger -class OnionrCommunicatorTimers: - def __init__(self, daemonInstance, timerFunction, frequency, makeThread=True, threadAmount=1, maxThreads=5, requiresPeer=False, myArgs=[]): - self.timerFunction = timerFunction - self.frequency = frequency - self.threadAmount = threadAmount - self.makeThread = makeThread - self.requiresPeer = requiresPeer - self.daemonInstance = daemonInstance - self.maxThreads = maxThreads - self.args = myArgs - self.daemonInstance.timers.append(self) +import uuid +import threading + +import onionrexceptions, logger + +from typing import TYPE_CHECKING +from typing import Callable, NewType, Iterable +if TYPE_CHECKING: + from communicator import OnionrCommunicatorDaemon + +CallFreqSeconds = NewType('CallFreqSeconds', int) + +class OnionrCommunicatorTimers: + def __init__(self, daemon_inst: OnionrCommunicatorDaemon, + timer_function: Callable, frequency: CallFreqSeconds, + make_thread:bool=True, thread_amount:int=1, max_threads:int=5, + requires_peer:bool=False, my_args:Iterable=[]): + self.timer_function = timer_function + self.frequency = frequency + self.thread_amount = thread_amount + self.make_thread = make_thread + self.requires_peer = requires_peer + self.daemon_inst = daemon_inst + self.max_threads = max_threads + self.args = my_args + + self.daemon_inst.timers.append(self) self.count = 0 def processTimer(self): # mark how many instances of a thread we have (decremented at thread end) try: - self.daemonInstance.threadCounts[self.timerFunction.__name__] + self.daemon_inst.threadCounts[self.timer_function.__name__] except KeyError: - self.daemonInstance.threadCounts[self.timerFunction.__name__] = 0 + self.daemon_inst.threadCounts[self.timer_function.__name__] = 0 # execute thread if it is time, and we are not missing *required* online peer - if self.count == self.frequency and not self.daemonInstance.shutdown: + if self.count == self.frequency and not self.daemon_inst.shutdown: try: - if self.requiresPeer and len(self.daemonInstance.onlinePeers) == 0: + if self.requires_peer and len(self.daemon_inst.onlinePeers) == 0: raise onionrexceptions.OnlinePeerNeeded except onionrexceptions.OnlinePeerNeeded: return else: - if self.makeThread: - for i in range(self.threadAmount): - if self.daemonInstance.threadCounts[self.timerFunction.__name__] >= self.maxThreads: - logger.debug('%s is currently using the maximum number of threads, not starting another.' % self.timerFunction.__name__, terminal=True) + if self.make_thread: + for i in range(self.thread_amount): + if self.daemon_inst.threadCounts[self.timer_function.__name__] >= self.max_threads: + logger.debug('%s is currently using the maximum number of threads, not starting another.' % self.timer_function.__name__, terminal=True) else: - self.daemonInstance.threadCounts[self.timerFunction.__name__] += 1 - newThread = threading.Thread(target=self.timerFunction, args=self.args, daemon=True) + self.daemon_inst.threadCounts[self.timer_function.__name__] += 1 + newThread = threading.Thread(target=self.timer_function, args=self.args, daemon=True, + name=self.timer_function.__name__ + ' - ' + str(uuid.uuid4())) newThread.start() else: - self.timerFunction() + self.timer_function() self.count = -1 # negative 1 because its incremented at bottom self.count += 1 diff --git a/onionr/onionrcommands/daemonlaunch.py b/onionr/onionrcommands/daemonlaunch.py index b2aa4f99..d623c80f 100755 --- a/onionr/onionrcommands/daemonlaunch.py +++ b/onionr/onionrcommands/daemonlaunch.py @@ -1,7 +1,7 @@ ''' Onionr - Private P2P Communication - launch the api server and communicator + launch the api servers and communicator ''' ''' This program is free software: you can redistribute it and/or modify @@ -56,8 +56,8 @@ def daemon(): shared_state = toomanyobjs.TooMany() - Thread(target=shared_state.get(apiservers.ClientAPI).start, daemon=True).start() - Thread(target=shared_state.get(apiservers.PublicAPI).start, daemon=True).start() + Thread(target=shared_state.get(apiservers.ClientAPI).start, daemon=True, name='client HTTP API').start() + Thread(target=shared_state.get(apiservers.PublicAPI).start, daemon=True, name='public HTTP API').start() shared_state.get(serializeddata.SerializedData) shared_state.share_object() # share the parent object to the threads @@ -91,7 +91,7 @@ def daemon(): logger.debug('Started .onion service: %s' % (logger.colors.underline + net.myID)) else: logger.debug('.onion service disabled') - logger.info('Using public key: %s' % (logger.colors.underline + getourkeypair.get_keypair()[0][:52]), terminal=True) + logger.info('Using public key: %s' % (logger.colors.underline + getourkeypair.get_keypair()[0][:52])) try: time.sleep(1) @@ -116,7 +116,7 @@ def _ignore_sigint(sig, frame): def kill_daemon(): ''' - Shutdown the Onionr daemon + Shutdown the Onionr daemon (communicator) ''' logger.warn('Stopping the running daemon...', timestamp = False, terminal=True) @@ -133,7 +133,8 @@ def kill_daemon(): logger.error('Failed to shutdown daemon: ' + str(e), error = e, timestamp = False, terminal=True) return -def start(input = False, override = False): +def start(input: bool = False, override: bool = False): + """If no lock file, make one and start onionr, error if there is and its not overridden""" if os.path.exists('.onionr-lock') and not override: logger.fatal('Cannot start. Daemon is already running, or it did not exit cleanly.\n(if you are sure that there is not a daemon running, delete .onionr-lock & try again).', terminal=True) else: