diff --git a/db/10-cron.sql b/db/10-cron.sql
new file mode 100644
index 0000000..3493563
--- /dev/null
+++ b/db/10-cron.sql
@@ -0,0 +1,11 @@
+CREATE TABLE cron (
+	jobName varchar UNIQUE NOT NULL, -- Application-recognizable name for the job
+	runinterval interval NOT NULL, -- Duration between runs
+	-- Last successful run - only gets updated if run is successful
+	lastSuccess timestamptz DEFAULT to_timestamp(0), -- Defaults to beginning of time
+	PRIMARY KEY(jobName)
+);
+
+INSERT INTO cron (jobName,runInterval) VALUES
+('processBans','30 minutes'),
+('processEmptyMetadata','1 day');
diff --git a/src/app.ts b/src/app.ts
index e67ab26..dc50ec8 100644
--- a/src/app.ts
+++ b/src/app.ts
@@ -2,13 +2,13 @@ import * as config from "./config";
 import * as requests from "./requests";
 import * as twitch from "./twitch";
 import * as queries from "./queries";
-import { log, LogLevel } from "./logging"
 import { URLSearchParams } from "url";
 import express from "express";
 import session from "express-session";
 import pg from "pg";
 import pgSessionStore from "connect-pg-simple";
 import fetch, { Response as FetchResponse } from "node-fetch";
+import cron from "./cron";
 import db from "./db";
 import errorHandler from "./errors";
 import * as version from "./version";
