started work on upload sessions

This commit is contained in:
Kevin Froman 2019-09-17 01:56:13 -05:00
parent d598d9b9c2
commit 1114db8a30
6 changed files with 39 additions and 13 deletions

View file

@ -44,7 +44,7 @@ def peer_action(comm_inst, peer, action, returnHeaders=False, max_resp_size=5242
onlinepeers.remove_online_peer(comm_inst, peer)
keydb.transportinfo.set_address_info(peer, 'lastConnectAttempt', epoch.get_epoch())
if action != 'ping' and not comm_inst.shutdown:
logger.warn('Lost connection to ' + peer, terminal=True)
logger.warn(f'Lost connection to {peer}', terminal=True)
onlinepeers.get_online_peers(comm_inst) # Will only add a new peer to pool if needed
except ValueError:
pass

View file

@ -18,7 +18,7 @@ from __future__ import annotations
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
from typing import Union
from typing import Union, TYPE_CHECKING
import logger
from communicatorutils import proxypicker
import onionrexceptions
@ -26,6 +26,7 @@ import onionrblockapi as block
from onionrutils import localcommand, stringvalidators, basicrequests
from communicator import onlinepeers
import onionrcrypto
from . import sessionmanager
def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon):
"""Accepts a communicator instance and uploads blocks from its upload queue"""
@ -33,7 +34,7 @@ def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon):
it to a few peers to add some deniability & increase functionality"""
TIMER_NAME = "upload_blocks_from_communicator"
session_manager = comm_inst.shared_state.get_by_string('BlockUploadSessionManager')
session_manager: sessionmanager.BlockUploadSessionManager = comm_inst.shared_state.get(sessionmanager.BlockUploadSessionManager)
triedPeers = []
finishedUploads = []
comm_inst.blocksToUpload = onionrcrypto.cryptoutils.random_shuffle(comm_inst.blocksToUpload)
@ -43,11 +44,10 @@ def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon):
logger.warn('Requested to upload invalid block', terminal=True)
comm_inst.decrementThreadCount(TIMER_NAME)
return
session_manager.new_session(bl)
session_manager.add_session(bl)
for i in range(min(len(comm_inst.onlinePeers), 6)):
peer = onlinepeers.pick_online_peer(comm_inst)
if peer in triedPeers:
continue
if peer in triedPeers: continue
triedPeers.append(peer)
url = f'http://{peer}/upload'
try:
@ -60,12 +60,15 @@ def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon):
resp = basicrequests.do_post_request(url, data=data, proxyType=proxyType, content_type='application/octet-stream')
if not resp == False:
if resp == 'success':
localcommand.local_command('waitforshare/' + bl, post=True)
session_manager.get
localcommand.local_command(f'waitforshare/{bl}', post=True)
finishedUploads.append(bl)
elif resp == 'exists':
comm_inst.getPeerProfileInstance(peer).addScore(-1)
finishedUploads.append(bl)
else:
logger.warn(f'Failed to upload {bl[:8]}, reason: {resp[:15]}'), terminal=True)
comm_inst.getPeerProfileInstance(peer).addScore(-5)
logger.warn(f'Failed to upload {bl[:8]}, reason: {resp[:15]}', terminal=True)
for x in finishedUploads:
try:
comm_inst.blocksToUpload.remove(x)

View file

@ -38,6 +38,7 @@ class UploadSession:
self.total_fail_count: int = 0
self.total_success_count: int = 0
self.peer_fails = {}
self.peer_exists = {}
def fail_peer(self, peer):
try:

View file

@ -19,19 +19,39 @@ from __future__ import annotations
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from typing import Iterable, Union
from onionrutils import bytesconverter
from . import session
class BlockUploadSessionManager:
"""Holds block UploadSession instances. Optionally accepts iterable of sessions to added on init
Arguments: old_session: iterable of old UploadSession objects"""
def __init__(self, old_sessions:Iterable=None):
if old_session is None:
if old_sessions is None:
self.sessions = []
else:
self.sessions = old_session
def add_session(self, session_or_block: Union(str, bytes, UploadSession, Block)):
def add_session(self, session_or_block: Union(str, bytes, session.UploadSession, Block))->session.UploadSession:
"""Create (or add existing) block upload session from a str/bytes block hex hash, existing UploadSession or Block object"""
if isinstance(session_or_block, session.UploadSession):
self.sessions.append(session_or_block)
return session_or_block
# convert Block to hash string
if hasattr(session_or_block, 'bheader') and hasattr(session_or_block, 'raw'): session_or_block = session_or_block.hash
# convert bytes hash to str
if isinstance(session_or_block, bytes): session_or_block = bytesconverter.bytes_to_str(session_or_block)
# intentionally not elif
if isinstance(session_or_block, str):
self.sessions.append()
new_session = session.UploadSession(session_or_block)
self.sessions.append(new_session)
return new_session
def get_session(self, block_hash: Union(str, bytes))->session.UploadSession:
block_hash = bytesconverter.bytes_to_str(block_hash).replace('=', '')
for session in self.sessions: if session.block_hash == block_hash: return session
def clean_session(self, specific_session: Union[str, UploadSession]):
return

View file

@ -49,6 +49,8 @@ saveBtn.onclick = function(){
}})
.then((resp) => resp.text()) // Transform the data into text
.then(function(data) {
alert('Config saved')
PNotify.success({
text: 'Config saved'
})
})
}

View file

@ -13,7 +13,7 @@ document.getElementById('openSite').onclick = function(){
window.location.href = '/site/' + hash
}
else{
PNotify.error({
PNotify.notice({
text: 'Invalid site hash'
})
}