This commit is contained in:
shafi54 2026-04-03 21:00:02 +05:30
parent 982d3027f8
commit 1f42cfbc5e
6 changed files with 27 additions and 14 deletions

View file

@ -1,6 +1,6 @@
import dayjs from 'dayjs' import dayjs from 'dayjs'
import { initializeAllStores } from '@/src/stores/store-initializer' import { initializeAllStores } from '@/src/stores/store-initializer'
import { initDb } from '@/src/dbService' import { ensureWorkerInit } from '@/src/lib/worker-init'
const LAST_TRIGGER_KEY = 'lastTrigger' const LAST_TRIGGER_KEY = 'lastTrigger'
const ALARM_DELAY_MINUTES = 0.5 const ALARM_DELAY_MINUTES = 0.5
@ -13,7 +13,7 @@ export class CacheCreator {
constructor(state: any, env: any) { constructor(state: any, env: any) {
this.state = state this.state = state
this.env = env this.env = env
;(globalThis as any).ENV = env ensureWorkerInit(env)
} }
async schedule(): Promise<void> { async schedule(): Promise<void> {
@ -52,10 +52,7 @@ export class CacheCreator {
} }
async alarm(): Promise<void> { async alarm(): Promise<void> {
;(globalThis as any).ENV = this.env ensureWorkerInit(this.env)
if (this.env?.DB) {
initDb(this.env.DB)
}
const lastTrigger = await this.state.storage.get(LAST_TRIGGER_KEY) const lastTrigger = await this.state.storage.get(LAST_TRIGGER_KEY)
if (!lastTrigger) { if (!lastTrigger) {
return return

View file

@ -4,6 +4,7 @@ import {
} from '@/src/dbService' } from '@/src/dbService'
import { sendTelegramMessage } from '@/src/lib/telegram-service' import { sendTelegramMessage } from '@/src/lib/telegram-service'
import { queueDataPusher } from '@/src/lib/queue-data-pusher' import { queueDataPusher } from '@/src/lib/queue-data-pusher'
import { ensureWorkerInit } from './worker-init';
interface OrderIdMessage { interface OrderIdMessage {
orderIds: number[]; orderIds: number[];

View file

@ -24,6 +24,7 @@ export const handleNotifQueue = (batch: any) => {
} }
export const handleOrderPlacedQueue = async (batch: any) => { export const handleOrderPlacedQueue = async (batch: any) => {
console.log('from the order placed queue handler')
for (const message of batch.messages || []) { for (const message of batch.messages || []) {
const body = message?.body const body = message?.body
if (!body || !Array.isArray(body.orderIds)) { if (!body || !Array.isArray(body.orderIds)) {

View file

@ -0,0 +1,8 @@
import { initDb } from '@/src/dbService'
export const ensureWorkerInit = (env: any) => {
;(globalThis as any).ENV = env
if (env?.DB) {
initDb(env.DB)
}
}

View file

@ -5,7 +5,7 @@ import type {
} from '@cloudflare/workers-types' } from '@cloudflare/workers-types'
import { CacheCreator } from './src/jobs/cache-creator' import { CacheCreator } from './src/jobs/cache-creator'
import { createApp } from './src/app' import { createApp } from './src/app'
import { initDb } from './src/dbService' import { ensureWorkerInit } from './src/lib/worker-init'
import { import {
handleNotifQueue, handleNotifQueue,
handleOrderPlacedQueue, handleOrderPlacedQueue,
@ -32,10 +32,7 @@ export default {
}, },
ctx: ExecutionContext ctx: ExecutionContext
) { ) {
;(globalThis as any).ENV = env ensureWorkerInit(env)
if (env.DB) {
initDb(env.DB)
}
const app = createApp() const app = createApp()
return app.fetch(request, env, ctx) return app.fetch(request, env, ctx)
}, },
@ -51,19 +48,25 @@ export default {
ORDER_CANCELLED_QUEUE: { ORDER_CANCELLED_QUEUE: {
send: (message: unknown) => Promise<void> send: (message: unknown) => Promise<void>
} }
DB?: D1Database
NOTIF_QUEUE_NAME: string
ORDER_PLACED_QUEUE_NAME: string
ORDER_CANCELLED_QUEUE_NAME: string
} }
) { ) {
if (batch?.queue === 'notif_queue') { ensureWorkerInit(env)
console.log('from the queue handler')
if (batch?.queue === env.NOTIF_QUEUE_NAME) {
handleNotifQueue(batch) handleNotifQueue(batch)
return return
} }
if (batch?.queue === 'order_placed_queue') { if (batch?.queue === env.ORDER_PLACED_QUEUE_NAME) {
await handleOrderPlacedQueue(batch) await handleOrderPlacedQueue(batch)
return return
} }
if (batch?.queue === 'order_cancelled_queue') { if (batch?.queue === env.ORDER_CANCELLED_QUEUE_NAME) {
await handleOrderCancelledQueue(batch) await handleOrderCancelledQueue(batch)
return return
} }

View file

@ -57,6 +57,9 @@ head_sampling_rate = 1
[vars] [vars]
ENV_MODE = "PROD" ENV_MODE = "PROD"
NOTIF_QUEUE_NAME = "notif-queue-dev"
ORDER_PLACED_QUEUE_NAME = "order-placed-queue-dev"
ORDER_CANCELLED_QUEUE_NAME = "order-cancelled-queue-dev"
DATABASE_URL = "postgresql://postgres:meatfarmer_master_password@57.128.212.174:7447/meatfarmer" DATABASE_URL = "postgresql://postgres:meatfarmer_master_password@57.128.212.174:7447/meatfarmer"
PHONE_PE_BASE_URL = "https://api-preprod.phonepe.com/" PHONE_PE_BASE_URL = "https://api-preprod.phonepe.com/"
PHONE_PE_CLIENT_ID = "TEST-M23F2IGP34ZAR_25090" PHONE_PE_CLIENT_ID = "TEST-M23F2IGP34ZAR_25090"