216 lines4.4 KB
1#include "future.h"
2#include "coroutine.h"
3#include "task.h"
4#include <assert.h>
5#include <stdio.h>
6#include <stdlib.h>
7#include <time.h>
8#include <errno.h>
9#include <math.h>
10#include <string.h>
11#include "timespec_utils.h"
12
13
14Future_vfptrs_t Future_vfptrs = {
15 &Future_dtor,
16 &_Future_Await,
17 &_Future_SetResult
18};
19
20typedef struct Future_WatcherSpec {
21 Future_Watcher watcher;
22 void *me;
23} Future_WatcherSpec;
24
25void Future_ctor(Future *fut){
26 fut->vfptrs = &Future_vfptrs;
27 _Cor_Mutex_ctor(&fut->mutex);
28 fut->state = Future_State_Waiting;
29 fut->value = NULL;
30 fut->canceled = false;
31 fut->watchers = NULL;
32 fut->nwatchers = 0;
33 fut->maxwatchers = 0;
34}
35
36
37Future *Future_New(){
38 Future *fut = malloc(sizeof(Future));
39 Future_ctor(fut);
40 return fut;
41}
42
43
44void Future_dtor(
45 Future *fut
46){
47 _Cor_Mutex_Lock(&fut->mutex);
48 assert(fut->nwatchers == 0);
49 free(fut->watchers);
50 _Cor_Mutex_Unlock(&fut->mutex);
51 _Cor_Mutex_dtor(&fut->mutex);
52}
53
54
55void Future_Delete(
56 Future *fut
57){
58 fut->vfptrs->dtor(fut);
59 free(fut);
60}
61
62
63void Future_AddWatcher(
64 Future *fut,
65 Future_Watcher watcher,
66 void *watcher_me
67){
68 _Cor_Mutex_Lock(&fut->mutex);
69 if (fut->state == Future_State_Done) {
70 _Cor_Mutex_Unlock(&fut->mutex);
71 watcher(watcher_me, fut);
72 } else {
73 if (fut->nwatchers >= fut->maxwatchers) {
74 fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2;
75 fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec));
76 assert(fut->watchers);
77 }
78 fut->watchers[fut->nwatchers].watcher = watcher;
79 fut->watchers[fut->nwatchers].me = watcher_me;
80 fut->nwatchers++;
81 _Cor_Mutex_Unlock(&fut->mutex);
82 }
83}
84
85
86void Future_RemoveWatcher(
87 Future *fut,
88 Future_Watcher watcher,
89 void *watcher_me
90){
91 _Cor_Mutex_Lock(&fut->mutex);
92 for (int i = 0; i < fut->nwatchers; i++) {
93 if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) {
94 fut->nwatchers--;
95 if (i < fut->nwatchers) {
96 fut->watchers[i] = fut->watchers[fut->nwatchers];
97 }
98 break;
99 }
100 }
101 _Cor_Mutex_Unlock(&fut->mutex);
102}
103
104
105static void _Future_Ready(
106 Future *fut
107){
108 fut->state = Future_State_Done;
109
110 // Take note of watchers list, and reset it
111 Future_WatcherSpec *watchers = fut->watchers;
112 int nwatchers = fut->nwatchers;
113 fut->watchers = NULL;
114 fut->nwatchers = 0;
115 _Cor_Mutex_Unlock(&fut->mutex);
116
117 // notify those watchers
118 for (int i = 0; i < nwatchers; i++) {
119 watchers[i].watcher(watchers[i].me, fut);
120 }
121 free(watchers);
122}
123
124
125void Future_SetResult(
126 Future *fut,
127 bool canceled,
128 void *value
129){
130 fut->vfptrs->set_result(fut, canceled, value);
131}
132
133
134Coroutine_Err Future_GetResult(
135 Future *fut,
136 void **res
137){
138 if(fut->state != Future_State_Done){
139 return Coroutine_Err_WrongState;
140 }
141 if (res){
142 *res = fut->value;
143 }
144 return fut->canceled ? Coroutine_Err_Canceled : Coroutine_OK;
145}
146
147
148typedef struct future_bits {
149 Future *fut;
150 Coroutine *cor;
151} future_bits;
152
153
154static void future_complete(
155 void *me,
156 Future *fut
157){
158 (void)fut;
159 Coroutine *cor = (Coroutine *)me;
160 Coroutine_Continue(cor, NULL, false);
161}
162
163
164static void on_yield_for_future(
165 void *me
166){
167 future_bits *bits = (future_bits *)me;
168 Future_AddWatcher(bits->fut, future_complete, bits->cor);
169}
170
171
172void _Future_Await(
173 Future *fut
174){
175 future_bits bits;
176 bits.fut = fut;
177 bits.cor = Coroutine_GetActive();
178 Task *my_task = current_task;
179 current_task = NULL;
180 my_task->awaiting_future = fut;
181 if (my_task->canceled){
182 Future_SetResult(fut, true, my_task->cancel_value);
183 }
184 Coroutine_Yield(NULL, on_yield_for_future, &bits);
185 my_task->awaiting_future = NULL;
186 current_task = my_task;
187}
188
189
190void _Future_SetResult(
191 Future *fut,
192 bool canceled,
193 void *res
194){
195 _Cor_Mutex_Lock(&fut->mutex);
196 if(fut->state == Future_State_Waiting){
197 fut->canceled = canceled;
198 fut->value = res;
199 _Future_Ready(fut);
200 } else {
201 _Cor_Mutex_Unlock(&fut->mutex);
202 }
203}
204
205
206Coroutine_Err Future_Await(
207 Future *fut,
208 void **res
209){
210 fut->vfptrs->await(fut);
211 if (res){
212 *res = fut->value;
213 }
214 return fut->canceled ? Coroutine_Err_Canceled : Coroutine_OK;
215}
216