improved typing for communicatortimes, give api threads a name

This commit is contained in:
Kevin Froman 2019-09-12 14:50:06 -05:00
parent 48f111021b
commit ab49e3eaf6
3 changed files with 64 additions and 46 deletions

View file

@ -101,39 +101,39 @@ class OnionrCommunicatorDaemon:
OnionrCommunicatorTimers(self, self.heartbeat, 30)
# Set timers, function reference, seconds
# requiresPeer True means the timer function won't fire if we have no connected peers
peerPoolTimer = OnionrCommunicatorTimers(self, onlinepeers.get_online_peers, 60, maxThreads=1, myArgs=[self])
OnionrCommunicatorTimers(self, self.runCheck, 2, maxThreads=1)
# requires_peer True means the timer function won't fire if we have no connected peers
peerPoolTimer = OnionrCommunicatorTimers(self, onlinepeers.get_online_peers, 60, max_threads=1, my_args=[self])
OnionrCommunicatorTimers(self, self.runCheck, 2, max_threads=1)
# Timers to periodically lookup new blocks and download them
lookup_blocks_timer = OnionrCommunicatorTimers(self, lookupblocks.lookup_blocks_from_communicator, config.get('timers.lookupBlocks', 25), myArgs=[self], requiresPeer=True, maxThreads=1)
lookup_blocks_timer = OnionrCommunicatorTimers(self, lookupblocks.lookup_blocks_from_communicator, config.get('timers.lookupBlocks', 25), my_args=[self], requires_peer=True, max_threads=1)
# The block download timer is accessed by the block lookup function to trigger faster download starts
self.download_blocks_timer = OnionrCommunicatorTimers(self, self.getBlocks, config.get('timers.getBlocks', 10), requiresPeer=True, maxThreads=5)
self.download_blocks_timer = OnionrCommunicatorTimers(self, self.getBlocks, config.get('timers.getBlocks', 10), requires_peer=True, max_threads=5)
# Timer to reset the longest offline peer so contact can be attempted again
OnionrCommunicatorTimers(self, onlinepeers.clear_offline_peer, 58, myArgs=[self])
OnionrCommunicatorTimers(self, onlinepeers.clear_offline_peer, 58, my_args=[self])
# Timer to cleanup old blocks
blockCleanupTimer = OnionrCommunicatorTimers(self, housekeeping.clean_old_blocks, 20, myArgs=[self])
blockCleanupTimer = OnionrCommunicatorTimers(self, housekeeping.clean_old_blocks, 20, my_args=[self])
# Timer to discover new peers
OnionrCommunicatorTimers(self, lookupadders.lookup_new_peer_transports_with_communicator, 60, requiresPeer=True, myArgs=[self], maxThreads=2)
OnionrCommunicatorTimers(self, lookupadders.lookup_new_peer_transports_with_communicator, 60, requires_peer=True, my_args=[self], max_threads=2)
# Timer for adjusting which peers we actively communicate to at any given time, to avoid over-using peers
OnionrCommunicatorTimers(self, cooldownpeer.cooldown_peer, 30, myArgs=[self], requiresPeer=True)
OnionrCommunicatorTimers(self, cooldownpeer.cooldown_peer, 30, my_args=[self], requires_peer=True)
# Timer to read the upload queue and upload the entries to peers
OnionrCommunicatorTimers(self, uploadblocks.upload_blocks_from_communicator, 5, myArgs=[self], requiresPeer=True, maxThreads=1)
OnionrCommunicatorTimers(self, uploadblocks.upload_blocks_from_communicator, 5, my_args=[self], requires_peer=True, max_threads=1)
# Timer to process the daemon command queue
OnionrCommunicatorTimers(self, daemonqueuehandler.handle_daemon_commands, 6, myArgs=[self], maxThreads=3)
OnionrCommunicatorTimers(self, daemonqueuehandler.handle_daemon_commands, 6, my_args=[self], max_threads=3)
# Setup direct connections
if config.get('general.socket_servers', False):
self.services = onionrservices.OnionrServices()
self.active_services = []
self.service_greenlets = []
OnionrCommunicatorTimers(self, servicecreator.service_creator, 5, maxThreads=50, myArgs=[self])
OnionrCommunicatorTimers(self, servicecreator.service_creator, 5, max_threads=50, my_args=[self])
else:
self.services = None
@ -142,25 +142,25 @@ class OnionrCommunicatorDaemon:
# This timer creates deniable blocks, in an attempt to further obfuscate block insertion metadata
if config.get('general.insert_deniable_blocks', True):
deniableBlockTimer = OnionrCommunicatorTimers(self, deniableinserts.insert_deniable_block, 180, myArgs=[self], requiresPeer=True, maxThreads=1)
deniableBlockTimer = OnionrCommunicatorTimers(self, deniableinserts.insert_deniable_block, 180, my_args=[self], requires_peer=True, max_threads=1)
deniableBlockTimer.count = (deniableBlockTimer.frequency - 175)
# Timer to check for connectivity, through Tor to various high-profile onion services
netCheckTimer = OnionrCommunicatorTimers(self, netcheck.net_check, 500, myArgs=[self], maxThreads=1)
netCheckTimer = OnionrCommunicatorTimers(self, netcheck.net_check, 500, my_args=[self], max_threads=1)
# Announce the public API server transport address to other nodes if security level allows
if config.get('general.security_level', 1) == 0 and config.get('general.announce_node', True):
# Default to high security level incase config breaks
announceTimer = OnionrCommunicatorTimers(self, announcenode.announce_node, 3600, myArgs=[self], requiresPeer=True, maxThreads=1)
announceTimer = OnionrCommunicatorTimers(self, announcenode.announce_node, 3600, my_args=[self], requires_peer=True, max_threads=1)
announceTimer.count = (announceTimer.frequency - 120)
else:
logger.debug('Will not announce node.')
# Timer to delete malfunctioning or long-dead peers
cleanupTimer = OnionrCommunicatorTimers(self, self.peerCleanup, 300, requiresPeer=True)
cleanupTimer = OnionrCommunicatorTimers(self, self.peerCleanup, 300, requires_peer=True)
# Timer to cleanup dead ephemeral forward secrecy keys
forwardSecrecyTimer = OnionrCommunicatorTimers(self, housekeeping.clean_keys, 15, myArgs=[self], maxThreads=1)
forwardSecrecyTimer = OnionrCommunicatorTimers(self, housekeeping.clean_keys, 15, my_args=[self], max_threads=1)
# Adjust initial timer triggers
peerPoolTimer.count = (peerPoolTimer.frequency - 1)

View file

@ -3,6 +3,7 @@
This file contains timer control for the communicator
'''
from __future__ import annotations # thank you python, very cool
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -17,46 +18,62 @@
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import threading, onionrexceptions, logger
class OnionrCommunicatorTimers:
def __init__(self, daemonInstance, timerFunction, frequency, makeThread=True, threadAmount=1, maxThreads=5, requiresPeer=False, myArgs=[]):
self.timerFunction = timerFunction
self.frequency = frequency
self.threadAmount = threadAmount
self.makeThread = makeThread
self.requiresPeer = requiresPeer
self.daemonInstance = daemonInstance
self.maxThreads = maxThreads
self.args = myArgs
self.daemonInstance.timers.append(self)
import uuid
import threading
import onionrexceptions, logger
from typing import TYPE_CHECKING
from typing import Callable, NewType, Iterable
if TYPE_CHECKING:
from communicator import OnionrCommunicatorDaemon
CallFreqSeconds = NewType('CallFreqSeconds', int)
class OnionrCommunicatorTimers:
def __init__(self, daemon_inst: OnionrCommunicatorDaemon,
timer_function: Callable, frequency: CallFreqSeconds,
make_thread:bool=True, thread_amount:int=1, max_threads:int=5,
requires_peer:bool=False, my_args:Iterable=[]):
self.timer_function = timer_function
self.frequency = frequency
self.thread_amount = thread_amount
self.make_thread = make_thread
self.requires_peer = requires_peer
self.daemon_inst = daemon_inst
self.max_threads = max_threads
self.args = my_args
self.daemon_inst.timers.append(self)
self.count = 0
def processTimer(self):
# mark how many instances of a thread we have (decremented at thread end)
try:
self.daemonInstance.threadCounts[self.timerFunction.__name__]
self.daemon_inst.threadCounts[self.timer_function.__name__]
except KeyError:
self.daemonInstance.threadCounts[self.timerFunction.__name__] = 0
self.daemon_inst.threadCounts[self.timer_function.__name__] = 0
# execute thread if it is time, and we are not missing *required* online peer
if self.count == self.frequency and not self.daemonInstance.shutdown:
if self.count == self.frequency and not self.daemon_inst.shutdown:
try:
if self.requiresPeer and len(self.daemonInstance.onlinePeers) == 0:
if self.requires_peer and len(self.daemon_inst.onlinePeers) == 0:
raise onionrexceptions.OnlinePeerNeeded
except onionrexceptions.OnlinePeerNeeded:
return
else:
if self.makeThread:
for i in range(self.threadAmount):
if self.daemonInstance.threadCounts[self.timerFunction.__name__] >= self.maxThreads:
logger.debug('%s is currently using the maximum number of threads, not starting another.' % self.timerFunction.__name__, terminal=True)
if self.make_thread:
for i in range(self.thread_amount):
if self.daemon_inst.threadCounts[self.timer_function.__name__] >= self.max_threads:
logger.debug('%s is currently using the maximum number of threads, not starting another.' % self.timer_function.__name__, terminal=True)
else:
self.daemonInstance.threadCounts[self.timerFunction.__name__] += 1
newThread = threading.Thread(target=self.timerFunction, args=self.args, daemon=True)
self.daemon_inst.threadCounts[self.timer_function.__name__] += 1
newThread = threading.Thread(target=self.timer_function, args=self.args, daemon=True,
name=self.timer_function.__name__ + ' - ' + str(uuid.uuid4()))
newThread.start()
else:
self.timerFunction()
self.timer_function()
self.count = -1 # negative 1 because its incremented at bottom
self.count += 1

View file

@ -1,7 +1,7 @@
'''
Onionr - Private P2P Communication
launch the api server and communicator
launch the api servers and communicator
'''
'''
This program is free software: you can redistribute it and/or modify
@ -56,8 +56,8 @@ def daemon():
shared_state = toomanyobjs.TooMany()
Thread(target=shared_state.get(apiservers.ClientAPI).start, daemon=True).start()
Thread(target=shared_state.get(apiservers.PublicAPI).start, daemon=True).start()
Thread(target=shared_state.get(apiservers.ClientAPI).start, daemon=True, name='client HTTP API').start()
Thread(target=shared_state.get(apiservers.PublicAPI).start, daemon=True, name='public HTTP API').start()
shared_state.get(serializeddata.SerializedData)
shared_state.share_object() # share the parent object to the threads
@ -91,7 +91,7 @@ def daemon():
logger.debug('Started .onion service: %s' % (logger.colors.underline + net.myID))
else:
logger.debug('.onion service disabled')
logger.info('Using public key: %s' % (logger.colors.underline + getourkeypair.get_keypair()[0][:52]), terminal=True)
logger.info('Using public key: %s' % (logger.colors.underline + getourkeypair.get_keypair()[0][:52]))
try:
time.sleep(1)
@ -116,7 +116,7 @@ def _ignore_sigint(sig, frame):
def kill_daemon():
'''
Shutdown the Onionr daemon
Shutdown the Onionr daemon (communicator)
'''
logger.warn('Stopping the running daemon...', timestamp = False, terminal=True)
@ -133,7 +133,8 @@ def kill_daemon():
logger.error('Failed to shutdown daemon: ' + str(e), error = e, timestamp = False, terminal=True)
return
def start(input = False, override = False):
def start(input: bool = False, override: bool = False):
"""If no lock file, make one and start onionr, error if there is and its not overridden"""
if os.path.exists('.onionr-lock') and not override:
logger.fatal('Cannot start. Daemon is already running, or it did not exit cleanly.\n(if you are sure that there is not a daemon running, delete .onionr-lock & try again).', terminal=True)
else: