* fleshed out daemon events; also is now used for insertion queue removal

* deprecated daemon queue and removed unused daemon queue commands
master
Kevin Froman 2020-01-03 04:17:00 -06:00
parent 6529d3e622
commit 1ba8b4c707
8 changed files with 93 additions and 72 deletions

View File

@ -7,30 +7,3 @@ Observer pattern
Register listeners dynamically per event
Spawn new greenlets
-------------------
## Attributes
events: dict
schema:
{
"event_id": dict{
"event_name": string,
"result_data": bytes,
"started": epoch,
"finished": epoch,
"done": bool
}
}
--------------------
MsgPack schema:
event_name: string
event_id: uuid4

View File

@ -36,6 +36,7 @@ from coredb import daemonqueue
from coredb import dbfiles
from netcontroller import NetController
from . import bootstrappeers
from . import daemoneventhooks
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -247,6 +248,8 @@ class OnionrCommunicatorDaemon:
bootstrappeers.add_bootstrap_list_to_peer_list(
self, [], db_only=True)
daemoneventhooks.daemon_event_handlers(shared_state)
if not config.get('onboarding.done', True):
logger.info(
'First run detected. Run openhome to get setup.',

View File

@ -2,6 +2,8 @@
Hooks to handle daemon events
"""
from .removefrominsertqueue import remove_from_insert_queue
from typing import TYPE_CHECKING
from gevent import sleep
@ -10,6 +12,7 @@ if TYPE_CHECKING:
from toomanyobjs import TooMany
from communicator import OnionrCommunicatorDaemon
from httpapi.daemoneventsapi import DaemonEventsBP
from onionrtypes import BlockHash
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -33,6 +36,17 @@ def daemon_event_handlers(shared_state: 'TooMany'):
return shared_state.get_by_string(class_name)
except KeyError:
sleep(0.2)
comm_inst = _get_inst('OnionrCommunicatorDaemon')
events_api: 'DaemonEventsBP' = _get_inst('DaemonEventsBP')
def remove_from_insert_queue_wrapper(block_hash: 'BlockHash'):
print(f'removed {block_hash} from upload')
remove_from_insert_queue(comm_inst, block_hash)
def print_test(text=''):
print("It works!", text)
return f"It works! {text}"
events_api.register_listener(remove_from_insert_queue_wrapper)
events_api.register_listener(print_test)

View File

@ -0,0 +1,31 @@
"""Onionr - P2P Anonymous Storage Network.
Remove block hash from daemon's upload list.
"""
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from communicator import OnionrCommunicatorDaemon
from onionrtypes import BlockHash
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
def remove_from_insert_queue(comm_inst: "OnionrCommunicatorDaemon",
b_hash: "BlockHash"):
"""Remove block hash from daemon's upload list."""
try:
comm_inst.generating_blocks.remove(b_hash)
except ValueError:
pass

View File

@ -26,6 +26,7 @@ from communicatorutils.uploadblocks import mixmate
def handle_daemon_commands(comm_inst):
# Deprecated in favor of daemon events
cmd = daemonqueue.daemon_queue()
response = ''
if cmd is not False:
@ -33,35 +34,16 @@ def handle_daemon_commands(comm_inst):
if cmd[0] == 'shutdown':
comm_inst.shutdown = True
elif cmd[0] == 'runtimeTest':
comm_inst.shared_state.get_by_string("OnionrRunTestManager").run_tests()
comm_inst.shared_state.get_by_string(
"OnionrRunTestManager").run_tests()
elif cmd[0] == 'remove_from_insert_list':
try:
comm_inst.generating_blocks.remove(cmd[1])
except ValueError:
pass
elif cmd[0] == 'announceNode':
if len(comm_inst.onlinePeers) > 0:
comm_inst.announce(cmd[1])
else:
logger.debug("No nodes connected. Will not introduce node.")
elif cmd[0] == 'runCheck': # deprecated
logger.debug('Status check; looks good.')
open(filepaths.run_check_file + '.runcheck', 'w+').close()
elif cmd[0] == 'connectedPeers':
response = '\n'.join(list(comm_inst.onlinePeers)).strip()
if response == '':
response = 'none'
elif cmd[0] == 'localCommand':
response = localcommand.local_command(cmd[1])
elif cmd[0] == 'clearOffline':
comm_inst.offlinePeers = []
elif cmd[0] == 'restartTor':
restarttor.restart(comm_inst)
comm_inst.offlinePeers = []
elif cmd[0] == 'pex':
for i in comm_inst.timers:
if i.timerFunction.__name__ == 'lookupAdders':
i.count = (i.frequency - 1)
elif cmd[0] == 'uploadBlock':
comm_inst.blocksToUpload.append(cmd[1])
elif cmd[0] == 'uploadEvent':
@ -70,13 +52,18 @@ def handle_daemon_commands(comm_inst):
except ValueError:
pass
else:
localcommand.local_command('/waitforshare/' + cmd[1], post=True, maxWait=5)
localcommand.local_command(
'/waitforshare/' + cmd[1], post=True, maxWait=5)
else:
logger.debug('Received daemon queue command unable to be handled: %s' % (cmd[0],))
logger.debug(
'Received daemon queue command unable to be handled: %s' %
(cmd[0],))
if cmd[0] not in ('', None):
if response != '':
localcommand.local_command('queueResponseAdd/' + cmd[4], post=True, postData={'data': response})
localcommand.local_command(
'queueResponseAdd/' + cmd[4],
post=True, postData={'data': response})
response = ''
comm_inst.decrementThreadCount('handle_daemon_commands')

View File

@ -106,7 +106,10 @@ class BlockUploadSessionManager:
if (sess.total_success_count / onlinePeerCount) >= onionrvalues.MIN_BLOCK_UPLOAD_PEER_PERCENT:
sessions_to_delete.append(sess)
for sess in sessions_to_delete:
try:
self.sessions.remove(session)
except ValueError:
pass
# TODO cleanup to one round of search
# Remove the blocks from the sessions, upload list,
# and waitforshare list

View File

@ -2,9 +2,12 @@
Event driven interface to trigger events in communicator
"""
import json
from flask import Blueprint, request, Response
import config
from typing import Callable
from flask import Blueprint, request, Response, abort
from werkzeug.exceptions import BadRequest
from gevent import spawn
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -26,28 +29,28 @@ class DaemonEventsBP:
"""Create DaemonEvents instance, intended to be a singleton.
Attributes:
events: dict of current/finished events
listeners: callables that are called when a new event is added.
The callables name should match the event name
_too_many: TooManyObjects instance set by external code
"""
event_BP = Blueprint('event_BP', __name__)
self.events = {}
self.listeners = {}
self.listeners = set([])
self.flask_bp = event_BP
event_BP = self.flask_bp
@event_BP.route('/daemon-event/<name>', methods=['POST'])
def daemon_event_handler(name):
if name in self.listeners:
@event_BP.route('/daemon-event/bp-enabled')
def bp_enabled() -> Response:
return Response('true')
def clean_old(self):
"""Deletes old daemon events based on their completion date."""
pass
handler: Callable
try:
json_data = request.get_json(force=True)
except BadRequest:
json_data = {}
for handler in self.listeners:
if handler.__name__ == name:
return Response(
spawn(handler, **json_data).get(timeout=120))
abort(404)
def register_listener(self, listener: Callable):
self.listeners.add(listener)

View File

@ -20,6 +20,8 @@
from typing import Union
import json
from gevent import spawn
from onionrutils import bytesconverter, epoch
import filepaths, onionrstorage
from . import storagecounter
@ -211,5 +213,10 @@ def insert_block(data: Union[str, bytes], header: str = 'txt',
events.event('insertdeniable', {'content': plaintext, 'meta': plaintextMeta, 'hash': retData, 'peer': bytesconverter.bytes_to_str(asymPeer)}, threaded = True)
else:
events.event('insertblock', {'content': plaintext, 'meta': plaintextMeta, 'hash': retData, 'peer': bytesconverter.bytes_to_str(asymPeer)}, threaded = True)
coredb.daemonqueue.daemon_queue_add('remove_from_insert_list', data= dataNonce)
#coredb.daemonqueue.daemon_queue_add('remove_from_insert_list', data= dataNonce)
spawn(
localcommand.local_command,
'/daemon-event/remove_from_insert_queue_wrapper',
post=True, timeout=10
)
return retData