]> git.ipfire.org Git - thirdparty/squid.git/blob - src/BodyReader.cc
- Added basic support for asynchronous calls. The calls are
[thirdparty/squid.git] / src / BodyReader.cc
1
2
3 #include "squid.h"
4 #include "MemBuf.h"
5 #include "BodyReader.h"
6
7 BodyReader::BodyReader(size_t len, BodyReadFunc *r, BodyAbortFunc *a, BodyKickFunc *k, void *d) :
8 _remaining(len), _available(0),
9 read_func(r), abort_func(a), kick_func(k), read_func_data(d),
10 read_callback(NULL), read_callback_data(NULL)
11 {
12 theBuf.init(4096, 65536);
13 debugs(32,3,HERE << this << " " << "created new BodyReader for content-length " << len);
14 bytes_read = 0;
15 }
16
17 BodyReader::~BodyReader()
18 {
19 if (_remaining && abort_func)
20 abort_func(read_func_data, _remaining);
21
22 if (callbackPending())
23 doCallback();
24
25 }
26
27 void
28 BodyReader::read(CBCB *callback, void *cbdata)
29 {
30 assert(_remaining || theBuf.contentSize());
31 debugs(32,3,HERE << this << " " << "remaining = " << _remaining);
32 debugs(32,3,HERE << this << " " << "available = " << _available);
33
34 if (read_callback == NULL) {
35 read_callback = callback;
36 read_callback_data = cbdataReference(cbdata);
37 } else {
38 assert(read_callback == callback);
39 assert(read_callback_data == cbdata);
40 }
41
42 if ((_available == 0) && (theBuf.contentSize() == 0)) {
43 debugs(32,3,HERE << this << " " << "read: no body data available, saving callback pointers");
44
45 if (kick_func)
46 kick_func(read_func_data);
47
48 return;
49 }
50
51 debugs(32,3,HERE << this << " " << "read_func=" << read_func);
52 debugs(32,3,HERE << this << " " << "data=" << read_func_data);
53 size_t size = theBuf.potentialSpaceSize();
54
55 debugs(32, 3, "BodyReader::read: available: " << _available << ", size " << size << ", remaining: " << _remaining);
56
57 if (size > _available)
58 size = _available;
59
60 if (size > _remaining)
61 size = _remaining;
62
63 if (size > 0) {
64 debugs(32,3,HERE << this << " " << "calling read_func for " << size << " bytes");
65
66 size_t nread = read_func(read_func_data, theBuf, size);
67
68 if (nread > 0) {
69 _available -= nread;
70 reduce_remaining(nread);
71 } else {
72 debugs(32,3,HERE << this << " " << "Help, read_func() ret " << nread);
73 }
74 }
75
76 if (theBuf.contentSize() > 0) {
77 debugs(32,3,HERE << this << " have " << theBuf.contentSize() << " bytes in theBuf, calling back");
78 doCallback();
79 }
80 }
81
82 void
83 BodyReader::notify(size_t now_available)
84 {
85 debugs(32,3,HERE << this << " " << "old available = " << _available);
86 debugs(32,3,HERE << this << " " << "now_available = " << now_available);
87 _available = now_available;
88
89 if (!callbackPending()) {
90 debugs(32,3,HERE << this << " " << "no callback pending, nothing to do");
91 return;
92 }
93
94 debugs(32,3,HERE << this << " " << "have data and pending callback, calling read()");
95
96 read(read_callback, read_callback_data);
97 }
98
99 bool
100 BodyReader::callbackPending()
101 {
102 return read_callback ? true : false;
103 }
104
105 /*
106 * doCallback
107 *
108 * Execute the read callback if there is a function registered
109 * and the read_callback_data is still valid.
110 */
111 bool
112 BodyReader::doCallback()
113 {
114 CBCB *t_callback = read_callback;
115 void *t_cbdata;
116
117 if (t_callback == NULL)
118 return false;
119
120 read_callback = NULL;
121
122 if (!cbdataReferenceValidDone(read_callback_data, &t_cbdata))
123 return false;
124
125 debugs(32,3,HERE << this << " doing callback, theBuf size = " << theBuf.contentSize());
126
127 t_callback(theBuf, t_cbdata);
128
129 return true;
130 }
131
132 bool
133 BodyReader::consume(size_t size)
134 {
135 debugs(32,3,HERE << this << " BodyReader::consume consuming " << size);
136
137 if (theBuf.contentSize() < (mb_size_t) size) {
138 debugs(0,0,HERE << this << "BodyReader::consume failed");
139 debugs(0,0,HERE << this << "BodyReader::consume size = " << size);
140 debugs(0,0,HERE << this << "BodyReader::consume contentSize() = " << theBuf.contentSize());
141 return false;
142 }
143
144 theBuf.consume(size);
145
146 if (callbackPending() && _available > 0) {
147 debugs(32,3,HERE << this << " " << "data avail and pending callback, calling read()");
148 read(read_callback, read_callback_data);
149 }
150
151 return true;
152 }
153
154 void
155 BodyReader::reduce_remaining(size_t size)
156 {
157 assert(size <= _remaining);
158 _remaining -= size;
159 }