234 lines4.4 KB
Newer
Older
-
+
committed
{line.log.rev}
on
9 months ago
37
1
#include "future.h"
2
#include "coroutine.h"
3
#include "task.h"
4
#include <assert.h>
5
#include <stdio.h>
6
#include <stdlib.h>
7
#include <time.h>
8
#include <errno.h>
9
#include <math.h>
10
#include <string.h>
11
#include "timespec_utils.h"
12
13
14
/*
 * Function table that Future_ctor installs into every Future.
 *
 * The initializers are positional, so their order must match the member
 * order of Future_vfptrs_t. Elsewhere in this file the table is used through
 * the members vfptrs->dtor, vfptrs->await and vfptrs->set_result.
 */
Future_vfptrs_t Future_vfptrs = {
    Future_dtor,
    _Future_Await,
    _Future_SetResult
};
19
20
typedef struct Future_WatcherSpec {
21
Future_Watcher watcher;
22
void *me;
23
} Future_WatcherSpec;
24
Last week
25
void Future_ctor
26
(
27
Future *fut
28
){
9 months ago
37
29
fut->vfptrs = &Future_vfptrs;
8 months ago
30
_Cor_Mutex_ctor(&fut->mutex);
9 months ago
37
31
fut->state = Future_State_Waiting;
32
fut->value = NULL;
33
fut->canceled = false;
34
fut->watchers = NULL;
35
fut->nwatchers = 0;
36
fut->maxwatchers = 0;
37
}
38
39
Last week
40
/*
 * Allocate a Future on the heap and initialize it with Future_ctor.
 *
 * Returns the new Future, or NULL if the allocation fails. The result is
 * released with Future_Delete.
 */
