From 25ba41991d1e775b43c4555cb0bdbb62d30b6461 Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Fri, 29 Dec 2017 20:06:25 +0100 Subject: [PATCH] cclient: remove writer thread, use poll --- src/descrambler/cccam.c | 53 ++++++++-------- src/descrambler/cclient.c | 125 ++++++++++++++++---------------------- src/descrambler/cclient.h | 8 +-- src/descrambler/cwc.c | 58 ++++++++++++++---- src/tvheadend.h | 2 + src/wrappers.c | 19 ++++++ 6 files changed, 148 insertions(+), 117 deletions(-) diff --git a/src/descrambler/cccam.c b/src/descrambler/cccam.c index 1a87bdd9f..e17c287f3 100644 --- a/src/descrambler/cccam.c +++ b/src/descrambler/cccam.c @@ -363,34 +363,23 @@ cccam_running_reply(cccam_t *cccam, uint8_t *buf, int len) * */ static int -cccam_read_message(cccam_t *cccam, const char *state, uint8_t *buf, int len, int timeout) +cccam_read_message0(cccam_t *cccam, const char *state, sbuf_t *rbuf, int timeout) { - int32_t ret; uint16_t msglen; + uint8_t hdr[4]; + struct cccam_crypt_block block; - pthread_mutex_unlock(&cccam->cc_mutex); - ret = tcp_read_timeout(cccam->cc_fd, buf, 4, timeout); - pthread_mutex_lock(&cccam->cc_mutex); - if (ret) { - tvhdebug(cccam->cc_subsys, "%s: recv error %d or timeout", - cccam->cc_name, ret); - return -1; - } - cccam_decrypt(&cccam->recvblock, buf, 4); - msglen = (buf[2] << 8) | buf[3]; - if (msglen > 0) { - if (msglen > len - 2) { - tvhdebug(cccam->cc_subsys, "%s: received message too large", cccam->cc_name); - return -1; - } - pthread_mutex_unlock(&cccam->cc_mutex); - ret = tcp_read_timeout(cccam->cc_fd, buf + 4, msglen, 5000); - pthread_mutex_lock(&cccam->cc_mutex); - if (ret) { - tvhdebug(cccam->cc_subsys, "%s: timeout reading message", cccam->cc_name); - return -1; - } - cccam_decrypt(&cccam->recvblock, buf + 4, msglen); + if (rbuf->sb_ptr < 4) + return 0; + block = cccam->recvblock; + memcpy(hdr, rbuf->sb_data, 4); + cccam_decrypt(&cccam->recvblock, hdr, 4); + msglen = (hdr[2] << 8) | hdr[3]; + if (rbuf->sb_ptr >= msglen + 4) { + memcpy(rbuf->sb_data, hdr, 4); + cccam_decrypt(&cccam->recvblock, rbuf->sb_data + 4, msglen); + } else { + cccam->recvblock = block; } return msglen + 4; } @@ -674,15 +663,21 @@ cccam_send_emm(void *cc, cc_service_t *ct, cc_card_data_t *pcard, * */ static int -cccam_read(void *cc) +cccam_read(void *cc, sbuf_t *rbuf) { cccam_t *cccam = cc; - uint8_t buf[CCCAM_NETMSGSIZE]; const int ka_interval = cccam->cc_keepalive_interval * 2 * 1000; - int r = cccam_read_message(cccam, "Decoderloop", buf, sizeof(buf), ka_interval); + int r = cccam_read_message0(cccam, "Decoderloop", rbuf, ka_interval); if (r < 0) return -1; - return cccam_running_reply(cccam, buf, r); + if (r > 0) { + int ret = cccam_running_reply(cccam, rbuf->sb_data, r); + if (ret > 0) + sbuf_cut(rbuf, r); + if (ret < 0) + return -1; + } + return 0; } /** diff --git a/src/descrambler/cclient.c b/src/descrambler/cclient.c index 21822aeac..3ce2c2bdd 100644 --- a/src/descrambler/cclient.c +++ b/src/descrambler/cclient.c @@ -17,11 +17,13 @@ * along with this program. If not, see . */ +#include #include #include #include "tvheadend.h" #include "tcp.h" #include "cclient.h" +#include "tvhpoll.h" static void cc_service_pid_free(cc_service_t *ct); @@ -331,7 +333,7 @@ cc_read(cclient_t *cc, void *buf, size_t len, int timeout) tvhwarn(cc->cc_subsys, "%s: read error %d (%s)", cc->cc_name, r, strerror(r)); - if(cc_must_break(cc)) + if (cc_must_break(cc)) return ECONNABORTED; return r; @@ -348,10 +350,14 @@ cc_write_message(cclient_t *cc, cc_message_t *msg, int enq) tvhlog_hexdump(cc->cc_subsys, msg->cm_data, msg->cm_len); if (enq) { - pthread_mutex_lock(&cc->cc_writer_mutex); - TAILQ_INSERT_TAIL(&cc->cc_writeq, msg, cm_link); - tvh_cond_signal(&cc->cc_writer_cond, 0); - pthread_mutex_unlock(&cc->cc_writer_mutex); + pthread_mutex_lock(&cc->cc_mutex); + if (cc->cc_write_running) { + TAILQ_INSERT_TAIL(&cc->cc_writeq, msg, cm_link); + tvh_nonblock_write(cc->cc_pipe.wr, "w", 1); + } else { + free(msg); + } + pthread_mutex_unlock(&cc->cc_mutex); } else { if (tvh_write(cc->cc_fd, msg->cm_data, msg->cm_len)) tvhinfo(cc->cc_subsys, "%s: write error %s", @@ -363,60 +369,18 @@ cc_write_message(cclient_t *cc, cc_message_t *msg, int enq) /** * */ -static void * -cc_writer_thread(void *aux) +static void +cc_session(cclient_t *cc) { - cclient_t *cc = aux; + tvhpoll_t *poll; + tvhpoll_event_t ev; + char buf[16]; + sbuf_t rbuf; cc_message_t *cm; + ssize_t len; int64_t mono; int r; - pthread_mutex_lock(&cc->cc_writer_mutex); - - while(cc->cc_writer_running) { - - if((cm = TAILQ_FIRST(&cc->cc_writeq)) != NULL) { - TAILQ_REMOVE(&cc->cc_writeq, cm, cm_link); - pthread_mutex_unlock(&cc->cc_writer_mutex); - // int64_t ts = getfastmonoclock(); - if (tvh_write(cc->cc_fd, cm->cm_data, cm->cm_len)) - tvhinfo(cc->cc_subsys, "%s: write error %s", - cc->cc_name, strerror(errno)); - // printf("Write took %lld usec\n", getfastmonoclock() - ts); - free(cm); - pthread_mutex_lock(&cc->cc_writer_mutex); - continue; - } - - - /* If nothing is to be sent in keepalive interval seconds we - need to send a keepalive */ - mono = mclk() + sec2mono(cc->cc_keepalive_interval); - do { - r = tvh_cond_timedwait(&cc->cc_writer_cond, &cc->cc_writer_mutex, mono); - if(r == ETIMEDOUT) { - if (cc->cc_keepalive) - cc->cc_keepalive(cc); - break; - } - } while (ERRNO_AGAIN(r)); - } - - pthread_mutex_unlock(&cc->cc_writer_mutex); - return NULL; -} - - - -/** - * - */ -static void -cc_session(cclient_t *cc) -{ - char buf[32]; - pthread_t writer_thread_id; - if (cc->cc_init_session(cc)) return; @@ -429,30 +393,46 @@ cc_session(cclient_t *cc) /** * We do all requests from now on in a separate thread */ - cc->cc_writer_running = 1; - tvh_cond_init(&cc->cc_writer_cond); - pthread_mutex_init(&cc->cc_writer_mutex, NULL); TAILQ_INIT(&cc->cc_writeq); - snprintf(buf, sizeof(buf), "%s-writer", cc->cc_id); - tvhthread_create(&writer_thread_id, NULL, cc_writer_thread, cc, buf); + cc->cc_write_running = 1; + sbuf_init(&rbuf); /** * Mainloop */ - while(!cc_must_break(cc)) { - if (cc->cc_read(cc)) - break; + poll = tvhpoll_create(1); + tvhpoll_add1(poll, cc->cc_pipe.rd, TVHPOLL_IN, &cc->cc_pipe); + tvhpoll_add1(poll, cc->cc_fd, TVHPOLL_IN, &cc->cc_fd); + mono = mclk() + sec2mono(cc->cc_keepalive_interval); + while (!cc_must_break(cc)) { + pthread_mutex_unlock(&cc->cc_mutex); + r = tvhpoll_wait(poll, &ev, 1, 1000); + pthread_mutex_lock(&cc->cc_mutex); + if (r < 0 && ERRNO_AGAIN(errno)) + continue; + if (ev.data.ptr == &cc->cc_pipe) + read(cc->cc_pipe.rd, buf, sizeof(buf)); + else if (ev.data.ptr == &cc->cc_fd) { + len = sbuf_read(&rbuf, cc->cc_fd); + if (len > 0 && cc->cc_read(cc, &rbuf)) + break; + } else { + abort(); + } + if ((cm = TAILQ_FIRST(&cc->cc_writeq)) != NULL) { + if (tvh_nonblock_write(cc->cc_fd, cm->cm_data, cm->cm_len)) + break; + } + if (mono < mclk()) { + mono = mclk(); + if (cc->cc_keepalive) + cc->cc_keepalive(cc); + } } tvhdebug(cc->cc_subsys, "%s: session exiting", cc->cc_name); - - /** - * Collect the writer thread - */ + tvhpoll_destroy(poll); + sbuf_free(&rbuf); shutdown(cc->cc_fd, SHUT_RDWR); - cc->cc_writer_running = 0; - tvh_cond_signal(&cc->cc_writer_cond, 0); - pthread_join(writer_thread_id, NULL); - tvhdebug(cc->cc_subsys, "%s: Write thread joined", cc->cc_name); } /** @@ -622,7 +602,7 @@ cc_emm(void *opaque, int pid, const uint8_t *data, int len, int emm) cc = pcard->cs_client; pthread_mutex_lock(&cc->cc_mutex); mux = pcard->cs_mux; - if (pcard->cs_running && cc->cc_forward_emm && cc->cc_writer_running) { + if (pcard->cs_running && cc->cc_forward_emm && cc->cc_write_running) { if (cc->cc_emmex) { if (cc->cc_emm_mux && cc->cc_emm_mux != mux) { if (cc->cc_emm_update_time + sec2mono(25) > mclk()) @@ -1041,6 +1021,7 @@ cc_conf_changed(caclient_t *cac) } if (!cc->cc_running) { cc->cc_running = 1; + tvh_pipe(O_NONBLOCK, &cc->cc_pipe); tvhthread_create(&cc->cc_tid, NULL, cc_thread, cc, "cc"); return; } @@ -1060,8 +1041,10 @@ cc_conf_changed(caclient_t *cac) if (cc->cc_fd >= 0) shutdown(cc->cc_fd, SHUT_RDWR); pthread_mutex_unlock(&cc->cc_mutex); + tvh_write(cc->cc_pipe.wr, "q", 1); pthread_kill(tid, SIGHUP); pthread_join(tid, NULL); + tvh_pipe_close(&cc->cc_pipe); caclient_set_status(cac, CACLIENT_STATUS_NONE); pthread_mutex_lock(&cc->cc_mutex); free(cc->cc_name); diff --git a/src/descrambler/cclient.h b/src/descrambler/cclient.h index 89f237248..b7144c86a 100644 --- a/src/descrambler/cclient.h +++ b/src/descrambler/cclient.h @@ -17,6 +17,7 @@ * along with this program. If not, see . */ +#include "tvheadend.h" #include "caclient.h" #include "descrambler.h" #include "emm_reass.h" @@ -142,7 +143,7 @@ typedef struct cclient { /* Callbacks */ void *(*cc_alloc_service)(void *cc); int (*cc_init_session)(void *cc); - int (*cc_read)(void *cc); + int (*cc_read)(void *cc, sbuf_t *sbuf); void (*cc_keepalive)(void *cc); uint32_t (*cc_send_ecm)(void *cc, cc_service_t *ct, cc_ecm_section_t *es, cc_card_data_t *pcard, const uint8_t *data, int len); @@ -158,6 +159,7 @@ typedef struct cclient { pthread_t cc_tid; tvh_cond_t cc_cond; pthread_mutex_t cc_mutex; + th_pipe_t cc_pipe; /* Database */ LIST_HEAD(, cc_service) cc_services; @@ -166,9 +168,7 @@ typedef struct cclient { /* Writer */ int cc_seq; TAILQ_HEAD(, cc_message) cc_writeq; - pthread_mutex_t cc_writer_mutex; - tvh_cond_t cc_writer_cond; - uint8_t cc_writer_running; + uint8_t cc_write_running; /* Emm forwarding */ int cc_forward_emm; diff --git a/src/descrambler/cwc.c b/src/descrambler/cwc.c index b1d3dbbd4..7ffecba3c 100644 --- a/src/descrambler/cwc.c +++ b/src/descrambler/cwc.c @@ -493,6 +493,31 @@ cwc_running_reply(cwc_t *cwc, uint8_t msgtype, uint8_t *msg, int len) return 0; } +/** + * + */ +static int +cwc_read_message0 + (cwc_t *cwc, const char *state, sbuf_t *rbuf, int timeout) +{ + int msglen; + + if (rbuf->sb_ptr < 2) + return 0; + + msglen = (rbuf->sb_data[0] << 8) | rbuf->sb_data[1]; + if(rbuf->sb_ptr < 2 + msglen) + return 0; + + if((msglen = des_decrypt(rbuf->sb_data, msglen + 2, cwc)) < 15) { + tvhinfo(cwc->cc_subsys, "%s: %s: Decrypt failed", + cwc->cc_name, state); + return -1; + } + + return msglen; +} + /** * */ @@ -502,7 +527,7 @@ cwc_read_message { int msglen, r; - if((r = cc_read((cclient_t *)cwc, buf, 2, timeout))) { + if ((r = cc_read((cclient_t *)cwc, buf, 2, timeout))) { if (tvheadend_is_running()) tvhinfo(cwc->cc_subsys, "%s: %s: Read error (header): %s", cwc->cc_name, state, strerror(r)); @@ -510,7 +535,7 @@ cwc_read_message } msglen = (buf[0] << 8) | buf[1]; - if(msglen > len) { + if (msglen > len) { if (tvheadend_is_running()) tvhinfo(cwc->cc_subsys, "%s: %s: Invalid message size: %d", cwc->cc_name, state, msglen); @@ -520,14 +545,14 @@ cwc_read_message /* We expect the rest of the message to arrive fairly quick, so just wait 1 second here */ - if((r = cc_read((cclient_t *)cwc, buf + 2, msglen, 1000))) { + if ((r = cc_read((cclient_t *)cwc, buf + 2, msglen, 1000))) { if (tvheadend_is_running()) tvhinfo(cwc->cc_subsys, "%s: %s: Read error: %s", cwc->cc_name, state, strerror(r)); return -1; } - if((msglen = des_decrypt(buf, msglen + 2, cwc)) < 15) { + if ((msglen = des_decrypt(buf, msglen + 2, cwc)) < 15) { tvhinfo(cwc->cc_subsys, "%s: %s: Decrypt failed", cwc->cc_name, state); return -1; @@ -563,10 +588,10 @@ cwc_init_session(void *cc) if (cwc_send_login(cwc)) return -1; - if(cwc_read_message(cwc, "Wait login response", buf, sizeof(buf), 5000) < 0) + if (cwc_read_message(cwc, "Wait login response", buf, sizeof(buf), 5000) < 0) return -1; - if(buf[12] != MSG_CLIENT_2_SERVER_LOGIN_ACK) { + if (buf[12] != MSG_CLIENT_2_SERVER_LOGIN_ACK) { tvhinfo(cwc->cc_subsys, "%s: Login failed", cwc->cc_name); return -1; } @@ -577,7 +602,7 @@ cwc_init_session(void *cc) * Request card data */ cwc_send_data_req(cwc); - if((r = cwc_read_message(cwc, "Request card data", buf, sizeof(buf), 5000)) < 0) + if ((r = cwc_read_message(cwc, "Request card data", buf, sizeof(buf), 5000)) < 0) return -1; if (buf[12] != MSG_CARD_DATA) { @@ -594,17 +619,24 @@ cwc_init_session(void *cc) * */ static int -cwc_read(void *cc) +cwc_read(void *cc, sbuf_t *rbuf) { cwc_t *cwc = cc; - uint8_t buf[CWS_NETMSGSIZE]; const int ka_interval = cwc->cc_keepalive_interval * 2 * 1000; - int r = cwc_read_message(cwc, "Decoderloop", buf, sizeof(buf), ka_interval); + int r = cwc_read_message0(cwc, "DecoderLoop", rbuf, ka_interval); if (r < 0) return -1; - if (r > 12) - return cwc_running_reply(cwc, buf[12], buf, r); - return -1; + if (r > 12) { + int ret = cwc_running_reply(cwc, rbuf->sb_data[12], rbuf->sb_data, r); + if (ret > 0) + sbuf_cut(rbuf, r); + if (ret < 0) + return -1; + return 0; + } + if (r > 0) + return -1; + return 0; } /** diff --git a/src/tvheadend.h b/src/tvheadend.h index 3f1f69097..8bd94ef38 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -733,6 +733,8 @@ void tvh_pipe_close(th_pipe_t *pipe); int tvh_write(int fd, const void *buf, size_t len); +int tvh_nonblock_write(int fd, const void *buf, size_t len); + FILE *tvh_fopen(const char *filename, const char *mode); void hexdump(const char *pfx, const uint8_t *data, int len); diff --git a/src/wrappers.c b/src/wrappers.c index b20bac523..83a69e135 100644 --- a/src/wrappers.c +++ b/src/wrappers.c @@ -102,6 +102,25 @@ tvh_write(int fd, const void *buf, size_t len) return len ? 1 : 0; } +int +tvh_nonblock_write(int fd, const void *buf, size_t len) +{ + ssize_t c; + + while (len) { + c = write(fd, buf, len); + if (c < 0) { + if (errno == EINTR) + continue; + break; + } + len -= c; + buf += c; + } + + return len ? 1 : 0; +} + FILE * tvh_fopen(const char *filename, const char *mode) { -- 2.47.3