1 contributor
526 lines13.6 KB
Newer
Older
-
+
commited
{line.log.rev}
on
8 months ago
4
1
#include "coroutine.h"
2
#include <assert.h>
3
#include <setjmp.h>
4
#include <stdbool.h>
5
#include <stddef.h>
6
#include <stdio.h>
8 months ago
7
#include <pthread.h>
8
#include <stdlib.h>
8 months ago
9
#include "cor_thread_local.h"
8 months ago
4
10
11
12
8 months ago
13
// Allocate `size` bytes or terminate the process.
//
// Never returns NULL. The failure check is an explicit test rather than an
// assert() so that it is still performed when compiled with NDEBUG. A request
// for zero bytes is rounded up to one byte, because malloc(0) is allowed to
// return NULL without that being an out-of-memory condition.
static void *mustmalloc(size_t size)
{
    void *p = malloc(size ? size : 1);
    if (p == NULL) {
        fprintf(stderr, "mustmalloc: out of memory (%zu bytes requested)\n", size);
        abort();
    }
    return p;
}
18
19
#define New(type, ...) (type##_ctor((type *)mustmalloc(sizeof(type), ## __VA_ARGS__)))
20
#define Delete(ptr, type) ((ptr) ? (type##_dtor(ptr), free(ptr), (ptr) = NULL) : (void)0)
21
8 months ago
4
22
///////////////////////////////////////////////////////////////////////////////
8 months ago
23
// Semaphore built from mutex & condition variables...
24
//
25
// Using pthread.h (more widely available than the C standard library threads.h)
26
///////////////////////////////////////////////////////////////////////////////
27
28
// Counting semaphore assembled from a pthread mutex and condition variable.
// Semaphore_Claim and Semaphore_Release only touch `count` with `mutex` held.
typedef struct Semaphore {
    pthread_mutex_t mutex;  // guards count
    pthread_cond_t  cond;   // broadcast by Semaphore_Release
    int             count;  // Semaphore_Claim blocks while this is <= 0
} Semaphore;
33
34
// Prepare `sem` for use: record `initial_count` and create the mutex and
// condition variable with default attributes. A failing pthread call trips
// an assert.
static void Semaphore_ctor(Semaphore *sem, int initial_count)
{
    int r;

    sem->count = initial_count;

    r = pthread_mutex_init(&sem->mutex, NULL);
    assert(r == 0);

    r = pthread_cond_init(&sem->cond, NULL);
    assert(r == 0);
}
41
42
// Tear down `sem`: destroy the mutex first, then the condition variable.
// A failing pthread call trips an assert. The Semaphore storage itself is
// not freed here.
static void Semaphore_dtor(Semaphore *sem)
{
    int r;

    r = pthread_mutex_destroy(&sem->mutex);
    assert(r == 0);

    r = pthread_cond_destroy(&sem->cond);
    assert(r == 0);
}
48
49
// Take one unit from `sem`, blocking on the condition variable for as long
// as the count is zero or negative. The count is re-tested after every
// wake-up, so spurious wake-ups and competing claimers are handled.
static void Semaphore_Claim(Semaphore *sem)
{
    int r = pthread_mutex_lock(&sem->mutex);
    assert(r == 0);

    for (;;) {
        if (sem->count > 0) {
            break;
        }
        r = pthread_cond_wait(&sem->cond, &sem->mutex);
        assert(r == 0);
    }
    sem->count -= 1;

    r = pthread_mutex_unlock(&sem->mutex);
    assert(r == 0);
}
60
61
// Return one unit to `sem`. The count is bumped with the mutex held; the
// broadcast that wakes every waiter is issued after the mutex has been
// dropped again.
static void Semaphore_Release(Semaphore *sem)
{
    int r;

    r = pthread_mutex_lock(&sem->mutex);
    assert(r == 0);
    sem->count += 1;
    r = pthread_mutex_unlock(&sem->mutex);
    assert(r == 0);

    r = pthread_cond_broadcast(&sem->cond);
    assert(r == 0);
}
70
71
///////////////////////////////////////////////////////////////////////////////
72
// ...semaphore built from mutex
73
///////////////////////////////////////////////////////////////////////////////
74
75
///////////////////////////////////////////////////////////////////////////////
8 months ago
4
76
// 2-way linked lists...
77
//
8 months ago
78
// Brought inline here to avoid namespace pollution
8 months ago
4
79
///////////////////////////////////////////////////////////////////////////////
80
81
// One node of a doubly linked list. It is embedded inside the object that is
// to be listed; List_Link_Container recovers that object from the link.
typedef struct List_Link List_Link;
struct List_Link {
    List_Link *next;  // towards the tail of the list
    List_Link *prev;  // towards the head of the list
};
86
87
typedef struct List_Head List_Head;
88
struct List_Head {
89
union {
90
struct {
91
List_Link link;
92
List_Link *filler;
93
} fwd;
94
struct {
95
List_Link *filler;
96
List_Link link;
97
} back;
98
};
99
};
100
101
static inline bool List_IsEmpty(const List_Head *list) {
102
return list->fwd.link.next == &list->back.link;
103
}
104
105
static inline List_Link *List_GetHead(const List_Head *list) {
106
return List_IsEmpty(list) ? NULL : list->fwd.link.next;
107
}
8 months ago
108
// static inline List_Link *List_GetTail(const List_Head *list) {
109
// return List_IsEmpty(list) ? NULL : list->back.link.prev;
110
// }
8 months ago
4
111
// Byte offset of `Field` inside `Container`. Uses the standard offsetof from
// <stddef.h> instead of doing pointer arithmetic on a made-up object address.
#define OFFSETOF(Container, Field) offsetof(Container, Field)

