From 2a87e0408e4af8df794280b19a81f2219ba2adfe Mon Sep 17 00:00:00 2001 From: Dessa Simpson Date: Wed, 24 Feb 2021 19:40:58 -0700 Subject: [PATCH] Retrieve follows/subscriptions from Twitch API --- db/10-cron.sql | 2 ++ db/upgrade/v0.4-0.5.sql | 4 +++ src/cronjobs/index.ts | 4 +++ src/cronjobs/processBans.ts | 10 +++--- src/cronjobs/processFollows.ts | 53 ++++++++++++++++++++++++++++ src/cronjobs/processSubscriptions.ts | 52 +++++++++++++++++++++++++++ 6 files changed, 120 insertions(+), 5 deletions(-) create mode 100644 src/cronjobs/processFollows.ts create mode 100644 src/cronjobs/processSubscriptions.ts diff --git a/db/10-cron.sql b/db/10-cron.sql index 3493563..a0b1941 100644 --- a/db/10-cron.sql +++ b/db/10-cron.sql @@ -8,4 +8,6 @@ CREATE TABLE cron ( INSERT INTO cron (jobName,runInterval) VALUES ('processBans','30 minutes'), +('processFollows','30 minutes'), +('processSubscriptions','30 minutes'), ('processEmptyMetadata','1 day'); diff --git a/db/upgrade/v0.4-0.5.sql b/db/upgrade/v0.4-0.5.sql index 058eef6..d85f6d7 100644 --- a/db/upgrade/v0.4-0.5.sql +++ b/db/upgrade/v0.4-0.5.sql @@ -46,4 +46,8 @@ CREATE OR REPLACE VIEW vote_score_vw AS ALTER TABLE users DROP COLUMN isfollower, DROP COLUMN issubscriber; +INSERT INTO cron (jobName,runInterval) VALUES +('processFollows','30 minutes'), +('processSubscriptions','30 minutes'); + COMMIT; diff --git a/src/cronjobs/index.ts b/src/cronjobs/index.ts index 0c856fd..94dd385 100644 --- a/src/cronjobs/index.ts +++ b/src/cronjobs/index.ts @@ -1,7 +1,11 @@ import { processBans } from './processBans'; +import { processFollows } from './processFollows'; +import { processSubscriptions } from './processSubscriptions'; import { processEmptyMetadata } from './processEmptyMetadata'; export = { processBans, + processFollows, + processSubscriptions, processEmptyMetadata } diff --git a/src/cronjobs/processBans.ts b/src/cronjobs/processBans.ts index d21cb76..2316612 100644 --- a/src/cronjobs/processBans.ts +++ b/src/cronjobs/processBans.ts @@ -7,13 +7,13 @@ export async function processBans(streamer: twitch.StreamerUserIdTokenPair) { try { await dbconn.query('BEGIN'); await dbconn.query("DELETE FROM bans"); - //log(LogLevel.DEBUG,"Ban API response:") - //log(LogLevel.DEBUG,JSON.stringify(response,null,2)); var response = await twitch.streamerApiRequest(streamer, `/moderation/banned?broadcaster_id=${streamer.userid}&first=100`); + //log(LogLevel.DEBUG,"Ban API response:") + //log(LogLevel.DEBUG,JSON.stringify(response,null,2)); while (true) { var insertBanQuery = "INSERT INTO bans (userid) VALUES "; - var banRow = 0; + var banRow = 0; // Used for $1, $2, etc. in parameterized query var bansArray: number[] = []; if (Object.keys(response.data).length > 0) { for (var ban of response.data) { @@ -37,11 +37,11 @@ export async function processBans(streamer: twitch.StreamerUserIdTokenPair) { var oldFirstUserid = response.data[0].user_id; response = await twitch.streamerApiRequest(streamer, `/moderation/banned?broadcaster_id=${streamer.userid}&after=${response.pagination.cursor}&first=100`); + //log(LogLevel.DEBUG,"Ban API response:"); + //log(LogLevel.DEBUG,JSON.stringify(response,null,2)); // Work around broken api endpoint giving a cursor referring to the // current page, causing an infinite loop if (oldFirstUserid == response.data[0].user_id) break; - //log(LogLevel.DEBUG,"Ban API response:"); - //log(LogLevel.DEBUG,JSON.stringify(response,null,2)); } else { break; } diff --git a/src/cronjobs/processFollows.ts b/src/cronjobs/processFollows.ts new file mode 100644 index 0000000..9d74330 --- /dev/null +++ b/src/cronjobs/processFollows.ts @@ -0,0 +1,53 @@ +import * as twitch from "../twitch"; +import { log, LogLevel } from "../logging" +import db from "../db"; + +export async function processFollows(streamer: twitch.StreamerUserIdTokenPair) { + var dbconn = await db.connect(); + try { + await dbconn.query('BEGIN'); + await dbconn.query("DELETE FROM follows"); + // Insert the streamer as a follower + await dbconn.query({text: "INSERT INTO follows (userid) VALUES ($1)", values: [streamer.userid]}) + var response = await twitch.streamerApiRequest(streamer, + `/users/follows?to_id=${streamer.userid}&first=100`); + log(LogLevel.DEBUG,"Follows API response:") + log(LogLevel.DEBUG,JSON.stringify(response,null,2)); + while (true) { + var insertFollowQuery = "INSERT INTO follows (userid) VALUES "; + var followRow = 0; // Used for $1, $2, etc. in parameterized query + var followsArray: number[] = []; + if (Object.keys(response.data).length > 0) { + for (var follow of response.data) { + followRow++; + insertFollowQuery += `($${followRow}), `; + followsArray.push(follow.from_id as number); + } + insertFollowQuery = insertFollowQuery.slice(0,-2); // Cut last `, ` off of the end + var followQueryConfig = { + text: insertFollowQuery, + values: followsArray + }; + log(LogLevel.DEBUG,"followQueryConfig object:") + log(LogLevel.DEBUG,JSON.stringify(followQueryConfig,null,2)) + await dbconn.query(followQueryConfig); + } + if (response.pagination.cursor) { + response = await twitch.streamerApiRequest(streamer, + `/users/follows?to_id=${streamer.userid}&after=${response.pagination.cursor}&first=100`); + log(LogLevel.DEBUG,"Follow API response:"); + log(LogLevel.DEBUG,JSON.stringify(response,null,2)); + } else { + break; + } + } + await dbconn.query("CALL update_scores()"); + await dbconn.query('COMMIT'); + } catch (e) { + log(LogLevel.ERROR,"cronjobs.processFollows: Exception thrown; rolling back"); + await dbconn.query('ROLLBACK'); + throw(e); + } finally { + await dbconn.release(); + } +} diff --git a/src/cronjobs/processSubscriptions.ts b/src/cronjobs/processSubscriptions.ts new file mode 100644 index 0000000..4c25524 --- /dev/null +++ b/src/cronjobs/processSubscriptions.ts @@ -0,0 +1,52 @@ +import * as twitch from "../twitch"; +import { log, LogLevel } from "../logging" +import db from "../db"; + +export async function processSubscriptions(streamer: twitch.StreamerUserIdTokenPair) { + var dbconn = await db.connect(); + try { + await dbconn.query('BEGIN'); + await dbconn.query("DELETE FROM subscriptions"); + var response = await twitch.streamerApiRequest(streamer, + `/subscriptions?broadcaster_id=${streamer.userid}&first=100`); + //log(LogLevel.DEBUG,"Subscriptions API response:") + //log(LogLevel.DEBUG,JSON.stringify(response,null,2)); + while (true) { + var insertSubscriptionQuery = "INSERT INTO subscriptions (userid) VALUES "; + var subscriptionRow = 0; // Used for $1, $2, etc. in parameterized query + var subscriptionsArray: number[] = []; + if (Object.keys(response.data).length > 0) { + for (var subscription of response.data) { + subscriptionRow++; + insertSubscriptionQuery += `($${subscriptionRow}), `; + subscriptionsArray.push(subscription.user_id as number); + } + insertSubscriptionQuery = insertSubscriptionQuery.slice(0,-2); // Cut last `, ` off of the end + var subscriptionQueryConfig = { + text: insertSubscriptionQuery, + values: subscriptionsArray + }; + log(LogLevel.DEBUG,"subscriptionQueryConfig object:") + log(LogLevel.DEBUG,JSON.stringify(subscriptionQueryConfig,null,2)) + await dbconn.query(subscriptionQueryConfig); + } + if (response.pagination.cursor) { + //var oldFirstUserid = response.data[0].user_id; + response = await twitch.streamerApiRequest(streamer, + `/subscriptions?broadcaster_id=${streamer.userid}&after=${response.pagination.cursor}&first=100`); + //log(LogLevel.DEBUG,"Subscription API response:"); + //log(LogLevel.DEBUG,JSON.stringify(response,null,2)); + } else { + break; + } + } + await dbconn.query("CALL update_scores()"); + await dbconn.query('COMMIT'); + } catch (e) { + log(LogLevel.ERROR,"cronjobs.processSubscriptions: Exception thrown; rolling back"); + await dbconn.query('ROLLBACK'); + throw(e); + } finally { + await dbconn.release(); + } +}