1 contributor
634 lines15.8 KB
1#include "coroutine.h"
2#include <assert.h>
3#include <setjmp.h>
4#include <stdbool.h>
5#include <stddef.h>
6#include <stdio.h>
7#include <pthread.h>
8#include <stdlib.h>
9#include "cor_thread_local.h"
10
11
12static void *mustmalloc(size_t size){
13 void *p = malloc(size);
14 assert(p);
15 return p;
16}
17
18#define New(type, ...) (type##_ctor((type *)mustmalloc(sizeof(type), ## __VA_ARGS__)))
19#define Delete(ptr, type) ((ptr) ? (type##_dtor(ptr), free(ptr), (ptr) = NULL) : (void)0)
20static void Coroutine_RunNext();
21
22///////////////////////////////////////////////////////////////////////////////
23// 2-way linked lists...
24//
25// Brought inline here to avoid namespace polution
26///////////////////////////////////////////////////////////////////////////////
27
28typedef struct List_Link List_Link;
29struct List_Link {
30 List_Link *next;
31 List_Link *prev;
32};
33
34typedef struct List_Head List_Head;
35struct List_Head {
36 union {
37 struct {
38 List_Link link;
39 List_Link *filler;
40 } fwd;
41 struct {
42 List_Link *filler;
43 List_Link link;
44 } back;
45 };
46};
47
48
49static inline bool List_IsEmpty(
50 const List_Head *list
51){
52 return list->fwd.link.next == &list->back.link;
53}
54
55
56static inline List_Link *List_GetHead(
57 const List_Head *list
58){
59 return List_IsEmpty(list) ? NULL : list->fwd.link.next;
60}
61
62
63// static inline List_Link *List_GetTail(
64// const List_Head *list
65// ){
66// return List_IsEmpty(list) ? NULL : list->back.link.prev;
67// }
68
69
70#define OFFSETOF(Container, Field) ((char *)&((Container *)4)->Field - (char *)(Container *)4)
71#define List_Link_Container(Container, Link, link) ((Container *)((char *)(link) - OFFSETOF(Container, Link)))
72
73
74static inline void List_Init(
75 List_Head *list
76){
77 list->fwd.link.next = &list->back.link;
78 list->fwd.link.prev = NULL;
79 list->back.link.prev = &list->fwd.link;
80}
81
82
83static inline void List_AddHead(
84 List_Head *list,
85 List_Link *link
86){
87 List_Link *first = list->fwd.link.next;
88 link->next = first;
89 link->prev = &list->fwd.link;
90 first->prev = link;
91 list->fwd.link.next = link;
92}
93
94
95static inline void List_AddTail(
96 List_Head *list,
97 List_Link *link
98){
99 List_Link *last = list->back.link.prev;
100 link->prev = last;
101 link->next = &list->back.link;
102 last->next = link;
103 list->back.link.prev = link;
104}
105
106
107static inline void List_Remove(
108 List_Link *link
109){
110 link->prev->next = link->next;
111 link->next->prev = link->prev;
112}
113
114///////////////////////////////////////////////////////////////////////////////
115// ...2-way linked lists
116///////////////////////////////////////////////////////////////////////////////
117
118typedef struct Coroutines Coroutines;
119
120enum {
121 Coroutines_Idle,
122 Coroutines_Starting,
123 Coroutines_Started,
124 Coroutines_Active,
125 Coroutines_Stopping
126};
127
128enum {
129 Chunk_Initial,
130 Chunk_Create,
131 Chunk_Enter
132};
133
134typedef enum Coroutine_State {
135 Coroutine_Constructing,
136 Coroutine_Free,
137 Coroutine_Idle,
138 Coroutine_Running,
139 Coroutine_Waiting,
140 Coroutine_Complete
141} Coroutine_State;
142
143enum {
144 Coroutines_Init,
145 Coroutines_AllocatedChunk,
146 Coroutines_CoroutineComplete,
147};
148
149struct Coroutine {
150 Coroutines *coroutines; // so can work with it off-thread
151 List_Link link; // for whichever list it's on
152 jmp_buf buf; // how to get back to it
153 unsigned char *guard; // where the stack overrun guard is
154 Coroutine_Start start; // entry point
155 void *entry_param; // to pass to start
156 void *value; // yielded/returned
157 Coroutine_State state;
158};
159
160struct Coroutines {
161 pthread_mutex_t mutex;
162 jmp_buf controller; // to return from Coroutine_Run
163 jmp_buf chunk_allocated;// for chunk allocation
164 unsigned char *guard; // the stack guard for the startup sequence
165
166 // singletons
167 Coroutine *tip; // top of stack chunk
168 Coroutine *active; // currently running coroutine
169 Coroutine *primary; // Coroutine_Run coroutine
170
171 // lists
172 List_Head free;
173 List_Head inactive; // idle or complete
174 List_Head runable; // running or waiting to run
175 List_Head waiting; // yielded / waiting to run
176 pthread_cond_t waiting_cond;
177
178 // Summary of the system
179 Coroutine_Report report;
180
181 // state
182 char state;
183};
184
185_Cor_thread_local Coroutines *g_c;
186
187static void stack_chunk_chunk(Coroutine *parent);
188static void stack_chunk_base(Coroutine *parent, unsigned char *guard);
189
190
191// Check whether the guard is intact
192static inline bool Check_Guard(
193 unsigned char *guard
194){
195 return guard[0] == 0xde &&
196 guard[1] == 0xad &&
197 guard[2] == 0xbe &&
198 guard[3] == 0xef;
199}
200
201
202static void Coroutine_PrimeStackChunks()
203{
204 unsigned char chunk_of_stack[COROUTINE_STARTUP_STACK_SIZE];
205 for (int i = 0; i < COROUTINE_STARTUP_STACK_SIZE-3; i += 4){
206 chunk_of_stack[i+0] = 0xde;
207 chunk_of_stack[i+1] = 0xad;
208 chunk_of_stack[i+2] = 0xbe;
209 chunk_of_stack[i+3] = 0xef;
210 }
211 assert(Check_Guard(chunk_of_stack));
212
213 // Stacks grow down in memory (almost always), so if the caller of this function changes
214 // the guard before entering the coroutine system, it has overrun the startup stack
215 g_c->guard = chunk_of_stack;
216
217 stack_chunk_base(NULL, NULL);
218}
219
220
221static void stack_chunk_chunk(
222 Coroutine *parent
223){
224 unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
225 for (int i = 0; i < COROUTINE_STACK_SIZE-3; i += 4){
226 chunk_of_stack[i+0] = 0xde;
227 chunk_of_stack[i+1] = 0xad;
228 chunk_of_stack[i+2] = 0xbe;
229 chunk_of_stack[i+3] = 0xef;
230 }
231 stack_chunk_base(parent, chunk_of_stack);
232}
233
234
235static void stack_chunk_base(
236 Coroutine *parent,
237 unsigned char *guard
238){
239 Coroutine here;
240 here.state = Coroutine_Constructing;
241 switch (setjmp(here.buf)) {
242 case Chunk_Initial:
243 // got here for the first time
244 // parent now has a chunk_of_stack - add it to the free list
245 if (parent) {
246 assert(parent->state == Coroutine_Constructing);
247 assert(Check_Guard(guard));
248 parent->guard = guard;
249 parent->state = Coroutine_Free;
250 List_AddHead(&g_c->free, &parent->link);
251 g_c->report.coroutines_pool_size += 1;
252 }
253 // note that here is the tip of the chunk-claim stack
254 here.coroutines = g_c;
255 g_c->tip = &here;
256
257 // return to the coroutine allocator
258 longjmp(g_c->chunk_allocated, 1);
259 case Chunk_Create:
260 // request to create a new chunk on the stack
261 assert(here.state == Coroutine_Constructing);
262 stack_chunk_chunk(&here);
263 assert(false);
264 case Chunk_Enter:
265 // request to start a coroutine (ie use the chunk for a coroutine)
266 // arrive here with mutex locked
267 assert(here.state == Coroutine_Running);
268 g_c->active = &here;
269 int r = pthread_mutex_unlock(&g_c->mutex);
270 assert(r == 0);
271 here.value = here.start(here.entry_param);
272
273 // check the guard
274 if (!Check_Guard(here.guard)){
275 printf("Coroutine has overrun its stack - checked after returning from coroutine function\n");
276 exit(EXIT_FAILURE);
277 }
278
279 r = pthread_mutex_lock(&g_c->mutex);
280 assert(r == 0);
281 g_c->active = NULL;
282 assert(here.state == Coroutine_Running);
283 List_Remove(&here.link);
284 here.state = Coroutine_Complete;
285 List_AddTail(&g_c->inactive, &here.link);
286 // coroutine has completed
287 if (g_c->primary == &here) {
288 // if primary coroutine - return to Coroutine_Run
289 longjmp(g_c->controller, Coroutines_CoroutineComplete);
290 }
291 r = pthread_mutex_unlock(&g_c->mutex);
292 assert(r == 0);
293 Coroutine_RunNext();
294 assert(false);
295 }
296}
297
298
299static void Coroutine_RunNext()
300{
301 // arrive here with mutex unlocked
302 int r;
303
304 r = pthread_mutex_lock(&g_c->mutex);
305 assert(r == 0);
306 while (List_IsEmpty(&g_c->runable)) {
307 r = pthread_cond_wait(&g_c->waiting_cond, &g_c->mutex);
308 assert(r == 0);
309 }
310 Coroutine *next = List_Link_Container(Coroutine, link, List_GetHead(&g_c->runable));
311 assert(next->state == Coroutine_Running);
312 longjmp(next->buf, Chunk_Enter);
313 assert(false);
314}
315
316
317void Coroutine_StartSystem()
318{
319 assert(!g_c);
320 g_c = mustmalloc(sizeof(Coroutines));
321
322 g_c->state = Coroutines_Starting;
323
324 pthread_mutexattr_t attr;
325 int r = pthread_mutexattr_init(&attr);
326 assert(r == 0);
327 r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
328 assert(r == 0);
329 r = pthread_mutex_init(&g_c->mutex, &attr);
330 assert(r == 0);
331 r = pthread_mutexattr_destroy(&attr);
332 assert(r == 0);
333
334 g_c->tip = NULL;
335 g_c->active = NULL;
336
337 List_Init(&g_c->free);
338 List_Init(&g_c->inactive);
339 List_Init(&g_c->runable);
340 List_Init(&g_c->waiting);
341 r = pthread_cond_init(&g_c->waiting_cond, NULL);
342 assert(r == 0);
343
344 g_c->report.coroutines_created = 0;
345 g_c->report.coroutines_pool_size = 0;
346 g_c->report.lowest_headroom = COROUTINE_STACK_SIZE;
347
348 // prime the chunk system
349 if (!setjmp(g_c->chunk_allocated)){
350 Coroutine_PrimeStackChunks();
351 assert(false);
352 }
353
354 assert(g_c->state == Coroutines_Starting);
355 g_c->state = Coroutines_Started;
356}
357
358
359Coroutine_Report Coroutine_StopSystem()
360{
361 int r = pthread_mutex_lock(&g_c->mutex);
362 assert(r == 0);
363 assert(g_c->state == Coroutines_Started);
364 g_c->state = Coroutines_Stopping;
365
366 int stackminheadroom = COROUTINE_STACK_SIZE;
367 for (List_Link *link = g_c->free.fwd.link.next; link->next; link = link->next){
368 Coroutine *cor = List_Link_Container(Coroutine, link, link);
369 for (int i = 4; i < COROUTINE_STACK_SIZE-3; i += 4){
370 if (!Check_Guard(&cor->guard[i])){
371 stackminheadroom = i < stackminheadroom ? i : stackminheadroom;
372 break;
373 }
374 }
375 }
376 g_c->report.lowest_headroom = stackminheadroom;
377
378 assert(List_IsEmpty(&g_c->inactive));
379 r = pthread_cond_destroy(&g_c->waiting_cond);
380 assert(r == 0);
381
382 assert(g_c->state == Coroutines_Stopping);
383 pthread_mutex_unlock(&g_c->mutex);
384 assert(r == 0);
385 g_c->state = Coroutines_Idle;
386 r = pthread_mutex_destroy(&g_c->mutex);
387 assert(r == 0);
388
389 Coroutine_Report res = g_c->report;
390
391 free(g_c);
392 g_c = NULL;
393
394 return res;
395}
396
397
398void Coroutine_Run_Coroutine(
399 Coroutine *cor,
400 void *value
401){
402 Coroutines *cors = cor->coroutines;
403 assert(g_c == cors);
404 int r = pthread_mutex_lock(&cors->mutex);
405 assert(r == 0);
406 assert(cors->state == Coroutines_Started);
407 cors->state = Coroutines_Active;
408 cors->primary = cor;
409
410 Coroutine_Continue(cor, value, true);
411
412 if (!setjmp(cors->controller)){
413 pthread_mutex_unlock(&cors->mutex);
414 assert(r == 0);
415
416 // check the guard
417 if (!Check_Guard(g_c->guard)){
418 printf("Coroutine startup stack as has overrun - checked on entering the main coroutine\n");
419 exit(EXIT_FAILURE);
420 }
421
422 // start the first coroutine
423 Coroutine_RunNext();
424 }
425 // arrive here with mutex locked
426 assert(List_IsEmpty(&cors->runable));
427 assert(List_IsEmpty(&cors->waiting));
428 assert(cors->state == Coroutines_Active);
429 cors->state = Coroutines_Started;
430 pthread_mutex_unlock(&cors->mutex);
431 assert(r == 0);
432}
433
434
435void *Coroutine_Run(
436 Coroutine_Start start,
437 void *value
438){
439 Coroutine *cor = Coroutine_New(start);
440 Coroutine_Run_Coroutine(cor, value);
441 void *res = Coroutine_GetValue(cor);
442 Coroutine_Delete(cor);
443 return res;
444}
445
446
447Coroutine *Coroutine_New(
448 Coroutine_Start start
449){
450 assert((g_c->state == Coroutines_Started && List_IsEmpty(&g_c->inactive)) || g_c->state == Coroutines_Active);
451
452 // if none free - add one
453 if (List_IsEmpty(&g_c->free)){
454 if (!setjmp(g_c->chunk_allocated)){
455 longjmp(g_c->tip->buf, Chunk_Create);
456 }
457 }
458
459 Coroutine *cor = List_Link_Container(Coroutine, link, List_GetHead(&g_c->free));
460 assert(cor->state == Coroutine_Free);
461 cor->state = Coroutine_Idle;
462 cor->start = start;
463 cor->value = NULL;
464 List_Remove(&cor->link);
465 List_AddHead(&g_c->inactive, &cor->link);
466
467 g_c->report.coroutines_created += 1;
468
469 return cor;
470}
471
472
473void Coroutine_Delete(
474 Coroutine *cor
475){
476 Coroutines *cors = cor->coroutines;
477 int r = pthread_mutex_lock(&cors->mutex);
478 assert(r == 0);
479 assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Complete);
480 cor->state = Coroutine_Free;
481 List_Remove(&cor->link);
482 List_AddTail(&cors->free, &cor->link);
483 r = pthread_mutex_unlock(&cors->mutex);
484 assert(r == 0);
485}
486
487
488void Coroutine_Continue(
489 Coroutine *cor,
490 void *value,
491 bool early
492){
493 Coroutines *cors = cor->coroutines;
494 int r = pthread_mutex_lock(&cors->mutex);
495 assert(r == 0);
496 assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Waiting);
497 cor->entry_param = value;
498 cor->state = Coroutine_Running;
499 List_Remove(&cor->link);
500 if ( early ) {
501 List_AddHead(&cors->runable, &cor->link);
502 } else {
503 List_AddTail(&cors->runable, &cor->link);
504 }
505 r = pthread_cond_signal(&cors->waiting_cond);
506 r = pthread_mutex_unlock(&cors->mutex);
507 assert(r == 0);
508}
509
510
511void *Coroutine_Yield(
512 void *value,
513 Coroutine_YieldCallback on_yield,
514 void *yield_me
515){
516 Coroutine *me = g_c->active;
517 if (!Check_Guard(me->guard)){
518 printf("Coroutine has overrun its stack - checked when yielding coroutine\n");
519 exit(EXIT_FAILURE);
520 }
521
522 int r = pthread_mutex_lock(&g_c->mutex);
523 assert(r == 0);
524 Coroutines *cors = me->coroutines;
525 assert(me && me->state == Coroutine_Running && cors == g_c);
526 me->value = value;
527 me->state = Coroutine_Waiting;
528
529 List_Remove(&me->link);
530 List_AddTail(&cors->waiting, &me->link);
531
532 switch (setjmp(me->buf)){
533 case Chunk_Initial:
534 r = pthread_mutex_unlock(&cors->mutex);
535 assert(r == 0);
536 on_yield(yield_me);
537 Coroutine_RunNext();
538 case Chunk_Create:
539 assert(false);
540 case Chunk_Enter:
541 // arrive here with mutex locked
542 cors->active = me;
543 // when we return here - we are running again
544 assert(me->state == Coroutine_Running);
545 void *res = me->entry_param;
546 r = pthread_mutex_unlock(&cors->mutex);
547 assert(r == 0);
548 return res;
549 }
550 return NULL;
551}
552
553
554void *Coroutine_GetValue(
555 Coroutine *cor
556){
557 return cor->value;
558}
559
560
561Coroutine *Coroutine_GetActive()
562{
563 return g_c->active;
564}
565
566
567int Coroutine_GetStackHeadroom(){
568 unsigned char tbuf[4];
569 return tbuf - g_c->active->guard - 4;
570}
571
572
573bool Coroutine_HasCoroutinesInFreePool(){
574 return !List_IsEmpty(&g_c->free);
575}
576
577
578void *Coroutine_GetCStackTop(){
579 return g_c->tip;
580}
581
582
583struct Coroutine_ChainParam {
584 Coroutine_Start start;
585 void *value;
586 Coroutine *ret;
587};
588
589
590static void *Coroutine_ChainFn(
591 void *param
592){
593 struct Coroutine_ChainParam *params = (struct Coroutine_ChainParam *)param;
594 Coroutine_Continue(params->ret, params->start(params->value), true);
595 return NULL;
596}
597
598
599static void Coroutine_ChainYield(
600 void *unused
601){
602 (void)unused;
603}
604
605
606void *Coroutine_Chain(
607 Coroutine_Start start,
608 void *value
609){
610 if (!Check_Guard(Coroutine_GetActive()->guard)){
611 printf("Coroutine has overrun its stack - checked when chaining\n");
612 exit(EXIT_FAILURE);
613 }
614 Coroutine *cor = Coroutine_New(Coroutine_ChainFn);
615 struct Coroutine_ChainParam params = {
616 start,
617 value,
618 Coroutine_GetActive()
619 };
620 Coroutine_Continue(cor, &params, true);
621 void *res = Coroutine_Yield(NULL, Coroutine_ChainYield, NULL);
622 Coroutine_Delete(cor);
623 return res;
624}
625
626
627bool Coroutine_IsRunning(
628 Coroutine *cor
629)
630{
631 int state = cor->state;
632 return state == Coroutine_Running || state == Coroutine_Waiting;
633}
634