Implement cron

Fixes #22
This commit is contained in:
Dessa Simpson 2020-11-28 22:05:08 -07:00
parent 42d2a4a78c
commit a86e8a5667
9 changed files with 161 additions and 63 deletions

11
db/10-cron.sql Normal file
View file

@ -0,0 +1,11 @@
CREATE TABLE cron (
jobName varchar UNIQUE NOT NULL, -- Application-recognizable name for the job
runinterval interval NOT NULL, -- Duration between runs
-- Last successful run - only gets updated if run is successful
lastSuccess timestamptz DEFAULT to_timestamp(0), -- Defaults to beginning of time
PRIMARY KEY(jobName)
);
INSERT INTO cron (jobName,runInterval) VALUES
('processBans','30 minutes'),
('processEmptyMetadata','1 day');

View file

@ -2,13 +2,13 @@ import * as config from "./config";
import * as requests from "./requests";
import * as twitch from "./twitch";
import * as queries from "./queries";
import { log, LogLevel } from "./logging"
import { URLSearchParams } from "url";
import express from "express";
import session from "express-session";
import pg from "pg";
import pgSessionStore from "connect-pg-simple";
import fetch, { Response as FetchResponse } from "node-fetch";
import cron from "./cron";
import db from "./db";
import errorHandler from "./errors";
import * as version from "./version";
@ -332,61 +332,12 @@ app.get("/", async (request, response) => {
//});
// Logout
app.get("/logout", async (request, response) => request.session!.destroy(() => response.redirect(307, '/')));
async function processBannedUsers() {
log(LogLevel.INFO,"processBannedUsers run at " + new Date().toISOString());
var streamer = await db.query(queries.getStreamerIdToken).then((result: pg.QueryResult) => result.rows[0]);
if (typeof streamer == 'undefined') return;
var response = await twitch.streamerApiRequest("/moderation/banned?broadcaster_id=" + streamer['userid']);
var dbconn = await db.connect();
try {
await dbconn.query('BEGIN');
dbconn.query("DELETE FROM bans");
log(LogLevel.DEBUG,"Ban list:")
log(LogLevel.DEBUG,JSON.stringify(response.data,null,2));
var insertBanQuery = "INSERT INTO bans (userid) VALUES ";
var banRow = 0;
var bansArray: number[] = [];
for (var ban of response.data) {
if (ban.expires_at == '') {
banRow++;
insertBanQuery += `($${banRow}), `;
bansArray.push(ban.user_id as number);
}
}
var banQueryConfig = {
text: insertBanQuery.slice(0,-2), // Cut last `, ` off of the end
values: bansArray
};
log(LogLevel.DEBUG,"banQueryConfig object:")
log(LogLevel.DEBUG,JSON.stringify(banQueryConfig,null,2))
dbconn.query(banQueryConfig);
dbconn.query("CALL update_scores()");
await dbconn.query('COMMIT');
} catch (e) {
await dbconn.query('ROLLBACK');
} finally {
dbconn.release();
}
setTimeout(processBannedUsers,3600000+Math.floor(Math.random()*900000)) // Run every 1-1.25 hours to balance load
}
setTimeout(processBannedUsers,600000+Math.floor(Math.random()*600000))
async function processEmptyMetadata() {
log(LogLevel.INFO,"processEmptyMetadata run at " + new Date().toISOString());
var result = await db.query(queries.getRequestsWithEmptyMetadata);
for (var row of result.rows) {
log(LogLevel.DEBUG,"Processing empty metadata for request: " + row['url']);
requests.updateRequestMetadata(row['url']);
}
setTimeout(processEmptyMetadata,3600000+Math.floor(Math.random()*900000)) // Run every 1-1.25 hours to balance load
}
processEmptyMetadata();
app.get("/logout", async (request, response) =>
request.session!.destroy(() => response.redirect(307, '/')));
// Check version then listen
version.checkVersion().then(_ => app.listen(config.port, () => {
cron.run();
setInterval(cron.run,config.cronInterval);
console.log(`Listening on port ${config.port}`);
}));

View file

@ -36,6 +36,13 @@ if (typeof process.env.YOUTUBE_SECRET == 'undefined') {
}
export const youtubeSecret: string = process.env.YOUTUBE_SECRET!;
export const logLevel = (process.env.LOG_LEVEL ? LogLevel[process.env.LOG_LEVEL as keyof typeof LogLevel] : LogLevel.ERROR );
export const logLevel = (process.env.LOG_LEVEL
? LogLevel[process.env.LOG_LEVEL as keyof typeof LogLevel]
: LogLevel.ERROR );
export const cronInterval: number = (process.env.CRON_INTERVAL
? parseInt(process.env.CRON_INTERVAL as string, 10)
: 60000);
console.log("Running with logLevel = " + logLevel)

52
src/cron.ts Normal file
View file

@ -0,0 +1,52 @@
import cronjobs from "./cronjobs"
import * as queries from "./queries";
import * as twitch from "./twitch";
import { log, LogLevel } from "./logging"
import db from "./db";
import pg from "pg";
interface CronJob {
(streamer: twitch.StreamerUserIdTokenPair): Promise<void>
}
async function run() {
// If instance is not yet set up, end processing at this point
var streamer = await db.query(queries.getStreamerIdToken).then(
(result: pg.QueryResult) => result.rows[0]);
if (typeof streamer == 'undefined') {
log(LogLevel.INFO,"Cron run skipped due to instance not being set up")
return;
}
log(LogLevel.INFO,"Begin cron run")
for (var job in cronjobs) {
log(LogLevel.INFO,"cron: Checking job " + job);
var dbconn = await db.connect();
await dbconn.query("BEGIN");
try {
var needsRun = (await dbconn.query(
Object.assign(queries.getAndLockCronJob,{values: [job]})
)).rowCount;
if (needsRun) {
log(LogLevel.INFO,`cron: Database says job ${job} needs run; executing`);
await (cronjobs as { [key: string]: CronJob })[job](streamer)
log(LogLevel.INFO,`cron: Job ${job} returned without exception; updating lastSuccess`);
await dbconn.query(Object.assign(queries.updateCronJobLastSuccess,{values: [job]}))
await dbconn.query("COMMIT");
} else {
log(LogLevel.INFO,`cron: Database says job ${job} does not need run; skipping`);
await dbconn.query("ROLLBACK");
}
} catch(e) {
log(LogLevel.ERROR,`cron: Job ${job} threw exception; rolling back`);
await dbconn.query("ROLLBACK");
log(LogLevel.ERROR,`cron: Job ${job} exception message: ${e}`)
} finally {
log(LogLevel.DEBUG,`cron: Job ${job} hit finally; releasing dbconn`);
await dbconn.release();
}
log(LogLevel.INFO,"cron: Finished job " + job);
}
log(LogLevel.INFO,"End cron run")
}
export = {run}

7
src/cronjobs/index.ts Normal file
View file

@ -0,0 +1,7 @@
import { processBans } from './processBans';
import { processEmptyMetadata } from './processEmptyMetadata';
export = {
processBans,
processEmptyMetadata
}

View file

@ -0,0 +1,40 @@
import * as twitch from "../twitch";
import { log, LogLevel } from "../logging"
import db from "../db";
export async function processBans(streamer: twitch.StreamerUserIdTokenPair) {
var response = await twitch.streamerApiRequest(streamer,
"/moderation/banned?broadcaster_id=" + streamer.userid);
var dbconn = await db.connect();
try {
await dbconn.query('BEGIN');
await dbconn.query("DELETE FROM bans");
log(LogLevel.DEBUG,"Ban list:")
log(LogLevel.DEBUG,JSON.stringify(response.data,null,2));
var insertBanQuery = "INSERT INTO bans (userid) VALUES ";
var banRow = 0;
var bansArray: number[] = [];
for (var ban of response.data) {
if (ban.expires_at == '') {
banRow++;
insertBanQuery += `($${banRow}), `;
bansArray.push(ban.user_id as number);
}
}
var banQueryConfig = {
text: insertBanQuery.slice(0,-2), // Cut last `, ` off of the end
values: bansArray
};
log(LogLevel.DEBUG,"banQueryConfig object:")
log(LogLevel.DEBUG,JSON.stringify(banQueryConfig,null,2))
await dbconn.query(banQueryConfig);
await dbconn.query("CALL update_scores()");
await dbconn.query('COMMIT');
} catch (e) {
log(LogLevel.ERROR,"cronjobs.processBans: Exception thrown; rolling back");
await dbconn.query('ROLLBACK');
throw(e);
} finally {
await dbconn.release();
}
}

View file

@ -0,0 +1,12 @@
import * as requests from "../requests";
import * as queries from "../queries";
import { log, LogLevel } from "../logging"
import db from "../db";
export async function processEmptyMetadata(_: any) { // TODO: Consume streamer userid for WHERE clause
var result = await db.query(queries.getRequestsWithEmptyMetadata);
for (var row of result.rows) {
log(LogLevel.DEBUG,"Processing empty metadata for request: " + row['url']);
requests.updateRequestMetadata(row['url']);
}
}

View file

@ -111,3 +111,16 @@ export const getDbVersion = {
name: "getDbVersion",
text: "SELECT get_version()"
}
export const getAndLockCronJob = {
name: "getCronJob",
text: `SELECT * FROM cron
WHERE (lastSuccess + runInterval) < now()
AND jobName = $1
FOR UPDATE SKIP LOCKED;`
}
export const updateCronJobLastSuccess = {
name: "updateCronJobLastSuccess",
text: "UPDATE cron SET lastSuccess = now() WHERE jobName = $1"
}

View file

@ -10,6 +10,11 @@ export interface TokenPair {
refresh_token: string;
}
export interface StreamerUserIdTokenPair {
userid: number
tokenpair: TokenPair
}
// Refresh the API token. Returns true on success and false on failure.
async function refreshApiToken(tokens: TokenPair): Promise<boolean> {
log(LogLevel.DEBUG,`Call: twitch.refreshApiToken(${JSON.stringify(tokens,null,2)})`);
@ -85,15 +90,15 @@ export async function apiRequest(tokens: TokenPair, endpoint: string): Promise <
})
}
export async function streamerApiRequest(endpoint: string) {
export async function streamerApiRequest(streamer: StreamerUserIdTokenPair, endpoint: string) {
log(LogLevel.DEBUG,`Call: twitch.streamerApiRequest(${endpoint})`);
var streamer = await db.query(queries.getStreamerIdToken).then((result: pg.QueryResult) => result.rows[0]);
var tokenpair = streamer.tokenpair;
var originaltoken = tokenpair.access_token;
log(LogLevel.DEBUG,"Original token: " + tokenpair.access_token);
var response = await apiRequest(tokenpair,endpoint);
log(LogLevel.DEBUG,"New token: " + tokenpair.access_token);
if (tokenpair.access_token != originaltoken) await db.query(Object.assign(queries.updateStreamer,{ values: [streamer.userid,tokenpair] }))
var originaltoken = streamer.tokenpair.access_token;
log(LogLevel.DEBUG,"Original token: " + originaltoken);
var response = await apiRequest(streamer.tokenpair,endpoint);
log(LogLevel.DEBUG,"New token: " + streamer.tokenpair.access_token);
if (streamer.tokenpair.access_token != originaltoken)
await db.query(Object.assign(queries.updateStreamer,
{ values: [streamer.userid,streamer.tokenpair] }))
return response;
}