]> git.ipfire.org Git - thirdparty/opentracker.git/commitdiff
introducing multithreaded full scrape creation.
authorerdgeist <>
Fri, 16 Nov 2007 00:23:42 +0000 (00:23 +0000)
committererdgeist <>
Fri, 16 Nov 2007 00:23:42 +0000 (00:23 +0000)
opentracker.c
ot_fullscrape.c
ot_fullscrape.h

index 501213de4587c88b986f5e5f37f396fe92586900..06be4fa6e05bd5c2958dddbc92b667bbbfb09de7 100644 (file)
@@ -193,15 +193,29 @@ static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovec
   size_t header_size, size = iovec_length( &iovec_entries, &iovector );
   tai6464 t;
 
+  /* No cookie? Bad socket. Leave. */
   if( !h ) {
     iovec_free( &iovec_entries, &iovector );
-    return;
+    HTTPERROR_500;
   }
+  
+  /* If this socket collected request in a buffer,
+     free it now */
   if( h->flag & STRUCT_HTTP_FLAG_ARRAY_USED ) {
     h->flag &= ~STRUCT_HTTP_FLAG_ARRAY_USED;
     array_reset( &h->request );
   }
 
+  /* If we came here, wait for the answer is over */
+  h->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK;
+
+  /* Our answers never are 0 bytes. Return an error. */
+  if( !iovec_entries || !iovector[0].iov_len ) {
+    iovec_free( &iovec_entries, &iovector );
+    HTTPERROR_500;
+  }
+
+  /* Prepare space for http header */
   header = malloc( SUCCESS_HTTP_HEADER_LENGTH );
   if( !header ) {
     iovec_free( &iovec_entries, &iovector );
@@ -212,7 +226,7 @@ static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovec
 
   iob_reset( &h->batch );
   iob_addbuf_free( &h->batch, header, header_size );
-  
+
   /* Will move to ot_iovec.c */
   for( i=0; i<iovec_entries; ++i )
     iob_addbuf_munmap( &h->batch, iovector[i].iov_base, iovector[i].iov_len );
@@ -390,19 +404,15 @@ LOG_TO_STDERR( "sync: %d.%d.%d.%d\n", h->ip[0], h->ip[1], h->ip[2], h->ip[3] );
 
     /* Full scrape... you might want to limit that */
     if( !byte_diff( data, 12, "scrape HTTP/" ) ) {
-      int iovec_entries = 0;
-      struct iovec * iovector = NULL;
-      reply_size = return_fullscrape_for_tracker( &iovec_entries, &iovector );
-
 LOG_TO_STDERR( "[%08d] scrp: %d.%d.%d.%d - FULL SCRAPE\n", (unsigned int)(g_now - ot_start_time), h->ip[0], h->ip[1], h->ip[2], h->ip[3] );
 #ifdef _DEBUG_HTTPERROR
 write( 2, debug_request, l );
 #endif
-      if( !reply_size ) HTTPERROR_500;
-
-      /* Stat keeping */
-      stats_issue_event( EVENT_FULLSCRAPE, 1, reply_size);
-      return sendiovecdata( s, iovec_entries, iovector );
+      /* Pass this task to the worker thread */
+      h->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK;
+      fullscrape_deliver( s );
+      io_dontwantread( s );
+      return;
     }
 
 SCRAPE_WORKAROUND:
@@ -714,9 +724,8 @@ static void handle_timeouted( void ) {
 
 static void server_mainloop( ) {
   time_t next_timeout_check = g_now + OT_CLIENT_TIMEOUT_CHECKINTERVAL;
-/* Later we will poll for finished tasks
   struct iovec *iovector;
-  int iovec_entries;*/
+  int iovec_entries;
 
   for( ; ; ) {
     int64 i;
@@ -733,9 +742,8 @@ static void server_mainloop( ) {
         handle_read( i );
     }
 
-/*  Later we will poll for finished tasks
     while( ( i = mutex_workqueue_popresult( &iovec_entries, &iovector ) ) != -1 )
-      sendiovecdata( i, iovec_entries, iovector ); */
+      sendiovecdata( i, iovec_entries, iovector );
 
     while( ( i = io_canwrite( ) ) != -1 )
       handle_write( i );
@@ -835,6 +843,8 @@ int main( int argc, char **argv ) {
   if( trackerlogic_init( serverdir ) == -1 )
     panic( "Logic not started" );
 
+  fullscrape_init( );
+
   g_now = ot_start_time = time( NULL );
   alarm(5);
 
index 3c9540d35271fe685c47aa416955588c415a27b8..58e525f2a0f0c19b3da006cf53089da77712cc4a 100644 (file)
@@ -5,6 +5,7 @@
 #include <sys/uio.h>
 #include <stdio.h>
 #include <string.h>
+#include <pthread.h>
 
 /* Libowfat */
 
 /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
 #define OT_FULLSCRAPE_MAXENTRYLEN 100
 
-size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovector ) {
+/* Forward declaration */
+static void fullscrape_make( int *iovec_entries, struct iovec **iovector );
+
+/* This is the entry point into this worker thread
+   It grabs tasks from mutex_tasklist and delivers results back
+*/
+static void * fullscrape_worker( void * args) {
+  int iovec_entries;
+  struct iovec *iovector;
+
+  args = args;
+
+  while( 1 ) {
+    ot_taskid taskid = mutex_workqueue_poptask( OT_TASKTYPE_FULLSCRAPE );
+    fullscrape_make( &iovec_entries, &iovector );
+    if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) )
+      iovec_free( &iovec_entries, &iovector );
+  }
+  return NULL;
+}
+
+void fullscrape_init( ) {
+  pthread_t thread_id;
+  pthread_create( &thread_id, NULL, fullscrape_worker, NULL );
+}
+
+void fullscrape_deliver( int64 socket ) {
+  mutex_workqueue_pushtask( socket, OT_TASKTYPE_FULLSCRAPE );
+}
+
+static void fullscrape_make( int *iovec_entries, struct iovec **iovector ) {
   int    bucket;
   char  *r, *re;
 
   /* Setup return vector... */
   *iovec_entries = 0;
+  *iovector = NULL;
   if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) )
-    return 0;
+    return;
 
   /* ... and pointer to end of current output buffer.
      This works as a low watermark */
@@ -76,7 +108,7 @@ size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovecto
 
           /* Release lock on current bucket and return */
           mutex_bucket_unlock( bucket );
-          return 0;
+          return;
         }
         
         /* Adjust new end of output buffer */
@@ -93,7 +125,4 @@ size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovecto
 
   /* Release unused memory in current output buffer */
   iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) );
-
-  /* Return answer size */
-  return iovec_length( iovec_entries, iovector );
 }
index a33d0669e16f856acfba57b04f0b26de28201349..9ed4376c37182c7a4a393458326a9e8a19cada3b 100644 (file)
@@ -4,8 +4,9 @@
 #ifndef __OT_FULLSCRAPE_H__
 #define __OT_FULLSCRAPE_H__
 
-#include <sys/uio.h>
+#include <io.h>
 
-size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovector );
+void fullscrape_init( );
+void fullscrape_deliver( int64 socket );
 
 #endif