#include #include #include #include #include #include #include #include #include #include #include "common.h" #include "playlist.h" #include "streamer.h" #include "transmuxer.h" /* ===== Definitions ===== */ #define PRIORITY -10 struct Streamer { pthread_t worker_thread; pthread_mutex_t worker_ready_mutex; pthread_cond_t worker_ready_cond; int worker_ready_flag; enum State state; pthread_mutex_t state_mutex; HLSRemuxer *remuxer; Playlist *playlist; char *current_song; }; /* ===== Thread-local storage ===== */ static pthread_key_t streamer_worker_key; static pthread_once_t streamer_worker_once = PTHREAD_ONCE_INIT; static void streamer_worker_key_create(void) { pthread_key_create(&streamer_worker_key, NULL); } /* ====== Streamer initialisation ===== */ static void *streamer_worker (void *t); Streamer *streamer_init (const struct StreamerOpt *opts) { Streamer *s = NULL; Playlist *p = NULL; HLSRemuxer *r = NULL; pthread_once(&streamer_worker_once, streamer_worker_key_create); if ((s = malloc(sizeof(*s))) == NULL) goto error; if ((p = playlist_init()) == NULL) goto error; if ((r = hls_remuxer_init(opts)) == NULL) goto error; if (pthread_create(&s->worker_thread, NULL, streamer_worker, s)) goto error; s->playlist = p; s->remuxer = r; s->state = STATE_STANDBY; s->state_mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER; return s; error: hls_remuxer_free(r); playlist_free(p); free(s); return NULL; } int streamer_push(Streamer *s, const char *url, unsigned int index) { return (index == PLAYLIST_END) ? playlist_enqueue(s->playlist, url) : playlist_insert(s->playlist, index, url); } int streamer_pop(Streamer *s, unsigned int id) { int err; struct Entry entry; if ((err = playlist_remove(s->playlist, id, &entry))) return err; free(entry.file); return 0; } void streamer_skip(Streamer *s) { /* 'skip' only halt the current song, which is then replaced by the streamer * worker, either immediately, or as soon as a song gets pushed. */ hls_remuxer_halt(s->remuxer); } void streamer_pause(Streamer *s) { pthread_mutex_lock(&s->state_mutex); if (s->state == STATE_PLAYING) { hls_remuxer_pause(s->remuxer); s->state = STATE_PAUSED; } pthread_mutex_unlock(&s->state_mutex); } void streamer_resume(Streamer *s) { pthread_mutex_lock(&s->state_mutex); if (s->state == STATE_PAUSED) { hls_remuxer_resume(s->remuxer); s->state = STATE_PLAYING; } pthread_mutex_unlock(&s->state_mutex); } int streamer_info(Streamer *streamer, struct StateInfo *info) { pthread_mutex_lock(&streamer->state_mutex); info->state = streamer->state; if ((info->current_song = strdup(streamer->current_song)) == NULL) { pthread_mutex_unlock(&streamer->state_mutex); return -1; } if (playlist_list(streamer->playlist, info)) { pthread_mutex_unlock(&streamer->state_mutex); free(info->current_song); return -1; } pthread_mutex_unlock(&streamer->state_mutex); return 0; } void streamer_free (Streamer *s) { pthread_kill(s->worker_thread, SIGTERM); hls_remuxer_halt(s->remuxer); pthread_join(s->worker_thread, NULL); playlist_free(s->playlist); free(s); } static void streamer_worker_sighandler(int sig) { Streamer *streamer; streamer = pthread_getspecific(streamer_worker_key); pthread_mutex_lock(&streamer->state_mutex); streamer->state = STATE_TERM; pthread_mutex_unlock(&streamer->state_mutex); } static inline int streamer_worker_conf_priority(void) { errno = 0; setpriority(PRIO_PROCESS, 0, PRIORITY); return errno ? -1 : 0; } static inline int streamer_worker_conf_sighandling(void) { sigset_t set; struct sigaction sigact; /* Configure SIGTERM interuptor */ sigact.sa_handler = streamer_worker_sighandler; sigact.sa_flags = 0; sigemptyset(&sigact.sa_mask); sigaction(SIGTERM, &sigact, NULL); // term sigaction(SIGUSR1, &sigact, NULL); // play/pause sigaction(SIGUSR2, &sigact, NULL); // skip /* Unblock SIGTERM */ sigemptyset(&set); sigaddset(&set, SIGTERM); sigaddset(&set, SIGUSR1); sigaddset(&set, SIGUSR2); pthread_sigmask(SIG_UNBLOCK, &set, NULL); } static void *streamer_worker(void *arg) { Streamer *s; struct Entry next; pthread_setspecific(streamer_worker_key, (s = arg)); /* Configure worker */ pthread_mutex_lock(&s->worker_ready_mutex); streamer_worker_conf_priority(); streamer_worker_conf_sighandling(); s->worker_ready_flag = 1; pthread_cond_broadcast(&s->worker_ready_cond); pthread_mutex_unlock(&s->worker_ready_mutex); /* Main loop */ while (s->state != ) { /* Attempt to fetch the next song, or go into standby mode */ if (playlist_try_dequeue(s->playlist, &next)) { puts("The playlist is empty, going into standby mode."); pthread_mutex_lock(&s->state_mutex); s->state = STATE_STANDBY; pthread_mutex_unlock(&s->state_mutex); playlist_dequeue(s->playlist, &next); } /* Update the current state */ pthread_mutex_lock(&s->state_mutex); printf("Next song: [%i] %s\n", next.id, next.file); s->current_song = next.file; s->state = STATE_PLAYING; pthread_mutex_unlock(&s->state_mutex); /* Play the song entirely */ hls_remuxer_play(s->remuxer, next.file); /* Discard current_song, but keep the current 'playing' state. */ pthread_mutex_lock(&s->state_mutex); free(s->current_song); s->current_song = NULL; pthread_mutex_unlock(&s->state_mutex); } return NULL; }