refactored timers and added lastconnect to address database

This commit is contained in:
Kevin Froman 2018-05-18 01:22:16 -05:00
parent c933e76c3e
commit 16282d79d3
No known key found for this signature in database
GPG key ID: 0D414D0FE405B63B
5 changed files with 73 additions and 41 deletions

View file

@ -49,16 +49,18 @@ class OnionrCommunicate:
self.blocksProcessing = [] # list of blocks currently processing, to avoid trying a block twice at once in 2 seperate threads self.blocksProcessing = [] # list of blocks currently processing, to avoid trying a block twice at once in 2 seperate threads
self.peerStatus = {} # network actions (active requests) for peers used mainly to prevent conflicting actions in threads self.peerStatus = {} # network actions (active requests) for peers used mainly to prevent conflicting actions in threads
blockProcessTimer = 19 self.communicatorTimers = {} # communicator timers, name: rate (in seconds)
blockProcessAmount = 20 self.communicatorTimerCounts = {}
highFailureTimer = 0 self.communicatorTimerFuncs = {}
highFailureRate = 10
heartBeatTimer = 0 self.registerTimer('blockProcess', 20)
heartBeatRate = 10 self.registerTimer('highFailure', 10)
pexTimer = 120 # How often we should check for new peers self.registerTimer('heartBeat', 10)
pexCount = 0 self.registerTimer('pex', 120)
logger.debug('Communicator debugging enabled.') logger.debug('Communicator debugging enabled.')
torID = open('data/hs/hostname').read()
with open('data/hs/hostname', 'r') as torID:
todID = torID.read()
apiRunningCheckRate = 10 apiRunningCheckRate = 10
apiRunningCheckCount = 0 apiRunningCheckCount = 0
@ -74,24 +76,23 @@ class OnionrCommunicate:
while True: while True:
command = self._core.daemonQueue() command = self._core.daemonQueue()
# Process blocks based on a timer # Process blocks based on a timer
blockProcessTimer += 1 self.timerTick()
heartBeatTimer += 1 # TODO: migrate below if statements to be own functions which are called in the above timerTick() function
pexCount += 1 if self.communicatorTimers['highFailure'] == self.communicatorTimerCounts['highFailure']:
if highFailureTimer == highFailureRate: self.communicatorTimerCounts['highFailure'] = 0
highFailureTimer = 0
for i in self.peerData: for i in self.peerData:
if self.peerData[i]['failCount'] >= self.highFailureAmount: if self.peerData[i]['failCount'] >= self.highFailureAmount:
self.peerData[i]['failCount'] -= 1 self.peerData[i]['failCount'] -= 1
if pexTimer == pexCount: if self.communicatorTimers['pex'] == self.communicatorTimerCounts['pex']:
pT1 = threading.Thread(target=self.getNewPeers, name="pT1") pT1 = threading.Thread(target=self.getNewPeers, name="pT1")
pT1.start() pT1.start()
pT2 = threading.Thread(target=self.getNewPeers, name="pT2") pT2 = threading.Thread(target=self.getNewPeers, name="pT2")
pT2.start() pT2.start()
pexCount = 0 # TODO: do not reset timer if low peer count self.communicatorTimerCounts['pex'] = 0# TODO: do not reset timer if low peer count
if heartBeatRate == heartBeatTimer: if self.communicatorTimers['heartBeat'] == self.communicatorTimerCounts['heartBeat']:
logger.debug('Communicator heartbeat') logger.debug('Communicator heartbeat')
heartBeatTimer = 0 self.communicatorTimerCounts['heartBeat'] = 0
if blockProcessTimer == blockProcessAmount: if self.communicatorTimers['blockProcess'] == self.communicatorTimerCounts['blockProcess']:
lT1 = threading.Thread(target=self.lookupBlocks, name="lt1", args=(True,)) lT1 = threading.Thread(target=self.lookupBlocks, name="lt1", args=(True,))
lT2 = threading.Thread(target=self.lookupBlocks, name="lt2", args=(True,)) lT2 = threading.Thread(target=self.lookupBlocks, name="lt2", args=(True,))
lT3 = threading.Thread(target=self.lookupBlocks, name="lt3", args=(True,)) lT3 = threading.Thread(target=self.lookupBlocks, name="lt3", args=(True,))
@ -109,7 +110,7 @@ class OnionrCommunicate:
pbT2.start() pbT2.start()
pbT3.start() pbT3.start()
pbT4.start() pbT4.start()
blockProcessTimer = 0 self.communicatorTimerCounts['blockProcess'] = 0
else: else:
logger.debug(threading.active_count()) logger.debug(threading.active_count())
logger.debug('Too many threads.') logger.debug('Too many threads.')
@ -185,6 +186,28 @@ class OnionrCommunicate:
connection_handlers = {} connection_handlers = {}
id_peer_cache = {} id_peer_cache = {}
def registerTimer(self, timerName, rate, timerFunc=None):
'''Register a communicator timer'''
self.communicatorTimers[timerName] = rate
self.communicatorTimerCounts[timerName] = 0
self.communicatorTimerFuncs[timerName] = timerFunc
def timerTick(self):
'''Increments timers "ticks" and calls funcs if applicable'''
tName = ''
for i in self.communicatorTimers.items():
tName = i[0]
self.communicatorTimerCounts[tName] += 1
if self.communicatorTimerCounts[tName] == self.communicatorTimers[tName]:
try:
self.communicatorTimerFuncs[tName]()
except TypeError:
pass
else:
self.communicatorTimerCounts[tName] = 0
def get_connection_handlers(self, name = None): def get_connection_handlers(self, name = None):
''' '''
Returns a list of callback handlers by name, or, if name is None, it returns all handlers. Returns a list of callback handlers by name, or, if name is None, it returns all handlers.
@ -637,12 +660,6 @@ class OnionrCommunicate:
logger.info('Successfully obtained data for %s' % str(hash), timestamp=True) logger.info('Successfully obtained data for %s' % str(hash), timestamp=True)
retVal = True retVal = True
break break
'''
if data.startswith(b'-txt-'):
self._core.setBlockType(hash, 'txt')
if len(data) < 120:
logger.debug('Block text:\n' + data.decode())
'''
else: else:
logger.warn("Failed to validate %s -- hash calculated was %s" % (hash, digest)) logger.warn("Failed to validate %s -- hash calculated was %s" % (hash, digest))
peerTryCount += 1 peerTryCount += 1
@ -672,7 +689,7 @@ class OnionrCommunicate:
# Store peer in peerData dictionary (non permanent) # Store peer in peerData dictionary (non permanent)
if not peer in self.peerData: if not peer in self.peerData:
self.peerData[peer] = {'connectCount': 0, 'failCount': 0, 'lastConnectTime': math.floor(time.time())} self.peerData[peer] = {'connectCount': 0, 'failCount': 0, 'lastConnectTime': self._utils.getEpoch()}
socksPort = sys.argv[2] socksPort = sys.argv[2]
'''We use socks5h to use tor as DNS''' '''We use socks5h to use tor as DNS'''
proxies = {'http': 'socks5://127.0.0.1:' + str(socksPort), 'https': 'socks5://127.0.0.1:' + str(socksPort)} proxies = {'http': 'socks5://127.0.0.1:' + str(socksPort), 'https': 'socks5://127.0.0.1:' + str(socksPort)}
@ -698,12 +715,13 @@ class OnionrCommunicate:
else: else:
self.peerData[peer]['connectCount'] += 1 self.peerData[peer]['connectCount'] += 1
self.peerData[peer]['failCount'] -= 1 self.peerData[peer]['failCount'] -= 1
self.peerData[peer]['lastConnectTime'] = math.floor(time.time()) self.peerData[peer]['lastConnectTime'] = self._utils.getEpoch()
self._core.setAddressInfo(peer, 'lastConnect', self._utils.getEpoch())
return retData return retData
def peerStatusTaken(self, peer, status): def peerStatusTaken(self, peer, status):
''' '''
Returns if a peer is currently performing a specified action Returns if we are currently performing a specific action with a peer.
''' '''
try: try:
if self.peerStatus[peer] == status: if self.peerStatus[peer] == status:

