switched to upload event api
This commit is contained in:
parent
1ba8b4c707
commit
c975d27906
8 changed files with 67 additions and 27 deletions
|
@ -13,3 +13,4 @@
|
|||
* localization support
|
||||
* add BCC support to mail
|
||||
* prevent local insertion success of duplicate block content
|
||||
* truncate last N blocks when sharing list
|
||||
|
|
|
@ -8,11 +8,14 @@ from typing import TYPE_CHECKING
|
|||
|
||||
from gevent import sleep
|
||||
|
||||
from communicatorutils.uploadblocks import mixmate
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from toomanyobjs import TooMany
|
||||
from communicator import OnionrCommunicatorDaemon
|
||||
from httpapi.daemoneventsapi import DaemonEventsBP
|
||||
from onionrtypes import BlockHash
|
||||
from apiservers import PublicAPI
|
||||
"""
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
|
@ -37,16 +40,28 @@ def daemon_event_handlers(shared_state: 'TooMany'):
|
|||
except KeyError:
|
||||
sleep(0.2)
|
||||
comm_inst = _get_inst('OnionrCommunicatorDaemon')
|
||||
public_api: 'PublicAPI' = _get_inst('PublicAPI')
|
||||
events_api: 'DaemonEventsBP' = _get_inst('DaemonEventsBP')
|
||||
|
||||
def remove_from_insert_queue_wrapper(block_hash: 'BlockHash'):
|
||||
print(f'removed {block_hash} from upload')
|
||||
remove_from_insert_queue(comm_inst, block_hash)
|
||||
return "removed"
|
||||
|
||||
def print_test(text=''):
|
||||
print("It works!", text)
|
||||
return f"It works! {text}"
|
||||
|
||||
def upload_event(block: 'BlockHash' = ''):
|
||||
if not block:
|
||||
raise ValueError
|
||||
public_api.hideBlocks.append(block)
|
||||
try:
|
||||
mixmate.block_mixer(comm_inst.blocksToUpload, block)
|
||||
except ValueError:
|
||||
pass
|
||||
return "removed"
|
||||
|
||||
events_api.register_listener(remove_from_insert_queue_wrapper)
|
||||
events_api.register_listener(print_test)
|
||||
events_api.register_listener(upload_event)
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ Class to remember blocks that need to be uploaded
|
|||
and not shared on startup/shutdown
|
||||
"""
|
||||
import atexit
|
||||
import os
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import deadsimplekv
|
||||
|
|
|
@ -6,6 +6,9 @@
|
|||
from typing import TYPE_CHECKING
|
||||
if TYPE_CHECKING:
|
||||
from communicator import OnionrCommunicatorDaemon
|
||||
|
||||
from gevent import spawn
|
||||
|
||||
import onionrexceptions
|
||||
import logger
|
||||
import onionrpeers
|
||||
|
@ -16,6 +19,7 @@ from onionrutils import blockmetadata
|
|||
from onionrutils import validatemetadata
|
||||
from coredb import blockmetadb
|
||||
from coredb import daemonqueue
|
||||
from onionrutils.localcommand import local_command
|
||||
import onionrcrypto
|
||||
import onionrstorage
|
||||
from onionrblocks import onionrblacklist
|
||||
|
@ -82,7 +86,7 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
|
|||
logger.info("Attempting to download %s from %s..." % (blockHash[:12], peerUsed))
|
||||
content = peeraction.peer_action(comm_inst, peerUsed, 'getdata/' + blockHash, max_resp_size=3000000) # block content from random peer (includes metadata)
|
||||
|
||||
if content != False and len(content) > 0:
|
||||
if content is not False and len(content) > 0:
|
||||
try:
|
||||
content = content.encode()
|
||||
except AttributeError:
|
||||
|
@ -98,7 +102,8 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
|
|||
metas = blockmetadata.get_block_metadata_from_data(content) # returns tuple(metadata, meta), meta is also in metadata
|
||||
metadata = metas[0]
|
||||
try:
|
||||
metadata_validation_result = validatemetadata.validate_metadata(metadata, metas[2])
|
||||
metadata_validation_result = \
|
||||
validatemetadata.validate_metadata(metadata, metas[2])
|
||||
except onionrexceptions.DataExists:
|
||||
metadata_validation_result = False
|
||||
if metadata_validation_result: # check if metadata is valid, and verify nonce
|
||||
|
@ -113,7 +118,14 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
|
|||
removeFromQueue = False
|
||||
else:
|
||||
blockmetadb.add_to_block_DB(blockHash, dataSaved=True) # add block to meta db
|
||||
daemonqueue.daemon_queue_add('uploadEvent', blockHash)
|
||||
spawn(
|
||||
local_command,
|
||||
f'/daemon-event/upload_event',
|
||||
post=True,
|
||||
is_json=True,
|
||||
postData={'block': blockHash}
|
||||
)
|
||||
|
||||
blockmetadata.process_block_metadata(blockHash) # caches block metadata values to block database
|
||||
else:
|
||||
logger.warn('POW failed for block %s.' % (blockHash,))
|
||||
|
@ -134,14 +146,17 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
|
|||
onionrpeers.PeerProfiles(peerUsed).addScore(-50)
|
||||
if tempHash != 'ed55e34cb828232d6c14da0479709bfa10a0923dca2b380496e6b2ed4f7a0253':
|
||||
# Dumb hack for 404 response from peer. Don't log it if 404 since its likely not malicious or a critical error.
|
||||
logger.warn('Block hash validation failed for ' + blockHash + ' got ' + tempHash)
|
||||
logger.warn(
|
||||
'Block hash validation failed for ' +
|
||||
blockHash + ' got ' + tempHash)
|
||||
else:
|
||||
removeFromQueue = False # Don't remove from queue if 404
|
||||
if removeFromQueue:
|
||||
try:
|
||||
del comm_inst.blockQueue[blockHash] # remove from block queue both if success or false
|
||||
if count == LOG_SKIP_COUNT:
|
||||
logger.info('%s blocks remaining in queue' % [len(comm_inst.blockQueue)], terminal=True)
|
||||
logger.info('%s blocks remaining in queue' %
|
||||
[len(comm_inst.blockQueue)], terminal=True)
|
||||
count = 0
|
||||
except KeyError:
|
||||
pass
|
||||
|
|
|
@ -122,4 +122,4 @@ class BlockUploadSessionManager:
|
|||
comm_inst.blocksToUpload.remove(sess.block_hash)
|
||||
except ValueError:
|
||||
pass
|
||||
localcommand.local_command('waitforshare/{session.block_hash}')
|
||||
localcommand.local_command(f'waitforshare/{session.block_hash}')
|
||||
|
|
|
@ -48,8 +48,7 @@ class DaemonEventsBP:
|
|||
json_data = {}
|
||||
for handler in self.listeners:
|
||||
if handler.__name__ == name:
|
||||
return Response(
|
||||
spawn(handler, **json_data).get(timeout=120))
|
||||
return Response(handler(**json_data))
|
||||
abort(404)
|
||||
|
||||
def register_listener(self, listener: Callable):
|
||||
|
|
|
@ -197,7 +197,14 @@ def insert_block(data: Union[str, bytes], header: str = 'txt',
|
|||
retData = False
|
||||
else:
|
||||
# Tell the api server through localCommand to wait for the daemon to upload this block to make statistical analysis more difficult
|
||||
coredb.daemonqueue.daemon_queue_add('uploadEvent', retData)
|
||||
#coredb.daemonqueue.daemon_queue_add('uploadEvent', retData)
|
||||
spawn(
|
||||
localcommand.local_command,
|
||||
f'/daemon-event/upload_event',
|
||||
post=True,
|
||||
is_json=True,
|
||||
postData={'block': retData}
|
||||
).get(timeout=5)
|
||||
coredb.blockmetadb.add.add_to_block_DB(retData, selfInsert=True, dataSaved=True)
|
||||
|
||||
if expire is None:
|
||||
|
@ -213,10 +220,10 @@ def insert_block(data: Union[str, bytes], header: str = 'txt',
|
|||
events.event('insertdeniable', {'content': plaintext, 'meta': plaintextMeta, 'hash': retData, 'peer': bytesconverter.bytes_to_str(asymPeer)}, threaded = True)
|
||||
else:
|
||||
events.event('insertblock', {'content': plaintext, 'meta': plaintextMeta, 'hash': retData, 'peer': bytesconverter.bytes_to_str(asymPeer)}, threaded = True)
|
||||
#coredb.daemonqueue.daemon_queue_add('remove_from_insert_list', data= dataNonce)
|
||||
|
||||
spawn(
|
||||
localcommand.local_command,
|
||||
'/daemon-event/remove_from_insert_queue_wrapper',
|
||||
post=True, timeout=10
|
||||
)
|
||||
post=True
|
||||
).get(timeout=5)
|
||||
return retData
|
||||
|
|
|
@ -1,9 +1,19 @@
|
|||
'''
|
||||
"""
|
||||
Onionr - Private P2P Communication
|
||||
|
||||
send a command to the local API server
|
||||
'''
|
||||
'''
|
||||
"""
|
||||
import urllib, time
|
||||
import json
|
||||
import functools
|
||||
from typing import TYPE_CHECKING, Callable
|
||||
|
||||
import requests
|
||||
|
||||
import logger, config, deadsimplekv
|
||||
from . import getclientapiserver
|
||||
import filepaths
|
||||
"""
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
|
@ -16,15 +26,7 @@
|
|||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
'''
|
||||
import urllib, time
|
||||
import json
|
||||
|
||||
import requests
|
||||
|
||||
import logger, config, deadsimplekv
|
||||
from . import getclientapiserver
|
||||
import filepaths
|
||||
"""
|
||||
config.reload()
|
||||
|
||||
cache = deadsimplekv.DeadSimpleKV(filepaths.cached_storage, refresh_seconds=1000)
|
||||
|
@ -54,9 +56,9 @@ def local_command(command, data='', silent = True, post=False,
|
|||
postData = {}, maxWait=20,
|
||||
is_json=False
|
||||
):
|
||||
'''
|
||||
"""
|
||||
Send a command to the local http API server, securely. Intended for local clients, DO NOT USE for remote peers.
|
||||
'''
|
||||
"""
|
||||
# TODO: URL encode parameters, just as an extra measure. May not be needed, but should be added regardless.
|
||||
hostname = get_hostname()
|
||||
if hostname == False: return False
|
||||
|
|
Loading…
Reference in a new issue