$Id$ */
/* System */
+#include <arpa/inet.h>
+#include <ctype.h>
+#include <errno.h>
+#include <pthread.h>
+#include <pwd.h>
+#include <signal.h>
#include <stdint.h>
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
-#include <errno.h>
-#include <signal.h>
-#include <stdio.h>
-#include <pwd.h>
-#include <ctype.h>
-#include <pthread.h>
/* Libowfat */
-#include "socket.h"
+#include "byte.h"
#include "io.h"
#include "iob.h"
-#include "byte.h"
-#include "scan.h"
#include "ip6.h"
#include "ndelay.h"
+#include "scan.h"
+#include "socket.h"
/* Opentracker */
-#include "trackerlogic.h"
-#include "ot_vector.h"
#include "ot_mutex.h"
#include "ot_stats.h"
+#include "ot_vector.h"
+#include "trackerlogic.h"
#ifndef WANT_SYNC_LIVE
#define WANT_SYNC_LIVE
ot_ip6 g_serverip;
uint16_t g_serverport = 9009;
uint32_t g_tracker_id;
-char groupip_1[4] = { 224,0,23,5 };
+char groupip_1[4] = {224, 0, 23, 5};
int g_self_pipe[2];
/* If you have more than 10 peers, don't use this proxy
Use 20 slots for 10 peers to have room for 10 incoming connection slots
*/
-#define MAX_PEERS 20
+#define MAX_PEERS 20
-#define LIVESYNC_INCOMING_BUFFSIZE (256*256)
-#define STREAMSYNC_OUTGOING_BUFFSIZE (256*256)
+#define LIVESYNC_INCOMING_BUFFSIZE (256 * 256)
+#define STREAMSYNC_OUTGOING_BUFFSIZE (256 * 256)
-#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
-#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
-#define LIVESYNC_MAXDELAY 15 /* seconds */
+#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
+#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash))
+#define LIVESYNC_MAXDELAY 15 /* seconds */
/* The amount of time a complete sync cycle should take */
-#define OT_SYNC_INTERVAL_MINUTES 2
+#define OT_SYNC_INTERVAL_MINUTES 2
/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */
-#define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) )
+#define OT_SYNC_SLEEP (((OT_SYNC_INTERVAL_MINUTES) * 60 * 1000000) / (OT_BUCKET_COUNT))
enum { OT_SYNC_PEER4, OT_SYNC_PEER6 };
enum { FLAG_SERVERSOCKET = 1 };
static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
static ot_time g_next_packet_time;
-static void * livesync_worker( void * args );
-static void * streamsync_worker( void * args );
-static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer );
+static void *livesync_worker(void *args);
+static void *streamsync_worker(void *args);
+static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer);
-void exerr( char * message ) {
- fprintf( stderr, "%s\n", message );
- exit( 111 );
+void exerr(char *message) {
+ fprintf(stderr, "%s\n", message);
+ exit(111);
}
-void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) {
- (void) event;
- (void) proto;
- (void) event_data;
+void stats_issue_event(ot_status_event event, PROTO_FLAG proto, uintptr_t event_data) {
+ (void)event;
+ (void)proto;
+ (void)event_data;
}
-void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
- char tmpip[4] = {0,0,0,0};
+void livesync_bind_mcast(ot_ip6 ip, uint16_t port) {
+ char tmpip[4] = {0, 0, 0, 0};
char *v4ip;
- if( !ip6_isv4mapped(ip))
+ if (!ip6_isv4mapped(ip))
exerr("v6 mcast support not yet available.");
- v4ip = ip+12;
+ v4ip = ip + 12;
- if( g_socket_in != -1 )
+ if (g_socket_in != -1)
exerr("Error: Livesync listen ip specified twice.");
- if( ( g_socket_in = socket_udp4( )) < 0)
- exerr("Error: Cant create live sync incoming socket." );
+ if ((g_socket_in = socket_udp4()) < 0)
+ exerr("Error: Cant create live sync incoming socket.");
ndelay_off(g_socket_in);
- if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 )
- exerr("Error: Cant bind live sync incoming socket." );
+ if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1)
+ exerr("Error: Cant bind live sync incoming socket.");
- if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) )
+ if (socket_mcjoin4(g_socket_in, groupip_1, v4ip))
exerr("Error: Cant make live sync incoming socket join mcast group.");
- if( ( g_socket_out = socket_udp4()) < 0)
- exerr("Error: Cant create live sync outgoing socket." );
- if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 )
- exerr("Error: Cant bind live sync outgoing socket." );
+ if ((g_socket_out = socket_udp4()) < 0)
+ exerr("Error: Cant create live sync outgoing socket.");
+ if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1)
+ exerr("Error: Cant bind live sync outgoing socket.");
socket_mcttl4(g_socket_out, 1);
socket_mcloop4(g_socket_out, 1);
}
-size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size ) {
+size_t add_peer_to_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) {
int exactmatch;
ot_torrent *torrent;
ot_peerlist *peer_list;
ot_peer *peer_dest;
- ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash );
- size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size);
+ ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash);
+ size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size);
- torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), compare_size, &exactmatch );
- if( !torrent )
+ torrent = vector_find_or_insert(torrents_list, (void *)hash, sizeof(ot_torrent), compare_size, &exactmatch);
+ if (!torrent)
return -1;
- if( !exactmatch ) {
+ if (!exactmatch) {
/* Create a new torrent entry, then */
- memcpy( torrent->hash, hash, sizeof(ot_hash) );
+ memcpy(torrent->hash, hash, sizeof(ot_hash));
- if( !( torrent->peer_list6 = malloc( sizeof (ot_peerlist) ) ) ||
- !( torrent->peer_list4 = malloc( sizeof (ot_peerlist) ) ) ) {
- vector_remove_torrent( torrents_list, torrent );
- mutex_bucket_unlock_by_hash( hash, 0 );
+ if (!(torrent->peer_list6 = malloc(sizeof(ot_peerlist))) || !(torrent->peer_list4 = malloc(sizeof(ot_peerlist)))) {
+ vector_remove_torrent(torrents_list, torrent);
+ mutex_bucket_unlock_by_hash(hash, 0);
return -1;
}
- byte_zero( torrent->peer_list6, sizeof( ot_peerlist ) );
- byte_zero( torrent->peer_list4, sizeof( ot_peerlist ) );
+ byte_zero(torrent->peer_list6, sizeof(ot_peerlist));
+ byte_zero(torrent->peer_list4, sizeof(ot_peerlist));
}
peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;
/* Check for peer in torrent */
- peer_dest = vector_find_or_insert_peer( &(peer_list->peers), peer, peer_size, &exactmatch );
- if( !peer_dest ) {
- mutex_bucket_unlock_by_hash( hash, 0 );
+ peer_dest = vector_find_or_insert_peer(&(peer_list->peers), peer, peer_size, &exactmatch);
+ if (!peer_dest) {
+ mutex_bucket_unlock_by_hash(hash, 0);
return -1;
}
/* Tell peer that it's fresh */
- OT_PEERTIME( peer, peer_size ) = 0;
+ OT_PEERTIME(peer, peer_size) = 0;
/* If we hadn't had a match create peer there */
- if( !exactmatch ) {
+ if (!exactmatch) {
peer_list->peer_count++;
- if( OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING )
+ if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING)
peer_list->seed_count++;
}
- memcpy( peer_dest, peer, peer_size );
- mutex_bucket_unlock_by_hash( hash, 0 );
+ memcpy(peer_dest, peer, peer_size);
+ mutex_bucket_unlock_by_hash(hash, 0);
return 0;
}
-size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size ) {
- int exactmatch;
- ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash );
- ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch );
+size_t remove_peer_from_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) {
+ int exactmatch;
+ ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash);
+ ot_torrent *torrent = binary_search(hash, torrents_list->data, torrents_list->size, sizeof(ot_torrent), OT_HASH_COMPARE_SIZE, &exactmatch);
- if( exactmatch ) {
+ if (exactmatch) {
ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;
- switch( vector_remove_peer( &peer_list->peers, peer, peer_size ) ) {
- case 2: peer_list->seed_count--; /* Intentional fallthrough */
- case 1: peer_list->peer_count--; /* Intentional fallthrough */
- default: break;
+ switch (vector_remove_peer(&peer_list->peers, peer, peer_size)) {
+ case 2:
+ peer_list->seed_count--; /* Intentional fallthrough */
+ case 1:
+ peer_list->peer_count--; /* Intentional fallthrough */
+ default:
+ break;
}
}
- mutex_bucket_unlock_by_hash( hash, 0 );
+ mutex_bucket_unlock_by_hash(hash, 0);
return 0;
}
-void free_peerlist( ot_peerlist *peer_list ) {
- if( peer_list->peers.data ) {
- if( OT_PEERLIST_HASBUCKETS( peer_list ) ) {
- ot_vector *bucket_list = (ot_vector*)(peer_list->peers.data);
+void free_peerlist(ot_peerlist *peer_list) {
+ if (peer_list->peers.data) {
+ if (OT_PEERLIST_HASBUCKETS(peer_list)) {
+ ot_vector *bucket_list = (ot_vector *)(peer_list->peers.data);
- while( peer_list->peers.size-- )
- free( bucket_list++->data );
+ while (peer_list->peers.size--)
+ free(bucket_list++->data);
}
- free( peer_list->peers.data );
+ free(peer_list->peers.data);
}
- free( peer_list );
+ free(peer_list);
}
-static void livesync_handle_peersync( ssize_t datalen, size_t peer_size ) {
- int off = sizeof( g_tracker_id ) + sizeof( uint32_t );
+static void livesync_handle_peersync(ssize_t datalen, size_t peer_size) {
+ int off = sizeof(g_tracker_id) + sizeof(uint32_t);
- fprintf( stderr, "." );
+ fprintf(stderr, ".");
- while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= datalen ) {
- ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash));
- ot_hash *hash = (ot_hash*)(g_inbuffer + off);
+ while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= datalen) {
+ ot_peer *peer = (ot_peer *)(g_inbuffer + off + sizeof(ot_hash));
+ ot_hash *hash = (ot_hash *)(g_inbuffer + off);
- if( OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED )
- remove_peer_from_torrent_proxy( *hash, peer, peer_size );
+ if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED)
+ remove_peer_from_torrent_proxy(*hash, peer, peer_size);
else
- add_peer_to_torrent_proxy( *hash, peer, peer_size );
+ add_peer_to_torrent_proxy(*hash, peer, peer_size);
- off += sizeof( ot_hash ) + peer_size;
+ off += sizeof(ot_hash) + peer_size;
}
}
-int usage( char *self ) {
- fprintf( stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self );
+int usage(char *self) {
+ fprintf(stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self);
return 0;
}
FLAG_MASK = 0x07
};
-#define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING)
-#define PROXYPEER_ISCONNECTED(flag) (((flag)&FLAG_MASK)==FLAG_CONNECTED)
-#define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED)
-#define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING)
-#define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID)
-#define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED)
+#define PROXYPEER_NEEDSCONNECT(flag) ((flag) == FLAG_OUTGOING)
+#define PROXYPEER_ISCONNECTED(flag) (((flag) & FLAG_MASK) == FLAG_CONNECTED)
+#define PROXYPEER_SETDISCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_DISCONNECTED)
+#define PROXYPEER_SETCONNECTING(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTING)
+#define PROXYPEER_SETWAITTRACKERID(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_WAITTRACKERID)
+#define PROXYPEER_SETCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTED)
typedef struct {
- int state; /* Whether we want to connect, how far our handshake is, etc. */
- ot_ip6 ip; /* The peer to connect to */
- uint16_t port; /* The peers port */
- uint8_t indata[8192*16]; /* Any data not processed yet */
- size_t indata_length; /* Length of unprocessed data */
- uint32_t tracker_id; /* How the other end greeted */
- int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
- io_batch outdata; /* The iobatch containing our sync data */
-
- size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */
- uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */
- uint8_t packet_type; /* Type of current packet */
- uint32_t packet_tid; /* Tracker id for current packet */
+ int state; /* Whether we want to connect, how far our handshake is, etc. */
+ ot_ip6 ip; /* The peer to connect to */
+ uint16_t port; /* The peers port */
+ uint8_t indata[8192 * 16]; /* Any data not processed yet */
+ size_t indata_length; /* Length of unprocessed data */
+ uint32_t tracker_id; /* How the other end greeted */
+ int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
+ io_batch outdata; /* The iobatch containing our sync data */
+
+ size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */
+ uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */
+ uint8_t packet_type; /* Type of current packet */
+ uint32_t packet_tid; /* Tracker id for current packet */
} proxy_peer;
-static void process_indata( proxy_peer * peer );
+static void process_indata(proxy_peer *peer);
-void reset_info_block( proxy_peer * peer ) {
+void reset_info_block(proxy_peer *peer) {
peer->indata_length = 0;
peer->tracker_id = 0;
peer->fd = -1;
peer->packet_tcount = 0;
- iob_reset( &peer->outdata );
- PROXYPEER_SETDISCONNECTED( peer->state );
+ iob_reset(&peer->outdata);
+ PROXYPEER_SETDISCONNECTED(peer->state);
}
/* Number of connections to peers
- * If a peer's IP is set, we try to reconnect, when the connection drops
- * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it
- * Multiple connections to/from the same ip are okay, if tracker_id doesn't match
- * Reconnect attempts occur only twice a minute
-*/
+ * If a peer's IP is set, we try to reconnect, when the connection drops
+ * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it
+ * Multiple connections to/from the same ip are okay, if tracker_id doesn't match
+ * Reconnect attempts occur only twice a minute
+ */
static int g_connection_count;
static ot_time g_connection_reconn;
static proxy_peer g_connections[MAX_PEERS];
-static void handle_reconnects( void ) {
+static void handle_reconnects(void) {
int i;
- for( i=0; i<g_connection_count; ++i )
- if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) {
- int64 newfd = socket_tcp6( );
- fprintf( stderr, "(Re)connecting to peer..." );
- if( newfd < 0 ) continue; /* No socket for you */
+ for (i = 0; i < g_connection_count; ++i)
+ if (PROXYPEER_NEEDSCONNECT(g_connections[i].state)) {
+ int64 newfd = socket_tcp6();
+ fprintf(stderr, "(Re)connecting to peer...");
+ if (newfd < 0)
+ continue; /* No socket for you */
io_fd(newfd);
- if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) {
- io_close( newfd );
+ if (socket_bind6_reuse(newfd, g_serverip, g_serverport, 0)) {
+ io_close(newfd);
continue;
}
- if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 &&
- errno != EINPROGRESS && errno != EWOULDBLOCK ) {
+ if (socket_connect6(newfd, g_connections[i].ip, g_connections[i].port, 0) == -1 && errno != EINPROGRESS && errno != EWOULDBLOCK) {
close(newfd);
continue;
}
io_wantwrite(newfd); /* So we will be informed when it is connected */
- io_setcookie(newfd,g_connections+i);
+ io_setcookie(newfd, g_connections + i);
/* Prepare connection info block */
- reset_info_block( g_connections+i );
- g_connections[i].fd = newfd;
- PROXYPEER_SETCONNECTING( g_connections[i].state );
+ reset_info_block(g_connections + i);
+ g_connections[i].fd = newfd;
+ PROXYPEER_SETCONNECTING(g_connections[i].state);
}
g_connection_reconn = time(NULL) + 30;
}
/* Handle incoming connection requests, check against whitelist */
-static void handle_accept( int64 serversocket ) {
- int64 newfd;
+static void handle_accept(int64 serversocket) {
+ int64 newfd;
ot_ip6 ip;
uint16 port;
- while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) {
+ while ((newfd = socket_accept6(serversocket, ip, &port, NULL)) != -1) {
/* XXX some access control */
/* Put fd into a non-blocking mode */
- io_nonblock( newfd );
+ io_nonblock(newfd);
- if( !io_fd( newfd ) )
- io_close( newfd );
+ if (!io_fd(newfd))
+ io_close(newfd);
else {
/* Find a new home for our incoming connection */
int i;
- for( i=0; i<MAX_PEERS; ++i )
- if( g_connections[i].state == FLAG_DISCONNECTED )
+ for (i = 0; i < MAX_PEERS; ++i)
+ if (g_connections[i].state == FLAG_DISCONNECTED)
break;
- if( i == MAX_PEERS ) {
- fprintf( stderr, "No room for incoming connection." );
- close( newfd );
+ if (i == MAX_PEERS) {
+ fprintf(stderr, "No room for incoming connection.");
+ close(newfd);
continue;
}
/* Prepare connection info block */
- reset_info_block( g_connections+i );
- PROXYPEER_SETCONNECTING( g_connections[i].state );
+ reset_info_block(g_connections + i);
+ PROXYPEER_SETCONNECTING(g_connections[i].state);
g_connections[i].port = port;
g_connections[i].fd = newfd;
- io_setcookie( newfd, g_connections + i );
+ io_setcookie(newfd, g_connections + i);
/* We expect the connecting side to begin with its tracker_id */
- io_wantread( newfd );
+ io_wantread(newfd);
}
}
}
/* New sync data on the stream */
-static void handle_read( int64 peersocket ) {
- int i;
- int64 datalen;
- uint32_t tracker_id;
- proxy_peer *peer = io_getcookie( peersocket );
+static void handle_read(int64 peersocket) {
+ int i;
+ int64 datalen;
+ uint32_t tracker_id;
+ proxy_peer *peer = io_getcookie(peersocket);
- if( !peer ) {
+ if (!peer) {
/* Can't happen ;) */
- io_close( peersocket );
+ io_close(peersocket);
return;
}
- switch( peer->state & FLAG_MASK ) {
+ switch (peer->state & FLAG_MASK) {
case FLAG_DISCONNECTED:
- io_close( peersocket );
+ io_close(peersocket);
break; /* Shouldnt happen */
case FLAG_CONNECTING:
case FLAG_WAITTRACKERID:
/* We want at least the first four bytes to come at once, to avoid keeping extra states (for now)
This also catches 0 bytes reads == EOF and negative values, denoting connection errors */
- if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) )
+ if (io_tryread(peersocket, (void *)&tracker_id, sizeof(tracker_id)) != sizeof(tracker_id))
goto close_socket;
/* See, if we already have a connection to that peer */
- for( i=0; i<MAX_PEERS; ++i )
- if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED &&
- g_connections[i].tracker_id == tracker_id ) {
- fprintf( stderr, "Peer already connected. Closing connection.\n" );
+ for (i = 0; i < MAX_PEERS; ++i)
+ if ((g_connections[i].state & FLAG_MASK) == FLAG_CONNECTED && g_connections[i].tracker_id == tracker_id) {
+ fprintf(stderr, "Peer already connected. Closing connection.\n");
goto close_socket;
}
/* Also no need for soliloquy */
- if( tracker_id == g_tracker_id )
+ if (tracker_id == g_tracker_id)
goto close_socket;
/* The new connection is good, send our tracker_id on incoming connections */
- if( peer->state == FLAG_CONNECTING )
- if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) != sizeof( g_tracker_id ) )
+ if (peer->state == FLAG_CONNECTING)
+ if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) != sizeof(g_tracker_id))
goto close_socket;
peer->tracker_id = tracker_id;
- PROXYPEER_SETCONNECTED( peer->state );
+ PROXYPEER_SETCONNECTED(peer->state);
- if( peer->state & FLAG_OUTGOING )
- fprintf( stderr, "succeeded.\n" );
+ if (peer->state & FLAG_OUTGOING)
+ fprintf(stderr, "succeeded.\n");
else
- fprintf( stderr, "Incoming connection successful.\n" );
+ fprintf(stderr, "Incoming connection successful.\n");
break;
-close_socket:
- fprintf( stderr, "Handshake incomplete, closing socket\n" );
- io_close( peersocket );
- reset_info_block( peer );
+ close_socket:
+ fprintf(stderr, "Handshake incomplete, closing socket\n");
+ io_close(peersocket);
+ reset_info_block(peer);
break;
case FLAG_CONNECTED:
/* Here we acutally expect data from peer
indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
- datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length );
- if( !datalen || datalen < -1 ) {
- fprintf( stderr, "Connection closed by remote peer.\n" );
- io_close( peersocket );
- reset_info_block( peer );
- } else if( datalen > 0 ) {
+ datalen = io_tryread(peersocket, (void *)(peer->indata + peer->indata_length), sizeof(peer->indata) - peer->indata_length);
+ if (!datalen || datalen < -1) {
+ fprintf(stderr, "Connection closed by remote peer.\n");
+ io_close(peersocket);
+ reset_info_block(peer);
+ } else if (datalen > 0) {
peer->indata_length += datalen;
- process_indata( peer );
+ process_indata(peer);
}
break;
}
}
/* Can write new sync data to the stream */
-static void handle_write( int64 peersocket ) {
- proxy_peer *peer = io_getcookie( peersocket );
+static void handle_write(int64 peersocket) {
+ proxy_peer *peer = io_getcookie(peersocket);
- if( !peer ) {
+ if (!peer) {
/* Can't happen ;) */
- io_close( peersocket );
+ io_close(peersocket);
return;
}
- switch( peer->state & FLAG_MASK ) {
+ switch (peer->state & FLAG_MASK) {
case FLAG_DISCONNECTED:
default: /* Should not happen */
- io_close( peersocket );
+ io_close(peersocket);
break;
case FLAG_CONNECTING:
/* Ensure that the connection is established and handle connection error */
- if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) {
- fprintf( stderr, "failed\n" );
- reset_info_block( peer );
- io_close( peersocket );
- break;
+ if (peer->state & FLAG_OUTGOING && !socket_connected(peersocket)) {
+ fprintf(stderr, "failed\n");
+ reset_info_block(peer);
+ io_close(peersocket);
+ break;
}
- if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) == sizeof( g_tracker_id ) ) {
- PROXYPEER_SETWAITTRACKERID( peer->state );
- io_dontwantwrite( peersocket );
- io_wantread( peersocket );
+ if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) == sizeof(g_tracker_id)) {
+ PROXYPEER_SETWAITTRACKERID(peer->state);
+ io_dontwantwrite(peersocket);
+ io_wantread(peersocket);
} else {
- fprintf( stderr, "Handshake incomplete, closing socket\n" );
- io_close( peersocket );
- reset_info_block( peer );
+ fprintf(stderr, "Handshake incomplete, closing socket\n");
+ io_close(peersocket);
+ reset_info_block(peer);
}
break;
case FLAG_CONNECTED:
- switch( iob_send( peersocket, &peer->outdata ) ) {
+ switch (iob_send(peersocket, &peer->outdata)) {
case 0: /* all data sent */
- io_dontwantwrite( peersocket );
+ io_dontwantwrite(peersocket);
break;
case -3: /* an error occured */
- io_close( peersocket );
- reset_info_block( peer );
+ io_close(peersocket);
+ reset_info_block(peer);
break;
default: /* Normal operation or eagain */
break;
int64 sock;
/* inlined livesync_init() */
- memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) );
+ memset(g_peerbuffer_start, 0, sizeof(g_peerbuffer_start));
g_peerbuffer_pos = g_peerbuffer_start;
- memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
- uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER);
- g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t);
- g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
+ memcpy(g_peerbuffer_pos, &g_tracker_id, sizeof(g_tracker_id));
+ uint32_pack_big((char *)g_peerbuffer_pos + sizeof(g_tracker_id), OT_SYNC_PEER);
+ g_peerbuffer_pos += sizeof(g_tracker_id) + sizeof(uint32_t);
+ g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
- while(1) {
+ while (1) {
/* See if we need to connect to anyone */
- if( time(NULL) > g_connection_reconn )
- handle_reconnects( );
+ if (time(NULL) > g_connection_reconn)
+ handle_reconnects();
/* Wait for io events until next approx reconn check time */
- io_waituntil2( 30*1000 );
+ io_waituntil2(30 * 1000);
/* Loop over readable sockets */
- while( ( sock = io_canread( ) ) != -1 ) {
- const void *cookie = io_getcookie( sock );
- if( (uintptr_t)cookie == FLAG_SERVERSOCKET )
- handle_accept( sock );
+ while ((sock = io_canread()) != -1) {
+ const void *cookie = io_getcookie(sock);
+ if ((uintptr_t)cookie == FLAG_SERVERSOCKET)
+ handle_accept(sock);
else
- handle_read( sock );
+ handle_read(sock);
}
/* Loop over writable sockets */
- while( ( sock = io_canwrite( ) ) != -1 )
- handle_write( sock );
+ while ((sock = io_canwrite()) != -1)
+ handle_write(sock);
- livesync_ticker( );
+ livesync_ticker();
}
}
-static void panic( const char *routine ) {
- fprintf( stderr, "%s: %s\n", routine, strerror(errno) );
- exit( 111 );
+static void panic(const char *routine) {
+ fprintf(stderr, "%s: %s\n", routine, strerror(errno));
+ exit(111);
}
-static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) {
- int64 sock = socket_tcp6( );
+static int64_t ot_try_bind(ot_ip6 ip, uint16_t port) {
+ int64 sock = socket_tcp6();
- if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 )
- panic( "socket_bind6_reuse" );
+ if (socket_bind6_reuse(sock, ip, port, 0) == -1)
+ panic("socket_bind6_reuse");
- if( socket_listen( sock, SOMAXCONN) == -1 )
- panic( "socket_listen" );
+ if (socket_listen(sock, SOMAXCONN) == -1)
+ panic("socket_listen");
- if( !io_fd( sock ) )
- panic( "io_fd" );
+ if (!io_fd(sock))
+ panic("io_fd");
- io_setcookie( sock, (void*)FLAG_SERVERSOCKET );
- io_wantread( sock );
+ io_setcookie(sock, (void *)FLAG_SERVERSOCKET);
+ io_wantread(sock);
return sock;
}
-
-static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) {
+static int scan_ip6_port(const char *src, ot_ip6 ip, uint16 *port) {
const char *s = src;
- int off, bracket = 0;
- while( isspace(*s) ) ++s;
- if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */
- if( !(off = scan_ip6( s, ip ) ) )
+ int off, bracket = 0;
+ while (isspace(*s))
+ ++s;
+ if (*s == '[')
+ ++s, ++bracket; /* for v6 style notation */
+ if (!(off = scan_ip6(s, ip)))
return 0;
s += off;
- if( *s == 0 || isspace(*s)) return s-src;
- if( *s == ']' && bracket ) ++s;
- if( !ip6_isv4mapped(ip)){
- if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0;
+ if (*s == 0 || isspace(*s))
+ return s - src;
+ if (*s == ']' && bracket)
+ ++s;
+ if (!ip6_isv4mapped(ip)) {
+ if ((bracket && *(s) != ':') || (*(s) != '.'))
+ return 0;
s++;
} else {
- if( *(s++) != ':' ) return 0;
+ if (*(s++) != ':')
+ return 0;
}
- if( !(off = scan_ushort (s, port ) ) )
- return 0;
- return off+s-src;
+ if (!(off = scan_ushort(s, port)))
+ return 0;
+ return off + s - src;
}
-int main( int argc, char **argv ) {
+int main(int argc, char **argv) {
static pthread_t sync_in_thread_id;
static pthread_t sync_out_thread_id;
- ot_ip6 serverip;
- uint16_t tmpport;
- int scanon = 1, lbound = 0, sbound = 0;
+ ot_ip6 serverip;
+ uint16_t tmpport;
+ int scanon = 1, lbound = 0, sbound = 0;
- srandom( time(NULL) );
+ srandom(time(NULL));
#ifdef WANT_ARC4RANDOM
g_tracker_id = arc4random();
#else
g_tracker_id = random();
#endif
- while( scanon ) {
- switch( getopt( argc, argv, ":l:c:L:h" ) ) {
- case -1: scanon = 0; break;
+ while (scanon) {
+ switch (getopt(argc, argv, ":l:c:L:h")) {
+ case -1:
+ scanon = 0;
+ break;
case 'l':
tmpport = 0;
- if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
- ot_try_bind( serverip, tmpport );
+ if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) {
+ usage(argv[0]);
+ exit(1);
+ }
+ ot_try_bind(serverip, tmpport);
++sbound;
break;
case 'c':
- if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" );
+ if (g_connection_count > MAX_PEERS / 2)
+ exerr("Connection limit exceeded.\n");
tmpport = 0;
- if( !scan_ip6_port( optarg,
- g_connections[g_connection_count].ip,
- &g_connections[g_connection_count].port ) ||
- !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); }
+ if (!scan_ip6_port(optarg, g_connections[g_connection_count].ip, &g_connections[g_connection_count].port) || !g_connections[g_connection_count].port) {
+ usage(argv[0]);
+ exit(1);
+ }
g_connections[g_connection_count++].state = FLAG_OUTGOING;
break;
case 'L':
tmpport = 9696;
- if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
- livesync_bind_mcast( serverip, tmpport); ++lbound; break;
+ if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) {
+ usage(argv[0]);
+ exit(1);
+ }
+ livesync_bind_mcast(serverip, tmpport);
+ ++lbound;
+ break;
default:
- case '?': usage( argv[0] ); exit( 1 );
+ case '?':
+ usage(argv[0]);
+ exit(1);
}
}
- if( !lbound ) exerr( "No livesync port bound." );
- if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." );
- pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL );
- pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL );
+ if (!lbound)
+ exerr("No livesync port bound.");
+ if (!g_connection_count && !sbound)
+ exerr("No streamsync port bound.");
+ pthread_create(&sync_in_thread_id, NULL, livesync_worker, NULL);
+ pthread_create(&sync_out_thread_id, NULL, streamsync_worker, NULL);
server_mainloop();
return 0;
}
-static void * streamsync_worker( void * args ) {
+static void *streamsync_worker(void *args) {
(void)args;
- while( 1 ) {
+ while (1) {
int bucket;
/* For each bucket... */
- for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
+ for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
/* Get exclusive access to that bucket */
- ot_vector *torrents_list = mutex_bucket_lock( bucket );
- size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0;
- size_t mem, mem_a = 0, mem_b = 0;
- uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c;
+ ot_vector *torrents_list = mutex_bucket_lock(bucket);
+ size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0;
+ size_t mem, mem_a = 0, mem_b = 0;
+ uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c;
- if( !torrents_list->size ) goto unlock_continue;
+ if (!torrents_list->size)
+ goto unlock_continue;
/* For each torrent in this bucket.. */
- for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
+ for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) {
/* Address torrents members */
- ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
- switch( peer_list->peer_count ) {
- case 2: count_two++; break;
- case 1: count_one++; break;
- case 0: break;
- default: count_def++;
- count_peers += peer_list->peer_count;
+ ot_peerlist *peer_list = (((ot_torrent *)(torrents_list->data))[tor_offset]).peer_list;
+ switch (peer_list->peer_count) {
+ case 2:
+ count_two++;
+ break;
+ case 1:
+ count_one++;
+ break;
+ case 0:
+ break;
+ default:
+ count_def++;
+ count_peers += peer_list->peer_count;
}
}
/* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */
- mem = 3 * ( 1 + 1 + 2 ) + ( count_one + count_two ) * ( 19 + 1 ) + count_def * ( 19 + 8 ) +
- ( count_one + 2 * count_two + count_peers ) * 7;
-
- fprintf( stderr, "Mem: %zd\n", mem );
-
- ptr = ptr_a = ptr_b = ptr_c = malloc( mem );
- if( !ptr ) goto unlock_continue;
-
- if( count_one > 4 || !count_def ) {
- mem_a = 1 + 1 + 2 + count_one * ( 19 + 7 );
- ptr_b += mem_a; ptr_c += mem_a;
- ptr_a[0] = 1; /* Offset 0: packet type 1 */
- ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
- ptr_a[2] = count_one >> 8;
- ptr_a[3] = count_one & 255;
- ptr_a += 4;
+ mem = 3 * (1 + 1 + 2) + (count_one + count_two) * (19 + 1) + count_def * (19 + 8) + (count_one + 2 * count_two + count_peers) * 7;
+
+ fprintf(stderr, "Mem: %zd\n", mem);
+
+ ptr = ptr_a = ptr_b = ptr_c = malloc(mem);
+ if (!ptr)
+ goto unlock_continue;
+
+ if (count_one > 4 || !count_def) {
+ mem_a = 1 + 1 + 2 + count_one * (19 + 7);
+ ptr_b += mem_a;
+ ptr_c += mem_a;
+ ptr_a[0] = 1; /* Offset 0: packet type 1 */
+ ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
+ ptr_a[2] = count_one >> 8;
+ ptr_a[3] = count_one & 255;
+ ptr_a += 4;
} else
count_def += count_one;
- if( count_two > 4 || !count_def ) {
- mem_b = 1 + 1 + 2 + count_two * ( 19 + 14 );
- ptr_c += mem_b;
- ptr_b[0] = 2; /* Offset 0: packet type 2 */
- ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
- ptr_b[2] = count_two >> 8;
- ptr_b[3] = count_two & 255;
- ptr_b += 4;
+ if (count_two > 4 || !count_def) {
+ mem_b = 1 + 1 + 2 + count_two * (19 + 14);
+ ptr_c += mem_b;
+ ptr_b[0] = 2; /* Offset 0: packet type 2 */
+ ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
+ ptr_b[2] = count_two >> 8;
+ ptr_b[3] = count_two & 255;
+ ptr_b += 4;
} else
count_def += count_two;
- if( count_def ) {
- ptr_c[0] = 0; /* Offset 0: packet type 0 */
- ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
- ptr_c[2] = count_def >> 8;
- ptr_c[3] = count_def & 255;
- ptr_c += 4;
+ if (count_def) {
+ ptr_c[0] = 0; /* Offset 0: packet type 0 */
+ ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
+ ptr_c[2] = count_def >> 8;
+ ptr_c[3] = count_def & 255;
+ ptr_c += 4;
}
/* For each torrent in this bucket.. */
- for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
+ for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) {
/* Address torrents members */
- ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset;
+ ot_torrent *torrent = ((ot_torrent *)(torrents_list->data)) + tor_offset;
ot_peerlist *peer_list = torrent->peer_list;
- ot_peer *peers = (ot_peer*)(peer_list->peers.data);
- uint8_t **dst;
+ ot_peer *peers = (ot_peer *)(peer_list->peers.data);
+ uint8_t **dst;
/* Determine destination slot */
count_peers = peer_list->peer_count;
- switch( count_peers ) {
- case 0: continue;
- case 1: dst = mem_a ? &ptr_a : &ptr_c; break;
- case 2: dst = mem_b ? &ptr_b : &ptr_c; break;
- default: dst = &ptr_c; break;
+ switch (count_peers) {
+ case 0:
+ continue;
+ case 1:
+ dst = mem_a ? &ptr_a : &ptr_c;
+ break;
+ case 2:
+ dst = mem_b ? &ptr_b : &ptr_c;
+ break;
+ default:
+ dst = &ptr_c;
+ break;
}
/* Copy tail of info_hash, advance pointer */
- memcpy( *dst, ((uint8_t*)torrent->hash) + 1, sizeof( ot_hash ) - 1);
- *dst += sizeof( ot_hash ) - 1;
+ memcpy(*dst, ((uint8_t *)torrent->hash) + 1, sizeof(ot_hash) - 1);
+ *dst += sizeof(ot_hash) - 1;
/* Encode peer count */
- if( dst == &ptr_c )
- while( count_peers ) {
- if( count_peers <= 0x7f )
+ if (dst == &ptr_c)
+ while (count_peers) {
+ if (count_peers <= 0x7f)
*(*dst)++ = count_peers;
else
- *(*dst)++ = 0x80 | ( count_peers & 0x7f );
+ *(*dst)++ = 0x80 | (count_peers & 0x7f);
count_peers >>= 7;
}
/* Copy peers */
count_peers = peer_list->peer_count;
- while( count_peers-- ) {
- memcpy( *dst, peers++, OT_IP_SIZE + 3 );
+ while (count_peers--) {
+ memcpy(*dst, peers++, OT_IP_SIZE + 3);
*dst += OT_IP_SIZE + 3;
}
free_peerlist(peer_list);
}
- free( torrents_list->data );
- memset( torrents_list, 0, sizeof(*torrents_list ) );
-unlock_continue:
- mutex_bucket_unlock( bucket, 0 );
+ free(torrents_list->data);
+ memset(torrents_list, 0, sizeof(*torrents_list));
+ unlock_continue:
+ mutex_bucket_unlock(bucket, 0);
- if( ptr ) {
+ if (ptr) {
int i;
- if( ptr_b > ptr_c ) ptr_c = ptr_b;
- if( ptr_a > ptr_c ) ptr_c = ptr_a;
+ if (ptr_b > ptr_c)
+ ptr_c = ptr_b;
+ if (ptr_a > ptr_c)
+ ptr_c = ptr_a;
mem = ptr_c - ptr;
- for( i=0; i < MAX_PEERS; ++i ) {
- if( PROXYPEER_ISCONNECTED(g_connections[i].state) ) {
- void *tmp = malloc( mem );
- if( tmp ) {
- memcpy( tmp, ptr, mem );
- iob_addbuf_free( &g_connections[i].outdata, tmp, mem );
- io_wantwrite( g_connections[i].fd );
+ for (i = 0; i < MAX_PEERS; ++i) {
+ if (PROXYPEER_ISCONNECTED(g_connections[i].state)) {
+ void *tmp = malloc(mem);
+ if (tmp) {
+ memcpy(tmp, ptr, mem);
+ iob_addbuf_free(&g_connections[i].outdata, tmp, mem);
+ io_wantwrite(g_connections[i].fd);
}
}
}
- free( ptr );
+ free(ptr);
}
- usleep( OT_SYNC_SLEEP );
+ usleep(OT_SYNC_SLEEP);
}
}
return 0;
}
-static void livesync_issue_peersync( ) {
- socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start,
- groupip_1, LIVESYNC_PORT);
- g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t );
+static void livesync_issue_peersync() {
+ socket_send4(g_socket_out, (char *)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, groupip_1, LIVESYNC_PORT);
+ g_peerbuffer_pos = g_peerbuffer_start + sizeof(g_tracker_id) + sizeof(uint32_t);
g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
}
-void livesync_ticker( ) {
+void livesync_ticker() {
/* livesync_issue_peersync sets g_next_packet_time */
- if( time(NULL) > g_next_packet_time &&
- g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) )
+ if (time(NULL) > g_next_packet_time && g_peerbuffer_pos > g_peerbuffer_start + sizeof(g_tracker_id))
livesync_issue_peersync();
}
-static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) {
-// unsigned int i;
+static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer) {
+ // unsigned int i;
*g_peerbuffer_pos = prefix;
- memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 );
- memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 );
+ memcpy(g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1);
+ memcpy(g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1);
#if 0
/* Dump info_hash */
#endif
g_peerbuffer_pos += sizeof(ot_peer);
- if( g_peerbuffer_pos >= g_peerbuffer_highwater )
+ if (g_peerbuffer_pos >= g_peerbuffer_highwater)
livesync_issue_peersync();
}
-static void process_indata( proxy_peer * peer ) {
- size_t consumed, peers;
+static void process_indata(proxy_peer *peer) {
+ size_t consumed, peers;
uint8_t *data = peer->indata, *hash;
uint8_t *dataend = data + peer->indata_length;
- while( 1 ) {
+ while (1) {
/* If we're not inside of a packet, make a new one */
- if( !peer->packet_tcount ) {
+ if (!peer->packet_tcount) {
/* Ensure the header is complete or postpone processing */
- if( data + 4 > dataend ) break;
- peer->packet_type = data[0];
- peer->packet_tprefix = data[1];
- peer->packet_tcount = data[2] * 256 + data[3];
- data += 4;
-printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount );
+ if (data + 4 > dataend)
+ break;
+ peer->packet_type = data[0];
+ peer->packet_tprefix = data[1];
+ peer->packet_tcount = data[2] * 256 + data[3];
+ data += 4;
+ printf("type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount);
}
/* Ensure size for a minimal torrent block */
- if( data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend ) break;
+ if (data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend)
+ break;
/* Advance pointer to peer count or peers */
- hash = data;
- data += sizeof(ot_hash) - 1;
+ hash = data;
+ data += sizeof(ot_hash) - 1;
/* Type 0 has peer count encoded before each peers */
- peers = peer->packet_type;
- if( !peers ) {
+ peers = peer->packet_type;
+ if (!peers) {
int shift = 0;
- do peers |= ( 0x7f & *data ) << ( 7 * shift );
- while ( *(data++) & 0x80 && shift++ < 6 );
+ do
+ peers |= (0x7f & *data) << (7 * shift);
+ while (*(data++) & 0x80 && shift++ < 6);
}
#if 0
printf( "peers: %zd\n", peers );
#endif
/* Ensure enough data being read to hold all peers */
- if( data + (OT_IP_SIZE + 3) * peers > dataend ) {
+ if (data + (OT_IP_SIZE + 3) * peers > dataend) {
data = hash;
break;
}
- while( peers-- ) {
- livesync_proxytell( peer->packet_tprefix, hash, data );
+ while (peers--) {
+ livesync_proxytell(peer->packet_tprefix, hash, data);
data += OT_IP_SIZE + 3;
}
--peer->packet_tcount;
}
consumed = data - peer->indata;
- memmove( peer->indata, data, peer->indata_length - consumed );
+ memmove(peer->indata, data, peer->indata_length - consumed);
peer->indata_length -= consumed;
}
-static void * livesync_worker( void * args ) {
+static void *livesync_worker(void *args) {
(void)args;
- while( 1 ) {
- ot_ip6 in_ip; uint16_t in_port;
- size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port);
+ while (1) {
+ ot_ip6 in_ip;
+ uint16_t in_port;
+ size_t datalen = socket_recv4(g_socket_in, (char *)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port);
/* Expect at least tracker id and packet type */
- if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) )
+ if (datalen <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t)))
continue;
- if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
+ if (!memcmp(g_inbuffer, &g_tracker_id, sizeof(g_tracker_id))) {
/* drop packet coming from ourselves */
continue;
}
- switch( uint32_read_big( (char*)g_inbuffer + sizeof( g_tracker_id ) ) ) {
+ switch (uint32_read_big((char *)g_inbuffer + sizeof(g_tracker_id))) {
case OT_SYNC_PEER4:
- livesync_handle_peersync( datalen, OT_PEER_SIZE4 );
+ livesync_handle_peersync(datalen, OT_PEER_SIZE4);
break;
case OT_SYNC_PEER6:
- livesync_handle_peersync( datalen, OT_PEER_SIZE6 );
+ livesync_handle_peersync(datalen, OT_PEER_SIZE6);
break;
default:
// fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) );