1 contributor
526 lines13.6 KB
1#include "coroutine.h"
2#include <assert.h>
3#include <setjmp.h>
4#include <stdbool.h>
5#include <stddef.h>
6#include <stdio.h>
7#include <pthread.h>
8#include <stdlib.h>
9#include "cor_thread_local.h"
10
11
12
// Allocate `size` bytes or terminate the process.
//
// Never returns NULL.  The failure check is a real runtime check rather than
// an assert(), so it is still present when the file is built with NDEBUG.
// The caller owns the returned memory and releases it with free().
static void *mustmalloc(size_t size){
    // malloc(0) is allowed to return NULL; request one byte instead so that
    // a NULL result always means the allocation genuinely failed.
    void *p = malloc(size ? size : 1);
    if (!p) {
        fprintf(stderr, "mustmalloc: out of memory (%zu bytes)\n", size);
        abort();
    }
    return p;
}
18
// New(type, args...): allocate sizeof(type) bytes with mustmalloc and pass the
// fresh object, followed by any extra arguments, to type##_ctor.  The value of
// the expression is whatever type##_ctor returns.
// NOTE(review): a constructor that returns void (as Semaphore_ctor does)
// leaves the caller with no way to obtain the pointer - confirm intended use.
// `, ## __VA_ARGS__` is the GNU comma-swallowing extension.
#define New(type, ...) (type##_ctor((type *)mustmalloc(sizeof(type)), ## __VA_ARGS__))

// Delete(ptr, type): if ptr is non-NULL, run type##_dtor on it, free it and
// reset ptr to NULL.  `ptr` must be a modifiable lvalue and is evaluated more
// than once.  Both arms of the conditional are void so the expression is
// valid ISO C.
#define Delete(ptr, type) ((ptr) ? (type##_dtor(ptr), free(ptr), (void)((ptr) = NULL)) : (void)0)
21
22///////////////////////////////////////////////////////////////////////////////
23// Semaphore built from mutex & condition variables...
24//
// Using pthread.h (more widely available than the C standard library threads.h)
26///////////////////////////////////////////////////////////////////////////////
27
// Counting semaphore assembled from a mutex and a condition variable.
typedef struct Semaphore Semaphore;
struct Semaphore {
    pthread_mutex_t mutex;  // protects `count`
    pthread_cond_t cond;    // waited on while count <= 0; broadcast on release
    int count;              // number of claims that can succeed without waiting
};
33
34static void Semaphore_ctor(Semaphore *sem, int initial_count){
35 sem->count = initial_count;
36 int r = pthread_mutex_init(&sem->mutex, NULL);
37 assert(r == 0);
38 r = pthread_cond_init(&sem->cond, NULL);
39 assert(r == 0);
40}
41
42static void Semaphore_dtor(Semaphore *sem){
43 int r = pthread_mutex_destroy(&sem->mutex);
44 assert(r == 0);
45 r = pthread_cond_destroy(&sem->cond);
46 assert(r == 0);
47}
48
49static void Semaphore_Claim(Semaphore *sem){
50 int r = pthread_mutex_lock(&sem->mutex);
51 assert(r == 0);
52 while (sem->count <= 0) {
53 r = pthread_cond_wait(&sem->cond, &sem->mutex);
54 assert(r == 0);
55 }
56 sem->count--;
57 r = pthread_mutex_unlock(&sem->mutex);
58 assert(r == 0);
59}
60
61static void Semaphore_Release(Semaphore *sem){
62 int r = pthread_mutex_lock(&sem->mutex);
63 assert(r == 0);
64 sem->count++;
65 r = pthread_mutex_unlock(&sem->mutex);
66 assert(r == 0);
67 r = pthread_cond_broadcast(&sem->cond);
68 assert(r == 0);
69}
70
71///////////////////////////////////////////////////////////////////////////////
72// ...semaphore built from mutex
73///////////////////////////////////////////////////////////////////////////////
74
75///////////////////////////////////////////////////////////////////////////////
76// 2-way linked lists...
77//
// Brought inline here to avoid namespace pollution
79///////////////////////////////////////////////////////////////////////////////
80
// Intrusive doubly-linked list node; embed one in any structure that needs to
// live on a List_Head.
typedef struct List_Link {
    struct List_Link *next;  // following node
    struct List_Link *prev;  // preceding node
} List_Link;
86
87typedef struct List_Head List_Head;
88struct List_Head {
89 union {
90 struct {
91 List_Link link;
92 List_Link *filler;
93 } fwd;
94 struct {
95 List_Link *filler;
96 List_Link link;
97 } back;
98 };
99};
100
101static inline bool List_IsEmpty(const List_Head *list) {
102 return list->fwd.link.next == &list->back.link;
103}
104
105static inline List_Link *List_GetHead(const List_Head *list) {
106 return List_IsEmpty(list) ? NULL : list->fwd.link.next;
107}
108// static inline List_Link *List_GetTail(const List_Head *list) {
109// return List_IsEmpty(list) ? NULL : list->back.link.prev;
110// }
// Byte offset of Field within Container.  Uses the standard offsetof from
// <stddef.h> instead of computing it through a made-up object address.
#define OFFSETOF(Container, Field) (offsetof(Container, Field))

