8 months ago |
37 |
1 |
#include "future.h" | ||
2 |
#include "coroutine.h" | ||||
3 |
#include "task.h" | ||||
4 |
#include <assert.h> | ||||
5 |
#include <stdio.h> | ||||
6 |
#include <stdlib.h> | ||||
7 |
#include <time.h> | ||||
8 |
#include <errno.h> | ||||
9 |
#include <math.h> | ||||
10 |
#include <string.h> | ||||
11 |
#include "timespec_utils.h" | ||||
12 |
|||||
13 |
|||||
14 |
Future_vfptrs_t Future_vfptrs = { | ||||
15 |
&Future_dtor, | ||||
16 |
&_Future_Await, | ||||
17 |
&_Future_SetResult | ||||
18 |
}; | ||||
19 |
|||||
20 |
typedef struct Future_WatcherSpec { | ||||
21 |
Future_Watcher watcher; | ||||
22 |
void *me; | ||||
23 |
} Future_WatcherSpec; | ||||
24 |
|||||
25 |
void Future_ctor(Future *fut){ | ||||
26 |
fut->vfptrs = &Future_vfptrs; | ||||
7 months ago |
27 |
_Cor_Mutex_ctor(&fut->mutex); | |||
8 months ago |
37 |
28 |
fut->state = Future_State_Waiting; | ||
29 |
fut->value = NULL; | ||||
30 |
fut->canceled = false; | ||||
31 |
fut->watchers = NULL; | ||||
32 |
fut->nwatchers = 0; | ||||
33 |
fut->maxwatchers = 0; | ||||
34 |
} | ||||
35 |
|||||
36 |
|||||
37 |
Future *Future_New(){ | ||||
38 |
Future *fut = malloc(sizeof(Future)); | ||||
39 |
Future_ctor(fut); | ||||
40 |
return fut; | ||||
41 |
} | ||||
42 |
|||||
43 |
|||||
44 |
void Future_dtor( | ||||
45 |
Future *fut | ||||
46 |
){ | ||||
7 months ago |
47 |
_Cor_Mutex_Lock(&fut->mutex); | |||
8 months ago |
37 |
48 |
assert(fut->nwatchers == 0); | ||
49 |
free(fut->watchers); | ||||
7 months ago |
50 |
_Cor_Mutex_Unlock(&fut->mutex); | |||
51 |
_Cor_Mutex_dtor(&fut->mutex); | ||||
8 months ago |
37 |
52 |
} | ||
53 |
|||||
54 |
|||||
55 |
void Future_Delete( | ||||
56 |
Future *fut | ||||
57 |
){ | ||||
58 |
fut->vfptrs->dtor(fut); | ||||
59 |
free(fut); | ||||
60 |
} | ||||
61 |
|||||
62 |
|||||
63 |
void Future_AddWatcher( | ||||
64 |
Future *fut, | ||||
65 |
Future_Watcher watcher, | ||||
66 |
void *watcher_me | ||||
67 |
){ | ||||
7 months ago |
68 |
_Cor_Mutex_Lock(&fut->mutex); | |||
8 months ago |
37 |
69 |
if (fut->state == Future_State_Done) { | ||
7 months ago |
70 |
_Cor_Mutex_Unlock(&fut->mutex); | |||
8 months ago |
37 |
71 |
watcher(watcher_me, fut); | ||
72 |
} else { | ||||
73 |
if (fut->nwatchers >= fut->maxwatchers) { | ||||
74 |
fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2; | ||||
75 |
fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec)); | ||||
76 |
assert(fut->watchers); | ||||
77 |
} | ||||
78 |
fut->watchers[fut->nwatchers].watcher = watcher; | ||||
79 |
fut->watchers[fut->nwatchers].me = watcher_me; | ||||
80 |
fut->nwatchers++; | ||||
7 months ago |
81 |
_Cor_Mutex_Unlock(&fut->mutex); | |||
8 months ago |
37 |
82 |
} | ||
83 |
} | ||||
84 |
|||||
85 |
|||||
86 |
void Future_RemoveWatcher( | ||||
87 |
Future *fut, | ||||
88 |
Future_Watcher watcher, | ||||
89 |
void *watcher_me | ||||
90 |
){ | ||||
7 months ago |
91 |
_Cor_Mutex_Lock(&fut->mutex); | |||
8 months ago |
37 |
92 |
for (int i = 0; i < fut->nwatchers; i++) { | ||
93 |
if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) { | ||||
94 |
fut->nwatchers--; | ||||
95 |
if (i < fut->nwatchers) { | ||||
96 |
fut->watchers[i] = fut->watchers[fut->nwatchers]; | ||||
97 |
} | ||||
98 |
break; | ||||
99 |
} | ||||
100 |
} | ||||
7 months ago |
101 |
_Cor_Mutex_Unlock(&fut->mutex); | |||
8 months ago |
37 |
102 |
} | ||
103 |
|||||
104 |
|||||
105 |
static void _Future_Ready( | ||||
106 |
Future *fut | ||||
107 |
){ | ||||
108 |
fut->state = Future_State_Done; | ||||
109 |
|||||
110 |
// Take note of watchers list, and reset it | ||||
111 |
Future_WatcherSpec *watchers = fut->watchers; | ||||
112 |
int nwatchers = fut->nwatchers; | ||||
113 |
fut->watchers = NULL; | ||||
114 |
fut->nwatchers = 0; | ||||
7 months ago |
115 |
_Cor_Mutex_Unlock(&fut->mutex); | |||
8 months ago |
37 |
116 |
|||
117 |
// notify those watchers | ||||
118 |
for (int i = 0; i < nwatchers; i++) { | ||||
119 |
watchers[i].watcher(watchers[i].me, fut); | ||||
120 |
} | ||||
121 |
free(watchers); | ||||
122 |
} | ||||
123 |
|||||
124 |
|||||
125 |
void Future_SetResult( | ||||
126 |
Future *fut, | ||||
127 |
bool canceled, | ||||
128 |
void *value | ||||
129 |
){ | ||||
130 |
fut->vfptrs->set_result(fut, canceled, value); | ||||
131 |
} | ||||
132 |
|||||
133 |
|||||
134 |
bool Future_GetResult( | ||||
135 |
Future *fut, | ||||
136 |
void **res | ||||
137 |
){ | ||||
138 |
assert(fut->state == Future_State_Done); | ||||
139 |
if (res){ | ||||
140 |
*res = fut->value; | ||||
141 |
} | ||||
142 |
return fut->canceled; | ||||
143 |
} | ||||
144 |
|||||
145 |
|||||
146 |
typedef struct future_bits { | ||||
147 |
Future *fut; | ||||
148 |
Coroutine *cor; | ||||
149 |
} future_bits; | ||||
150 |
|||||
151 |
|||||
152 |
static void future_complete( | ||||
153 |
void *me, | ||||
154 |
Future *fut | ||||
155 |
){ | ||||
156 |
(void)fut; | ||||
157 |
Coroutine *cor = (Coroutine *)me; | ||||
158 |
Coroutine_Continue(cor, NULL, false); | ||||
159 |
} | ||||
160 |
|||||
161 |
|||||
162 |
static void on_yield_for_future( | ||||
163 |
void *me | ||||
164 |
){ | ||||
165 |
future_bits *bits = (future_bits *)me; | ||||
166 |
Future_AddWatcher(bits->fut, future_complete, bits->cor); | ||||
167 |
} | ||||
168 |
|||||
169 |
|||||
170 |
void _Future_Await( | ||||
171 |
Future *fut | ||||
172 |
){ | ||||
173 |
future_bits bits; | ||||
174 |
bits.fut = fut; | ||||
175 |
bits.cor = Coroutine_GetActive(); | ||||
176 |
Task *my_task = current_task; | ||||
177 |
current_task = NULL; | ||||
178 |
my_task->awaiting_future = fut; | ||||
179 |
if (my_task->canceled){ | ||||
180 |
Future_SetResult(fut, true, my_task->cancel_value); | ||||
181 |
} | ||||
182 |
Coroutine_Yield(NULL, on_yield_for_future, &bits); | ||||
183 |
my_task->awaiting_future = NULL; | ||||
184 |
current_task = my_task; | ||||
185 |
} | ||||
186 |
|||||
187 |
|||||
188 |
void _Future_SetResult( | ||||
189 |
Future *fut, | ||||
190 |
bool canceled, | ||||
191 |
void *res | ||||
192 |
){ | ||||
7 months ago |
193 |
_Cor_Mutex_Lock(&fut->mutex); | |||
8 months ago |
37 |
194 |
if(fut->state == Future_State_Waiting){ | ||
195 |
fut->canceled = canceled; | ||||
196 |
fut->value = res; | ||||
197 |
_Future_Ready(fut); | ||||
198 |
} else { | ||||
7 months ago |
199 |
_Cor_Mutex_Unlock(&fut->mutex); | |||
8 months ago |
37 |
200 |
} | ||
201 |
} | ||||
202 |
|||||
203 |
|||||
204 |
bool Future_Await( | ||||
205 |
Future *fut, | ||||
206 |
void **res | ||||
207 |
){ | ||||
208 |
fut->vfptrs->await(fut); | ||||
209 |
if (res){ | ||||
210 |
*res = fut->value; | ||||
211 |
} | ||||
212 |
return fut->canceled; | ||||
213 |
} | ||||
214 |