1 contributor
523 lines13.5 KB
1#include "coroutine.h"
2#include <assert.h>
3#include <setjmp.h>
4#include <stdbool.h>
5#include <stddef.h>
6#include <stdio.h>
7#include <pthread.h>
8#include <stdlib.h>
9#include "cor_thread_local.h"
10
11
12
// Allocate `size` bytes or terminate the process.
//
// The failure check is an explicit test rather than an assert() so that it
// still fires when the file is compiled with NDEBUG.  A zero-byte request is
// rounded up to one byte because malloc(0) is allowed to return NULL even
// when memory is available.
//
// Returns a non-NULL pointer; the caller releases it with free().
static void *mustmalloc(size_t size){
    void *p = malloc(size != 0 ? size : 1);
    if (p == NULL) {
        fputs("mustmalloc: out of memory\n", stderr);
        abort();
    }
    return p;
}
18
// New(type, ctor-args...): allocate storage for one `type` with mustmalloc()
// and pass it, followed by any extra arguments, to type##_ctor().  The value
// of the expression is whatever type##_ctor() returns.
// (`, ## __VA_ARGS__` is the GNU extension that drops the comma when no
// constructor arguments are given.)
#define New(type, ...) (type##_ctor((type *)mustmalloc(sizeof(type)), ## __VA_ARGS__))
// Delete(ptr, type): if `ptr` is non-NULL, run type##_dtor() on it, free the
// storage and reset `ptr` to NULL; otherwise do nothing.  `ptr` must be an
// assignable lvalue and is evaluated more than once.  Both arms of the
// conditional are void so the expression is valid ISO C.
#define Delete(ptr, type) ((ptr) ? (type##_dtor(ptr), free(ptr), (void)((ptr) = NULL)) : (void)0)
21
22///////////////////////////////////////////////////////////////////////////////
23// Semaphore built from mutex & condition variables...
24//
// Using pthread.h (more widely available than the C standard library threads.h)
26///////////////////////////////////////////////////////////////////////////////
27
// Counting semaphore assembled from a pthread mutex and condition variable.
typedef struct Semaphore Semaphore;
struct Semaphore {
    pthread_mutex_t mutex;  // held while `count` is read or changed
    pthread_cond_t cond;    // waited on while `count` is not positive
    int count;              // number of claims that can currently succeed
};
33
34static void Semaphore_ctor(Semaphore *sem, int initial_count){
35 sem->count = initial_count;
36 int r = pthread_mutex_init(&sem->mutex, NULL);
37 assert(r == 0);
38 r = pthread_cond_init(&sem->cond, NULL);
39 assert(r == 0);
40}
41
42static void Semaphore_dtor(Semaphore *sem){
43 int r = pthread_mutex_destroy(&sem->mutex);
44 assert(r == 0);
45 r = pthread_cond_destroy(&sem->cond);
46 assert(r == 0);
47}
48
49static void Semaphore_Claim(Semaphore *sem){
50 int r = pthread_mutex_lock(&sem->mutex);
51 assert(r == 0);
52 while (sem->count <= 0) {
53 r = pthread_cond_wait(&sem->cond, &sem->mutex);
54 assert(r == 0);
55 }
56 sem->count--;
57 r = pthread_mutex_unlock(&sem->mutex);
58 assert(r == 0);
59}
60
61static void Semaphore_Release(Semaphore *sem){
62 int r = pthread_mutex_lock(&sem->mutex);
63 assert(r == 0);
64 sem->count++;
65 r = pthread_cond_broadcast(&sem->cond);
66 assert(r == 0);
67 r = pthread_mutex_unlock(&sem->mutex);
68 assert(r == 0);
69}
70
71///////////////////////////////////////////////////////////////////////////////
72// ...semaphore built from mutex
73///////////////////////////////////////////////////////////////////////////////
74
75///////////////////////////////////////////////////////////////////////////////
76// 2-way linked lists...
77//
// Brought inline here to avoid namespace pollution
79///////////////////////////////////////////////////////////////////////////////
80
// Link embedded in every object that lives on a List_Head; `next` points
// towards the tail of the list and `prev` towards the head.
typedef struct List_Link {
    struct List_Link *next;
    struct List_Link *prev;
} List_Link;
86
87typedef struct List_Head List_Head;
88struct List_Head {
89 union {
90 struct {
91 List_Link link;
92 List_Link *filler;
93 } fwd;
94 struct {
95 List_Link *filler;
96 List_Link link;
97 } back;
98 };
99};
100
101static inline bool List_IsEmpty(const List_Head *list) {
102 return list->fwd.link.next == &list->back.link;
103}
104
105static inline List_Link *List_GetHead(const List_Head *list) {
106 return List_IsEmpty(list) ? NULL : list->fwd.link.next;
107}
108static inline List_Link *List_GetTail(const List_Head *list) {
109 return List_IsEmpty(list) ? NULL : list->back.link.prev;
110}
// OFFSETOF(Container, Field): byte offset of Field inside Container.
// Uses the standard offsetof() instead of arithmetic on a made-up pointer.
#define OFFSETOF(Container, Field) offsetof(Container, Field)
// List_Link_Container(Container, Link, link): given `link`, a pointer to the
// member named `Link` embedded in a `Container`, recover the pointer to the
// enclosing Container object.
#define List_Link_Container(Container, Link, link) ((Container *)((char *)(link) - OFFSETOF(Container, Link)))
113
114static inline void List_Init(List_Head *list)
115{
116 list->fwd.link.next = &list->back.link;
117 list->fwd.link.prev = NULL;
118 list->back.link.prev = &list->fwd.link;
119}
120
121static inline void List_AddHead(List_Head *list, List_Link *link)
122{
123 List_Link *first = list->fwd.link.next;
124 link->next = first;
125 link->prev = &list->fwd.link;
126 first->prev = link;
127 list->fwd.link.next = link;
128}
129
130static inline void List_AddTail(List_Head *list, List_Link *link)
131{
132 List_Link *last = list->back.link.prev;
133 link->prev = last;
134 link->next = &list->back.link;
135 last->next = link;
136 list->back.link.prev = link;
137}
138
139static inline void List_Remove(List_Link *link)
140{
141 link->prev->next = link->next;
142 link->next->prev = link->prev;
143}
144
145///////////////////////////////////////////////////////////////////////////////
146// ...2-way linked lists
147///////////////////////////////////////////////////////////////////////////////
148
// Forward declaration so that struct Coroutine can point at its owning
// Coroutines before that struct is defined further down.
typedef struct Coroutines Coroutines;
150
// Values stored in Coroutines.state: the lifecycle of the whole system.
enum {
    Coroutines_Idle     = 0,  // written by Coroutine_StopSystem() just before the system is freed
    Coroutines_Starting = 1,  // Coroutine_StartSystem() in progress
    Coroutines_Started  = 2,  // ready; no Coroutine_Run() in progress
    Coroutines_Active   = 3,  // inside Coroutine_Run()
    Coroutines_Stopping = 4   // Coroutine_StopSystem() in progress
};
158
// Values seen by a setjmp() on a Coroutine's `buf`.
enum {
    Chunk_Initial = 0,  // direct return from setjmp(); must stay 0
    Chunk_Create  = 1,  // longjmp() request: carve a further stack chunk
    Chunk_Enter   = 2   // longjmp() request: start or resume the coroutine
};
164
// Values stored in Coroutine.state.
enum {
    Coroutine_Constructing = 0,  // chunk record exists but is not yet on the free list
    Coroutine_Free         = 1,  // on the free list, available to Coroutine_New()
    Coroutine_Idle         = 2,  // handed out by Coroutine_New(), not yet continued
    Coroutine_Running      = 3,  // queued on the runable list by Coroutine_Continue()
    Coroutine_Waiting      = 4,  // parked on the waiting list by Coroutine_Yield()
    Coroutine_Complete     = 5   // its start function has returned
};
173
// Codes associated with the Coroutines.controller jump buffer.
// Coroutines_CoroutineComplete is the value handed to longjmp() when the
// primary coroutine finishes; the other two are not referenced in this file
// section.
enum {
    Coroutines_Init              = 0,
    Coroutines_AllocatedChunk    = 1,
    Coroutines_CoroutineComplete = 2
};
179
// One coroutine record.  Each record is a local variable of a
// stack_chunk_base() activation, so it lives on the program stack rather
// than on the heap.
struct Coroutine {
    Coroutines *coroutines;  // owning system; set when the chunk is created
    List_Link link;          // membership of one of the owner's lists
    jmp_buf buf;             // where to jump to create a chunk or enter/resume
    void *this;              // not referenced in this part of the file
    Coroutine_Start start;   // entry function, set by Coroutine_New()
    void *entry_param;       // value passed in by Coroutine_Continue()
    void *value;             // last yielded value, or the start function's result
    char state;              // one of the Coroutine_* state constants
    char action;             // not referenced in this part of the file
};
191
// State of one coroutine system (one per thread, reached through g_c).
struct Coroutines {
    pthread_mutex_t mutex;    // recursive mutex taken around list/state changes
    jmp_buf controller;       // back into Coroutine_Run() when the primary completes
    jmp_buf chunk_allocated;  // back to whoever asked for a new stack chunk

