learn-request-queue/src/cron.ts

71 lines
2.3 KiB
TypeScript

import cronjobs from "./cronjobs"
import * as queries from "./queries";
import * as twitch from "./twitch";
import { log, LogLevel } from "./logging"
import db from "./db";
import pg from "pg";
/**
 * Call signature shared by every entry of the `cronjobs` module: a job
 * receives the streamer's id/token pair and resolves once it has finished.
 */
type CronJob = (streamer: twitch.StreamerUserIdTokenPair) => Promise<void>;
/**
 * Checks that `job` is one of the own keys of the `cronjobs` module.
 *
 * @param job - Name of the cron job to look up.
 * @throws Error with the message "Invalid cronjob <job>" when the name is unknown.
 */
function validateJob(job: string): void {
  const knownJobs = Object.keys(cronjobs);
  if (knownJobs.includes(job)) {
    return;
  }
  throw new Error("Invalid cronjob " + job);
}
/**
 * Runs a single named cron job immediately.
 *
 * Unlike `run()`, this does not consult `queries.getAndLockCronJob` and does
 * not record a lastSuccess for the job; it only looks up the streamer and
 * invokes the job.
 *
 * @param job - Key of the job in the `cronjobs` module.
 * @throws Error if `job` is not a known cron job, or if the streamer lookup
 *   returns no row (instance not set up yet). Rejections from the database
 *   query or from the job itself propagate unchanged.
 */
async function runJob(job: string): Promise<void> {
  validateJob(job);

  const result: pg.QueryResult = await db.query(queries.getStreamerIdToken);
  const streamer: twitch.StreamerUserIdTokenPair | undefined = result.rows[0];

  // Same precondition run() checks: without a streamer row there is nothing
  // to hand to the job, so fail with a clear message instead of passing
  // `undefined` into it.
  if (streamer === undefined) {
    throw new Error("Cannot run cronjob " + job + ": instance is not set up");
  }

  await (cronjobs as { [key: string]: CronJob })[job](streamer);
}
/**
 * Run a specific job on request of the streamer.
 *
 * @param job - Name of the cron job to execute.
 * @throws Error if `job` is not a known cron job; anything `runJob` rejects
 *   with is propagated as well.
 */
async function request(job: string): Promise<void> {
  // Fail fast on unknown job names.
  validateJob(job);

  // TODO: Rate limiting

  const execution = runJob(job);
  await execution;
}
/**
 * Performs one cron pass over every job in the `cronjobs` module.
 *
 * The pass is skipped entirely when the streamer lookup returns no row
 * (instance not set up). Otherwise each job is handled sequentially on its
 * own pooled connection inside its own transaction:
 *
 * 1. `queries.getAndLockCronJob` is executed with the job name; a non-zero
 *    row count means the job needs to run (presumably the query also locks
 *    the job row for the duration of the transaction - confirm in queries).
 * 2. If needed, the job is executed, `queries.updateCronJobLastSuccess` is
 *    run and the transaction is committed; otherwise it is rolled back.
 * 3. Any exception from the steps above is logged and the transaction is
 *    rolled back; processing then continues with the next job.
 *
 * The connection is always released. If even the ROLLBACK fails, the
 * connection is released with an error so the pool discards it instead of
 * reusing a connection in an unknown transaction state.
 *
 * @returns A promise that resolves when all jobs have been processed.
 *   Failures of the initial streamer lookup or of `db.connect()` reject it.
 */
async function run(): Promise<void> {
  // If instance is not yet set up, end processing at this point
  const setupResult: pg.QueryResult = await db.query(queries.getStreamerIdToken);
  const streamer: twitch.StreamerUserIdTokenPair | undefined = setupResult.rows[0];
  if (streamer === undefined) {
    log(LogLevel.INFO, "Cron run skipped due to instance not being set up");
    return;
  }

  log(LogLevel.INFO, "Begin cron run");
  const jobs = cronjobs as { [key: string]: CronJob };

  for (const job of Object.keys(jobs)) {
    log(LogLevel.INFO, "cron: Checking job " + job);
    const dbconn = await db.connect();
    // Set only when the connection itself can no longer be trusted.
    let releaseError: Error | undefined;

    try {
      // BEGIN lives inside the try so that a failure here still reaches the
      // finally block and the connection is returned to the pool.
      await dbconn.query("BEGIN");

      // Build a fresh config object per call instead of mutating the shared
      // query definition exported by the queries module.
      const lockResult = await dbconn.query(
        Object.assign({}, queries.getAndLockCronJob, { values: [job] })
      );

      if (lockResult.rowCount) {
        log(LogLevel.INFO, `cron: Database says job ${job} needs run; executing`);
        await jobs[job](streamer);
        log(LogLevel.INFO, `cron: Job ${job} returned without exception; updating lastSuccess`);
        await dbconn.query(
          Object.assign({}, queries.updateCronJobLastSuccess, { values: [job] })
        );
        await dbconn.query("COMMIT");
      } else {
        log(LogLevel.INFO, `cron: Database says job ${job} does not need run; skipping`);
        await dbconn.query("ROLLBACK");
      }
    } catch (e) {
      // Log the cause before attempting the rollback so it is not lost if
      // the rollback itself fails.
      log(LogLevel.ERROR, `cron: Job ${job} threw exception; rolling back`);
      log(LogLevel.ERROR, `cron: Job ${job} exception message: ${e}`);
      try {
        await dbconn.query("ROLLBACK");
      } catch (rollbackError) {
        // A failed rollback must not abort the remaining jobs; remember the
        // error so the connection is destroyed on release.
        log(LogLevel.ERROR, `cron: Job ${job} rollback failed: ${rollbackError}`);
        releaseError = rollbackError instanceof Error
          ? rollbackError
          : new Error(String(rollbackError));
      }
    } finally {
      log(LogLevel.DEBUG, `cron: Job ${job} hit finally; releasing dbconn`);
      dbconn.release(releaseError);
    }

    log(LogLevel.INFO, "cron: Finished job " + job);
  }

  log(LogLevel.INFO, "End cron run");
}
// Module interface: the cron pass, job-name validation and the on-demand
// entry point. runJob is not exported; it is invoked through request().
export = {run,validateJob,request}