Future *
Future_New(void)
{
    Future *fut = malloc(sizeof *fut);
    if (fut == NULL) {
        /* Do not hand a NULL pointer to Future_ctor, which dereferences it. */
        return NULL;
    }
    Future_ctor(fut);
    return fut;
}
48
49
Last week
50
void
51
Future_dtor(
9 months ago
37
52
Future *fut
53
){
8 months ago
54
_Cor_Mutex_Lock(&fut->mutex);
9 months ago
37
55
assert(fut->nwatchers == 0);
56
free(fut->watchers);
8 months ago
57
_Cor_Mutex_Unlock(&fut->mutex);
58
_Cor_Mutex_dtor(&fut->mutex);
9 months ago
37
59
}
60
61
Last week
62
void
63
Future_Delete(
9 months ago
37
64
Future *fut
65
){
66
fut->vfptrs->dtor(fut);
67
free(fut);
68
}
69
70
Last week
71
void
72
Future_AddWatcher(
9 months ago
37
73
Future *fut,
74
Future_Watcher watcher,
75
void *watcher_me
76
){
8 months ago
77
_Cor_Mutex_Lock(&fut->mutex);
9 months ago
37
78
if (fut->state == Future_State_Done) {
8 months ago
79
_Cor_Mutex_Unlock(&fut->mutex);
9 months ago
37
80
watcher(watcher_me, fut);
81
} else {
82
if (fut->nwatchers >= fut->maxwatchers) {
83
fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2;
84
fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec));
85
assert(fut->watchers);
86
}
87
fut->watchers[fut->nwatchers].watcher = watcher;
88
fut->watchers[fut->nwatchers].me = watcher_me;
89
fut->nwatchers++;
8 months ago
90
_Cor_Mutex_Unlock(&fut->mutex);
9 months ago
37
91
}
92
}
93
94
Last week
95
void
96
Future_RemoveWatcher(
9 months ago
37
97
Future *fut,
98
Future_Watcher watcher,
99
void *watcher_me
100
){
8 months ago
101
_Cor_Mutex_Lock(&fut->mutex);
9 months ago
37
102
for (int i = 0; i < fut->nwatchers; i++) {
103
if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) {
104
fut->nwatchers--;
105
if (i < fut->nwatchers) {
106
fut->watchers[i] = fut->watchers[fut->nwatchers];
107
}
108
break;
109
}
110
}
8 months ago
111
_Cor_Mutex_Unlock(&fut->mutex);
9 months ago
37
112
}
113
114
Last week
115
static void
116
_Future_Ready(
9 months ago
37
117
Future *fut
118
){
119
fut->state = Future_State_Done;
120
121
// Take note of watchers list, and reset it
122
Future_WatcherSpec *watchers = fut->watchers;
123
int nwatchers = fut->nwatchers;
124
fut->watchers = NULL;
125
fut->nwatchers = 0;
8 months ago
126
_Cor_Mutex_Unlock(&fut->mutex);
9 months ago
37
127
128
// notify those watchers
129
for (int i = 0; i < nwatchers; i++) {
130
watchers[i].watcher(watchers[i].me, fut);
131
}
132
free(watchers);
133
}
134
135
Last week
136
void
137
Future_SetResult(
9 months ago
37
138
Future *fut,
139
bool canceled,
140
void *value
141
){
142
fut->vfptrs->set_result(fut, canceled, value);
143
}
144
145
Last week
146
Coroutine_Err
147
Future_GetResult(
9 months ago
37
148
Future *fut,
149
void **res
150
){
4 months ago
151
if(fut->state != Future_State_Done){
152
return Coroutine_Err_WrongState;
153
}
9 months ago
37
154
if (res){
155
*res = fut->value;
156
}
4 months ago
157
return fut->canceled ? Coroutine_Err_Canceled : Coroutine_OK;
9 months ago
37
158
}
159
160
161
typedef struct future_bits {
162
Future *fut;
163
Coroutine *cor;
164
} future_bits;
165
166
Last week
167
static void
168
future_complete(
9 months ago
37
169
void *me,
170
Future *fut
171
){
172
(void)fut;
173
Coroutine *cor = (Coroutine *)me;
Last month
174
Coroutine_NS(Continue)(cor, NULL, false);
9 months ago
37
175
}
176
177
Last week
178
static void
179
on_yield_for_future(
9 months ago
37
180
void *me
181
){
182
future_bits *bits = (future_bits *)me;
183
Future_AddWatcher(bits->fut, future_complete, bits->cor);
184
}
185
186
Last week
187
void
188
_Future_Await(
9 months ago
37
189
Future *fut
190
){
191
future_bits bits;
192
bits.fut = fut;
Last month
193
bits.cor = Coroutine_NS(GetActive)();
9 months ago
37
194
Task *my_task = current_task;
195
current_task = NULL;
196
my_task->awaiting_future = fut;
197
if (my_task->canceled){
198
Future_SetResult(fut, true, my_task->cancel_value);
199
}
Last month
200
Coroutine_NS(Yield)(NULL, on_yield_for_future, &bits);
9 months ago
37
201
my_task->awaiting_future = NULL;
202
current_task = my_task;
203
}
204
205
Last week
206
void
207
_Future_SetResult(
9 months ago
37
208
Future *fut,
209
bool canceled,
210
void *res
211
){
8 months ago
212
_Cor_Mutex_Lock(&fut->mutex);
9 months ago
37
213
if(fut->state == Future_State_Waiting){
214
fut->canceled = canceled;
215
fut->value = res;
216
_Future_Ready(fut);
217
} else {
8 months ago
218
_Cor_Mutex_Unlock(&fut->mutex);
9 months ago
37
219
}
220
}
221
222
Last week
223
Coroutine_Err
224
Future_Await(
9 months ago
37
225
Future *fut,
226
void **res
227
){
228
fut->vfptrs->await(fut);
229
if (res){
230
*res = fut->value;
231
}
4 months ago
232
return fut->canceled ? Coroutine_Err_Canceled : Coroutine_OK;
9 months ago
37
233
}
234