]> git.ipfire.org Git - thirdparty/tor.git/commitdiff
Add code for end-to-end zlib compression. Still needs flow-control
authorNick Mathewson <nickm@torproject.org>
Mon, 17 Mar 2003 02:42:45 +0000 (02:42 +0000)
committerNick Mathewson <nickm@torproject.org>
Mon, 17 Mar 2003 02:42:45 +0000 (02:42 +0000)
svn:r187

TODO
src/or/buffers.c
src/or/connection.c
src/or/connection_ap.c
src/or/connection_exit.c
src/or/or.h

diff --git a/TODO b/TODO
index 79f1b3be58fe0e9a3e05dc1029d96b412c9e1312..5b11ee8398401037b8b790a89369b11b11bf5baf 100644 (file)
--- a/TODO
+++ b/TODO
@@ -19,8 +19,8 @@ ARMA    - arma claims
                 o Implement topics
                 - Rotate circuits after N minutes?
                 - Circuits should expire when circuit->expire triggers
-NICK            - Handle half-open connections
-NICK    - On the fly compression of each stream
+NICK            . Handle half-open connections
+NICK    . On the fly compression of each stream
         o Clean up the event loop (optimize and sanitize)
         - Exit policies
                 - Path selection algorithms
@@ -88,7 +88,7 @@ SPEC!!          - Handle socks commands other than connect, eg, bind?
                         - Keep track of load over links/nodes, to
                           know who's hosed
 NICK    - Daemonize and package
-                - Teach it to fork and background
+                o Teach it to fork and background
                 - Red Hat spec file
                 - Debian spec file equivalent
                 
index 424e2fe9e669ca05dc356de06e794813cc07ddc6..61706fcad7482e83d3643534e4018c6120bb4731 100644 (file)
@@ -136,6 +136,74 @@ int write_to_buf(char *string, int string_len,
 
 }
 
