15
Async debugged
jonroach 9 Sep 2025 13:14
1 contributor
511 lines12.4 KB
1#include "coroutine.h"
2#include <assert.h>
3#include <setjmp.h>
4#include <stdbool.h>
5#include <stddef.h>
6#include <stdio.h>
7#include <pthread.h>
8#include <stdlib.h>
9#include "cor_thread_local.h"
10
11
12
// Allocate `size` bytes with malloc() or terminate the process.
//
// Returns a non-NULL pointer to uninitialised storage.  The failure check is
// an explicit test rather than assert() so that it is still performed when
// the file is built with NDEBUG.  As before, a NULL result from malloc() is
// always treated as fatal (including a NULL returned for size 0).
static void *mustmalloc(size_t size){
    void *p = malloc(size);
    if (p == NULL) {
        fprintf(stderr, "mustmalloc: out of memory (%zu bytes)\n", size);
        abort();
    }
    return p;
}
18
// New(type, args...): allocate storage for one `type` with mustmalloc() and
// hand it, followed by any extra arguments, to type##_ctor(); the expression
// evaluates to whatever the constructor returns.  The closing parenthesis of
// the mustmalloc() call sits before the optional arguments so that they reach
// the constructor and not the allocator.  (`, ## __VA_ARGS__` is the GNU
// extension that drops the comma when no extra arguments are given.)
#define New(type, ...) (type##_ctor((type *)mustmalloc(sizeof(type)), ## __VA_ARGS__))
// Delete(ptr, type): when `ptr` is non-NULL, run type##_dtor(ptr), free the
// storage and reset `ptr` to NULL.  Both arms of the conditional are void so
// the expression is valid ISO C.  `ptr` is evaluated more than once and must
// be a modifiable lvalue without side effects.
#define Delete(ptr, type) ((ptr) ? (void)(type##_dtor(ptr), free(ptr), (ptr) = NULL) : (void)0)
21
///////////////////////////////////////////////////////////////////////////////
// 2-way linked lists...
//
// Brought inline here to avoid namespace pollution
///////////////////////////////////////////////////////////////////////////////
27
// One node of a doubly linked list.  Embed a List_Link in any structure that
// has to sit on a List_Head; the list code only ever touches these two
// pointers.
typedef struct List_Link {
    struct List_Link *next;   // following node, or the list's tail sentinel
    struct List_Link *prev;   // preceding node, or the list's head sentinel
} List_Link;
33
34typedef struct List_Head List_Head;
35struct List_Head {
36 union {
37 struct {
38 List_Link link;
39 List_Link *filler;
40 } fwd;
41 struct {
42 List_Link *filler;
43 List_Link link;
44 } back;
45 };
46};
47
48
49static inline bool List_IsEmpty(
50 const List_Head *list
51){
52 return list->fwd.link.next == &list->back.link;
53}
54
55
56static inline List_Link *List_GetHead(
57 const List_Head *list
58){
59 return List_IsEmpty(list) ? NULL : list->fwd.link.next;
60}
61
62
63// static inline List_Link *List_GetTail(
64// const List_Head *list
65// ){
66// return List_IsEmpty(list) ? NULL : list->back.link.prev;
67// }
68
69
// OFFSETOF(Container, Field): byte offset of `Field` inside `Container`.
// Defers to the standard offsetof() from <stddef.h> instead of doing pointer
// arithmetic on a made-up object address, which is undefined behaviour.
#define OFFSETOF(Container, Field) offsetof(Container, Field)
// List_Link_Container(Container, Link, link): given `link`, a pointer to the
// member named `Link` embedded in a `Container`, recover the pointer to the
// enclosing `Container`.  `link` must point into a real object; passing NULL
// does not yield NULL unless the member sits at offset 0.
#define List_Link_Container(Container, Link, link) ((Container *)((char *)(link) - OFFSETOF(Container, Link)))
72
73
74static inline void List_Init(
75 List_Head *list
76){
77 list->fwd.link.next = &list->back.link;
78 list->fwd.link.prev = NULL;
79 list->back.link.prev = &list->fwd.link;
80}
81
82
83static inline void List_AddHead(
84 List_Head *list,
85 List_Link *link
86){
87 List_Link *first = list->fwd.link.next;
88 link->next = first;
89 link->prev = &list->fwd.link;
90 first->prev = link;
91 list->fwd.link.next = link;
92}
93
94
95static inline void List_AddTail(
96 List_Head *list,
97 List_Link *link
98){
99 List_Link *last = list->back.link.prev;
100 link->prev = last;
101 link->next = &list->back.link;
102 last->next = link;
103 list->back.link.prev = link;
104}
105
106
107static inline void List_Remove(
108 List_Link *link
109){
110 link->prev->next = link->next;
111 link->next->prev = link->prev;
112}
113
114///////////////////////////////////////////////////////////////////////////////
115// ...2-way linked lists
116///////////////////////////////////////////////////////////////////////////////
117
// Forward declaration: struct Coroutine refers to its owning Coroutines
// before the full definition of struct Coroutines appears.
struct Coroutines;
typedef struct Coroutines Coroutines;
119
// Values stored in Coroutines.state (the state of the whole system).
enum {
    Coroutines_Idle     = 0,  // set by Coroutine_StopSystem just before teardown
    Coroutines_Starting = 1,  // Coroutine_StartSystem is running
    Coroutines_Started  = 2,  // system initialised, no run in progress
    Coroutines_Active   = 3,  // inside Coroutine_Run_Coroutine
    Coroutines_Stopping = 4   // Coroutine_StopSystem is running
};
127
// Values seen as the result of setjmp() on a Coroutine's `buf`.
enum {
    Chunk_Initial = 0,  // direct return from setjmp(); must stay 0
    Chunk_Create  = 1,  // longjmp request: carve a new chunk above this one
    Chunk_Enter   = 2   // longjmp request: start or resume running here
};
133
// Values stored in Coroutine.state (the state of one coroutine / stack chunk).
enum {
    Coroutine_Constructing = 0,  // chunk record exists but is still the tip
    Coroutine_Free         = 1,  // on the free list, available to Coroutine_New
    Coroutine_Idle         = 2,  // handed out by Coroutine_New, not yet continued
    Coroutine_Running      = 3,  // on the runable list
    Coroutine_Waiting      = 4,  // yielded; on the waiting list
    Coroutine_Complete     = 5   // start function has returned
};
142
// Codes associated with the Coroutines.controller jump buffer.  Only
// Coroutines_CoroutineComplete is passed to longjmp() in this file; the value
// 0 coincides with a direct return from setjmp().
enum {
    Coroutines_Init              = 0,
    Coroutines_AllocatedChunk    = 1,
    Coroutines_CoroutineComplete = 2,  // primary coroutine has finished
};
148
149struct Coroutine {
150 Coroutines *coroutines;
151 List_Link link;
152 jmp_buf buf;
153 void *this;
154 Coroutine_Start start;
155 void *entry_param;
156 void *value;
157 char state;
158 char action;
159};
160
161struct Coroutines {
162 pthread_mutex_t mutex;
163 jmp_buf controller;
164 jmp_buf chunk_allocated;
165
166 // singletons
167 Coroutine *tip; // top of stack chunk
168 Coroutine *active; // currently running coroutine
169 Coroutine *primary; // Coroutine_Run coroutine
170
171 // lists
172 List_Head free;
173 List_Head inactive; // idle or complete
174 List_Head runable; // running or waiting to run
175 List_Head waiting; // yielded / waiting to run
176 pthread_cond_t waiting_cond;
177
178 // state
179 char state;
180};
181
182_Cor_thread_local Coroutines *g_c;
183
184static void stack_chunk_chunk(Coroutine *parent);
185static void stack_chunk_base(Coroutine *parent);
186
187
// Carve the very first chunk record out of the calling thread's stack.
//
// This is exactly stack_chunk_chunk() with no parent chunk: one
// COROUTINE_STACK_SIZE block of padding is reserved and stack_chunk_base()
// then records the first tip.  Delegating removes a verbatim copy of the
// padding code.  Control leaves through longjmp(g_c->chunk_allocated, ...),
// so this function never returns to its caller in the normal way.
static void Coroutine_PrimeStackChunks(void)
{
    stack_chunk_chunk(NULL);
}
201
202
203static void stack_chunk_chunk(
204 Coroutine *parent
205){
206 unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
207 chunk_of_stack[0] = 0xde;
208 chunk_of_stack[1] = 0xad;
209 chunk_of_stack[2] = 0xbe;
210 chunk_of_stack[3] = 0xef;
211 chunk_of_stack[COROUTINE_STACK_SIZE - 4] = 0xde;
212 chunk_of_stack[COROUTINE_STACK_SIZE - 3] = 0xad;
213 chunk_of_stack[COROUTINE_STACK_SIZE - 2] = 0xbe;
214 chunk_of_stack[COROUTINE_STACK_SIZE - 1] = 0xef;
215 stack_chunk_base(parent);
216}
217
218
219static void Coroutine_RunNext()
220{
221 // arrive here with mutex unlocked
222 int r;
223
224 r = pthread_mutex_lock(&g_c->mutex);
225 assert(r == 0);
226 while (List_IsEmpty(&g_c->runable)) {
227 r = pthread_cond_wait(&g_c->waiting_cond, &g_c->mutex);
228 assert(r == 0);
229 }
230 Coroutine *next = List_Link_Container(Coroutine, link, List_GetHead(&g_c->runable));
231 assert(next->state == Coroutine_Running);
232 longjmp(next->buf, Chunk_Enter);
233 assert(false);
234}
235
236
237static void stack_chunk_base(
238 Coroutine *parent
239){
240 Coroutine here;
241 here.state = Coroutine_Constructing;
242 switch (setjmp(here.buf)) {
243 case Chunk_Initial:
244 // got here for the first time
245 // parent now has a chunk_of_stack - add it to the free list
246 if (parent) {
247 assert(parent->state == Coroutine_Constructing);
248 parent->state = Coroutine_Free;
249 List_AddHead(&g_c->free, &parent->link);
250 }
251 // note that here is the tip of the chunk-claim stack
252 here.coroutines = g_c;
253 g_c->tip = &here;
254
255 // return to the coroutine allocator
256 longjmp(g_c->chunk_allocated, 1);
257 case Chunk_Create:
258 // request to create a new chunk on the stack
259 assert(here.state == Coroutine_Constructing);
260 stack_chunk_chunk(&here);
261 assert(false);
262 case Chunk_Enter:
263 // request to start a coroutine (ie use the chunk for a coroutine)
264 // arrive here with mutex locked
265 assert(here.state == Coroutine_Running);
266 g_c->active = &here;
267 int r = pthread_mutex_unlock(&g_c->mutex);
268 assert(r == 0);
269 here.value = here.start(here.entry_param);
270 r = pthread_mutex_lock(&g_c->mutex);
271 assert(r == 0);
272 g_c->active = NULL;
273 assert(here.state == Coroutine_Running);
274 List_Remove(&here.link);
275 here.state = Coroutine_Complete;
276 List_AddTail(&g_c->inactive, &here.link);
277 // coroutine has completed
278 if (g_c->primary == &here) {
279 // if primary coroutine - return to Coroutine_Run
280 longjmp(g_c->controller, Coroutines_CoroutineComplete);
281 }
282 r = pthread_mutex_unlock(&g_c->mutex);
283 assert(r == 0);
284 Coroutine_RunNext();
285 assert(false);
286 }
287}
288
289
290void Coroutine_StartSystem()
291{
292 assert(!g_c);
293 g_c = mustmalloc(sizeof(Coroutines));
294
295 g_c->state = Coroutines_Starting;
296
297 pthread_mutexattr_t attr;
298 int r = pthread_mutexattr_init(&attr);
299 assert(r == 0);
300 r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
301 assert(r == 0);
302 r = pthread_mutex_init(&g_c->mutex, &attr);
303 assert(r == 0);
304 r = pthread_mutexattr_destroy(&attr);
305 assert(r == 0);
306
307 g_c->tip = NULL;
308 g_c->active = NULL;
309
310 List_Init(&g_c->free);
311 List_Init(&g_c->inactive);
312 List_Init(&g_c->runable);
313 List_Init(&g_c->waiting);
314 r = pthread_cond_init(&g_c->waiting_cond, NULL);
315 assert(r == 0);
316
317 // prime the chunk system
318 if (!setjmp(g_c->chunk_allocated)){
319 Coroutine_PrimeStackChunks();
320 assert(false);
321 }
322
323 assert(g_c->state == Coroutines_Starting);
324 g_c->state = Coroutines_Started;
325}
326
327
328void Coroutine_StopSystem()
329{
330 int r = pthread_mutex_lock(&g_c->mutex);
331 assert(r == 0);
332 assert(g_c->state == Coroutines_Started);
333 g_c->state = Coroutines_Stopping;
334
335 assert(List_IsEmpty(&g_c->inactive));
336 r = pthread_cond_destroy(&g_c->waiting_cond);
337 assert(r == 0);
338
339 assert(g_c->state == Coroutines_Stopping);
340 pthread_mutex_unlock(&g_c->mutex);
341 assert(r == 0);
342 g_c->state = Coroutines_Idle;
343 r = pthread_mutex_destroy(&g_c->mutex);
344 assert(r == 0);
345 free(g_c);
346 g_c = NULL;
347}
348
349
350void Coroutine_Run_Coroutine(
351 Coroutine *cor,
352 void *value
353){
354 Coroutines *cors = cor->coroutines;
355 assert(g_c == cors);
356 int r = pthread_mutex_lock(&cors->mutex);
357 assert(r == 0);
358 assert(cors->state == Coroutines_Started);
359 cors->state = Coroutines_Active;
360 cors->primary = cor;
361 Coroutine_Continue(cor, value, true);
362
363 if (!setjmp(cors->controller)){
364 pthread_mutex_unlock(&cors->mutex);
365 assert(r == 0);
366 // start the first coroutine
367 Coroutine_RunNext();
368 }
369 // arrive here with mutex locked
370 assert(List_IsEmpty(&cors->runable));
371 assert(List_IsEmpty(&cors->waiting));
372 assert(cors->state == Coroutines_Active);
373 cors->state = Coroutines_Started;
374 pthread_mutex_unlock(&cors->mutex);
375 assert(r == 0);
376}
377
378
379void *Coroutine_Run(
380 Coroutine_Start start,
381 void *value
382){
383 Coroutine *cor = Coroutine_New(start);
384 Coroutine_Run_Coroutine(cor, value);
385 void *res = Coroutine_GetValue(cor);
386 Coroutine_Delete(cor);
387 return res;
388}
389
390
391Coroutine *Coroutine_New(
392 Coroutine_Start start
393){
394 assert((g_c->state == Coroutines_Started && List_IsEmpty(&g_c->inactive)) || g_c->state == Coroutines_Active);
395
396 // if none free - add one
397 if (List_IsEmpty(&g_c->free)){
398 if (!setjmp(g_c->chunk_allocated)){
399 longjmp(g_c->tip->buf, Chunk_Create);
400 }
401 }
402
403 Coroutine *cor = List_Link_Container(Coroutine, link, List_GetHead(&g_c->free));
404 assert(cor->state == Coroutine_Free);
405 cor->state = Coroutine_Idle;
406 cor->start = start;
407 cor->value = NULL;
408 List_Remove(&cor->link);
409 List_AddHead(&g_c->inactive, &cor->link);
410
411 return cor;
412}
413
414
415void Coroutine_Delete(
416 Coroutine *cor
417){
418 Coroutines *cors = cor->coroutines;
419 int r = pthread_mutex_lock(&cors->mutex);
420 assert(r == 0);
421 assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Complete);
422 cor->state = Coroutine_Free;
423 List_Remove(&cor->link);
424 List_AddTail(&cors->free, &cor->link);
425 r = pthread_mutex_unlock(&cors->mutex);
426 assert(r == 0);
427}
428
429
430void Coroutine_Continue(
431 Coroutine *cor,
432 void *value,
433 bool early
434){
435 Coroutines *cors = cor->coroutines;
436 int r = pthread_mutex_lock(&cors->mutex);
437 assert(r == 0);
438 assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Waiting);
439 cor->entry_param = value;
440 cor->state = Coroutine_Running;
441 List_Remove(&cor->link);
442 if ( early ) {
443 List_AddHead(&cors->runable, &cor->link);
444 } else {
445 List_AddTail(&cors->runable, &cor->link);
446 }
447 r = pthread_cond_signal(&cors->waiting_cond);
448 r = pthread_mutex_unlock(&cors->mutex);
449 assert(r == 0);
450}
451
452
453void *Coroutine_Yield(
454 void *value,
455 Coroutine_YieldCallback on_yield,
456 void *this
457){
458 int r = pthread_mutex_lock(&g_c->mutex);
459 assert(r == 0);
460 Coroutine *me = g_c->active;
461 Coroutines *cors = me->coroutines;
462 assert(me && me->state == Coroutine_Running && cors == g_c);
463 me->value = value;
464 me->state = Coroutine_Waiting;
465
466 List_Remove(&me->link);
467 List_AddTail(&cors->waiting, &me->link);
468
469 switch (setjmp(me->buf)){
470 case Chunk_Initial:
471 r = pthread_mutex_unlock(&cors->mutex);
472 assert(r == 0);
473 on_yield(this);
474 Coroutine_RunNext();
475 case Chunk_Create:
476 assert(false);
477 case Chunk_Enter:
478 // arrive here with mutex locked
479 cors->active = me;
480 // when we return here - we are running again
481 assert(me->state == Coroutine_Running);
482 void *res = me->entry_param;
483 r = pthread_mutex_unlock(&cors->mutex);
484 assert(r == 0);
485 return res;
486 }
487 return NULL;
488}
489
490
491void *Coroutine_GetValue(
492 Coroutine *cor
493){
494 return cor->value;
495}
496
497
498Coroutine *Coroutine_GetActive()
499{
500 return g_c->active;
501}
502
503
504bool Coroutine_IsRunning(
505 Coroutine *cor
506)
507{
508 int state = cor->state;
509 return state == Coroutine_Running || state == Coroutine_Waiting;
510}
511