View file

@ -188,7 +188,8 @@ class Core:
speed int, speed int,
success int, success int,
DBHash text, DBHash text,
failure int failure int,
lastConnect int
); );
''') ''')
conn.commit() conn.commit()
@ -263,7 +264,7 @@ class Core:
return return
conn = sqlite3.connect(self.blockDB) conn = sqlite3.connect(self.blockDB)
c = conn.cursor() c = conn.cursor()
currentTime = math.floor(time.time()) currentTime = self._utils.getEpoch()
if selfInsert or dataSaved: if selfInsert or dataSaved:
selfInsert = 1 selfInsert = 1
else: else:
@ -388,7 +389,7 @@ class Core:
Add a command to the daemon queue, used by the communication daemon (communicator.py) Add a command to the daemon queue, used by the communication daemon (communicator.py)
''' '''
# Intended to be used by the web server # Intended to be used by the web server
date = math.floor(time.time()) date = self._utils.getEpoch()
conn = sqlite3.connect(self.queueDB) conn = sqlite3.connect(self.queueDB)
c = conn.cursor() c = conn.cursor()
t = (command, data, date) t = (command, data, date)
@ -523,11 +524,12 @@ class Core:
success int, 4 success int, 4
DBHash text, 5 DBHash text, 5
failure int 6 failure int 6
lastConnect 7
''' '''
conn = sqlite3.connect(self.addressDB) conn = sqlite3.connect(self.addressDB)
c = conn.cursor() c = conn.cursor()
command = (address,) command = (address,)
infoNumbers = {'address': 0, 'type': 1, 'knownPeer': 2, 'speed': 3, 'success': 4, 'DBHash': 5, 'failure': 6} infoNumbers = {'address': 0, 'type': 1, 'knownPeer': 2, 'speed': 3, 'success': 4, 'DBHash': 5, 'failure': 6, 'lastConnect': 7}
info = infoNumbers[info] info = infoNumbers[info]
iterCount = 0 iterCount = 0
retVal = '' retVal = ''

