Line data Source code
1 : /* SPDX-License-Identifier: MIT OR GPL-3.0-only */
2 : /* event.c
3 : ** strophe XMPP client library -- event loop and management
4 : **
5 : ** Copyright (C) 2005-2009 Collecta, Inc.
6 : **
7 : ** This software is provided AS-IS with no warranty, either express
8 : ** or implied.
9 : **
10 : ** This program is dual licensed under the MIT or GPLv3 licenses.
11 : */
12 :
13 : /** @file
14 : * Event loop and management.
15 : */
16 :
17 : /** @defgroup EventLoop Event loop
18 : * These functions manage the Strophe event loop.
19 : *
20 : * Simple tools can use xmpp_run() and xmpp_stop() to manage the life
21 : * cycle of the program. A common idiom is to set up a few initial
22 : * event handlers, call xmpp_run(), and then respond and react to
23 : * events as they come in. At some point, one of the handlers will
24 : * call xmpp_stop() to quit the event loop which leads to the program
25 : * terminating.
26 : *
27 : * More complex programs will have their own event loops, and should
28 : * ensure that xmpp_run_once() is called regularly from there. For
29 : * example, a GUI program will already include an event loop to
30 : * process UI events from users, and xmpp_run_once() would be called
31 : * from an idle function.
32 : */
33 :
34 : #include <stdio.h>
35 : #include <stdlib.h>
36 : #include <string.h>
37 :
38 : #ifndef _WIN32
39 : #include <sys/select.h>
40 : #include <errno.h>
41 : #include <unistd.h>
42 : #define _sleep(x) usleep((x)*1000)
43 : #else
44 : #include <winsock2.h>
45 : #ifndef ETIMEDOUT
46 : #define ETIMEDOUT WSAETIMEDOUT
47 : #endif
48 : #ifndef ECONNRESET
49 : #define ECONNRESET WSAECONNRESET
50 : #endif
51 : #ifndef ECONNABORTED
52 : #define ECONNABORTED WSAECONNABORTED
53 : #endif
54 : #define _sleep(x) Sleep(x)
55 : #endif
56 :
57 : #include "strophe.h"
58 : #include "common.h"
59 : #include "parser.h"
60 :
61 : #ifndef STROPHE_MESSAGE_BUFFER_SIZE
62 : /** Max buffer size for receiving messages. */
63 : #define STROPHE_MESSAGE_BUFFER_SIZE 4096
64 : #endif
65 :
66 0 : static int _connect_next(xmpp_conn_t *conn)
67 : {
68 0 : sock_close(conn->sock);
69 0 : conn->sock = sock_connect(conn->xsock);
70 0 : if (conn->sock == INVALID_SOCKET)
71 : return -1;
72 :
73 0 : conn->timeout_stamp = time_stamp();
74 :
75 0 : return 0;
76 : }
77 :
78 : /** Run the event loop once.
79 : * This function will run send any data that has been queued by
80 : * xmpp_send and related functions and run through the Strophe even
81 : * loop a single time, and will not wait more than timeout
82 : * milliseconds for events. This is provided to support integration
83 : * with event loops outside the library, and if used, should be
84 : * called regularly to achieve low latency event handling.
85 : *
86 : * @param ctx a Strophe context object
87 : * @param timeout time to wait for events in milliseconds
88 : *
89 : * @ingroup EventLoop
90 : */
91 0 : void xmpp_run_once(xmpp_ctx_t *ctx, unsigned long timeout)
92 : {
93 0 : xmpp_connlist_t *connitem;
94 0 : xmpp_conn_t *conn;
95 0 : struct conn_interface *intf;
96 0 : fd_set rfds, wfds;
97 0 : sock_t max = 0;
98 0 : int ret;
99 0 : struct timeval tv;
100 0 : xmpp_send_queue_t *sq, *tsq;
101 0 : int towrite;
102 0 : char buf[STROPHE_MESSAGE_BUFFER_SIZE];
103 0 : uint64_t next;
104 0 : uint64_t usec;
105 0 : int tls_read_bytes = 0;
106 :
107 0 : if (ctx->loop_status == XMPP_LOOP_QUIT)
108 0 : return;
109 :
110 : /* send queued data */
111 0 : connitem = ctx->connlist;
112 0 : while (connitem) {
113 0 : conn = connitem->conn;
114 0 : if (conn->state != XMPP_STATE_CONNECTED) {
115 0 : connitem = connitem->next;
116 0 : continue;
117 : }
118 0 : intf = &conn->intf;
119 :
120 : /* if we're running tls, there may be some remaining data waiting to
121 : * be sent, so push that out */
122 0 : if (conn->tls) {
123 0 : ret = tls_clear_pending_write(intf);
124 :
125 0 : if (ret < 0 && !tls_is_recoverable(intf, tls_error(intf))) {
126 : /* an error occurred */
127 0 : strophe_debug(
128 : ctx, "xmpp",
129 : "Send error of pending data occurred, disconnecting.");
130 0 : conn->error = ECONNABORTED;
131 0 : conn_disconnect(conn);
132 0 : goto next_item;
133 : }
134 : }
135 :
136 : /* write all data from the send queue to the socket */
137 0 : sq = conn->send_queue_head;
138 0 : while (sq) {
139 0 : towrite = sq->len - sq->written;
140 :
141 0 : ret = conn_interface_write(intf, &sq->data[sq->written], towrite);
142 0 : if (ret > 0 && ret < towrite)
143 0 : sq->written += ret; /* not all data could be sent now */
144 0 : sq->wip = 1;
145 0 : if (ret != towrite)
146 : break; /* partial write or an error */
147 :
148 : /* all data for this queue item written, delete and move on */
149 0 : strophe_debug(conn->ctx, "conn", "SENT: %s", sq->data);
150 0 : strophe_debug_verbose(1, ctx, "xmpp", "Q_SENT: %p", sq);
151 0 : tsq = sq;
152 0 : sq = sq->next;
153 0 : conn->send_queue_len--;
154 0 : if (tsq->owner & XMPP_QUEUE_USER)
155 0 : conn->send_queue_user_len--;
156 0 : if (!(tsq->owner & XMPP_QUEUE_SM) && conn->sm_state->sm_enabled) {
157 0 : tsq->sm_h = conn->sm_state->sm_sent_nr;
158 0 : conn->sm_state->sm_sent_nr++;
159 0 : strophe_debug_verbose(1, ctx, "xmpp", "SM_Q_MOVE: %p, h=%lu",
160 : tsq, tsq->sm_h);
161 0 : add_queue_back(&conn->sm_state->sm_queue, tsq);
162 : tsq = NULL;
163 : }
164 : if (tsq) {
165 0 : strophe_debug_verbose(2, ctx, "xmpp", "Q_FREE: %p", tsq);
166 0 : strophe_debug_verbose(3, ctx, "conn", "Q_CONTENT: %s",
167 : tsq->data);
168 0 : strophe_free(ctx, tsq->data);
169 0 : strophe_free(ctx, tsq);
170 : }
171 :
172 : /* pop the top item */
173 0 : conn->send_queue_head = sq;
174 : /* if we've sent everything update the tail */
175 0 : if (!sq)
176 0 : conn->send_queue_tail = NULL;
177 : }
178 0 : intf->flush(intf);
179 :
180 : /* tear down connection on error */
181 0 : if (conn->error) {
182 : /* FIXME: need to tear down send queues and random other things
183 : * maybe this should be abstracted */
184 0 : strophe_debug(ctx, "xmpp", "Send error occurred, disconnecting.");
185 0 : conn->error = ECONNABORTED;
186 0 : conn_disconnect(conn);
187 : }
188 0 : next_item:
189 0 : connitem = connitem->next;
190 : }
191 :
192 : /* reset parsers if needed */
193 0 : for (connitem = ctx->connlist; connitem; connitem = connitem->next) {
194 0 : if (connitem->conn->reset_parser)
195 0 : conn_parser_reset(connitem->conn);
196 : }
197 :
198 : /* fire any ready timed handlers, then make sure we don't wait past
199 : the time when timed handlers need to be called */
200 0 : next = handler_fire_timed(ctx);
201 :
202 0 : usec = ((next < timeout) ? next : timeout) * 1000;
203 0 : tv.tv_sec = (long)(usec / 1000000);
204 0 : tv.tv_usec = (long)(usec % 1000000);
205 :
206 0 : FD_ZERO(&rfds);
207 0 : FD_ZERO(&wfds);
208 :
209 : /* find events to watch */
210 0 : connitem = ctx->connlist;
211 0 : while (connitem) {
212 0 : conn = connitem->conn;
213 0 : intf = &conn->intf;
214 :
215 0 : switch (conn->state) {
216 0 : case XMPP_STATE_CONNECTING:
217 : /* connect has been called and we're waiting for it to complete */
218 : /* connection will give us write or error events */
219 :
220 : /* make sure the timeout hasn't expired */
221 0 : if (time_elapsed(conn->timeout_stamp, time_stamp()) <=
222 0 : conn->connect_timeout)
223 0 : FD_SET(conn->sock, &wfds);
224 : else {
225 0 : strophe_info(ctx, "xmpp", "Connection attempt timed out.");
226 0 : ret = _connect_next(conn);
227 0 : if (ret != 0) {
228 0 : conn->error = ETIMEDOUT;
229 0 : conn_disconnect(conn);
230 : } else {
231 0 : FD_SET(conn->sock, &wfds);
232 : }
233 : }
234 : break;
235 0 : case XMPP_STATE_CONNECTED:
236 0 : FD_SET(conn->sock, &rfds);
237 0 : if (conn->send_queue_len > 0)
238 0 : FD_SET(conn->sock, &wfds);
239 : break;
240 : case XMPP_STATE_DISCONNECTED:
241 : /* do nothing */
242 : default:
243 : break;
244 : }
245 :
246 : /* Check if there is something in the SSL buffer. */
247 0 : if (conn->tls)
248 0 : tls_read_bytes += tls_pending(intf);
249 :
250 0 : if (conn->state != XMPP_STATE_DISCONNECTED && conn->sock > max)
251 0 : max = conn->sock;
252 :
253 0 : connitem = connitem->next;
254 : }
255 :
256 : /* check for events */
257 0 : if (max > 0)
258 0 : ret = select(max + 1, &rfds, &wfds, NULL, &tv);
259 : else {
260 0 : if (timeout > 0)
261 0 : _sleep(timeout);
262 0 : return;
263 : }
264 :
265 : /* select errored */
266 0 : if (ret < 0) {
267 0 : if (!sock_is_recoverable(NULL, sock_error(NULL)))
268 0 : strophe_error(ctx, "xmpp", "event watcher internal error %d",
269 : sock_error(NULL));
270 0 : return;
271 : }
272 :
273 : /* no events happened */
274 0 : if (ret == 0 && tls_read_bytes == 0)
275 : return;
276 :
277 : /* process events */
278 0 : connitem = ctx->connlist;
279 0 : while (connitem) {
280 0 : conn = connitem->conn;
281 0 : intf = &conn->intf;
282 :
283 0 : switch (conn->state) {
284 0 : case XMPP_STATE_CONNECTING:
285 0 : if (FD_ISSET(conn->sock, &wfds)) {
286 : /* connection complete */
287 :
288 : /* check for error */
289 0 : ret = sock_connect_error(conn->sock);
290 0 : if (ret != 0) {
291 : /* connection failed */
292 0 : strophe_debug(ctx, "xmpp", "connection failed, error %d",
293 : ret);
294 0 : ret = _connect_next(conn);
295 0 : if (ret != 0) {
296 0 : conn->error = ret;
297 0 : conn_disconnect(conn);
298 : }
299 : break;
300 : }
301 :
302 0 : conn->state = XMPP_STATE_CONNECTED;
303 0 : strophe_debug(ctx, "xmpp", "connection successful");
304 0 : conn_established(conn);
305 : }
306 :
307 : break;
308 0 : case XMPP_STATE_CONNECTED:
309 0 : if (FD_ISSET(conn->sock, &rfds) || intf->pending(intf)) {
310 :
311 0 : ret = intf->read(intf, buf, STROPHE_MESSAGE_BUFFER_SIZE);
312 :
313 0 : if (ret > 0) {
314 0 : ret = parser_feed(conn->parser, buf, ret);
315 0 : if (!ret) {
316 0 : strophe_debug(ctx, "xmpp", "parse error [%s]", buf);
317 0 : xmpp_send_error(conn, XMPP_SE_INVALID_XML,
318 : "parse error");
319 : }
320 : } else {
321 0 : int err = intf->get_error(intf);
322 0 : if (!intf->error_is_recoverable(intf, err)) {
323 0 : strophe_debug(ctx, "xmpp", "Unrecoverable error: %d.",
324 : err);
325 0 : conn->error = err;
326 0 : conn_disconnect(conn);
327 0 : } else if (!conn->tls) {
328 : /* return of 0 means socket closed by server */
329 0 : strophe_debug(ctx, "xmpp",
330 : "Socket closed by remote host.");
331 0 : conn->error = ECONNRESET;
332 0 : conn_disconnect(conn);
333 : }
334 : }
335 : }
336 :
337 : break;
338 : case XMPP_STATE_DISCONNECTED:
339 : /* do nothing */
340 : default:
341 : break;
342 : }
343 :
344 0 : connitem = connitem->next;
345 : }
346 :
347 : /* fire any ready handlers */
348 0 : handler_fire_timed(ctx);
349 : }
350 :
351 : /** Start the event loop.
352 : * This function continuously calls xmpp_run_once and does not return
353 : * until xmpp_stop has been called.
354 : *
355 : * @param ctx a Strophe context object
356 : *
357 : * @ingroup EventLoop
358 : */
359 0 : void xmpp_run(xmpp_ctx_t *ctx)
360 : {
361 0 : if (ctx->loop_status != XMPP_LOOP_NOTSTARTED)
362 : return;
363 :
364 0 : ctx->loop_status = XMPP_LOOP_RUNNING;
365 0 : while (ctx->loop_status == XMPP_LOOP_RUNNING) {
366 0 : xmpp_run_once(ctx, ctx->timeout);
367 : }
368 :
369 : /* make it possible to start event loop again */
370 0 : ctx->loop_status = XMPP_LOOP_NOTSTARTED;
371 :
372 0 : strophe_debug(ctx, "event", "Event loop completed.");
373 : }
374 :
375 : /** Stop the event loop.
376 : * This will stop the event loop after the current iteration and cause
377 : * xmpp_run to exit.
378 : *
379 : * @param ctx a Strophe context object
380 : *
381 : * @ingroup EventLoop
382 : */
383 0 : void xmpp_stop(xmpp_ctx_t *ctx)
384 : {
385 0 : strophe_debug(ctx, "event", "Stopping event loop.");
386 :
387 0 : if (ctx->loop_status == XMPP_LOOP_RUNNING)
388 0 : ctx->loop_status = XMPP_LOOP_QUIT;
389 0 : }
390 :
391 : /** Set the timeout to use when calling xmpp_run().
392 : *
393 : * @param ctx a Strophe context object
394 : * @param timeout the time to wait for events in milliseconds
395 : *
396 : * @ingroup EventLoop
397 : */
398 0 : void xmpp_ctx_set_timeout(xmpp_ctx_t *ctx, unsigned long timeout)
399 : {
400 0 : ctx->timeout = timeout;
401 0 : }
|