Skip to content

Commit

Permalink
feat: Storage changes for ProcessBulkImportUsers cron
Browse files Browse the repository at this point in the history
  • Loading branch information
anku255 committed Mar 8, 2024
1 parent a139b55 commit 68b6a06
Show file tree
Hide file tree
Showing 8 changed files with 614 additions and 355 deletions.
396 changes: 261 additions & 135 deletions src/main/java/io/supertokens/storage/postgresql/Start.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
Expand All @@ -47,8 +48,8 @@ static String getQueryToCreateBulkImportUsersTable(Start start) {
+ "raw_data TEXT NOT NULL,"
+ "status VARCHAR(128) DEFAULT 'NEW',"
+ "error_msg TEXT,"
+ "created_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP),"
+ "updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP),"
+ "created_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000,"
+ "updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000,"
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, null, "pkey")
+ " PRIMARY KEY(app_id, id),"
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, "app_id", "fkey") + " "
Expand Down Expand Up @@ -92,18 +93,19 @@ public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifie
});
}

public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status)
public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage)
throws SQLException, StorageQueryException {
if (bulkImportUserIds.length == 0) {
return;
}

String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, updated_at = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) WHERE app_id = ?";
String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, error_msg = ?, updated_at = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) WHERE app_id = ?";
StringBuilder queryBuilder = new StringBuilder(baseQuery);

List<Object> parameters = new ArrayList<>();

parameters.add(status.toString());
parameters.add(errorMessage);
parameters.add(appIdentifier.getAppId());

queryBuilder.append(" AND id IN (");
Expand All @@ -125,6 +127,39 @@ public static void updateBulkImportUserStatus_Transaction(Start start, Connectio
});
}

public static List<BulkImportUser> getBulkImportUsersForProcessing(Start start, AppIdentifier appIdentifier, @Nonnull Integer limit)
throws StorageQueryException, StorageTransactionLogicException {

return start.startTransaction(con -> {
Connection sqlCon = (Connection) con.getConnection();
try {
String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable()
+ " WHERE status = 'NEW' AND app_id = ? "
+ " OR (status = 'PROCESSING' AND updated_at < EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 - 60 * 1000) "
+ " LIMIT ? FOR UPDATE SKIP LOCKED";

List<BulkImportUser> bulkImportUsers = new ArrayList<>();

execute(sqlCon, selectQuery, pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setInt(2, limit);
}, result -> {
while (result.next()) {
bulkImportUsers.add(BulkImportUserRowMapper.getInstance().mapOrThrow(result));
}
return null;
});

String[] bulkImportUserIds = bulkImportUsers.stream().map(user -> user.id).toArray(String[]::new);

updateBulkImportUserStatus_Transaction(start, sqlCon, appIdentifier, bulkImportUserIds, BULK_IMPORT_USER_STATUS.PROCESSING, null);
return bulkImportUsers;
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
}
});
}

public static List<BulkImportUser> getBulkImportUsers(Start start, AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status,
@Nullable String bulkImportUserId, @Nullable Long createdAt)
throws SQLException, StorageQueryException {
Expand Down Expand Up @@ -205,6 +240,16 @@ public static List<String> deleteBulkImportUsers(Start start, AppIdentifier appI
return deletedIds;
});
}

public static void deleteBulkImportUser_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String bulkImportUserId) throws SQLException, StorageQueryException {
String query = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable() + " WHERE app_id = ? AND id = ?";

update(con, query, pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, bulkImportUserId);
});
}

private static class BulkImportUserRowMapper implements RowMapper<BulkImportUser, ResultSet> {
private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper();

Expand All @@ -219,7 +264,7 @@ private static BulkImportUserRowMapper getInstance() {
public BulkImportUser map(ResultSet result) throws Exception {
return BulkImportUser.fromRawDataFromDbStorage(result.getString("id"), result.getString("raw_data"),
BULK_IMPORT_USER_STATUS.valueOf(result.getString("status")),
result.getLong("created_at"), result.getLong("updated_at"));
result.getString("error_msg"), result.getLong("created_at"), result.getLong("updated_at"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo;
import io.supertokens.pluginInterface.authRecipe.LoginMethod;
import io.supertokens.pluginInterface.emailpassword.PasswordResetTokenInfo;
import io.supertokens.pluginInterface.emailpassword.exceptions.DuplicateEmailException;
import io.supertokens.pluginInterface.emailpassword.exceptions.UnknownUserIdException;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.storage.postgresql.ConnectionPool;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.utils.Utils;
Expand Down Expand Up @@ -266,74 +264,85 @@ public static void addPasswordResetToken(Start start, AppIdentifier appIdentifie
}
}

public static AuthRecipeUserInfo signUp(Start start, TenantIdentifier tenantIdentifier, String userId, String email,
String passwordHash, long timeJoined)
throws StorageQueryException, StorageTransactionLogicException {
return start.startTransaction(con -> {
Connection sqlCon = (Connection) con.getConnection();
try {
{ // app_id_to_user_id
String QUERY = "INSERT INTO " + getConfig(start).getAppIdToUserIdTable()
+ "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)";
update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, userId);
pst.setString(4, EMAIL_PASSWORD.toString());
});
}
private static AuthRecipeUserInfo signUpQuery(Start start, Connection sqlCon, TenantIdentifier tenantIdentifier, String userId, String email,
String passwordHash, long timeJoined) throws StorageQueryException, StorageTransactionLogicException {
try {
{ // app_id_to_user_id
String QUERY = "INSERT INTO " + getConfig(start).getAppIdToUserIdTable()
+ "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)";
update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, userId);
pst.setString(4, EMAIL_PASSWORD.toString());
});
}

{ // all_auth_recipe_users
String QUERY = "INSERT INTO " + getConfig(start).getUsersTable()
+ "(app_id, tenant_id, user_id, primary_or_recipe_user_id, recipe_id, time_joined, primary_or_recipe_user_time_joined)" +
" VALUES(?, ?, ?, ?, ?, ?, ?)";
update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, userId);
pst.setString(5, EMAIL_PASSWORD.toString());
pst.setLong(6, timeJoined);
pst.setLong(7, timeJoined);
});
}
{ // all_auth_recipe_users
String QUERY = "INSERT INTO " + getConfig(start).getUsersTable()
+ "(app_id, tenant_id, user_id, primary_or_recipe_user_id, recipe_id, time_joined, primary_or_recipe_user_time_joined)" +
" VALUES(?, ?, ?, ?, ?, ?, ?)";
update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, userId);
pst.setString(5, EMAIL_PASSWORD.toString());
pst.setLong(6, timeJoined);
pst.setLong(7, timeJoined);
});
}

