fix run_retries and add retry commands to manage
This commit is contained in:
parent
ead1247e83
commit
af5c968d7e
|
@ -1,4 +1,3 @@
|
|||
izzylib[hasher,http_server_async,http_signatures,http_urllib_client,sql,template] @ git+https://git.barkshark.xyz/izaliamae/izzylib@b2b67b2ed8fbc9a5d04b4156f282ac8beb2ead81
|
||||
izzylib[hasher,http_server_async,http_signatures,http_urllib_client,sql,template] @ git+https://git.barkshark.xyz/izaliamae/izzylib@cc8f8b09a1c991b4328bf405c88dcf90626adeb7
|
||||
|
||||
pyyaml==5.4.1
|
||||
apscheduler==3.8.0
|
||||
|
|
|
@ -91,20 +91,24 @@ def cmd_instance(self, inbox, actor, followid=None):
|
|||
|
||||
|
||||
def cmd_retry(self, inbox, data, headers={}, timestamp=None):
|
||||
row = self.get.retry(data.id, inbox)
|
||||
instance = self.get.instance(inbox)
|
||||
row = instance.retry(data.id)
|
||||
|
||||
if not row:
|
||||
instance = self.get.instance(inbox)
|
||||
logging.verbose(f'Putting new retry in db for {instance.domain}: {data.id}')
|
||||
|
||||
row = self.insert('retry',
|
||||
msgid = data.id,
|
||||
inboxid = instanceid,
|
||||
inboxid = instance.id,
|
||||
data = data,
|
||||
headers = headers,
|
||||
timestamp = timestamp or datetime.now(),
|
||||
return_row = True
|
||||
)
|
||||
|
||||
else:
|
||||
logging.verbose(f'Retry for {instance.domain} already in db: {data.id}')
|
||||
|
||||
return row
|
||||
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import json
|
||||
|
||||
from functools import wraps
|
||||
from izzylib import LruCache, logging
|
||||
from izzylib import DotDict, LruCache, logging
|
||||
from izzylib.http_urllib_client import HttpUrllibClient
|
||||
from izzylib.http_urllib_client.error import MaxRetryError
|
||||
|
||||
|
@ -111,19 +111,28 @@ def push_message(inbox, message, headers={}):
|
|||
keyid = f'https://{config.host}/actor#main-key'
|
||||
)
|
||||
|
||||
except MaxRetryError:
|
||||
return
|
||||
except MaxRetryError as e:
|
||||
response = DotDict(status=0, error=e)
|
||||
|
||||
except Exception as e:
|
||||
logging.debug(f'push_message: {e.__class__.__name__}: {e}')
|
||||
return
|
||||
#logging.debug(f'push_message: {e.__class__.__name__}: {e}')
|
||||
response = DotDict(status=0, error=e)
|
||||
|
||||
if response.status not in [200, 202]:
|
||||
try:
|
||||
body = response.dict
|
||||
except:
|
||||
body = response.text
|
||||
s.put.retry(inbox, message, headers)
|
||||
|
||||
logging.debug(f'Error from {inbox}: {body}')
|
||||
if response.status:
|
||||
try:
|
||||
body = response.dict
|
||||
except:
|
||||
body = response.text
|
||||
|
||||
logging.debug(f'Error from {inbox}: {response.status} {body}')
|
||||
|
||||
else:
|
||||
logging.debug(f'Error from {inbox}: {response.error}')
|
||||
|
||||
return
|
||||
|
||||
logging.debug(f'Pushed message to {inbox}:', message['id'])
|
||||
return response
|
||||
|
|
|
@ -13,6 +13,7 @@ from .config import config, dbconfig, path, write_config
|
|||
from .database import db
|
||||
from .functions import fetch_actor, get_inbox
|
||||
from .messages import Message
|
||||
from .server import retry_instance
|
||||
|
||||
|
||||
exe = f'{sys.executable} -m uncia.manage'
|
||||
|
@ -95,6 +96,14 @@ python3 -m uncia.manage accept <actor, inbox, or domain>: *
|
|||
python3 -m uncia.manage deny <actor, inbox, or domain>: *
|
||||
Reject a request.
|
||||
|
||||
python3 -m uncia.manage retries [domain]:
|
||||
List all of the current retires. If a domain is specified, only list
|
||||
retries from that instance
|
||||
|
||||
python3 -m uncia.manage retry [domain]:
|
||||
Re-run retries. If a domain is specified, only the retries from that
|
||||
instance will be ran.
|
||||
|
||||
python3 -m uncia.manage convert [pleroma or uncia]:
|
||||
Convert the database and config of another relay. Tries to convert a
|
||||
Pleroma Relay by default.
|
||||
|
@ -309,6 +318,35 @@ python3 -m uncia.manage convert [pleroma or uncia]:
|
|||
return data or 'No banned domains or users yet'
|
||||
|
||||
|
||||
def cmd_retries(self, domain=None):
|
||||
with db.session as s:
|
||||
if domain:
|
||||
instances = [s.get.instance(domain)]
|
||||
|
||||
else:
|
||||
instances = s.search('inbox')
|
||||
|
||||
retries = []
|
||||
|
||||
for instance in instances:
|
||||
for retry in instance.retries():
|
||||
retries.append(f'{retry.timestamp} {retry.msgid}')
|
||||
|
||||
return '\n'.join(retries)
|
||||
|
||||
|
||||
def cmd_retry(self, domain=None):
|
||||
with db.session as s:
|
||||
if domain:
|
||||
instances = [s.get.instance(domain)]
|
||||
|
||||
else:
|
||||
instances = s.search('inbox')
|
||||
|
||||
for instance in instances:
|
||||
retry_instance(instance)
|
||||
|
||||
|
||||
def cmd_convert(self, relay='uncia'):
|
||||
if relay.lower() == 'uncia':
|
||||
return self.convert_uncia()
|
||||
|
|
|
@ -126,7 +126,7 @@ class ProcessData:
|
|||
traceback.print_exc()
|
||||
|
||||
if not response or response.status not in [200, 202]:
|
||||
logging.verbose(f'Failed to send object announce to {instance.domain}: {object.id}')
|
||||
logging.verbose(f'Failed to send announce object to {instance.domain}: {object.id}')
|
||||
s.put.retry(instance.inbox, msg)
|
||||
|
||||
if response:
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
import asyncio
|
||||
|
||||
from izzylib import logging
|
||||
from izzylib.http_server_async import Application
|
||||
|
||||
from . import __version__, views
|
||||
from .config import config, path, first_start
|
||||
from .database import db
|
||||
from .functions import push_message
|
||||
from .middleware import AuthCheck
|
||||
|
||||
|
||||
|
@ -36,23 +38,37 @@ with db.session as s:
|
|||
app.add_static('/style', path.frontend.join('style'))
|
||||
|
||||
|
||||
def run_retries():
|
||||
async def run_retries():
|
||||
await asyncio.sleep(60 * 60)
|
||||
|
||||
with db.session as s:
|
||||
for instance in s.get.instance_list():
|
||||
for instance in s.search('inbox'):
|
||||
retry_instance(instance)
|
||||
|
||||
|
||||
def retry_instance(instance):
|
||||
retries = instance.retries()
|
||||
fails = 0
|
||||
|
||||
if not retries:
|
||||
return
|
||||
|
||||
logging.debug(f'Retrying {len(retries)} message(s) for {instance.domain}')
|
||||
logging.verbose(f'Retrying {len(retries)} message(s) for {instance.domain}')
|
||||
|
||||
for retry in instance.retries():
|
||||
try:
|
||||
push_message(instance.inbox, retry.data, headers=retry.headers)
|
||||
if push_message(instance.inbox, retry.data, headers=retry.headers):
|
||||
fails = 0
|
||||
|
||||
with db.session as s:
|
||||
s.remove('retry', row=retry)
|
||||
|
||||
else:
|
||||
fails += 1
|
||||
|
||||
if fails >= 5:
|
||||
logging.verbose(f'Failed 5 times in a row when retrying messages for {instance.domain}')
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
logging.debug(f'{e.__class__.__name__}: {e}')
|
||||
|
@ -64,10 +80,4 @@ def main():
|
|||
logging.error(f'Uncia has not been configured yet. Please edit {config.envfile} first.')
|
||||
return
|
||||
|
||||
scheduler = BackgroundScheduler()
|
||||
scheduler.add_job(run_retries, 'interval', hours=1)
|
||||
scheduler.start()
|
||||
|
||||
app.start()
|
||||
|
||||
scheduler.shutdown()
|
||||
app.start(run_retries())
|
||||
|
|
Loading…
Reference in a new issue