@@ -332,61 +332,12 @@ app.get("/", async (request, response) => {
 //});
 
 // Logout
-app.get("/logout", async (request, response) => request.session!.destroy(() => response.redirect(307, '/')));
-
-async function processBannedUsers() {
-	log(LogLevel.INFO,"processBannedUsers run at " + new Date().toISOString());
-	var streamer = await db.query(queries.getStreamerIdToken).then((result: pg.QueryResult) => result.rows[0]);
-	if (typeof streamer == 'undefined') return;
-	var response = await twitch.streamerApiRequest("/moderation/banned?broadcaster_id=" + streamer['userid']);
-	var dbconn = await db.connect();
-	try {
-		await dbconn.query('BEGIN');
-		dbconn.query("DELETE FROM bans");
-		log(LogLevel.DEBUG,"Ban list:")
-		log(LogLevel.DEBUG,JSON.stringify(response.data,null,2));
-		var insertBanQuery = "INSERT INTO bans (userid) VALUES ";
-		var banRow = 0;
-		var bansArray: number[] = [];
-		for (var ban of response.data) {
-			if (ban.expires_at == '') {
-				banRow++;
-				insertBanQuery += `($${banRow}), `;
-				bansArray.push(ban.user_id as number);
-			}
-		}
-		var banQueryConfig = {
-			text: insertBanQuery.slice(0,-2), // Cut last `, ` off of the end
-			values: bansArray
-		};
-		log(LogLevel.DEBUG,"banQueryConfig object:")
-		log(LogLevel.DEBUG,JSON.stringify(banQueryConfig,null,2))
-		dbconn.query(banQueryConfig);
-		dbconn.query("CALL update_scores()");
-		await dbconn.query('COMMIT');
-	} catch (e) {
-		await dbconn.query('ROLLBACK');
-	} finally {
-		dbconn.release();
-	}
-	setTimeout(processBannedUsers,3600000+Math.floor(Math.random()*900000)) // Run every 1-1.25 hours to balance load
-}
-
-setTimeout(processBannedUsers,600000+Math.floor(Math.random()*600000))
-
-async function processEmptyMetadata() {
-	log(LogLevel.INFO,"processEmptyMetadata run at " + new Date().toISOString());
-	var result = await db.query(queries.getRequestsWithEmptyMetadata);
-	for (var row of result.rows) {
-		log(LogLevel.DEBUG,"Processing empty metadata for request: " + row['url']);
-		requests.updateRequestMetadata(row['url']);
-	}
-	setTimeout(processEmptyMetadata,3600000+Math.floor(Math.random()*900000)) // Run every 1-1.25 hours to balance load
-}
-
-processEmptyMetadata();
+app.get("/logout", async (request, response) =>
+	request.session!.destroy(() => response.redirect(307, '/')));
 
 // Check version then listen
 version.checkVersion().then(_ => app.listen(config.port, () => {
+	cron.run();
+	setInterval(cron.run,config.cronInterval);
 	console.log(`Listening on port ${config.port}`);
 }));
diff --git a/src/config.ts b/src/config.ts
index ce3e768..3d9c47e 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -36,6 +36,13 @@ if (typeof process.env.YOUTUBE_SECRET == 'undefined') {
 }
 export const youtubeSecret: string = process.env.YOUTUBE_SECRET!;
 
-export const logLevel = (process.env.LOG_LEVEL ? LogLevel[process.env.LOG_LEVEL as keyof typeof LogLevel] : LogLevel.ERROR );
+export const logLevel = (process.env.LOG_LEVEL
+	? LogLevel[process.env.LOG_LEVEL as keyof typeof LogLevel]
+	: LogLevel.ERROR );
+
+export const cronInterval: number = (process.env.CRON_INTERVAL
+	? parseInt(process.env.CRON_INTERVAL as string, 10)
+	: 60000);
+
 
 console.log("Running with logLevel = " + logLevel)
diff --git a/src/cron.ts b/src/cron.ts
new file mode 100644
index 0000000..256835e
--- /dev/null
+++ b/src/cron.ts
@@ -0,0 +1,52 @@
+import cronjobs from "./cronjobs"
+import * as queries from "./queries";
+import * as twitch from "./twitch";
+import { log, LogLevel } from "./logging"
+import db from "./db";
+import pg from "pg";
+
+interface CronJob {
+	(streamer: twitch.StreamerUserIdTokenPair): Promise<void>
+}
+
+async function run() {
+	// If instance is not yet set up, end processing at this point
+	var streamer = await db.query(queries.getStreamerIdToken).then(
+		(result: pg.QueryResult) => result.rows[0]);
+	if (typeof streamer == 'undefined') {
+		log(LogLevel.INFO,"Cron run skipped due to instance not being set up")
+		return;
+	}
+	log(LogLevel.INFO,"Begin cron run")
+	for (var job in cronjobs) {
+		log(LogLevel.INFO,"cron: Checking job " + job);
+		var dbconn = await db.connect();
+		await dbconn.query("BEGIN");
+		try {
+			var needsRun = (await dbconn.query(
+				Object.assign(queries.getAndLockCronJob,{values: [job]})
+			)).rowCount;
+			if (needsRun) {
+				log(LogLevel.INFO,`cron: Database says job ${job} needs run; executing`);
+				await (cronjobs as { [key: string]: CronJob })[job](streamer)
+				log(LogLevel.INFO,`cron: Job ${job} returned without exception; updating lastSuccess`);
+				await dbconn.query(Object.assign(queries.updateCronJobLastSuccess,{values: [job]}))
+				await dbconn.query("COMMIT");
+			} else {
+				log(LogLevel.INFO,`cron: Database says job ${job} does not need run; skipping`);
+				await dbconn.query("ROLLBACK");
+			}
+		} catch(e) {
+			log(LogLevel.ERROR,`cron: Job ${job} threw exception; rolling back`);
+			await dbconn.query("ROLLBACK");
+			log(LogLevel.ERROR,`cron: Job ${job} exception message: ${e}`)
+		} finally {
+			log(LogLevel.DEBUG,`cron: Job ${job} hit finally; releasing dbconn`);
+			await dbconn.release();
+		}
+		log(LogLevel.INFO,"cron: Finished job " + job);
+	}
+	log(LogLevel.INFO,"End cron run")
+}
+
+export = {run}
diff --git a/src/cronjobs/index.ts b/src/cronjobs/index.ts
new file mode 100644
index 0000000..0c856fd
--- /dev/null
+++ b/src/cronjobs/index.ts
@@ -0,0 +1,7 @@
+import { processBans } from './processBans';
+import { processEmptyMetadata } from './processEmptyMetadata';
+
+export = {
+	processBans,
+	processEmptyMetadata
+}
diff --git a/src/cronjobs/processBans.ts b/src/cronjobs/processBans.ts
new file mode 100644
index 0000000..1fe84ab
--- /dev/null
+++ b/src/cronjobs/processBans.ts
@@ -0,0 +1,40 @@
+import * as twitch from "../twitch";
+import { log, LogLevel } from "../logging"
+import db from "../db";
+
+export async function processBans(streamer: twitch.StreamerUserIdTokenPair) {
+	var response = await twitch.streamerApiRequest(streamer,
+		"/moderation/banned?broadcaster_id=" + streamer.userid);
+	var dbconn = await db.connect();
+	try {
+		await dbconn.query('BEGIN');
+		await dbconn.query("DELETE FROM bans");
+		log(LogLevel.DEBUG,"Ban list:")
+		log(LogLevel.DEBUG,JSON.stringify(response.data,null,2));
+		var insertBanQuery = "INSERT INTO bans (userid) VALUES ";
+		var banRow = 0;
+		var bansArray: number[] = [];
+		for (var ban of response.data) {
+			if (ban.expires_at == '') {
+				banRow++;
+				insertBanQuery += `($${banRow}), `;
+				bansArray.push(ban.user_id as number);
+			}
+		}
+		var banQueryConfig = {
+			text: insertBanQuery.slice(0,-2), // Cut last `, ` off of the end
+			values: bansArray
+		};
+		log(LogLevel.DEBUG,"banQueryConfig object:")
+		log(LogLevel.DEBUG,JSON.stringify(banQueryConfig,null,2))
+		await dbconn.query(banQueryConfig);
+		await dbconn.query("CALL update_scores()");
+		await dbconn.query('COMMIT');
+	} catch (e) {
+		log(LogLevel.ERROR,"cronjobs.processBans: Exception thrown; rolling back");
+		await dbconn.query('ROLLBACK');
+		throw(e);
+	} finally {
+		await dbconn.release();
+	}
+}
diff --git a/src/cronjobs/processEmptyMetadata.ts b/src/cronjobs/processEmptyMetadata.ts
new file mode 100644
index 0000000..9b911bb
--- /dev/null
+++ b/src/cronjobs/processEmptyMetadata.ts
@@ -0,0 +1,12 @@
+import * as requests from "../requests";
+import * as queries from "../queries";
+import { log, LogLevel } from "../logging"
+import db from "../db";
+
+export async function processEmptyMetadata(_: any) { // TODO: Consume streamer userid for WHERE clause
+	var result = await db.query(queries.getRequestsWithEmptyMetadata);
+	for (var row of result.rows) {
+		log(LogLevel.DEBUG,"Processing empty metadata for request: " + row['url']);
+		requests.updateRequestMetadata(row['url']);
+	}
+}
diff --git a/src/queries.ts b/src/queries.ts
index 4f1d3a5..7b6725e 100644
--- a/src/queries.ts
+++ b/src/queries.ts
@@ -111,3 +111,16 @@ export const getDbVersion = {
 	name: "getDbVersion",
 	text: "SELECT get_version()"
 }
+
+export const getAndLockCronJob = {
+	name: "getCronJob",
+	text: `SELECT * FROM cron
+		WHERE (lastSuccess + runInterval) < now()
+		AND jobName = $1
+		FOR UPDATE SKIP LOCKED;`
+}
+
+export const updateCronJobLastSuccess = {
+	name: "updateCronJobLastSuccess",
+	text: "UPDATE cron SET lastSuccess = now() WHERE jobName = $1"
+}
diff --git a/src/twitch.ts b/src/twitch.ts
index e44b303..814ba1f 100644
--- a/src/twitch.ts
+++ b/src/twitch.ts
@@ -10,6 +10,11 @@ export interface TokenPair {
 	refresh_token: string;
 }
 
+export interface StreamerUserIdTokenPair {
+	userid: number
+	tokenpair: TokenPair
+}
+
 // Refresh the API token. Returns true on success and false on failure.
 async function refreshApiToken(tokens: TokenPair): Promise<boolean> {
 	log(LogLevel.DEBUG,`Call: twitch.refreshApiToken(${JSON.stringify(tokens,null,2)})`);
@@ -85,15 +90,15 @@ export async function apiRequest(tokens: TokenPair, endpoint: string): Promise <
 	})
 }
 