{ // emailpassword_users
String QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUsersTable()
+ "(app_id, user_id, email, password_hash, time_joined)" + " VALUES(?, ?, ?, ?, ?)";

update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, email);
pst.setString(4, passwordHash);
pst.setLong(5, timeJoined);
});
}
{ // emailpassword_users
String QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUsersTable()
+ "(app_id, user_id, email, password_hash, time_joined)" + " VALUES(?, ?, ?, ?, ?)";

{ // emailpassword_user_to_tenant
String QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";
update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, email);
pst.setString(4, passwordHash);
pst.setLong(5, timeJoined);
});
}

update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, email);
});
}
{ // emailpassword_user_to_tenant
String QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";

UserInfoPartial userInfo = new UserInfoPartial(userId, email, passwordHash, timeJoined);
fillUserInfoWithTenantIds_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo);
fillUserInfoWithVerified_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo);
sqlCon.commit();
return AuthRecipeUserInfo.create(userId, false, userInfo.toLoginMethod());
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, email);
});
}

UserInfoPartial userInfo = new UserInfoPartial(userId, email, passwordHash, timeJoined);
fillUserInfoWithTenantIds_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo);
fillUserInfoWithVerified_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo);

return AuthRecipeUserInfo.create(userId, false, userInfo.toLoginMethod());
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
}
}

public static AuthRecipeUserInfo signUp(Start start, TenantIdentifier tenantIdentifier, String userId, String email,
String passwordHash, long timeJoined)
throws StorageQueryException, StorageTransactionLogicException {
return start.startTransaction(con -> {
Connection sqlCon = (Connection) con.getConnection();
return signUpQuery(start, sqlCon, tenantIdentifier, userId, email, passwordHash, timeJoined);
});
}

public static AuthRecipeUserInfo bulkImport_signUp_Transaction(Start start, Connection sqlCon, TenantIdentifier tenantIdentifier, String userId, String email,
String passwordHash, long timeJoined)
throws StorageQueryException, StorageTransactionLogicException {
return signUpQuery(start, sqlCon, tenantIdentifier, userId, email, passwordHash, timeJoined);
}

public static void deleteUser_Transaction(Connection sqlCon, Start start, AppIdentifier appIdentifier,
String userId, boolean deleteUserIdMappingToo)
throws StorageQueryException, SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,30 +476,35 @@ public static boolean isUserIdBeingUsedForEmailVerification(Start start, AppIden
}
}

public static void updateIsEmailVerifiedToExternalUserIdQuery(Start start, Connection sqlCon, AppIdentifier appIdentifier, String supertokensUserId, String externalUserId)
throws StorageQueryException, SQLException {
{
String QUERY = "UPDATE " + getConfig(start).getEmailVerificationTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
update(sqlCon, QUERY, pst -> {
pst.setString(1, externalUserId);
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}
{
String QUERY = "UPDATE " + getConfig(start).getEmailVerificationTokensTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
update(sqlCon, QUERY, pst -> {
pst.setString(1, externalUserId);
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}
}

public static void updateIsEmailVerifiedToExternalUserId(Start start, AppIdentifier appIdentifier, String supertokensUserId, String externalUserId)
throws StorageQueryException {
try {
start.startTransaction((TransactionConnection con) -> {
Connection sqlCon = (Connection) con.getConnection();
try {
{
String QUERY = "UPDATE " + getConfig(start).getEmailVerificationTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
update(sqlCon, QUERY, pst -> {
pst.setString(1, externalUserId);
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}
{
String QUERY = "UPDATE " + getConfig(start).getEmailVerificationTokensTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
update(sqlCon, QUERY, pst -> {
pst.setString(1, externalUserId);
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}
updateIsEmailVerifiedToExternalUserIdQuery(start, sqlCon, appIdentifier, supertokensUserId, externalUserId);
} catch (SQLException e) {
throw new StorageTransactionLogicException(e);
}
Expand All @@ -511,6 +516,11 @@ public static void updateIsEmailVerifiedToExternalUserId(Start start, AppIdentif
}
}

public static void bulkImport_updateIsEmailVerifiedToExternalUserId_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier, String supertokensUserId, String externalUserId)
throws SQLException, StorageQueryException {
updateIsEmailVerifiedToExternalUserIdQuery(start, sqlCon, appIdentifier, supertokensUserId, externalUserId);
}

private static class EmailVerificationTokenInfoRowMapper
implements RowMapper<EmailVerificationTokenInfo, ResultSet> {
private static final EmailVerificationTokenInfoRowMapper INSTANCE = new EmailVerificationTokenInfoRowMapper();
Expand Down
Loading

0 comments on commit 68b6a06

Please sign in to comment.