// Recover a pointer to the enclosing Container from a pointer to its embedded
// member named Link.  The subtraction is unconditional, so a NULL `link` does
// NOT yield a NULL container - callers must check for NULL beforehand.
#define List_Link_Container(Container, Link, link) ((Container *)((char *)(link) - OFFSETOF(Container, Link)))
113
114static inline void List_Init(List_Head *list)
115{
116 list->fwd.link.next = &list->back.link;
117 list->fwd.link.prev = NULL;
118 list->back.link.prev = &list->fwd.link;
119}
120
121static inline void List_AddHead(List_Head *list, List_Link *link)
122{
123 List_Link *first = list->fwd.link.next;
124 link->next = first;
125 link->prev = &list->fwd.link;
126 first->prev = link;
127 list->fwd.link.next = link;
128}
129
130static inline void List_AddTail(List_Head *list, List_Link *link)
131{
132 List_Link *last = list->back.link.prev;
133 link->prev = last;
134 link->next = &list->back.link;
135 last->next = link;
136 list->back.link.prev = link;
137}
138
139static inline void List_Remove(List_Link *link)
140{
141 link->prev->next = link->next;
142 link->next->prev = link->prev;
143}
144
145///////////////////////////////////////////////////////////////////////////////
146// ...2-way linked lists
147///////////////////////////////////////////////////////////////////////////////
148
// Forward declaration; the structure is defined further down in this file.
typedef struct Coroutines Coroutines;

