diff src/c/http.c @ 1979:81bc76aa4acd

Merge in upstream changes.
author Patrick Hurst <phurst@mit.edu>
date Sat, 18 Jan 2014 18:26:24 -0500
parents 6b80900ddc66
children c93fbd139732
line wrap: on
line diff
--- a/src/c/http.c	Mon Dec 09 20:41:24 2013 -0500
+++ b/src/c/http.c	Sat Jan 18 18:26:24 2014 -0500
@@ -21,7 +21,7 @@
 extern uw_app uw_application;
 
 int uw_backlog = SOMAXCONN;
-static int keepalive = 0;
+static int keepalive = 0, quiet = 0;
 
 static char *get_header(void *data, const char *h) {
   char *s = data;
@@ -62,16 +62,18 @@
 }
 
 static void log_debug(void *data, const char *fmt, ...) {
-  va_list ap;
-  va_start(ap, fmt);
+  if (!quiet) {
+    va_list ap;
+    va_start(ap, fmt);
 
-  vprintf(fmt, ap);
+    vprintf(fmt, ap);
+  }
 }
 
 static void *worker(void *data) {
   int me = *(int *)data;
   uw_context ctx = uw_request_new_context(me, &uw_application, NULL, log_error, log_debug);
-  size_t buf_size = 2;
+  size_t buf_size = 1024;
   char *buf = malloc(buf_size), *back = buf;
   uw_request_context rc = uw_new_request_context();
   int sock = 0;
@@ -82,7 +84,8 @@
       sock = uw_dequeue();
     }
 
-    printf("Handling connection with thread #%d.\n", me);
+    if (!quiet)
+      printf("Handling connection with thread #%d.\n", me);
 
     while (1) {
       int r;
@@ -96,26 +99,32 @@
         buf = new_buf;
       }
 
