improved communicator2 comments

master
Kevin Froman 2018-07-01 23:04:14 -05:00
parent 5c49f544fe
commit f5bd9220fc
No known key found for this signature in database
GPG Key ID: 0D414D0FE405B63B
1 changed files with 26 additions and 16 deletions

View File

@ -71,7 +71,10 @@ class OnionrCommunicatorDaemon:
if debug or developmentMode: if debug or developmentMode:
OnionrCommunicatorTimers(self, self.heartbeat, 10) OnionrCommunicatorTimers(self, self.heartbeat, 10)
# Initalize peer online list
self.getOnlinePeers() self.getOnlinePeers()
# Set timers, function reference, seconds
OnionrCommunicatorTimers(self, self.daemonCommands, 5) OnionrCommunicatorTimers(self, self.daemonCommands, 5)
OnionrCommunicatorTimers(self, self.detectAPICrash, 5) OnionrCommunicatorTimers(self, self.detectAPICrash, 5)
OnionrCommunicatorTimers(self, self.getOnlinePeers, 60) OnionrCommunicatorTimers(self, self.getOnlinePeers, 60)
@ -81,7 +84,7 @@ class OnionrCommunicatorDaemon:
OnionrCommunicatorTimers(self, self.lookupKeys, 125) OnionrCommunicatorTimers(self, self.lookupKeys, 125)
OnionrCommunicatorTimers(self, self.lookupAdders, 600) OnionrCommunicatorTimers(self, self.lookupAdders, 600)
# Main daemon loop, mainly for calling timers, do not do any complex operations here # Main daemon loop, mainly for calling timers, don't do any complex operations here to avoid locking
while not self.shutdown: while not self.shutdown:
for i in self.timers: for i in self.timers:
i.processTimer() i.processTimer()
@ -93,6 +96,7 @@ class OnionrCommunicatorDaemon:
logger.info('LOOKING UP NEW KEYS') logger.info('LOOKING UP NEW KEYS')
tryAmount = 1 tryAmount = 1
for i in range(tryAmount): for i in range(tryAmount):
# Download new key list from random online peers
peer = self.pickOnlinePeer() peer = self.pickOnlinePeer()
newKeys = self.peerAction(peer, action='kex') newKeys = self.peerAction(peer, action='kex')
self._core._utils.mergeKeys(newKeys) self._core._utils.mergeKeys(newKeys)
@ -105,21 +109,23 @@ class OnionrCommunicatorDaemon:
logger.info('LOOKING UP NEW ADDRESSES') logger.info('LOOKING UP NEW ADDRESSES')
tryAmount = 1 tryAmount = 1
for i in range(tryAmount): for i in range(tryAmount):
# Download new peer address list from random online peers
peer = self.pickOnlinePeer() peer = self.pickOnlinePeer()
newAdders = self.peerAction(peer, action='pex') newAdders = self.peerAction(peer, action='pex')
self._core._utils.mergeAdders(newAdders) self._core._utils.mergeAdders(newAdders)
self.decrementThreadCount('lookupKeys') self.decrementThreadCount('lookupKeys')
def lookupBlocks(self): def lookupBlocks(self):
'''Lookup new blocks''' '''Lookup new blocks & add them to download queue'''
logger.info('LOOKING UP NEW BLOCKS') logger.info('LOOKING UP NEW BLOCKS')
tryAmount = 2 tryAmount = 2
newBlocks = '' newBlocks = ''
for i in range(tryAmount): for i in range(tryAmount):
peer = self.pickOnlinePeer() peer = self.pickOnlinePeer() # select random online peer
newDBHash = self.peerAction(peer, 'getDBHash') newDBHash = self.peerAction(peer, 'getDBHash') # get their db hash
if newDBHash == False: if newDBHash == False:
continue continue # if request failed, restart loop (peer is added to offline peers automatically)
if newDBHash != self._core.getAddressInfo(peer, 'DBHash'): if newDBHash != self._core.getAddressInfo(peer, 'DBHash'):
self._core.setAddressInfo(peer, 'DBHash', newDBHash) self._core.setAddressInfo(peer, 'DBHash', newDBHash)
newBlocks = self.peerAction(peer, 'getBlockHashes') newBlocks = self.peerAction(peer, 'getBlockHashes')
@ -136,23 +142,23 @@ class OnionrCommunicatorDaemon:
return return
def getBlocks(self): def getBlocks(self):
'''download new blocks''' '''download new blocks in queue'''
for blockHash in self.blockQueue: for blockHash in self.blockQueue:
logger.info("ATTEMPTING TO DOWNLOAD " + blockHash) logger.info("ATTEMPTING TO DOWNLOAD " + blockHash)
content = self.peerAction(self.pickOnlinePeer(), 'getData', data=blockHash) content = self.peerAction(self.pickOnlinePeer(), 'getData', data=blockHash) # block content from random peer (includes metadata)
if content != False: if content != False:
try: try:
content = content.encode() content = content.encode()
except AttributeError: except AttributeError:
pass pass
content = base64.b64decode(content) content = base64.b64decode(content) # content is base64 encoded in transport
if self._core._crypto.sha3Hash(content) == blockHash: if self._core._crypto.sha3Hash(content) == blockHash:
content = content.decode() # decode here because sha3Hash needs bytes above content = content.decode() # decode here because sha3Hash needs bytes above
metas = self._core._utils.getBlockMetadataFromData(content) # returns tuple(metadata, meta), meta is also in metadata metas = self._core._utils.getBlockMetadataFromData(content) # returns tuple(metadata, meta), meta is also in metadata
metadata = metas[0] metadata = metas[0]
meta = metas[1] meta = metas[1]
if self._core._utils.validateMetadata(metadata): if self._core._utils.validateMetadata(metadata): # check if metadata is valid
if self._core._crypto.verifyPow(metas[2], metadata): if self._core._crypto.verifyPow(metas[2], metadata): # check if POW is enough/correct
logger.info('Block passed proof, saving.') logger.info('Block passed proof, saving.')
self._core.setData(content) self._core.setData(content)
self._core.addToBlockDB(blockHash, dataSaved=True) self._core.addToBlockDB(blockHash, dataSaved=True)
@ -160,9 +166,10 @@ class OnionrCommunicatorDaemon:
logger.warn('POW failed for block ' + blockHash) logger.warn('POW failed for block ' + blockHash)
else: else:
logger.warn('Metadata for ' + blockHash + ' is invalid.') logger.warn('Metadata for ' + blockHash + ' is invalid.')
self.blockQueue.remove(blockHash)
else: else:
# if block didn't meet expected hash
logger.warn('Block hash validation failed for ' + blockHash + ' got ' + self._core._crypto.sha3Hash(content)) logger.warn('Block hash validation failed for ' + blockHash + ' got ' + self._core._crypto.sha3Hash(content))
self.blockQueue.remove(blockHash) # remove from block queue both if success or false
self.decrementThreadCount('getBlocks') self.decrementThreadCount('getBlocks')
return return
@ -201,7 +208,8 @@ class OnionrCommunicatorDaemon:
self.decrementThreadCount('clearOfflinePeer') self.decrementThreadCount('clearOfflinePeer')
def getOnlinePeers(self): def getOnlinePeers(self):
'''Manages the self.onlinePeers attribute list''' '''Manages the self.onlinePeers attribute list, connects to more peers if we have none connected'''
logger.info('Refreshing peer pool.') logger.info('Refreshing peer pool.')
maxPeers = 4 maxPeers = 4
needed = maxPeers - len(self.onlinePeers) needed = maxPeers - len(self.onlinePeers)
@ -267,6 +275,7 @@ class OnionrCommunicatorDaemon:
if len(data) > 0: if len(data) > 0:
url += '&data=' + data url += '&data=' + data
retData = self._core._utils.doGetRequest(url, port=self.proxyPort) retData = self._core._utils.doGetRequest(url, port=self.proxyPort)
# if request failed, (error), mark peer offline
if retData == False: if retData == False:
try: try:
self.onlinePeers.remove(peer) self.onlinePeers.remove(peer)
@ -302,7 +311,7 @@ class OnionrCommunicatorDaemon:
self.decrementThreadCount('daemonCommands') self.decrementThreadCount('daemonCommands')
def announce(self, peer): def announce(self, peer):
'''Announce to peers''' '''Announce to peers our address'''
announceCount = 0 announceCount = 0
announceAmount = 2 announceAmount = 2
for peer in self._core.listAdders(): for peer in self._core.listAdders():
@ -349,12 +358,13 @@ class OnionrCommunicatorTimers:
self.count = 0 self.count = 0
def processTimer(self): def processTimer(self):
# mark how many instances of a thread we have (decremented at thread end)
self.count += 1 self.count += 1
try: try:
self.daemonInstance.threadCounts[self.timerFunction.__name__] self.daemonInstance.threadCounts[self.timerFunction.__name__]
except KeyError: except KeyError:
self.daemonInstance.threadCounts[self.timerFunction.__name__] = 0 self.daemonInstance.threadCounts[self.timerFunction.__name__] = 0
# execute thread if it is time
if self.count == self.frequency: if self.count == self.frequency:
if self.makeThread: if self.makeThread:
for i in range(self.threadAmount): for i in range(self.threadAmount):