    // singletons
    Coroutine *tip;      // top of stack chunk
    Coroutine *active;   // currently running coroutine
    Coroutine *primary;  // Coroutine_Run coroutine

    // lists
    List_Head free;      // chunks available to Coroutine_New()
    List_Head inactive;  // idle or complete
    List_Head runable;   // running or waiting to run
    List_Head waiting;   // yielded / waiting to run
    Semaphore waiting_sem;  // released once per Coroutine_Continue(), claimed by Coroutine_RunNext()

    // state
    char state;  // one of Coroutines_Idle .. Coroutines_Stopping
};
212
// The calling thread's coroutine system: allocated by Coroutine_StartSystem()
// and reset to NULL by Coroutine_StopSystem().
// NOTE(review): _Cor_thread_local comes from cor_thread_local.h; presumably a
// portable thread-local storage specifier - confirm there.
_Cor_thread_local Coroutines *g_c;

// Forward declarations: the two chunk builders call each other.
static void stack_chunk_chunk(Coroutine *parent);
static void stack_chunk_base(Coroutine *parent);
217
218
219static void Coroutine_PrimeStackChunks()
220{
221 unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
222 chunk_of_stack[0] = 0xde;
223 chunk_of_stack[1] = 0xad;
224 chunk_of_stack[2] = 0xbe;
225 chunk_of_stack[3] = 0xef;
226 chunk_of_stack[COROUTINE_STACK_SIZE - 4] = 0xde;
227 chunk_of_stack[COROUTINE_STACK_SIZE - 3] = 0xad;
228 chunk_of_stack[COROUTINE_STACK_SIZE - 2] = 0xbe;
229 chunk_of_stack[COROUTINE_STACK_SIZE - 1] = 0xef;
230 stack_chunk_base(NULL);
231}
232
233
234static void stack_chunk_chunk(
235 Coroutine *parent
236){
237 unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
238 chunk_of_stack[0] = 0xde;
239 chunk_of_stack[1] = 0xad;
240 chunk_of_stack[2] = 0xbe;
241 chunk_of_stack[3] = 0xef;
242 chunk_of_stack[COROUTINE_STACK_SIZE - 4] = 0xde;
243 chunk_of_stack[COROUTINE_STACK_SIZE - 3] = 0xad;
244 chunk_of_stack[COROUTINE_STACK_SIZE - 2] = 0xbe;
245 chunk_of_stack[COROUTINE_STACK_SIZE - 1] = 0xef;
246 stack_chunk_base(parent);
247}
248
249
250static void Coroutine_RunNext()
251{
252 // arrvie here with mutex unlocked
253 Semaphore_Claim(&g_c->waiting_sem);
254 int r = pthread_mutex_lock(&g_c->mutex);
255 assert(r == 0);
256 assert(!List_IsEmpty(&g_c->runable));
257 Coroutine *next = List_Link_Container(Coroutine, link, List_GetHead(&g_c->runable));
258 assert(next->state == Coroutine_Running);
259 longjmp(next->buf, Chunk_Enter);
260 assert(false);
261}
262
263
264static void stack_chunk_base(
265 Coroutine *parent
266){
267 Coroutine here;
268 here.state = Coroutine_Constructing;
269 switch (setjmp(here.buf)) {
270 case Chunk_Initial:
271 // got here for the first time
272 // parent now has a chunk_of_stack - add it to the free list
273 if (parent) {
274 assert(parent->state == Coroutine_Constructing);
275 parent->state = Coroutine_Free;
276 List_AddHead(&g_c->free, &parent->link);
277 }
278 // note that here is the tip of the chunk-claim stack
279 here.coroutines = g_c;
280 g_c->tip = &here;
281
282 // return to the coroutine allocator
283 longjmp(g_c->chunk_allocated, 1);
284 case Chunk_Create:
285 // request to create a new chunk on the stack
286 assert(here.state == Coroutine_Constructing);
287 stack_chunk_chunk(&here);
288 assert(false);
289 case Chunk_Enter:
290 // request to start a coroutine (ie use the chunk for a coroutine)
291 // arrive here with mutex locked
292 assert(here.state == Coroutine_Running);
293 g_c->active = &here;
294 int r = pthread_mutex_unlock(&g_c->mutex);
295 assert(r == 0);
296 here.value = here.start(here.entry_param);
297 r = pthread_mutex_lock(&g_c->mutex);
298 assert(r == 0);
299 g_c->active = NULL;
300 assert(here.state == Coroutine_Running);
301 List_Remove(&here.link);
302 here.state = Coroutine_Complete;
303 List_AddTail(&g_c->inactive, &here.link);
304 // coroutine has completed
305 if (g_c->primary == &here) {
306 // if primary coroutine - return to Coroutine_Run
307 longjmp(g_c->controller, Coroutines_CoroutineComplete);
308 }
309 r = pthread_mutex_unlock(&g_c->mutex);
310 assert(r == 0);
311 Coroutine_RunNext();
312 assert(false);
313 }
314}
315
316
317void Coroutine_StartSystem()
318{
319 assert(!g_c);
320 g_c = mustmalloc(sizeof(Coroutines));
321
322 g_c->state = Coroutines_Starting;
323
324 pthread_mutexattr_t attr;
325 int r = pthread_mutexattr_init(&attr);
326 assert(r == 0);
327 r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
328 assert(r == 0);
329 r = pthread_mutex_init(&g_c->mutex, &attr);
330 assert(r == 0);
331 r = pthread_mutexattr_destroy(&attr);
332 assert(r == 0);
333
334 g_c->tip = NULL;
335 g_c->active = NULL;
336
337 List_Init(&g_c->free);
338 List_Init(&g_c->inactive);
339 List_Init(&g_c->runable);
340 List_Init(&g_c->waiting);
341 Semaphore_ctor(&g_c->waiting_sem, 0);
342
343 // prime the chunk system
344 if (!setjmp(g_c->chunk_allocated)){
345 Coroutine_PrimeStackChunks();
346 assert(false);
347 }
348
349 assert(g_c->state == Coroutines_Starting);
350 g_c->state = Coroutines_Started;
351}
352
353
354void Coroutine_StopSystem()
355{
356 int r = pthread_mutex_lock(&g_c->mutex);
357 assert(r == 0);
358 assert(g_c->state == Coroutines_Started);
359 g_c->state = Coroutines_Stopping;
360
361 assert(List_IsEmpty(&g_c->inactive));
362 Semaphore_dtor(&g_c->waiting_sem);
363
364 assert(g_c->state == Coroutines_Stopping);
365 pthread_mutex_unlock(&g_c->mutex);
366 assert(r == 0);
367 g_c->state = Coroutines_Idle;
368 r = pthread_mutex_destroy(&g_c->mutex);
369 assert(r == 0);
370 free(g_c);
371 g_c = NULL;
372}
373
374
375void *Coroutine_Run(
376 Coroutine *cor,
377 void *value
378){
379 Coroutines *cors = cor->coroutines;
380 assert(g_c == cors);
381 int r = pthread_mutex_lock(&cors->mutex);
382 assert(r == 0);
383 assert(cors->state == Coroutines_Started);
384 cors->state = Coroutines_Active;
385 cors->primary = cor;
386 Coroutine_Continue(cors->primary, value, true);
387
388 if (!setjmp(cors->controller)){
389 pthread_mutex_unlock(&cors->mutex);
390 assert(r == 0);
391 // start the first coroutine
392 Coroutine_RunNext();
393 }
394 // arrive here with mutex locked
395 assert(List_IsEmpty(&cors->runable));
396 assert(List_IsEmpty(&cors->waiting));
397 assert(cors->state == Coroutines_Active);
398 cors->state = Coroutines_Started;
399 pthread_mutex_unlock(&cors->mutex);
400 assert(r == 0);
401 return Coroutine_GetValue(cor);
402}
403
404
405Coroutine *Coroutine_New(
406 Coroutine_Start start
407){
408 assert(g_c->state == Coroutines_Started || g_c->state == Coroutines_Active);
409
410 // if none free - add one
411 if (List_IsEmpty(&g_c->free)){
412 if (!setjmp(g_c->chunk_allocated)){
413 longjmp(g_c->tip->buf, Chunk_Create);
414 }
415 }
416
417 Coroutine *cor = List_Link_Container(Coroutine, link, List_GetHead(&g_c->free));
418 assert(cor->state == Coroutine_Free);
419 cor->state = Coroutine_Idle;
420 cor->start = start;
421 cor->value = NULL;
422 List_Remove(&cor->link);
423 List_AddHead(&g_c->inactive, &cor->link);
424
425 return cor;
426}
427
428
429void Coroutine_Delete(
430 Coroutine *cor
431){
432 Coroutines *cors = cor->coroutines;
433 int r = pthread_mutex_lock(&cors->mutex);
434 assert(r == 0);
435 assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Complete);
436 cor->state = Coroutine_Free;
437 List_Remove(&cor->link);
438 List_AddTail(&cors->free, &cor->link);
439 r = pthread_mutex_unlock(&cors->mutex);
440 assert(r == 0);
441}
442
443
444void Coroutine_Continue(
445 Coroutine *cor,
446 void *value,
447 bool early
448){
449 Coroutines *cors = cor->coroutines;
450 int r = pthread_mutex_lock(&cors->mutex);
451 assert(r == 0);
452 assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Waiting);
453 cor->entry_param = value;
454 cor->state = Coroutine_Running;
455 List_Remove(&cor->link);
456 if ( early ) {
457 List_AddHead(&cors->runable, &cor->link);
458 } else {
459 List_AddTail(&cors->runable, &cor->link);
460 }
461 r = pthread_mutex_unlock(&cors->mutex);
462 assert(r == 0);
463 Semaphore_Release(&cors->waiting_sem);
464}
465
466
467void *Coroutine_Yield(
468 void *value,
469 Coroutine_YieldCallback on_yield,
470 void *this
471){
472 int r = pthread_mutex_lock(&g_c->mutex);
473 assert(r == 0);
474 Coroutine *me = g_c->active;
475 Coroutines *cors = me->coroutines;
476 assert(me && me->state == Coroutine_Running && cors == g_c);
477 me->value = value;
478 me->state = Coroutine_Waiting;
479 List_Remove(&me->link);
480 List_AddTail(&cors->waiting, &me->link);
481 switch (setjmp(me->buf)){
482 case Chunk_Initial:
483 r = pthread_mutex_unlock(&cors->mutex);
484 assert(r == 0);
485 on_yield(this);
486 Coroutine_RunNext();
487 case Chunk_Create:
488 assert(false);
489 case Chunk_Enter:
490 // arrive here with mutex locked
491 cors->active = me;
492 // when we return here - we are running again
493 assert(me->state == Coroutine_Running);
494 void *res = me->entry_param;
495 r = pthread_mutex_unlock(&cors->mutex);
496 assert(r == 0);
497 return res;
498 }
499 return NULL;
500}
501
502
503void *Coroutine_GetValue(
504 Coroutine *cor
505){
506 return cor->value;
507}
508
509
510Coroutine *Coroutine_GetActive()
511{
512 return g_c->active;
513}
514
515
516bool Coroutine_IsRunning(
517 Coroutine *cor
518)
519{
520 int state = cor->state;
521 return state == Coroutine_Running || state == Coroutine_Waiting;
522}
523