1/*************************************************************************** 2 * _ _ ____ _ 3 * Project ___| | | | _ \| | 4 * / __| | | | |_) | | 5 * | (__| |_| | _ <| |___ 6 * \___|\___/|_| \_\_____| 7 * 8 * Copyright (C) 1998 - 2016, Daniel Stenberg, <daniel@haxx.se>, et al. 9 * 10 * This software is licensed as described in the file COPYING, which 11 * you should have received as part of this distribution. The terms 12 * are also available at https://curl.haxx.se/docs/copyright.html. 13 * 14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell 15 * copies of the Software, and permit persons to whom the Software is 16 * furnished to do so, under the terms of the COPYING file. 17 * 18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 19 * KIND, either express or implied. 20 * 21 ***************************************************************************/ 22/* <DESC> 23 * multi socket API usage together with with glib2 24 * </DESC> 25 */ 26/* Example application source code using the multi socket interface to 27 * download many files at once. 28 * 29 * Written by Jeff Pohlmeyer 30 31 Requires glib-2.x and a (POSIX?) system that has mkfifo(). 32 33 This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c" 34 sample programs, adapted to use glib's g_io_channel in place of libevent. 35 36 When running, the program creates the named pipe "hiper.fifo" 37 38 Whenever there is input into the fifo, the program reads the input as a list 39 of URL's and creates some new easy handles to fetch each URL via the 40 curl_multi "hiper" API. 41 42 43 Thus, you can try a single URL: 44 % echo http://www.yahoo.com > hiper.fifo 45 46 Or a whole bunch of them: 47 % cat my-url-list > hiper.fifo 48 49 The fifo buffer is handled almost instantly, so you can even add more URL's 50 while the previous requests are still being downloaded. 51 52 This is purely a demo app, all retrieved data is simply discarded by the write 53 callback. 54 55*/ 56 57#include <glib.h> 58#include <sys/stat.h> 59#include <unistd.h> 60#include <fcntl.h> 61#include <stdlib.h> 62#include <stdio.h> 63#include <errno.h> 64#include <curl/curl.h> 65 66#define MSG_OUT g_print /* Change to "g_error" to write to stderr */ 67#define SHOW_VERBOSE 0 /* Set to non-zero for libcurl messages */ 68#define SHOW_PROGRESS 0 /* Set to non-zero to enable progress callback */ 69 70/* Global information, common to all connections */ 71typedef struct _GlobalInfo { 72 CURLM *multi; 73 guint timer_event; 74 int still_running; 75} GlobalInfo; 76 77/* Information associated with a specific easy handle */ 78typedef struct _ConnInfo { 79 CURL *easy; 80 char *url; 81 GlobalInfo *global; 82 char error[CURL_ERROR_SIZE]; 83} ConnInfo; 84 85/* Information associated with a specific socket */ 86typedef struct _SockInfo { 87 curl_socket_t sockfd; 88 CURL *easy; 89 int action; 90 long timeout; 91 GIOChannel *ch; 92 guint ev; 93 GlobalInfo *global; 94} SockInfo; 95 96/* Die if we get a bad CURLMcode somewhere */ 97static void mcode_or_die(const char *where, CURLMcode code) { 98 if(CURLM_OK != code) { 99 const char *s; 100 switch (code) { 101 case CURLM_BAD_HANDLE: s="CURLM_BAD_HANDLE"; break; 102 case CURLM_BAD_EASY_HANDLE: s="CURLM_BAD_EASY_HANDLE"; break; 103 case CURLM_OUT_OF_MEMORY: s="CURLM_OUT_OF_MEMORY"; break; 104 case CURLM_INTERNAL_ERROR: s="CURLM_INTERNAL_ERROR"; break; 105 case CURLM_BAD_SOCKET: s="CURLM_BAD_SOCKET"; break; 106 case CURLM_UNKNOWN_OPTION: s="CURLM_UNKNOWN_OPTION"; break; 107 case CURLM_LAST: s="CURLM_LAST"; break; 108 default: s="CURLM_unknown"; 109 } 110 MSG_OUT("ERROR: %s returns %s\n", where, s); 111 exit(code); 112 } 113} 114 115/* Check for completed transfers, and remove their easy handles */ 116static void check_multi_info(GlobalInfo *g) 117{ 118 char *eff_url; 119 CURLMsg *msg; 120 int msgs_left; 121 ConnInfo *conn; 122 CURL *easy; 123 CURLcode res; 124 125 MSG_OUT("REMAINING: %d\n", g->still_running); 126 while((msg = curl_multi_info_read(g->multi, &msgs_left))) { 127 if(msg->msg == CURLMSG_DONE) { 128 easy = msg->easy_handle; 129 res = msg->data.result; 130 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn); 131 curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url); 132 MSG_OUT("DONE: %s => (%d) %s\n", eff_url, res, conn->error); 133 curl_multi_remove_handle(g->multi, easy); 134 free(conn->url); 135 curl_easy_cleanup(easy); 136 free(conn); 137 } 138 } 139} 140 141/* Called by glib when our timeout expires */ 142static gboolean timer_cb(gpointer data) 143{ 144 GlobalInfo *g = (GlobalInfo *)data; 145 CURLMcode rc; 146 147 rc = curl_multi_socket_action(g->multi, 148 CURL_SOCKET_TIMEOUT, 0, &g->still_running); 149 mcode_or_die("timer_cb: curl_multi_socket_action", rc); 150 check_multi_info(g); 151 return FALSE; 152} 153 154/* Update the event timer after curl_multi library calls */ 155static int update_timeout_cb(CURLM *multi, long timeout_ms, void *userp) 156{ 157 struct timeval timeout; 158 GlobalInfo *g=(GlobalInfo *)userp; 159 timeout.tv_sec = timeout_ms/1000; 160 timeout.tv_usec = (timeout_ms%1000)*1000; 161 162 MSG_OUT("*** update_timeout_cb %ld => %ld:%ld ***\n", 163 timeout_ms, timeout.tv_sec, timeout.tv_usec); 164 165 g->timer_event = g_timeout_add(timeout_ms, timer_cb, g); 166 return 0; 167} 168 169/* Called by glib when we get action on a multi socket */ 170static gboolean event_cb(GIOChannel *ch, GIOCondition condition, gpointer data) 171{ 172 GlobalInfo *g = (GlobalInfo*) data; 173 CURLMcode rc; 174 int fd=g_io_channel_unix_get_fd(ch); 175 176 int action = 177 (condition & G_IO_IN ? CURL_CSELECT_IN : 0) | 178 (condition & G_IO_OUT ? CURL_CSELECT_OUT : 0); 179 180 rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running); 181 mcode_or_die("event_cb: curl_multi_socket_action", rc); 182 183 check_multi_info(g); 184 if(g->still_running) { 185 return TRUE; 186 } 187 else { 188 MSG_OUT("last transfer done, kill timeout\n"); 189 if(g->timer_event) { 190 g_source_remove(g->timer_event); 191 } 192 return FALSE; 193 } 194} 195 196/* Clean up the SockInfo structure */ 197static void remsock(SockInfo *f) 198{ 199 if(!f) { 200 return; 201 } 202 if(f->ev) { 203 g_source_remove(f->ev); 204 } 205 g_free(f); 206} 207 208/* Assign information to a SockInfo structure */ 209static void setsock(SockInfo*f, curl_socket_t s, CURL*e, int act, GlobalInfo*g) 210{ 211 GIOCondition kind = 212 (act&CURL_POLL_IN?G_IO_IN:0)|(act&CURL_POLL_OUT?G_IO_OUT:0); 213 214 f->sockfd = s; 215 f->action = act; 216 f->easy = e; 217 if(f->ev) { 218 g_source_remove(f->ev); 219 } 220 f->ev=g_io_add_watch(f->ch, kind, event_cb, g); 221} 222 223/* Initialize a new SockInfo structure */ 224static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g) 225{ 226 SockInfo *fdp = g_malloc0(sizeof(SockInfo)); 227 228 fdp->global = g; 229 fdp->ch=g_io_channel_unix_new(s); 230 setsock(fdp, s, easy, action, g); 231 curl_multi_assign(g->multi, s, fdp); 232} 233 234/* CURLMOPT_SOCKETFUNCTION */ 235static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp) 236{ 237 GlobalInfo *g = (GlobalInfo*) cbp; 238 SockInfo *fdp = (SockInfo*) sockp; 239 static const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" }; 240 241 MSG_OUT("socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]); 242 if(what == CURL_POLL_REMOVE) { 243 MSG_OUT("\n"); 244 remsock(fdp); 245 } 246 else { 247 if(!fdp) { 248 MSG_OUT("Adding data: %s%s\n", 249 what&CURL_POLL_IN?"READ":"", 250 what&CURL_POLL_OUT?"WRITE":""); 251 addsock(s, e, what, g); 252 } 253 else { 254 MSG_OUT( 255 "Changing action from %d to %d\n", fdp->action, what); 256 setsock(fdp, s, e, what, g); 257 } 258 } 259 return 0; 260} 261 262/* CURLOPT_WRITEFUNCTION */ 263static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data) 264{ 265 size_t realsize = size * nmemb; 266 ConnInfo *conn = (ConnInfo*) data; 267 (void)ptr; 268 (void)conn; 269 return realsize; 270} 271 272/* CURLOPT_PROGRESSFUNCTION */ 273static int prog_cb (void *p, double dltotal, double dlnow, double ult, 274 double uln) 275{ 276 ConnInfo *conn = (ConnInfo *)p; 277 MSG_OUT("Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal); 278 return 0; 279} 280 281/* Create a new easy handle, and add it to the global curl_multi */ 282static void new_conn(char *url, GlobalInfo *g) 283{ 284 ConnInfo *conn; 285 CURLMcode rc; 286 287 conn = g_malloc0(sizeof(ConnInfo)); 288 conn->error[0]='\0'; 289 conn->easy = curl_easy_init(); 290 if(!conn->easy) { 291 MSG_OUT("curl_easy_init() failed, exiting!\n"); 292 exit(2); 293 } 294 conn->global = g; 295 conn->url = g_strdup(url); 296 curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url); 297 curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb); 298 curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, &conn); 299 curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, (long)SHOW_VERBOSE); 300 curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error); 301 curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn); 302 curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, SHOW_PROGRESS?0L:1L); 303 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb); 304 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn); 305 curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L); 306 curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 30L); 307 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 1L); 308 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 30L); 309 310 MSG_OUT("Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url); 311 rc =curl_multi_add_handle(g->multi, conn->easy); 312 mcode_or_die("new_conn: curl_multi_add_handle", rc); 313 314 /* note that the add_handle() will set a time-out to trigger very soon so 315 that the necessary socket_action() call will be called by this app */ 316} 317 318/* This gets called by glib whenever data is received from the fifo */ 319static gboolean fifo_cb (GIOChannel *ch, GIOCondition condition, gpointer data) 320{ 321#define BUF_SIZE 1024 322 gsize len, tp; 323 gchar *buf, *tmp, *all=NULL; 324 GIOStatus rv; 325 326 do { 327 GError *err=NULL; 328 rv = g_io_channel_read_line(ch, &buf, &len, &tp, &err); 329 if(buf) { 330 if(tp) { 331 buf[tp]='\0'; 332 } 333 new_conn(buf, (GlobalInfo*)data); 334 g_free(buf); 335 } 336 else { 337 buf = g_malloc(BUF_SIZE+1); 338 while(TRUE) { 339 buf[BUF_SIZE]='\0'; 340 g_io_channel_read_chars(ch, buf, BUF_SIZE, &len, &err); 341 if(len) { 342 buf[len]='\0'; 343 if(all) { 344 tmp=all; 345 all=g_strdup_printf("%s%s", tmp, buf); 346 g_free(tmp); 347 } 348 else { 349 all = g_strdup(buf); 350 } 351 } 352 else { 353 break; 354 } 355 } 356 if(all) { 357 new_conn(all, (GlobalInfo*)data); 358 g_free(all); 359 } 360 g_free(buf); 361 } 362 if(err) { 363 g_error("fifo_cb: %s", err->message); 364 g_free(err); 365 break; 366 } 367 } while((len) && (rv == G_IO_STATUS_NORMAL)); 368 return TRUE; 369} 370 371int init_fifo(void) 372{ 373 struct stat st; 374 const char *fifo = "hiper.fifo"; 375 int socket; 376 377 if(lstat (fifo, &st) == 0) { 378 if((st.st_mode & S_IFMT) == S_IFREG) { 379 errno = EEXIST; 380 perror("lstat"); 381 exit (1); 382 } 383 } 384 385 unlink (fifo); 386 if(mkfifo (fifo, 0600) == -1) { 387 perror("mkfifo"); 388 exit (1); 389 } 390 391 socket = open (fifo, O_RDWR | O_NONBLOCK, 0); 392 393 if(socket == -1) { 394 perror("open"); 395 exit (1); 396 } 397 MSG_OUT("Now, pipe some URL's into > %s\n", fifo); 398 399 return socket; 400} 401 402int main(int argc, char **argv) 403{ 404 GlobalInfo *g; 405 CURLMcode rc; 406 GMainLoop*gmain; 407 int fd; 408 GIOChannel* ch; 409 g=g_malloc0(sizeof(GlobalInfo)); 410 411 fd=init_fifo(); 412 ch=g_io_channel_unix_new(fd); 413 g_io_add_watch(ch, G_IO_IN, fifo_cb, g); 414 gmain=g_main_loop_new(NULL, FALSE); 415 g->multi = curl_multi_init(); 416 curl_multi_setopt(g->multi, CURLMOPT_SOCKETFUNCTION, sock_cb); 417 curl_multi_setopt(g->multi, CURLMOPT_SOCKETDATA, g); 418 curl_multi_setopt(g->multi, CURLMOPT_TIMERFUNCTION, update_timeout_cb); 419 curl_multi_setopt(g->multi, CURLMOPT_TIMERDATA, g); 420 421 /* we don't call any curl_multi_socket*() function yet as we have no handles 422 added! */ 423 424 g_main_loop_run(gmain); 425 curl_multi_cleanup(g->multi); 426 return 0; 427} 428