diff --git a/data_worker.js b/data_worker.js index d20da9a..bd15d94 100644 --- a/data_worker.js +++ b/data_worker.js @@ -1,4 +1,9 @@ const Redis = require("ioredis"); +var data; //Die Daten von Grist. + +const fetch = (...args) => import('node-fetch').then(({default: fetch}) => fetch(...args)); +const GRIST_URL = process.env.GRIST_BASE_URL; +const API_KEY = process.env.GRIST_API_KEY; // Verbindung zu Redis herstellen const redis = new Redis({ @@ -6,24 +11,43 @@ const redis = new Redis({ port: 6379 }); +startWorker(); + // Endlosschleife: Immer auf neue Jobs warten async function startWorker() { - console.log("Worker gestartet – warte auf Jobs..."); - while (true) { // BLPOP = Warte auf Job in der Queue "grist:jobs" - const job = await redis.blpop("grist:jobs", 0); - const payload = job[1]; // [0] = Queue-Name, [1] = Daten + const reply = await redis.blpop("grist:jobs", 0); + const payload = JSON.parse(reply[1]); + const { client, path, row, meta } = payload; + + const data = await getFromGrist(path, row); - console.log("Job empfangen:", payload); + // Ergebnis verpacken in ein JSON + const freight = JSON.stringify({ + job_ok: true, + client, + data + }); - // Ergebnis vorbereiten - const result = `Bearbeitet: ${payload}`; - - // Ergebnis in andere Queue schreiben - await redis.rpush("grist:results", result); - console.log("Ergebnis gespeichert:", result); + // Ergebnis in Fertig Queue schreiben. Von da holt es sich der Client. + await redis.rpush("grist:results", freight); + console.log (freight); } } -startWorker(); +async function getFromGrist(path, row) { + // + const [docId, , tableId] = path.split("/"); + const base = `${GRIST_URL}/api/docs/${docId}/tables/${tableId}/records`; + + const url = row > 0 ? `${base}/${row}` : base; + + const res = await fetch(url, { + headers: { "Authorization": `Bearer ${API_KEY}` } + }); + + return res.json(); // -> JSON mit Records +} + + diff --git a/gateway.cjs b/gateway.cjs index ef67961..c32a3e0 100644 --- a/gateway.cjs +++ b/gateway.cjs @@ -1,47 +1,46 @@ -const http = require("http"); +const http = require("http"); const Redis = require("ioredis"); -const REDIS_HOST = process.env.REDIS_HOST || "data_jobs"; -const REDIS_PORT = Number(process.env.REDIS_PORT || 6379); -const JOB_QUEUE = process.env.JOB_QUEUE || "grist:jobs"; -const RES_QUEUE = process.env.RESULT_QUEUE || "grist:results"; +const redis = new Redis({ host: process.env.REDIS_HOST || "data_jobs", + port: Number(process.env.REDIS_PORT || 6379) }); +const JOBS = process.env.JOB_QUEUE || "grist:jobs"; +const RESULTS = process.env.RESULT_QUEUE || "grist:results"; -const redis = new Redis({ host: REDIS_HOST, port: REDIS_PORT }); +http.createServer(async (req, res) => { + if (req.method === "POST" && req.url === "/api/job") { + const stream = await readJsonBody(req); // JSON Stream vom Client. mit readJsonBody() wieder zurück in JSON. + await redis.lpush(JOBS, JSON.stringify(stream)); // Job in Queue. Als String in Redis eintragen. Der Worker nimmt dann den String. + const reply = await redis.blpop(RESULTS, 30); // auf Antwort warten -function cors(res){ - res.setHeader("Access-Control-Allow-Origin","*"); - res.setHeader("Access-Control-Allow-Methods","GET, POST, OPTIONS"); - res.setHeader("Access-Control-Allow-Headers","*"); -} - -const server = http.createServer(async (req, res) => { - cors(res); - if (req.method === "OPTIONS") { res.writeHead(204); return res.end(); } - const path = req.url.split("?")[0]; - - if (req.method === "GET" && path === "/health") { - res.writeHead(200, {"content-type":"text/plain"}); return res.end("ok"); - } - - if (req.method === "GET" && (path === "/places" || path === "/api/places")) { - try { - await redis.lpush(JOB_QUEUE, "places"); - const reply = await redis.blpop(RES_QUEUE, 8); - if (!reply) { - res.writeHead(202, {"content-type":"application/json"}); - return res.end('{"ok":false,"status":"queued-or-timeout"}'); - } - const [, value] = reply; - res.writeHead(200, {"content-type":"application/json"}); - return res.end(JSON.stringify({ ok:true, raw:value })); - } catch (e) { - res.writeHead(500, {"content-type":"application/json"}); - return res.end(JSON.stringify({ ok:false, error:e.message })); + const value = reply[1]; // Worker-Response + res.writeHead(200, {"content-type":"application/json"}); + try { + JSON.parse(value); res.end(value); //Prüfen für JSON + } catch { + res.end(JSON.stringify({ //Fallback damit es sicher ein JSON ist. + ok:true, + data:value + })); } } - res.writeHead(404, {"content-type":"application/json"}); - res.end('{"error":"not found"}'); -}); + //mit curl http://192.168.0.104:8080/health kann man checken ob das Gateway überhaupt arbeitet. + if (req.method === "GET" && req.url === "/health") { + res.writeHead(200, { "content-type":"text/plain" }); + return res.end("ok"); + } -server.listen(8080, "0.0.0.0", () => console.log("[gateway] listening 8080")); + res.writeHead(404, { "content-type":"text/plain" }); + res.end("not found"); + +}).listen(8080, "0.0.0.0", () => console.log("[gateway] listening 8080")); + + +//DataStream in Json zurückverwandeln. +function readJsonBody(req){ + return new Promise((resolve) => { + let buf = ""; + req.on("data", c => buf += c); + req.on("end", () => resolve(buf ? JSON.parse(buf) : null)); + }); +} diff --git a/job json Vorlage b/job json Vorlage new file mode 100644 index 0000000..34281e2 --- /dev/null +++ b/job json Vorlage @@ -0,0 +1,17 @@ +{ + "client": "string", //Name des Anfragenden + "path": "string", //Pfad der Grist Tabelle. z.B. Arenos/Map Marker/Städte + "row": 0, //Reihe der Daten die man will. 0 = ganze Tabelle + "meta": { + "requestID":"string", //Auftragsnummer + "timestamp": new Date().toISOString(), //Zeit und Datum des Auftrags für Logs. ACHTUNG: new Date(). muss an anderer Stelle im Code gemacht werden! + "priority":"bool" //Prio Ja-Nein. Für Wichtige Anfragen. + } +} + + +{ + "job_ok":"bool", + "client":"string", + "data": ["data"] +} \ No newline at end of file diff --git a/package.json b/package.json index d374b43..a44db80 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "data-gateway", "version": "1.0.0", "type": "commonjs", - "main": "app.cjs", + "main": "gateway.cjs", "dependencies": { "ioredis": "^5.3.2" },