diff --git a/apps/backend/src/lib/notif-job.ts b/apps/backend/src/lib/notif-job.ts index fcbd7e1..ce154b3 100644 --- a/apps/backend/src/lib/notif-job.ts +++ b/apps/backend/src/lib/notif-job.ts @@ -1,5 +1,10 @@ import { Queue, Worker } from 'bullmq'; +import { Expo } from 'expo-server-sdk'; import { redisUrl } from './env-exporter'; +import { db } from '../db/db_index'; +import { notifCreds } from '../db/schema'; +import { eq } from 'drizzle-orm'; +import { generateSignedUrlFromS3Url } from './s3-client'; import { NOTIFS_QUEUE, ORDER_PLACED_MESSAGE, @@ -14,21 +19,88 @@ import { export const notificationQueue = new Queue(NOTIFS_QUEUE, { connection: { url: redisUrl }, defaultJobOptions: { - removeOnComplete: 50, - removeOnFail: 100, + removeOnComplete: true, + removeOnFail: 10, attempts: 3, }, }); export const notificationWorker = new Worker(NOTIFS_QUEUE, async (job) => { if (!job) return; - console.log(`Processing notification job ${job.id}`); - // TODO: Implement sendPushNotification + + const { name, data } = job; + console.log(`Processing notification job ${job.id} - ${name}`); + + if (name === 'send-admin-notification') { + await sendAdminNotification(data); + } else if (name === 'send-notification') { + // Handle legacy notification type + console.log('Legacy notification job - not implemented yet'); + } }, { connection: { url: redisUrl }, concurrency: 5, }); +async function sendAdminNotification(data: { + userId: number; + title: string; + body: string; + imageUrl: string | null; + notificationId: number; +}) { + const { userId, title, body, imageUrl } = data; + + // Get user's push token + const [cred] = await db + .select({ token: notifCreds.token }) + .from(notifCreds) + .where(eq(notifCreds.userId, userId)) + .limit(1); + + if (!cred || !cred.token) { + console.log(`No push token found for user ${userId}`); + return; + } + + const token = cred.token; + + // Validate Expo push token + if (!Expo.isExpoPushToken(token)) { + console.error(`Invalid Expo push token for user ${userId}: ${token}`); + throw new Error('Invalid push token'); + } + + // Generate signed URL for image if provided + const signedImageUrl = imageUrl ? await generateSignedUrlFromS3Url(imageUrl) : null; + + // Send notification + const expo = new Expo(); + const message = { + to: token, + sound: 'default', + title, + body, + data: { imageUrl }, + ...(signedImageUrl ? { + attachments: [ + { + url: signedImageUrl, + contentType: 'image/jpeg', + } + ] + } : {}), + }; + + try { + const [ticket] = await expo.sendPushNotificationsAsync([message]); + console.log(`Notification sent to user ${userId}:`, ticket); + } catch (error) { + console.error(`Failed to send notification to user ${userId}:`, error); + throw error; + } +} + notificationWorker.on('completed', (job) => { if (job) console.log(`Notification job ${job.id} completed`); }); @@ -108,4 +180,4 @@ export async function sendRefundInitiatedNotification(userId: number, orderId?: process.on('SIGTERM', async () => { await notificationQueue.close(); await notificationWorker.close(); -}); \ No newline at end of file +}); diff --git a/apps/backend/src/trpc/admin-apis/user.ts b/apps/backend/src/trpc/admin-apis/user.ts index d02861e..59ea561 100644 --- a/apps/backend/src/trpc/admin-apis/user.ts +++ b/apps/backend/src/trpc/admin-apis/user.ts @@ -4,8 +4,7 @@ import { db } from '../../db/db_index'; import { users, complaints, orders, orderItems, notifCreds, userNotifications, userDetails } from '../../db/schema'; import { eq, sql, desc, asc, count, max } from 'drizzle-orm'; import { ApiError } from '../../lib/api-error'; -import { Expo } from 'expo-server-sdk'; -import { generateSignedUrlFromS3Url } from '../../lib/s3-client'; +import { notificationQueue } from '../../lib/notif-job'; async function createUserByMobile(mobile: string): Promise { // Clean mobile number (remove non-digits) @@ -372,78 +371,40 @@ export const userRouter = { const { userIds, title, text, imageUrl } = input; // Store notification in database - await db.insert(userNotifications).values({ + const [notification] = await db.insert(userNotifications).values({ title, body: text, imageUrl: imageUrl || null, applicableUsers: userIds.length > 0 ? userIds : null, - }); + }).returning(); - // Fetch push tokens for target users - const tokens = await db - .select({ token: notifCreds.token, userId: notifCreds.userId }) - .from(notifCreds) - .where(sql`${notifCreds.userId} IN (${sql.join(userIds, sql`, `)})`); - - if (tokens.length === 0) { - return { - success: true, - message: 'Notification saved but no push tokens found' - }; - } - - // Generate signed URL for image if provided - const signedImageUrl = imageUrl ? await generateSignedUrlFromS3Url(imageUrl) : null; - - // Send using Expo - const expo = new Expo(); - - // Helper function to chunk array - const chunkArray = (array: any[], size: number) => { - const chunks = []; - for (let i = 0; i < array.length; i += size) { - chunks.push(array.slice(i, i + size)); - } - return chunks; - }; - - const chunks = chunkArray(tokens, 50); - let sentCount = 0; - let failedCount = 0; - - for (const chunk of chunks) { - const messages = chunk - .filter(({ token }) => Expo.isExpoPushToken(token)) - .map(({ token }) => ({ - to: token, - sound: 'default', + // Queue one job per user + let queuedCount = 0; + for (const userId of userIds) { + try { + await notificationQueue.add('send-admin-notification', { + userId, title, body: text, - data: { imageUrl }, - ...(signedImageUrl ? { - attachments: [ - { - url: signedImageUrl, - contentType: 'image/jpeg', - } - ] - } : {}), - })); - - if (messages.length > 0) { - try { - await expo.sendPushNotificationsAsync(messages); - sentCount += messages.length; - } catch (error) { - console.error('Error sending push notifications:', error); - failedCount += chunk.length; - } + imageUrl: imageUrl || null, + notificationId: notification.id, + }, { + attempts: 3, + backoff: { + type: 'exponential', + delay: 2000, + }, + }); + queuedCount++; + } catch (error) { + console.error(`Failed to queue notification for user ${userId}:`, error); } } return { success: true, - message: `Notification sent to ${sentCount} users${failedCount > 0 ? `, ${failedCount} failed` : ''}`, + message: `Notification queued for ${queuedCount} users`, + notificationId: notification.id, }; }), }; \ No newline at end of file