-      r = recv(sock, back, buf_size - 1 - (back - buf), 0);
+      *back = 0;
+      body = strstr(buf, "\r\n\r\n");
+      if (body == NULL) {
+        r = recv(sock, back, buf_size - 1 - (back - buf), 0);
 
-      if (r < 0) {
-        fprintf(stderr, "Recv failed\n");
-        close(sock);
-        sock = 0;
-        break;
+        if (r < 0) {
+          if (!quiet)
+            fprintf(stderr, "Recv failed\n");
+          close(sock);
+          sock = 0;
+          break;
+        }
+
+        if (r == 0) {
+          if (!quiet)
+            printf("Connection closed.\n");
+          close(sock);
+          sock = 0;
+          break;
+        }
+
+        back += r;
+        *back = 0;
       }
 
-      if (r == 0) {
-        printf("Connection closed.\n");
-        close(sock);
-        sock = 0;
-        break;
-      }
-
-      back += r;
-      *back = 0;
-
-      if ((body = strstr(buf, "\r\n\r\n"))) {
+      if (body != NULL || (body = strstr(buf, "\r\n\r\n"))) {
         request_result rr;
         int should_keepalive = 0;
 
@@ -148,14 +157,16 @@
             r = recv(sock, back, buf_size - 1 - (back - buf), 0);
 
             if (r < 0) {
-              fprintf(stderr, "Recv failed\n");
+              if (!quiet)
+                fprintf(stderr, "Recv failed\n");
               close(sock);
               sock = 0;
               goto done;
             }
 
             if (r == 0) {
-              fprintf(stderr, "Connection closed.\n");
+              if (!quiet)
+                fprintf(stderr, "Connection closed.\n");
               close(sock);
               sock = 0;
               goto done;
@@ -206,6 +217,11 @@
 
         s = headers;
         while ((s2 = strchr(s, '\r'))) {
+          if (s2 == s) {
+            *s = 0;
+            break;
+          }
+
           s = s2;
 
           if (s[1] == 0)
@@ -218,15 +234,14 @@
         uw_set_headers(ctx, get_header, headers);
         uw_set_env(ctx, get_env, NULL);
 
-        printf("Serving URI %s....\n", path);
+        if (!quiet)
+          printf("Serving URI %s....\n", path);
         rr = uw_request(rc, ctx, method, path, query_string, body, back - body,
                         on_success, on_failure,
                         NULL, log_error, log_debug,
                         sock, uw_really_send, close);
 
         if (rr != KEEP_OPEN) {
-          char clen[100];
-
           if (keepalive) {
             char *connection = uw_Basis_requestHeader(ctx, "Connection");
 
@@ -236,8 +251,13 @@
           if (!should_keepalive)
             uw_write_header(ctx, "Connection: close\r\n");
 
-          sprintf(clen, "Content-length: %d\r\n", uw_pagelen(ctx));
-          uw_write_header(ctx, clen);
+          if (!uw_has_contentLength(ctx)) {
+            char clen[100];
+
+            sprintf(clen, "Content-length: %d\r\n", uw_pagelen(ctx));
+            uw_write_header(ctx, clen);
+          }
+
           uw_send(ctx, sock);
         }
 
@@ -246,13 +266,25 @@
             // In case any other requests are queued up, shift
             // unprocessed part of buffer to front.
             int kept = back - after;
-            memmove(buf, after, kept);
-            back = buf + kept;
+
+            if (kept == 0) {
+              // No pipelining going on here.
+              // We'd might as well try to switch to a different connection,
+              // while we wait for more input on this one.
+              uw_enqueue(sock);
+              sock = 0;
+            } else {
+              // More input!  Move it to the front and continue in this loop.
+              memmove(buf, after, kept);
+              back = buf + kept;
+            }
           } else {
             close(sock);
             sock = 0;
           }
-        } else if (rr != KEEP_OPEN)
+        } else if (rr == KEEP_OPEN)
+          sock = 0;
+        else
           fprintf(stderr, "Illegal uw_request return code: %d\n", rr);
 
         break;
@@ -267,7 +299,7 @@
 }
 
 static void help(char *cmd) {
-  printf("Usage: %s [-p <port>] [-a <IP address>] [-t <thread count>] [-k]\nThe '-k' option turns on HTTP keepalive.\n", cmd);
+  printf("Usage: %s [-p <port>] [-a <IP address>] [-t <thread count>] [-k] [-q]\nThe '-k' option turns on HTTP keepalive.\nThe '-q' option turns off some chatter on stdout.\n", cmd);
 }
 
 static void sigint(int signum) {
@@ -291,10 +323,10 @@
   my_addr.sin_addr.s_addr = INADDR_ANY; // auto-fill with my IP
   memset(my_addr.sin_zero, '\0', sizeof my_addr.sin_zero);
 
-  while ((opt = getopt(argc, argv, "hp:a:t:k")) != -1) {
+  while ((opt = getopt(argc, argv, "hp:a:t:kq")) != -1) {
     switch (opt) {
     case '?':
-      fprintf(stderr, "Unknown command-line option");
+      fprintf(stderr, "Unknown command-line option\n");
       help(argv[0]);
       return 1;
 
@@ -332,6 +364,10 @@
       keepalive = 1;
       break;
 
+    case 'q':
+      quiet = 1;
+      break;
+
     default:
       fprintf(stderr, "Unexpected getopt() behavior\n");
       return 1;
@@ -369,7 +405,8 @@
 
   sin_size = sizeof their_addr;
 
-  printf("Listening on port %d....\n", uw_port);
+  if (!quiet)
+    printf("Listening on port %d....\n", uw_port);
 
   {
     pthread_t thread;
@@ -393,18 +430,19 @@
     int new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
 
     if (new_fd < 0) {
-      fprintf(stderr, "Socket accept failed\n");
-      return 1;
+      if (!quiet)
+        fprintf(stderr, "Socket accept failed\n");
+    } else {
+      if (!quiet)
+        printf("Accepted connection.\n");
+
+      if (keepalive) {
+        int flag = 1; 
+        setsockopt(new_fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
+      }
+
+      uw_enqueue(new_fd);
     }
-
-    printf("Accepted connection.\n");
-
-    if (keepalive) {
-      int flag = 1; 
-      setsockopt(new_fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
-    }
-
-    uw_enqueue(new_fd);
   }
 }
 
@@ -419,10 +457,11 @@
 }
 
 void uw_do_expunge(uw_context ctx, uw_Basis_client cli, void *data) {
-  if (uw_get_app(ctx)->db_begin(ctx))
-    uw_error(ctx, FATAL, "Error running SQL BEGIN");
+  uw_ensure_transaction(ctx);
   uw_get_app(ctx)->expunger(ctx, cli);
-  uw_commit(ctx);
+
+  if (uw_commit(ctx))
+    uw_error(ctx, UNLIMITED_RETRY, "Rerunning expunge transaction");
 }
 
 void uw_post_expunge(uw_context ctx, void *data) {