Worker und Gateway mit Grist Zugriff. Grundlegend.
This commit is contained in:
parent
2254ec6e87
commit
f283b9d8e1
|
|
@ -1,4 +1,9 @@
|
|||
const Redis = require("ioredis");
|
||||
var data; //Die Daten von Grist.
|
||||
|
||||
const fetch = (...args) => import('node-fetch').then(({default: fetch}) => fetch(...args));
|
||||
const GRIST_URL = process.env.GRIST_BASE_URL;
|
||||
const API_KEY = process.env.GRIST_API_KEY;
|
||||
|
||||
// Verbindung zu Redis herstellen
|
||||
const redis = new Redis({
|
||||
|
|
@ -6,24 +11,43 @@ const redis = new Redis({
|
|||
port: 6379
|
||||
});
|
||||
|
||||
startWorker();
|
||||
|
||||
// Endlosschleife: Immer auf neue Jobs warten
|
||||
async function startWorker() {
|
||||
console.log("Worker gestartet – warte auf Jobs...");
|
||||
|
||||
while (true) {
|
||||
// BLPOP = Warte auf Job in der Queue "grist:jobs"
|
||||
const job = await redis.blpop("grist:jobs", 0);
|
||||
const payload = job[1]; // [0] = Queue-Name, [1] = Daten
|
||||
const reply = await redis.blpop("grist:jobs", 0);
|
||||
const payload = JSON.parse(reply[1]);
|
||||
const { client, path, row, meta } = payload;
|
||||
|
||||
console.log("Job empfangen:", payload);
|
||||
const data = await getFromGrist(path, row);
|
||||
|
||||
// Ergebnis vorbereiten
|
||||
const result = `Bearbeitet: ${payload}`;
|
||||
// Ergebnis verpacken in ein JSON
|
||||
const freight = JSON.stringify({
|
||||
job_ok: true,
|
||||
client,
|
||||
data
|
||||
});
|
||||
|
||||
// Ergebnis in andere Queue schreiben
|
||||
await redis.rpush("grist:results", result);
|
||||
console.log("Ergebnis gespeichert:", result);
|
||||
// Ergebnis in Fertig Queue schreiben. Von da holt es sich der Client.
|
||||
await redis.rpush("grist:results", freight);
|
||||
console.log (freight);
|
||||
}
|
||||
}
|
||||
|
||||
startWorker();
|
||||
async function getFromGrist(path, row) {
|
||||
//
|
||||
const [docId, , tableId] = path.split("/");
|
||||
const base = `${GRIST_URL}/api/docs/${docId}/tables/${tableId}/records`;
|
||||
|
||||
const url = row > 0 ? `${base}/${row}` : base;
|
||||
|
||||
const res = await fetch(url, {
|
||||
headers: { "Authorization": `Bearer ${API_KEY}` }
|
||||
});
|
||||
|
||||
return res.json(); // -> JSON mit Records
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
73
gateway.cjs
73
gateway.cjs
|
|
@ -1,47 +1,46 @@
|
|||
const http = require("http");
|
||||
const Redis = require("ioredis");
|
||||
|
||||
const REDIS_HOST = process.env.REDIS_HOST || "data_jobs";
|
||||
const REDIS_PORT = Number(process.env.REDIS_PORT || 6379);
|
||||
const JOB_QUEUE = process.env.JOB_QUEUE || "grist:jobs";
|
||||
const RES_QUEUE = process.env.RESULT_QUEUE || "grist:results";
|
||||
const redis = new Redis({ host: process.env.REDIS_HOST || "data_jobs",
|
||||
port: Number(process.env.REDIS_PORT || 6379) });
|
||||
const JOBS = process.env.JOB_QUEUE || "grist:jobs";
|
||||
const RESULTS = process.env.RESULT_QUEUE || "grist:results";
|
||||
|
||||
const redis = new Redis({ host: REDIS_HOST, port: REDIS_PORT });
|
||||
http.createServer(async (req, res) => {
|
||||
if (req.method === "POST" && req.url === "/api/job") {
|
||||
const stream = await readJsonBody(req); // JSON Stream vom Client. mit readJsonBody() wieder zurück in JSON.
|
||||
await redis.lpush(JOBS, JSON.stringify(stream)); // Job in Queue. Als String in Redis eintragen. Der Worker nimmt dann den String.
|
||||
const reply = await redis.blpop(RESULTS, 30); // auf Antwort warten
|
||||
|
||||
function cors(res){
|
||||
res.setHeader("Access-Control-Allow-Origin","*");
|
||||
res.setHeader("Access-Control-Allow-Methods","GET, POST, OPTIONS");
|
||||
res.setHeader("Access-Control-Allow-Headers","*");
|
||||
}
|
||||
|
||||
const server = http.createServer(async (req, res) => {
|
||||
cors(res);
|
||||
if (req.method === "OPTIONS") { res.writeHead(204); return res.end(); }
|
||||
const path = req.url.split("?")[0];
|
||||
|
||||
if (req.method === "GET" && path === "/health") {
|
||||
res.writeHead(200, {"content-type":"text/plain"}); return res.end("ok");
|
||||
}
|
||||
|
||||
if (req.method === "GET" && (path === "/places" || path === "/api/places")) {
|
||||
try {
|
||||
await redis.lpush(JOB_QUEUE, "places");
|
||||
const reply = await redis.blpop(RES_QUEUE, 8);
|
||||
if (!reply) {
|
||||
res.writeHead(202, {"content-type":"application/json"});
|
||||
return res.end('{"ok":false,"status":"queued-or-timeout"}');
|
||||
}
|
||||
const [, value] = reply;
|
||||
const value = reply[1]; // Worker-Response
|
||||
res.writeHead(200, {"content-type":"application/json"});
|
||||
return res.end(JSON.stringify({ ok:true, raw:value }));
|
||||
} catch (e) {
|
||||
res.writeHead(500, {"content-type":"application/json"});
|
||||
return res.end(JSON.stringify({ ok:false, error:e.message }));
|
||||
try {
|
||||
JSON.parse(value); res.end(value); //Prüfen für JSON
|
||||
} catch {
|
||||
res.end(JSON.stringify({ //Fallback damit es sicher ein JSON ist.
|
||||
ok:true,
|
||||
data:value
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
res.writeHead(404, {"content-type":"application/json"});
|
||||
res.end('{"error":"not found"}');
|
||||
//mit curl http://192.168.0.104:8080/health kann man checken ob das Gateway überhaupt arbeitet.
|
||||
if (req.method === "GET" && req.url === "/health") {
|
||||
res.writeHead(200, { "content-type":"text/plain" });
|
||||
return res.end("ok");
|
||||
}
|
||||
|
||||
res.writeHead(404, { "content-type":"text/plain" });
|
||||
res.end("not found");
|
||||
|
||||
}).listen(8080, "0.0.0.0", () => console.log("[gateway] listening 8080"));
|
||||
|
||||
|
||||
//DataStream in Json zurückverwandeln.
|
||||
function readJsonBody(req){
|
||||
return new Promise((resolve) => {
|
||||
let buf = "";
|
||||
req.on("data", c => buf += c);
|
||||
req.on("end", () => resolve(buf ? JSON.parse(buf) : null));
|
||||
});
|
||||
|
||||
server.listen(8080, "0.0.0.0", () => console.log("[gateway] listening 8080"));
|
||||
}
|
||||
|
|
|
|||
17
job json Vorlage
Normal file
17
job json Vorlage
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
{
|
||||
"client": "string", //Name des Anfragenden
|
||||
"path": "string", //Pfad der Grist Tabelle. z.B. Arenos/Map Marker/Städte
|
||||
"row": 0, //Reihe der Daten die man will. 0 = ganze Tabelle
|
||||
"meta": {
|
||||
"requestID":"string", //Auftragsnummer
|
||||
"timestamp": new Date().toISOString(), //Zeit und Datum des Auftrags für Logs. ACHTUNG: new Date(). muss an anderer Stelle im Code gemacht werden!
|
||||
"priority":"bool" //Prio Ja-Nein. Für Wichtige Anfragen.
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
{
|
||||
"job_ok":"bool",
|
||||
"client":"string",
|
||||
"data": ["data"]
|
||||
}
|
||||
|
|
@ -2,7 +2,7 @@
|
|||
"name": "data-gateway",
|
||||
"version": "1.0.0",
|
||||
"type": "commonjs",
|
||||
"main": "app.cjs",
|
||||
"main": "gateway.cjs",
|
||||
"dependencies": {
|
||||
"ioredis": "^5.3.2"
|
||||
},
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user