476 lines13.0 KB
1#include "coroutine.h"
2#include <assert.h>
3#include <setjmp.h>
4#include <stdbool.h>
5#include <stddef.h>
6#include <stdio.h>
7#include <pthread.h>
8#include <stdlib.h>
9
// Size in bytes of the chunk_of_stack array that Coroutine_PrimeStackChunks()
// and stack_chunk_chunk() place on the stack in front of each chunk base.
#define COROUTINE_STACK_SIZE 16384
// NOTE(review): not referenced anywhere in the visible part of this file.
#define COROUTINE_STARTUP_STACK_SIZE 1024
12
13
// Allocate `size` bytes or terminate the process; never returns NULL.
//
// The failure check is an explicit test rather than assert(), because assert()
// compiles to nothing when NDEBUG is defined and a failed allocation would
// then be handed back to the caller as NULL.
//
// size: number of bytes wanted. A request for 0 bytes is served as 1 byte,
//       since malloc(0) is allowed to return NULL without that being an error.
// Returns a pointer the caller releases with free().
static void *mustmalloc(size_t size)
{
    void *p = malloc(size != 0 ? size : 1);
    if (p == NULL) {
        fprintf(stderr, "mustmalloc: out of memory (%zu bytes requested)\n", size);
        abort();
    }
    return p;
}
19
// New(type, args...)
//   Allocates sizeof(type) bytes with mustmalloc(), runs
//   type_ctor(object, args...) on the result and evaluates to the object
//   pointer (type *).  The constructor arguments go to the constructor, not to
//   the allocator, and the constructor's own return value (if any) is ignored,
//   so void constructors such as Semaphore_ctor() work.
//   Uses the GNU statement-expression extension, alongside the GNU
//   `, ## __VA_ARGS__` form this macro already relied on.
//
// Delete(ptr, type)
//   When ptr is non-NULL: runs type_dtor(ptr), frees the object and sets ptr
//   to NULL.  ptr must be a modifiable lvalue and is evaluated several times.
//   Both arms of the conditional are void, so the whole expression is void.
#define New(type, ...) \
    __extension__ ({ \
        type *new_obj_ = (type *)mustmalloc(sizeof(type)); \
        type##_ctor(new_obj_, ## __VA_ARGS__); \
        new_obj_; \
    })
