struct imap_arg_list *flags_list;
struct mailbox_custom_flags old_flags;
struct mail_full_flags flags;
+ struct istream *input;
time_t internal_date;
const char *mailbox, *internal_date_str;
uoff_t msg_size;
}
/* save the mail */
- i_stream_set_read_limit(client->input,
- client->input->v_offset + msg_size);
+ input = i_stream_create_limit(default_pool, client->input,
+ client->input->v_offset,
+ msg_size);
if (!box->save_next(ctx, &flags, internal_date,
- timezone_offset, client->input)) {
+ timezone_offset, input)) {
+ i_stream_unref(input);
client_send_storage_error(client, storage);
break;
}
- i_stream_set_read_limit(client->input, 0);
+ i_stream_unref(input);
if (client->input->closed)
break;
struct istream *input;
buffer_t *headers;
- uoff_t body_offset, body_size;
- struct message_size header_size;
+ uoff_t v_header_size, body_offset, body_size;
};
static void _close(struct _iostream *stream __attr_unused__)
struct mbox_istream *mstream = (struct mbox_istream *) stream;
ssize_t ret;
size_t pos;
- uoff_t limit, old_limit;
- off_t vsize_diff;
+ uoff_t offset;
- if (stream->istream.v_offset < mstream->header_size.virtual_size) {
+ if (stream->istream.v_offset < mstream->v_header_size) {
/* we don't support mixing headers and body.
it shouldn't be needed. */
return -2;
}
- /* may be positive or negative, depending on how much there was CRs
- and how much headers were hidden */
- vsize_diff = mstream->header_size.virtual_size -
- mstream->header_size.physical_size;
-
- limit = stream->istream.v_limit - vsize_diff;
- old_limit = mstream->input->v_limit;
- if (limit != old_limit)
- i_stream_set_read_limit(mstream->input, limit);
-
- if (mstream->input->v_offset != stream->istream.v_offset - vsize_diff) {
- i_stream_seek(mstream->input,
- stream->istream.v_offset - vsize_diff);
- }
+ offset = stream->istream.v_offset - mstream->v_header_size;
+ if (mstream->input->v_offset != offset)
+ i_stream_seek(mstream->input, offset);
ret = i_stream_read(mstream->input);
- mstream->istream.pos -= mstream->istream.skip;
- mstream->istream.skip = 0;
- mstream->istream.buffer = i_stream_get_data(mstream->input, &pos);
+ stream->pos -= stream->skip;
+ stream->skip = 0;
+ stream->buffer = i_stream_get_data(mstream->input, &pos);
- ret = pos <= mstream->istream.pos ? -1 :
- (ssize_t) (pos - mstream->istream.pos);
+ ret = pos <= stream->pos ? -1 :
+ (ssize_t) (pos - stream->pos);
mstream->istream.pos = pos;
-
- if (limit != old_limit)
- i_stream_set_read_limit(mstream->input, old_limit);
return ret;
}
struct mbox_istream *mstream = (struct mbox_istream *) stream;
stream->istream.v_offset = v_offset;
- if (v_offset < mstream->header_size.virtual_size) {
+ if (v_offset < mstream->v_header_size) {
/* still in headers */
stream->skip = v_offset;
- stream->pos = stream->high_pos =
- mstream->header_size.virtual_size;
+ stream->pos = mstream->v_header_size;
stream->buffer = buffer_get_data(mstream->headers, NULL);
} else {
/* body - use our real input stream */
- stream->skip = stream->pos = stream->high_pos = 0;
+ stream->skip = stream->pos = 0;
stream->buffer = NULL;
-
- v_offset += (off_t)mstream->header_size.physical_size -
- (off_t)mstream->header_size.virtual_size;
- i_stream_seek(mstream->input, v_offset);
}
}
-static void _skip(struct _istream *stream, uoff_t count)
-{
- i_stream_seek(&stream->istream, stream->istream.v_offset + count);
-}
-
struct istream *i_stream_create_mbox(pool_t pool, struct istream *input,
- uoff_t body_size)
+ uoff_t offset, uoff_t body_size)
{
struct mbox_istream *mstream;
+ struct istream *hdr_input;
mstream = p_new(pool, struct mbox_istream, 1);
- mstream->input = input;
mstream->body_size = body_size;
if (body_size == 0) {
/* possibly broken message, find the next From-line
and make sure header parser won't pass it. */
mbox_skip_header(input);
- i_stream_set_read_limit(input, input->v_offset);
- i_stream_seek(input, 0);
+ hdr_input = i_stream_create_limit(pool, input,
+ 0, input->v_offset);
+ } else {
+ hdr_input = input;
+ i_stream_ref(input);
}
mstream->headers = buffer_create_dynamic(default_pool,
8192, (size_t)-1);
- mbox_hide_headers(input, mstream->headers,
- &mstream->header_size);
- mstream->body_offset = input->v_offset;
- i_stream_set_read_limit(input, mstream->body_offset + body_size);
+ i_stream_seek(hdr_input, offset);
+ mbox_read_headers(hdr_input, mstream->headers);
+ mstream->v_header_size = buffer_get_used_size(mstream->headers);
+ mstream->body_offset = hdr_input->v_offset;
+ i_stream_unref(hdr_input);
+
+ mstream->input = i_stream_create_limit(pool, input,
+ mstream->body_offset, body_size);
mstream->istream.buffer = buffer_get_data(mstream->headers, NULL);
- mstream->istream.pos = mstream->header_size.virtual_size;
+ mstream->istream.pos = mstream->v_header_size;
mstream->istream.iostream.close = _close;
mstream->istream.iostream.destroy = _destroy;
mstream->istream.iostream.set_blocking = _set_blocking;
mstream->istream.read = _read;
- mstream->istream.skip_count = _skip;
mstream->istream.seek = _seek;
- return _i_stream_create(&mstream->istream, pool, -1, 0,
- mstream->header_size.virtual_size + body_size);
+ return _i_stream_create(&mstream->istream, pool, -1,
+ mstream->v_header_size + body_size);
}
struct istream *input)
{
struct mail_index_record *rec;
- struct mbox_header_context ctx;
+ struct mbox_header_context ctx;
+ struct istream *hdr_stream;
enum mail_index_record_flag index_flags;
time_t received_date;
- uoff_t abs_start_offset, eoh_offset;
+ uoff_t hdr_offset, body_offset, end_offset;
const unsigned char *data;
unsigned char md5_digest[16];
size_t size, pos;
if (size == 0)
return -1;
- if (pos == size || size <= 5 ||
- strncmp((const char *) data, "From ", 5) != 0) {
+ if (pos == size || size <= 5 || memcmp(data, "From ", 5) != 0) {
/* a) no \n found, or line too long
b) not a From-line */
index_set_error(index, "Error indexing mbox file %s: "
received_date = ioloop_time;
i_stream_skip(input, pos+1);
- abs_start_offset = input->start_offset + input->v_offset;
+ hdr_offset = input->v_offset;
/* now, find the end of header. also stops at "\nFrom " if it's
found (broken messages) */
mbox_skip_header(input);
- eoh_offset = input->v_offset;
+ body_offset = input->v_offset;
index_flags = 0;
reading the headers. it uses Content-Length if available or finds
the next From-line. */
mbox_header_init_context(&ctx, index, input);
- ctx.set_read_limit = TRUE;
- i_stream_seek(input, abs_start_offset - input->start_offset);
- i_stream_set_read_limit(input, eoh_offset);
-
- message_parse_header(NULL, input, NULL, mbox_header_cb, &ctx);
+ hdr_stream = i_stream_create_limit(default_pool, input,
+ hdr_offset,
+ body_offset - hdr_offset);
+ i_stream_seek(hdr_stream, 0);
+ message_parse_header(NULL, hdr_stream, NULL, mbox_header_cb, &ctx);
+ i_stream_unref(hdr_stream);
+
+ dirty = FALSE;
+
+ /* try Content-Length */
+ end_offset = body_offset + ctx.content_length;
+ if (ctx.content_length == (uoff_t)-1 ||
+ !mbox_verify_end_of_body(input, end_offset)) {
+ /* failed, search for From-line */
+ if (ctx.content_length != (uoff_t)-1) {
+ /* broken, rewrite it */
+ dirty = TRUE;
+ }
- i_stream_seek(input, input->v_limit);
- i_stream_set_read_limit(input, 0);
+ i_stream_seek(input, body_offset);
+ mbox_skip_message(input);
+ ctx.content_length = input->v_offset - body_offset;
+ }
- dirty = ctx.content_length_broken;
if (index->header->messages_count == 0 &&
ctx.uid_validity != index->header->uid_validity) {
/* UID validity is different */
/* location offset = beginning of headers in message */
if (!mail_cache_add(trans_ctx, rec, MAIL_CACHE_LOCATION_OFFSET,
- &abs_start_offset, sizeof(abs_start_offset)))
+ &hdr_offset, sizeof(hdr_offset)))
return -1;
if (!mail_cache_add(trans_ctx, rec, MAIL_CACHE_RECEIVED_DATE,
uoff_t offset;
int ret;
- if (input->eof) {
- /* no new data */
- return TRUE;
- }
-
if (!index->set_lock(index, MAIL_LOCK_EXCLUSIVE))
return FALSE;
do {
offset = input->v_offset;
- if (input->start_offset + input->v_offset != 0) {
+ if (input->v_offset != 0) {
/* we're at the [\r]\n before the From-line,
skip it */
if (!mbox_skip_crlf(input)) {
}
}
- t_push();
- ret = mbox_index_append_next(index, trans_ctx, input);
- t_pop();
-
if (input->eof) {
ret = 1;
break;
}
+ t_push();
+ ret = mbox_index_append_next(index, trans_ctx, input);
+ t_pop();
+
if (ret == 0) {
/* we want to rescan this message with exclusive
locking */
return TRUE;
}
-struct istream *mbox_get_stream(struct mail_index *index, uoff_t offset,
+struct istream *mbox_get_stream(struct mail_index *index,
enum mail_lock_type lock_type)
{
- i_assert(offset < OFF_T_MAX);
-
switch (lock_type) {
case MAIL_LOCK_SHARED:
case MAIL_LOCK_EXCLUSIVE:
}
}
- i_stream_set_read_limit(index->mbox_stream, 0);
- i_stream_set_start_offset(index->mbox_stream, (uoff_t)offset);
i_stream_seek(index->mbox_stream, 0);
-
i_stream_ref(index->mbox_stream);
return index->mbox_stream;
}
struct message_header_line *hdr, void *context)
{
struct mbox_header_context *ctx = context;
- uoff_t start_offset, end_offset;
size_t i;
int fixed = FALSE;
- if (hdr == NULL) {
- /* End of headers */
- if (!ctx->set_read_limit)
- return;
-
- /* a) use Content-Length, b) search for "From "-line */
- start_offset = ctx->input->v_offset;
- i_stream_set_read_limit(ctx->input, 0);
-
- end_offset = start_offset + ctx->content_length;
- if (ctx->content_length == (uoff_t)-1 ||
- !mbox_verify_end_of_body(ctx->input, end_offset)) {
- if (ctx->content_length != (uoff_t)-1) {
- i_stream_seek(ctx->input, start_offset);
- ctx->content_length_broken = TRUE;
- }
- mbox_skip_message(ctx->input);
- end_offset = ctx->input->v_offset;
- ctx->content_length = end_offset - start_offset;
- }
-
- i_stream_seek(ctx->input, start_offset);
- i_stream_set_read_limit(ctx->input, end_offset);
- return;
- }
-
- if (hdr->eoh)
+ if (hdr == NULL || hdr->eoh)
return;
/* Pretty much copy&pasted from popa3d by Solar Designer */
case 'C':
case 'c':
- if (ctx->set_read_limit &&
- strcasecmp(hdr->name, "Content-Length") == 0) {
+ if (strcasecmp(hdr->name, "Content-Length") == 0) {
/* manual parsing, so we can deal with uoff_t */
ctx->content_length = 0;
for (i = 0; i < hdr->value_len; i++) {
if (i_stream_read_data(input, &data, &size, 6) < 0)
return FALSE;
- if (input->eof) {
- /* end of file. a bit unexpected though,
- since \n is missing. */
- return TRUE;
- }
-
/* either there should be the next From-line,
or [\r]\n at end of file */
if (size > 0 && data[0] == '\r') {
return TRUE;
}
-void mbox_hide_headers(struct istream *input, buffer_t *dest,
- struct message_size *hdr_size)
+void mbox_read_headers(struct istream *input, buffer_t *dest)
{
struct message_header_parser_ctx *hdr_ctx;
struct message_header_line *hdr;
- uoff_t virtual_size = 0;
- hdr_ctx = message_parse_header_init(input, hdr_size);
+ hdr_ctx = message_parse_header_init(input, NULL);
while ((hdr = message_parse_header_next(hdr_ctx)) != NULL) {
if (hdr->eoh) {
- if (dest != NULL)
- buffer_append(dest, "\r\n", 2);
- else
- virtual_size += 2;
+ buffer_append(dest, "\r\n", 2);
break;
}
strcasecmp(hdr->name, "Content-Length") == 0 ||
strcasecmp(hdr->name, "Status") == 0) {
/* ignore */
- } else if (dest != NULL) {
+ } else {
if (!hdr->continued) {
buffer_append(dest, hdr->name, hdr->name_len);
buffer_append(dest, ": ", 2);
}
buffer_append(dest, hdr->value, hdr->value_len);
buffer_append(dest, "\r\n", 2);
- } else {
- if (!hdr->continued)
- virtual_size += hdr->name_len + 2;
- virtual_size += hdr->value_len + 2;
}
}
message_parse_header_deinit(hdr_ctx);
-
- if (dest != NULL)
- virtual_size = buffer_get_used_size(dest);
-
- hdr_size->virtual_size = virtual_size;
- hdr_size->lines = 0;
}
struct mail_index *
struct istream *input;
uoff_t content_length;
- int set_read_limit, content_length_broken;
};
int mbox_set_syscall_error(struct mail_index *index, const char *function);
which is useful when you want to be sure you're not accessing a deleted
mbox file. */
int mbox_file_open(struct mail_index *index);
-struct istream *mbox_get_stream(struct mail_index *index, uoff_t offset,
+struct istream *mbox_get_stream(struct mail_index *index,
enum mail_lock_type lock_type);
void mbox_file_close_stream(struct mail_index *index);
void mbox_file_close_fd(struct mail_index *index);
int mbox_mail_get_location(struct mail_index *index,
struct mail_index_record *rec,
uoff_t *offset, uoff_t *body_size);
-void mbox_hide_headers(struct istream *input, buffer_t *dest,
- struct message_size *hdr_size);
+void mbox_read_headers(struct istream *input, buffer_t *dest);
struct mail_index *
mbox_index_alloc(const char *mbox_path, const char *index_dir,
int mbox_index_rewrite(struct mail_index *index);
struct istream *i_stream_create_mbox(pool_t pool, struct istream *input,
- uoff_t body_size);
+ uoff_t offset, uoff_t body_size);
#endif
if (!mbox_mail_get_location(index, rec, &offset, &body_size))
return NULL;
- input = mbox_get_stream(index, offset, MAIL_LOCK_SHARED);
+ input = mbox_get_stream(index, MAIL_LOCK_SHARED);
if (input == NULL)
return NULL;
*received_date = index->get_received_date(index, rec);
i_assert(index->mbox_sync_counter == index->mbox_lock_counter);
-
- return i_stream_create_mbox(default_pool, input, body_size);
+ return i_stream_create_mbox(default_pool, input, offset, body_size);
}
static int mbox_write(struct mail_index *index, struct istream *input,
struct ostream *output, uoff_t end_offset)
{
- uoff_t old_limit;
int failed;
i_assert(input->v_offset <= end_offset);
- old_limit = input->v_limit;
- i_stream_set_read_limit(input, end_offset);
+ input = i_stream_create_limit(default_pool, input, 0, end_offset);
if (o_stream_send_istream(output, input) < 0) {
index_set_error(index, "Error rewriting mbox file %s: %s",
index->mailbox_path,
failed = FALSE;
}
- i_stream_set_read_limit(input, old_limit);
+ i_stream_unref(input);
return !failed;
}
struct message_header_parser_ctx *hdr_ctx;
struct message_header_line *hdr;
struct message_size hdr_size;
+ struct istream *hdr_input;
uoff_t offset;
int force_filler;
ctx.uid_last = index->header->next_uid-1;
ctx.custom_flags = mail_custom_flags_list_get(index->custom_flags);
- if (body_size == 0) {
+ if (body_size != 0) {
+ hdr_input = input;
+ i_stream_ref(hdr_input);
+ } else {
/* possibly broken message, find the next From-line
and make sure header parser won't pass it. */
offset = input->v_offset;
mbox_skip_header(input);
- i_stream_set_read_limit(input, input->v_offset);
- i_stream_seek(input, offset);
- }
+ hdr_input = i_stream_create_limit(default_pool, input, offset,
+ input->v_offset);
+ }
- hdr_ctx = message_parse_header_init(input, &hdr_size);
+ hdr_ctx = message_parse_header_init(hdr_input, &hdr_size);
while ((hdr = message_parse_header_next(hdr_ctx)) != NULL) {
t_push();
write_header(&ctx, hdr);
message_parse_header_deinit(hdr_ctx);
*hdr_input_size = hdr_size.physical_size;
- i_stream_set_read_limit(input, 0);
+ i_stream_unref(hdr_input);
/* append the flag fields */
if (seq == 1 && !ctx.ximapbase_found) {
static int fd_copy(struct mail_index *index, int in_fd, int out_fd,
uoff_t out_offset, uoff_t size)
{
- struct istream *input;
+ struct istream *input, *input2;
struct ostream *output;
struct stat st;
int ret;
t_push();
- input = i_stream_create_mmap(in_fd, pool_datastack_create(),
- 1024*256, 0, 0, FALSE);
- i_stream_set_read_limit(input, size);
+ input = i_stream_create_file(in_fd, pool_datastack_create(),
+ 1024*256, FALSE);
+ input2 = i_stream_create_limit(pool_datastack_create(), input, 0, size);
output = o_stream_create_file(out_fd, pool_datastack_create(),
1024, FALSE);
o_stream_set_blocking(output, 60000, NULL, NULL);
- ret = o_stream_send_istream(output, input);
+ ret = o_stream_send_istream(output, input2);
if (ret < 0) {
errno = output->stream_errno;
mbox_set_syscall_error(index, "o_stream_send_istream()");
}
o_stream_unref(output);
+ i_stream_unref(input2);
i_stream_unref(input);
t_pop();
break;
}
- input = mbox_get_stream(index, 0, MAIL_LOCK_EXCLUSIVE);
+ input = mbox_get_stream(index, MAIL_LOCK_EXCLUSIVE);
if (input == NULL)
break;
break;
}
offset += hdr_size;
+ i_assert(input->v_offset == offset);
if (dirty_found &&
offset - dirty_offset == output->offset) {
unsigned int *seq, struct istream *input,
struct mail_index_record **next_rec, int *dirty)
{
- struct message_size hdr_parsed_size;
struct mbox_header_context ctx;
struct mail_index_record *first_rec, *last_rec;
+ struct istream *hdr_input;
enum mail_index_record_flag index_flags;
- uoff_t header_offset, body_offset, offset;
- uoff_t hdr_size, body_size;
+ uoff_t header_offset, body_offset, offset, body_size, eoh_offset;
unsigned char current_digest[16];
unsigned int first_seq, last_seq;
- int ret, hdr_size_fixed;
+ int ret, hdr_parsed;
*next_rec = NULL;
/* skip the From-line */
skip_line(input);
- if (input->eof)
- return -1;
header_offset = input->v_offset;
first_rec = last_rec = NULL;
first_seq = last_seq = 0;
- ret = 0; hdr_size = 0; body_offset = 0; hdr_size_fixed = FALSE;
+ ret = 0; body_offset = 0; eoh_offset = (uoff_t)-1; hdr_parsed = FALSE;
do {
if (!mbox_mail_get_location(index, rec, &offset, &body_size))
return -1;
- i_stream_seek(input, header_offset);
-
- if (body_size == 0 && !hdr_size_fixed) {
+ if (body_size == 0 && eoh_offset == (uoff_t)-1) {
/* possibly broken message, find the next From-line
and make sure header parser won't pass it. */
- mbox_skip_header(input);
- i_stream_set_read_limit(input, input->v_offset);
i_stream_seek(input, header_offset);
- hdr_size_fixed = TRUE;
- hdr_size = 0;
+ mbox_skip_header(input);
+ eoh_offset = input->v_offset;
+ hdr_parsed = FALSE;
}
- if (hdr_size == 0) {
+ if (!hdr_parsed) {
/* get the MD5 sum of fixed headers and the current
message flags in Status and X-Status fields */
- mbox_header_init_context(&ctx, index, input);
- message_parse_header(NULL, input, &hdr_parsed_size,
+ if (eoh_offset == (uoff_t)-1)
+ hdr_input = input;
+ else {
+ hdr_input = i_stream_create_limit(default_pool,
+ input, 0, eoh_offset);
+ }
+ i_stream_seek(hdr_input, header_offset);
+
+ mbox_header_init_context(&ctx, index, hdr_input);
+ message_parse_header(NULL, hdr_input, NULL,
mbox_header_cb, &ctx);
+
+ hdr_parsed = TRUE;
+ body_offset = hdr_input->v_offset;
+
+ if (eoh_offset != (uoff_t)-1)
+ i_stream_unref(hdr_input);
+ hdr_input = NULL;
md5_final(&ctx.md5, current_digest);
if (*seq == 1) {
ctx.uid_last+1;
}
}
-
- i_stream_set_read_limit(input, 0);
-
- body_offset = input->v_offset;
}
if (verify_header(index, rec, ctx.uid, current_digest) &&
index->header->flags &= ~MAIL_INDEX_HDR_FLAG_DIRTY_MESSAGES;
}
- if (input->eof || (index->set_flags & MAIL_INDEX_HDR_FLAG_REBUILD))
+ if ((index->set_flags & MAIL_INDEX_HDR_FLAG_REBUILD))
return TRUE;
else
return mbox_index_append_stream(index, input);
i_assert(index->lock_type == MAIL_LOCK_EXCLUSIVE);
- input = mbox_get_stream(index, 0, MAIL_LOCK_SHARED);
+ input = mbox_get_stream(index, MAIL_LOCK_SHARED);
if (input == NULL)
return FALSE;
if (!mbox_unlock(index))
return FALSE;
- input = mbox_get_stream(index, 0, MAIL_LOCK_EXCLUSIVE);
+ input = mbox_get_stream(index, MAIL_LOCK_EXCLUSIVE);
if (input == NULL)
return FALSE;
buffer_t *decodebuf;
pool_t pool;
size_t data_size, pos;
- uoff_t old_limit;
ssize_t ret;
int found;
i_stream_skip(input, part->physical_pos +
part->header_size.physical_size - input->v_offset);
- old_limit = input->v_limit;
- i_stream_set_read_limit(input, input->v_offset +
- part->body_size.physical_size);
+ input = i_stream_create_limit(default_pool, input, 0,
+ part->body_size.physical_size);
found = FALSE; pos = 0;
while (i_stream_read_data(input, &data, &data_size, pos) > 0) {
pos -= data_size;
}
- i_stream_set_read_limit(input, old_limit);
+ i_stream_unref(input);
if (ctx->translation != NULL)
charset_to_utf8_end(ctx->translation);
/* mbox must be already opened, synced and locked at this point.
we just want the istream. */
- input = mbox_get_stream(ibox->index, 0, MAIL_LOCK_EXCLUSIVE);
+ input = mbox_get_stream(ibox->index, MAIL_LOCK_EXCLUSIVE);
if (input == NULL)
return NULL;
static int mbox_move_data(struct mbox_expunge_context *ctx)
{
+ struct istream *input;
const unsigned char *data;
size_t size;
- uoff_t old_limit;
int failed;
i_stream_seek(ctx->input, ctx->move_offset);
i_stream_skip(ctx->input, 2);
}
- old_limit = ctx->input->v_limit;
- i_stream_set_read_limit(ctx->input, ctx->from_offset);
- failed = o_stream_send_istream(ctx->output, ctx->input) < 0;
- i_stream_set_read_limit(ctx->input, old_limit);
+ if (ctx->from_offset == 0)
+ failed = o_stream_send_istream(ctx->output, ctx->input) < 0;
+ else {
+ input = i_stream_create_limit(default_pool, ctx->input,
+ 0, ctx->from_offset);
+ failed = o_stream_send_istream(ctx->output, ctx->input) < 0;
+ i_stream_unref(input);
+ }
- if (failed || (ctx->input->v_offset != ctx->from_offset &&
- ctx->from_offset != 0))
- return FALSE;
- return TRUE;
+ return !failed;
}
int mbox_storage_expunge_deinit(struct mail_expunge_context *_ctx)
if (ctx->expunges) {
if (!failed && ctx->move_offset != (uoff_t)-1) {
- ctx->from_offset = ctx->input->v_limit;
+ ctx->from_offset = 0;
if (!mbox_move_data(ctx))
failed = TRUE;
} else if (failed && ctx->output->offset > 0) {
errno = input->stream_errno;
if (errno == 0) {
/* EOF */
- if (input->v_offset != input->v_limit &&
- input->v_limit != 0) {
+ if (input->eof) {
/* too early */
mail_storage_set_error(storage,
"Unexpected EOF");
istream.c \
istream-data.c \
istream-file.c \
+ istream-limit.c \
istream-mmap.c \
ioloop.c \
ioloop-notify-none.c \
void (*timeout_cb)(void *), void *context);
#define GET_TIMEOUT_TIME(fstream) \
- ((fstream)->timeout_msecs == 0 ? 0 : \
+ ((fstream)->timeout_msecs <= 0 ? 0 : \
time(NULL) + ((fstream)->timeout_msecs / 1000))
#define STREAM_IS_BLOCKING(fstream) \
((fstream)->timeout_msecs != 0)
{
}
-static ssize_t _read(struct _istream *stream __attr_unused__)
+static ssize_t _read(struct _istream *stream)
{
+ stream->istream.eof = TRUE;
return -1;
}
stream->istream.v_offset = v_offset;
}
-static void _skip(struct _istream *stream, uoff_t count)
+static uoff_t _get_size(struct _istream *stream)
{
- stream->skip += count;
- stream->istream.v_offset += count;
+ return stream->pos;
}
struct istream *i_stream_create_from_data(pool_t pool, const void *data,
stream->iostream.set_blocking = _set_blocking;
stream->read = _read;
- stream->skip_count = _skip;
stream->seek = _seek;
+ stream->get_size = _get_size;
- return _i_stream_create(stream, pool, -1, 0, size);
+ return _i_stream_create(stream, pool, -1, 0);
}
{
struct file_istream *fstream = (struct file_istream *) stream;
time_t timeout_time;
- uoff_t read_limit;
size_t size;
ssize_t ret;
}
size = stream->buffer_size - stream->pos;
- if (stream->istream.v_limit > 0) {
- i_assert(stream->istream.v_limit >= stream->istream.v_offset);
-
- read_limit = stream->istream.v_limit -
- stream->istream.v_offset + fstream->skip_left;
- if (read_limit <= stream->pos - stream->skip) {
- /* virtual limit reached == EOF */
- stream->istream.eof = TRUE;
- return -1;
- }
-
- read_limit -= stream->pos - stream->skip;
- if (size > read_limit)
- size = read_limit;
- }
-
timeout_time = GET_TIMEOUT_TIME(fstream);
ret = -1;
if (fstream->file) {
ret = pread(stream->fd,
stream->w_buffer + stream->pos, size,
- stream->istream.start_offset +
+ stream->abs_start_offset +
stream->istream.v_offset +
(stream->pos - stream->skip));
} else {
}
if (ret == 0) {
/* EOF */
- stream->istream.stream_errno = 0;
stream->istream.eof = TRUE;
return -1;
}
if (ret < 0) {
if (errno == ECONNRESET || errno == ETIMEDOUT) {
/* treat as disconnection */
- stream->istream.stream_errno = 0;
stream->istream.eof = TRUE;
return -1;
}
if (ret > 0 && fstream->skip_left > 0) {
i_assert(!fstream->file);
+ i_assert(stream->skip == stream->pos);
+
if (fstream->skip_left >= (size_t)ret) {
fstream->skip_left -= ret;
ret = 0;
return ret;
}
-static void _skip(struct _istream *stream, uoff_t count)
-{
- struct file_istream *fstream = (struct file_istream *) stream;
-
- i_assert(stream->skip == stream->pos);
-
- if (!fstream->file)
- fstream->skip_left += count;
- stream->istream.v_offset += count;
- stream->skip = stream->pos = 0;
-}
-
static void _seek(struct _istream *stream, uoff_t v_offset)
{
struct file_istream *fstream = (struct file_istream *) stream;
if (!fstream->file) {
- stream->istream.stream_errno = ESPIPE;
- return;
+ if (v_offset < stream->istream.v_offset) {
+ stream->istream.stream_errno = ESPIPE;
+ return;
+ }
+ fstream->skip_left += v_offset - stream->istream.v_offset;
}
stream->istream.stream_errno = 0;
stream->skip = stream->pos = 0;
}
+static uoff_t _get_size(struct _istream *stream)
+{
+ struct file_istream *fstream = (struct file_istream *) stream;
+ struct stat st;
+
+ if (fstream->file && fstat(stream->fd, &st) == 0 && S_ISREG(st.st_mode))
+ return (uoff_t)st.st_size;
+ else
+ return (uoff_t)-1;
+}
+
struct istream *i_stream_create_file(int fd, pool_t pool,
size_t max_buffer_size, int autoclose_fd)
{
fstream->istream.iostream.set_blocking = _set_blocking;
fstream->istream.read = _read;
- fstream->istream.skip_count = _skip;
fstream->istream.seek = _seek;
+ fstream->istream.get_size = _get_size;
/* get size of fd if it's a file */
if (fstat(fd, &st) == 0 && S_ISREG(st.st_mode))
fstream->file = TRUE;
- return _i_stream_create(&fstream->istream, pool, fd, 0, 0);
+ return _i_stream_create(&fstream->istream, pool, fd, 0);
}
/* methods: */
ssize_t (*read)(struct _istream *stream);
- void (*skip_count)(struct _istream *stream, uoff_t count);
void (*seek)(struct _istream *stream, uoff_t v_offset);
+ uoff_t (*get_size)(struct _istream *stream);
/* data: */
struct istream istream;
const unsigned char *buffer;
unsigned char *w_buffer; /* may be NULL */
size_t buffer_size;
+ uoff_t abs_start_offset;
- size_t skip, pos, high_pos;
+ size_t skip, pos;
};
struct istream *_i_stream_create(struct _istream *_buf, pool_t pool, int fd,
- uoff_t start_offset, uoff_t v_size);
+ uoff_t abs_start_offset);
#endif
--- /dev/null
+/* Copyright (C) 2003 Timo Sirainen */
+
+#include "lib.h"
+#include "istream-internal.h"
+
+struct limit_istream {
+ struct _istream istream;
+
+ struct istream *input;
+ uoff_t v_start_offset, v_size;
+};
+
+static void _close(struct _iostream *stream __attr_unused__)
+{
+}
+
+static void _destroy(struct _iostream *stream)
+{
+ struct limit_istream *lstream = (struct limit_istream *) stream;
+
+ /* get to same position in parent stream */
+ i_stream_seek(lstream->input, lstream->v_start_offset +
+ lstream->istream.istream.v_offset);
+ i_stream_unref(lstream->input);
+}
+
+static void _set_max_buffer_size(struct _iostream *stream, size_t max_size)
+{
+ struct limit_istream *lstream = (struct limit_istream *) stream;
+
+ i_stream_set_max_buffer_size(lstream->input, max_size);
+}
+
+static void _set_blocking(struct _iostream *stream, int timeout_msecs,
+ void (*timeout_cb)(void *), void *context)
+{
+ struct limit_istream *lstream = (struct limit_istream *) stream;
+
+ i_stream_set_blocking(lstream->input, timeout_msecs,
+ timeout_cb, context);
+}
+
+static ssize_t _read(struct _istream *stream)
+{
+ struct limit_istream *lstream = (struct limit_istream *) stream;
+ uoff_t left;
+ ssize_t ret;
+ size_t pos;
+
+ if (stream->istream.v_offset >= lstream->v_size)
+ return -1;
+
+ if (lstream->input->v_offset !=
+ lstream->v_start_offset + stream->istream.v_offset) {
+ i_stream_seek(lstream->input,
+ lstream->v_start_offset + stream->istream.v_offset);
+ }
+
+ if (i_stream_read(lstream->input) == -2 && stream->buffer != NULL) {
+ if (lstream->istream.skip == 0)
+ return -2;
+ stream->istream.eof = lstream->input->eof;
+ }
+
+ stream->pos -= stream->skip;
+ stream->skip = 0;
+ stream->buffer = i_stream_get_data(lstream->input, &pos);
+
+ left = lstream->v_size - stream->istream.v_offset;
+ if (pos > left)
+ pos = left;
+
+ ret = pos <= lstream->istream.pos ? -1 :
+ (ssize_t) (pos - stream->pos);
+ lstream->istream.pos = pos;
+ return ret;
+}
+
+static void _seek(struct _istream *stream, uoff_t v_offset)
+{
+ stream->istream.stream_errno = 0;
+ stream->istream.v_offset = v_offset;
+ stream->skip = stream->pos = 0;
+}
+
+static uoff_t _get_size(struct _istream *stream)
+{
+ struct limit_istream *lstream = (struct limit_istream *) stream;
+
+ return lstream->v_size != (uoff_t)-1 ? lstream->v_size :
+ i_stream_get_size(lstream->input);
+}
+
+struct istream *i_stream_create_limit(pool_t pool, struct istream *input,
+ uoff_t v_start_offset, uoff_t v_size)
+{
+ struct limit_istream *lstream;
+
+ i_stream_ref(input);
+
+ lstream = p_new(pool, struct limit_istream, 1);
+ lstream->input = input;
+ lstream->v_start_offset = v_start_offset;
+ lstream->v_size = v_size;
+
+ lstream->istream.istream.v_offset =
+ input->v_offset < v_start_offset ? 0 :
+ input->v_offset - v_start_offset > v_size ? v_size :
+ input->v_offset - v_start_offset;
+
+ lstream->istream.iostream.close = _close;
+ lstream->istream.iostream.destroy = _destroy;
+ lstream->istream.iostream.set_max_buffer_size = _set_max_buffer_size;
+ lstream->istream.iostream.set_blocking = _set_blocking;
+
+ lstream->istream.read = _read;
+ lstream->istream.seek = _seek;
+ lstream->istream.get_size = _get_size;
+
+ return _i_stream_create(&lstream->istream, pool, i_stream_get_fd(input),
+ input->real_stream->abs_start_offset +
+ v_start_offset);
+}
void *mmap_base;
off_t mmap_offset;
size_t mmap_block_size;
+ uoff_t v_size;
unsigned int autoclose_fd:1;
};
/* we never block */
}
-static ssize_t io_stream_set_mmaped_pos(struct _istream *stream)
-{
- struct mmap_istream *mstream = (struct mmap_istream *) stream;
- uoff_t top;
-
- i_assert((uoff_t)mstream->mmap_offset <=
- stream->istream.start_offset + stream->istream.v_limit);
-
- top = stream->istream.start_offset + stream->istream.v_limit -
- mstream->mmap_offset;
- stream->pos = I_MIN(top, stream->buffer_size);
-
- return stream->pos - stream->skip;
-}
-
static ssize_t _read(struct _istream *stream)
{
struct mmap_istream *mstream = (struct mmap_istream *) stream;
- size_t aligned_skip, limit_size;
+ size_t aligned_skip;
uoff_t top;
- if (stream->istream.start_offset + stream->istream.v_limit <=
- (uoff_t)mstream->mmap_offset + stream->pos) {
- /* end of file */
- stream->istream.stream_errno = 0;
- stream->istream.eof = TRUE;
- return -1;
- }
+ stream->istream.stream_errno = 0;
if (stream->pos < stream->buffer_size) {
/* more bytes available without needing to mmap() */
- return io_stream_set_mmaped_pos(stream);
+ stream->pos = stream->buffer_size;
+ return stream->pos - stream->skip;
+ }
+
+ if (stream->istream.v_offset >= mstream->v_size) {
+ stream->istream.eof = TRUE;
+ return -1;
}
aligned_skip = stream->skip & ~mmap_pagemask;
i_error("io_stream_read_mmaped(): munmap() failed: %m");
}
- top = stream->istream.start_offset + stream->istream.v_size -
- mstream->mmap_offset;
+ top = stream->abs_start_offset + mstream->v_size - mstream->mmap_offset;
stream->buffer_size = I_MIN(top, mstream->mmap_block_size);
i_assert((uoff_t)mstream->mmap_offset + stream->buffer_size <=
- stream->istream.start_offset + stream->istream.v_size);
+ stream->abs_start_offset + mstream->v_size);
mstream->mmap_base = mmap(NULL, stream->buffer_size,
PROT_READ, MAP_PRIVATE,
}
stream->buffer = mstream->mmap_base;
- /* madvise() only if non-limited mmap()ed buffer area larger than
- page size */
- limit_size = stream->istream.start_offset + stream->istream.v_limit -
- mstream->mmap_offset;
- if (limit_size > mmap_pagesize) {
- if (limit_size > stream->buffer_size)
- limit_size = stream->buffer_size;
-
- if (madvise(mstream->mmap_base, limit_size,
+ if (stream->buffer_size > mmap_pagesize) {
+ if (madvise(mstream->mmap_base, stream->buffer_size,
MADV_SEQUENTIAL) < 0)
i_error("mmap_istream.madvise(): %m");
}
- return io_stream_set_mmaped_pos(stream);
+ stream->pos = stream->buffer_size;
+ return stream->pos - stream->skip;
}
static void _seek(struct _istream *stream, uoff_t v_offset)
struct mmap_istream *mstream = (struct mmap_istream *) stream;
uoff_t abs_offset;
- abs_offset = stream->istream.start_offset + v_offset;
+ abs_offset = stream->abs_start_offset + v_offset;
if (stream->buffer_size != 0 &&
(uoff_t)mstream->mmap_offset <= abs_offset &&
(uoff_t)mstream->mmap_offset + stream->buffer_size > abs_offset) {
stream->istream.v_offset = v_offset;
}
-static void _skip(struct _istream *stream, uoff_t count)
+static uoff_t _get_size(struct _istream *stream)
{
- _seek(stream, stream->istream.v_offset + count);
+ struct mmap_istream *mstream = (struct mmap_istream *) stream;
+
+ return mstream->v_size;
}
struct istream *i_stream_create_mmap(int fd, pool_t pool, size_t block_size,
}
if (v_size == 0) {
- if (fstat(fd, &st) < 0) {
+ if (fstat(fd, &st) < 0)
i_error("i_stream_create_mmap(): fstat() failed: %m");
- v_size = 0;
- } else {
+ else {
v_size = st.st_size;
if (start_offset > v_size)
start_offset = v_size;
mstream->fd = fd;
_set_max_buffer_size(&mstream->istream.iostream, block_size);
mstream->autoclose_fd = autoclose_fd;
+ mstream->v_size = v_size;
mstream->istream.iostream.close = _close;
mstream->istream.iostream.destroy = _destroy;
mstream->istream.iostream.set_blocking = _set_blocking;
mstream->istream.read = _read;
- mstream->istream.skip_count = _skip;
mstream->istream.seek = _seek;
+ mstream->istream.get_size = _get_size;
- istream = _i_stream_create(&mstream->istream, pool, fd,
- start_offset, v_size);
+ istream = _i_stream_create(&mstream->istream, pool, fd, start_offset);
istream->mmaped = TRUE;
return istream;
}
timeout_cb, context);
}
-void i_stream_set_start_offset(struct istream *stream, uoff_t offset)
-{
- struct _istream *_stream = stream->real_stream;
- off_t diff;
-
- i_assert(stream->v_size == 0 ||
- offset <= stream->start_offset + stream->v_size);
-
- if (offset == stream->start_offset)
- return;
-
- diff = (off_t)stream->start_offset - (off_t)offset;
- stream->start_offset = offset;
- stream->v_offset += diff;
- if (stream->v_size != 0)
- stream->v_size += diff;
- if (stream->v_limit != 0)
- stream->v_limit += diff;
-
- /* reset buffer data */
- _stream->skip = _stream->pos = _stream->high_pos = 0;
-}
-
-void i_stream_set_read_limit(struct istream *stream, uoff_t v_offset)
-{
- struct _istream *_stream = stream->real_stream;
- uoff_t max_pos;
-
- i_assert(stream->v_size == 0 || v_offset <= stream->v_size);
-
- stream->eof = FALSE;
- if (_stream->high_pos != 0) {
- _stream->pos = _stream->high_pos;
- _stream->high_pos = 0;
- }
-
- if (v_offset == 0)
- stream->v_limit = stream->v_size;
- else {
- i_assert(v_offset >= stream->v_offset);
-
- stream->v_limit = v_offset;
- max_pos = v_offset - stream->v_offset + _stream->skip;
- if (_stream->pos > max_pos) {
- _stream->high_pos = _stream->pos;
- _stream->pos = max_pos;
- }
- }
-}
-
ssize_t i_stream_read(struct istream *stream)
{
struct _istream *_stream = stream->real_stream;
if (stream->closed)
return -1;
- if (_stream->pos < _stream->high_pos) {
- /* virtual limit reached */
- return -1;
- }
-
stream->eof = FALSE;
return _stream->read(_stream);
}
struct _istream *_stream = stream->real_stream;
size_t data_size;
- i_assert(stream->v_size == 0 ||
- stream->v_offset + count <= stream->v_size);
-
data_size = _stream->pos - _stream->skip;
if (count <= data_size) {
+ /* within buffer */
stream->v_offset += count;
_stream->skip += count;
return;
}
- if (stream->closed)
- return;
-
+ /* have to seek forward */
count -= data_size;
_stream->skip = _stream->pos;
stream->v_offset += data_size;
- if (_stream->pos < _stream->high_pos) {
- /* virtual limit reached */
- } else {
- _stream->skip_count(_stream, count);
- }
+ if (stream->closed)
+ return;
+
+ _stream->seek(_stream, stream->v_offset + count);
}
void i_stream_seek(struct istream *stream, uoff_t v_offset)
{
struct _istream *_stream = stream->real_stream;
- i_assert(stream->v_size == 0 || v_offset <= stream->v_size);
-
if (stream->closed)
return;
- stream->eof = FALSE;
- _stream->high_pos = 0;
+ if (v_offset < stream->v_offset)
+ stream->eof = FALSE;
_stream->seek(_stream, v_offset);
}
+uoff_t i_stream_get_size(struct istream *stream)
+{
+ struct _istream *_stream = stream->real_stream;
+
+ return _stream->get_size(_stream);
+}
+
char *i_stream_next_line(struct istream *stream)
{
struct _istream *_stream = stream->real_stream;
}
struct istream *_i_stream_create(struct _istream *_stream, pool_t pool, int fd,
- uoff_t start_offset, uoff_t v_size)
+ uoff_t abs_start_offset)
{
_stream->fd = fd;
- _stream->istream.start_offset = start_offset;
- _stream->istream.v_size = v_size;
- _stream->istream.v_limit = v_size;
+ _stream->abs_start_offset = abs_start_offset;
_stream->istream.real_stream = _stream;
_io_stream_init(pool, &_stream->iostream);
return &_stream->istream;
}
+
+#ifdef STREAM_TEST
+/* gcc istream.c -o teststream liblib.a -Wall -DHAVE_CONFIG_H -DSTREAM_TEST -g */
+
+#include <fcntl.h>
+#include <unistd.h>
+#include "ostream.h"
+
+#define BUF_VALUE(offset) \
+ (((offset) % 256) ^ ((offset) / 256))
+
+static void check_buffer(const unsigned char *data, size_t size, size_t offset)
+{
+ size_t i;
+
+ for (i = 0; i < size; i++)
+ i_assert(data[i] == BUF_VALUE(i+offset));
+}
+
+int main(void)
+{
+ struct istream *input, *l_input;
+ struct ostream *output1, *output2;
+ int i, fd1, fd2;
+ unsigned char buf[1024];
+ const unsigned char *data;
+ size_t size;
+
+ lib_init();
+
+ fd1 = open("teststream.1", O_RDWR | O_CREAT | O_TRUNC, 0600);
+ if (fd1 < 0)
+ i_fatal("open() failed: %m");
+ fd2 = open("teststream.2", O_RDWR | O_CREAT | O_TRUNC, 0600);
+ if (fd2 < 0)
+ i_fatal("open() failed: %m");
+
+ /* write initial data */
+ for (i = 0; i < sizeof(buf); i++)
+ buf[i] = BUF_VALUE(i);
+ write(fd1, buf, sizeof(buf));
+
+ /* test reading */
+ input = i_stream_create_file(fd1, default_pool, 512, FALSE);
+ i_assert(i_stream_get_size(input) == sizeof(buf));
+
+ i_assert(i_stream_read_data(input, &data, &size, 0) > 0);
+ i_assert(size == 512);
+ check_buffer(data, size, 0);
+
+ i_stream_seek(input, 256);
+ i_assert(i_stream_read_data(input, &data, &size, 0) > 0);
+ i_assert(size == 512);
+ check_buffer(data, size, 256);
+
+ i_stream_seek(input, 0);
+ i_assert(i_stream_read_data(input, &data, &size, 512) == -2);
+ i_assert(size == 512);
+ check_buffer(data, size, 0);
+
+ i_stream_skip(input, 900);
+ i_assert(i_stream_read_data(input, &data, &size, 0) > 0);
+ i_assert(size == sizeof(buf) - 900);
+ check_buffer(data, size, 900);
+
+ /* test moving data */
+ output1 = o_stream_create_file(fd1, default_pool, 512, FALSE);
+ output2 = o_stream_create_file(fd2, default_pool, 512, FALSE);
+
+ i_stream_seek(input, 1); size = sizeof(buf)-1;
+ i_assert(o_stream_send_istream(output2, input) == size);
+ o_stream_flush(output2);
+
+ lseek(fd2, 0, SEEK_SET);
+ i_assert(read(fd2, buf, sizeof(buf)) == size);
+ check_buffer(buf, size, 1);
+
+ i_stream_seek(input, 0);
+ o_stream_seek(output1, sizeof(buf));
+ i_assert(o_stream_send_istream(output1, input) == sizeof(buf));
+
+ /* test moving with limits */
+ l_input = i_stream_create_limit(default_pool, input,
+ sizeof(buf)/2, 512);
+ i_stream_seek(l_input, 0);
+ o_stream_seek(output1, 10);
+ i_assert(o_stream_send_istream(output1, l_input) == 512);
+
+ i_stream_set_max_buffer_size(input, sizeof(buf));
+
+ i_stream_seek(input, 0);
+ i_assert(i_stream_read_data(input, &data, &size, sizeof(buf)-1) > 0);
+ i_assert(size == sizeof(buf));
+ check_buffer(data, 10, 0);
+ check_buffer(data + 10, 512, sizeof(buf)/2);
+ check_buffer(data + 10 + 512,
+ size - (10 + 512), 10 + 512);
+
+ /* reading within limits */
+ i_stream_seek(l_input, 0);
+ i_assert(i_stream_read_data(l_input, &data, &size, 511) > 0);
+ i_assert(size == 512);
+ i_assert(i_stream_read_data(l_input, &data, &size, 512) == -2);
+ i_assert(size == 512);
+ i_stream_skip(l_input, 511);
+ i_assert(i_stream_read_data(l_input, &data, &size, 0) > 0);
+ i_assert(size == 1);
+ i_stream_skip(l_input, 1);
+ i_assert(i_stream_read_data(l_input, &data, &size, 0) == -1);
+ i_assert(size == 0);
+
+ unlink("teststream.1");
+ unlink("teststream.2");
+ return 0;
+}
+#endif
#define __ISTREAM_H
struct istream {
- uoff_t start_offset;
- uoff_t v_offset, v_size, v_limit; /* relative to start_offset */
+ uoff_t v_offset;
int stream_errno;
unsigned int mmaped:1; /* be careful when copying data */
int autoclose_fd);
struct istream *i_stream_create_from_data(pool_t pool, const void *data,
size_t size);
+struct istream *i_stream_create_limit(pool_t pool, struct istream *input,
+ uoff_t v_start_offset, uoff_t v_size);
/* Reference counting. References start from 1, so calling i_stream_unref()
destroys the stream if i_stream_ref() is never used. */
/* Change the maximum size for stream's input buffer to grow. Useful only
for buffered streams (currently only file). */
void i_stream_set_max_buffer_size(struct istream *stream, size_t max_size);
-/* Change the start_offset and drop all data in buffers. Doesn't do anything
- if offset is the same as existing start_offset. */
-void i_stream_set_start_offset(struct istream *stream, uoff_t offset);
-/* Stream won't be read past specified offset. Giving 0 as offset
- removes the limit. */
-void i_stream_set_read_limit(struct istream *stream, uoff_t v_offset);
/* Makes reads blocking until at least one byte is read. timeout_cb is
called if nothing is read in specified time. Setting timeout_msecs to 0
makes it non-blocking. This call changes non-blocking state of file
/* Seek to specified position from beginning of file. Never fails, the next
read tells if it was successful. This works only for files. */
void i_stream_seek(struct istream *stream, uoff_t v_offset);
+/* Returns size of the stream, or (uoff_t)-1 if unknown */
+uoff_t i_stream_get_size(struct istream *stream);
/* Gets the next line from stream and returns it, or NULL if more data is
needed to make a full line. NOTE: modifies the data in buffer for the \0,
so it works only with buffered streams (currently only file). */
}
static off_t io_stream_sendfile(struct _ostream *outstream,
- struct istream *instream, int in_fd)
+ struct istream *instream,
+ int in_fd, uoff_t in_size)
{
struct file_ostream *foutstream = (struct file_ostream *) outstream;
time_t timeout_time;
break;
}
- offset = instream->start_offset + v_offset;
- send_size = instream->v_limit - v_offset;
+ offset = instream->real_stream->abs_start_offset + v_offset;
+ send_size = in_size - v_offset;
ret = safe_sendfile(foutstream->fd, in_fd, &offset,
MAX_SSIZE_T(send_size));
- if (ret < 0) {
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* EOF */
+ break;
+ }
+
if (errno != EINTR && errno != EAGAIN) {
outstream->ostream.stream_errno = errno;
if (errno != EINVAL) {
}
static off_t io_stream_copy(struct _ostream *outstream,
- struct istream *instream, int overlapping)
+ struct istream *instream, uoff_t in_size)
{
struct file_ostream *foutstream = (struct file_ostream *) outstream;
time_t timeout_time;
struct iovec iov[3];
int iov_len;
const unsigned char *data;
- size_t size, skip_size;
+ size_t size, skip_size, block_size;
ssize_t ret;
int pos;
for (pos = 0; pos < iov_len; pos++)
skip_size += iov[pos].iov_len;
- i_assert(!overlapping || iov_len == 0);
-
start_offset = instream->v_offset;
- for (;;) {
- if (overlapping)
- i_stream_seek(instream, instream->v_offset);
- (void)i_stream_read_data(instream, &data, &size,
- foutstream->optimal_block_size-1);
+ while (in_size > 0) {
+ block_size = I_MIN(foutstream->optimal_block_size, in_size);
+ (void)i_stream_read_data(instream, &data, &size, block_size-1);
+ in_size -= size;
if (size == 0) {
/* all sent */
iov[pos].iov_base = (void *) data;
iov[pos].iov_len = size;
- if (overlapping) {
- if (o_stream_seek(&outstream->ostream,
- outstream->ostream.offset) < 0)
- return -1;
- }
-
ret = o_stream_writev(foutstream, iov, iov_len);
if (ret < 0) {
/* error */
}
static off_t io_stream_copy_backwards(struct _ostream *outstream,
- struct istream *instream)
+ struct istream *instream, uoff_t in_size)
{
struct file_ostream *foutstream = (struct file_ostream *) outstream;
time_t timeout_time;
- uoff_t in_start_offset, in_offset, out_offset;
+ uoff_t in_start_offset, in_offset, in_limit, out_offset;
const unsigned char *data;
size_t buffer_size, size, read_size;
ssize_t ret;
}
in_start_offset = instream->v_offset;
- in_offset = instream->v_limit;
- out_offset = outstream->ostream.offset +
- (instream->v_limit - instream->v_offset);
-
- i_assert(instream->v_size == 0 ||
- out_offset <= instream->start_offset + instream->v_size);
+ in_offset = in_limit = in_size;
+ out_offset = outstream->ostream.offset + (in_offset - in_start_offset);
while (in_offset > in_start_offset) {
if (in_offset - in_start_offset <= buffer_size)
out_offset -= read_size;
for (;;) {
- i_assert(in_offset <= instream->v_limit);
+ i_assert(in_offset <= in_limit);
i_stream_seek(instream, in_offset);
- read_size = instream->v_limit - in_offset;
+ read_size = in_limit - in_offset;
(void)i_stream_read_data(instream, &data, &size,
read_size-1);
buffer_size -= read_size;
}
}
+ in_limit -= size;
if (o_stream_seek(&outstream->ostream, out_offset) < 0)
return -1;
outstream->ostream.stream_errno = EAGAIN;
return -1;
}
-
- i_stream_set_read_limit(instream, in_offset);
}
- return (off_t) (instream->v_limit - in_start_offset);
+ return (off_t) (in_size - in_start_offset);
}
-static off_t send_istream_fd(struct _ostream *outstream,
- struct istream *instream, int in_fd)
+static off_t _send_istream(struct _ostream *outstream, struct istream *instream)
{
struct file_ostream *foutstream = (struct file_ostream *) outstream;
- uoff_t old_limit;
+ uoff_t in_size;
off_t ret;
- int overlapping;
+ int in_fd, overlapping;
- i_assert(instream->v_limit <= OFF_T_MAX);
- i_assert(instream->v_offset <= instream->v_limit);
+ in_fd = i_stream_get_fd(instream);
+ in_size = i_stream_get_size(instream);
+ i_assert(instream->v_offset <= in_size);
outstream->ostream.stream_errno = 0;
-
- if (instream->v_offset == instream->v_limit)
- return 0;
-
if (in_fd != foutstream->fd)
overlapping = 0;
else {
/* copying data within same fd. we'll have to be careful with
seeks and overlapping writes. */
+ if (in_size == (uoff_t)-1) {
+ outstream->ostream.stream_errno = EINVAL;
+ return -1;
+ }
+
ret = (off_t)outstream->ostream.offset -
- (off_t)(instream->start_offset + instream->v_offset);
+ (off_t)(instream->real_stream->abs_start_offset +
+ instream->v_offset);
if (ret == 0) {
/* copying data over itself. we don't really
need to do that, just fake it. */
- return instream->v_limit - instream->v_offset;
+ return in_size - instream->v_offset;
}
overlapping = ret < 0 ? -1 : 1;
-
- if (o_stream_seek(&outstream->ostream,
- outstream->ostream.offset) < 0)
- return -1;
}
if (!foutstream->no_sendfile && in_fd != -1 && overlapping <= 0) {
- ret = io_stream_sendfile(outstream, instream, in_fd);
+ ret = io_stream_sendfile(outstream, instream, in_fd, in_size);
if (ret >= 0 || outstream->ostream.stream_errno != EINVAL)
return ret;
}
if (overlapping <= 0)
- return io_stream_copy(outstream, instream, overlapping);
- else {
- old_limit = instream->v_limit;
- ret = io_stream_copy_backwards(outstream, instream);
- i_stream_set_read_limit(instream, old_limit);
- return ret;
- }
-}
-
-static off_t _send_istream(struct _ostream *outstream, struct istream *instream)
-{
- struct stat st;
- int in_fd, ret;
-
- in_fd = i_stream_get_fd(instream);
- if (fstat(in_fd, &st) < 0) {
- outstream->ostream.stream_errno = errno;
- return -1;
- }
-
- if (instream->v_limit != 0)
- return send_istream_fd(outstream, instream, in_fd);
- else {
- /* easier this way so we know exactly how much data we're
- moving */
- i_stream_set_read_limit(instream, st.st_size);
- ret = send_istream_fd(outstream, instream, in_fd);
- i_stream_set_read_limit(instream, 0);
- return ret;
- }
+ return io_stream_copy(outstream, instream, in_size);
+ else
+ return io_stream_copy_backwards(outstream, instream, in_size);
}
struct ostream *
fstream->no_socket_cork = TRUE;
fstream->file = TRUE;
- o_stream_set_blocking(ostream, 60000, 0, NULL);
+ o_stream_set_blocking(ostream, -1, 0, NULL);
}
}
#ifndef HAVE_LINUX_SENDFILE