652 lines16.1 KB
1#include "coroutine.h"
2#include <assert.h>
3#include <setjmp.h>
4#include <stdbool.h>
5#include <stddef.h>
6#include <stdio.h>
7#include <pthread.h>
8#include <stdlib.h>
9#include "cor_thread_local.h"
10
11
12static void *mustmalloc(size_t size){
13 void *p = malloc(size);
14 assert(p);
15 return p;
16}
17
18#define New(type, ...) (type##_ctor((type *)mustmalloc(sizeof(type), ## __VA_ARGS__)))
19#define Delete(ptr, type) ((ptr) ? (type##_dtor(ptr), free(ptr), (ptr) = NULL) : (void)0)
20static void Coroutine_RunNext();
21static void _Coroutine_Continue(Coroutine *cor, void *value, bool early);
22
23///////////////////////////////////////////////////////////////////////////////
24// 2-way linked lists...
25//
26// Brought inline here to avoid namespace polution
27///////////////////////////////////////////////////////////////////////////////
28
29typedef struct List_Link List_Link;
30struct List_Link {
31 List_Link *next;
32 List_Link *prev;
33};
34
35typedef struct List_Head List_Head;
36struct List_Head {
37 union {
38 struct {
39 List_Link link;
40 List_Link *filler;
41 } fwd;
42 struct {
43 List_Link *filler;
44 List_Link link;
45 } back;
46 };
47};
48
49
50static inline bool List_IsEmpty(
51 const List_Head *list
52){
53 return list->fwd.link.next == &list->back.link;
54}
55
56
57static inline List_Link *List_GetHead(
58 const List_Head *list
59){
60 return List_IsEmpty(list) ? NULL : list->fwd.link.next;
61}
62
63
64// static inline List_Link *List_GetTail(
65// const List_Head *list
66// ){
67// return List_IsEmpty(list) ? NULL : list->back.link.prev;
68// }
69
70
71#define OFFSETOF(Container, Field) ((char *)&((Container *)4)->Field - (char *)(Container *)4)
72#define List_Link_Container(Container, Link, link) ((Container *)((char *)(link) - OFFSETOF(Container, Link)))
73
74
75static inline void List_Init(
76 List_Head *list
77){
78 list->fwd.link.next = &list->back.link;
79 list->fwd.link.prev = NULL;
80 list->back.link.prev = &list->fwd.link;
81}
82
83
84static inline void List_AddHead(
85 List_Head *list,
86 List_Link *link
87){
88 List_Link *first = list->fwd.link.next;
89 link->next = first;
90 link->prev = &list->fwd.link;
91 first->prev = link;
92 list->fwd.link.next = link;
93}
94
95
96static inline void List_AddTail(
97 List_Head *list,
98 List_Link *link
99){
100 List_Link *last = list->back.link.prev;
101 link->prev = last;
102 link->next = &list->back.link;
103 last->next = link;
104 list->back.link.prev = link;
105}
106
107
108static inline void List_Remove(
109 List_Link *link
110){
111 link->prev->next = link->next;
112 link->next->prev = link->prev;
113}
114
115///////////////////////////////////////////////////////////////////////////////
116// ...2-way linked lists
117///////////////////////////////////////////////////////////////////////////////
118
119typedef struct Coroutines Coroutines;
120
121enum {
122 Coroutines_Idle,
123 Coroutines_Starting,
124 Coroutines_Started,
125 Coroutines_Active,
126 Coroutines_Stopping
127};
128
129enum {
130 Chunk_Initial,
131 Chunk_Create,
132 Chunk_Enter
133};
134
135typedef enum Coroutine_State {
136 Coroutine_Constructing,
137 Coroutine_Free,
138 Coroutine_Idle,
139 Coroutine_Running,
140 Coroutine_Waiting,
141 Coroutine_Complete
142} Coroutine_State;
143
144enum {
145 Coroutines_Init,
146 Coroutines_AllocatedChunk,
147 Coroutines_CoroutineComplete,
148};
149
150struct Coroutine {
151 Coroutines *coroutines; // so can work with it off-thread
152 List_Link link; // for whichever list it's on
153 jmp_buf buf; // how to get back to it
154 unsigned char *guard; // where the stack overrun guard is
155 Coroutine_Start start; // entry point
156 void *entry_param; // to pass to start
157 void *value; // yielded/returned
158 Coroutine_State state;
159};
160
161struct Coroutines {
162 pthread_mutex_t mutex;
163 jmp_buf controller; // to return from Coroutine_Run
164 jmp_buf chunk_allocated;// for chunk allocation
165 unsigned char *guard; // the stack guard for the startup sequence
166
167 // singletons
168 Coroutine *tip; // top of stack chunk
169 Coroutine *active; // currently running coroutine
170 Coroutine *primary; // Coroutine_Run coroutine
171
172 // lists
173 List_Head free;
174 List_Head inactive; // idle or complete
175 List_Head runable; // running or waiting to run
176 List_Head waiting; // yielded / waiting to run
177 pthread_mutex_t waiting_mutex;
178
179 // Summary of the system
180 Coroutine_Report report;
181
182 // state
183 char state;
184};
185
186_Cor_thread_local Coroutines *g_c;
187
188static void stack_chunk_chunk(Coroutine *parent);
189static void stack_chunk_base(Coroutine *parent, unsigned char *guard);
190
191
192// Check whether the guard is intact
193static inline bool Check_Guard(
194 unsigned char *guard
195){
196 return guard[0] == 0xde &&
197 guard[1] == 0xad &&
198 guard[2] == 0xbe &&
199 guard[3] == 0xef;
200}
201
202
203static void Coroutine_PrimeStackChunks()
204{
205 unsigned char chunk_of_stack[COROUTINE_STARTUP_STACK_SIZE];
206 for (int i = 0; i < COROUTINE_STARTUP_STACK_SIZE-3; i += 4){
207 chunk_of_stack[i+0] = 0xde;
208 chunk_of_stack[i+1] = 0xad;
209 chunk_of_stack[i+2] = 0xbe;
210 chunk_of_stack[i+3] = 0xef;
211 }
212 assert(Check_Guard(chunk_of_stack));
213
214 // Stacks grow down in memory (almost always), so if the caller of this function changes
215 // the guard before entering the coroutine system, it has overrun the startup stack
216 g_c->guard = chunk_of_stack;
217
218 stack_chunk_base(NULL, NULL);
219}
220
221
222static void stack_chunk_chunk(
223 Coroutine *parent
224){
225 unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
226 for (int i = 0; i < COROUTINE_STACK_SIZE-3; i += 4){
227 chunk_of_stack[i+0] = 0xde;
228 chunk_of_stack[i+1] = 0xad;
229 chunk_of_stack[i+2] = 0xbe;
230 chunk_of_stack[i+3] = 0xef;
231 }
232 stack_chunk_base(parent, chunk_of_stack);
233}
234
235
236static void stack_chunk_base(
237 Coroutine *parent,
238 unsigned char *guard
239){
240 Coroutine here;
241 here.state = Coroutine_Constructing;
242 switch (setjmp(here.buf)) {
243 case Chunk_Initial:
244 // got here for the first time
245 // parent now has a chunk_of_stack - add it to the free list
246 if (parent) {
247 assert(parent->state == Coroutine_Constructing);
248 assert(Check_Guard(guard));
249 parent->guard = guard;
250 parent->state = Coroutine_Free;
251 List_AddHead(&g_c->free, &parent->link);
252 g_c->report.coroutines_pool_size += 1;
253 }
254 // note that here is the tip of the chunk-claim stack
255 here.coroutines = g_c;
256 g_c->tip = &here;
257
258 // return to the coroutine allocator
259 longjmp(g_c->chunk_allocated, 1);
260 case Chunk_Create:
261 // request to create a new chunk on the stack
262 assert(here.state == Coroutine_Constructing);
263 stack_chunk_chunk(&here);
264 assert(false);
265 case Chunk_Enter:
266 // request to start a coroutine (ie use the chunk for a coroutine)
267 // arrive here with mutex locked
268 assert(here.state == Coroutine_Running);
269 g_c->active = &here;
270 int r = pthread_mutex_unlock(&g_c->mutex);
271 assert(r == 0);
272 here.value = here.start(here.entry_param);
273
274 // check the guard
275 if (!Check_Guard(here.guard)){
276 printf("Coroutine has overrun its stack - checked after returning from coroutine function\n");
277 exit(EXIT_FAILURE);
278 }
279
280 r = pthread_mutex_lock(&g_c->mutex);
281 assert(r == 0);
282 g_c->active = NULL;
283 assert(here.state == Coroutine_Running);
284 List_Remove(&here.link);
285 here.state = Coroutine_Complete;
286 List_AddTail(&g_c->inactive, &here.link);
287 // coroutine has completed
288 if (g_c->primary == &here) {
289 // if primary coroutine - return to Coroutine_Run
290 longjmp(g_c->controller, Coroutines_CoroutineComplete);
291 }
292 r = pthread_mutex_unlock(&g_c->mutex);
293 assert(r == 0);
294 Coroutine_RunNext();
295 assert(false);
296 }
297}
298
299
300static void Coroutine_RunNext()
301{
302 // arrive here with mutex unlocked
303 int r;
304 r = pthread_mutex_lock(&g_c->waiting_mutex);
305 assert(r == 0);
306 r = pthread_mutex_lock(&g_c->mutex);
307 assert(r == 0);
308 Coroutine *next = List_Link_Container(Coroutine, link, List_GetHead(&g_c->runable));
309 assert(next->state == Coroutine_Running);
310 longjmp(next->buf, Chunk_Enter);
311 assert(false);
312}
313
314
315void Coroutine_StartSystem()
316{
317 assert(!g_c);
318 g_c = mustmalloc(sizeof(Coroutines));
319
320 g_c->state = Coroutines_Starting;
321
322 int r;
323 r = pthread_mutex_init(&g_c->mutex, NULL);
324 assert(r == 0);
325
326 g_c->tip = NULL;
327 g_c->active = NULL;
328
329 List_Init(&g_c->free);
330 List_Init(&g_c->inactive);
331 List_Init(&g_c->runable);
332 List_Init(&g_c->waiting);
333 r = pthread_mutex_init(&g_c->waiting_mutex, NULL);
334 assert(r == 0);
335 r = pthread_mutex_lock(&g_c->waiting_mutex);
336 assert(r == 0);
337
338 g_c->report.coroutines_created = 0;
339 g_c->report.coroutines_pool_size = 0;
340 g_c->report.lowest_headroom = COROUTINE_STACK_SIZE;
341
342 // prime the chunk system
343 if (!setjmp(g_c->chunk_allocated)){
344 Coroutine_PrimeStackChunks();
345 assert(false);
346 }
347
348 assert(g_c->state == Coroutines_Starting);
349 g_c->state = Coroutines_Started;
350}
351
352
353Coroutine_Report Coroutine_StopSystem()
354{
355 int r = pthread_mutex_lock(&g_c->mutex);
356 assert(r == 0);
357 assert(g_c->state == Coroutines_Started);
358 g_c->state = Coroutines_Stopping;
359
360 int stackminheadroom = COROUTINE_STACK_SIZE;
361 for (List_Link *link = g_c->free.fwd.link.next; link->next; link = link->next){
362 Coroutine *cor = List_Link_Container(Coroutine, link, link);
363 for (int i = 4; i < COROUTINE_STACK_SIZE-3; i += 4){
364 if (!Check_Guard(&cor->guard[i])){
365 stackminheadroom = i < stackminheadroom ? i : stackminheadroom;
366 break;
367 }
368 }
369 }
370 g_c->report.lowest_headroom = stackminheadroom;
371
372 assert(List_IsEmpty(&g_c->inactive));
373 r = pthread_mutex_unlock(&g_c->waiting_mutex);
374 assert(r == 0);
375 r = pthread_mutex_destroy(&g_c->waiting_mutex);
376 assert(r == 0);
377
378 assert(g_c->state == Coroutines_Stopping);
379 pthread_mutex_unlock(&g_c->mutex);
380 assert(r == 0);
381 g_c->state = Coroutines_Idle;
382 r = pthread_mutex_destroy(&g_c->mutex);
383 assert(r == 0);
384
385 Coroutine_Report res = g_c->report;
386
387 free(g_c);
388 g_c = NULL;
389
390 return res;
391}
392
393
394void Coroutine_Run_Coroutine(
395 Coroutine *cor,
396 void *value
397){
398 Coroutines *cors = cor->coroutines;
399 assert(g_c == cors);
400 int r = pthread_mutex_lock(&cors->mutex);
401 assert(r == 0);
402 assert(cors->state == Coroutines_Started);
403 cors->state = Coroutines_Active;
404 cors->primary = cor;
405
406 _Coroutine_Continue(cor, value, true);
407
408 if (!setjmp(cors->controller)){
409 pthread_mutex_unlock(&cors->mutex);
410 assert(r == 0);
411
412 // check the guard
413 if (!Check_Guard(g_c->guard)){
414 printf("Coroutine startup stack as has overrun - checked on entering the main coroutine\n");
415 exit(EXIT_FAILURE);
416 }
417
418 // start the first coroutine
419 Coroutine_RunNext();
420 }
421 // arrive here with mutex locked
422 assert(List_IsEmpty(&cors->runable));
423 assert(List_IsEmpty(&cors->waiting));
424 assert(cors->state == Coroutines_Active);
425 cors->state = Coroutines_Started;
426 pthread_mutex_unlock(&cors->mutex);
427 assert(r == 0);
428}
429
430
431void *Coroutine_Run(
432 Coroutine_Start start,
433 void *value
434){
435 Coroutine *cor = Coroutine_New(start);
436 Coroutine_Run_Coroutine(cor, value);
437 void *res = Coroutine_GetValue(cor);
438 Coroutine_Delete(cor);
439 return res;
440}
441
442
443Coroutine *Coroutine_New(
444 Coroutine_Start start
445){
446 assert((g_c->state == Coroutines_Started && List_IsEmpty(&g_c->inactive)) || g_c->state == Coroutines_Active);
447
448 // if none free - add one
449 if (List_IsEmpty(&g_c->free)){
450 if (!setjmp(g_c->chunk_allocated)){
451 longjmp(g_c->tip->buf, Chunk_Create);
452 }
453 }
454
455 Coroutine *cor = List_Link_Container(Coroutine, link, List_GetHead(&g_c->free));
456 assert(cor->state == Coroutine_Free);
457 cor->state = Coroutine_Idle;
458 cor->start = start;
459 cor->value = NULL;
460 List_Remove(&cor->link);
461 List_AddHead(&g_c->inactive, &cor->link);
462
463 g_c->report.coroutines_created += 1;
464
465 return cor;
466}
467
468
469void Coroutine_Delete(
470 Coroutine *cor
471){
472 Coroutines *cors = cor->coroutines;
473 int r = pthread_mutex_lock(&cors->mutex);
474 assert(r == 0);
475 assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Complete);
476 cor->state = Coroutine_Free;
477 List_Remove(&cor->link);
478 List_AddTail(&cors->free, &cor->link);
479 r = pthread_mutex_unlock(&cors->mutex);
480 assert(r == 0);
481}
482
483
484// Coroutine_Continue, assuming the mutex is claimed
485static void _Coroutine_Continue(
486 Coroutine *cor,
487 void *value,
488 bool early
489){
490 Coroutines *cors = cor->coroutines;
491 assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Waiting);
492 cor->entry_param = value;
493 cor->state = Coroutine_Running;
494 List_Remove(&cor->link);
495 if ( early ) {
496 List_AddHead(&cors->runable, &cor->link);
497 } else {
498 List_AddTail(&cors->runable, &cor->link);
499 }
500 int r;
501 r = pthread_mutex_unlock(&cors->waiting_mutex);
502 assert(r == 0);
503}
504
505
506void Coroutine_Continue(
507 Coroutine *cor,
508 void *value,
509 bool early
510){
511 Coroutines *cors = cor->coroutines;
512 int r = pthread_mutex_lock(&cors->mutex);
513 assert(r == 0);
514 _Coroutine_Continue(cor, value, early);
515 r = pthread_mutex_unlock(&cors->mutex);
516 assert(r == 0);
517}
518
519
520void *Coroutine_Yield(
521 void *value,
522 Coroutine_YieldCallback on_yield,
523 void *yield_me
524){
525 Coroutine *me = g_c->active;
526 if (!Check_Guard(me->guard)){
527 printf("Coroutine has overrun its stack - checked when yielding coroutine\n");
528 exit(EXIT_FAILURE);
529 }
530
531 int r = pthread_mutex_lock(&g_c->mutex);
532 assert(r == 0);
533 Coroutines *cors = me->coroutines;
534 assert(me && me->state == Coroutine_Running && cors == g_c);
535 me->value = value;
536 me->state = Coroutine_Waiting;
537
538 List_Remove(&me->link);
539 if (!List_IsEmpty(&cors->runable)){
540 r = pthread_mutex_unlock(&cors->waiting_mutex);
541 assert(r == 0);
542 }
543 List_AddTail(&cors->waiting, &me->link);
544
545 switch (setjmp(me->buf)){
546 case Chunk_Initial:
547 r = pthread_mutex_unlock(&cors->mutex);
548 assert(r == 0);
549 on_yield(yield_me);
550 Coroutine_RunNext();
551 case Chunk_Create:
552 assert(false);
553 case Chunk_Enter:
554 // arrive here with mutex locked
555 cors->active = me;
556 // when we return here - we are running again
557 assert(me->state == Coroutine_Running);
558 void *res = me->entry_param;
559 r = pthread_mutex_unlock(&cors->mutex);
560 assert(r == 0);
561 return res;
562 }
563 return NULL;
564}
565
566
567void *Coroutine_GetValue(
568 Coroutine *cor
569){
570 return cor->value;
571}
572
573
574Coroutine *Coroutine_GetActive()
575{
576 return g_c->active;
577}
578
579
580int Coroutine_GetStackHeadroom(){
581 unsigned char tbuf[4];
582 return tbuf - g_c->active->guard - 4;
583}
584
585
586bool Coroutine_HasCoroutinesInFreePool(){
587 return !List_IsEmpty(&g_c->free);
588}
589
590
591void *Coroutine_GetCStackTop(){
592 return g_c->tip;
593}
594
595
596struct Coroutine_ChainParam {
597 Coroutine_Start start;
598 void *value;
599 Coroutine *ret;
600};
601
602
603static void *Coroutine_ChainFn(
604 void *param
605){
606 struct Coroutine_ChainParam *params = (struct Coroutine_ChainParam *)param;
607 Coroutine_Continue(params->ret, params->start(params->value), true);
608 return NULL;
609}
610
611
612static void Coroutine_ChainYield(
613 void *unused
614){
615 (void)unused;
616}
617
618
619void *Coroutine_Chain(
620 Coroutine_Start start,
621 void *value
622){
623 if (!Check_Guard(Coroutine_GetActive()->guard)){
624 printf("Coroutine has overrun its stack - checked when chaining\n");
625 exit(EXIT_FAILURE);
626 }
627 Coroutine *cor = Coroutine_New(Coroutine_ChainFn);
628 struct Coroutine_ChainParam params = {
629 start,
630 value,
631 Coroutine_GetActive()
632 };
633 Coroutine_Continue(cor, &params, true);
634 void *res = Coroutine_Yield(NULL, Coroutine_ChainYield, NULL);
635 Coroutine_Delete(cor);
636 return res;
637}
638
639
640bool Coroutine_IsRunning(
641 Coroutine *cor
642)
643{
644 int state = cor->state;
645 return state == Coroutine_Running || state == Coroutine_Waiting;
646}
647
648
649bool Coroutine_IsStarted(){
650 return g_c != NULL;
651}
652