234 lines4.4 KB
1#include "future.h"
2#include "coroutine.h"
3#include "task.h"
4#include <assert.h>
5#include <stdio.h>
6#include <stdlib.h>
7#include <time.h>
8#include <errno.h>
9#include <math.h>
10#include <string.h>
11#include "timespec_utils.h"
12
13
14Future_vfptrs_t Future_vfptrs = {
15 &Future_dtor,
16 &_Future_Await,
17 &_Future_SetResult
18};
19
// One registered completion watcher: the callback together with the opaque
// pointer that is handed back to it as its first argument.
typedef struct Future_WatcherSpec {
    Future_Watcher watcher; // callback, invoked as watcher(me, fut)
    void *me;               // caller-supplied context passed to the callback
} Future_WatcherSpec;
24
25void Future_ctor
26(
27 Future *fut
28){
29 fut->vfptrs = &Future_vfptrs;
30 _Cor_Mutex_ctor(&fut->mutex);
31 fut->state = Future_State_Waiting;
32 fut->value = NULL;
33 fut->canceled = false;
34 fut->watchers = NULL;
35 fut->nwatchers = 0;
36 fut->maxwatchers = 0;
37}
38
39
40Future *
41Future_New(
42 void
43){
44 Future *fut = malloc(sizeof(Future));
45 Future_ctor(fut);
46 return fut;
47}
48
49
50void
51Future_dtor(
52 Future *fut
53){
54 _Cor_Mutex_Lock(&fut->mutex);
55 assert(fut->nwatchers == 0);
56 free(fut->watchers);
57 _Cor_Mutex_Unlock(&fut->mutex);
58 _Cor_Mutex_dtor(&fut->mutex);
59}
60
61
62void
63Future_Delete(
64 Future *fut
65){
66 fut->vfptrs->dtor(fut);
67 free(fut);
68}
69
70
71void
72Future_AddWatcher(
73 Future *fut,
74 Future_Watcher watcher,
75 void *watcher_me
76){
77 _Cor_Mutex_Lock(&fut->mutex);
78 if (fut->state == Future_State_Done) {
79 _Cor_Mutex_Unlock(&fut->mutex);
80 watcher(watcher_me, fut);
81 } else {
82 if (fut->nwatchers >= fut->maxwatchers) {
83 fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2;
84 fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec));
85 assert(fut->watchers);
86 }
87 fut->watchers[fut->nwatchers].watcher = watcher;
88 fut->watchers[fut->nwatchers].me = watcher_me;
89 fut->nwatchers++;
90 _Cor_Mutex_Unlock(&fut->mutex);
91 }
92}
93
94
95void
96Future_RemoveWatcher(
97 Future *fut,
98 Future_Watcher watcher,
99 void *watcher_me
100){
101 _Cor_Mutex_Lock(&fut->mutex);
102 for (int i = 0; i < fut->nwatchers; i++) {
103 if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) {
104 fut->nwatchers--;
105 if (i < fut->nwatchers) {
106 fut->watchers[i] = fut->watchers[fut->nwatchers];
107 }
108 break;
109 }
110 }
111 _Cor_Mutex_Unlock(&fut->mutex);
112}
113
114
115static void
116_Future_Ready(
117 Future *fut
118){
119 fut->state = Future_State_Done;
120
121 // Take note of watchers list, and reset it
122 Future_WatcherSpec *watchers = fut->watchers;
123 int nwatchers = fut->nwatchers;
124 fut->watchers = NULL;
125 fut->nwatchers = 0;
126 _Cor_Mutex_Unlock(&fut->mutex);
127
128 // notify those watchers
129 for (int i = 0; i < nwatchers; i++) {
130 watchers[i].watcher(watchers[i].me, fut);
131 }
132 free(watchers);
133}
134
135
136void
137Future_SetResult(
138 Future *fut,
139 bool canceled,
140 void *value
141){
142 fut->vfptrs->set_result(fut, canceled, value);
143}
144
145
146Coroutine_Err
147Future_GetResult(
148 Future *fut,
149 void **res
150){
151 if(fut->state != Future_State_Done){
152 return Coroutine_Err_WrongState;
153 }
154 if (res){
155 *res = fut->value;
156 }
157 return fut->canceled ? Coroutine_Err_Canceled : Coroutine_OK;
158}
159
160
// Context handed from _Future_Await to on_yield_for_future: which future is
// being awaited and which coroutine to resume once it completes.
typedef struct future_bits {
    Future *fut;    // future being awaited
    Coroutine *cor; // coroutine that was active when the await started
} future_bits;
165
166
167static void
168future_complete(
169 void *me,
170 Future *fut
171){
172 (void)fut;
173 Coroutine *cor = (Coroutine *)me;
174 Coroutine_NS(Continue)(cor, NULL, false);
175}
176
177
178static void
179on_yield_for_future(
180 void *me
181){
182 future_bits *bits = (future_bits *)me;
183 Future_AddWatcher(bits->fut, future_complete, bits->cor);
184}
185
186
/*
 * Base implementation of "await" (installed in Future_vfptrs).
 *
 * Suspends the active coroutine via Yield and arranges, through
 * on_yield_for_future / future_complete, for it to be continued when the
 * future completes. While suspended, the current task records the future
 * in awaiting_future and current_task is cleared; both are restored once
 * Yield returns.
 *
 * NOTE(review): my_task is dereferenced unconditionally, so this assumes
 * current_task is non-NULL on entry - confirm against callers.
 */
void
_Future_Await(
    Future *fut
){
    // Context for the yield hook; it lives on this stack frame and is
    // passed to Yield by address.
    future_bits bits;
    bits.fut = fut;
    bits.cor = Coroutine_NS(GetActive)();
    // Detach the task from current_task for the duration of the wait.
    Task *my_task = current_task;
    current_task = NULL;
    my_task->awaiting_future = fut;
    // A task already flagged as canceled completes the future as canceled,
    // with the task's cancel_value, before yielding.
    if (my_task->canceled){
        Future_SetResult(fut, true, my_task->cancel_value);
    }
    // on_yield_for_future(&bits) registers the watcher that resumes us;
    // presumably the coroutine runtime invokes it as part of the yield -
    // TODO confirm in the Coroutine implementation.
    Coroutine_NS(Yield)(NULL, on_yield_for_future, &bits);
    // Resumed: undo the bookkeeping done above.
    my_task->awaiting_future = NULL;
    current_task = my_task;
}
204
205
206void
207_Future_SetResult(
208 Future *fut,
209 bool canceled,
210 void *res
211){
212 _Cor_Mutex_Lock(&fut->mutex);
213 if(fut->state == Future_State_Waiting){
214 fut->canceled = canceled;
215 fut->value = res;
216 _Future_Ready(fut);
217 } else {
218 _Cor_Mutex_Unlock(&fut->mutex);
219 }
220}
221
222
223Coroutine_Err
224Future_Await(
225 Future *fut,
226 void **res
227){
228 fut->vfptrs->await(fut);
229 if (res){
230 *res = fut->value;
231 }
232 return fut->canceled ? Coroutine_Err_Canceled : Coroutine_OK;
233}
234