Retrieve follows/subscriptions from Twitch API

master
Dessa Simpson 2021-02-24 19:40:58 -07:00
parent dbe7e4c52d
commit 2a87e0408e
6 changed files with 120 additions and 5 deletions

View File

@ -8,4 +8,6 @@ CREATE TABLE cron (
INSERT INTO cron (jobName,runInterval) VALUES INSERT INTO cron (jobName,runInterval) VALUES
('processBans','30 minutes'), ('processBans','30 minutes'),
('processFollows','30 minutes'),
('processSubscriptions','30 minutes'),
('processEmptyMetadata','1 day'); ('processEmptyMetadata','1 day');

View File

@ -46,4 +46,8 @@ CREATE OR REPLACE VIEW vote_score_vw AS
ALTER TABLE users DROP COLUMN isfollower, DROP COLUMN issubscriber; ALTER TABLE users DROP COLUMN isfollower, DROP COLUMN issubscriber;
INSERT INTO cron (jobName,runInterval) VALUES
('processFollows','30 minutes'),
('processSubscriptions','30 minutes');
COMMIT; COMMIT;

View File

@ -1,7 +1,11 @@
import { processBans } from './processBans'; import { processBans } from './processBans';
import { processFollows } from './processFollows';
import { processSubscriptions } from './processSubscriptions';
import { processEmptyMetadata } from './processEmptyMetadata'; import { processEmptyMetadata } from './processEmptyMetadata';
export = { export = {
processBans, processBans,
processFollows,
processSubscriptions,
processEmptyMetadata processEmptyMetadata
} }

View File

@ -7,13 +7,13 @@ export async function processBans(streamer: twitch.StreamerUserIdTokenPair) {
try { try {
await dbconn.query('BEGIN'); await dbconn.query('BEGIN');
await dbconn.query("DELETE FROM bans"); await dbconn.query("DELETE FROM bans");
//log(LogLevel.DEBUG,"Ban API response:")
//log(LogLevel.DEBUG,JSON.stringify(response,null,2));
var response = await twitch.streamerApiRequest(streamer, var response = await twitch.streamerApiRequest(streamer,
`/moderation/banned?broadcaster_id=${streamer.userid}&first=100`); `/moderation/banned?broadcaster_id=${streamer.userid}&first=100`);
//log(LogLevel.DEBUG,"Ban API response:")
//log(LogLevel.DEBUG,JSON.stringify(response,null,2));
while (true) { while (true) {
var insertBanQuery = "INSERT INTO bans (userid) VALUES "; var insertBanQuery = "INSERT INTO bans (userid) VALUES ";
var banRow = 0; var banRow = 0; // Used for $1, $2, etc. in parameterized query
var bansArray: number[] = []; var bansArray: number[] = [];
if (Object.keys(response.data).length > 0) { if (Object.keys(response.data).length > 0) {
for (var ban of response.data) { for (var ban of response.data) {
@ -37,11 +37,11 @@ export async function processBans(streamer: twitch.StreamerUserIdTokenPair) {
var oldFirstUserid = response.data[0].user_id; var oldFirstUserid = response.data[0].user_id;
response = await twitch.streamerApiRequest(streamer, response = await twitch.streamerApiRequest(streamer,
`/moderation/banned?broadcaster_id=${streamer.userid}&after=${response.pagination.cursor}&first=100`); `/moderation/banned?broadcaster_id=${streamer.userid}&after=${response.pagination.cursor}&first=100`);
//log(LogLevel.DEBUG,"Ban API response:");
//log(LogLevel.DEBUG,JSON.stringify(response,null,2));
// Work around broken api endpoint giving a cursor referring to the // Work around broken api endpoint giving a cursor referring to the
// current page, causing an infinite loop // current page, causing an infinite loop
if (oldFirstUserid == response.data[0].user_id) break; if (oldFirstUserid == response.data[0].user_id) break;
//log(LogLevel.DEBUG,"Ban API response:");
//log(LogLevel.DEBUG,JSON.stringify(response,null,2));
} else { } else {
break; break;
} }

View File

@ -0,0 +1,53 @@
import * as twitch from "../twitch";
import { log, LogLevel } from "../logging"
import db from "../db";
export async function processFollows(streamer: twitch.StreamerUserIdTokenPair) {
var dbconn = await db.connect();
try {
await dbconn.query('BEGIN');
await dbconn.query("DELETE FROM follows");
// Insert the streamer as a follower
await dbconn.query({text: "INSERT INTO follows (userid) VALUES ($1)", values: [streamer.userid]})
var response = await twitch.streamerApiRequest(streamer,
`/users/follows?to_id=${streamer.userid}&first=100`);
log(LogLevel.DEBUG,"Follows API response:")
log(LogLevel.DEBUG,JSON.stringify(response,null,2));
while (true) {
var insertFollowQuery = "INSERT INTO follows (userid) VALUES ";
var followRow = 0; // Used for $1, $2, etc. in parameterized query
var followsArray: number[] = [];
if (Object.keys(response.data).length > 0) {
for (var follow of response.data) {
followRow++;
insertFollowQuery += `($${followRow}), `;
followsArray.push(follow.from_id as number);
}
insertFollowQuery = insertFollowQuery.slice(0,-2); // Cut last `, ` off of the end
var followQueryConfig = {
text: insertFollowQuery,
values: followsArray
};
log(LogLevel.DEBUG,"followQueryConfig object:")
log(LogLevel.DEBUG,JSON.stringify(followQueryConfig,null,2))
await dbconn.query(followQueryConfig);
}
if (response.pagination.cursor) {
response = await twitch.streamerApiRequest(streamer,
`/users/follows?to_id=${streamer.userid}&after=${response.pagination.cursor}&first=100`);
log(LogLevel.DEBUG,"Follow API response:");
log(LogLevel.DEBUG,JSON.stringify(response,null,2));
} else {
break;
}
}
await dbconn.query("CALL update_scores()");
await dbconn.query('COMMIT');
} catch (e) {
log(LogLevel.ERROR,"cronjobs.processFollows: Exception thrown; rolling back");
await dbconn.query('ROLLBACK');
throw(e);
} finally {
await dbconn.release();
}
}

View File

@ -0,0 +1,52 @@
import * as twitch from "../twitch";
import { log, LogLevel } from "../logging"
import db from "../db";
export async function processSubscriptions(streamer: twitch.StreamerUserIdTokenPair) {
var dbconn = await db.connect();
try {
await dbconn.query('BEGIN');
await dbconn.query("DELETE FROM subscriptions");
var response = await twitch.streamerApiRequest(streamer,
`/subscriptions?broadcaster_id=${streamer.userid}&first=100`);
//log(LogLevel.DEBUG,"Subscriptions API response:")
//log(LogLevel.DEBUG,JSON.stringify(response,null,2));
while (true) {
var insertSubscriptionQuery = "INSERT INTO subscriptions (userid) VALUES ";
var subscriptionRow = 0; // Used for $1, $2, etc. in parameterized query
var subscriptionsArray: number[] = [];
if (Object.keys(response.data).length > 0) {
for (var subscription of response.data) {
subscriptionRow++;
insertSubscriptionQuery += `($${subscriptionRow}), `;
subscriptionsArray.push(subscription.user_id as number);
}
insertSubscriptionQuery = insertSubscriptionQuery.slice(0,-2); // Cut last `, ` off of the end
var subscriptionQueryConfig = {
text: insertSubscriptionQuery,
values: subscriptionsArray
};
log(LogLevel.DEBUG,"subscriptionQueryConfig object:")
log(LogLevel.DEBUG,JSON.stringify(subscriptionQueryConfig,null,2))
await dbconn.query(subscriptionQueryConfig);
}
if (response.pagination.cursor) {
//var oldFirstUserid = response.data[0].user_id;
response = await twitch.streamerApiRequest(streamer,
`/subscriptions?broadcaster_id=${streamer.userid}&after=${response.pagination.cursor}&first=100`);
//log(LogLevel.DEBUG,"Subscription API response:");
//log(LogLevel.DEBUG,JSON.stringify(response,null,2));
} else {
break;
}
}
await dbconn.query("CALL update_scores()");
await dbconn.query('COMMIT');
} catch (e) {
log(LogLevel.ERROR,"cronjobs.processSubscriptions: Exception thrown; rolling back");
await dbconn.query('ROLLBACK');
throw(e);
} finally {
await dbconn.release();
}
}