#define Delete(ptr, type) \
    ((ptr) ? (void)(type##_dtor(ptr), free(ptr), (ptr) = NULL) : (void)0)
22
23///////////////////////////////////////////////////////////////////////////////
24// Semaphore built from mutex & condition variables...
25//
// Using pthread.h (more widely available than the C standard library threads.h)
27///////////////////////////////////////////////////////////////////////////////
28
// Counting semaphore assembled from a mutex and a condition variable.
struct Semaphore {
    pthread_mutex_t mutex;  // held around every access to count
    pthread_cond_t cond;    // waited on by Semaphore_Claim, broadcast by Semaphore_Release
    int count;              // Semaphore_Claim blocks while this is <= 0
};
typedef struct Semaphore Semaphore;
34
35static void Semaphore_ctor(Semaphore *sem, int initial_count){
36 sem->count = initial_count;
37 int r = pthread_mutex_init(&sem->mutex, NULL);
38 assert(r == 0);
39 r = pthread_cond_init(&sem->cond, NULL);
40 assert(r == 0);
41}
42
43static void Semaphore_dtor(Semaphore *sem){
44 int r = pthread_mutex_destroy(&sem->mutex);
45 assert(r == 0);
46 r = pthread_cond_destroy(&sem->cond);
47 assert(r == 0);
48}
49
50static void Semaphore_Claim(Semaphore *sem){
51 int r = pthread_mutex_lock(&sem->mutex);
52 assert(r == 0);
53 while (sem->count <= 0) {
54 r = pthread_cond_wait(&sem->cond, &sem->mutex);
55 assert(r == 0);
56 }
57 sem->count--;
58 r = pthread_mutex_unlock(&sem->mutex);
59 assert(r == 0);
60}
61
62static void Semaphore_Release(Semaphore *sem){
63 int r = pthread_mutex_lock(&sem->mutex);
64 assert(r == 0);
65 sem->count++;
66 r = pthread_cond_broadcast(&sem->cond);
67 assert(r == 0);
68 r = pthread_mutex_unlock(&sem->mutex);
69 assert(r == 0);
70}
71
72///////////////////////////////////////////////////////////////////////////////
73// ...semaphore built from mutex
74///////////////////////////////////////////////////////////////////////////////
75
76///////////////////////////////////////////////////////////////////////////////
77// 2-way linked lists...
78//
// Brought inline here to avoid namespace pollution
80///////////////////////////////////////////////////////////////////////////////
81
// Node of a doubly linked list; embedded in the structure that is to be listed.
typedef struct List_Link {
    struct List_Link *next;  // towards the tail
    struct List_Link *prev;  // towards the head
} List_Link;
87
88typedef struct List_Head List_Head;
89struct List_Head {
90 union {
91 struct {
92 List_Link link;
93 List_Link *filler;
94 } fwd;
95 struct {
96 List_Link *filler;
97 List_Link link;
98 } back;
99 };
100};
101
102static inline bool List_IsEmpty(const List_Head *list) {
103 return list->fwd.link.next == &list->back.link;
104}
105
106static inline List_Link *List_GetHead(const List_Head *list) {
107 return List_IsEmpty(list) ? NULL : list->fwd.link.next;
108}
109static inline List_Link *List_GetTail(const List_Head *list) {
110 return List_IsEmpty(list) ? NULL : list->back.link.prev;
111}
// OFFSETOF(Container, Field)
//   Byte offset of Field inside Container.  Forwards to the standard offsetof
//   from <stddef.h> instead of doing pointer arithmetic on a made-up address;
//   the result therefore has type size_t.
//
// List_Link_Container(Container, Link, link)
//   Given `link`, a pointer to the member named Link inside a Container,
//   yields a pointer to the enclosing Container.  A NULL `link` maps to NULL
//   only when Link sits at offset 0.
#define OFFSETOF(Container, Field) offsetof(Container, Field)
#define List_Link_Container(Container, Link, link) \
    ((Container *)((char *)(link) - OFFSETOF(Container, Link)))
114
115static inline void List_Init(List_Head *list)
116{
117 list->fwd.link.next = &list->back.link;
118 list->fwd.link.prev = NULL;
119 list->back.link.prev = &list->fwd.link;
120}
121
122static inline void List_AddHead(List_Head *list, List_Link *link)
123{
124 List_Link *first = list->fwd.link.next;
125 link->next = first;
126 link->prev = &list->fwd.link;
127 first->prev = link;
128 list->fwd.link.next = link;
129}
130
131static inline void List_AddTail(List_Head *list, List_Link *link)
132{
133 List_Link *last = list->back.link.prev;
134 link->prev = last;
135 link->next = &list->back.link;
136 last->next = link;
137 list->back.link.prev = link;
138}
139
140static inline void List_Remove(List_Link *link)
141{
142 link->prev->next = link->next;
143 link->next->prev = link->prev;
144}
145
146///////////////////////////////////////////////////////////////////////////////
147// ...2-way linked lists
148///////////////////////////////////////////////////////////////////////////////
149
150
// Life cycle of the coroutine system as a whole; kept in g_c.state.
enum {
    Coroutines_Idle     = 0,  // before Coroutine_StartSystem / after Coroutine_StopSystem
    Coroutines_Starting = 1,  // inside Coroutine_StartSystem
    Coroutines_Started  = 2,  // ready; no Coroutine_Run in progress
    Coroutines_Active   = 3,  // inside Coroutine_Run
    Coroutines_Stopping = 4   // inside Coroutine_StopSystem
};
158
// Values seen by a switch on setjmp() of a Coroutine's buf.
enum {
    Chunk_Initial = 0,  // direct return of setjmp(); never passed to longjmp()
    Chunk_Create  = 1,  // longjmp() request: carve another stack chunk below this one
    Chunk_Enter   = 2   // longjmp() request: start or resume the coroutine
};
164
// State of a single coroutine; kept in Coroutine.state.
enum {
    Coroutine_Constructing = 0,  // chunk base exists, stack space below it not yet carved
    Coroutine_Free         = 1,  // on g_c.free, available to Coroutine_New
    Coroutine_Idle         = 2,  // handed out by Coroutine_New, not yet continued
    Coroutine_Running      = 3,  // on g_c.runable
    Coroutine_Waiting      = 4,  // yielded, on g_c.waiting
    Coroutine_Complete     = 5   // start function has returned
};
173
// Codes for jumps back into the controlling code.  Only
// Coroutines_CoroutineComplete is referenced in the visible part of this
// file (it is the value passed to longjmp on g_c.controller).
enum {
    Coroutines_Init              = 0,
    Coroutines_AllocatedChunk    = 1,
    Coroutines_CoroutineComplete = 2,
};
179
// One coroutine.  Every instance in this file is the `here` local of a
// stack_chunk_base() frame, so the object lives on the stack, at the base of
// its chunk, and is never allocated from the heap.
struct Coroutine {
    List_Link link;                    // links the coroutine into one of g_c's lists
    jmp_buf buf;                       // resume point: set in stack_chunk_base() and again in Coroutine_Yield()
    void *this;                        // opaque pointer passed to on_yield
    Coroutine_YieldCallback on_yield;  // called by Coroutine_Yield() once the coroutine has been parked
    Coroutine_Start start;             // entry function, called with entry_param
    void *entry_param;                 // value from the latest Coroutine_Continue(): argument to start,
                                       // or the return value of the pending Coroutine_Yield()
    void *value;                       // value passed to Coroutine_Yield(), later the return value of start;
                                       // read through Coroutine_GetValue()
    char state;                        // one of Coroutine_Constructing .. Coroutine_Complete
    char action;                       // NOTE(review): not used in the visible part of this file
};
191
typedef struct Coroutines Coroutines;

// Complete state of the coroutine system; the single instance is g_c.
struct Coroutines {
    pthread_mutex_t mutex;      // recursive mutex created in Coroutine_StartSystem()
    jmp_buf controller;         // set in Coroutine_Run(); jumped to when the primary coroutine completes
    jmp_buf chunk_allocated;    // set by the code requesting a stack chunk; jumped to by
                                // stack_chunk_base() once the chunk exists

    // singletons
    Coroutine *tip;     // top of stack chunk: the newest chunk base, target of Chunk_Create requests
    Coroutine *active;  // currently running coroutine
    Coroutine *primary; // Coroutine_Run coroutine

    // lists
    List_Head free;         // Coroutine_Free: available to Coroutine_New()
    List_Head inactive;     // idle or complete
    List_Head runable;      // running or waiting to run (state Coroutine_Running)
    List_Head waiting;      // yielded; parked until Coroutine_Continue() is called on them
    Semaphore waiting_sem;  // released once per Coroutine_Continue(), claimed once per Coroutine_RunNext()

    // state
    char state;             // one of Coroutines_Idle .. Coroutines_Stopping
};
214
// The single coroutine system instance.  Defined with external linkage and
// zero-initialised, so g_c.state starts out as Coroutines_Idle.
Coroutines g_c;

// Forward declarations: stack_chunk_base() calls stack_chunk_chunk(), which
// calls stack_chunk_base() again one chunk further down the stack.
static void stack_chunk_chunk(Coroutine *parent);
static void stack_chunk_base(Coroutine *parent);
219
220static void Coroutine_PrimeStackChunks()
221{
222 unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
223 chunk_of_stack[0] = 0xde;
224 chunk_of_stack[1] = 0xad;
225 chunk_of_stack[2] = 0xbe;
226 chunk_of_stack[3] = 0xef;
227 chunk_of_stack[COROUTINE_STACK_SIZE - 4] = 0xde;
228 chunk_of_stack[COROUTINE_STACK_SIZE - 3] = 0xad;
229 chunk_of_stack[COROUTINE_STACK_SIZE - 2] = 0xbe;
230 chunk_of_stack[COROUTINE_STACK_SIZE - 1] = 0xef;
231 stack_chunk_base(NULL);
232}
233
234static void stack_chunk_chunk(Coroutine *parent){
235 unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
236 chunk_of_stack[0] = 0xde;
237 chunk_of_stack[1] = 0xad;
238 chunk_of_stack[2] = 0xbe;
239 chunk_of_stack[3] = 0xef;
240 chunk_of_stack[COROUTINE_STACK_SIZE - 4] = 0xde;
241 chunk_of_stack[COROUTINE_STACK_SIZE - 3] = 0xad;
242 chunk_of_stack[COROUTINE_STACK_SIZE - 2] = 0xbe;
243 chunk_of_stack[COROUTINE_STACK_SIZE - 1] = 0xef;
244 stack_chunk_base(parent);
245}
246
// Transfer control to the coroutine at the head of g_c.runable.
//
// Blocks on g_c.waiting_sem until a Coroutine_Continue() has released it,
// locks g_c.mutex and longjmp()s into the chosen coroutine with Chunk_Enter.
// The mutex is still locked at the jump target (stack_chunk_base() or
// Coroutine_Yield()), which unlocks it.  The chosen coroutine stays on
// g_c.runable while it runs.  This function does not return.
static void Coroutine_RunNext()
{
    // arrive here with mutex unlocked
    Semaphore_Claim(&g_c.waiting_sem);
    int r = pthread_mutex_lock(&g_c.mutex);
    assert(r == 0);
    assert(!List_IsEmpty(&g_c.runable));
    Coroutine *next = List_Link_Container(Coroutine, link, List_GetHead(&g_c.runable));
    assert(next->state == Coroutine_Running);
    longjmp(next->buf, Chunk_Enter);
    // not reached: longjmp does not return
    assert(false);
}
259
// Base frame of one stack chunk; its local `here` is the Coroutine object for
// that chunk.
//
// parent: the Coroutine of the previous chunk, whose stack space has just been
//         carved by stack_chunk_chunk(); NULL for the very first chunk.
//
// The frame is entered once by a normal call and afterwards only through
// longjmp() on here.buf.  It never returns to its caller: every path ends in
// a longjmp() (directly, or inside Coroutine_RunNext()) or a nested call that
// does not come back.
//
// NOTE(review): `here` is a non-volatile automatic object that other
// functions modify through pointers between the setjmp() and the later
// longjmp()s.  The C standard makes such values indeterminate after the
// jump; the code relies on the object staying in memory because its address
// escapes.
static void stack_chunk_base(Coroutine *parent){
    Coroutine here;
    here.state = Coroutine_Constructing;
    switch (setjmp(here.buf)) {
    case Chunk_Initial:
        // got here for the first time
        // parent now has a chunk_of_stack - add it to the free list
        if (parent) {
            assert(parent->state == Coroutine_Constructing);
            parent->state = Coroutine_Free;
            List_AddHead(&g_c.free, &parent->link);
        }
        // note that here is the tip of the chunk-claim stack
        g_c.tip = &here;

        // return to the coroutine allocator
        // (Coroutine_StartSystem() or Coroutine_New(), whichever set chunk_allocated)
        longjmp(g_c.chunk_allocated, 1);
    case Chunk_Create:
        // request to create a new chunk on the stack
        // (only the tip, which is still Constructing, receives this request)
        assert(here.state == Coroutine_Constructing);
        stack_chunk_chunk(&here);
        assert(false);
    case Chunk_Enter:
        // request to start a coroutine (ie use the chunk for a coroutine)
        // arrive here with mutex locked
        assert(here.state == Coroutine_Running);
        g_c.active = &here;
        int r = pthread_mutex_unlock(&g_c.mutex);
        assert(r == 0);
        // run the coroutine body with the mutex released; its return value
        // becomes the coroutine's value
        here.value = here.start(here.entry_param);
        r = pthread_mutex_lock(&g_c.mutex);
        assert(r == 0);
        g_c.active = NULL;
        assert(here.state == Coroutine_Running);
        // move from runable to inactive
        List_Remove(&here.link);
        here.state = Coroutine_Complete;
        List_AddTail(&g_c.inactive, &here.link);
        // coroutine has completed
        if (g_c.primary == &here) {
            // if primary coroutine - return to Coroutine_Run
            // (the mutex stays locked across this jump)
            longjmp(g_c.controller, Coroutines_CoroutineComplete);
        }
        r = pthread_mutex_unlock(&g_c.mutex);
        assert(r == 0);
        // hand over to the next runnable coroutine; does not return
        Coroutine_RunNext();
        assert(false);
    }
}
308
// Take the coroutine system from Coroutines_Idle to Coroutines_Started.
//
// Creates the recursive g_c.mutex, empties the four lists, creates
// g_c.waiting_sem with a count of 0 and carves the first stack chunk.
// g_c.primary is not touched here; it is assigned in Coroutine_Run().
void Coroutine_StartSystem()
{
    assert(g_c.state == Coroutines_Idle);

    // The mutex has to be recursive: Coroutine_Run() calls
    // Coroutine_Continue() while it already holds the lock.
    pthread_mutexattr_t attr;
    int r = pthread_mutexattr_init(&attr);
    assert(r == 0);
    r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    assert(r == 0);
    r = pthread_mutex_init(&g_c.mutex, &attr);
    assert(r == 0);
    r = pthread_mutexattr_destroy(&attr);
    assert(r == 0);

    g_c.state = Coroutines_Starting;

    g_c.tip = NULL;
    g_c.active = NULL;

    List_Init(&g_c.free);
    List_Init(&g_c.inactive);
    List_Init(&g_c.runable);
    List_Init(&g_c.waiting);
    Semaphore_ctor(&g_c.waiting_sem, 0);

    // prime the chunk system
    // Coroutine_PrimeStackChunks() does not return normally; control comes
    // back through the longjmp(g_c.chunk_allocated, 1) in stack_chunk_base().
    if (!setjmp(g_c.chunk_allocated)){
        Coroutine_PrimeStackChunks();
        assert(false);
    }
    assert(g_c.state == Coroutines_Starting);
    g_c.state = Coroutines_Started;
}
342
343void Coroutine_StopSystem()
344{
345 int r = pthread_mutex_lock(&g_c.mutex);
346 assert(r == 0);
347 assert(g_c.state == Coroutines_Started);
348 g_c.state = Coroutines_Stopping;
349
350 assert(List_IsEmpty(&g_c.inactive));
351 Semaphore_dtor(&g_c.waiting_sem);
352
353 assert(g_c.state == Coroutines_Stopping);
354 pthread_mutex_unlock(&g_c.mutex);
355 assert(r == 0);
356 g_c.state = Coroutines_Idle;
357 r = pthread_mutex_destroy(&g_c.mutex);
358 assert(r == 0);
359}
360
361void *Coroutine_Run(Coroutine *cor, void *value){
362 int r = pthread_mutex_lock(&g_c.mutex);
363 assert(r == 0);
364 assert(g_c.state == Coroutines_Started);
365 g_c.state = Coroutines_Active;
366 g_c.primary = cor;
367 Coroutine_Continue(g_c.primary, value, true);
368
369 if (!setjmp(g_c.controller)){
370 pthread_mutex_unlock(&g_c.mutex);
371 assert(r == 0);
372 // start the first coroutine
373 Coroutine_RunNext();
374 }
375 // arrive here with mutex locked
376 assert(List_IsEmpty(&g_c.runable));
377 assert(List_IsEmpty(&g_c.waiting));
378 assert(g_c.state == Coroutines_Active);
379 g_c.state = Coroutines_Started;
380 pthread_mutex_unlock(&g_c.mutex);
381 assert(r == 0);
382 return Coroutine_GetValue(cor);
383}
384
385Coroutine *Coroutine_New(void *this, Coroutine_YieldCallback on_yield, Coroutine_Start start){
386 assert(g_c.state == Coroutines_Started || g_c.state == Coroutines_Active);
387
388 // if none free - add one
389 if (List_IsEmpty(&g_c.free)){
390 if (!setjmp(g_c.chunk_allocated)){
391 longjmp(g_c.tip->buf, Chunk_Create);
392 }
393 }
394
395 Coroutine *cor = List_Link_Container(Coroutine, link, List_GetHead(&g_c.free));
396 assert(cor->state == Coroutine_Free);
397 cor->state = Coroutine_Idle;
398 cor->this = this;
399 cor->start = start;
400 cor->on_yield = on_yield;
401 cor->value = NULL;
402 List_Remove(&cor->link);
403 List_AddHead(&g_c.inactive, &cor->link);
404
405 return cor;
406}
407
408void Coroutine_Delete(Coroutine *cor){
409 assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Complete);
410 cor->state = Coroutine_Free;
411 List_Remove(&cor->link);
412 List_AddTail(&g_c.free, &cor->link);
413}
414
415void Coroutine_Continue(Coroutine *cor, void *value, bool early){
416 int r = pthread_mutex_lock(&g_c.mutex);
417 assert(r == 0);
418 assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Waiting);
419 cor->entry_param = value;
420 cor->state = Coroutine_Running;
421 List_Remove(&cor->link);
422 if ( early ) {
423 List_AddHead(&g_c.runable, &cor->link);
424 } else {
425 List_AddTail(&g_c.runable, &cor->link);
426 }
427 r = pthread_mutex_unlock(&g_c.mutex);
428 assert(r == 0);
429 Semaphore_Release(&g_c.waiting_sem);
430}
431
// Suspend the currently active coroutine.
//
// value: stored as the coroutine's value, readable through Coroutine_GetValue()
//
// Returns the value given to the Coroutine_Continue() call that resumes the
// coroutine.
//
// The coroutine moves from the runable to the waiting list, its resume point
// (me->buf) is overwritten with a context inside this function, on_yield is
// called with the mutex released, and control passes to the next runnable
// coroutine.
//
// NOTE(review): on_yield is called unconditionally - presumably it must never
// be NULL; confirm against callers of Coroutine_New().
// NOTE(review): g_c.active still points at this coroutine while on_yield runs.
void *Coroutine_Yield(void *value){
    int r = pthread_mutex_lock(&g_c.mutex);
    assert(r == 0);
    Coroutine *me = g_c.active;
    assert(me && me->state == Coroutine_Running);
    me->value = value;
    me->state = Coroutine_Waiting;
    List_Remove(&me->link);
    List_AddTail(&g_c.waiting, &me->link);
    switch (setjmp(me->buf)){
    case Chunk_Initial:
        // direct return from setjmp: park and switch away
        r = pthread_mutex_unlock(&g_c.mutex);
        assert(r == 0);
        me->on_yield(me->this);
        // does not return, so the case below is not reached from here
        Coroutine_RunNext();
    case Chunk_Create:
        // a yielded coroutine is never asked to carve a chunk
        assert(false);
    case Chunk_Enter:
        // arrive here with mutex locked
        g_c.active = me;
        // when we return here - we are running again
        assert(me->state == Coroutine_Running);
        void *res = me->entry_param;
        r = pthread_mutex_unlock(&g_c.mutex);
        assert(r == 0);
        return res;
    }
    return NULL;
}
461
462void *Coroutine_GetValue(Coroutine *cor){
463 return cor->value;
464}
465
466Coroutine *Coroutine_GetActive()
467{
468 return g_c.active;
469}
470
471bool Coroutine_IsRunning(Coroutine *cor)
472{
473 int state = cor->state;
474 return state == Coroutine_Running || state == Coroutine_Waiting;
475}
476