// Given `link`, the address of member `Link` embedded in a `Container`,
// return the address of the enclosing Container. `link` must really point
// into a Container; a NULL `link` does not produce a NULL result.
#define List_Link_Container(Container, Link, link) ((Container *)((char *)(link) - OFFSETOF(Container, Link)))
113
114
// Make `list` empty: the head sentinel points at the tail sentinel and vice
// versa, and the slot shared by fwd.link.prev / back.link.next is NULL.
static inline void List_Init(List_Head *list)
{
    List_Link *head_sentinel = &list->fwd.link;
    List_Link *tail_sentinel = &list->back.link;

    head_sentinel->next = tail_sentinel;
    head_sentinel->prev = NULL;
    tail_sentinel->prev = head_sentinel;
}
120
121
// Insert `link` as the first element of `list`. `link` must not currently be
// on any list; its next/prev fields are overwritten.
static inline void List_AddHead(List_Head *list, List_Link *link)
{
    List_Link *head_sentinel = &list->fwd.link;
    List_Link *old_first = head_sentinel->next;

    link->next = old_first;
    link->prev = head_sentinel;
    old_first->prev = link;
    head_sentinel->next = link;
}
129
130
// Insert `link` as the last element of `list`. `link` must not currently be
// on any list; its next/prev fields are overwritten.
static inline void List_AddTail(List_Head *list, List_Link *link)
{
    List_Link *tail_sentinel = &list->back.link;
    List_Link *old_last = tail_sentinel->prev;

    link->prev = old_last;
    link->next = tail_sentinel;
    old_last->next = link;
    tail_sentinel->prev = link;
}
138
139
// Unlink `link` from whichever list it is on by joining its two neighbours.
// The next/prev fields of `link` itself are left untouched.
static inline void List_Remove(List_Link *link)
{
    List_Link *before = link->prev;
    before->next = link->next;

    List_Link *after = link->next;
    after->prev = link->prev;
}
144
145
///////////////////////////////////////////////////////////////////////////////
146
// ...2-way linked lists
147
///////////////////////////////////////////////////////////////////////////////
148
8 months ago
149
typedef struct Coroutines Coroutines;
8 months ago
4
150
151
// Life-cycle states stored in Coroutines.state.
enum {
    Coroutines_Idle,      // written by Coroutine_StopSystem just before teardown
    Coroutines_Starting,  // Coroutine_StartSystem in progress
    Coroutines_Started,   // system ready; no Coroutine_Run in progress
    Coroutines_Active,    // inside Coroutine_Run
    Coroutines_Stopping   // Coroutine_StopSystem in progress
};
158
159
// Values delivered by setjmp() on a Coroutine's jmp_buf.
enum {
    Chunk_Initial,  // 0: the direct return from setjmp itself
    Chunk_Create,   // longjmp request: carve another stack chunk
    Chunk_Enter     // longjmp request: start or resume the coroutine
};
164
165
// States stored in Coroutine.state.
enum {
    Coroutine_Constructing,  // base frame exists, stack chunk not yet carved
    Coroutine_Free,          // on the free list, available to Coroutine_New
    Coroutine_Idle,          // handed out by Coroutine_New, not yet continued
    Coroutine_Running,       // queued on the runable list by Coroutine_Continue
    Coroutine_Waiting,       // suspended inside Coroutine_Yield
    Coroutine_Complete       // start function has returned
};
173
174
// Values delivered by setjmp() on Coroutines.controller. Within this file
// only Coroutines_CoroutineComplete is passed to longjmp.
enum {
    Coroutines_Init,               // 0: the direct return from setjmp itself
    Coroutines_AllocatedChunk,
    Coroutines_CoroutineComplete,  // the primary coroutine has finished
};
179
180
struct Coroutine {
8 months ago
181
Coroutines *coroutines;
8 months ago
4
182
List_Link link;
183
jmp_buf buf;
184
void *this;
185
Coroutine_Start start;
186
void *entry_param;
187
void *value;
188
char state;
189
char action;
190
};
191
192
struct Coroutines {
8 months ago
193
pthread_mutex_t mutex;
8 months ago
4
194
jmp_buf controller;
195
jmp_buf chunk_allocated;
196
197
// singletons
198
Coroutine *tip; // top of stack chunk
199
Coroutine *active; // currently running coroutine
200
Coroutine *primary; // Coroutine_Run coroutine
201
202
// lists
203
List_Head free;
204
List_Head inactive; // idle or complete
205
List_Head runable; // running or waiting to run
206
List_Head waiting; // yielded / waiting to run
8 months ago
207
Semaphore waiting_sem;
8 months ago
4
208
209
// state
210
char state;
211
};
212
8 months ago
213
// The coroutine system in use. Coroutine_StartSystem allocates it and
// Coroutine_StopSystem frees it and resets the pointer to NULL.
// NOTE(review): _Cor_thread_local presumably expands to a thread-local storage
// specifier (see cor_thread_local.h), giving each thread its own system - confirm.
_Cor_thread_local Coroutines *g_c;
8 months ago
4
214
215
static void stack_chunk_chunk(Coroutine *parent);
216
static void stack_chunk_base(Coroutine *parent);
217
8 months ago
218
8 months ago
4
219
static void Coroutine_PrimeStackChunks()
220
{
221
unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
222
chunk_of_stack[0] = 0xde;
223
chunk_of_stack[1] = 0xad;
224
chunk_of_stack[2] = 0xbe;
225
chunk_of_stack[3] = 0xef;
226
chunk_of_stack[COROUTINE_STACK_SIZE - 4] = 0xde;
227
chunk_of_stack[COROUTINE_STACK_SIZE - 3] = 0xad;
228
chunk_of_stack[COROUTINE_STACK_SIZE - 2] = 0xbe;
229
chunk_of_stack[COROUTINE_STACK_SIZE - 1] = 0xef;
230
stack_chunk_base(NULL);
231
}
232
8 months ago
233
234
static void stack_chunk_chunk(
235
Coroutine *parent
236
){
8 months ago
4
237
unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
238
chunk_of_stack[0] = 0xde;
239
chunk_of_stack[1] = 0xad;
240
chunk_of_stack[2] = 0xbe;
241
chunk_of_stack[3] = 0xef;
242
chunk_of_stack[COROUTINE_STACK_SIZE - 4] = 0xde;
243
chunk_of_stack[COROUTINE_STACK_SIZE - 3] = 0xad;
244
chunk_of_stack[COROUTINE_STACK_SIZE - 2] = 0xbe;
245
chunk_of_stack[COROUTINE_STACK_SIZE - 1] = 0xef;
246
stack_chunk_base(parent);
247
}
248
8 months ago
249
8 months ago
4
250
static void Coroutine_RunNext()
251
{
8 months ago
252
// arrvie here with mutex unlocked
8 months ago
253
Semaphore_Claim(&g_c->waiting_sem);
254
int r = pthread_mutex_lock(&g_c->mutex);
8 months ago
255
assert(r == 0);
8 months ago
256
assert(!List_IsEmpty(&g_c->runable));
257
Coroutine *next = List_Link_Container(Coroutine, link, List_GetHead(&g_c->runable));
8 months ago
4
258
assert(next->state == Coroutine_Running);
259
longjmp(next->buf, Chunk_Enter);
260
assert(false);
261
}
262
8 months ago
263
264
static void stack_chunk_base(
265
Coroutine *parent
266
){
8 months ago
4
267
Coroutine here;
268
here.state = Coroutine_Constructing;
269
switch (setjmp(here.buf)) {
270
case Chunk_Initial:
271
// got here for the first time
272
// parent now has a chunk_of_stack - add it to the free list
273
if (parent) {
274
assert(parent->state == Coroutine_Constructing);
275
parent->state = Coroutine_Free;
8 months ago
276
List_AddHead(&g_c->free, &parent->link);
8 months ago
4
277
}
278
// note that here is the tip of the chunk-claim stack
8 months ago
279
here.coroutines = g_c;
280
g_c->tip = &here;
8 months ago
4
281
282
// return to the coroutine allocator
8 months ago
283
longjmp(g_c->chunk_allocated, 1);
8 months ago
4
284
case Chunk_Create:
285
// request to create a new chunk on the stack
286
assert(here.state == Coroutine_Constructing);
287
stack_chunk_chunk(&here);
288
assert(false);
289
case Chunk_Enter:
290
// request to start a coroutine (ie use the chunk for a coroutine)
8 months ago
291
// arrive here with mutex locked
8 months ago
4
292
assert(here.state == Coroutine_Running);
8 months ago
293
g_c->active = &here;
294
int r = pthread_mutex_unlock(&g_c->mutex);
8 months ago
295
assert(r == 0);
8 months ago
4
296
here.value = here.start(here.entry_param);
8 months ago
297
r = pthread_mutex_lock(&g_c->mutex);
8 months ago
298
assert(r == 0);
8 months ago
299
g_c->active = NULL;
8 months ago
4
300
assert(here.state == Coroutine_Running);
301
List_Remove(&here.link);
302
here.state = Coroutine_Complete;
8 months ago
303
List_AddTail(&g_c->inactive, &here.link);
8 months ago
4
304
// coroutine has completed
8 months ago
305
if (g_c->primary == &here) {
8 months ago
4
306
// if primary coroutine - return to Coroutine_Run
8 months ago
307
longjmp(g_c->controller, Coroutines_CoroutineComplete);
8 months ago
4
308
}
8 months ago
309
r = pthread_mutex_unlock(&g_c->mutex);
8 months ago
310
assert(r == 0);
8 months ago
4
311
Coroutine_RunNext();
312
assert(false);
313
}
314
}
315
8 months ago
316
8 months ago
4
317
void Coroutine_StartSystem()
318
{
8 months ago
319
assert(!g_c);
320
g_c = mustmalloc(sizeof(Coroutines));
8 months ago
321
8 months ago
322
g_c->state = Coroutines_Starting;
323
8 months ago
324
pthread_mutexattr_t attr;
325
int r = pthread_mutexattr_init(&attr);
326
assert(r == 0);
327
r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
328
assert(r == 0);
8 months ago
329
r = pthread_mutex_init(&g_c->mutex, &attr);
8 months ago
330
assert(r == 0);
331
r = pthread_mutexattr_destroy(&attr);
332
assert(r == 0);
333
8 months ago
334
g_c->tip = NULL;
335
g_c->active = NULL;
8 months ago
4
336
8 months ago
337
List_Init(&g_c->free);
338
List_Init(&g_c->inactive);
339
List_Init(&g_c->runable);
340
List_Init(&g_c->waiting);
341
Semaphore_ctor(&g_c->waiting_sem, 0);
8 months ago
4
342
343
// prime the chunk system
8 months ago
344
if (!setjmp(g_c->chunk_allocated)){
8 months ago
4
345
Coroutine_PrimeStackChunks();
346
assert(false);
347
}
8 months ago
348
349
assert(g_c->state == Coroutines_Starting);
350
g_c->state = Coroutines_Started;
8 months ago
4
351
}
352
8 months ago
353
8 months ago
4
354
void Coroutine_StopSystem()
355
{
8 months ago
356
int r = pthread_mutex_lock(&g_c->mutex);
8 months ago
357
assert(r == 0);
8 months ago
358
assert(g_c->state == Coroutines_Started);
359
g_c->state = Coroutines_Stopping;
8 months ago
4
360
8 months ago
361
assert(List_IsEmpty(&g_c->inactive));
362
Semaphore_dtor(&g_c->waiting_sem);
8 months ago
4
363
8 months ago
364
assert(g_c->state == Coroutines_Stopping);
365
pthread_mutex_unlock(&g_c->mutex);
8 months ago
366
assert(r == 0);
8 months ago
367
g_c->state = Coroutines_Idle;
368
r = pthread_mutex_destroy(&g_c->mutex);
8 months ago
369
assert(r == 0);
8 months ago
370
free(g_c);
371
g_c = NULL;
8 months ago
4
372
}
373
8 months ago
374
375
void *Coroutine_Run(
8 months ago
376
Coroutine_Start start,
8 months ago
377
void *value
378
){
8 months ago
379
Coroutine *cor = Coroutine_New(start);
8 months ago
380
Coroutines *cors = cor->coroutines;
381
assert(g_c == cors);
382
int r = pthread_mutex_lock(&cors->mutex);
8 months ago
383
assert(r == 0);
8 months ago
384
assert(cors->state == Coroutines_Started);
385
cors->state = Coroutines_Active;
386
cors->primary = cor;
387
Coroutine_Continue(cors->primary, value, true);
8 months ago
4
388
8 months ago
389
if (!setjmp(cors->controller)){
390
pthread_mutex_unlock(&cors->mutex);
8 months ago
391
assert(r == 0);
8 months ago
4
392
// start the first coroutine
393
Coroutine_RunNext();
394
}
8 months ago
395
// arrive here with mutex locked
8 months ago
396
assert(List_IsEmpty(&cors->runable));
397
assert(List_IsEmpty(&cors->waiting));
398
assert(cors->state == Coroutines_Active);
399
cors->state = Coroutines_Started;
400
pthread_mutex_unlock(&cors->mutex);
8 months ago
401
assert(r == 0);
8 months ago
402
void *res = Coroutine_GetValue(cor);
403
Coroutine_Delete(cor);
404
return res;
8 months ago
4
405
}
406
407
8 months ago
408
Coroutine *Coroutine_New(
409
Coroutine_Start start
410
){
411
assert(g_c->state == Coroutines_Started || g_c->state == Coroutines_Active);
412
8 months ago
4
413
// if none free - add one
8 months ago
414
if (List_IsEmpty(&g_c->free)){
415
if (!setjmp(g_c->chunk_allocated)){
416
longjmp(g_c->tip->buf, Chunk_Create);
8 months ago
4
417
}
418
}
419
8 months ago
420
Coroutine *cor = List_Link_Container(Coroutine, link, List_GetHead(&g_c->free));
8 months ago
4
421
assert(cor->state == Coroutine_Free);
422
cor->state = Coroutine_Idle;
423
cor->start = start;
424
cor->value = NULL;
425
List_Remove(&cor->link);
8 months ago
426
List_AddHead(&g_c->inactive, &cor->link);
8 months ago
4
427
428
return cor;
429
}
430
8 months ago
431
432
void Coroutine_Delete(
433
Coroutine *cor
434
){
435
Coroutines *cors = cor->coroutines;
436
int r = pthread_mutex_lock(&cors->mutex);
437
assert(r == 0);
8 months ago
4
438
assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Complete);
439
cor->state = Coroutine_Free;
440
List_Remove(&cor->link);
8 months ago
441
List_AddTail(&cors->free, &cor->link);
442
r = pthread_mutex_unlock(&cors->mutex);
443
assert(r == 0);
8 months ago
4
444
}
445
8 months ago
446
447
void Coroutine_Continue(
448
Coroutine *cor,
449
void *value,
450
bool early
451
){
452
Coroutines *cors = cor->coroutines;
453
int r = pthread_mutex_lock(&cors->mutex);
8 months ago
454
assert(r == 0);
8 months ago
4
455
assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Waiting);
456
cor->entry_param = value;
457
cor->state = Coroutine_Running;
458
List_Remove(&cor->link);
459
if ( early ) {
8 months ago
460
List_AddHead(&cors->runable, &cor->link);
8 months ago
4
461
} else {
8 months ago
462
List_AddTail(&cors->runable, &cor->link);
8 months ago
4
463
}
8 months ago
464
r = pthread_mutex_unlock(&cors->mutex);
8 months ago
465
assert(r == 0);
8 months ago
466
Semaphore_Release(&cors->waiting_sem);
8 months ago
4
467
}
468
8 months ago
469
470
void *Coroutine_Yield(
471
void *value,
472
Coroutine_YieldCallback on_yield,
473
void *this
474
){
475
int r = pthread_mutex_lock(&g_c->mutex);
8 months ago
476
assert(r == 0);
8 months ago
477
Coroutine *me = g_c->active;
478
Coroutines *cors = me->coroutines;
479
assert(me && me->state == Coroutine_Running && cors == g_c);
8 months ago
4
480
me->value = value;
481
me->state = Coroutine_Waiting;
482
List_Remove(&me->link);
8 months ago
483
List_AddTail(&cors->waiting, &me->link);
8 months ago
484
switch (setjmp(me->buf)){
485
case Chunk_Initial:
8 months ago
486
r = pthread_mutex_unlock(&cors->mutex);
8 months ago
487
assert(r == 0);
8 months ago
488
on_yield(this);
8 months ago
4
489
Coroutine_RunNext();
8 months ago
490
case Chunk_Create:
491
assert(false);
492
case Chunk_Enter:
493
// arrive here with mutex locked
8 months ago
494
cors->active = me;
8 months ago
495
// when we return here - we are running again
496
assert(me->state == Coroutine_Running);
497
void *res = me->entry_param;
8 months ago
498
r = pthread_mutex_unlock(&cors->mutex);
8 months ago
499
assert(r == 0);
500
return res;
8 months ago
4
501
}
8 months ago
502
return NULL;
8 months ago
4
503
}
504
8 months ago
505
506
void *Coroutine_GetValue(
507
Coroutine *cor
508
){
8 months ago
4
509
return cor->value;
510
}
511
8 months ago
512
8 months ago
4
513
Coroutine *Coroutine_GetActive()
514
{
8 months ago
515
return g_c->active;
8 months ago
4
516
}
517
8 months ago
518
519
bool Coroutine_IsRunning(
520
Coroutine *cor
521
)
8 months ago
4
522
{
8 months ago
523
int state = cor->state;
524
return state == Coroutine_Running || state == Coroutine_Waiting;
8 months ago
4
525
}
526