+#ifdef USE_ZLIB
+int compress_from_buf(char *string, int string_len, 
+                     char **buf_in, int *buflen_in, int *buf_datalen_in,
+                     z_stream *zstream, int flush) {
+  int err;
+
+  if (!*buf_datalen_in)
+    return 0;
+
+  zstream->next_in = *buf_in;
+  zstream->avail_in = *buf_datalen_in;
+  zstream->next_out = string;
+  zstream->avail_out = string_len;
+  
+  err = deflate(zstream, flush);
+
+  switch (err) 
+    {
+    case Z_OK:
+    case Z_STREAM_END:
+      memmove(*buf_in, zstream->next_in, zstream->avail_in);
+      *buf_datalen_in = zstream->avail_in;
+      return string_len - zstream->avail_out;
+    case Z_STREAM_ERROR:
+    case Z_BUF_ERROR:
+      log(LOG_ERR, "Error processing compression: %s", zstream->msg);
+      return -1;
+    default:
+      log(LOG_ERR, "Unknown return value from deflate: %d", err);
+      return -1;
+  }
+}
+
+int decompress_buf_to_buf(char **buf_in, int *buflen_in, int *buf_datalen_in,
+                         char **buf_out, int *buflen_out, int *buf_datalen_out,
+                         z_stream *zstream, int flush) 
+{
+  int err;
+
+  zstream->next_in = *buf_in;
+  zstream->avail_in = *buf_datalen_in;
+  zstream->next_out = *buf_out + *buf_datalen_out;
+  zstream->avail_out = *buflen_out - *buf_datalen_out;
+  
+  if (!zstream->avail_in && !zstream->avail_out)
+    return 0;
+  
+  err = inflate(zstream, flush);
+
+  switch (err) 
+    {
+    case Z_OK:
+    case Z_STREAM_END:
+      memmove(*buf_in, zstream->next_in, zstream->avail_in);
+      *buf_datalen_in = zstream->avail_in;
+      *buf_datalen_out = *buflen_out - zstream->avail_out;
+      return 1;
+    case Z_STREAM_ERROR:
+    case Z_BUF_ERROR:
+      log(LOG_ERR, "Error processing compression: %s", zstream->msg);
+      return 1;
+    default:
+      log(LOG_ERR, "Unknown return value from deflate: %d", err);
+      return -1;
+    }
+}
+#endif
+
 int fetch_from_buf(char *string, int string_len,
                    char **buf, int *buflen, int *buf_datalen) {
 
index 4108aa2a2313aacdfc829e40c3826a1ca3d48584..23743cecdf7aed5fd5a73cfbf4391d9cf34b1329 100644 (file)
@@ -126,6 +126,29 @@ connection_t *connection_new(int type) {
   if(type == CONN_TYPE_OR) {
     directory_set_dirty();
   }
+#ifdef USE_ZLIB
+  if (type == CONN_TYPE_AP || type == CONN_TYPE_EXIT)  {
+    if (buf_new(&conn->z_outbuf, &conn->z_outbuflen, &conn->z_outbuf_datalen) < 0)
+      return NULL;
+    if (! (conn->compression = malloc(sizeof(z_stream))))
+      return NULL;
+    if (! (conn->decompression = malloc(sizeof(z_stream))))
+      return NULL;
+    memset(conn->compression, 0, sizeof(z_stream));
+    memset(conn->decompression, 0, sizeof(z_stream));
+    if (deflateInit(conn->compression, Z_DEFAULT_COMPRESSION) != Z_OK) {
+      log(LOG_ERR, "Error initializing zlib: %s", conn->compression->msg);
+      return NULL;
+    }
+    if (inflateInit(conn->decompression) != Z_OK) {
+      log(LOG_ERR, "Error initializing zlib: %s", conn->decompression->msg);
+      return NULL;
+    }
+  } else {
+    conn->compression = conn->decompression = NULL;
+  }
+  conn->done_sending = conn->done_receiving = 0
+#endif
   return conn;
 }
 
@@ -156,6 +179,19 @@ void connection_free(connection_t *conn) {
   if(conn->type == CONN_TYPE_OR) {
     directory_set_dirty();
   }
+#ifdef USE_ZLIB
+  if (conn->compression) {
+    if (inflateEnd(conn->decompression) != Z_OK)
+      log(LOG_ERR,"connection_free(): while closing zlib: %s",
+         conn->decompression->msg);
+    if (deflateEnd(conn->compression) != Z_OK)
+      log(LOG_ERR,"connection_free(): while closing zlib: %s",
+         conn->compression->msg);
+    free(conn->compression);
+    free(conn->decompression);
+    buf_free(conn->z_outbuf);
+  }
+#endif
   free(conn);
 }
 
@@ -337,6 +373,49 @@ int connection_fetch_from_buf(char *string, int len, connection_t *conn) {
   return fetch_from_buf(string, len, &conn->inbuf, &conn->inbuflen, &conn->inbuf_datalen);
 }
 
+#ifdef USE_ZLIB
+int connection_compress_from_buf(char *string, int len, connection_t *conn,
+                                int flush) {
+  return compress_from_buf(string, len,
+                          &conn->inbuf, &conn->inbuflen, &conn->inbuf_datalen,
+                          conn->compression, flush);
+}
+
+int connection_decompress_to_buf(char *string, int len, connection_t *conn,
+                                int flush) {
+  /* This is not sane with respect to flow control; we want to spool out to 
+   * z_outbuf, but only decompress and write as needed.
+   */
+  int n;
+  struct timeval now;
+
+  if (write_to_buf(string, len, 
+          &conn->z_outbuf, &conn->z_outbuflen, &conn->z_outbuf_datalen) < 0)
+    return -1;
+  
+  n = decompress_buf_to_buf(
+          &conn->z_outbuf, &conn->z_outbuflen, &conn->z_outbuf_datalen,
+          &conn->outbuf, &conn->outbuflen, &conn->outbuf_datalen,
+          conn->decompression, flush);
+
+  if (n < 0)
+    return -1;
+
+  if(gettimeofday(&now,NULL) < 0)
+    return -1;
+  
+  if(!n)
+    return 0;
+
+  if(conn->marked_for_close)
+    return 0;
+
+  conn->timestamp_lastwritten = now.tv_sec;
+
+  return n;
+}
+#endif
+
 int connection_find_on_inbuf(char *string, int len, connection_t *conn) {
   return find_on_inbuf(string, len, conn->inbuf, conn->inbuf_datalen);
 }
@@ -607,7 +686,7 @@ int connection_process_inbuf(connection_t *conn) {
 }
 
 int connection_package_raw_inbuf(connection_t *conn) {
-  int amount_to_process;
+  int amount_to_process, len;
   cell_t cell;
   circuit_t *circ;
 
@@ -618,13 +697,27 @@ int connection_package_raw_inbuf(connection_t *conn) {
 repeat_connection_package_raw_inbuf:
 
   amount_to_process = conn->inbuf_datalen;
-
+  
   if(!amount_to_process)
     return 0;
 
   /* Initialize the cell with 0's */
   memset(&cell, 0, sizeof(cell_t));
 
+#ifdef USE_ZLIB
+  /* This compression logic is not necessarily optimal:
+   *    1) Maybe we should try to read as much as we can onto the inbuf before
+   *       compressing.
+   *    2) 
+   */
+  len = connection_compress_from_buf(cell.payload + TOPIC_HEADER_SIZE,
+                                    CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE,
+                                    conn, Z_SYNC_FLUSH);
+  if (len < 0)
+    return -1;
+
+  cell.length = len;    
+#else 
   if(amount_to_process > CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE) {
     cell.length = CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE;
   } else {
@@ -633,6 +726,7 @@ repeat_connection_package_raw_inbuf:
 
   if(connection_fetch_from_buf(cell.payload+TOPIC_HEADER_SIZE, cell.length, conn) < 0)
     return -1;
+#endif
 
   circ = circuit_get_by_conn(conn);
   if(!circ) {
@@ -677,7 +771,7 @@ repeat_connection_package_raw_inbuf:
     }
     log(LOG_DEBUG,"connection_package_raw_inbuf(): receive_topicwindow at AP is %d",conn->p_receive_topicwindow);
   }
-  if(amount_to_process > CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE) {
+  if (conn->inbuf_datalen) {
     log(LOG_DEBUG,"connection_package_raw_inbuf(): recursing.");
     goto repeat_connection_package_raw_inbuf;
   }
index 9d28d5a4a90730014948e4497de1646d72b6727e..339c141378982a66894297885c7c2559aaa90d7c 100644 (file)
@@ -418,11 +418,21 @@ int connection_ap_process_data_cell(cell_t *cell, circuit_t *circ) {
       }
       log(LOG_DEBUG,"connection_ap_process_data_cell(): willing to receive %d more cells from circ",conn->n_receive_topicwindow);
 
+#ifdef USE_ZLIB
+      if(connection_decompress_to_buf(cell->payload + TOPIC_HEADER_SIZE,
+                                     cell->length - TOPIC_HEADER_SIZE, 
+                                     conn, Z_SYNC_FLUSH) < 0) {
+        log(LOG_INFO,"connection_exit_process_data_cell(): write to buf failed. Marking for close.");
+        conn->marked_for_close = 1;
+        return 0;
+      }
+#else
       if(connection_write_to_buf(cell->payload + TOPIC_HEADER_SIZE,
                                  cell->length - TOPIC_HEADER_SIZE, conn) < 0) {
         conn->marked_for_close = 1;
         return 0;
       }
+#endif
       if(connection_consider_sending_sendme(conn, EDGE_AP) < 0)
         conn->marked_for_close = 1;
       return 0;
@@ -440,6 +450,12 @@ int connection_ap_process_data_cell(cell_t *cell, circuit_t *circ) {
       }
       for(prevconn = circ->p_conn; prevconn->next_topic != conn; prevconn = prevconn->next_topic) ;
       prevconn->next_topic = conn->next_topic;
+#endif
+#if 0
+      conn->done_sending = 1;
+      shutdown(conn->s, 1); /* XXX check return; refactor NM */
+      if (conn->done_receiving)
+       conn->marked_for_close = 1;
 #endif
       conn->marked_for_close = 1;
       break;
index fb0daadc4905651028843082867799de5148a547..857dfe8845e308d857d45ee28995dead1f6dd6de 100644 (file)
@@ -217,12 +217,22 @@ int connection_exit_process_data_cell(cell_t *cell, circuit_t *circ) {
         log(LOG_DEBUG,"connection_exit_process_data_cell(): data received while resolving/connecting. Queueing.");
       }
       log(LOG_DEBUG,"connection_exit_process_data_cell(): put %d bytes on outbuf.",cell->length - TOPIC_HEADER_SIZE);
+#ifdef USE_ZLIB
+      if(connection_decompress_to_buf(cell->payload + TOPIC_HEADER_SIZE,
+                                     cell->length - TOPIC_HEADER_SIZE, 
+                                     conn, Z_SYNC_FLUSH) < 0) {
+        log(LOG_INFO,"connection_exit_process_data_cell(): write to buf failed. Marking for close.");
+        conn->marked_for_close = 1;
+        return 0;
+      }
+#else
       if(connection_write_to_buf(cell->payload + TOPIC_HEADER_SIZE,
                                  cell->length - TOPIC_HEADER_SIZE, conn) < 0) {
         log(LOG_INFO,"connection_exit_process_data_cell(): write to buf failed. Marking for close.");
         conn->marked_for_close = 1;
         return 0;
       }
+#endif
       if(connection_consider_sending_sendme(conn, EDGE_EXIT) < 0)
         conn->marked_for_close = 1;
       return 0;
@@ -241,6 +251,13 @@ int connection_exit_process_data_cell(cell_t *cell, circuit_t *circ) {
       for(prevconn = circ->n_conn; prevconn->next_topic != conn; prevconn = prevconn->next_topic) ;
       prevconn->next_topic = conn->next_topic;
 #endif
+#if 0
+      conn->done_sending = 1;
+      shutdown(conn->s, 1); /* XXX check return; refactor NM */
+      if (conn->done_receiving)
+       conn->marked_for_close = 1;
+#endif
+       
       conn->marked_for_close = 1;
       break;
     case TOPIC_COMMAND_CONNECTED:
index 13a24203aa4d49c57513705f886873e83f8f9a06..a35fb26b272c7ca82759c7d819417664c8d28c19 100644 (file)
@@ -36,6 +36,9 @@
 #include <errno.h>
 #include <assert.h>
 #include <time.h>
+#ifdef USE_ZLIB
+#include <zlib.h>
+#endif
 
 #include "../common/crypto.h"
 #include "../common/log.h"
 #define CONFIG_TYPE_INT     2
 #define CONFIG_TYPE_LONG    3
 #define CONFIG_TYPE_DOUBLE  4
+#define CONFIG_TYPE_BOOL    5
 
 #define CONFIG_LINE_MAXLEN 1024
 
@@ -254,24 +258,39 @@ struct connection_t {
   uint16_t port;
 
 /* used by exit and ap: */
-
   uint16_t topic_id;
   struct connection_t *next_topic;
   int n_receive_topicwindow;
   int p_receive_topicwindow;
+  int done_sending;
+  int done_receiving;
+#ifdef USE_ZLIB
+  char *z_outbuf;
+  int z_outbuflen;
+  int z_outbuf_datalen;
+
+  z_stream *compression;
+  z_stream *decompression;
+#endif
 
+/* Used by ap: */
   char socks_version; 
   char read_username;
 
+/* Used by exit and ap: */
   char *dest_addr;
   uint16_t dest_port; /* host order */
 
+/* Used by ap: */
   char dest_tmp[512];
   int dest_tmplen;
   
+/* Used by everyone */
   char *address; /* strdup into this, because free_connection frees it */
+/* Used for cell connections */
   crypto_pk_env_t *pkey; /* public RSA key for the other side */
 
+/* Used while negotiating OR/OR connections */
   char nonce[8];
  
 };
@@ -383,6 +402,7 @@ typedef struct {
    char *RouterFile;
    char *PrivateKeyFile;
    double CoinWeight;
+   int Daemon;
    int ORPort;
    int OPPort;
    int APPort;
@@ -421,12 +441,29 @@ int write_to_buf(char *string, int string_len,
    * return total number of bytes on the buf
    */
 
+
 int fetch_from_buf(char *string, int string_len,
                    char **buf, int *buflen, int *buf_datalen);
   /* if there is string_len bytes in buf, write them onto string,
    * then memmove buf back (that is, remove them from buf)
    */
 
+#ifdef USE_ZLIB
+int compress_from_buf(char *string, int string_len, 
+                     char **buf_in, int *buflen_in, int *buf_datalen_in,
+                     z_stream *zstream, int flush);
+  /* read and compress as many characters as possible from buf, writing up to
+   * string_len of them onto string, then memmove buf back.  Return number of
+   * characters written.
+   */
+
+int decompress_buf_to_buf(char **buf_in, int *buflen_in, int *buf_datalen_in,
+                         char **buf_out, int *buflen_out, int *buf_datalen_out,
+                         z_stream *zstream, int flush);
+  /* XXX document this NM
+   */
+#endif
+
 int find_on_inbuf(char *string, int string_len,
                   char *buf, int buf_datalen);
   /* find first instance of needle 'string' on haystack 'buf'. return how
@@ -529,6 +566,13 @@ int connection_read_to_buf(connection_t *conn);
 
 int connection_fetch_from_buf(char *string, int len, connection_t *conn);
 
+#ifdef USE_ZLIB
+int connection_compress_from_buf(char *string, int len, connection_t *conn,
+                                int flush);
+int connection_decompress_to_buf(char *string, int len, connection_t *conn,
+                                int flush);
+#endif
+
 int connection_outbuf_too_full(connection_t *conn);
 int connection_find_on_inbuf(char *string, int len, connection_t *conn);
 int connection_wants_to_flush(connection_t *conn);