*
*/
static int
-cccam_read_message(cccam_t *cccam, const char *state, uint8_t *buf, int len, int timeout)
+cccam_read_message0(cccam_t *cccam, const char *state, sbuf_t *rbuf, int timeout)
{
- int32_t ret;
uint16_t msglen;
+ uint8_t hdr[4];
+ struct cccam_crypt_block block;
- pthread_mutex_unlock(&cccam->cc_mutex);
- ret = tcp_read_timeout(cccam->cc_fd, buf, 4, timeout);
- pthread_mutex_lock(&cccam->cc_mutex);
- if (ret) {
- tvhdebug(cccam->cc_subsys, "%s: recv error %d or timeout",
- cccam->cc_name, ret);
- return -1;
- }
- cccam_decrypt(&cccam->recvblock, buf, 4);
- msglen = (buf[2] << 8) | buf[3];
- if (msglen > 0) {
- if (msglen > len - 2) {
- tvhdebug(cccam->cc_subsys, "%s: received message too large", cccam->cc_name);
- return -1;
- }
- pthread_mutex_unlock(&cccam->cc_mutex);
- ret = tcp_read_timeout(cccam->cc_fd, buf + 4, msglen, 5000);
- pthread_mutex_lock(&cccam->cc_mutex);
- if (ret) {
- tvhdebug(cccam->cc_subsys, "%s: timeout reading message", cccam->cc_name);
- return -1;
- }
- cccam_decrypt(&cccam->recvblock, buf + 4, msglen);
+ if (rbuf->sb_ptr < 4)
+ return 0;
+ block = cccam->recvblock;
+ memcpy(hdr, rbuf->sb_data, 4);
+ cccam_decrypt(&cccam->recvblock, hdr, 4);
+ msglen = (hdr[2] << 8) | hdr[3];
+ if (rbuf->sb_ptr >= msglen + 4) {
+ memcpy(rbuf->sb_data, hdr, 4);
+ cccam_decrypt(&cccam->recvblock, rbuf->sb_data + 4, msglen);
+ } else {
+ cccam->recvblock = block;
}
return msglen + 4;
}
*
*/
static int
-cccam_read(void *cc)
+cccam_read(void *cc, sbuf_t *rbuf)
{
cccam_t *cccam = cc;
- uint8_t buf[CCCAM_NETMSGSIZE];
const int ka_interval = cccam->cc_keepalive_interval * 2 * 1000;
- int r = cccam_read_message(cccam, "Decoderloop", buf, sizeof(buf), ka_interval);
+ int r = cccam_read_message0(cccam, "Decoderloop", rbuf, ka_interval);
if (r < 0)
return -1;
- return cccam_running_reply(cccam, buf, r);
+ if (r > 0) {
+ int ret = cccam_running_reply(cccam, rbuf->sb_data, r);
+ if (ret > 0)
+ sbuf_cut(rbuf, r);
+ if (ret < 0)
+ return -1;
+ }
+ return 0;
}
/**
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <fcntl.h>
#include <signal.h>
#include <pthread.h>
#include "tvheadend.h"
#include "tcp.h"
#include "cclient.h"
+#include "tvhpoll.h"
static void cc_service_pid_free(cc_service_t *ct);
tvhwarn(cc->cc_subsys, "%s: read error %d (%s)",
cc->cc_name, r, strerror(r));
- if(cc_must_break(cc))
+ if (cc_must_break(cc))
return ECONNABORTED;
return r;
tvhlog_hexdump(cc->cc_subsys, msg->cm_data, msg->cm_len);
if (enq) {
- pthread_mutex_lock(&cc->cc_writer_mutex);
- TAILQ_INSERT_TAIL(&cc->cc_writeq, msg, cm_link);
- tvh_cond_signal(&cc->cc_writer_cond, 0);
- pthread_mutex_unlock(&cc->cc_writer_mutex);
+ pthread_mutex_lock(&cc->cc_mutex);
+ if (cc->cc_write_running) {
+ TAILQ_INSERT_TAIL(&cc->cc_writeq, msg, cm_link);
+ tvh_nonblock_write(cc->cc_pipe.wr, "w", 1);
+ } else {
+ free(msg);
+ }
+ pthread_mutex_unlock(&cc->cc_mutex);
} else {
if (tvh_write(cc->cc_fd, msg->cm_data, msg->cm_len))
tvhinfo(cc->cc_subsys, "%s: write error %s",
/**
*
*/
-static void *
-cc_writer_thread(void *aux)
+static void
+cc_session(cclient_t *cc)
{
- cclient_t *cc = aux;
+ tvhpoll_t *poll;
+ tvhpoll_event_t ev;
+ char buf[16];
+ sbuf_t rbuf;
cc_message_t *cm;
+ ssize_t len;
int64_t mono;
int r;
- pthread_mutex_lock(&cc->cc_writer_mutex);
-
- while(cc->cc_writer_running) {
-
- if((cm = TAILQ_FIRST(&cc->cc_writeq)) != NULL) {
- TAILQ_REMOVE(&cc->cc_writeq, cm, cm_link);
- pthread_mutex_unlock(&cc->cc_writer_mutex);
- // int64_t ts = getfastmonoclock();
- if (tvh_write(cc->cc_fd, cm->cm_data, cm->cm_len))
- tvhinfo(cc->cc_subsys, "%s: write error %s",
- cc->cc_name, strerror(errno));
- // printf("Write took %lld usec\n", getfastmonoclock() - ts);
- free(cm);
- pthread_mutex_lock(&cc->cc_writer_mutex);
- continue;
- }
-
-
- /* If nothing is to be sent in keepalive interval seconds we
- need to send a keepalive */
- mono = mclk() + sec2mono(cc->cc_keepalive_interval);
- do {
- r = tvh_cond_timedwait(&cc->cc_writer_cond, &cc->cc_writer_mutex, mono);
- if(r == ETIMEDOUT) {
- if (cc->cc_keepalive)
- cc->cc_keepalive(cc);
- break;
- }
- } while (ERRNO_AGAIN(r));
- }
-
- pthread_mutex_unlock(&cc->cc_writer_mutex);
- return NULL;
-}
-
-
-
-/**
- *
- */
-static void
-cc_session(cclient_t *cc)
-{
- char buf[32];
- pthread_t writer_thread_id;
-
if (cc->cc_init_session(cc))
return;
/**
* We do all requests from now on in a separate thread
*/
- cc->cc_writer_running = 1;
- tvh_cond_init(&cc->cc_writer_cond);
- pthread_mutex_init(&cc->cc_writer_mutex, NULL);
TAILQ_INIT(&cc->cc_writeq);
- snprintf(buf, sizeof(buf), "%s-writer", cc->cc_id);
- tvhthread_create(&writer_thread_id, NULL, cc_writer_thread, cc, buf);
+ cc->cc_write_running = 1;
+ sbuf_init(&rbuf);
/**
* Mainloop
*/
- while(!cc_must_break(cc)) {
- if (cc->cc_read(cc))
- break;
+ poll = tvhpoll_create(1);
+ tvhpoll_add1(poll, cc->cc_pipe.rd, TVHPOLL_IN, &cc->cc_pipe);
+ tvhpoll_add1(poll, cc->cc_fd, TVHPOLL_IN, &cc->cc_fd);
+ mono = mclk() + sec2mono(cc->cc_keepalive_interval);
+ while (!cc_must_break(cc)) {
+ pthread_mutex_unlock(&cc->cc_mutex);
+ r = tvhpoll_wait(poll, &ev, 1, 1000);
+ pthread_mutex_lock(&cc->cc_mutex);
+ if (r < 0 && ERRNO_AGAIN(errno))
+ continue;
+ if (ev.data.ptr == &cc->cc_pipe)
+ read(cc->cc_pipe.rd, buf, sizeof(buf));
+ else if (ev.data.ptr == &cc->cc_fd) {
+ len = sbuf_read(&rbuf, cc->cc_fd);
+ if (len > 0 && cc->cc_read(cc, &rbuf))
+ break;
+ } else {
+ abort();
+ }
+ if ((cm = TAILQ_FIRST(&cc->cc_writeq)) != NULL) {
+ if (tvh_nonblock_write(cc->cc_fd, cm->cm_data, cm->cm_len))
+ break;
+ }
+ if (mono < mclk()) {
+ mono = mclk();
+ if (cc->cc_keepalive)
+ cc->cc_keepalive(cc);
+ }
}
tvhdebug(cc->cc_subsys, "%s: session exiting", cc->cc_name);
-
- /**
- * Collect the writer thread
- */
+ tvhpoll_destroy(poll);
+ sbuf_free(&rbuf);
shutdown(cc->cc_fd, SHUT_RDWR);
- cc->cc_writer_running = 0;
- tvh_cond_signal(&cc->cc_writer_cond, 0);
- pthread_join(writer_thread_id, NULL);
- tvhdebug(cc->cc_subsys, "%s: Write thread joined", cc->cc_name);
}
/**
cc = pcard->cs_client;
pthread_mutex_lock(&cc->cc_mutex);
mux = pcard->cs_mux;
- if (pcard->cs_running && cc->cc_forward_emm && cc->cc_writer_running) {
+ if (pcard->cs_running && cc->cc_forward_emm && cc->cc_write_running) {
if (cc->cc_emmex) {
if (cc->cc_emm_mux && cc->cc_emm_mux != mux) {
if (cc->cc_emm_update_time + sec2mono(25) > mclk())
}
if (!cc->cc_running) {
cc->cc_running = 1;
+ tvh_pipe(O_NONBLOCK, &cc->cc_pipe);
tvhthread_create(&cc->cc_tid, NULL, cc_thread, cc, "cc");
return;
}
if (cc->cc_fd >= 0)
shutdown(cc->cc_fd, SHUT_RDWR);
pthread_mutex_unlock(&cc->cc_mutex);
+ tvh_write(cc->cc_pipe.wr, "q", 1);
pthread_kill(tid, SIGHUP);
pthread_join(tid, NULL);
+ tvh_pipe_close(&cc->cc_pipe);
caclient_set_status(cac, CACLIENT_STATUS_NONE);
pthread_mutex_lock(&cc->cc_mutex);
free(cc->cc_name);
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include "tvheadend.h"
#include "caclient.h"
#include "descrambler.h"
#include "emm_reass.h"
/* Callbacks */
void *(*cc_alloc_service)(void *cc);
int (*cc_init_session)(void *cc);
- int (*cc_read)(void *cc);
+ int (*cc_read)(void *cc, sbuf_t *sbuf);
void (*cc_keepalive)(void *cc);
uint32_t (*cc_send_ecm)(void *cc, cc_service_t *ct, cc_ecm_section_t *es,
cc_card_data_t *pcard, const uint8_t *data, int len);
pthread_t cc_tid;
tvh_cond_t cc_cond;
pthread_mutex_t cc_mutex;
+ th_pipe_t cc_pipe;
/* Database */
LIST_HEAD(, cc_service) cc_services;
/* Writer */
int cc_seq;
TAILQ_HEAD(, cc_message) cc_writeq;
- pthread_mutex_t cc_writer_mutex;
- tvh_cond_t cc_writer_cond;
- uint8_t cc_writer_running;
+ uint8_t cc_write_running;
/* Emm forwarding */
int cc_forward_emm;
return 0;
}
+/**
+ *
+ */
+static int
+cwc_read_message0
+ (cwc_t *cwc, const char *state, sbuf_t *rbuf, int timeout)
+{
+ int msglen;
+
+ if (rbuf->sb_ptr < 2)
+ return 0;
+
+ msglen = (rbuf->sb_data[0] << 8) | rbuf->sb_data[1];
+ if(rbuf->sb_ptr < 2 + msglen)
+ return 0;
+
+ if((msglen = des_decrypt(rbuf->sb_data, msglen + 2, cwc)) < 15) {
+ tvhinfo(cwc->cc_subsys, "%s: %s: Decrypt failed",
+ cwc->cc_name, state);
+ return -1;
+ }
+
+ return msglen;
+}
+
/**
*
*/
{
int msglen, r;
- if((r = cc_read((cclient_t *)cwc, buf, 2, timeout))) {
+ if ((r = cc_read((cclient_t *)cwc, buf, 2, timeout))) {
if (tvheadend_is_running())
tvhinfo(cwc->cc_subsys, "%s: %s: Read error (header): %s",
cwc->cc_name, state, strerror(r));
}
msglen = (buf[0] << 8) | buf[1];
- if(msglen > len) {
+ if (msglen > len) {
if (tvheadend_is_running())
tvhinfo(cwc->cc_subsys, "%s: %s: Invalid message size: %d",
cwc->cc_name, state, msglen);
/* We expect the rest of the message to arrive fairly quick,
so just wait 1 second here */
- if((r = cc_read((cclient_t *)cwc, buf + 2, msglen, 1000))) {
+ if ((r = cc_read((cclient_t *)cwc, buf + 2, msglen, 1000))) {
if (tvheadend_is_running())
tvhinfo(cwc->cc_subsys, "%s: %s: Read error: %s",
cwc->cc_name, state, strerror(r));
return -1;
}
- if((msglen = des_decrypt(buf, msglen + 2, cwc)) < 15) {
+ if ((msglen = des_decrypt(buf, msglen + 2, cwc)) < 15) {
tvhinfo(cwc->cc_subsys, "%s: %s: Decrypt failed",
cwc->cc_name, state);
return -1;
if (cwc_send_login(cwc))
return -1;
- if(cwc_read_message(cwc, "Wait login response", buf, sizeof(buf), 5000) < 0)
+ if (cwc_read_message(cwc, "Wait login response", buf, sizeof(buf), 5000) < 0)
return -1;
- if(buf[12] != MSG_CLIENT_2_SERVER_LOGIN_ACK) {
+ if (buf[12] != MSG_CLIENT_2_SERVER_LOGIN_ACK) {
tvhinfo(cwc->cc_subsys, "%s: Login failed", cwc->cc_name);
return -1;
}
* Request card data
*/
cwc_send_data_req(cwc);
- if((r = cwc_read_message(cwc, "Request card data", buf, sizeof(buf), 5000)) < 0)
+ if ((r = cwc_read_message(cwc, "Request card data", buf, sizeof(buf), 5000)) < 0)
return -1;
if (buf[12] != MSG_CARD_DATA) {
*
*/
static int
-cwc_read(void *cc)
+cwc_read(void *cc, sbuf_t *rbuf)
{
cwc_t *cwc = cc;
- uint8_t buf[CWS_NETMSGSIZE];
const int ka_interval = cwc->cc_keepalive_interval * 2 * 1000;
- int r = cwc_read_message(cwc, "Decoderloop", buf, sizeof(buf), ka_interval);
+ int r = cwc_read_message0(cwc, "DecoderLoop", rbuf, ka_interval);
if (r < 0)
return -1;
- if (r > 12)
- return cwc_running_reply(cwc, buf[12], buf, r);
- return -1;
+ if (r > 12) {
+ int ret = cwc_running_reply(cwc, rbuf->sb_data[12], rbuf->sb_data, r);
+ if (ret > 0)
+ sbuf_cut(rbuf, r);
+ if (ret < 0)
+ return -1;
+ return 0;
+ }
+ if (r > 0)
+ return -1;
+ return 0;
}
/**
int tvh_write(int fd, const void *buf, size_t len);
+int tvh_nonblock_write(int fd, const void *buf, size_t len);
+
FILE *tvh_fopen(const char *filename, const char *mode);
void hexdump(const char *pfx, const uint8_t *data, int len);
return len ? 1 : 0;
}
+int
+tvh_nonblock_write(int fd, const void *buf, size_t len)
+{
+ ssize_t c;
+
+ while (len) {
+ c = write(fd, buf, len);
+ if (c < 0) {
+ if (errno == EINTR)
+ continue;
+ break;
+ }
+ len -= c;
+ buf += c;
+ }
+
+ return len ? 1 : 0;
+}
+
FILE *
tvh_fopen(const char *filename, const char *mode)
{