View file

@ -89,6 +89,7 @@ DataDirectory data/tordata/
torVersion.kill() torVersion.kill()
# wait for tor to get to 100% bootstrap # wait for tor to get to 100% bootstrap
try:
for line in iter(tor.stdout.readline, b''): for line in iter(tor.stdout.readline, b''):
if 'Bootstrapped 100%: Done' in line.decode(): if 'Bootstrapped 100%: Done' in line.decode():
break break
@ -97,6 +98,9 @@ DataDirectory data/tordata/
else: else:
logger.fatal('Failed to start Tor. Try killing any other Tor processes owned by this user.') logger.fatal('Failed to start Tor. Try killing any other Tor processes owned by this user.')
return False return False
except KeyboardInterrupt:
logger.fatal("Got keyboard interrupt")
return False
logger.info('Finished starting Tor', timestamp=True) logger.info('Finished starting Tor', timestamp=True)
self.readyState = True self.readyState = True

View file

@ -473,6 +473,10 @@ class OnionrUtils:
sys.stdout.write("\r{0}{1}%".format(arrow + spaces, int(round(percent * 100)))) sys.stdout.write("\r{0}{1}%".format(arrow + spaces, int(round(percent * 100))))
sys.stdout.flush() sys.stdout.flush()
def getEpoch():
'''returns epoch'''
return math.floor(time.time())
def size(path='.'): def size(path='.'):
''' '''
Returns the size of a folder's contents in bytes Returns the size of a folder's contents in bytes

View file

@ -12,5 +12,9 @@
"output": true, "output": true,
"color": true "color": true
} }
},
"allocations":{
"disk": 1000000000,
"netTotal": 1000000000
} }
} }