From c975d27906cd72beeb47584167e6883f560f4655 Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Sat, 4 Jan 2020 06:13:10 -0600 Subject: [PATCH] switched to upload event api --- docs/TODO.txt | 1 + src/communicator/daemoneventhooks/__init__.py | 17 ++++++++++- src/communicator/uploadqueue/__init__.py | 1 + .../downloadblocks/__init__.py | 25 ++++++++++++---- .../uploadblocks/sessionmanager.py | 2 +- src/httpapi/daemoneventsapi/__init__.py | 3 +- src/onionrblocks/insert.py | 15 +++++++--- src/onionrutils/localcommand.py | 30 ++++++++++--------- 8 files changed, 67 insertions(+), 27 deletions(-) diff --git a/docs/TODO.txt b/docs/TODO.txt index 9dcba42c..2fa43e14 100644 --- a/docs/TODO.txt +++ b/docs/TODO.txt @@ -13,3 +13,4 @@ * localization support * add BCC support to mail * prevent local insertion success of duplicate block content +* truncate last N blocks when sharing list diff --git a/src/communicator/daemoneventhooks/__init__.py b/src/communicator/daemoneventhooks/__init__.py index 09a9d2d4..aa37d82c 100644 --- a/src/communicator/daemoneventhooks/__init__.py +++ b/src/communicator/daemoneventhooks/__init__.py @@ -8,11 +8,14 @@ from typing import TYPE_CHECKING from gevent import sleep +from communicatorutils.uploadblocks import mixmate + if TYPE_CHECKING: from toomanyobjs import TooMany from communicator import OnionrCommunicatorDaemon from httpapi.daemoneventsapi import DaemonEventsBP from onionrtypes import BlockHash + from apiservers import PublicAPI """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -37,16 +40,28 @@ def daemon_event_handlers(shared_state: 'TooMany'): except KeyError: sleep(0.2) comm_inst = _get_inst('OnionrCommunicatorDaemon') + public_api: 'PublicAPI' = _get_inst('PublicAPI') events_api: 'DaemonEventsBP' = _get_inst('DaemonEventsBP') def remove_from_insert_queue_wrapper(block_hash: 'BlockHash'): - print(f'removed {block_hash} from upload') remove_from_insert_queue(comm_inst, block_hash) + return "removed" def print_test(text=''): print("It works!", text) return f"It works! {text}" + def upload_event(block: 'BlockHash' = ''): + if not block: + raise ValueError + public_api.hideBlocks.append(block) + try: + mixmate.block_mixer(comm_inst.blocksToUpload, block) + except ValueError: + pass + return "removed" + events_api.register_listener(remove_from_insert_queue_wrapper) events_api.register_listener(print_test) + events_api.register_listener(upload_event) diff --git a/src/communicator/uploadqueue/__init__.py b/src/communicator/uploadqueue/__init__.py index 02e4df8b..d699efa6 100644 --- a/src/communicator/uploadqueue/__init__.py +++ b/src/communicator/uploadqueue/__init__.py @@ -4,6 +4,7 @@ Class to remember blocks that need to be uploaded and not shared on startup/shutdown """ import atexit +import os from typing import TYPE_CHECKING import deadsimplekv diff --git a/src/communicatorutils/downloadblocks/__init__.py b/src/communicatorutils/downloadblocks/__init__.py index d532d5c9..ef8a76e9 100755 --- a/src/communicatorutils/downloadblocks/__init__.py +++ b/src/communicatorutils/downloadblocks/__init__.py @@ -6,6 +6,9 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from communicator import OnionrCommunicatorDaemon + +from gevent import spawn + import onionrexceptions import logger import onionrpeers @@ -16,6 +19,7 @@ from onionrutils import blockmetadata from onionrutils import validatemetadata from coredb import blockmetadb from coredb import daemonqueue +from onionrutils.localcommand import local_command import onionrcrypto import onionrstorage from onionrblocks import onionrblacklist @@ -82,7 +86,7 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): logger.info("Attempting to download %s from %s..." % (blockHash[:12], peerUsed)) content = peeraction.peer_action(comm_inst, peerUsed, 'getdata/' + blockHash, max_resp_size=3000000) # block content from random peer (includes metadata) - if content != False and len(content) > 0: + if content is not False and len(content) > 0: try: content = content.encode() except AttributeError: @@ -98,7 +102,8 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): metas = blockmetadata.get_block_metadata_from_data(content) # returns tuple(metadata, meta), meta is also in metadata metadata = metas[0] try: - metadata_validation_result = validatemetadata.validate_metadata(metadata, metas[2]) + metadata_validation_result = \ + validatemetadata.validate_metadata(metadata, metas[2]) except onionrexceptions.DataExists: metadata_validation_result = False if metadata_validation_result: # check if metadata is valid, and verify nonce @@ -113,7 +118,14 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): removeFromQueue = False else: blockmetadb.add_to_block_DB(blockHash, dataSaved=True) # add block to meta db - daemonqueue.daemon_queue_add('uploadEvent', blockHash) + spawn( + local_command, + f'/daemon-event/upload_event', + post=True, + is_json=True, + postData={'block': blockHash} + ) + blockmetadata.process_block_metadata(blockHash) # caches block metadata values to block database else: logger.warn('POW failed for block %s.' % (blockHash,)) @@ -134,14 +146,17 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): onionrpeers.PeerProfiles(peerUsed).addScore(-50) if tempHash != 'ed55e34cb828232d6c14da0479709bfa10a0923dca2b380496e6b2ed4f7a0253': # Dumb hack for 404 response from peer. Don't log it if 404 since its likely not malicious or a critical error. - logger.warn('Block hash validation failed for ' + blockHash + ' got ' + tempHash) + logger.warn( + 'Block hash validation failed for ' + + blockHash + ' got ' + tempHash) else: removeFromQueue = False # Don't remove from queue if 404 if removeFromQueue: try: del comm_inst.blockQueue[blockHash] # remove from block queue both if success or false if count == LOG_SKIP_COUNT: - logger.info('%s blocks remaining in queue' % [len(comm_inst.blockQueue)], terminal=True) + logger.info('%s blocks remaining in queue' % + [len(comm_inst.blockQueue)], terminal=True) count = 0 except KeyError: pass diff --git a/src/communicatorutils/uploadblocks/sessionmanager.py b/src/communicatorutils/uploadblocks/sessionmanager.py index 763300c2..a00da9a3 100644 --- a/src/communicatorutils/uploadblocks/sessionmanager.py +++ b/src/communicatorutils/uploadblocks/sessionmanager.py @@ -122,4 +122,4 @@ class BlockUploadSessionManager: comm_inst.blocksToUpload.remove(sess.block_hash) except ValueError: pass - localcommand.local_command('waitforshare/{session.block_hash}') + localcommand.local_command(f'waitforshare/{session.block_hash}') diff --git a/src/httpapi/daemoneventsapi/__init__.py b/src/httpapi/daemoneventsapi/__init__.py index 0863f410..14805879 100644 --- a/src/httpapi/daemoneventsapi/__init__.py +++ b/src/httpapi/daemoneventsapi/__init__.py @@ -48,8 +48,7 @@ class DaemonEventsBP: json_data = {} for handler in self.listeners: if handler.__name__ == name: - return Response( - spawn(handler, **json_data).get(timeout=120)) + return Response(handler(**json_data)) abort(404) def register_listener(self, listener: Callable): diff --git a/src/onionrblocks/insert.py b/src/onionrblocks/insert.py index 5aeeaa6f..4f869c27 100644 --- a/src/onionrblocks/insert.py +++ b/src/onionrblocks/insert.py @@ -197,7 +197,14 @@ def insert_block(data: Union[str, bytes], header: str = 'txt', retData = False else: # Tell the api server through localCommand to wait for the daemon to upload this block to make statistical analysis more difficult - coredb.daemonqueue.daemon_queue_add('uploadEvent', retData) + #coredb.daemonqueue.daemon_queue_add('uploadEvent', retData) + spawn( + localcommand.local_command, + f'/daemon-event/upload_event', + post=True, + is_json=True, + postData={'block': retData} + ).get(timeout=5) coredb.blockmetadb.add.add_to_block_DB(retData, selfInsert=True, dataSaved=True) if expire is None: @@ -213,10 +220,10 @@ def insert_block(data: Union[str, bytes], header: str = 'txt', events.event('insertdeniable', {'content': plaintext, 'meta': plaintextMeta, 'hash': retData, 'peer': bytesconverter.bytes_to_str(asymPeer)}, threaded = True) else: events.event('insertblock', {'content': plaintext, 'meta': plaintextMeta, 'hash': retData, 'peer': bytesconverter.bytes_to_str(asymPeer)}, threaded = True) - #coredb.daemonqueue.daemon_queue_add('remove_from_insert_list', data= dataNonce) + spawn( localcommand.local_command, '/daemon-event/remove_from_insert_queue_wrapper', - post=True, timeout=10 - ) + post=True + ).get(timeout=5) return retData diff --git a/src/onionrutils/localcommand.py b/src/onionrutils/localcommand.py index 3f62fac8..9e1ab9fd 100644 --- a/src/onionrutils/localcommand.py +++ b/src/onionrutils/localcommand.py @@ -1,9 +1,19 @@ -''' +""" Onionr - Private P2P Communication send a command to the local API server -''' -''' +""" +import urllib, time +import json +import functools +from typing import TYPE_CHECKING, Callable + +import requests + +import logger, config, deadsimplekv +from . import getclientapiserver +import filepaths +""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or @@ -16,15 +26,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . -''' -import urllib, time -import json - -import requests - -import logger, config, deadsimplekv -from . import getclientapiserver -import filepaths +""" config.reload() cache = deadsimplekv.DeadSimpleKV(filepaths.cached_storage, refresh_seconds=1000) @@ -54,9 +56,9 @@ def local_command(command, data='', silent = True, post=False, postData = {}, maxWait=20, is_json=False ): - ''' + """ Send a command to the local http API server, securely. Intended for local clients, DO NOT USE for remote peers. - ''' + """ # TODO: URL encode parameters, just as an extra measure. May not be needed, but should be added regardless. hostname = get_hostname() if hostname == False: return False