
Business Intelligence/6. Mai 2024 -Aktualisiert am 6. Mai 2024/4 Min. Lesezeit
Google Cloud Task mit Cloud Functions

Bei Peaks & Pies setzen wir auf viele Microservices, die einen kleinen Teil eines großen Prozesses abarbeiten. Beispielsweise importieren wir eine CSV-Datei mit Userdaten via API, die dann in einem anderen System angelegt werden müssen.
Hierzu wird
- eine große CSV-Datei eingelesen, in der jede Zeile einen User repräsentiert.
- Diese werden transformiert in das Format des Zielsystems und
- schließlich via API importiert.
Als Cloud Pipeline bilden wir dies so ab:
Cloud Storage (CSV) → Cloud Function (Transform) → Pub/Sub → Cloud Function (3rd Party API)
Warum brauchen wir Queues?
Aufgrund wachsender Anzahl an Usern dauert der Prozess aktuell sehr lange und wir müssen versuchen, Teile des Prozesses, die blockieren, zu parallelisieren. In unserem aktuellen Setup, mit Google Pub/Sub und Cloud Function würde dies funktionieren, aber Teil 3 aus dem Prozess führt hierbei zu Problemen: die externe API hat bestimmte Rate Limits und blockiert bei zu vielen parallelen Anfragen in kurzer Zeit.
Unser erster Versuch, eine maximale Concurrency bei der Cloud Function, die den API-Aufruft macht, zu konfigurieren, hat leider dazu geführt, dass nicht alle Nachrichten von dem Pub/Sub Topic abgearbeitet werden, da eine Subscription auf einem Topic nur max. ca. 600 Sekunden versucht die Cloud Function zu triggern und am Ende mit “the request was aborted because there was no available instance” abbricht.
Eine Alternative wäre gewesen, Instanzen zu schreiben, die Nachrichten von der Subscription pullen, aber die laufen dann ggf. unnötig lange und das Handling müssten wir selbst entwickeln und konfigurieren.
Google Cloud Tasks
Bei unserer Suche sind wir dann auf Google Cloud Tasks gestoßen, die genau diesen UseCase abbilden und was wir bisher von Pub/Sub erwartet hätten. Cloud Tasks erlauben die Verarbeitung von Tasks in einer Queue. Die Tasks werden asynchron auf die Queue abgelegt und triggern dann Cloud Functions, um diese Tasks abzuarbeiten. Auf dieser Queue lässt sich ebenfalls die Concurrency konfigurieren.
Hier also die angepasste Pipeline:
Cloud Storage (CSV) → Cloud Function (Transform) → Cloud Tasks → Cloud Function (3rd Party API)
Queue anlegen
Eine Queue kann dabei schnell über die Konsole oder die Oberfläche angelegt werden. Stellt sicher, dass die Cloud Tasks API aktiviert ist und die Einstellungen der App Engine und Regionen passend vorliegt. Dies ist notwendig, da die Queue auf den Einstellungen der App Engine basiert. Mit dem folgenden Befehl wird z.B. eine Queue erzeugt:
1gcloud tasks queues create tasks
und konfiguriert:
1gcloud tasks queues update tasks --max-concurrent-dispatches=5 --max-attempts=3 --min-backoff=5s
batchfile
Tasks zur Queue hinzufügen
Das kann im Grunde alles sein. Wir haben lokal einen kleinen Code geschrieben, der Tasks zur Queue hinzufügt.
Authentifizierung
GCP-Dienste wie Cloud Functions können inbound OIDC-Token automatisch validieren. Dies ermöglicht die Authentifizierung und Autorisierung durch IAM-Richtlinien für das mit diesem Token verbundene Benutzer- oder Dienstkonto. Wir nutzen dies, um unsere Empfangsfunktion sicher über HTTPS auszulösen, ohne dass sie öffentlich zugänglich ist.
Die unter der serviceAccountEmail angegebene E-Mail muss einem IAM-Konto entsprechen, welches die Cloud Function ausführen darf. Hier das Beispiel:
1const projectId = 'PROJECT_ID';
2const queueId = 'tasks';
3const locationId = 'europe-west3';
4const parent = `projects/${projectId}/locations/${locationId}/queues/${queueId}`;
5
6const { CloudTasksClient } = require('@google-cloud/tasks').v2;
7
8const tasksClient = new CloudTasksClient();
9
10async function callCreateTask(n) {
11 const payload = {
12 foo: 'bar',
13 key: 'value',
14 };
15 const task = {
16 httpRequest: {
17 httpMethod: 'POST',
18 url: `https://${locationId}-${projectId}.cloudfunctions.net/tasks`,
19 body: Buffer.from(JSON.stringify(payload)).toString('base64'),
20 headers: {
21 'Content-Type': 'application/json',
22 },
23 oidcToken: {
24 serviceAccountEmail: `${projectId}@appspot.gserviceaccount.com`,
25 },
26 },
27 };
28 console.log(task);
29
30 const formattedParent = tasksClient.queuePath(projectId, locationId, queueId);
31 // Construct request
32 const request = {
33 parent,
34 task,
35 };
36
37 // Run request
38 const response = await tasksClient.createTask(request);
39 console.log(response);
40}
41
42for (let i = 0; i < 10000; i++) {
43 callCreateTask(i);
44}
45
javascript
Cloud Function als Konsument erstellen
Über die Oberfläche oder die Konsole kann eine einfache HTTP-triggered Cloud Function angelegt werden welche dann von der Queue aufgerufen wird.
1const functions = require('@google-cloud/functions-framework');
2
3
4functions.http('helloHttp', (req, res) => {
5
6 res.send(`Hello ${req.query.name || req.body.name || 'World'}!`);
7
8});
javascript
Das wars!
Nun haben wir:
- Eine Queue in Google Cloud Tasks
- Ein Script, um Tasks auf der Queue abzulegen
- Eine Cloud Function, die per HTTP von der Queue getriggert wird
Queues bilden einen wichtigen Baustein bei der Abarbeitung und besonders Konfiguration komplexerer Workflows.
Zum Abschluss noch ein paar hilfreiche Links: