This commit is contained in:
Atridad Lahiji 2023-12-01 14:57:31 -07:00
parent 8c5cd373ed
commit 9457566efa
No known key found for this signature in database
12 changed files with 406 additions and 217 deletions

View file

@ -1,6 +1,7 @@
#Database
DATABASE_URL=""
DATABASE_AUTH_TOKEN=""
REDIS_URL=""
#Auth
CLERK_SIGN_UP_URL="/sign-up"

View file

@ -4,6 +4,7 @@ import { createId } from "@paralleldrive/cuid2";
import { db } from "~/services/db.server";
import { emitter } from "~/services/emitter.server";
import { rooms } from "~/services/schema";
import { invalidateCache } from "~/services/redis.server";
export async function action({ request, params, context }: ActionFunctionArgs) {
const { userId } = await getAuth({ context, params, request });
@ -33,7 +34,8 @@ export async function action({ request, params, context }: ActionFunctionArgs) {
const success = room.length > 0;
if (success) {
emitter.emit("roomlist");
await invalidateCache(`kv_roomlist_${userId}`);
emitter.emit("nodes", "roomlist");
return json(room, {
status: 200,

View file

@ -3,6 +3,7 @@ import { ActionFunctionArgs, json } from "@remix-run/node";
import { eq } from "drizzle-orm";
import { db } from "~/services/db.server";
import { emitter } from "~/services/emitter.server";
import { invalidateCache } from "~/services/redis.server";
import { rooms } from "~/services/schema";
export async function action({ request, params, context }: ActionFunctionArgs) {
@ -32,7 +33,8 @@ export async function action({ request, params, context }: ActionFunctionArgs) {
const success = deletedRoom.length > 0;
if (success) {
emitter.emit("roomlist");
await invalidateCache(`kv_roomlist_${userId}`);
emitter.emit("nodes", "roomlist");
return json(deletedRoom, {
status: 200,

View file

@ -39,16 +39,19 @@ export async function loader({ context, params, request }: LoaderFunctionArgs) {
return eventStream(request.signal, function setup(send) {
async function handler() {
const roomFromDb = await db.query.rooms.findFirst({
db.query.rooms
.findFirst({
where: eq(rooms.id, roomId || ""),
with: {
logs: true,
},
});
send({
})
.then((roomFromDb) => {
return send({
event: `room-${roomId}`,
data: JSON.stringify(roomFromDb),
});
});
}
// Initial fetch

View file

@ -4,6 +4,7 @@ import { eq } from "drizzle-orm";
import { eventStream } from "remix-utils/sse/server";
import { db } from "~/services/db.server";
import { emitter } from "~/services/emitter.server";
import { fetchCache, setCache } from "~/services/redis.server";
import { rooms } from "~/services/schema";
// Get Room List
@ -19,21 +20,51 @@ export async function loader({ context, params, request }: LoaderFunctionArgs) {
return eventStream(request.signal, function setup(send) {
async function handler() {
const roomList = await db.query.rooms.findMany({
where: eq(rooms.userId, userId || ""),
});
send({ event: userId!, data: JSON.stringify(roomList) });
}
// Initial fetch
fetchCache<
{
id: string;
createdAt: Date;
roomName: string;
}[]
>(`kv_roomlist_${userId}`).then((cachedResult) => {
if (cachedResult) {
send({ event: userId!, data: JSON.stringify(cachedResult) });
} else {
db.query.rooms
.findMany({
where: eq(rooms.userId, userId || ""),
})
.then((roomList) => {
setCache(`kv_roomlist_${userId}`, roomList).then(() => {
send({ event: userId!, data: JSON.stringify(roomList) });
});
});
}
});
}
// Initial fetch
fetchCache<
{
id: string;
createdAt: Date;
roomName: string;
}[]
>(`kv_roomlist_${userId}`).then((cachedResult) => {
if (cachedResult) {
send({ event: userId!, data: JSON.stringify(cachedResult) });
} else {
db.query.rooms
.findMany({
where: eq(rooms.userId, userId || ""),
})
.then((roomList) => {
setCache(`kv_roomlist_${userId}`, roomList).then(() => {
send({ event: userId!, data: JSON.stringify(roomList) });
});
});
}
});
emitter.on("roomlist", handler);

View file

@ -75,7 +75,7 @@ export async function loader({ context, params, request }: LoaderFunctionArgs) {
},
})
.then(async () => {
emitter.emit("presence");
emitter.emit("nodes", "presence");
});
// Initial fetch
@ -86,7 +86,7 @@ export async function loader({ context, params, request }: LoaderFunctionArgs) {
.where(and(eq(presence.roomId, roomId), eq(presence.userId, userId)))
.returning()
.then(async () => {
emitter.emit("presence");
emitter.emit("nodes", "presence");
});
emitter.off("presence", handler);
};

View file

@ -76,8 +76,8 @@ export async function action({ request, params, context }: ActionFunctionArgs) {
if (success) {
console.log(success);
emitter.emit("room");
emitter.emit("votes");
emitter.emit("nodes", "room");
emitter.emit("nodes", "votes");
return json(newRoom, {
status: 200,

View file

@ -40,7 +40,7 @@ export async function action({ request, params, context }: ActionFunctionArgs) {
const success = upsertResult.rowsAffected > 0;
if (success) {
emitter.emit("votes");
emitter.emit("nodes", "votes");
return json(upsertResult, {
status: 200,

View file

@ -1,4 +1,5 @@
import { EventEmitter } from "events";
import { publishToChannel, subscribeToChannel } from "./redis.server";
let emitter: EventEmitter;
@ -15,4 +16,21 @@ if (process.env.NODE_ENV === "production") {
emitter = global.__emitter;
}
if (process.env.REDIS_URL) {
subscribeToChannel("nodes", (message: string) => {
console.log(`[MULTI-NODE] RECEIVED ${message} EVENT FROM ANOTHER NODE!`);
const parsedMessage = message.split('"')[1];
emitter.emit(parsedMessage);
});
emitter.on("nodes", async (message: string) => {
await publishToChannel("nodes", message);
});
} else {
emitter.on("nodes", async (message: string) => {
console.log(`[SINGLE NODE] RECEIVED ${message} EVENT!`);
emitter.emit(message);
});
}
export { emitter };

View file

@ -0,0 +1,75 @@
import Redis from "ioredis";
export const cache = process.env.REDIS_URL
? new Redis(process.env.REDIS_URL, {
family: 6,
})
: null;
export const pub = process.env.REDIS_URL
? new Redis(process.env.REDIS_URL, {
family: 6,
})
: null;
export const sub = process.env.REDIS_URL
? new Redis(process.env.REDIS_URL, {
family: 6,
})
: null;
export const publishToChannel = async (channel: string, message: string) => {
await pub?.publish(channel, JSON.stringify(message));
};
export const subscribeToChannel = async (
channel: string,
callback: Function
) => {
await sub?.subscribe(channel, (err, count) => {
if (err) {
console.error("Failed to subscribe: %s", err.message);
} else {
console.log(
`Subscribed successfully! This client is currently subscribed to ${count} channels.`
);
}
});
sub?.on("message", (channel, message) => {
console.log(`Received ${message} from ${channel}`);
callback(message);
});
};
export const unsubscribeToChannel = (channel: string) => {
`Unsubscribed successfully from ${channel}!`;
Promise.resolve([sub?.unsubscribe(channel)]);
};
export const setCache = async <T>(key: string, value: T) => {
try {
await cache?.set(key, JSON.stringify(value));
return true;
} catch {
return false;
}
};
export const fetchCache = async <T>(key: string) => {
try {
const result = (await cache?.get(key)) as string;
return JSON.parse(result) as T;
} catch {
return null;
}
};
export const invalidateCache = async (key: string) => {
try {
await cache?.del(key);
return true;
} catch {
return false;
}
};

View file

@ -1,6 +1,6 @@
{
"name": "sprintpadawan",
"version": "4.0.4",
"version": "4.1.0",
"private": true,
"sideEffects": false,
"type": "module",
@ -12,14 +12,15 @@
},
"dependencies": {
"@clerk/remix": "^3.1.6",
"@libsql/client": "0.4.0-pre.4",
"@libsql/client": "0.4.0-pre.5",
"@paralleldrive/cuid2": "^2.2.2",
"@remix-run/css-bundle": "^2.3.1",
"@remix-run/node": "^2.3.1",
"@remix-run/react": "^2.3.1",
"@remix-run/serve": "^2.3.1",
"csv42": "^5.0.0",
"drizzle-orm": "^0.29.0",
"drizzle-orm": "^0.29.1",
"ioredis": "^5.3.2",
"isbot": "^3.7.1",
"lucide-react": "^0.294.0",
"react": "^18.2.0",
@ -35,7 +36,7 @@
"@types/react-dom": "^18.2.17",
"autoprefixer": "^10.4.16",
"better-sqlite3": "^9.1.1",
"daisyui": "^4.4.14",
"daisyui": "^4.4.17",
"dotenv": "^16.3.1",
"drizzle-kit": "^0.20.6",
"eslint": "^8.54.0",

426
pnpm-lock.yaml generated

File diff suppressed because it is too large Load diff