Merge pull request #68 from atridadl/dev
4.1.0 ✨ Added Redis caching ✨ Added Redis pub/sub for inter-node communication
This commit is contained in:
commit
86af50ce0e
12 changed files with 406 additions and 217 deletions
|
@ -1,6 +1,7 @@
|
||||||
#Database
|
#Database
|
||||||
DATABASE_URL=""
|
DATABASE_URL=""
|
||||||
DATABASE_AUTH_TOKEN=""
|
DATABASE_AUTH_TOKEN=""
|
||||||
|
REDIS_URL=""
|
||||||
|
|
||||||
#Auth
|
#Auth
|
||||||
CLERK_SIGN_UP_URL="/sign-up"
|
CLERK_SIGN_UP_URL="/sign-up"
|
||||||
|
|
|
@ -4,6 +4,7 @@ import { createId } from "@paralleldrive/cuid2";
|
||||||
import { db } from "~/services/db.server";
|
import { db } from "~/services/db.server";
|
||||||
import { emitter } from "~/services/emitter.server";
|
import { emitter } from "~/services/emitter.server";
|
||||||
import { rooms } from "~/services/schema";
|
import { rooms } from "~/services/schema";
|
||||||
|
import { invalidateCache } from "~/services/redis.server";
|
||||||
|
|
||||||
export async function action({ request, params, context }: ActionFunctionArgs) {
|
export async function action({ request, params, context }: ActionFunctionArgs) {
|
||||||
const { userId } = await getAuth({ context, params, request });
|
const { userId } = await getAuth({ context, params, request });
|
||||||
|
@ -33,7 +34,8 @@ export async function action({ request, params, context }: ActionFunctionArgs) {
|
||||||
const success = room.length > 0;
|
const success = room.length > 0;
|
||||||
|
|
||||||
if (success) {
|
if (success) {
|
||||||
emitter.emit("roomlist");
|
await invalidateCache(`kv_roomlist_${userId}`);
|
||||||
|
emitter.emit("nodes", "roomlist");
|
||||||
|
|
||||||
return json(room, {
|
return json(room, {
|
||||||
status: 200,
|
status: 200,
|
||||||
|
|
|
@ -3,6 +3,7 @@ import { ActionFunctionArgs, json } from "@remix-run/node";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { db } from "~/services/db.server";
|
import { db } from "~/services/db.server";
|
||||||
import { emitter } from "~/services/emitter.server";
|
import { emitter } from "~/services/emitter.server";
|
||||||
|
import { invalidateCache } from "~/services/redis.server";
|
||||||
import { rooms } from "~/services/schema";
|
import { rooms } from "~/services/schema";
|
||||||
|
|
||||||
export async function action({ request, params, context }: ActionFunctionArgs) {
|
export async function action({ request, params, context }: ActionFunctionArgs) {
|
||||||
|
@ -32,7 +33,8 @@ export async function action({ request, params, context }: ActionFunctionArgs) {
|
||||||
const success = deletedRoom.length > 0;
|
const success = deletedRoom.length > 0;
|
||||||
|
|
||||||
if (success) {
|
if (success) {
|
||||||
emitter.emit("roomlist");
|
await invalidateCache(`kv_roomlist_${userId}`);
|
||||||
|
emitter.emit("nodes", "roomlist");
|
||||||
|
|
||||||
return json(deletedRoom, {
|
return json(deletedRoom, {
|
||||||
status: 200,
|
status: 200,
|
||||||
|
|
|
@ -39,16 +39,19 @@ export async function loader({ context, params, request }: LoaderFunctionArgs) {
|
||||||
|
|
||||||
return eventStream(request.signal, function setup(send) {
|
return eventStream(request.signal, function setup(send) {
|
||||||
async function handler() {
|
async function handler() {
|
||||||
const roomFromDb = await db.query.rooms.findFirst({
|
db.query.rooms
|
||||||
where: eq(rooms.id, roomId || ""),
|
.findFirst({
|
||||||
with: {
|
where: eq(rooms.id, roomId || ""),
|
||||||
logs: true,
|
with: {
|
||||||
},
|
logs: true,
|
||||||
});
|
},
|
||||||
send({
|
})
|
||||||
event: `room-${roomId}`,
|
.then((roomFromDb) => {
|
||||||
data: JSON.stringify(roomFromDb),
|
return send({
|
||||||
});
|
event: `room-${roomId}`,
|
||||||
|
data: JSON.stringify(roomFromDb),
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initial fetch
|
// Initial fetch
|
||||||
|
|
|
@ -4,6 +4,7 @@ import { eq } from "drizzle-orm";
|
||||||
import { eventStream } from "remix-utils/sse/server";
|
import { eventStream } from "remix-utils/sse/server";
|
||||||
import { db } from "~/services/db.server";
|
import { db } from "~/services/db.server";
|
||||||
import { emitter } from "~/services/emitter.server";
|
import { emitter } from "~/services/emitter.server";
|
||||||
|
import { fetchCache, setCache } from "~/services/redis.server";
|
||||||
import { rooms } from "~/services/schema";
|
import { rooms } from "~/services/schema";
|
||||||
|
|
||||||
// Get Room List
|
// Get Room List
|
||||||
|
@ -19,21 +20,51 @@ export async function loader({ context, params, request }: LoaderFunctionArgs) {
|
||||||
|
|
||||||
return eventStream(request.signal, function setup(send) {
|
return eventStream(request.signal, function setup(send) {
|
||||||
async function handler() {
|
async function handler() {
|
||||||
const roomList = await db.query.rooms.findMany({
|
fetchCache<
|
||||||
where: eq(rooms.userId, userId || ""),
|
{
|
||||||
|
id: string;
|
||||||
|
createdAt: Date;
|
||||||
|
roomName: string;
|
||||||
|
}[]
|
||||||
|
>(`kv_roomlist_${userId}`).then((cachedResult) => {
|
||||||
|
if (cachedResult) {
|
||||||
|
send({ event: userId!, data: JSON.stringify(cachedResult) });
|
||||||
|
} else {
|
||||||
|
db.query.rooms
|
||||||
|
.findMany({
|
||||||
|
where: eq(rooms.userId, userId || ""),
|
||||||
|
})
|
||||||
|
.then((roomList) => {
|
||||||
|
setCache(`kv_roomlist_${userId}`, roomList).then(() => {
|
||||||
|
send({ event: userId!, data: JSON.stringify(roomList) });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
send({ event: userId!, data: JSON.stringify(roomList) });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initial fetch
|
// Initial fetch
|
||||||
db.query.rooms
|
fetchCache<
|
||||||
.findMany({
|
{
|
||||||
where: eq(rooms.userId, userId || ""),
|
id: string;
|
||||||
})
|
createdAt: Date;
|
||||||
.then((roomList) => {
|
roomName: string;
|
||||||
send({ event: userId!, data: JSON.stringify(roomList) });
|
}[]
|
||||||
});
|
>(`kv_roomlist_${userId}`).then((cachedResult) => {
|
||||||
|
if (cachedResult) {
|
||||||
|
send({ event: userId!, data: JSON.stringify(cachedResult) });
|
||||||
|
} else {
|
||||||
|
db.query.rooms
|
||||||
|
.findMany({
|
||||||
|
where: eq(rooms.userId, userId || ""),
|
||||||
|
})
|
||||||
|
.then((roomList) => {
|
||||||
|
setCache(`kv_roomlist_${userId}`, roomList).then(() => {
|
||||||
|
send({ event: userId!, data: JSON.stringify(roomList) });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
emitter.on("roomlist", handler);
|
emitter.on("roomlist", handler);
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ export async function loader({ context, params, request }: LoaderFunctionArgs) {
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
.then(async () => {
|
.then(async () => {
|
||||||
emitter.emit("presence");
|
emitter.emit("nodes", "presence");
|
||||||
});
|
});
|
||||||
|
|
||||||
// Initial fetch
|
// Initial fetch
|
||||||
|
@ -86,7 +86,7 @@ export async function loader({ context, params, request }: LoaderFunctionArgs) {
|
||||||
.where(and(eq(presence.roomId, roomId), eq(presence.userId, userId)))
|
.where(and(eq(presence.roomId, roomId), eq(presence.userId, userId)))
|
||||||
.returning()
|
.returning()
|
||||||
.then(async () => {
|
.then(async () => {
|
||||||
emitter.emit("presence");
|
emitter.emit("nodes", "presence");
|
||||||
});
|
});
|
||||||
emitter.off("presence", handler);
|
emitter.off("presence", handler);
|
||||||
};
|
};
|
||||||
|
|
|
@ -76,8 +76,8 @@ export async function action({ request, params, context }: ActionFunctionArgs) {
|
||||||
|
|
||||||
if (success) {
|
if (success) {
|
||||||
console.log(success);
|
console.log(success);
|
||||||
emitter.emit("room");
|
emitter.emit("nodes", "room");
|
||||||
emitter.emit("votes");
|
emitter.emit("nodes", "votes");
|
||||||
|
|
||||||
return json(newRoom, {
|
return json(newRoom, {
|
||||||
status: 200,
|
status: 200,
|
||||||
|
|
|
@ -40,7 +40,7 @@ export async function action({ request, params, context }: ActionFunctionArgs) {
|
||||||
const success = upsertResult.rowsAffected > 0;
|
const success = upsertResult.rowsAffected > 0;
|
||||||
|
|
||||||
if (success) {
|
if (success) {
|
||||||
emitter.emit("votes");
|
emitter.emit("nodes", "votes");
|
||||||
|
|
||||||
return json(upsertResult, {
|
return json(upsertResult, {
|
||||||
status: 200,
|
status: 200,
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import { EventEmitter } from "events";
|
import { EventEmitter } from "events";
|
||||||
|
import { publishToChannel, subscribeToChannel } from "./redis.server";
|
||||||
|
|
||||||
let emitter: EventEmitter;
|
let emitter: EventEmitter;
|
||||||
|
|
||||||
|
@ -15,4 +16,21 @@ if (process.env.NODE_ENV === "production") {
|
||||||
emitter = global.__emitter;
|
emitter = global.__emitter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (process.env.REDIS_URL) {
|
||||||
|
subscribeToChannel("nodes", (message: string) => {
|
||||||
|
console.log(`[MULTI-NODE] RECEIVED ${message} EVENT FROM ANOTHER NODE!`);
|
||||||
|
const parsedMessage = message.split('"')[1];
|
||||||
|
emitter.emit(parsedMessage);
|
||||||
|
});
|
||||||
|
|
||||||
|
emitter.on("nodes", async (message: string) => {
|
||||||
|
await publishToChannel("nodes", message);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
emitter.on("nodes", async (message: string) => {
|
||||||
|
console.log(`[SINGLE NODE] RECEIVED ${message} EVENT!`);
|
||||||
|
emitter.emit(message);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
export { emitter };
|
export { emitter };
|
||||||
|
|
75
app/services/redis.server.ts
Normal file
75
app/services/redis.server.ts
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
import Redis from "ioredis";
|
||||||
|
|
||||||
|
export const cache = process.env.REDIS_URL
|
||||||
|
? new Redis(process.env.REDIS_URL, {
|
||||||
|
family: 6,
|
||||||
|
})
|
||||||
|
: null;
|
||||||
|
|
||||||
|
export const pub = process.env.REDIS_URL
|
||||||
|
? new Redis(process.env.REDIS_URL, {
|
||||||
|
family: 6,
|
||||||
|
})
|
||||||
|
: null;
|
||||||
|
|
||||||
|
export const sub = process.env.REDIS_URL
|
||||||
|
? new Redis(process.env.REDIS_URL, {
|
||||||
|
family: 6,
|
||||||
|
})
|
||||||
|
: null;
|
||||||
|
|
||||||
|
export const publishToChannel = async (channel: string, message: string) => {
|
||||||
|
await pub?.publish(channel, JSON.stringify(message));
|
||||||
|
};
|
||||||
|
|
||||||
|
export const subscribeToChannel = async (
|
||||||
|
channel: string,
|
||||||
|
callback: Function
|
||||||
|
) => {
|
||||||
|
await sub?.subscribe(channel, (err, count) => {
|
||||||
|
if (err) {
|
||||||
|
console.error("Failed to subscribe: %s", err.message);
|
||||||
|
} else {
|
||||||
|
console.log(
|
||||||
|
`Subscribed successfully! This client is currently subscribed to ${count} channels.`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
sub?.on("message", (channel, message) => {
|
||||||
|
console.log(`Received ${message} from ${channel}`);
|
||||||
|
callback(message);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
export const unsubscribeToChannel = (channel: string) => {
|
||||||
|
`Unsubscribed successfully from ${channel}!`;
|
||||||
|
Promise.resolve([sub?.unsubscribe(channel)]);
|
||||||
|
};
|
||||||
|
|
||||||
|
export const setCache = async <T>(key: string, value: T) => {
|
||||||
|
try {
|
||||||
|
await cache?.set(key, JSON.stringify(value));
|
||||||
|
return true;
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
export const fetchCache = async <T>(key: string) => {
|
||||||
|
try {
|
||||||
|
const result = (await cache?.get(key)) as string;
|
||||||
|
return JSON.parse(result) as T;
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
export const invalidateCache = async (key: string) => {
|
||||||
|
try {
|
||||||
|
await cache?.del(key);
|
||||||
|
return true;
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"name": "sprintpadawan",
|
"name": "sprintpadawan",
|
||||||
"version": "4.0.4",
|
"version": "4.1.0",
|
||||||
"private": true,
|
"private": true,
|
||||||
"sideEffects": false,
|
"sideEffects": false,
|
||||||
"type": "module",
|
"type": "module",
|
||||||
|
@ -12,14 +12,15 @@
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@clerk/remix": "^3.1.6",
|
"@clerk/remix": "^3.1.6",
|
||||||
"@libsql/client": "0.4.0-pre.4",
|
"@libsql/client": "0.4.0-pre.5",
|
||||||
"@paralleldrive/cuid2": "^2.2.2",
|
"@paralleldrive/cuid2": "^2.2.2",
|
||||||
"@remix-run/css-bundle": "^2.3.1",
|
"@remix-run/css-bundle": "^2.3.1",
|
||||||
"@remix-run/node": "^2.3.1",
|
"@remix-run/node": "^2.3.1",
|
||||||
"@remix-run/react": "^2.3.1",
|
"@remix-run/react": "^2.3.1",
|
||||||
"@remix-run/serve": "^2.3.1",
|
"@remix-run/serve": "^2.3.1",
|
||||||
"csv42": "^5.0.0",
|
"csv42": "^5.0.0",
|
||||||
"drizzle-orm": "^0.29.0",
|
"drizzle-orm": "^0.29.1",
|
||||||
|
"ioredis": "^5.3.2",
|
||||||
"isbot": "^3.7.1",
|
"isbot": "^3.7.1",
|
||||||
"lucide-react": "^0.294.0",
|
"lucide-react": "^0.294.0",
|
||||||
"react": "^18.2.0",
|
"react": "^18.2.0",
|
||||||
|
@ -35,7 +36,7 @@
|
||||||
"@types/react-dom": "^18.2.17",
|
"@types/react-dom": "^18.2.17",
|
||||||
"autoprefixer": "^10.4.16",
|
"autoprefixer": "^10.4.16",
|
||||||
"better-sqlite3": "^9.1.1",
|
"better-sqlite3": "^9.1.1",
|
||||||
"daisyui": "^4.4.14",
|
"daisyui": "^4.4.17",
|
||||||
"dotenv": "^16.3.1",
|
"dotenv": "^16.3.1",
|
||||||
"drizzle-kit": "^0.20.6",
|
"drizzle-kit": "^0.20.6",
|
||||||
"eslint": "^8.54.0",
|
"eslint": "^8.54.0",
|
||||||
|
|
426
pnpm-lock.yaml
generated
426
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load diff
Loading…
Add table
Reference in a new issue