-export async function streamerApiRequest(endpoint: string) {
+export async function streamerApiRequest(streamer: StreamerUserIdTokenPair, endpoint: string) {
 	log(LogLevel.DEBUG,`Call: twitch.streamerApiRequest(${endpoint})`);
-	var streamer = await db.query(queries.getStreamerIdToken).then((result: pg.QueryResult) => result.rows[0]);
-	var tokenpair = streamer.tokenpair;
-	var originaltoken = tokenpair.access_token;
-	log(LogLevel.DEBUG,"Original token: " + tokenpair.access_token);
-	var response = await apiRequest(tokenpair,endpoint);
-	log(LogLevel.DEBUG,"New token: " + tokenpair.access_token);
-	if (tokenpair.access_token != originaltoken) await db.query(Object.assign(queries.updateStreamer,{ values: [streamer.userid,tokenpair] }))
+	var originaltoken = streamer.tokenpair.access_token;
+	log(LogLevel.DEBUG,"Original token: " + originaltoken);
+	var response = await apiRequest(streamer.tokenpair,endpoint);
+	log(LogLevel.DEBUG,"New token: " + streamer.tokenpair.access_token);
+	if (streamer.tokenpair.access_token != originaltoken)
+		await db.query(Object.assign(queries.updateStreamer,
+			{ values: [streamer.userid,streamer.tokenpair] }))
 	return response;
 }
 