Full scrapes usually are huge and one does not want to
allocate more memory. So lets get them in 512k units
*/
-#define OT_SCRAPE_CHUNK_SIZE (512*1024)
+#define OT_SCRAPE_CHUNK_SIZE (1024)
/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
-#define OT_FULLSCRAPE_MAXENTRYLEN 256
+#define OT_SCRAPE_MAXENTRYLEN 256
+
+#ifdef WANT_COMPRESSION_GZIP
+#define IF_COMPRESSION( TASK ) if( mode & TASK_FLAG_GZIP ) TASK
+#define WANT_COMPRESSION_GZIP_PARAM( param1, param2 ) , param1, param2
+#else
+#define IF_COMPRESSION( TASK )
+#define WANT_COMPRESSION_GZIP_PARAM( param1, param2 )
+#endif
/* Forward declaration */
static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode );
mutex_workqueue_pushtask( socket, tasktype );
}
+static int fullscrape_increase( int *iovec_entries, struct iovec **iovector,
+ char **r, char **re WANT_COMPRESSION_GZIP_PARAM( z_stream *strm, ot_tasktype mode ) ) {
+ /* Allocate a fresh output buffer at the end of our buffers list */
+ if( !( *r = iovec_fix_increase_or_free( iovec_entries, iovector, *r, OT_SCRAPE_CHUNK_SIZE ) ) ) {
+
+ /* Deallocate gzip buffers */
+ IF_COMPRESSION( deflateEnd(strm); )
+
+ /* Release lock on current bucket and return */
+ return -1;
+ }
+
+ /* Adjust new end of output buffer */
+ *re = *r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
+
+ /* When compressing, we have all the bytes in output buffer */
+ IF_COMPRESSION( { \
+ *re -= OT_SCRAPE_MAXENTRYLEN; \
+ strm->next_out = (ot_byte*)*r; \
+ strm->avail_out = OT_SCRAPE_CHUNK_SIZE; \
+ deflate( strm, Z_NO_FLUSH ); \
+ *r = (char*)strm->next_out; \
+ } )
+
+ return 0;
+}
+
static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) {
- int bucket;
- char *r, *re;
+ int bucket;
+ char *r, *re;
#ifdef WANT_COMPRESSION_GZIP
- char compress_buffer[OT_FULLSCRAPE_MAXENTRYLEN];
+ char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
z_stream strm;
#endif
if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) )
return;
- /* ... and pointer to end of current output buffer.
- This works as a low watermark */
- re = r + OT_SCRAPE_CHUNK_SIZE;
+ /* re points to low watermark */
+ re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
#ifdef WANT_COMPRESSION_GZIP
if( mode & TASK_FLAG_GZIP ) {
+ re += OT_SCRAPE_MAXENTRYLEN;
byte_zero( &strm, sizeof(strm) );
- strm.next_in = (ot_byte*)r;
+ strm.next_in = (ot_byte*)compress_buffer;
+ strm.next_out = (ot_byte*)r;
+ strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
if( deflateInit2(&strm,9,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK )
fprintf( stderr, "not ok.\n" );
-
- strm.next_out = (unsigned char*)r;
- strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
r = compress_buffer;
}
#endif
- /* Reply dictionary only needed for bencoded fullscrape */
- if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) {
- memmove( r, "d5:filesd", 9 );
- r += 9;
- }
+ if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
+ r += sprintf( r, "d5:filesd" );
/* For each bucket... */
for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
switch( mode & TASK_TASK_MASK ) {
case TASK_FULLSCRAPE:
default:
+
/* push hash as bencoded string */
*r++='2'; *r++='0'; *r++=':';
memmove( r, hash, 20 ); r+=20;
/* push rest of the scrape string */
r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count );
- break;
+
+ break;
case TASK_FULLSCRAPE_TPB_ASCII:
to_hex( r, *hash ); r+=40;
r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count );
}
#ifdef WANT_COMPRESSION_GZIP
- if( mode & TASK_FLAG_GZIP ) {
+ if( mode & TASK_FLAG_GZIP ) {
strm.next_in = (ot_byte*)compress_buffer;
strm.avail_in = r - compress_buffer;
- if( deflate( &strm, Z_NO_FLUSH ) != Z_OK )
- fprintf( stderr, "Not ok.\n" );
+ deflate( &strm, Z_NO_FLUSH );
r = (char*)strm.next_out;
}
#endif
- /* If we reached our low watermark in buffer... */
- if( re - r <= OT_FULLSCRAPE_MAXENTRYLEN ) {
-
- /* crop current output buffer to the amount really used */
- iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) );
-
- /* And allocate a fresh output buffer at the end of our buffers list */
- if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) {
-
- /* If this fails: free buffers */
- iovec_free( iovec_entries, iovector );
-
-#ifdef WANT_COMPRESSION_GZIP
- deflateEnd(&strm);
-#endif
-
- /* Release lock on current bucket and return */
- mutex_bucket_unlock( bucket );
- return;
- }
-
- /* Adjust new end of output buffer */
- re = r + OT_SCRAPE_CHUNK_SIZE;
+ /* Check if there still is enough buffer left */
+ while( ( r > re ) && fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode ) ) )
+ return mutex_bucket_unlock( bucket );
-#ifdef WANT_COMPRESSION_GZIP
- if( mode & TASK_FLAG_GZIP ) {
- strm.next_out = (ot_byte*)r;
- strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
- }
-#endif
- }
-#ifdef WANT_COMPRESSION_GZIP
- if( mode & TASK_FLAG_GZIP ) {
- r = compress_buffer;
- }
-#endif
+ IF_COMPRESSION( r = compress_buffer; )
}
-
+
/* All torrents done: release lock on currenct bucket */
mutex_bucket_unlock( bucket );
}
- /* Close bencoded scrape dictionary if necessary */
- if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) {
- *r++='e'; *r++='e';
- }
+ if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
+ r += sprintf( r, "ee" );
#ifdef WANT_COMPRESSION_GZIP
if( mode & TASK_FLAG_GZIP ) {
- strm.next_in = (ot_byte*) compress_buffer;
+ strm.next_in = (ot_byte*)compress_buffer;
strm.avail_in = r - compress_buffer;
- if( deflate( &strm, Z_FINISH ) != Z_STREAM_END )
- fprintf( stderr, "Not ok.\n" );
+ deflate( &strm, Z_FINISH );
r = (char*)strm.next_out;
- deflateEnd(&strm);
+
+ while( ( r > re ) && fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode ) ) )
+ return mutex_bucket_unlock( bucket );
+ deflateEnd(&strm);
}
#endif
/* Release unused memory in current output buffer */
- iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) );
+ iovec_fixlast( iovec_entries, iovector, r );
}