summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.editorconfig8
-rw-r--r--.gitignore3
-rw-r--r--HACKING33
-rw-r--r--Makefile42
-rw-r--r--README40
-rw-r--r--audio/decoder.c58
-rw-r--r--audio/decoder.h16
-rw-r--r--audio/encoder.c69
-rw-r--r--audio/encoder.h15
-rw-r--r--audio/muxer.c235
-rw-r--r--audio/muxer.h54
-rw-r--r--audio/resampler.c121
-rw-r--r--audio/resampler.h31
-rw-r--r--audio/stream.c47
-rw-r--r--audio/stream.h14
-rw-r--r--common.c13
-rw-r--r--common.h33
-rw-r--r--cybd.139
-rw-r--r--daemon.c273
-rw-r--r--daemon.h29
-rw-r--r--main.c353
-rw-r--r--playlist.c269
-rw-r--r--playlist.h33
-rw-r--r--streamer.c235
-rw-r--r--streamer.h22
-rw-r--r--testmuxer.c21
-rw-r--r--transmuxer.c283
-rw-r--r--transmuxer.h17
28 files changed, 2406 insertions, 0 deletions
diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..316e750
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,8 @@
+root = true
+
+[*]
+end_of_line = lf
+insert_final_newline = true
+indent_style=tab
+charset = utf-8
+trim_trailing_whitespace = true
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..eed3965
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+LICENSE
+cybd
+*.o
diff --git a/HACKING b/HACKING
new file mode 100644
index 0000000..ea45cdc
--- /dev/null
+++ b/HACKING
@@ -0,0 +1,33 @@
+Hacking manual
+==============
+cybd is intended to be used and embedded into a greater software stack as part
+of a more complete HLS distribution system.
+
+The easiest way to use cybd is through the provided daemon/client command-line.
+Altough cybd is not a library per-say, it's nonetheless quite easy to import the
+client, the daemon, or even the internal streamer object into another program
+thanks to it's clean-cut architecture.
+
+The cybd layer (main.c, common.h)
+---------------------------------
+cybd's main entry file, responsible of parsing commands arguments and dispatching
+calls to the daemon layer.
+
+The daemon layer (daemon.c, daemon.h)
+-------------------------------------
+Code related to client/daemon communication via a unix domain socket, and code for
+the daemon itself which handles IPC calls by forwarding them to the streamer
+layer.
+
+The streamer layer (streamer.c/h, playlist.c/h, transmuxer.c/h)
+---------------------------------------------------------------
+streamer.c/h - The 'brain' of the software, manage the song/media queue and stock
+the FFMPEG muxer with new songs.
+
+playlist.c/h - Used by streamer.c. Implements a thread-safe, indexed circular
+buffer.
+
+transmuxer.c/h - Used by streamer.c. Interface FFMPEG to implement muxing and
+demuxing.
+
+Happy hacking!
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..b48bf99
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,42 @@
+.POSIX:
+
+CFLAGS = -std=c99 -O0 -D_POSIX_C_SOURCE=200809 -g
+LDFLAGS = -lswresample -lavutil -lavcodec -lavformat -lpthread
+
+AUDIO = \
+ audio/decoder.c\
+ audio/encoder.c\
+ audio/muxer.c\
+ audio/resampler.c\
+ audio/stream.c\
+
+SRC = $(AUDIO) main.c streamer.c playlist.c transmuxer.c daemon.c common.c audio_muxer.c
+OBJ = $(SRC:.c=.o)
+
+all: cybd
+
+.c.o:
+ $(CC) $(CFLAGS) -c $< -o $@
+
+audio/decoder.o: audio/decoder.h
+audio/encoder.o: audio/encoder.h
+audio/resampler.o: audio/resampler.h audio/encoder.h audio/decoder.h
+audio/muxer.o: audio/resampler.h audio/encoder.h audio/decoder.h audio/muxer.h
+
+common.o: common.h
+main.o: common.h daemon.h
+daemon.o: common.h daemon.h streamer.h
+streamer.o: common.h streamer.h playlist.h transmuxer.h
+transmuxer.o: common.h transmuxer.h playlist.h audio/muxer.h
+playlist.o: common.h playlist.h
+
+cybd: $(OBJ)
+ $(CC) -o $@ $^ $(LDFLAGS)
+
+clean:
+ rm -f cybd $(OBJ) testmuxer.o test/*
+
+testhls: testmuxer.o transmuxer.o playlist.o $(AUDIO:.c=.o)
+ $(CC) -o $@ $^ $(LDFLAGS)
+
+.PHONY: all clean
diff --git a/README b/README
new file mode 100644
index 0000000..1ce0ebd
--- /dev/null
+++ b/README
@@ -0,0 +1,40 @@
+cybd - The cyber radio daemon
+=============================
+cybd is a lightweight, embeddable POSIX-compliant queued HSL muxer.
+
+HSL (http live stream) is a popular streaming protocol capable of transporting
+audio-video content over HTTP. cybd can play the role of the HSL server to
+generate an .m3u8 tracklist file and .ts segments files which can be served by
+any standard web server.
+
+Requirements
+------------
+In order to build cybd you need the ffmpeg API header files.
+
+
+Installation
+------------
+Edit config.mk to match your local setup (cybd is installed into the /usr/local
+namespace by default).
+
+Afterwards enter the following command to build and install cybd (if necessary as
+root):
+
+ make clean install
+
+
+Running cybd
+------------
+A cybd deamon must first be launched by invoking the 'daemon' command:
+
+ cybd [-s socket] daemon
+
+Then:
+
+ cybd [-s socket] COMMAND <args>
+
+
+Hacking cybd
+------------
+cybd was built with versatility in mind, and can be embedded in your server stack
+in several different ways. Read HACKING for more on that.
diff --git a/audio/decoder.c b/audio/decoder.c
new file mode 100644
index 0000000..70eacd5
--- /dev/null
+++ b/audio/decoder.c
@@ -0,0 +1,58 @@
+#include "decoder.h"
+#include <libavutil/dict.h>
+
+int decoder_init (struct Decoder *decoder, const AVCodecParameters *codecpar, AVDictionary **opts)
+{
+ int err;
+ AVCodecContext *avctx;
+ const AVCodec *codec;
+
+ if ((codec = avcodec_find_decoder(codecpar->codec_id)) == NULL)
+ return AVERROR_DECODER_NOT_FOUND;
+
+ /* NOTE: parameter 'codec' might be redundant here due to the next, more
+ * featuref-full call to avcodec_parameters_to_context. */
+ if ((avctx = avcodec_alloc_context3(codec)) == NULL)
+ return AVERROR(ENOMEM); /* NOTE: After reading libavcodec's code, this
+ * error code *could* misrepresent the actual
+ * error, but ffmpeg won't tell us what it is
+ * anyway so oh well. */
+
+ if ((err = avcodec_parameters_to_context(avctx, codecpar)) < 0)
+ goto error;
+
+ if ((err = avcodec_open2(avctx, codec, opts)) < 0)
+ goto error;
+
+ /* Free previous context (no-op if avctx == NULL) and swap it with the new
+ * one. This allows the previous one to be left untouched in case of error,
+ * which is generally a nice thing to do in your code. */
+ avcodec_free_context(&decoder->avctx);
+ decoder->avctx = avctx;
+ return 0;
+
+error:
+ avcodec_free_context(&avctx);
+ return err;
+}
+
+int decoder_init_for_stream (struct Decoder *decoder, const AVStream *stream, AVDictionary **opts)
+{
+ return decoder_init(decoder, stream->codecpar, opts);
+}
+
+int decoder_send (struct Decoder *decoder, const AVPacket *pkt)
+{
+ return avcodec_send_packet(decoder->avctx, pkt);
+}
+
+int decoder_convert (struct Decoder *decoder, AVFrame *out)
+{
+ return avcodec_receive_frame(decoder->avctx, out);
+}
+
+void decoder_free (struct Decoder *decoder)
+{
+ if (decoder == NULL) return;
+ avcodec_free_context(&decoder->avctx);
+}
diff --git a/audio/decoder.h b/audio/decoder.h
new file mode 100644
index 0000000..bb7f7b1
--- /dev/null
+++ b/audio/decoder.h
@@ -0,0 +1,16 @@
+#ifndef AUDIO_DECODER_H_
+#define AUDIO_DECODER_H_
+
+#include <libavcodec/avcodec.h>
+#include <libavformat/avformat.h>
+#include <libavutil/dict.h>
+
+struct Decoder { AVCodecContext *avctx; };
+
+int decoder_init (struct Decoder *decoder, const AVCodecParameters *codecpar, AVDictionary **opts);
+int decoder_send (struct Decoder *decoder, const AVPacket *pkt);
+int decoder_convert (struct Decoder *decoder, AVFrame *out);
+void decoder_free (struct Decoder *decoder);
+int decoder_init_for_stream (struct Decoder *decoder, const AVStream *stream, AVDictionary **opts);
+
+#endif // AUDIO_DECODER_H_
diff --git a/audio/encoder.c b/audio/encoder.c
new file mode 100644
index 0000000..17cf2d2
--- /dev/null
+++ b/audio/encoder.c
@@ -0,0 +1,69 @@
+#include "encoder.h"
+#include <assert.h>
+
+int encoder_init (struct Encoder *encoder, const AVCodecParameters *codecpar, AVDictionary **opts)
+{
+ /* NOTE: While encoder->time_base "MUST be set by user" when encoding
+ * (quoted from the official FFMPEG documentation), avcodec_open2 actually
+ * discard it's value and set it to '1/sample_rate'.
+ *
+ * NOTE: The assert at the end of this section serves to protect cybd in
+ * case the FFMPEG devs update their code to implement some funny behaviors.
+ */
+ int err;
+ AVCodecContext *avctx;
+ const AVCodec *codec;
+
+ if ((codec = avcodec_find_encoder(codecpar->codec_id)) == NULL) {
+ err = AVERROR_ENCODER_NOT_FOUND;
+ goto error;
+ }
+
+ if ((avctx = avcodec_alloc_context3(codec)) == NULL)
+ return AVERROR(ENOMEM);
+
+ if ((err = avcodec_parameters_to_context(avctx, codecpar)) < 0)
+ goto error;
+
+ if ((err = avcodec_open2(avctx, codec, opts)))
+ goto error;
+
+ /* Because some code rely on avctx's time base to be 1/sample_rate, and
+ * because this behavious currently appears undocumented, I placed some
+ * assert here to catch up if anything changes up with ffmpeg. */
+ assert(avctx->time_base.num == 1);
+ assert(avctx->time_base.den == avctx->sample_rate);
+ encoder->avctx = avctx;
+ encoder->pts = 0;
+ return 0;
+
+error:
+ avcodec_free_context(&avctx);
+ return err;
+}
+
+int encoder_init_for_stream (struct Encoder *encoder, const AVStream *stream, AVDictionary **opts)
+{
+ return encoder_init(encoder, stream->codecpar, opts);
+}
+
+int encoder_send (struct Encoder *encoder, const AVFrame *frame)
+{
+ return avcodec_send_frame(encoder->avctx, frame);
+}
+
+int encoder_convert (struct Encoder *encoder, AVPacket *pkt_out)
+{
+ int err;
+ err = avcodec_receive_packet(encoder->avctx, pkt_out);
+ if (err < 0) return err;
+ // Manually setting pts because FFMPEG doesn't do it consistantly
+ //pkt_out->pts = encoder->pts;
+ //encoder->pts += pkt_out->duration;
+ return 0;
+}
+
+void encoder_free (struct Encoder *encoder)
+{
+ avcodec_free_context(&encoder->avctx);
+}
diff --git a/audio/encoder.h b/audio/encoder.h
new file mode 100644
index 0000000..b42f862
--- /dev/null
+++ b/audio/encoder.h
@@ -0,0 +1,15 @@
+#ifndef AUDIO_ENCODER_H_
+#define AUDIO_ENCODER_H_
+
+#include <libavcodec/avcodec.h>
+#include <libavformat/avformat.h>
+
+struct Encoder { AVCodecContext *avctx; int pts; };
+
+int encoder_init (struct Encoder *encoder, const AVCodecParameters *codecpar, AVDictionary **opts);
+int encoder_send (struct Encoder *encoder, const AVFrame *pkt);
+int encoder_convert (struct Encoder *encoder, AVPacket *out);
+void encoder_free (struct Encoder *encoder);
+int encoder_init_for_stream (struct Encoder *encoder, const AVStream *stream, AVDictionary **opts);
+
+#endif // AUDIO_ENCODER_H_
diff --git a/audio/muxer.c b/audio/muxer.c
new file mode 100644
index 0000000..3860476
--- /dev/null
+++ b/audio/muxer.c
@@ -0,0 +1,235 @@
+#include <asm-generic/errno-base.h>
+#include <assert.h>
+#include <libavcodec/avcodec.h>
+#include <libavutil/error.h>
+#include <libavutil/frame.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <stdio.h>
+#include "muxer.h"
+#include "encoder.h"
+#include "decoder.h"
+#include "resampler.h"
+#include "stream.h"
+
+#define NANOSEC (uint64_t)1000000000
+
+struct AudioMuxer {
+ struct Stream stream;
+ struct Encoder encoder;
+ struct Decoder decoder;
+ struct Resampler resampler;
+ AVFrame *frame_in;
+ AVFrame *frame_out;
+ AVPacket *packet;
+
+ /* The amount of output samples that went out of this muxer. Starts at 0 and
+ * keep on increasing.
+ */
+ unsigned int nb_samples_out;
+};
+
+int64_t audiomuxer_flush(AudioMuxer *muxer);
+
+uint64_t audiomuxer_get_runtime(const AudioMuxer *muxer)
+{
+ return (muxer->nb_samples_out * NANOSEC) / muxer->resampler.out_sample_rate;
+}
+
+int audiomuxer_init(AudioMuxer **pmuxer, AVFormatContext *s, const struct AudioMuxerOpt *opts)
+{
+ int err;
+ AudioMuxer *muxer;
+ AVFrame *frame_out;
+ AVFrame *frame_in;
+ AVPacket *pkt;
+
+ *pmuxer = NULL;
+ if ((muxer = malloc(sizeof(AudioMuxer))) == NULL)
+ return AVERROR(ENOMEM);
+
+ muxer->decoder.avctx = NULL;
+ muxer->encoder.avctx = NULL;
+ muxer->resampler.swr = NULL;
+
+ err = stream_init(
+ &muxer->stream,
+ s,
+ opts->codec_id,
+ opts->sample_rate,
+ opts->sample_format,
+ &opts->ch_layout,
+ opts->opts_muxer);
+ if (err) goto error;
+
+ err = encoder_init_for_stream(&muxer->encoder, muxer->stream.av_stream, opts->opts_encoder);
+ if (err) goto error;
+
+ err = resampler_init_for_encoder(&muxer->resampler, &muxer->encoder);
+ if (err) goto error;
+
+ if ((frame_out = av_frame_alloc()) == NULL)
+ goto enomem;
+ if ((frame_in = av_frame_alloc()) == NULL)
+ goto enomem;
+ if ((pkt = av_packet_alloc()) == NULL)
+ goto enomem;
+
+
+ frame_out->format = muxer->encoder.avctx->sample_fmt;
+ frame_out->nb_samples = muxer->encoder.avctx->frame_size;
+ frame_out->ch_layout = muxer->encoder.avctx->ch_layout;
+ frame_out->sample_rate = muxer->encoder.avctx->sample_rate;
+ if ((err = av_frame_get_buffer(frame_out, 0)))
+ goto error;
+
+ muxer->packet = pkt;
+ muxer->frame_out = frame_out;
+ muxer->frame_in = frame_in;
+ *pmuxer = muxer;
+ return 0;
+
+enomem:
+ err = AVERROR(ENOMEM);
+error:
+ audiomuxer_free(muxer);
+ return err;
+}
+
+int audiomuxer_conf (AudioMuxer *muxer, const AVStream *input)
+{
+ int err, src_rate, dst_rate;
+ const AVCodec *codec;
+ SwrContext *resampler;
+
+ src_rate = input->codecpar->sample_rate;
+
+ err = decoder_init_for_stream(&muxer->decoder, input, NULL); // TODO: Pass options // TODO: Pass options
+ if (err) {
+ fprintf(stderr, "Failed to configure decoder: %s\n", av_err2str(err));
+ return err;
+ }
+
+ err = resampler_conf_for_decoder(&muxer->resampler, &muxer->decoder);
+ if (err) {
+ fprintf(stderr, "Failed to configure resampler: %s\n", av_err2str(err));
+ return err;
+ }
+
+
+ muxer->frame_in->format = muxer->decoder.avctx->sample_fmt;
+ muxer->frame_in->nb_samples = muxer->decoder.avctx->frame_size;
+ muxer->frame_in->ch_layout = muxer->decoder.avctx->ch_layout;
+ muxer->frame_in->sample_rate = muxer->decoder.avctx->sample_rate;
+ if ((err = av_frame_get_buffer(muxer->frame_in, 0)))
+ return err;
+
+ return 0;
+}
+
+int audiomuxer_send_packet (AudioMuxer *muxer, const AVPacket *pkt_in)
+{
+ int err, duration;
+ AVFrame *frame_in;
+
+ frame_in = muxer->frame_in;
+
+ if ((err = decoder_send(&muxer->decoder, pkt_in)) < 0)
+ return err;
+
+ /* Since a packet might contains several frames, we call decoder_convert
+ * repeatedly until we're out of data.
+ */
+ while ((err = decoder_convert(&muxer->decoder, frame_in)) != AVERROR(EAGAIN)) {
+ /* Errors other than EAGAIN should be repported */
+ if (err) {
+ fprintf(stderr, "Failed to decode frame: %s\n", av_err2str(err));
+ return err;
+ }
+
+ /* Immediately send frame to resampler and postpone silent injection */
+ if ((err = resampler_send_frame(&muxer->resampler, frame_in)) < 0) {
+ fprintf(stderr, "Failed to send data to resampler: %s\n", av_err2str(err));
+ return err;
+ }
+
+ /* Flush frame if possible */
+ if ((err = audiomuxer_flush(muxer)) < 0)
+ return err;
+ }
+
+ return 0;
+}
+
+int audiomuxer_send_silence (AudioMuxer *muxer, unsigned int microsec)
+{
+ int err;
+ if ((err = resampler_send_empty(&muxer->resampler, microsec)))
+ return err;
+
+ return audiomuxer_flush(muxer);
+}
+
+
+int64_t audiomuxer_flush(AudioMuxer *muxer)
+{
+ int err;
+ AVPacket *pkt_out;
+ AVFrame *frame_out;
+
+ frame_out = muxer->frame_out;
+ pkt_out = muxer->packet;
+
+ if ((err = resampler_convert(&muxer->resampler, frame_out))) {
+ if (err == AVERROR(EAGAIN)) return 0;
+ fprintf(stderr, "Failed to resample: %s\n", av_err2str(err));
+ return err;
+ }
+
+ if ((err = encoder_send(&muxer->encoder, frame_out))) {
+ fprintf(stderr, "Failed to send frame to encoder: %s\n", av_err2str(err));
+ return err;
+ }
+
+ if ((err = encoder_convert(&muxer->encoder, pkt_out))) {
+ if (err == AVERROR(EAGAIN)) return 0;
+ fprintf(stderr, "Failed to encode packet: %s\n", av_err2str(err));
+ return err;
+ }
+
+ /* pkt_out->duration is the "Duration of this packet in AVStream->time_base
+ * units, 0 if unknown." In practice, the timebase appears to be 1 /
+ * sample_rate, so, in other words, pkt_out->duration is the amount of
+ * samples stored in that packet.
+ *
+ * TODO: This behaviour is however undocumented, and asserts should be put
+ * in place to catch potential regressions.
+ *
+ * NOTE: duration is usually 1152 with a default MP3 audio stream.
+ */
+ assert(pkt_out->duration > 0);
+ muxer->nb_samples_out += pkt_out->duration;
+
+ /* Finally send packet to stream. */
+ if ((err = stream_send(&muxer->stream, pkt_out)) < 0) {
+ fprintf(stderr, "Failed stream packet: %s\n", av_err2str(err));
+ return err;
+ }
+
+ return 0;
+}
+
+
+void audiomuxer_free (AudioMuxer *muxer)
+{
+ if (muxer == NULL) return;
+
+ /* Flush decoder */
+ avcodec_send_packet(muxer->decoder.avctx, NULL);
+
+ encoder_free(&muxer->encoder);
+ decoder_free(&muxer->decoder);
+ resampler_free(&muxer->resampler);
+
+ free(muxer);
+}
diff --git a/audio/muxer.h b/audio/muxer.h
new file mode 100644
index 0000000..b671484
--- /dev/null
+++ b/audio/muxer.h
@@ -0,0 +1,54 @@
+#ifndef AUDIO_MUXER_H_
+#define AUDIO_MUXER_H_
+
+#include <libavformat/avformat.h>
+#include <libavutil/frame.h>
+
+/**
+ * An AudioMuxer is responsible for attaching and managing an audio stream to an
+ * FFMPEG AVFormatContext instance. Ultimately, AudioMuxer allows users to pass
+ * amorphous FFMPEG audio packets to an AvFormatContext without worrying about
+ * encoding or sample count, and to enable silent streaming.
+ */
+typedef struct AudioMuxer AudioMuxer;
+
+struct AudioMuxerOpt {
+ int sample_rate;
+ enum AVSampleFormat sample_format;
+ enum AVCodecID codec_id;
+ AVChannelLayout ch_layout;
+ AVDictionary **opts_muxer;
+ AVDictionary **opts_encoder;
+};
+
+#define AUDIO_MUXER_OPT_DEFAULTS\
+ .sample_rate = 44100,\
+ .sample_format = 0,\
+ .codec_id = 0,\
+ .ch_layout = 0,\
+ .opts_muxer = NULL,\
+ .opts_encoder = NULL
+
+/**
+ * Initialize **pmuxer using the parameters passed by *opts and attach an audio
+ * stream to *avctx. This function mearly configures the output front of the
+ * remuxer (i.e. a stream, an encoder, and part of a resmapler), and a second
+ * call to audiomuxer_conf must be dispatched before sending in packets. Return
+ * 0 on success, or a negative AVERROR on failure.
+ */
+int audiomuxer_init(AudioMuxer **pmuxer, AVFormatContext *avctx, const struct AudioMuxerOpt *opts);
+
+/**
+ * Configure *muxer to process input from a given *input stream. Internally,
+ * this function allocate a decoder and configure the remuxer's resampler to
+ * suite *input's codec and sample rate. Return 0 on success, or a negative
+ * AVERROR on failure.
+ */
+int audiomuxer_conf(AudioMuxer *muxer, const AVStream *input);
+
+int audiomuxer_send_packet (AudioMuxer *muxer, const AVPacket *pkt_in);
+int audiomuxer_send_silence (AudioMuxer *muxer, unsigned int microsec);
+uint64_t audiomuxer_get_runtime (const AudioMuxer *muxer);
+void audiomuxer_free (AudioMuxer *muxer);
+
+#endif // AUDIO_MUXER_H_
diff --git a/audio/resampler.c b/audio/resampler.c
new file mode 100644
index 0000000..6906d05
--- /dev/null
+++ b/audio/resampler.c
@@ -0,0 +1,121 @@
+#include <asm-generic/errno-base.h>
+#include <assert.h>
+#include <libavcodec/avcodec.h>
+#include <libavutil/error.h>
+#include <libavutil/opt.h>
+#include <libswresample/swresample.h>
+#include "resampler.h"
+
+#define MICROSECOND 1000000
+
+static inline uint64_t div_uint64_ceil(uint64_t num, uint64_t den)
+{
+ return (num / den) + (num % den != 0);
+}
+
+static int resampler_init (struct Resampler *resampler, const AVChannelLayout *out_chlayout, enum AVSampleFormat out_sample_fmt, int64_t out_sample_rate, uint64_t out_frame_size)
+{
+ /* Let me tell you something; you --don't want-- CAN'T configure a
+ * SwrContext manually, GOOD ****ING LUCK DOING THAT. The documentation is
+ * so bad on that matter, it only referes to "tHe PaRaMeTeRs" or show off a
+ * list of deprecated attribute which looks nothing like what's actually
+ * used in their API. I tried to copy what's being done in their a API but
+ * whatever I do would either not work or be liable to change since it's not
+ * documented for shit (even the documented stuff doesn't appears stable, so
+ * wtf am I saying anyway). A ****ing joke, I tell you that.
+ *
+ * Pardon my french.
+ *
+ * Anywho, instead of configuring the input parameter directly, we shove
+ * them in resampler so we can recall them before calling ffmpeg's SWR
+ * allocator.
+ */
+ resampler->out_chlayout = out_chlayout;
+ resampler->out_sample_fmt = out_sample_fmt;
+ resampler->out_sample_rate = out_sample_rate;
+ resampler->out_frame_size = out_frame_size;
+ resampler->swr = NULL;
+ return 0;
+}
+
+static int resampler_conf (struct Resampler *resampler, const AVChannelLayout *in_chlayout, enum AVSampleFormat in_sample_fmt, int64_t in_sample_rate)
+{
+ int err;
+
+ err = swr_alloc_set_opts2(
+ &resampler->swr,
+ resampler->out_chlayout,
+ resampler->out_sample_fmt,
+ resampler->out_sample_rate,
+ in_chlayout,
+ in_sample_fmt,
+ in_sample_rate,
+ 0, 0);
+ if (err) return err;
+
+ if (!swr_is_initialized(resampler->swr))
+ if ((err = swr_init(resampler->swr)))
+ return err;
+
+ resampler->in_sample_rate = in_sample_rate;
+ resampler->needed = div_uint64_ceil(resampler->out_frame_size * in_sample_rate, resampler->out_sample_rate);
+
+ return 0;
+}
+
+int resampler_send_frame (struct Resampler *resampler, AVFrame *in)
+{
+ int err;
+
+ if ((err = swr_convert(resampler->swr, NULL, 0, (const uint8_t **)in->extended_data, in->nb_samples)) < 0)
+ return err;
+ resampler->stored += in->nb_samples;
+ return 0;
+}
+
+int resampler_send_empty (struct Resampler *resampler, unsigned int microsecond)
+{
+ // TODO: Check for downcasting
+ int nb_samples;
+ nb_samples = div_uint64_ceil(microsecond * resampler->in_sample_rate, 1000);
+ return swr_inject_silence(resampler->swr, nb_samples);
+}
+
+int resampler_convert (struct Resampler *resampler, AVFrame *out)
+{
+ int err;
+ assert(out != NULL);
+ assert(resampler->swr != NULL);
+
+ if (resampler->stored < resampler->needed) {
+ return AVERROR(EAGAIN);
+ /*
+ err = swr_inject_silence(resampler->swr, resampler->needed - resampler->stored);
+ if (err < 0) return err;
+ */
+ }
+
+ err = swr_convert(resampler->swr, out->extended_data, out->nb_samples, NULL, 0);
+ if (err < 0) return err;
+ resampler->stored = 0;
+
+ return 0;
+}
+
+void resampler_free (struct Resampler *resampler)
+{
+ if (resampler == NULL) return;
+ swr_free(&resampler->swr);
+}
+
+int resampler_init_for_encoder (struct Resampler *resampler, const struct Encoder *encoder)
+{
+ AVCodecContext *avctx = encoder->avctx;
+ return resampler_init(resampler, &avctx->ch_layout, avctx->sample_fmt, avctx->sample_rate, avctx->frame_size);
+}
+
+int resampler_conf_for_decoder (struct Resampler *resampler, const struct Decoder *decoder)
+{
+ AVCodecContext *avctx = decoder->avctx;
+ return resampler_conf(resampler, &avctx->ch_layout, avctx->sample_fmt, avctx->sample_rate);
+}
diff --git a/audio/resampler.h b/audio/resampler.h
new file mode 100644
index 0000000..0bb471c
--- /dev/null
+++ b/audio/resampler.h
@@ -0,0 +1,31 @@
+#ifndef AUDIO_RESAMPLER_H_
+#define AUDIO_RESAMPLER_H_
+
+#include <libavutil/channel_layout.h>
+#include <libavutil/samplefmt.h>
+#include <libswresample/swresample.h>
+#include "decoder.h"
+#include "encoder.h"
+
+struct Resampler {
+ SwrContext *swr;
+ uint64_t stored;
+ uint64_t needed;
+ int in_sample_rate;
+
+ /* Passed by *_init, used by *_conf */
+ const AVChannelLayout *out_chlayout;
+ enum AVSampleFormat out_sample_fmt;
+ uint64_t out_sample_rate;
+ uint64_t out_frame_size;
+};
+
+int resampler_init_for_encoder (struct Resampler *resampler, const struct Encoder *encoder);
+int resampler_conf_for_decoder (struct Resampler *resampler, const struct Decoder *decoder);
+int resampler_send_frame (struct Resampler *resampler, AVFrame *in);
+int resampler_send_empty (struct Resampler *resampler, unsigned int microsecond);
+int resampler_convert (struct Resampler *resmapler, AVFrame *out);
+int resampler_has_enough (struct Resampler *resampler);
+void resampler_free (struct Resampler *resampler);
+
+#endif // AUDIO_RESAMPLER_H_
diff --git a/audio/stream.c b/audio/stream.c
new file mode 100644
index 0000000..7bab674
--- /dev/null
+++ b/audio/stream.c
@@ -0,0 +1,47 @@
+#include "stream.h"
+
+int stream_init(struct Stream *stream, AVFormatContext *s, int codec_id, int sample_rate, int format, const AVChannelLayout *layout, AVDictionary **opts)
+{
+ int err;
+ AVStream *av_stream;
+
+ if ((av_stream = avformat_new_stream(s, NULL)) == NULL)
+ return AVERROR(ENOMEM);
+ av_stream->codecpar->codec_type = AVMEDIA_TYPE_AUDIO;
+ av_stream->codecpar->codec_id = codec_id;
+ av_stream->codecpar->sample_rate = sample_rate;
+ av_stream->codecpar->format = format;
+ av_stream->codecpar->ch_layout = *layout;
+
+ if ((err = avformat_init_output(s, opts)) < 0)
+ return err;
+
+ stream->av_stream = av_stream;
+ stream->s = s;
+ return 0;
+}
+
+int stream_send (struct Stream *stream, AVPacket *pkt)
+{
+ int err;
+ int64_t duration;
+
+ // NOTE:
+ // Before encoding;
+ // - frame_out->pts DOES matter
+ //
+ // After encoding;
+ // - pkt->pts is set to something
+ // - pkt->timebase is unset, but would be '1 / output_sample_rate'
+ // - pkt->duration is set to the amount of sample in the packet
+ // - pkt->dts seems to be pkt->pts
+ pkt->stream_index = stream->av_stream->index;
+ //av_packet_rescale_ts(pkt_out, encoder->time_base, stream->time_base);
+ //print_pts(pkt_out, &stream->time_base);
+
+ duration = pkt->duration;
+ if ((err = av_interleaved_write_frame(stream->s, pkt)) < 0)
+ return err;
+
+ return duration;
+}
diff --git a/audio/stream.h b/audio/stream.h
new file mode 100644
index 0000000..94c5162
--- /dev/null
+++ b/audio/stream.h
@@ -0,0 +1,14 @@
+#ifndef AUDIO_STREAM_H
+#define AUDIO_STREAM_H
+
+#include <libavformat/avformat.h>
+
+struct Stream {
+ AVFormatContext *s;
+ AVStream *av_stream;
+};
+
+int stream_init (struct Stream *stream, AVFormatContext *s, int codec_id, int sample_rate, int format, const AVChannelLayout *layout, AVDictionary **opts);
+int stream_send (struct Stream *stream, AVPacket *pkt);
+
+#endif // AUDIO_STREAM_H
diff --git a/common.c b/common.c
new file mode 100644
index 0000000..683428d
--- /dev/null
+++ b/common.c
@@ -0,0 +1,13 @@
+#include <stdlib.h>
+#include "common.h"
+
+void stateinfo_free(struct StateInfo *state_info)
+{
+ int i;
+
+ for (i = 0; i < state_info->song_count; i ++)
+ free(state_info->next_songs[i].file);
+
+ free(state_info->current_song);
+ free(state_info->next_songs);
+}
diff --git a/common.h b/common.h
new file mode 100644
index 0000000..1ad624b
--- /dev/null
+++ b/common.h
@@ -0,0 +1,33 @@
+#ifndef COMMON_H_
+#define COMMON_H_
+
+#include <limits.h>
+#define PLAYLIST_END UINT_MAX
+#define SEGMENT_COUNT_UNLIMITED UINT_MAX
+
+enum State {
+ STATE_PAUSED,
+ STATE_PLAYING,
+ STATE_STANDBY,
+ STATE_TERM
+};
+
+struct Entry {
+ unsigned int id;
+ char *file;
+};
+
+struct StateInfo {
+ enum State state;
+ char *current_song;
+ int song_count;
+ struct Entry *next_songs;
+};
+
+struct StreamerOpt {
+ char *filename;
+};
+
+void stateinfo_free(struct StateInfo *state_info);
+
+#endif // COMMON_H_
diff --git a/cybd.1 b/cybd.1
new file mode 100644
index 0000000..6dffa17
--- /dev/null
+++ b/cybd.1
@@ -0,0 +1,39 @@
+.TH CYBD 1 cybd\-VERSION
+.SH NAME
+cybd \- The cyber radio daemon
+.SH SYNOPSIS
+.B cybd
+.RB [ \-t
+.IR socket ]
+.I command
+.RI [ arguments ...]
+.SH DESCRIPTION
+cybd is a simple HLS muxer daemon designed to be integrated into a bigger HTTP
+radio stack. It's role is to progress through a list of local media file in
+order to generate a continuous HTTP live stream, stored as an .m3u8 playlist
+file and .ts segment files, which can then be statically distributed by a web
+server.
+.P
+By default, the daemon and the clients communicate via a UNIX socket named
+$XDG_RUNTIME_DIR/cybd.sock. This can be changed by setting the
+.B \-t
+flag to the desired socket name.
+.SH COMMANDS
+.TP
+.B daemon
+Start the daemon and begin streaming silent frames. The diffusion will start as soon as an entry is pushed into the daemon's playlist.
+.TP
+.B skip
+Halt the playback of the currently playing media an continue with the next one in queue.
+.TP
+.B pause
+Pause the daemon and starts streaming silent frames.
+.TP
+.B resume
+Resume the daemon after a pause command.
+.SH AUTHORS
+See the LICENSE file for the authors.
+.SH LICENSE
+See the LICENSE file for the terms of redistribution.
+.SH SEE ALSO
+.BR ffmpeg (1),
diff --git a/daemon.c b/daemon.c
new file mode 100644
index 0000000..bcb5049
--- /dev/null
+++ b/daemon.c
@@ -0,0 +1,273 @@
+#define _DEFAULT_SOURCE
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include "common.h"
+#include "daemon.h"
+#include "streamer.h"
+
+int daemon_run = 0;
+
+// TODO: Too pedantic and verbose, replace with read_val, write_val, read_str, write_str
+#define strict_read(fd, ptr, count) (read(fd, ptr, sizeof(*ptr) * count) != sizeof(*ptr) * count)
+#define strict_write(fd, ptr, count) (write(fd, ptr, sizeof(*ptr) * count) != sizeof(*ptr) * count)
+
+enum Opcode {
+ OP_DAEMON,
+ OP_PUSH,
+ OP_POP,
+ OP_TERM,
+ OP_SKIP,
+ OP_LIST,
+ OP_PAUSE,
+ OP_RESUME,
+};
+
+struct Command {
+ enum Opcode opcode;
+ int opindex;
+ int urllen;
+};
+
+#define write_val(fd, ptr) (write(fd, ptr, sizeof(*ptr)))
+#define read_val(fd, ptr) (read(fd, ptr, sizeof(*ptr)))
+static void write_str (int fd, const char *str);
+static char *read_str (int fd);
+
+// ===================================================================
+// Server code
+// ===================================================================
+
+static inline int send_reply (int fd, struct StateInfo *state_info)
+{
+ int i;
+ struct Entry *entry;
+
+ errno = 0;
+ write_val(fd, &state_info->state);
+ write_str(fd, state_info->current_song);
+ write_val(fd, &state_info->song_count);
+ for (i = 0; i < state_info->song_count; i ++) {
+ entry = &state_info->next_songs[i];
+ write_val(fd, &entry->id);
+ write_str(fd, entry->file);
+ }
+
+ return errno ? -1 : 0;
+}
+
+int daemon_start(int sockfd, const struct StreamerOpt *opts)
+{
+ Streamer *streamer;
+ int connfd, new_id;
+ char *url;
+ struct Command msg;
+ struct StateInfo state_info;
+ enum State state;
+
+ if ((streamer = streamer_init(opts)) == NULL) {
+ fprintf(stderr, "Failed to allocate streamer\n");
+ return EXIT_FAILURE;
+ }
+
+ url = NULL;
+ daemon_run = 1;
+ while (daemon_run) {
+ puts("Awaiting connection");
+
+ /* Block and wait for another connection. accept(2) shall returns a
+ * nonnegative file descriptor on sucess, or -1 if an error occured. All
+ * error are considered fatal, except for EINTR, which is seen as an
+ * occasion to re-check whether the daemon should keep running.
+ */
+ if ((connfd = accept(sockfd, NULL, NULL)) < 0) {
+ if (errno == EINTR) continue;
+ perror("Failed to accept connection");
+ break;
+ }
+
+ if (read(connfd, &msg, sizeof(msg)) != sizeof(msg)) {
+ fputs("Unexpected end of package, discarding.\n", stderr);
+ close(connfd);
+ continue;
+ }
+
+ switch (msg.opcode) {
+ case OP_PUSH:
+ if ((url = realloc(url, msg.urllen + 1)) == NULL) {
+ perror("Failed to allocate URL buffer");
+ break;
+ }
+ read(connfd, url, msg.urllen);
+ url[msg.urllen] = '\0';
+
+ new_id = streamer_push(streamer, url, msg.opindex);
+ if (new_id < 0)
+ fprintf(stderr, "Failed to add '%s' to the playlist: %s\n", url, strerror(errno));
+ else
+ printf("Pushed '%s' with id %i\n", url, new_id);
+ break;
+
+ case OP_POP:
+ if (streamer_pop(streamer, msg.opindex))
+ fprintf(stderr, "Failed remove entry #%i\n", msg.opindex);
+ else
+ printf("Succesfullt remove '%i' from the playlist\n", msg.opindex);
+ break;
+
+ case OP_TERM:
+ daemon_run = 0;
+ break;
+ case OP_SKIP:
+ streamer_skip(streamer);
+ break;
+ case OP_PAUSE:
+ streamer_pause(streamer);
+ break;
+ case OP_RESUME:
+ streamer_resume(streamer);
+ break;
+ case OP_LIST:
+ state = streamer_info(streamer, &state_info);
+ if (state == -1) {
+ perror("Failed to reatrieve streamer info");
+ break;
+ }
+
+ if (send_reply(sockfd, &state_info)) {
+ perror("Failed to send back state info");
+ break;
+ }
+ break;
+ default:
+ fprintf(stderr, "Ignoring unknown command code: %i\n", msg.opcode);
+ break;
+ }
+
+ close(connfd);
+ errno = 0;
+ }
+ puts("done\n");
+
+ daemon_run = 0;
+ streamer_free(streamer);
+ return errno ? -1 : -0;
+}
+
+
+// ===================================================================
+// Client code
+// ===================================================================
+
+static inline int send_command(int fd, int code, unsigned int index, int len)
+{
+ struct Command m;
+ m.opcode = code;
+ m.opindex = index;
+ m.urllen = len;
+ return strict_write(fd, &m, 1) ? -1 : 0;
+}
+
+int daemon_push(int fd, const char *track, unsigned int index)
+{
+ int len;
+ char *realtrack;
+
+ if ((realtrack = realpath(track, NULL)) == NULL)
+ return -1;
+
+ len = strlen(realtrack);
+ if (send_command(fd, OP_PUSH, index, len))
+ goto error;
+ if (strict_write(fd, realtrack, len))
+ goto error;
+ free(realtrack);
+ return 0;
+
+error:
+ free(realtrack);
+ return -1;
+}
+
+
+int daemon_pop(int fd, unsigned int index)
+{
+ return send_command(fd, OP_POP, index, 0);
+}
+
+int daemon_skip(int fd)
+{
+ return send_command(fd, OP_SKIP, 0, 0);
+}
+
+int daemon_pause(int fd)
+{
+ return send_command(fd, OP_PAUSE, 0, 0);
+}
+
+int daemon_resume(int fd)
+{
+ return send_command(fd, OP_RESUME, 0, 0);
+}
+
+int daemon_kill(int fd)
+{
+ return send_command(fd, OP_TERM, 0, 0);
+}
+
+int daemon_query(int fd, struct StateInfo *state_info)
+{
+ int i, len;
+ enum State state;
+ struct Entry *entry;
+
+ if (send_command(fd, OP_LIST, 0, 0))
+ return -1;
+
+ errno = 0;
+ read_val(fd, &state_info->state);
+ state_info->current_song = read_str(fd);
+ read_val(fd, &state_info->song_count);
+
+ state_info->next_songs = calloc(state_info->song_count, sizeof(struct Entry));
+ if (state_info->next_songs == NULL) {
+ free(state_info->current_song);
+ return -1;
+ }
+
+ for (i = 0; i < state_info->song_count; i ++) {
+ entry = &state_info->next_songs[i];
+ read_val(fd, &entry->id);
+ entry->file = read_str(fd);
+ }
+
+ if (errno) {
+ stateinfo_free(state_info);
+ return -1;
+ }
+
+ return 0;
+}
+
+static void write_str (int fd, const char *str)
+{
+ size_t len;
+ len = strlen(str) + 1;
+ write(fd, &len, sizeof(size_t));
+ write(fd, str, len);
+}
+
+static char *read_str(int fd)
+{
+ size_t len;
+ char *str;
+
+ read(fd, &len, sizeof(size_t));
+ if ((str = malloc(len)) == NULL)
+ return NULL;
+ read(fd, str, len);
+ return str;
+}
diff --git a/daemon.h b/daemon.h
new file mode 100644
index 0000000..d3ea5d0
--- /dev/null
+++ b/daemon.h
@@ -0,0 +1,29 @@
+#ifndef DAEMON_H_
+#define DAEMON_H_
+
+#include "common.h"
+
+/**
+ * Functions related to client-daemon IPC over a unix-domain socket.
+ */
+
+/* Servers symbols */
+extern int daemon_run;
+
+/**
+ * Allocate a cybd streamer and kickstart the cybd daemon even loop. This
+ * function is blocking and thread-unsafe due to its interaction with the global
+ * daemon_run variable. This function returns 0 on success or -1 on failure.
+ */
+int daemon_start (int fd, const struct StreamerOpt *opts);
+
+/* Client symbols */
+int daemon_push (int fd, const char *track, unsigned int index);
+int daemon_pop (int fd, unsigned int index);
+int daemon_skip (int fd);
+int daemon_pause (int fd);
+int daemon_resume (int fd);
+int daemon_kill (int fd);
+int daemon_query (int fd, struct StateInfo *entries);
+
+#endif // DAEMON_H_
diff --git a/main.c b/main.c
new file mode 100644
index 0000000..7899b7b
--- /dev/null
+++ b/main.c
@@ -0,0 +1,353 @@
+#include <errno.h>
+#include <getopt.h>
+#include <signal.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <time.h>
+#include <unistd.h>
+#include "common.h"
+#include "daemon.h"
+
+#define SOCKET_NAME "/cybd.socket"
+
+#define SUN_LEN(ptr) (offsetof (struct sockaddr_un, sun_path) + strlen ((ptr)->sun_path))
+
+static int fd;
+static char *socketname;
+
+static int main_daemon (int fd, int argc, char **argv);
+static int main_push (int fd, int argc, char **argv);
+static int main_pop (int fd, int argc, char **argv);
+static int main_skip (int fd, int argc, char **argv);
+static int main_pause (int fd, int argc, char **argv);
+static int main_resume (int fd, int argc, char **argv);
+static int main_query (int fd, int argc, char **argv);
+static int main_kill (int fd, int argc, char **argv);
+
+static int get_socket_fd (const char *socketname, int do_bind);
+static char *get_default_socketname (void);
+
+
+int main(int argc, char **argv)
+{
+ int opt, err, is_daemon;
+ char *command;
+
+ socketname = NULL;
+ while ((opt = getopt(argc, argv, "s:")) != -1) switch (opt) {
+ case 's':
+ socketname = strdup(optarg);
+ break;
+ default:
+ goto usage;
+ }
+
+ if (socketname == NULL)
+ socketname = get_default_socketname();
+
+ if ((command = argv[optind ++]) == NULL) {
+ fprintf(stderr, "%s: missing command\n", argv[0]);
+ free(socketname);
+ goto usage;
+ }
+
+ is_daemon = strcmp(command, "daemon") == 0;
+ if (is_daemon) unlink(socketname);
+ if ((fd = get_socket_fd(socketname, is_daemon)) == -1) {
+ fprintf(stderr, "%s: Failed to bind socket at '%s': %s\n", argv[0], socketname, strerror(errno));
+ free(socketname);
+ return EXIT_FAILURE;
+ }
+
+ if (is_daemon)
+ err = main_daemon (fd, argc, argv);
+ else {
+ free(socketname);
+ if (strcmp(command, "push" ) == 0) err = main_push (fd, argc, argv);
+ else if (strcmp(command, "pop" ) == 0) err = main_pop (fd, argc, argv);
+ else if (strcmp(command, "skip" ) == 0) err = main_skip (fd, argc, argv);
+ else if (strcmp(command, "pause" ) == 0) err = main_pause (fd, argc, argv);
+ else if (strcmp(command, "resume") == 0) err = main_resume (fd, argc, argv);
+ else if (strcmp(command, "kill" ) == 0) err = main_kill (fd, argc, argv);
+ else if (strcmp(command, "query" ) == 0) err = main_query (fd, argc, argv);
+ else {
+ fprintf(stderr, "%s: %s: Unknown command\n", argv[0], command);
+ close(fd);
+ goto usage;
+ }
+ }
+
+ close(fd);
+ return err;
+
+usage:
+ fprintf(stderr, "Usage: %s [-s socket] <daemon|push|pop|term|list|skip> [args...]\n", argv[0]);
+ return EXIT_FAILURE;
+}
+
+char *get_default_socketname(void)
+{
+ /* Suggest a default socket name for cybd. This function returns either one
+ * of the two following suggestions;
+ *
+ * 1. If the XDG_RUNTIME_DIR environment is set, we assume the user adhere
+ * to the XDG directory specification and behave accordingly.
+ * 2. If not, we use the FHS-specified '/run' directory.
+ *
+ * In either case, the socket names is set to ends with SOCKET_NAME and a
+ * dynamically allocated string is returned. This function may also returns
+ * NULL if cybd has run out of memory, in which case errno will be set to
+ * ENOMEM.
+ */
+ char *name, *xdg;
+
+ if ((xdg = getenv("XDG_RUNTIME_DIR"))) {
+ name = malloc(strlen(xdg) + sizeof(SOCKET_NAME));
+ if (name == NULL) return NULL;
+
+ sprintf(name, "%s" SOCKET_NAME, xdg);
+ return name;
+ }
+
+ return strdup("/run" SOCKET_NAME);
+}
+
+int get_socket_fd (const char *socketname, int as_daemon)
+{
+ int fd;
+ struct sockaddr_un addr;
+
+ // TODO: Re-check the documentation apropos socket name length
+ if (strlen(socketname) > sizeof(addr.sun_path) - 1) {
+ fputs("Failed to create socket: filename too long\n", stderr);
+ return -1;
+ }
+
+ if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
+ perror("Failed to create socket");
+ return -1;
+ }
+
+ addr.sun_family = AF_UNIX;
+ strcpy(addr.sun_path, socketname);
+
+ if ((as_daemon ? bind : connect)(fd, (struct sockaddr *)&addr, SUN_LEN(&addr))) {
+ close(fd);
+ fprintf(stderr, "Failed to open socket at '%s': %s\n", socketname, strerror(errno));
+ return -1;
+ }
+
+ if (as_daemon)
+ listen(fd, SOMAXCONN);
+
+ return fd;
+}
+
+static int main_push(int fd, int argc, char **argv)
+{
+ int read;
+ unsigned int id;
+ char *file;
+
+ if (optind == argc) {
+ fprintf(stderr, "%s: Missing input argument\n", argv[0]);
+ goto usage;
+ }
+ file = argv[optind ++];
+
+ if (argv[optind] != NULL) {
+ sscanf(argv[optind], "%u%n", &id, &read);
+ if (read != strlen(argv[optind])) {
+ fprintf(stderr, "%s: bad index argument: '%u'\n", argv[0], id);
+ goto usage;
+ }
+ optind ++;
+ }
+ else id = PLAYLIST_END;
+
+ if (argv[optind] != NULL) {
+ fprintf(stderr, "%s: Too many arguments\n", argv[0]);
+ goto usage;
+ }
+
+ if (daemon_push(fd, file, id)) {
+ perror("Failed to push entry");
+ return EXIT_FAILURE;
+ }
+
+ return EXIT_SUCCESS;
+
+usage:
+ fprintf(stderr, "Usage: %s [-s socket] push <file> [index]\n", argv[0]);
+ return EXIT_FAILURE;
+}
+
+static int main_pop(int fd, int argc, char **argv)
+{
+ int read;
+ unsigned int id;
+
+ if (argv[optind] != NULL) {
+ sscanf(argv[optind], "%u%n", &id, &read);
+ if (read != strlen(argv[optind])) {
+ fprintf(stderr, "%s: bad index argument: '%u'\n", argv[0], id);
+ goto usage;
+ }
+ optind ++;
+ }
+ else id = PLAYLIST_END;
+
+ if (argv[optind] != NULL) {
+ fprintf(stderr, "%s: Too many arguments\n", argv[0]);
+ goto usage;
+ }
+
+ if (daemon_pop(fd, id)) {
+ perror("Failed to pop entry");
+ return EXIT_FAILURE;
+ }
+
+ return EXIT_SUCCESS;
+
+usage:
+ fprintf(stderr, "Usage: %s [-s socket] pop <id>\n", argv[0]);
+ return EXIT_FAILURE;
+}
+
+static int main_skip(int fd, int argc, char **argv)
+{
+ if (argv[optind] != NULL) {
+ fprintf(stderr, "%s: Too many arguments\n", argv[0]);
+ fprintf(stderr, "Usage: %s [-s socket] skip\n", argv[0]);
+ return EXIT_FAILURE;
+ }
+ if (daemon_skip(fd)) {
+ perror("Failed to send skip request");
+ return EXIT_FAILURE;
+ }
+ return EXIT_SUCCESS;
+}
+
+static int main_pause(int fd, int argc, char **argv)
+{
+ if (argv[optind] != NULL) {
+ fprintf(stderr, "%s: Too many arguments\n", argv[0]);
+ fprintf(stderr, "Usage: %s [-s socket] pause\n", argv[0]);
+ return EXIT_FAILURE;
+ }
+ if (daemon_pause(fd)) {
+ perror("Failed to send pause request");
+ return EXIT_FAILURE;
+ }
+ return EXIT_SUCCESS;
+}
+
+static int main_resume(int fd, int argc, char **argv)
+{
+ if (argv[optind] != NULL) {
+ fprintf(stderr, "%s: Too many arguments\n", argv[0]);
+ fprintf(stderr, "Usage: %s [-s socket] resume\n", argv[0]);
+ return EXIT_FAILURE;
+ }
+ if (daemon_resume(fd)) {
+ perror("Failed to send resume request");
+ return EXIT_FAILURE;
+ }
+ return EXIT_SUCCESS;
+}
+
+static int main_kill(int fd, int argc, char **argv)
+{
+ if (argv[optind] != NULL) {
+ fprintf(stderr, "%s: Too many arguments\n", argv[0]);
+ fprintf(stderr, "Usage: %s [-s socket] kill\n", argv[0]);
+ return EXIT_FAILURE;
+ }
+ if (daemon_kill(fd)) {
+ perror("Failed to send kill request");
+ return EXIT_FAILURE;
+ }
+ return EXIT_SUCCESS;
+}
+
+static int main_query (int fd, int argc, char **argv)
+{
+ int i;
+ struct Entry *entry;
+ struct StateInfo state_info;
+
+ if (optind != argc) {
+ fprintf(stderr, "%s: Too many arguments\n", argv[0]);
+ fprintf(stderr, "Usage: %s [-s socket] query\n", argv[0]);
+ return -1;
+ }
+
+ if (daemon_query(fd, &state_info)) {
+ perror("Failed to query daemon");
+ return -1;
+ }
+
+ switch (state_info.state) {
+ case STATE_PAUSED:
+ printf("%s (paused)\n", state_info.current_song);
+ break;
+ case STATE_PLAYING:
+ printf("%s (playing)\n", state_info.current_song);
+ break;
+ case STATE_STANDBY:
+ puts("standby");
+ break;
+ }
+
+ for (i = 0; i < state_info.song_count; i ++) {
+ entry = &state_info.next_songs[i];
+ printf("%u\t%s", entry->id, entry->file);
+ free(entry->file);
+ }
+
+ stateinfo_free(&state_info);
+ return 0;
+}
+
+static void close_socket(int sig)
+{
+ puts("Signal received, terminating daemon");
+ shutdown(fd, SHUT_RDWR);
+}
+
+static int main_daemon (int fd, int argc, char **argv)
+{
+ time_t t;
+ int err;
+ struct tm *tm_info;
+ struct StreamerOpt opts;
+ struct sigaction sigact;
+
+ sigact.sa_handler = close_socket;
+ sigemptyset(&sigact.sa_mask);
+ sigact.sa_flags = 0;
+ sigaction(SIGTERM, &sigact, NULL);
+ sigaction(SIGINT, &sigact, NULL);
+
+ // TODO: Read from args
+ opts.filename = "./test/stream.m3u8";
+
+
+ time(&t);
+ tm_info = localtime(&t);
+
+ char buffer[50];
+ strftime(buffer, 50, "Daemon started at %Y-%m-%d %H:%M:%S", tm_info);
+ puts(buffer);
+
+ err = daemon_start(fd, &opts);
+ unlink(socketname);
+ free(socketname);
+
+ puts("Goodbye.");
+ return err ? EXIT_FAILURE : EXIT_SUCCESS;
+}
diff --git a/playlist.c b/playlist.c
new file mode 100644
index 0000000..5d7a697
--- /dev/null
+++ b/playlist.c
@@ -0,0 +1,269 @@
+#include <asm-generic/errno-base.h>
+#include <errno.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include "playlist.h"
+
+#define PLAYLIST_INIT_SIZE 2 // TODO: Bump to 32
+
+struct Playlist {
+ struct Entry *entries;
+ int front, rear, next_id, len, size;
+ pthread_mutex_t mutex;
+ pthread_cond_t ready;
+};
+
+static int playlist_grow (Playlist *pl);
+static int playlist_is_full (Playlist *pl);
+static int playlist_is_empty (Playlist *pl);
+static int playlist_get_index (Playlist *pl, unsigned int id);
+static void playlist_unsafe_dequeue (Playlist *pl, struct Entry *entry);
+
+Playlist *playlist_init(void)
+{
+ Playlist *pl;
+
+ if ((pl = malloc(sizeof(*pl))) == NULL)
+ return NULL;
+
+ pl->entries = calloc(PLAYLIST_INIT_SIZE, sizeof(*pl->entries));
+ if (pl->entries == NULL) {
+ free(pl);
+ return NULL;
+ }
+
+ pl->next_id = 0;
+ pl->front = 0;
+ pl->rear = 0;
+ pl->len = 0;
+ pl->size = PLAYLIST_INIT_SIZE;
+ pl->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;
+ pl->ready = (pthread_cond_t) PTHREAD_COND_INITIALIZER;
+ return pl;
+}
+
+void playlist_free (Playlist *pl)
+{
+ int i;
+
+ if (pl == NULL)
+ return;
+
+ for (i = 0; i < pl->size; i ++)
+ free(pl->entries[0].file);
+
+ free(pl->entries);
+ free(pl);
+}
+
+int playlist_enqueue (Playlist *pl, const char *entry)
+{
+ int id;
+ char *copy;
+
+ if ((copy = strdup(entry)) == NULL)
+ return -1;
+
+ pthread_mutex_lock(&pl->mutex);
+ if (playlist_is_full(pl) && playlist_grow(pl)) {
+ pthread_mutex_unlock(&pl->mutex);
+ free(copy);
+ return -1;
+ }
+
+ pl->entries[pl->rear].file = copy;
+ id = pl->entries[pl->rear].id = pl->next_id ++;
+ pl->rear = (pl->rear + 1) % pl->size;
+ pl->len ++;
+
+ pthread_cond_signal(&pl->ready);
+ pthread_mutex_unlock(&pl->mutex);
+ return id;
+}
+
+int playlist_dequeue (Playlist *pl, struct Entry *entry)
+{
+ /* playlist_dequeue take hold of the playlist's mutex, relinquishing it
+ * either after a song has been dequeued, or, in the scenario where the
+ * playlist is currently empty, atomically while waiting for the pl->ready
+ * condition to signal true, indicating that a song has been queued.
+ *
+ * However, it is documented that POSIX condition waiters may be subjects to
+ * "spurious wakeup", spontaneously unlocking themself for no appearent
+ * reasons.
+ *
+ * To prevent undefined behaviours, the lock condition checker and the
+ * waiter are placed in a loop to ensure that any return-to-context occured
+ * legitimately, and to keep waiting othwerwise.
+ */
+
+ pthread_mutex_lock(&pl->mutex);
+ errno = 0;
+ while (playlist_is_empty(pl)) {
+ pthread_cond_wait(&pl->ready, &pl->mutex);
+ if (errno) {
+ pthread_mutex_unlock(&pl->mutex);
+ return -1;
+ }
+ }
+
+ playlist_unsafe_dequeue(pl, entry);
+ pthread_mutex_unlock(&pl->mutex);
+ return 0;
+}
+
+int playlist_try_dequeue(Playlist *pl, struct Entry *entry)
+{
+ pthread_mutex_lock(&pl->mutex);
+ if (playlist_is_empty(pl)) {
+ pthread_mutex_unlock(&pl->mutex);
+ return -1;
+ }
+
+ playlist_unsafe_dequeue(pl, entry);
+ pthread_mutex_unlock(&pl->mutex);
+ return 0;
+}
+
+int playlist_remove(Playlist *pl, unsigned int id, struct Entry *entry)
+{
+ int index;
+
+ pthread_mutex_lock(&pl->mutex);
+ if ((index = playlist_get_index(pl, id)) == -1) {
+ pthread_mutex_unlock(&pl->mutex);
+ return -1;
+ }
+
+ *entry = pl->entries[index];
+ while (index != pl->rear) {
+ pl->entries[index] = pl->entries[index + 1];
+ index = (index + 1) % pl->len;
+ }
+
+ if (pl->rear -- < 0)
+ pl->rear = pl->len - 1;
+ pl->len --;
+
+ pthread_mutex_unlock(&pl->mutex);
+ return 0;
+}
+
+int playlist_insert(Playlist *pl, unsigned int id, const char *value)
+{
+ char *copy;
+ int index, new_id;
+ struct Entry a, b;
+
+ pthread_mutex_lock(&pl->mutex);
+ if ((index = playlist_get_index(pl, id)) == -1) {
+ pthread_mutex_unlock(&pl->mutex);
+ return -1;
+ }
+
+ if ((copy = strdup(value)) == NULL)
+ goto enomem;
+
+ if (playlist_is_full(pl) && playlist_grow(pl))
+ goto enomem;
+
+ pl->rear = (pl->rear + 1) % pl->len;
+
+ a = (struct Entry){ new_id = pl->next_id ++, copy };
+ while (index != pl->rear) {
+ b = pl->entries[index];
+ pl->entries[index] = a;
+ a = b;
+ index = (index + 1) % pl->len;
+ }
+
+ pl->len ++;
+
+ pthread_mutex_unlock(&pl->mutex);
+ return new_id;
+
+enomem:
+ pthread_mutex_unlock(&pl->mutex);
+ free(copy);
+ return -2;
+}
+
+int playlist_list(Playlist *pl, struct StateInfo *list)
+{
+ int i, j;
+
+ pthread_mutex_lock(&pl->mutex);
+ list->song_count = pl->len;
+ if ((list->next_songs = malloc(sizeof(list[0]) * pl->len)) == NULL)
+ return -1;
+
+ for (i = 0, j = pl->front; i < pl->len; i ++, j ++) {
+ j %= pl->len;
+ list->next_songs[i].id = pl->entries[j].id;
+ list->next_songs[i].file = strdup(pl->entries[j].file);
+ }
+ pthread_mutex_unlock(&pl->mutex);
+
+ return 0;
+}
+
+int playlist_get_index(Playlist *pl, unsigned int id)
+{
+ int index;
+
+ index = pl->front;
+
+ // Break on a null entry
+ while (pl->entries[index].file) {
+ // Bingo
+ if (pl->entries[index].id == id)
+ return index;
+
+ // Advance the index, and break if we're back at the front
+ index = (index + 1) % pl->len;
+ if (index == pl->front)
+ break;
+ }
+
+ return -1;
+}
+
+static int playlist_is_full (Playlist *pl)
+{
+ return pl->len == pl->size;
+}
+
+static int playlist_is_empty (Playlist *pl)
+{
+ return pl->entries[pl->front].file == NULL;
+}
+
+static int playlist_grow(Playlist *pl)
+{
+ int split;
+ struct Entry *new_buff;
+
+ new_buff = calloc(pl->size * 2, sizeof(*new_buff));
+ if (new_buff == NULL) return -1;
+
+ split = pl->len - pl->front;
+ memcpy(&new_buff[0], &pl->entries[pl->front], split);
+ memcpy(&new_buff[split], &pl->entries[0], pl->len - split);
+
+ free(pl->entries);
+ pl->front = 0;
+ pl->rear = pl->len;
+ pl->entries = new_buff;
+ pl->size *= 2;
+ return 0;
+}
+
+static void playlist_unsafe_dequeue(Playlist *pl, struct Entry *entry)
+{
+ *entry = pl->entries[pl->front];
+ pl->entries[pl->front].file = NULL;
+ pl->front = (pl->front + 1) % pl->size;
+ pl->len --;
+}
diff --git a/playlist.h b/playlist.h
new file mode 100644
index 0000000..ed4b19d
--- /dev/null
+++ b/playlist.h
@@ -0,0 +1,33 @@
+#ifndef PLAYLIST_H_
+#define PLAYLIST_H_
+
+#include "common.h"
+
+/**
+ * Playlist - A thread-safe string queue with support for indexed insert/remove.
+ */
+typedef struct Playlist Playlist;
+
+Playlist *playlist_init (void);
+void playlist_free (Playlist *pl);
+int playlist_enqueue (Playlist *pl, const char *entry);
+
+/**
+ * Fetch the next entry in the playlist, blocking the current thread and waiting
+ * for an entry if needed. The function can be interrupted by sending a signal
+ * to the waiting thread, in which case *entry remains untouched, -1 is
+ * returned, and errno is set to EINTR. Returns 0 on success.
+ */
+int playlist_dequeue(Playlist *pl, struct Entry *entry);
+
+/**
+ * Like playlist_dequeue, but returns -1 when the playlist is empty instead of
+ * locking the thread. Returns 0 on success.
+ */
+int playlist_try_dequeue(Playlist *pl, struct Entry *entry);
+
+int playlist_insert (Playlist *pl, unsigned int id, const char *entry);
+int playlist_remove (Playlist *pl, unsigned int id, struct Entry *entry);
+int playlist_list (Playlist *pl, struct StateInfo *list);
+
+#endif // PLAYLIST_H_
diff --git a/streamer.c b/streamer.c
new file mode 100644
index 0000000..f02efed
--- /dev/null
+++ b/streamer.c
@@ -0,0 +1,235 @@
+#include <bits/pthreadtypes.h>
+#include <bits/types/sigset_t.h>
+#include <errno.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/resource.h>
+#include <unistd.h>
+#include "common.h"
+#include "playlist.h"
+#include "streamer.h"
+#include "transmuxer.h"
+
+/* ===== Definitions ===== */
+#define PRIORITY -10
+struct Streamer {
+ pthread_t worker_thread;
+ pthread_mutex_t worker_ready_mutex;
+ pthread_cond_t worker_ready_cond;
+ int worker_ready_flag;
+
+ enum State state;
+ pthread_mutex_t state_mutex;
+
+ HLSRemuxer *remuxer;
+ Playlist *playlist;
+ char *current_song;
+};
+
+/* ===== Thread-local storage ===== */
+static pthread_key_t streamer_worker_key;
+static pthread_once_t streamer_worker_once = PTHREAD_ONCE_INIT;
+
+static void streamer_worker_key_create(void)
+{
+ pthread_key_create(&streamer_worker_key, NULL);
+}
+
+
+/* ====== Streamer initialisation ===== */
+static void *streamer_worker (void *t);
+
+Streamer *streamer_init (const struct StreamerOpt *opts)
+{
+ Streamer *s = NULL;
+ Playlist *p = NULL;
+ HLSRemuxer *r = NULL;
+
+ pthread_once(&streamer_worker_once, streamer_worker_key_create);
+
+ if ((s = malloc(sizeof(*s))) == NULL)
+ goto error;
+ if ((p = playlist_init()) == NULL)
+ goto error;
+ if ((r = hls_remuxer_init(opts)) == NULL)
+ goto error;
+
+ if (pthread_create(&s->worker_thread, NULL, streamer_worker, s))
+ goto error;
+
+ s->playlist = p;
+ s->remuxer = r;
+ s->state = STATE_STANDBY;
+ s->state_mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;
+ return s;
+
+error:
+ hls_remuxer_free(r);
+ playlist_free(p);
+ free(s);
+ return NULL;
+}
+
+int streamer_push(Streamer *s, const char *url, unsigned int index)
+{
+ return (index == PLAYLIST_END)
+ ? playlist_enqueue(s->playlist, url)
+ : playlist_insert(s->playlist, index, url);
+}
+
+int streamer_pop(Streamer *s, unsigned int id)
+{
+ int err;
+ struct Entry entry;
+
+ if ((err = playlist_remove(s->playlist, id, &entry)))
+ return err;
+ free(entry.file);
+ return 0;
+}
+
+void streamer_skip(Streamer *s)
+{
+ /* 'skip' only halt the current song, which is then replaced by the streamer
+ * worker, either immediately, or as soon as a song gets pushed.
+ */
+ hls_remuxer_halt(s->remuxer);
+}
+
+void streamer_pause(Streamer *s)
+{
+ pthread_mutex_lock(&s->state_mutex);
+ if (s->state == STATE_PLAYING) {
+ hls_remuxer_pause(s->remuxer);
+ s->state = STATE_PAUSED;
+ }
+ pthread_mutex_unlock(&s->state_mutex);
+}
+
+void streamer_resume(Streamer *s)
+{
+ pthread_mutex_lock(&s->state_mutex);
+ if (s->state == STATE_PAUSED) {
+ hls_remuxer_resume(s->remuxer);
+ s->state = STATE_PLAYING;
+ }
+ pthread_mutex_unlock(&s->state_mutex);
+}
+
+int streamer_info(Streamer *streamer, struct StateInfo *info)
+{
+ pthread_mutex_lock(&streamer->state_mutex);
+ info->state = streamer->state;
+
+ if ((info->current_song = strdup(streamer->current_song)) == NULL) {
+ pthread_mutex_unlock(&streamer->state_mutex);
+ return -1;
+ }
+
+ if (playlist_list(streamer->playlist, info)) {
+ pthread_mutex_unlock(&streamer->state_mutex);
+ free(info->current_song);
+ return -1;
+ }
+
+ pthread_mutex_unlock(&streamer->state_mutex);
+ return 0;
+}
+
+void streamer_free (Streamer *s)
+{
+ pthread_kill(s->worker_thread, SIGTERM);
+ hls_remuxer_halt(s->remuxer);
+ pthread_join(s->worker_thread, NULL);
+ playlist_free(s->playlist);
+ free(s);
+}
+
+
+static void streamer_worker_sighandler(int sig)
+{
+ Streamer *streamer;
+ streamer = pthread_getspecific(streamer_worker_key);
+
+ pthread_mutex_lock(&streamer->state_mutex);
+ streamer->state = STATE_TERM;
+ pthread_mutex_unlock(&streamer->state_mutex);
+}
+
+
+static inline int streamer_worker_conf_priority(void)
+{
+ errno = 0;
+ setpriority(PRIO_PROCESS, 0, PRIORITY);
+ return errno ? -1 : 0;
+}
+
+static inline int streamer_worker_conf_sighandling(void)
+{
+ sigset_t set;
+ struct sigaction sigact;
+
+ /* Configure SIGTERM interuptor */
+ sigact.sa_handler = streamer_worker_sighandler;
+ sigact.sa_flags = 0;
+ sigemptyset(&sigact.sa_mask);
+ sigaction(SIGTERM, &sigact, NULL); // term
+ sigaction(SIGUSR1, &sigact, NULL); // play/pause
+ sigaction(SIGUSR2, &sigact, NULL); // skip
+
+ /* Unblock SIGTERM */
+ sigemptyset(&set);
+ sigaddset(&set, SIGTERM);
+ sigaddset(&set, SIGUSR1);
+ sigaddset(&set, SIGUSR2);
+ pthread_sigmask(SIG_UNBLOCK, &set, NULL);
+}
+
+static void *streamer_worker(void *arg)
+{
+ Streamer *s;
+ struct Entry next;
+
+ pthread_setspecific(streamer_worker_key, (s = arg));
+
+ /* Configure worker */
+ pthread_mutex_lock(&s->worker_ready_mutex);
+ streamer_worker_conf_priority();
+ streamer_worker_conf_sighandling();
+ s->worker_ready_flag = 1;
+ pthread_cond_broadcast(&s->worker_ready_cond);
+ pthread_mutex_unlock(&s->worker_ready_mutex);
+
+ /* Main loop */
+ while (s->state != ) {
+ /* Attempt to fetch the next song, or go into standby mode */
+ if (playlist_try_dequeue(s->playlist, &next)) {
+ puts("The playlist is empty, going into standby mode.");
+ pthread_mutex_lock(&s->state_mutex);
+ s->state = STATE_STANDBY;
+ pthread_mutex_unlock(&s->state_mutex);
+ playlist_dequeue(s->playlist, &next);
+ }
+
+ /* Update the current state */
+ pthread_mutex_lock(&s->state_mutex);
+ printf("Next song: [%i] %s\n", next.id, next.file);
+ s->current_song = next.file;
+ s->state = STATE_PLAYING;
+ pthread_mutex_unlock(&s->state_mutex);
+
+ /* Play the song entirely */
+ hls_remuxer_play(s->remuxer, next.file);
+
+ /* Discard current_song, but keep the current 'playing' state. */
+ pthread_mutex_lock(&s->state_mutex);
+ free(s->current_song);
+ s->current_song = NULL;
+ pthread_mutex_unlock(&s->state_mutex);
+ }
+
+ return NULL;
+}
diff --git a/streamer.h b/streamer.h
new file mode 100644
index 0000000..b10a2af
--- /dev/null
+++ b/streamer.h
@@ -0,0 +1,22 @@
+#ifndef STREAMING_H_
+#define STREAMING_H_
+
+#include "common.h"
+
+/**
+ * An opaque data structure used to initialize and control a threaded HLS
+ * streamer. Please review 'struct StreamerOpt' for runtime initialisation
+ * parameters.
+ */
+typedef struct Streamer Streamer;
+
+Streamer *streamer_init (const struct StreamerOpt *opt);
+int streamer_push (Streamer *streamer, const char *url, unsigned int id);
+int streamer_pop (Streamer *streamer, unsigned int id);
+void streamer_skip (Streamer *streamer);
+void streamer_pause (Streamer *streamer);
+void streamer_resume (Streamer *streamer);
+void streamer_free (Streamer *streamer);
+int streamer_info (Streamer *streamer, struct StateInfo *info);
+
+#endif // STREAMING_H_
diff --git a/testmuxer.c b/testmuxer.c
new file mode 100644
index 0000000..69b693c
--- /dev/null
+++ b/testmuxer.c
@@ -0,0 +1,21 @@
+#include "common.h"
+#include "transmuxer.h"
+#include <stdio.h>
+#include <stdlib.h>
+
+int main(int argc, char **argv)
+{
+ struct StreamerOpt opts = {
+ .filename = "test/stream.m3u8"
+ };
+
+ if (argc != 2) {
+ fprintf(stderr, "usage: %s FILE\n", argv[0]);
+ return EXIT_FAILURE;
+ }
+
+ HLSRemuxer *remuxer = hls_remuxer_init(&opts);
+ hls_remuxer_play(remuxer, argv[1]);
+
+ return EXIT_SUCCESS;
+}
diff --git a/transmuxer.c b/transmuxer.c
new file mode 100644
index 0000000..9dcf005
--- /dev/null
+++ b/transmuxer.c
@@ -0,0 +1,283 @@
+#include <asm-generic/errno-base.h>
+#include <assert.h>
+#include <libavcodec/avcodec.h>
+#include <libavcodec/codec.h>
+#include <libavcodec/codec_id.h>
+#include <libavcodec/codec_par.h>
+#include <libavcodec/packet.h>
+#include <libavformat/avformat.h>
+#include <libavutil/avutil.h>
+#include <libavutil/channel_layout.h>
+#include <libavutil/dict.h>
+#include <libavutil/error.h>
+#include <libavutil/frame.h>
+#include <libavutil/log.h>
+#include <libavutil/mathematics.h>
+#include <libavutil/opt.h>
+#include <libavutil/rational.h>
+#include <libavutil/samplefmt.h>
+#include <libswresample/swresample.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include "audio/muxer.h"
+#include "common.h"
+#include "transmuxer.h"
+
+#define NANOSEC (uint64_t)1000000000
+#define MIN(a, b) ((a < b) ? a : b)
+
+enum PacketType {
+ PKT_AUDIO,
+ PKT_VIDEO,
+};
+
+struct HLSRemuxer {
+ enum State state;
+ pthread_cond_t resume;
+
+ /* Input objects */
+ AVFormatContext *demuxer;
+ const AVStream *audio_stream;
+ const AVStream *video_stream;
+ int audio_stream_index;
+ int video_stream_index;
+
+ /* Output object pointers */
+ long long next_sample;
+ AVFormatContext *hls_writer;
+ AudioMuxer *audio_muxer;
+
+ struct timespec start_time;
+
+ /* overhead: The target lenght in nanosecond of the output's stream
+ * buffer.
+ */
+ uint64_t overhead;
+};
+
+static void hls_nanosleep (uint64_t t);
+static uint64_t hls_nanotime (void);
+static int hls_remuxer_init_input (struct HLSRemuxer *remuxer, const char *file);
+
+static void timespec_add_nano (struct timespec *ts, uint64_t nanoseconds);
+static int timespec_is_ahead (const struct timespec *a, const struct timespec *b);
+static int remuxer_send_audio (HLSRemuxer *remuxer, AVPacket *audio_packet);
+static int remuxer_send_video (HLSRemuxer *remuxer, AVPacket *audio_packet);
+static int remuxer_wait_target_ts (HLSRemuxer *remuxer);
+static int remuxer_get_overrun (HLSRemuxer *remuxer, struct timespec *until);
+
+HLSRemuxer *hls_remuxer_init (const struct StreamerOpt *opts)
+{
+ AVStream *stream;
+ HLSRemuxer *remuxer;
+ AVFormatContext *hls_muxer;
+ AudioMuxer *audio_muxer;
+ AVDictionary *m;
+
+ if ((remuxer = malloc(sizeof(HLSRemuxer))) == NULL)
+ return NULL;
+
+ av_log_set_level(AV_LOG_VERBOSE);
+
+ int err;
+
+ if ((err = avformat_alloc_output_context2(&hls_muxer, NULL, "hls", opts->filename)) < 0) {
+ fprintf(stderr, "Failed to allocate output context: %s\n", av_err2str(err));
+ return NULL;
+ }
+
+ struct AudioMuxerOpt audio_opts = {
+ .codec_id = AV_CODEC_ID_MP3,
+ .sample_rate = 44100,
+ .sample_format = AV_SAMPLE_FMT_S16P,
+ .opts_encoder = NULL,
+ .opts_muxer = NULL,
+ .ch_layout = AV_CHANNEL_LAYOUT_STEREO
+ };
+
+ if (audiomuxer_init(&audio_muxer, hls_muxer, &audio_opts))
+ goto error;
+
+ // TODO: Pass muxer options
+ if ((err = avformat_write_header(hls_muxer, NULL)) < 0) {
+ fprintf(stderr, "Failed to write output file: %s\n", av_err2str(err));
+ goto error;
+ }
+
+ clock_gettime(CLOCK_MONOTONIC, &remuxer->start_time);
+ remuxer->hls_writer = hls_muxer;
+ remuxer->audio_muxer = audio_muxer;
+ remuxer->overhead = 10 * NANOSEC; // NOTE: hls_time * hls_list_size * NANOSEC
+ remuxer->resume = (pthread_cond_t) PTHREAD_COND_INITIALIZER;
+ return remuxer;
+
+error:
+ audiomuxer_free(audio_muxer);
+ avformat_free_context(hls_muxer);
+ free(remuxer);
+ return NULL;
+}
+
+void hls_remuxer_play(HLSRemuxer *remuxer, const char *file)
+{
+ int err;
+ AVPacket *pkt;
+ uint64_t next_second, next_nanosec;
+
+ pkt = av_packet_alloc();
+
+ if (hls_remuxer_init_input(remuxer, file))
+ return;
+
+ int64_t pts;
+ while (1) {
+ /* Demux packet */
+ switch (err = av_read_frame(remuxer->demuxer, pkt)) {
+ case 0:
+ break;
+ default:
+ fprintf(stderr, "Error: %s\n", av_err2str(err));
+ case AVERROR_EOF:
+ fprintf(stderr, "Last packet ts: %f\n", (double)(pts *
+ remuxer->audio_stream->time_base.num) /
+ (remuxer->audio_stream->time_base.den * 60));
+ goto exit;
+ }
+ pts = pkt->pts;
+
+ /* Dispatch packet muxer */
+ if (pkt->stream_index == remuxer->audio_stream_index)
+ err = audiomuxer_send_packet(remuxer->audio_muxer, pkt);
+ else
+ continue;
+
+ if (err) {
+ fprintf(stderr, "Failed to send packet: %s\n", av_err2str(err));
+ break;
+ }
+
+ /* Prevent overrun */
+ remuxer_wait_target_ts(remuxer);
+ }
+
+exit:
+ av_packet_free(&pkt);
+ return;
+}
+
+void hls_remuxer_pause(HLSRemuxer *remuxer)
+{
+ // TODO: mutex?
+ remuxer->state = STATE_PAUSED;
+}
+
+void hls_remuxer_resume(HLSRemuxer *remuxer)
+{
+ pthread_cond_broadcast(&remuxer->resume);
+}
+
+void hls_remuxer_halt(HLSRemuxer *remuxer)
+{
+ // TODO: mutex?
+ remuxer->state = STATE_STANDBY;
+}
+
+void hls_remuxer_free(HLSRemuxer *remuxer)
+{
+ if (remuxer == NULL) return;
+ av_write_trailer(remuxer->hls_writer);
+ audiomuxer_free(remuxer->audio_muxer);
+ free(remuxer);
+}
+
+static int hls_remuxer_init_input(struct HLSRemuxer *remuxer, const char *file)
+{
+ int err;
+ int audio_stream_index;
+ int video_stream_index;
+ const AVStream *audio_stream;
+ const AVStream *video_stream;
+ AVFormatContext *demuxer;
+
+ demuxer = NULL;
+
+ if ((err = avformat_open_input(&demuxer, file, NULL, NULL))) { // TODO: Options
+ fprintf(stderr, "Failed to open '%s': %s\n", file, av_err2str(err));
+ goto error;
+ }
+
+ if ((err = avformat_find_stream_info(demuxer, NULL)) < 0) { // TODO: Options
+ fprintf(stderr, "Failed to find stream info of '%s': %s\n", file, av_err2str(err));
+ goto error;
+ }
+
+ audio_stream_index = av_find_best_stream(demuxer, AVMEDIA_TYPE_AUDIO, -1, -1, NULL, 0);
+ video_stream_index = av_find_best_stream(demuxer, AVMEDIA_TYPE_VIDEO, -1, -1, NULL, 0);
+
+ audio_stream = audio_stream_index >= 0 ? demuxer->streams[audio_stream_index] : NULL;
+ video_stream = video_stream_index >= 0 ? demuxer->streams[video_stream_index] : NULL;
+ if (audio_stream == NULL && video_stream == NULL) {
+ fprintf(stderr, "Failed to find audio stream index of '%s'\n", file);
+ err = AVERROR_STREAM_NOT_FOUND;
+ goto error;
+ }
+
+ if ((err = audiomuxer_conf(remuxer->audio_muxer, audio_stream)))
+ goto error;
+
+ remuxer->demuxer = demuxer;
+ remuxer->audio_stream = audio_stream;
+ remuxer->video_stream = video_stream;
+ remuxer->audio_stream_index = audio_stream_index;
+ remuxer->video_stream_index = video_stream_index;
+ return 0;
+
+error:
+ avformat_close_input(&demuxer);
+ return err;
+}
+
+static void timespec_add_nano(struct timespec *ts, uint64_t nanoseconds)
+{
+ uint64_t new_nano;
+
+ new_nano = ts->tv_nsec + nanoseconds;
+ ts->tv_nsec = new_nano % NANOSEC;
+ ts->tv_sec += new_nano / NANOSEC;
+}
+
+
+/**
+ * Block the thread if the remuxer is getting close to overruning (writing HLS
+ * segments and discarding previous ones faster than clients can read them).
+ * This is done by checking if all stream[1] have more data ready than needed to
+ * fill the buffer delay (2 seconds by default) and calling clock_nanosleep to
+ * wait off the overrun[2].
+ *
+ * - [1] We only have an audio stream right now.
+ * - [2] TODO: Implement instead of waiting 2 seconds
+ */
+static int remuxer_wait_target_ts (HLSRemuxer *remuxer)
+{
+ uint64_t video_runtime, audio_runtime, min_runtime;
+ struct timespec target_ts;
+
+ audio_runtime = audiomuxer_get_runtime(remuxer->audio_muxer);
+ video_runtime = INT64_MAX;
+ min_runtime = MIN(audio_runtime, video_runtime);
+ assert(min_runtime == audio_runtime);
+ if (min_runtime < remuxer->overhead)
+ return 0;
+ min_runtime -= remuxer->overhead;
+
+ target_ts = remuxer->start_time;
+ timespec_add_nano(&target_ts, min_runtime);
+
+ return clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &target_ts, NULL);
+}
+
diff --git a/transmuxer.h b/transmuxer.h
new file mode 100644
index 0000000..190e6e6
--- /dev/null
+++ b/transmuxer.h
@@ -0,0 +1,17 @@
+#ifndef TRANSMUXER_H_
+#define TRANSMUXER_H_
+
+// TODO: Rename file to "remuxer.c/h"
+
+#include "common.h"
+
+typedef struct HLSRemuxer HLSRemuxer;
+
+HLSRemuxer *hls_remuxer_init (const struct StreamerOpt *opts);
+void hls_remuxer_play (HLSRemuxer *remuxer, const char *file);
+void hls_remuxer_pause (HLSRemuxer *remuxer);
+void hls_remuxer_resume (HLSRemuxer *remuxer);
+void hls_remuxer_halt (HLSRemuxer *remuxer);
+void hls_remuxer_free (HLSRemuxer *remuxer);
+
+#endif // TRANSMUXER_H_