229 lines5.3 KB
1#include "asleep.h"
2#include "task.h"
3#include <assert.h>
4#include <stdio.h>
5#include <stdlib.h>
6#include <errno.h>
7#include <math.h>
8#include <string.h>
9#include "cor_platform.h"
10
11
12typedef struct Loop_Timer {
13 int64_t when;
14 Future *who;
15} Loop_Timer;
16
17typedef struct _ASleep {
18 _Cor_Mutex mutex;
19 unsigned users;
20 Loop_Timer *sleepers;
21 int lo;
22 int hi;
23 int max;
24 _Cor_Semaphore sem;
25 bool sleeploop_cancel;
26 pthread_t sleeploop;
27 bool started;
28} _ASleep;
29
30static _ASleep asleep;
31
32static void *ASleep_Sleeper(void *param);
33
34static void ASleep_ctor(
35 _ASleep *me
36){
37 int r;
38
39 me->sleepers = NULL;
40 me->lo = 0;
41 me->hi = 0;
42 me->max = 0;
43
44 _Cor_Mutex_ctor(&me->mutex);
45 _Cor_Semaphore_ctor(&me->sem);
46 me->sleeploop_cancel = false;
47 r = pthread_create(&me->sleeploop, NULL, ASleep_Sleeper, me);
48 assert(r == 0);
49 me->started = true;
50}
51
52
53
54static void ASleep_dtor(
55 _ASleep *me
56){
57 int r;
58 me->started = false;
59 me->sleeploop_cancel = true;
60 _Cor_Semaphore_Signal(&me->sem);
61 r = pthread_join(me->sleeploop, NULL);
62 assert(r == 0);
63 _Cor_Sempahore_dtor(&me->sem);
64 _Cor_Mutex_dtor(&me->mutex);
65 free(me->sleepers);
66}
67
68
69void ASleep_StartSystem(){
70 ASleep_ctor(&asleep);
71}
72
73
74void ASleep_StopSystem(){
75 ASleep_dtor(&asleep);
76}
77
78
79static void ASleep_AddSleeper(
80 _ASleep *me,
81 int64_t when,
82 Future *who
83){
84 _Cor_Mutex_Lock(&me->mutex);
85
86 int nsleepers;
87 if (me->lo <= me->hi){
88 int nsleepers = me->hi - me->lo;
89 // ensure there's always an empty slot so that lo == hi always means no sleepers
90 if (nsleepers+1 >= me->max) {
91 me->max = (me->max == 0) ? 4 : me->max * 2;
92 me->sleepers = realloc(me->sleepers, me->max * sizeof(Loop_Timer));
93 assert(me->sleepers);
94 }
95 } else {
96 nsleepers = me->hi + me->max - me->lo;
97 // ensure there's always an empty slot so that lo == hi always means no sleepers
98 if (nsleepers+1 >= me->max) {
99 int oldmax = me->max;
100 me->max = (me->max == 0) ? 4 : me->max * 2;
101 me->sleepers = realloc(me->sleepers, me->max * sizeof(Loop_Timer));
102 assert(me->sleepers);
103 memmove(&me->sleepers[me->lo], &me->lo + me->max - oldmax, sizeof(*me->sleepers) * oldmax - me->lo);
104 me->lo += me->max - oldmax;
105 }
106 }
107
108 // insertion sort - we're assuming there's not going to be many sleepers active at once
109 int i;
110 int nexti;
111 for (i = me->hi; i != me->lo; i = nexti){
112 nexti = i-1;
113 if (i < 0){
114 nexti += me->max;
115 }
116 if (me->sleepers[nexti].when <= when){
117 break;
118 }
119 me->sleepers[i] = me->sleepers[nexti];
120 }
121 me->sleepers[i].when = when;
122 me->sleepers[i].who = who;
123 me->hi += 1;
124 if (me->hi >= me->max){
125 me->hi -= me->max;
126 }
127
128 _Cor_Mutex_Unlock(&me->mutex);
129 _Cor_Semaphore_Signal(&me->sem);
130}
131
132
133static void ASleep_RemoveSleeper(
134 _ASleep *me,
135 int64_t when,
136 Future *who
137){
138 _Cor_Mutex_Lock(&me->mutex);
139
140 int i;
141 for (i = me->lo; i != me->hi; ){
142 if (me->sleepers[i].when == when && me->sleepers[i].who == who){
143 int previ = i;
144 i += 1;
145 if (i >= me->max){
146 i -= me->max;
147 }
148 for (; i != me->hi; ){
149 me->sleepers[previ] = me->sleepers[i];
150 previ = i;
151 i += 1;
152 if (i >= me->max){
153 i -= me->max;
154 }
155 }
156 me->hi = previ;
157 break;
158 }
159 i += 1;
160 if (i >= me->max){
161 i -= me->max;
162 }
163 }
164
165 _Cor_Mutex_Unlock(&me->mutex);
166 _Cor_Semaphore_Signal(&me->sem);
167}
168
169
170static void *ASleep_Sleeper(
171 void *param
172){
173 _ASleep *me = (_ASleep *)param;
174
175 while (!me->sleeploop_cancel){
176 _Cor_Mutex_Lock(&me->mutex);
177 bool got_one;
178 if (me->lo != me->hi){
179 int64_t when = me->sleepers[0].when;
180 _Cor_Mutex_Unlock(&me->mutex);
181 got_one = _Cor_Semaphore_Wait(&me->sem, when);
182 } else {
183 _Cor_Mutex_Unlock(&me->mutex);
184 got_one = _Cor_Semaphore_Wait(&me->sem, -1);
185 }
186 if (!got_one){
187 // timed out, so...
188 // issue any due timers
189 int64_t now = _Cor_Realtime_Now();
190 while(me->lo != me->hi && me->sleepers[me->lo].when <= now) {
191 Future *fut = me->sleepers[me->lo].who;
192 me->lo += 1;
193 if (me->lo >= me->max){
194 me->lo -= me->max;
195 }
196 _Cor_Mutex_Unlock(&me->mutex);
197 Future_SetResult(fut, false, NULL);
198 _Cor_Mutex_Lock(&me->mutex);
199 now = _Cor_Realtime_Now();
200 }
201 _Cor_Mutex_Unlock(&me->mutex);
202 }
203 }
204
205 return NULL;
206}
207
208
209// value is a pointer to a float delay in seconds
210bool ASleep(
211 float delay,
212 void **value
213){
214 assert(current_task && asleep.started);
215 if (delay < 0){
216 delay = 0;
217 }
218 int64_t endtime = _Cor_Realtime_Now() + (int64_t)(delay * 1000000000);
219
220 Future fut;
221 Future_ctor(&fut);
222 ASleep_AddSleeper(&asleep, endtime, &fut);
223 bool res = Future_Await(&fut, value);
224 ASleep_RemoveSleeper(&asleep, endtime, &fut);
225 Future_dtor(&fut);
226
227 return res;
228}
229