This commit is contained in:
shafi54 2026-02-09 01:04:43 +05:30
parent 002b73cf87
commit 8fe3e4a301
2 changed files with 100 additions and 67 deletions

View file

@ -1,5 +1,10 @@
import { Queue, Worker } from 'bullmq'; import { Queue, Worker } from 'bullmq';
import { Expo } from 'expo-server-sdk';
import { redisUrl } from './env-exporter'; import { redisUrl } from './env-exporter';
import { db } from '../db/db_index';
import { notifCreds } from '../db/schema';
import { eq } from 'drizzle-orm';
import { generateSignedUrlFromS3Url } from './s3-client';
import { import {
NOTIFS_QUEUE, NOTIFS_QUEUE,
ORDER_PLACED_MESSAGE, ORDER_PLACED_MESSAGE,
@ -14,21 +19,88 @@ import {
export const notificationQueue = new Queue(NOTIFS_QUEUE, { export const notificationQueue = new Queue(NOTIFS_QUEUE, {
connection: { url: redisUrl }, connection: { url: redisUrl },
defaultJobOptions: { defaultJobOptions: {
removeOnComplete: 50, removeOnComplete: true,
removeOnFail: 100, removeOnFail: 10,
attempts: 3, attempts: 3,
}, },
}); });
export const notificationWorker = new Worker(NOTIFS_QUEUE, async (job) => { export const notificationWorker = new Worker(NOTIFS_QUEUE, async (job) => {
if (!job) return; if (!job) return;
console.log(`Processing notification job ${job.id}`);
// TODO: Implement sendPushNotification const { name, data } = job;
console.log(`Processing notification job ${job.id} - ${name}`);
if (name === 'send-admin-notification') {
await sendAdminNotification(data);
} else if (name === 'send-notification') {
// Handle legacy notification type
console.log('Legacy notification job - not implemented yet');
}
}, { }, {
connection: { url: redisUrl }, connection: { url: redisUrl },
concurrency: 5, concurrency: 5,
}); });
async function sendAdminNotification(data: {
userId: number;
title: string;
body: string;
imageUrl: string | null;
notificationId: number;
}) {
const { userId, title, body, imageUrl } = data;
// Get user's push token
const [cred] = await db
.select({ token: notifCreds.token })
.from(notifCreds)
.where(eq(notifCreds.userId, userId))
.limit(1);
if (!cred || !cred.token) {
console.log(`No push token found for user ${userId}`);
return;
}
const token = cred.token;
// Validate Expo push token
if (!Expo.isExpoPushToken(token)) {
console.error(`Invalid Expo push token for user ${userId}: ${token}`);
throw new Error('Invalid push token');
}
// Generate signed URL for image if provided
const signedImageUrl = imageUrl ? await generateSignedUrlFromS3Url(imageUrl) : null;
// Send notification
const expo = new Expo();
const message = {
to: token,
sound: 'default',
title,
body,
data: { imageUrl },
...(signedImageUrl ? {
attachments: [
{
url: signedImageUrl,
contentType: 'image/jpeg',
}
]
} : {}),
};
try {
const [ticket] = await expo.sendPushNotificationsAsync([message]);
console.log(`Notification sent to user ${userId}:`, ticket);
} catch (error) {
console.error(`Failed to send notification to user ${userId}:`, error);
throw error;
}
}
notificationWorker.on('completed', (job) => { notificationWorker.on('completed', (job) => {
if (job) console.log(`Notification job ${job.id} completed`); if (job) console.log(`Notification job ${job.id} completed`);
}); });

View file

@ -4,8 +4,7 @@ import { db } from '../../db/db_index';
import { users, complaints, orders, orderItems, notifCreds, userNotifications, userDetails } from '../../db/schema'; import { users, complaints, orders, orderItems, notifCreds, userNotifications, userDetails } from '../../db/schema';
import { eq, sql, desc, asc, count, max } from 'drizzle-orm'; import { eq, sql, desc, asc, count, max } from 'drizzle-orm';
import { ApiError } from '../../lib/api-error'; import { ApiError } from '../../lib/api-error';
import { Expo } from 'expo-server-sdk'; import { notificationQueue } from '../../lib/notif-job';
import { generateSignedUrlFromS3Url } from '../../lib/s3-client';
async function createUserByMobile(mobile: string): Promise<typeof users.$inferSelect> { async function createUserByMobile(mobile: string): Promise<typeof users.$inferSelect> {
// Clean mobile number (remove non-digits) // Clean mobile number (remove non-digits)
@ -372,78 +371,40 @@ export const userRouter = {
const { userIds, title, text, imageUrl } = input; const { userIds, title, text, imageUrl } = input;
// Store notification in database // Store notification in database
await db.insert(userNotifications).values({ const [notification] = await db.insert(userNotifications).values({
title, title,
body: text, body: text,
imageUrl: imageUrl || null, imageUrl: imageUrl || null,
applicableUsers: userIds.length > 0 ? userIds : null, applicableUsers: userIds.length > 0 ? userIds : null,
}); }).returning();
// Fetch push tokens for target users // Queue one job per user
const tokens = await db let queuedCount = 0;
.select({ token: notifCreds.token, userId: notifCreds.userId }) for (const userId of userIds) {
.from(notifCreds) try {
.where(sql`${notifCreds.userId} IN (${sql.join(userIds, sql`, `)})`); await notificationQueue.add('send-admin-notification', {
userId,
if (tokens.length === 0) {
return {
success: true,
message: 'Notification saved but no push tokens found'
};
}
// Generate signed URL for image if provided
const signedImageUrl = imageUrl ? await generateSignedUrlFromS3Url(imageUrl) : null;
// Send using Expo
const expo = new Expo();
// Helper function to chunk array
const chunkArray = (array: any[], size: number) => {
const chunks = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
};
const chunks = chunkArray(tokens, 50);
let sentCount = 0;
let failedCount = 0;
for (const chunk of chunks) {
const messages = chunk
.filter(({ token }) => Expo.isExpoPushToken(token))
.map(({ token }) => ({
to: token,
sound: 'default',
title, title,
body: text, body: text,
data: { imageUrl }, imageUrl: imageUrl || null,
...(signedImageUrl ? { notificationId: notification.id,
attachments: [ }, {
{ attempts: 3,
url: signedImageUrl, backoff: {
contentType: 'image/jpeg', type: 'exponential',
} delay: 2000,
] },
} : {}), });
})); queuedCount++;
if (messages.length > 0) {
try {
await expo.sendPushNotificationsAsync(messages);
sentCount += messages.length;
} catch (error) { } catch (error) {
console.error('Error sending push notifications:', error); console.error(`Failed to queue notification for user ${userId}:`, error);
failedCount += chunk.length;
}
} }
} }
return { return {
success: true, success: true,
message: `Notification sent to ${sentCount} users${failedCount > 0 ? `, ${failedCount} failed` : ''}`, message: `Notification queued for ${queuedCount} users`,
notificationId: notification.id,
}; };
}), }),
}; };