// Lifecycle of the whole coroutine system (stored in Coroutines.state).
enum {
    Coroutines_Idle     = 0,  // set by Coroutine_StopSystem just before freeing
    Coroutines_Starting = 1,  // Coroutine_StartSystem is priming the system
    Coroutines_Started  = 2,  // ready; no Coroutine_Run in progress
    Coroutines_Active   = 3,  // inside Coroutine_Run
    Coroutines_Stopping = 4   // Coroutine_StopSystem is tearing down
};
158
// Values seen by switch(setjmp(buf)) on a coroutine's jump buffer.
enum {
    Chunk_Initial = 0,  // direct return from setjmp (always 0)
    Chunk_Create  = 1,  // longjmp request: carve a new chunk below this one
    Chunk_Enter   = 2   // longjmp request: start or resume the coroutine
};
164
// Per-coroutine state (stored in Coroutine.state).
enum {
    Coroutine_Constructing = 0,  // chunk base exists but is not yet on the free list
    Coroutine_Free         = 1,  // on the free list, available to Coroutine_New
    Coroutine_Idle         = 2,  // handed out by Coroutine_New, not yet continued
    Coroutine_Running      = 3,  // on the runable list
    Coroutine_Waiting      = 4,  // yielded; on the waiting list
    Coroutine_Complete     = 5   // start function has returned
};
173
// Codes for jumps back to the controller.  In this file only
// Coroutines_CoroutineComplete is passed to longjmp (on Coroutines.controller).
enum {
    Coroutines_Init              = 0,
    Coroutines_AllocatedChunk    = 1,
    Coroutines_CoroutineComplete = 2,
};
179
180struct Coroutine {
181 Coroutines *coroutines;
182 List_Link link;
183 jmp_buf buf;
184 void *this;
185 Coroutine_Start start;
186 void *entry_param;
187 void *value;
188 char state;
189 char action;
190};
191
192struct Coroutines {
193 pthread_mutex_t mutex;
194 jmp_buf controller;
195 jmp_buf chunk_allocated;
196
197 // singletons
198 Coroutine *tip; // top of stack chunk
199 Coroutine *active; // currently running coroutine
200 Coroutine *primary; // Coroutine_Run coroutine
201
202 // lists
203 List_Head free;
204 List_Head inactive; // idle or complete
205 List_Head runable; // running or waiting to run
206 List_Head waiting; // yielded / waiting to run
207 Semaphore waiting_sem;
208
209 // state
210 char state;
211};
212
213_Cor_thread_local Coroutines *g_c;
214
215static void stack_chunk_chunk(Coroutine *parent);
216static void stack_chunk_base(Coroutine *parent);
217
218
219static void Coroutine_PrimeStackChunks()
220{
221 unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
222 chunk_of_stack[0] = 0xde;
223 chunk_of_stack[1] = 0xad;
224 chunk_of_stack[2] = 0xbe;
225 chunk_of_stack[3] = 0xef;
226 chunk_of_stack[COROUTINE_STACK_SIZE - 4] = 0xde;
227 chunk_of_stack[COROUTINE_STACK_SIZE - 3] = 0xad;
228 chunk_of_stack[COROUTINE_STACK_SIZE - 2] = 0xbe;
229 chunk_of_stack[COROUTINE_STACK_SIZE - 1] = 0xef;
230 stack_chunk_base(NULL);
231}
232
233
234static void stack_chunk_chunk(
235 Coroutine *parent
236){
237 unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
238 chunk_of_stack[0] = 0xde;
239 chunk_of_stack[1] = 0xad;
240 chunk_of_stack[2] = 0xbe;
241 chunk_of_stack[3] = 0xef;
242 chunk_of_stack[COROUTINE_STACK_SIZE - 4] = 0xde;
243 chunk_of_stack[COROUTINE_STACK_SIZE - 3] = 0xad;
244 chunk_of_stack[COROUTINE_STACK_SIZE - 2] = 0xbe;
245 chunk_of_stack[COROUTINE_STACK_SIZE - 1] = 0xef;
246 stack_chunk_base(parent);
247}
248
249
250static void Coroutine_RunNext()
251{
252 // arrvie here with mutex unlocked
253 Semaphore_Claim(&g_c->waiting_sem);
254 int r = pthread_mutex_lock(&g_c->mutex);
255 assert(r == 0);
256 assert(!List_IsEmpty(&g_c->runable));
257 Coroutine *next = List_Link_Container(Coroutine, link, List_GetHead(&g_c->runable));
258 assert(next->state == Coroutine_Running);
259 longjmp(next->buf, Chunk_Enter);
260 assert(false);
261}
262
263
264static void stack_chunk_base(
265 Coroutine *parent
266){
267 Coroutine here;
268 here.state = Coroutine_Constructing;
269 switch (setjmp(here.buf)) {
270 case Chunk_Initial:
271 // got here for the first time
272 // parent now has a chunk_of_stack - add it to the free list
273 if (parent) {
274 assert(parent->state == Coroutine_Constructing);
275 parent->state = Coroutine_Free;
276 List_AddHead(&g_c->free, &parent->link);
277 }
278 // note that here is the tip of the chunk-claim stack
279 here.coroutines = g_c;
280 g_c->tip = &here;
281
282 // return to the coroutine allocator
283 longjmp(g_c->chunk_allocated, 1);
284 case Chunk_Create:
285 // request to create a new chunk on the stack
286 assert(here.state == Coroutine_Constructing);
287 stack_chunk_chunk(&here);
288 assert(false);
289 case Chunk_Enter:
290 // request to start a coroutine (ie use the chunk for a coroutine)
291 // arrive here with mutex locked
292 assert(here.state == Coroutine_Running);
293 g_c->active = &here;
294 int r = pthread_mutex_unlock(&g_c->mutex);
295 assert(r == 0);
296 here.value = here.start(here.entry_param);
297 r = pthread_mutex_lock(&g_c->mutex);
298 assert(r == 0);
299 g_c->active = NULL;
300 assert(here.state == Coroutine_Running);
301 List_Remove(&here.link);
302 here.state = Coroutine_Complete;
303 List_AddTail(&g_c->inactive, &here.link);
304 // coroutine has completed
305 if (g_c->primary == &here) {
306 // if primary coroutine - return to Coroutine_Run
307 longjmp(g_c->controller, Coroutines_CoroutineComplete);
308 }
309 r = pthread_mutex_unlock(&g_c->mutex);
310 assert(r == 0);
311 Coroutine_RunNext();
312 assert(false);
313 }
314}
315
316
317void Coroutine_StartSystem()
318{
319 assert(!g_c);
320 g_c = mustmalloc(sizeof(Coroutines));
321
322 g_c->state = Coroutines_Starting;
323
324 pthread_mutexattr_t attr;
325 int r = pthread_mutexattr_init(&attr);
326 assert(r == 0);
327 r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
328 assert(r == 0);
329 r = pthread_mutex_init(&g_c->mutex, &attr);
330 assert(r == 0);
331 r = pthread_mutexattr_destroy(&attr);
332 assert(r == 0);
333
334 g_c->tip = NULL;
335 g_c->active = NULL;
336
337 List_Init(&g_c->free);
338 List_Init(&g_c->inactive);
339 List_Init(&g_c->runable);
340 List_Init(&g_c->waiting);
341 Semaphore_ctor(&g_c->waiting_sem, 0);
342
343 // prime the chunk system
344 if (!setjmp(g_c->chunk_allocated)){
345 Coroutine_PrimeStackChunks();
346 assert(false);
347 }
348
349 assert(g_c->state == Coroutines_Starting);
350 g_c->state = Coroutines_Started;
351}
352
353
354void Coroutine_StopSystem()
355{
356 int r = pthread_mutex_lock(&g_c->mutex);
357 assert(r == 0);
358 assert(g_c->state == Coroutines_Started);
359 g_c->state = Coroutines_Stopping;
360
361 assert(List_IsEmpty(&g_c->inactive));
362 Semaphore_dtor(&g_c->waiting_sem);
363
364 assert(g_c->state == Coroutines_Stopping);
365 pthread_mutex_unlock(&g_c->mutex);
366 assert(r == 0);
367 g_c->state = Coroutines_Idle;
368 r = pthread_mutex_destroy(&g_c->mutex);
369 assert(r == 0);
370 free(g_c);
371 g_c = NULL;
372}
373
374
375void *Coroutine_Run(
376 Coroutine_Start start,
377 void *value
378){
379 Coroutine *cor = Coroutine_New(start);
380 Coroutines *cors = cor->coroutines;
381 assert(g_c == cors);
382 int r = pthread_mutex_lock(&cors->mutex);
383 assert(r == 0);
384 assert(cors->state == Coroutines_Started);
385 cors->state = Coroutines_Active;
386 cors->primary = cor;
387 Coroutine_Continue(cors->primary, value, true);
388
389 if (!setjmp(cors->controller)){
390 pthread_mutex_unlock(&cors->mutex);
391 assert(r == 0);
392 // start the first coroutine
393 Coroutine_RunNext();
394 }
395 // arrive here with mutex locked
396 assert(List_IsEmpty(&cors->runable));
397 assert(List_IsEmpty(&cors->waiting));
398 assert(cors->state == Coroutines_Active);
399 cors->state = Coroutines_Started;
400 pthread_mutex_unlock(&cors->mutex);
401 assert(r == 0);
402 void *res = Coroutine_GetValue(cor);
403 Coroutine_Delete(cor);
404 return res;
405}
406
407
408Coroutine *Coroutine_New(
409 Coroutine_Start start
410){
411 assert(g_c->state == Coroutines_Started || g_c->state == Coroutines_Active);
412
413 // if none free - add one
414 if (List_IsEmpty(&g_c->free)){
415 if (!setjmp(g_c->chunk_allocated)){
416 longjmp(g_c->tip->buf, Chunk_Create);
417 }
418 }
419
420 Coroutine *cor = List_Link_Container(Coroutine, link, List_GetHead(&g_c->free));
421 assert(cor->state == Coroutine_Free);
422 cor->state = Coroutine_Idle;
423 cor->start = start;
424 cor->value = NULL;
425 List_Remove(&cor->link);
426 List_AddHead(&g_c->inactive, &cor->link);
427
428 return cor;
429}
430
431
432void Coroutine_Delete(
433 Coroutine *cor
434){
435 Coroutines *cors = cor->coroutines;
436 int r = pthread_mutex_lock(&cors->mutex);
437 assert(r == 0);
438 assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Complete);
439 cor->state = Coroutine_Free;
440 List_Remove(&cor->link);
441 List_AddTail(&cors->free, &cor->link);
442 r = pthread_mutex_unlock(&cors->mutex);
443 assert(r == 0);
444}
445
446
447void Coroutine_Continue(
448 Coroutine *cor,
449 void *value,
450 bool early
451){
452 Coroutines *cors = cor->coroutines;
453 int r = pthread_mutex_lock(&cors->mutex);
454 assert(r == 0);
455 assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Waiting);
456 cor->entry_param = value;
457 cor->state = Coroutine_Running;
458 List_Remove(&cor->link);
459 if ( early ) {
460 List_AddHead(&cors->runable, &cor->link);
461 } else {
462 List_AddTail(&cors->runable, &cor->link);
463 }
464 r = pthread_mutex_unlock(&cors->mutex);
465 assert(r == 0);
466 Semaphore_Release(&cors->waiting_sem);
467}
468
469
470void *Coroutine_Yield(
471 void *value,
472 Coroutine_YieldCallback on_yield,
473 void *this
474){
475 int r = pthread_mutex_lock(&g_c->mutex);
476 assert(r == 0);
477 Coroutine *me = g_c->active;
478 Coroutines *cors = me->coroutines;
479 assert(me && me->state == Coroutine_Running && cors == g_c);
480 me->value = value;
481 me->state = Coroutine_Waiting;
482 List_Remove(&me->link);
483 List_AddTail(&cors->waiting, &me->link);
484 switch (setjmp(me->buf)){
485 case Chunk_Initial:
486 r = pthread_mutex_unlock(&cors->mutex);
487 assert(r == 0);
488 on_yield(this);
489 Coroutine_RunNext();
490 case Chunk_Create:
491 assert(false);
492 case Chunk_Enter:
493 // arrive here with mutex locked
494 cors->active = me;
495 // when we return here - we are running again
496 assert(me->state == Coroutine_Running);
497 void *res = me->entry_param;
498 r = pthread_mutex_unlock(&cors->mutex);
499 assert(r == 0);
500 return res;
501 }
502 return NULL;
503}
504
505
506void *Coroutine_GetValue(
507 Coroutine *cor
508){
509 return cor->value;
510}
511
512
513Coroutine *Coroutine_GetActive()
514{
515 return g_c->active;
516}
517
518
519bool Coroutine_IsRunning(
520 Coroutine *cor
521)
522{
523 int state = cor->state;
524 return state == Coroutine_Running || state == Coroutine_Waiting;
525}
526