From c1cb78d574c0429aa5e3ff3a2b3886e4bc153212 Mon Sep 17 00:00:00 2001 From: bbergeron Date: Wed, 3 Apr 2024 17:32:01 -0400 Subject: Reset Git repo and use a pseudonym to sign commits I used to sign my commits with my real name and my personal email address, which I wanted scrubbed off the "B." pseudosphere. Re-creating a new git repository was safer than simpler than re-writing the history (although the latter could've also worked but, oh well). --- .editorconfig | 8 ++ .gitignore | 3 + HACKING | 33 +++++ Makefile | 42 +++++++ README | 40 +++++++ audio/decoder.c | 58 +++++++++ audio/decoder.h | 16 +++ audio/encoder.c | 69 +++++++++++ audio/encoder.h | 15 +++ audio/muxer.c | 235 ++++++++++++++++++++++++++++++++++++ audio/muxer.h | 54 +++++++++ audio/resampler.c | 121 +++++++++++++++++++ audio/resampler.h | 31 +++++ audio/stream.c | 47 ++++++++ audio/stream.h | 14 +++ common.c | 13 ++ common.h | 33 +++++ cybd.1 | 39 ++++++ daemon.c | 273 +++++++++++++++++++++++++++++++++++++++++ daemon.h | 29 +++++ main.c | 353 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ playlist.c | 269 +++++++++++++++++++++++++++++++++++++++++ playlist.h | 33 +++++ streamer.c | 235 ++++++++++++++++++++++++++++++++++++ streamer.h | 22 ++++ testmuxer.c | 21 ++++ transmuxer.c | 283 +++++++++++++++++++++++++++++++++++++++++++ transmuxer.h | 17 +++ 28 files changed, 2406 insertions(+) create mode 100644 .editorconfig create mode 100644 .gitignore create mode 100644 HACKING create mode 100644 Makefile create mode 100644 README create mode 100644 audio/decoder.c create mode 100644 audio/decoder.h create mode 100644 audio/encoder.c create mode 100644 audio/encoder.h create mode 100644 audio/muxer.c create mode 100644 audio/muxer.h create mode 100644 audio/resampler.c create mode 100644 audio/resampler.h create mode 100644 audio/stream.c create mode 100644 audio/stream.h create mode 100644 common.c create mode 100644 common.h create mode 100644 cybd.1 create mode 100644 daemon.c create mode 100644 daemon.h create mode 100644 main.c create mode 100644 playlist.c create mode 100644 playlist.h create mode 100644 streamer.c create mode 100644 streamer.h create mode 100644 testmuxer.c create mode 100644 transmuxer.c create mode 100644 transmuxer.h diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..316e750 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,8 @@ +root = true + +[*] +end_of_line = lf +insert_final_newline = true +indent_style=tab +charset = utf-8 +trim_trailing_whitespace = true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..eed3965 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +LICENSE +cybd +*.o diff --git a/HACKING b/HACKING new file mode 100644 index 0000000..ea45cdc --- /dev/null +++ b/HACKING @@ -0,0 +1,33 @@ +Hacking manual +============== +cybd is intended to be used and embedded into a greater software stack as part +of a more complete HLS distribution system. + +The easiest way to use cybd is through the provided daemon/client command-line. +Altough cybd is not a library per-say, it's nonetheless quite easy to import the +client, the daemon, or even the internal streamer object into another program +thanks to it's clean-cut architecture. + +The cybd layer (main.c, common.h) +--------------------------------- +cybd's main entry file, responsible of parsing commands arguments and dispatching +calls to the daemon layer. + +The daemon layer (daemon.c, daemon.h) +------------------------------------- +Code related to client/daemon communication via a unix domain socket, and code for +the daemon itself which handles IPC calls by forwarding them to the streamer +layer. + +The streamer layer (streamer.c/h, playlist.c/h, transmuxer.c/h) +--------------------------------------------------------------- +streamer.c/h - The 'brain' of the software, manage the song/media queue and stock +the FFMPEG muxer with new songs. + +playlist.c/h - Used by streamer.c. Implements a thread-safe, indexed circular +buffer. + +transmuxer.c/h - Used by streamer.c. Interface FFMPEG to implement muxing and +demuxing. + +Happy hacking! diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b48bf99 --- /dev/null +++ b/Makefile @@ -0,0 +1,42 @@ +.POSIX: + +CFLAGS = -std=c99 -O0 -D_POSIX_C_SOURCE=200809 -g +LDFLAGS = -lswresample -lavutil -lavcodec -lavformat -lpthread + +AUDIO = \ + audio/decoder.c\ + audio/encoder.c\ + audio/muxer.c\ + audio/resampler.c\ + audio/stream.c\ + +SRC = $(AUDIO) main.c streamer.c playlist.c transmuxer.c daemon.c common.c audio_muxer.c +OBJ = $(SRC:.c=.o) + +all: cybd + +.c.o: + $(CC) $(CFLAGS) -c $< -o $@ + +audio/decoder.o: audio/decoder.h +audio/encoder.o: audio/encoder.h +audio/resampler.o: audio/resampler.h audio/encoder.h audio/decoder.h +audio/muxer.o: audio/resampler.h audio/encoder.h audio/decoder.h audio/muxer.h + +common.o: common.h +main.o: common.h daemon.h +daemon.o: common.h daemon.h streamer.h +streamer.o: common.h streamer.h playlist.h transmuxer.h +transmuxer.o: common.h transmuxer.h playlist.h audio/muxer.h +playlist.o: common.h playlist.h + +cybd: $(OBJ) + $(CC) -o $@ $^ $(LDFLAGS) + +clean: + rm -f cybd $(OBJ) testmuxer.o test/* + +testhls: testmuxer.o transmuxer.o playlist.o $(AUDIO:.c=.o) + $(CC) -o $@ $^ $(LDFLAGS) + +.PHONY: all clean diff --git a/README b/README new file mode 100644 index 0000000..1ce0ebd --- /dev/null +++ b/README @@ -0,0 +1,40 @@ +cybd - The cyber radio daemon +============================= +cybd is a lightweight, embeddable POSIX-compliant queued HSL muxer. + +HSL (http live stream) is a popular streaming protocol capable of transporting +audio-video content over HTTP. cybd can play the role of the HSL server to +generate an .m3u8 tracklist file and .ts segments files which can be served by +any standard web server. + +Requirements +------------ +In order to build cybd you need the ffmpeg API header files. + + +Installation +------------ +Edit config.mk to match your local setup (cybd is installed into the /usr/local +namespace by default). + +Afterwards enter the following command to build and install cybd (if necessary as +root): + + make clean install + + +Running cybd +------------ +A cybd deamon must first be launched by invoking the 'daemon' command: + + cybd [-s socket] daemon + +Then: + + cybd [-s socket] COMMAND + + +Hacking cybd +------------ +cybd was built with versatility in mind, and can be embedded in your server stack +in several different ways. Read HACKING for more on that. diff --git a/audio/decoder.c b/audio/decoder.c new file mode 100644 index 0000000..70eacd5 --- /dev/null +++ b/audio/decoder.c @@ -0,0 +1,58 @@ +#include "decoder.h" +#include + +int decoder_init (struct Decoder *decoder, const AVCodecParameters *codecpar, AVDictionary **opts) +{ + int err; + AVCodecContext *avctx; + const AVCodec *codec; + + if ((codec = avcodec_find_decoder(codecpar->codec_id)) == NULL) + return AVERROR_DECODER_NOT_FOUND; + + /* NOTE: parameter 'codec' might be redundant here due to the next, more + * featuref-full call to avcodec_parameters_to_context. */ + if ((avctx = avcodec_alloc_context3(codec)) == NULL) + return AVERROR(ENOMEM); /* NOTE: After reading libavcodec's code, this + * error code *could* misrepresent the actual + * error, but ffmpeg won't tell us what it is + * anyway so oh well. */ + + if ((err = avcodec_parameters_to_context(avctx, codecpar)) < 0) + goto error; + + if ((err = avcodec_open2(avctx, codec, opts)) < 0) + goto error; + + /* Free previous context (no-op if avctx == NULL) and swap it with the new + * one. This allows the previous one to be left untouched in case of error, + * which is generally a nice thing to do in your code. */ + avcodec_free_context(&decoder->avctx); + decoder->avctx = avctx; + return 0; + +error: + avcodec_free_context(&avctx); + return err; +} + +int decoder_init_for_stream (struct Decoder *decoder, const AVStream *stream, AVDictionary **opts) +{ + return decoder_init(decoder, stream->codecpar, opts); +} + +int decoder_send (struct Decoder *decoder, const AVPacket *pkt) +{ + return avcodec_send_packet(decoder->avctx, pkt); +} + +int decoder_convert (struct Decoder *decoder, AVFrame *out) +{ + return avcodec_receive_frame(decoder->avctx, out); +} + +void decoder_free (struct Decoder *decoder) +{ + if (decoder == NULL) return; + avcodec_free_context(&decoder->avctx); +} diff --git a/audio/decoder.h b/audio/decoder.h new file mode 100644 index 0000000..bb7f7b1 --- /dev/null +++ b/audio/decoder.h @@ -0,0 +1,16 @@ +#ifndef AUDIO_DECODER_H_ +#define AUDIO_DECODER_H_ + +#include +#include +#include + +struct Decoder { AVCodecContext *avctx; }; + +int decoder_init (struct Decoder *decoder, const AVCodecParameters *codecpar, AVDictionary **opts); +int decoder_send (struct Decoder *decoder, const AVPacket *pkt); +int decoder_convert (struct Decoder *decoder, AVFrame *out); +void decoder_free (struct Decoder *decoder); +int decoder_init_for_stream (struct Decoder *decoder, const AVStream *stream, AVDictionary **opts); + +#endif // AUDIO_DECODER_H_ diff --git a/audio/encoder.c b/audio/encoder.c new file mode 100644 index 0000000..17cf2d2 --- /dev/null +++ b/audio/encoder.c @@ -0,0 +1,69 @@ +#include "encoder.h" +#include + +int encoder_init (struct Encoder *encoder, const AVCodecParameters *codecpar, AVDictionary **opts) +{ + /* NOTE: While encoder->time_base "MUST be set by user" when encoding + * (quoted from the official FFMPEG documentation), avcodec_open2 actually + * discard it's value and set it to '1/sample_rate'. + * + * NOTE: The assert at the end of this section serves to protect cybd in + * case the FFMPEG devs update their code to implement some funny behaviors. + */ + int err; + AVCodecContext *avctx; + const AVCodec *codec; + + if ((codec = avcodec_find_encoder(codecpar->codec_id)) == NULL) { + err = AVERROR_ENCODER_NOT_FOUND; + goto error; + } + + if ((avctx = avcodec_alloc_context3(codec)) == NULL) + return AVERROR(ENOMEM); + + if ((err = avcodec_parameters_to_context(avctx, codecpar)) < 0) + goto error; + + if ((err = avcodec_open2(avctx, codec, opts))) + goto error; + + /* Because some code rely on avctx's time base to be 1/sample_rate, and + * because this behavious currently appears undocumented, I placed some + * assert here to catch up if anything changes up with ffmpeg. */ + assert(avctx->time_base.num == 1); + assert(avctx->time_base.den == avctx->sample_rate); + encoder->avctx = avctx; + encoder->pts = 0; + return 0; + +error: + avcodec_free_context(&avctx); + return err; +} + +int encoder_init_for_stream (struct Encoder *encoder, const AVStream *stream, AVDictionary **opts) +{ + return encoder_init(encoder, stream->codecpar, opts); +} + +int encoder_send (struct Encoder *encoder, const AVFrame *frame) +{ + return avcodec_send_frame(encoder->avctx, frame); +} + +int encoder_convert (struct Encoder *encoder, AVPacket *pkt_out) +{ + int err; + err = avcodec_receive_packet(encoder->avctx, pkt_out); + if (err < 0) return err; + // Manually setting pts because FFMPEG doesn't do it consistantly + //pkt_out->pts = encoder->pts; + //encoder->pts += pkt_out->duration; + return 0; +} + +void encoder_free (struct Encoder *encoder) +{ + avcodec_free_context(&encoder->avctx); +} diff --git a/audio/encoder.h b/audio/encoder.h new file mode 100644 index 0000000..b42f862 --- /dev/null +++ b/audio/encoder.h @@ -0,0 +1,15 @@ +#ifndef AUDIO_ENCODER_H_ +#define AUDIO_ENCODER_H_ + +#include +#include + +struct Encoder { AVCodecContext *avctx; int pts; }; + +int encoder_init (struct Encoder *encoder, const AVCodecParameters *codecpar, AVDictionary **opts); +int encoder_send (struct Encoder *encoder, const AVFrame *pkt); +int encoder_convert (struct Encoder *encoder, AVPacket *out); +void encoder_free (struct Encoder *encoder); +int encoder_init_for_stream (struct Encoder *encoder, const AVStream *stream, AVDictionary **opts); + +#endif // AUDIO_ENCODER_H_ diff --git a/audio/muxer.c b/audio/muxer.c new file mode 100644 index 0000000..3860476 --- /dev/null +++ b/audio/muxer.c @@ -0,0 +1,235 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include "muxer.h" +#include "encoder.h" +#include "decoder.h" +#include "resampler.h" +#include "stream.h" + +#define NANOSEC (uint64_t)1000000000 + +struct AudioMuxer { + struct Stream stream; + struct Encoder encoder; + struct Decoder decoder; + struct Resampler resampler; + AVFrame *frame_in; + AVFrame *frame_out; + AVPacket *packet; + + /* The amount of output samples that went out of this muxer. Starts at 0 and + * keep on increasing. + */ + unsigned int nb_samples_out; +}; + +int64_t audiomuxer_flush(AudioMuxer *muxer); + +uint64_t audiomuxer_get_runtime(const AudioMuxer *muxer) +{ + return (muxer->nb_samples_out * NANOSEC) / muxer->resampler.out_sample_rate; +} + +int audiomuxer_init(AudioMuxer **pmuxer, AVFormatContext *s, const struct AudioMuxerOpt *opts) +{ + int err; + AudioMuxer *muxer; + AVFrame *frame_out; + AVFrame *frame_in; + AVPacket *pkt; + + *pmuxer = NULL; + if ((muxer = malloc(sizeof(AudioMuxer))) == NULL) + return AVERROR(ENOMEM); + + muxer->decoder.avctx = NULL; + muxer->encoder.avctx = NULL; + muxer->resampler.swr = NULL; + + err = stream_init( + &muxer->stream, + s, + opts->codec_id, + opts->sample_rate, + opts->sample_format, + &opts->ch_layout, + opts->opts_muxer); + if (err) goto error; + + err = encoder_init_for_stream(&muxer->encoder, muxer->stream.av_stream, opts->opts_encoder); + if (err) goto error; + + err = resampler_init_for_encoder(&muxer->resampler, &muxer->encoder); + if (err) goto error; + + if ((frame_out = av_frame_alloc()) == NULL) + goto enomem; + if ((frame_in = av_frame_alloc()) == NULL) + goto enomem; + if ((pkt = av_packet_alloc()) == NULL) + goto enomem; + + + frame_out->format = muxer->encoder.avctx->sample_fmt; + frame_out->nb_samples = muxer->encoder.avctx->frame_size; + frame_out->ch_layout = muxer->encoder.avctx->ch_layout; + frame_out->sample_rate = muxer->encoder.avctx->sample_rate; + if ((err = av_frame_get_buffer(frame_out, 0))) + goto error; + + muxer->packet = pkt; + muxer->frame_out = frame_out; + muxer->frame_in = frame_in; + *pmuxer = muxer; + return 0; + +enomem: + err = AVERROR(ENOMEM); +error: + audiomuxer_free(muxer); + return err; +} + +int audiomuxer_conf (AudioMuxer *muxer, const AVStream *input) +{ + int err, src_rate, dst_rate; + const AVCodec *codec; + SwrContext *resampler; + + src_rate = input->codecpar->sample_rate; + + err = decoder_init_for_stream(&muxer->decoder, input, NULL); // TODO: Pass options // TODO: Pass options + if (err) { + fprintf(stderr, "Failed to configure decoder: %s\n", av_err2str(err)); + return err; + } + + err = resampler_conf_for_decoder(&muxer->resampler, &muxer->decoder); + if (err) { + fprintf(stderr, "Failed to configure resampler: %s\n", av_err2str(err)); + return err; + } + + + muxer->frame_in->format = muxer->decoder.avctx->sample_fmt; + muxer->frame_in->nb_samples = muxer->decoder.avctx->frame_size; + muxer->frame_in->ch_layout = muxer->decoder.avctx->ch_layout; + muxer->frame_in->sample_rate = muxer->decoder.avctx->sample_rate; + if ((err = av_frame_get_buffer(muxer->frame_in, 0))) + return err; + + return 0; +} + +int audiomuxer_send_packet (AudioMuxer *muxer, const AVPacket *pkt_in) +{ + int err, duration; + AVFrame *frame_in; + + frame_in = muxer->frame_in; + + if ((err = decoder_send(&muxer->decoder, pkt_in)) < 0) + return err; + + /* Since a packet might contains several frames, we call decoder_convert + * repeatedly until we're out of data. + */ + while ((err = decoder_convert(&muxer->decoder, frame_in)) != AVERROR(EAGAIN)) { + /* Errors other than EAGAIN should be repported */ + if (err) { + fprintf(stderr, "Failed to decode frame: %s\n", av_err2str(err)); + return err; + } + + /* Immediately send frame to resampler and postpone silent injection */ + if ((err = resampler_send_frame(&muxer->resampler, frame_in)) < 0) { + fprintf(stderr, "Failed to send data to resampler: %s\n", av_err2str(err)); + return err; + } + + /* Flush frame if possible */ + if ((err = audiomuxer_flush(muxer)) < 0) + return err; + } + + return 0; +} + +int audiomuxer_send_silence (AudioMuxer *muxer, unsigned int microsec) +{ + int err; + if ((err = resampler_send_empty(&muxer->resampler, microsec))) + return err; + + return audiomuxer_flush(muxer); +} + + +int64_t audiomuxer_flush(AudioMuxer *muxer) +{ + int err; + AVPacket *pkt_out; + AVFrame *frame_out; + + frame_out = muxer->frame_out; + pkt_out = muxer->packet; + + if ((err = resampler_convert(&muxer->resampler, frame_out))) { + if (err == AVERROR(EAGAIN)) return 0; + fprintf(stderr, "Failed to resample: %s\n", av_err2str(err)); + return err; + } + + if ((err = encoder_send(&muxer->encoder, frame_out))) { + fprintf(stderr, "Failed to send frame to encoder: %s\n", av_err2str(err)); + return err; + } + + if ((err = encoder_convert(&muxer->encoder, pkt_out))) { + if (err == AVERROR(EAGAIN)) return 0; + fprintf(stderr, "Failed to encode packet: %s\n", av_err2str(err)); + return err; + } + + /* pkt_out->duration is the "Duration of this packet in AVStream->time_base + * units, 0 if unknown." In practice, the timebase appears to be 1 / + * sample_rate, so, in other words, pkt_out->duration is the amount of + * samples stored in that packet. + * + * TODO: This behaviour is however undocumented, and asserts should be put + * in place to catch potential regressions. + * + * NOTE: duration is usually 1152 with a default MP3 audio stream. + */ + assert(pkt_out->duration > 0); + muxer->nb_samples_out += pkt_out->duration; + + /* Finally send packet to stream. */ + if ((err = stream_send(&muxer->stream, pkt_out)) < 0) { + fprintf(stderr, "Failed stream packet: %s\n", av_err2str(err)); + return err; + } + + return 0; +} + + +void audiomuxer_free (AudioMuxer *muxer) +{ + if (muxer == NULL) return; + + /* Flush decoder */ + avcodec_send_packet(muxer->decoder.avctx, NULL); + + encoder_free(&muxer->encoder); + decoder_free(&muxer->decoder); + resampler_free(&muxer->resampler); + + free(muxer); +} diff --git a/audio/muxer.h b/audio/muxer.h new file mode 100644 index 0000000..b671484 --- /dev/null +++ b/audio/muxer.h @@ -0,0 +1,54 @@ +#ifndef AUDIO_MUXER_H_ +#define AUDIO_MUXER_H_ + +#include +#include + +/** + * An AudioMuxer is responsible for attaching and managing an audio stream to an + * FFMPEG AVFormatContext instance. Ultimately, AudioMuxer allows users to pass + * amorphous FFMPEG audio packets to an AvFormatContext without worrying about + * encoding or sample count, and to enable silent streaming. + */ +typedef struct AudioMuxer AudioMuxer; + +struct AudioMuxerOpt { + int sample_rate; + enum AVSampleFormat sample_format; + enum AVCodecID codec_id; + AVChannelLayout ch_layout; + AVDictionary **opts_muxer; + AVDictionary **opts_encoder; +}; + +#define AUDIO_MUXER_OPT_DEFAULTS\ + .sample_rate = 44100,\ + .sample_format = 0,\ + .codec_id = 0,\ + .ch_layout = 0,\ + .opts_muxer = NULL,\ + .opts_encoder = NULL + +/** + * Initialize **pmuxer using the parameters passed by *opts and attach an audio + * stream to *avctx. This function mearly configures the output front of the + * remuxer (i.e. a stream, an encoder, and part of a resmapler), and a second + * call to audiomuxer_conf must be dispatched before sending in packets. Return + * 0 on success, or a negative AVERROR on failure. + */ +int audiomuxer_init(AudioMuxer **pmuxer, AVFormatContext *avctx, const struct AudioMuxerOpt *opts); + +/** + * Configure *muxer to process input from a given *input stream. Internally, + * this function allocate a decoder and configure the remuxer's resampler to + * suite *input's codec and sample rate. Return 0 on success, or a negative + * AVERROR on failure. + */ +int audiomuxer_conf(AudioMuxer *muxer, const AVStream *input); + +int audiomuxer_send_packet (AudioMuxer *muxer, const AVPacket *pkt_in); +int audiomuxer_send_silence (AudioMuxer *muxer, unsigned int microsec); +uint64_t audiomuxer_get_runtime (const AudioMuxer *muxer); +void audiomuxer_free (AudioMuxer *muxer); + +#endif // AUDIO_MUXER_H_ diff --git a/audio/resampler.c b/audio/resampler.c new file mode 100644 index 0000000..6906d05 --- /dev/null +++ b/audio/resampler.c @@ -0,0 +1,121 @@ +#include +#include +#include +#include +#include +#include +#include "resampler.h" + +#define MICROSECOND 1000000 + +static inline uint64_t div_uint64_ceil(uint64_t num, uint64_t den) +{ + return (num / den) + (num % den != 0); +} + +static int resampler_init (struct Resampler *resampler, const AVChannelLayout *out_chlayout, enum AVSampleFormat out_sample_fmt, int64_t out_sample_rate, uint64_t out_frame_size) +{ + /* Let me tell you something; you --don't want-- CAN'T configure a + * SwrContext manually, GOOD ****ING LUCK DOING THAT. The documentation is + * so bad on that matter, it only referes to "tHe PaRaMeTeRs" or show off a + * list of deprecated attribute which looks nothing like what's actually + * used in their API. I tried to copy what's being done in their a API but + * whatever I do would either not work or be liable to change since it's not + * documented for shit (even the documented stuff doesn't appears stable, so + * wtf am I saying anyway). A ****ing joke, I tell you that. + * + * Pardon my french. + * + * Anywho, instead of configuring the input parameter directly, we shove + * them in resampler so we can recall them before calling ffmpeg's SWR + * allocator. + */ + resampler->out_chlayout = out_chlayout; + resampler->out_sample_fmt = out_sample_fmt; + resampler->out_sample_rate = out_sample_rate; + resampler->out_frame_size = out_frame_size; + resampler->swr = NULL; + return 0; +} + +static int resampler_conf (struct Resampler *resampler, const AVChannelLayout *in_chlayout, enum AVSampleFormat in_sample_fmt, int64_t in_sample_rate) +{ + int err; + + err = swr_alloc_set_opts2( + &resampler->swr, + resampler->out_chlayout, + resampler->out_sample_fmt, + resampler->out_sample_rate, + in_chlayout, + in_sample_fmt, + in_sample_rate, + 0, 0); + if (err) return err; + + if (!swr_is_initialized(resampler->swr)) + if ((err = swr_init(resampler->swr))) + return err; + + resampler->in_sample_rate = in_sample_rate; + resampler->needed = div_uint64_ceil(resampler->out_frame_size * in_sample_rate, resampler->out_sample_rate); + + return 0; +} + +int resampler_send_frame (struct Resampler *resampler, AVFrame *in) +{ + int err; + + if ((err = swr_convert(resampler->swr, NULL, 0, (const uint8_t **)in->extended_data, in->nb_samples)) < 0) + return err; + resampler->stored += in->nb_samples; + return 0; +} + +int resampler_send_empty (struct Resampler *resampler, unsigned int microsecond) +{ + // TODO: Check for downcasting + int nb_samples; + nb_samples = div_uint64_ceil(microsecond * resampler->in_sample_rate, 1000); + return swr_inject_silence(resampler->swr, nb_samples); +} + +int resampler_convert (struct Resampler *resampler, AVFrame *out) +{ + int err; + assert(out != NULL); + assert(resampler->swr != NULL); + + if (resampler->stored < resampler->needed) { + return AVERROR(EAGAIN); + /* + err = swr_inject_silence(resampler->swr, resampler->needed - resampler->stored); + if (err < 0) return err; + */ + } + + err = swr_convert(resampler->swr, out->extended_data, out->nb_samples, NULL, 0); + if (err < 0) return err; + resampler->stored = 0; + + return 0; +} + +void resampler_free (struct Resampler *resampler) +{ + if (resampler == NULL) return; + swr_free(&resampler->swr); +} + +int resampler_init_for_encoder (struct Resampler *resampler, const struct Encoder *encoder) +{ + AVCodecContext *avctx = encoder->avctx; + return resampler_init(resampler, &avctx->ch_layout, avctx->sample_fmt, avctx->sample_rate, avctx->frame_size); +} + +int resampler_conf_for_decoder (struct Resampler *resampler, const struct Decoder *decoder) +{ + AVCodecContext *avctx = decoder->avctx; + return resampler_conf(resampler, &avctx->ch_layout, avctx->sample_fmt, avctx->sample_rate); +} diff --git a/audio/resampler.h b/audio/resampler.h new file mode 100644 index 0000000..0bb471c --- /dev/null +++ b/audio/resampler.h @@ -0,0 +1,31 @@ +#ifndef AUDIO_RESAMPLER_H_ +#define AUDIO_RESAMPLER_H_ + +#include +#include +#include +#include "decoder.h" +#include "encoder.h" + +struct Resampler { + SwrContext *swr; + uint64_t stored; + uint64_t needed; + int in_sample_rate; + + /* Passed by *_init, used by *_conf */ + const AVChannelLayout *out_chlayout; + enum AVSampleFormat out_sample_fmt; + uint64_t out_sample_rate; + uint64_t out_frame_size; +}; + +int resampler_init_for_encoder (struct Resampler *resampler, const struct Encoder *encoder); +int resampler_conf_for_decoder (struct Resampler *resampler, const struct Decoder *decoder); +int resampler_send_frame (struct Resampler *resampler, AVFrame *in); +int resampler_send_empty (struct Resampler *resampler, unsigned int microsecond); +int resampler_convert (struct Resampler *resmapler, AVFrame *out); +int resampler_has_enough (struct Resampler *resampler); +void resampler_free (struct Resampler *resampler); + +#endif // AUDIO_RESAMPLER_H_ diff --git a/audio/stream.c b/audio/stream.c new file mode 100644 index 0000000..7bab674 --- /dev/null +++ b/audio/stream.c @@ -0,0 +1,47 @@ +#include "stream.h" + +int stream_init(struct Stream *stream, AVFormatContext *s, int codec_id, int sample_rate, int format, const AVChannelLayout *layout, AVDictionary **opts) +{ + int err; + AVStream *av_stream; + + if ((av_stream = avformat_new_stream(s, NULL)) == NULL) + return AVERROR(ENOMEM); + av_stream->codecpar->codec_type = AVMEDIA_TYPE_AUDIO; + av_stream->codecpar->codec_id = codec_id; + av_stream->codecpar->sample_rate = sample_rate; + av_stream->codecpar->format = format; + av_stream->codecpar->ch_layout = *layout; + + if ((err = avformat_init_output(s, opts)) < 0) + return err; + + stream->av_stream = av_stream; + stream->s = s; + return 0; +} + +int stream_send (struct Stream *stream, AVPacket *pkt) +{ + int err; + int64_t duration; + + // NOTE: + // Before encoding; + // - frame_out->pts DOES matter + // + // After encoding; + // - pkt->pts is set to something + // - pkt->timebase is unset, but would be '1 / output_sample_rate' + // - pkt->duration is set to the amount of sample in the packet + // - pkt->dts seems to be pkt->pts + pkt->stream_index = stream->av_stream->index; + //av_packet_rescale_ts(pkt_out, encoder->time_base, stream->time_base); + //print_pts(pkt_out, &stream->time_base); + + duration = pkt->duration; + if ((err = av_interleaved_write_frame(stream->s, pkt)) < 0) + return err; + + return duration; +} diff --git a/audio/stream.h b/audio/stream.h new file mode 100644 index 0000000..94c5162 --- /dev/null +++ b/audio/stream.h @@ -0,0 +1,14 @@ +#ifndef AUDIO_STREAM_H +#define AUDIO_STREAM_H + +#include + +struct Stream { + AVFormatContext *s; + AVStream *av_stream; +}; + +int stream_init (struct Stream *stream, AVFormatContext *s, int codec_id, int sample_rate, int format, const AVChannelLayout *layout, AVDictionary **opts); +int stream_send (struct Stream *stream, AVPacket *pkt); + +#endif // AUDIO_STREAM_H diff --git a/common.c b/common.c new file mode 100644 index 0000000..683428d --- /dev/null +++ b/common.c @@ -0,0 +1,13 @@ +#include +#include "common.h" + +void stateinfo_free(struct StateInfo *state_info) +{ + int i; + + for (i = 0; i < state_info->song_count; i ++) + free(state_info->next_songs[i].file); + + free(state_info->current_song); + free(state_info->next_songs); +} diff --git a/common.h b/common.h new file mode 100644 index 0000000..1ad624b --- /dev/null +++ b/common.h @@ -0,0 +1,33 @@ +#ifndef COMMON_H_ +#define COMMON_H_ + +#include +#define PLAYLIST_END UINT_MAX +#define SEGMENT_COUNT_UNLIMITED UINT_MAX + +enum State { + STATE_PAUSED, + STATE_PLAYING, + STATE_STANDBY, + STATE_TERM +}; + +struct Entry { + unsigned int id; + char *file; +}; + +struct StateInfo { + enum State state; + char *current_song; + int song_count; + struct Entry *next_songs; +}; + +struct StreamerOpt { + char *filename; +}; + +void stateinfo_free(struct StateInfo *state_info); + +#endif // COMMON_H_ diff --git a/cybd.1 b/cybd.1 new file mode 100644 index 0000000..6dffa17 --- /dev/null +++ b/cybd.1 @@ -0,0 +1,39 @@ +.TH CYBD 1 cybd\-VERSION +.SH NAME +cybd \- The cyber radio daemon +.SH SYNOPSIS +.B cybd +.RB [ \-t +.IR socket ] +.I command +.RI [ arguments ...] +.SH DESCRIPTION +cybd is a simple HLS muxer daemon designed to be integrated into a bigger HTTP +radio stack. It's role is to progress through a list of local media file in +order to generate a continuous HTTP live stream, stored as an .m3u8 playlist +file and .ts segment files, which can then be statically distributed by a web +server. +.P +By default, the daemon and the clients communicate via a UNIX socket named +$XDG_RUNTIME_DIR/cybd.sock. This can be changed by setting the +.B \-t +flag to the desired socket name. +.SH COMMANDS +.TP +.B daemon +Start the daemon and begin streaming silent frames. The diffusion will start as soon as an entry is pushed into the daemon's playlist. +.TP +.B skip +Halt the playback of the currently playing media an continue with the next one in queue. +.TP +.B pause +Pause the daemon and starts streaming silent frames. +.TP +.B resume +Resume the daemon after a pause command. +.SH AUTHORS +See the LICENSE file for the authors. +.SH LICENSE +See the LICENSE file for the terms of redistribution. +.SH SEE ALSO +.BR ffmpeg (1), diff --git a/daemon.c b/daemon.c new file mode 100644 index 0000000..bcb5049 --- /dev/null +++ b/daemon.c @@ -0,0 +1,273 @@ +#define _DEFAULT_SOURCE +#include +#include +#include +#include +#include +#include +#include "common.h" +#include "daemon.h" +#include "streamer.h" + +int daemon_run = 0; + +// TODO: Too pedantic and verbose, replace with read_val, write_val, read_str, write_str +#define strict_read(fd, ptr, count) (read(fd, ptr, sizeof(*ptr) * count) != sizeof(*ptr) * count) +#define strict_write(fd, ptr, count) (write(fd, ptr, sizeof(*ptr) * count) != sizeof(*ptr) * count) + +enum Opcode { + OP_DAEMON, + OP_PUSH, + OP_POP, + OP_TERM, + OP_SKIP, + OP_LIST, + OP_PAUSE, + OP_RESUME, +}; + +struct Command { + enum Opcode opcode; + int opindex; + int urllen; +}; + +#define write_val(fd, ptr) (write(fd, ptr, sizeof(*ptr))) +#define read_val(fd, ptr) (read(fd, ptr, sizeof(*ptr))) +static void write_str (int fd, const char *str); +static char *read_str (int fd); + +// =================================================================== +// Server code +// =================================================================== + +static inline int send_reply (int fd, struct StateInfo *state_info) +{ + int i; + struct Entry *entry; + + errno = 0; + write_val(fd, &state_info->state); + write_str(fd, state_info->current_song); + write_val(fd, &state_info->song_count); + for (i = 0; i < state_info->song_count; i ++) { + entry = &state_info->next_songs[i]; + write_val(fd, &entry->id); + write_str(fd, entry->file); + } + + return errno ? -1 : 0; +} + +int daemon_start(int sockfd, const struct StreamerOpt *opts) +{ + Streamer *streamer; + int connfd, new_id; + char *url; + struct Command msg; + struct StateInfo state_info; + enum State state; + + if ((streamer = streamer_init(opts)) == NULL) { + fprintf(stderr, "Failed to allocate streamer\n"); + return EXIT_FAILURE; + } + + url = NULL; + daemon_run = 1; + while (daemon_run) { + puts("Awaiting connection"); + + /* Block and wait for another connection. accept(2) shall returns a + * nonnegative file descriptor on sucess, or -1 if an error occured. All + * error are considered fatal, except for EINTR, which is seen as an + * occasion to re-check whether the daemon should keep running. + */ + if ((connfd = accept(sockfd, NULL, NULL)) < 0) { + if (errno == EINTR) continue; + perror("Failed to accept connection"); + break; + } + + if (read(connfd, &msg, sizeof(msg)) != sizeof(msg)) { + fputs("Unexpected end of package, discarding.\n", stderr); + close(connfd); + continue; + } + + switch (msg.opcode) { + case OP_PUSH: + if ((url = realloc(url, msg.urllen + 1)) == NULL) { + perror("Failed to allocate URL buffer"); + break; + } + read(connfd, url, msg.urllen); + url[msg.urllen] = '\0'; + + new_id = streamer_push(streamer, url, msg.opindex); + if (new_id < 0) + fprintf(stderr, "Failed to add '%s' to the playlist: %s\n", url, strerror(errno)); + else + printf("Pushed '%s' with id %i\n", url, new_id); + break; + + case OP_POP: + if (streamer_pop(streamer, msg.opindex)) + fprintf(stderr, "Failed remove entry #%i\n", msg.opindex); + else + printf("Succesfullt remove '%i' from the playlist\n", msg.opindex); + break; + + case OP_TERM: + daemon_run = 0; + break; + case OP_SKIP: + streamer_skip(streamer); + break; + case OP_PAUSE: + streamer_pause(streamer); + break; + case OP_RESUME: + streamer_resume(streamer); + break; + case OP_LIST: + state = streamer_info(streamer, &state_info); + if (state == -1) { + perror("Failed to reatrieve streamer info"); + break; + } + + if (send_reply(sockfd, &state_info)) { + perror("Failed to send back state info"); + break; + } + break; + default: + fprintf(stderr, "Ignoring unknown command code: %i\n", msg.opcode); + break; + } + + close(connfd); + errno = 0; + } + puts("done\n"); + + daemon_run = 0; + streamer_free(streamer); + return errno ? -1 : -0; +} + + +// =================================================================== +// Client code +// =================================================================== + +static inline int send_command(int fd, int code, unsigned int index, int len) +{ + struct Command m; + m.opcode = code; + m.opindex = index; + m.urllen = len; + return strict_write(fd, &m, 1) ? -1 : 0; +} + +int daemon_push(int fd, const char *track, unsigned int index) +{ + int len; + char *realtrack; + + if ((realtrack = realpath(track, NULL)) == NULL) + return -1; + + len = strlen(realtrack); + if (send_command(fd, OP_PUSH, index, len)) + goto error; + if (strict_write(fd, realtrack, len)) + goto error; + free(realtrack); + return 0; + +error: + free(realtrack); + return -1; +} + + +int daemon_pop(int fd, unsigned int index) +{ + return send_command(fd, OP_POP, index, 0); +} + +int daemon_skip(int fd) +{ + return send_command(fd, OP_SKIP, 0, 0); +} + +int daemon_pause(int fd) +{ + return send_command(fd, OP_PAUSE, 0, 0); +} + +int daemon_resume(int fd) +{ + return send_command(fd, OP_RESUME, 0, 0); +} + +int daemon_kill(int fd) +{ + return send_command(fd, OP_TERM, 0, 0); +} + +int daemon_query(int fd, struct StateInfo *state_info) +{ + int i, len; + enum State state; + struct Entry *entry; + + if (send_command(fd, OP_LIST, 0, 0)) + return -1; + + errno = 0; + read_val(fd, &state_info->state); + state_info->current_song = read_str(fd); + read_val(fd, &state_info->song_count); + + state_info->next_songs = calloc(state_info->song_count, sizeof(struct Entry)); + if (state_info->next_songs == NULL) { + free(state_info->current_song); + return -1; + } + + for (i = 0; i < state_info->song_count; i ++) { + entry = &state_info->next_songs[i]; + read_val(fd, &entry->id); + entry->file = read_str(fd); + } + + if (errno) { + stateinfo_free(state_info); + return -1; + } + + return 0; +} + +static void write_str (int fd, const char *str) +{ + size_t len; + len = strlen(str) + 1; + write(fd, &len, sizeof(size_t)); + write(fd, str, len); +} + +static char *read_str(int fd) +{ + size_t len; + char *str; + + read(fd, &len, sizeof(size_t)); + if ((str = malloc(len)) == NULL) + return NULL; + read(fd, str, len); + return str; +} diff --git a/daemon.h b/daemon.h new file mode 100644 index 0000000..d3ea5d0 --- /dev/null +++ b/daemon.h @@ -0,0 +1,29 @@ +#ifndef DAEMON_H_ +#define DAEMON_H_ + +#include "common.h" + +/** + * Functions related to client-daemon IPC over a unix-domain socket. + */ + +/* Servers symbols */ +extern int daemon_run; + +/** + * Allocate a cybd streamer and kickstart the cybd daemon even loop. This + * function is blocking and thread-unsafe due to its interaction with the global + * daemon_run variable. This function returns 0 on success or -1 on failure. + */ +int daemon_start (int fd, const struct StreamerOpt *opts); + +/* Client symbols */ +int daemon_push (int fd, const char *track, unsigned int index); +int daemon_pop (int fd, unsigned int index); +int daemon_skip (int fd); +int daemon_pause (int fd); +int daemon_resume (int fd); +int daemon_kill (int fd); +int daemon_query (int fd, struct StateInfo *entries); + +#endif // DAEMON_H_ diff --git a/main.c b/main.c new file mode 100644 index 0000000..7899b7b --- /dev/null +++ b/main.c @@ -0,0 +1,353 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "common.h" +#include "daemon.h" + +#define SOCKET_NAME "/cybd.socket" + +#define SUN_LEN(ptr) (offsetof (struct sockaddr_un, sun_path) + strlen ((ptr)->sun_path)) + +static int fd; +static char *socketname; + +static int main_daemon (int fd, int argc, char **argv); +static int main_push (int fd, int argc, char **argv); +static int main_pop (int fd, int argc, char **argv); +static int main_skip (int fd, int argc, char **argv); +static int main_pause (int fd, int argc, char **argv); +static int main_resume (int fd, int argc, char **argv); +static int main_query (int fd, int argc, char **argv); +static int main_kill (int fd, int argc, char **argv); + +static int get_socket_fd (const char *socketname, int do_bind); +static char *get_default_socketname (void); + + +int main(int argc, char **argv) +{ + int opt, err, is_daemon; + char *command; + + socketname = NULL; + while ((opt = getopt(argc, argv, "s:")) != -1) switch (opt) { + case 's': + socketname = strdup(optarg); + break; + default: + goto usage; + } + + if (socketname == NULL) + socketname = get_default_socketname(); + + if ((command = argv[optind ++]) == NULL) { + fprintf(stderr, "%s: missing command\n", argv[0]); + free(socketname); + goto usage; + } + + is_daemon = strcmp(command, "daemon") == 0; + if (is_daemon) unlink(socketname); + if ((fd = get_socket_fd(socketname, is_daemon)) == -1) { + fprintf(stderr, "%s: Failed to bind socket at '%s': %s\n", argv[0], socketname, strerror(errno)); + free(socketname); + return EXIT_FAILURE; + } + + if (is_daemon) + err = main_daemon (fd, argc, argv); + else { + free(socketname); + if (strcmp(command, "push" ) == 0) err = main_push (fd, argc, argv); + else if (strcmp(command, "pop" ) == 0) err = main_pop (fd, argc, argv); + else if (strcmp(command, "skip" ) == 0) err = main_skip (fd, argc, argv); + else if (strcmp(command, "pause" ) == 0) err = main_pause (fd, argc, argv); + else if (strcmp(command, "resume") == 0) err = main_resume (fd, argc, argv); + else if (strcmp(command, "kill" ) == 0) err = main_kill (fd, argc, argv); + else if (strcmp(command, "query" ) == 0) err = main_query (fd, argc, argv); + else { + fprintf(stderr, "%s: %s: Unknown command\n", argv[0], command); + close(fd); + goto usage; + } + } + + close(fd); + return err; + +usage: + fprintf(stderr, "Usage: %s [-s socket] [args...]\n", argv[0]); + return EXIT_FAILURE; +} + +char *get_default_socketname(void) +{ + /* Suggest a default socket name for cybd. This function returns either one + * of the two following suggestions; + * + * 1. If the XDG_RUNTIME_DIR environment is set, we assume the user adhere + * to the XDG directory specification and behave accordingly. + * 2. If not, we use the FHS-specified '/run' directory. + * + * In either case, the socket names is set to ends with SOCKET_NAME and a + * dynamically allocated string is returned. This function may also returns + * NULL if cybd has run out of memory, in which case errno will be set to + * ENOMEM. + */ + char *name, *xdg; + + if ((xdg = getenv("XDG_RUNTIME_DIR"))) { + name = malloc(strlen(xdg) + sizeof(SOCKET_NAME)); + if (name == NULL) return NULL; + + sprintf(name, "%s" SOCKET_NAME, xdg); + return name; + } + + return strdup("/run" SOCKET_NAME); +} + +int get_socket_fd (const char *socketname, int as_daemon) +{ + int fd; + struct sockaddr_un addr; + + // TODO: Re-check the documentation apropos socket name length + if (strlen(socketname) > sizeof(addr.sun_path) - 1) { + fputs("Failed to create socket: filename too long\n", stderr); + return -1; + } + + if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { + perror("Failed to create socket"); + return -1; + } + + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, socketname); + + if ((as_daemon ? bind : connect)(fd, (struct sockaddr *)&addr, SUN_LEN(&addr))) { + close(fd); + fprintf(stderr, "Failed to open socket at '%s': %s\n", socketname, strerror(errno)); + return -1; + } + + if (as_daemon) + listen(fd, SOMAXCONN); + + return fd; +} + +static int main_push(int fd, int argc, char **argv) +{ + int read; + unsigned int id; + char *file; + + if (optind == argc) { + fprintf(stderr, "%s: Missing input argument\n", argv[0]); + goto usage; + } + file = argv[optind ++]; + + if (argv[optind] != NULL) { + sscanf(argv[optind], "%u%n", &id, &read); + if (read != strlen(argv[optind])) { + fprintf(stderr, "%s: bad index argument: '%u'\n", argv[0], id); + goto usage; + } + optind ++; + } + else id = PLAYLIST_END; + + if (argv[optind] != NULL) { + fprintf(stderr, "%s: Too many arguments\n", argv[0]); + goto usage; + } + + if (daemon_push(fd, file, id)) { + perror("Failed to push entry"); + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; + +usage: + fprintf(stderr, "Usage: %s [-s socket] push [index]\n", argv[0]); + return EXIT_FAILURE; +} + +static int main_pop(int fd, int argc, char **argv) +{ + int read; + unsigned int id; + + if (argv[optind] != NULL) { + sscanf(argv[optind], "%u%n", &id, &read); + if (read != strlen(argv[optind])) { + fprintf(stderr, "%s: bad index argument: '%u'\n", argv[0], id); + goto usage; + } + optind ++; + } + else id = PLAYLIST_END; + + if (argv[optind] != NULL) { + fprintf(stderr, "%s: Too many arguments\n", argv[0]); + goto usage; + } + + if (daemon_pop(fd, id)) { + perror("Failed to pop entry"); + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; + +usage: + fprintf(stderr, "Usage: %s [-s socket] pop \n", argv[0]); + return EXIT_FAILURE; +} + +static int main_skip(int fd, int argc, char **argv) +{ + if (argv[optind] != NULL) { + fprintf(stderr, "%s: Too many arguments\n", argv[0]); + fprintf(stderr, "Usage: %s [-s socket] skip\n", argv[0]); + return EXIT_FAILURE; + } + if (daemon_skip(fd)) { + perror("Failed to send skip request"); + return EXIT_FAILURE; + } + return EXIT_SUCCESS; +} + +static int main_pause(int fd, int argc, char **argv) +{ + if (argv[optind] != NULL) { + fprintf(stderr, "%s: Too many arguments\n", argv[0]); + fprintf(stderr, "Usage: %s [-s socket] pause\n", argv[0]); + return EXIT_FAILURE; + } + if (daemon_pause(fd)) { + perror("Failed to send pause request"); + return EXIT_FAILURE; + } + return EXIT_SUCCESS; +} + +static int main_resume(int fd, int argc, char **argv) +{ + if (argv[optind] != NULL) { + fprintf(stderr, "%s: Too many arguments\n", argv[0]); + fprintf(stderr, "Usage: %s [-s socket] resume\n", argv[0]); + return EXIT_FAILURE; + } + if (daemon_resume(fd)) { + perror("Failed to send resume request"); + return EXIT_FAILURE; + } + return EXIT_SUCCESS; +} + +static int main_kill(int fd, int argc, char **argv) +{ + if (argv[optind] != NULL) { + fprintf(stderr, "%s: Too many arguments\n", argv[0]); + fprintf(stderr, "Usage: %s [-s socket] kill\n", argv[0]); + return EXIT_FAILURE; + } + if (daemon_kill(fd)) { + perror("Failed to send kill request"); + return EXIT_FAILURE; + } + return EXIT_SUCCESS; +} + +static int main_query (int fd, int argc, char **argv) +{ + int i; + struct Entry *entry; + struct StateInfo state_info; + + if (optind != argc) { + fprintf(stderr, "%s: Too many arguments\n", argv[0]); + fprintf(stderr, "Usage: %s [-s socket] query\n", argv[0]); + return -1; + } + + if (daemon_query(fd, &state_info)) { + perror("Failed to query daemon"); + return -1; + } + + switch (state_info.state) { + case STATE_PAUSED: + printf("%s (paused)\n", state_info.current_song); + break; + case STATE_PLAYING: + printf("%s (playing)\n", state_info.current_song); + break; + case STATE_STANDBY: + puts("standby"); + break; + } + + for (i = 0; i < state_info.song_count; i ++) { + entry = &state_info.next_songs[i]; + printf("%u\t%s", entry->id, entry->file); + free(entry->file); + } + + stateinfo_free(&state_info); + return 0; +} + +static void close_socket(int sig) +{ + puts("Signal received, terminating daemon"); + shutdown(fd, SHUT_RDWR); +} + +static int main_daemon (int fd, int argc, char **argv) +{ + time_t t; + int err; + struct tm *tm_info; + struct StreamerOpt opts; + struct sigaction sigact; + + sigact.sa_handler = close_socket; + sigemptyset(&sigact.sa_mask); + sigact.sa_flags = 0; + sigaction(SIGTERM, &sigact, NULL); + sigaction(SIGINT, &sigact, NULL); + + // TODO: Read from args + opts.filename = "./test/stream.m3u8"; + + + time(&t); + tm_info = localtime(&t); + + char buffer[50]; + strftime(buffer, 50, "Daemon started at %Y-%m-%d %H:%M:%S", tm_info); + puts(buffer); + + err = daemon_start(fd, &opts); + unlink(socketname); + free(socketname); + + puts("Goodbye."); + return err ? EXIT_FAILURE : EXIT_SUCCESS; +} diff --git a/playlist.c b/playlist.c new file mode 100644 index 0000000..5d7a697 --- /dev/null +++ b/playlist.c @@ -0,0 +1,269 @@ +#include +#include +#include +#include +#include +#include +#include "playlist.h" + +#define PLAYLIST_INIT_SIZE 2 // TODO: Bump to 32 + +struct Playlist { + struct Entry *entries; + int front, rear, next_id, len, size; + pthread_mutex_t mutex; + pthread_cond_t ready; +}; + +static int playlist_grow (Playlist *pl); +static int playlist_is_full (Playlist *pl); +static int playlist_is_empty (Playlist *pl); +static int playlist_get_index (Playlist *pl, unsigned int id); +static void playlist_unsafe_dequeue (Playlist *pl, struct Entry *entry); + +Playlist *playlist_init(void) +{ + Playlist *pl; + + if ((pl = malloc(sizeof(*pl))) == NULL) + return NULL; + + pl->entries = calloc(PLAYLIST_INIT_SIZE, sizeof(*pl->entries)); + if (pl->entries == NULL) { + free(pl); + return NULL; + } + + pl->next_id = 0; + pl->front = 0; + pl->rear = 0; + pl->len = 0; + pl->size = PLAYLIST_INIT_SIZE; + pl->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER; + pl->ready = (pthread_cond_t) PTHREAD_COND_INITIALIZER; + return pl; +} + +void playlist_free (Playlist *pl) +{ + int i; + + if (pl == NULL) + return; + + for (i = 0; i < pl->size; i ++) + free(pl->entries[0].file); + + free(pl->entries); + free(pl); +} + +int playlist_enqueue (Playlist *pl, const char *entry) +{ + int id; + char *copy; + + if ((copy = strdup(entry)) == NULL) + return -1; + + pthread_mutex_lock(&pl->mutex); + if (playlist_is_full(pl) && playlist_grow(pl)) { + pthread_mutex_unlock(&pl->mutex); + free(copy); + return -1; + } + + pl->entries[pl->rear].file = copy; + id = pl->entries[pl->rear].id = pl->next_id ++; + pl->rear = (pl->rear + 1) % pl->size; + pl->len ++; + + pthread_cond_signal(&pl->ready); + pthread_mutex_unlock(&pl->mutex); + return id; +} + +int playlist_dequeue (Playlist *pl, struct Entry *entry) +{ + /* playlist_dequeue take hold of the playlist's mutex, relinquishing it + * either after a song has been dequeued, or, in the scenario where the + * playlist is currently empty, atomically while waiting for the pl->ready + * condition to signal true, indicating that a song has been queued. + * + * However, it is documented that POSIX condition waiters may be subjects to + * "spurious wakeup", spontaneously unlocking themself for no appearent + * reasons. + * + * To prevent undefined behaviours, the lock condition checker and the + * waiter are placed in a loop to ensure that any return-to-context occured + * legitimately, and to keep waiting othwerwise. + */ + + pthread_mutex_lock(&pl->mutex); + errno = 0; + while (playlist_is_empty(pl)) { + pthread_cond_wait(&pl->ready, &pl->mutex); + if (errno) { + pthread_mutex_unlock(&pl->mutex); + return -1; + } + } + + playlist_unsafe_dequeue(pl, entry); + pthread_mutex_unlock(&pl->mutex); + return 0; +} + +int playlist_try_dequeue(Playlist *pl, struct Entry *entry) +{ + pthread_mutex_lock(&pl->mutex); + if (playlist_is_empty(pl)) { + pthread_mutex_unlock(&pl->mutex); + return -1; + } + + playlist_unsafe_dequeue(pl, entry); + pthread_mutex_unlock(&pl->mutex); + return 0; +} + +int playlist_remove(Playlist *pl, unsigned int id, struct Entry *entry) +{ + int index; + + pthread_mutex_lock(&pl->mutex); + if ((index = playlist_get_index(pl, id)) == -1) { + pthread_mutex_unlock(&pl->mutex); + return -1; + } + + *entry = pl->entries[index]; + while (index != pl->rear) { + pl->entries[index] = pl->entries[index + 1]; + index = (index + 1) % pl->len; + } + + if (pl->rear -- < 0) + pl->rear = pl->len - 1; + pl->len --; + + pthread_mutex_unlock(&pl->mutex); + return 0; +} + +int playlist_insert(Playlist *pl, unsigned int id, const char *value) +{ + char *copy; + int index, new_id; + struct Entry a, b; + + pthread_mutex_lock(&pl->mutex); + if ((index = playlist_get_index(pl, id)) == -1) { + pthread_mutex_unlock(&pl->mutex); + return -1; + } + + if ((copy = strdup(value)) == NULL) + goto enomem; + + if (playlist_is_full(pl) && playlist_grow(pl)) + goto enomem; + + pl->rear = (pl->rear + 1) % pl->len; + + a = (struct Entry){ new_id = pl->next_id ++, copy }; + while (index != pl->rear) { + b = pl->entries[index]; + pl->entries[index] = a; + a = b; + index = (index + 1) % pl->len; + } + + pl->len ++; + + pthread_mutex_unlock(&pl->mutex); + return new_id; + +enomem: + pthread_mutex_unlock(&pl->mutex); + free(copy); + return -2; +} + +int playlist_list(Playlist *pl, struct StateInfo *list) +{ + int i, j; + + pthread_mutex_lock(&pl->mutex); + list->song_count = pl->len; + if ((list->next_songs = malloc(sizeof(list[0]) * pl->len)) == NULL) + return -1; + + for (i = 0, j = pl->front; i < pl->len; i ++, j ++) { + j %= pl->len; + list->next_songs[i].id = pl->entries[j].id; + list->next_songs[i].file = strdup(pl->entries[j].file); + } + pthread_mutex_unlock(&pl->mutex); + + return 0; +} + +int playlist_get_index(Playlist *pl, unsigned int id) +{ + int index; + + index = pl->front; + + // Break on a null entry + while (pl->entries[index].file) { + // Bingo + if (pl->entries[index].id == id) + return index; + + // Advance the index, and break if we're back at the front + index = (index + 1) % pl->len; + if (index == pl->front) + break; + } + + return -1; +} + +static int playlist_is_full (Playlist *pl) +{ + return pl->len == pl->size; +} + +static int playlist_is_empty (Playlist *pl) +{ + return pl->entries[pl->front].file == NULL; +} + +static int playlist_grow(Playlist *pl) +{ + int split; + struct Entry *new_buff; + + new_buff = calloc(pl->size * 2, sizeof(*new_buff)); + if (new_buff == NULL) return -1; + + split = pl->len - pl->front; + memcpy(&new_buff[0], &pl->entries[pl->front], split); + memcpy(&new_buff[split], &pl->entries[0], pl->len - split); + + free(pl->entries); + pl->front = 0; + pl->rear = pl->len; + pl->entries = new_buff; + pl->size *= 2; + return 0; +} + +static void playlist_unsafe_dequeue(Playlist *pl, struct Entry *entry) +{ + *entry = pl->entries[pl->front]; + pl->entries[pl->front].file = NULL; + pl->front = (pl->front + 1) % pl->size; + pl->len --; +} diff --git a/playlist.h b/playlist.h new file mode 100644 index 0000000..ed4b19d --- /dev/null +++ b/playlist.h @@ -0,0 +1,33 @@ +#ifndef PLAYLIST_H_ +#define PLAYLIST_H_ + +#include "common.h" + +/** + * Playlist - A thread-safe string queue with support for indexed insert/remove. + */ +typedef struct Playlist Playlist; + +Playlist *playlist_init (void); +void playlist_free (Playlist *pl); +int playlist_enqueue (Playlist *pl, const char *entry); + +/** + * Fetch the next entry in the playlist, blocking the current thread and waiting + * for an entry if needed. The function can be interrupted by sending a signal + * to the waiting thread, in which case *entry remains untouched, -1 is + * returned, and errno is set to EINTR. Returns 0 on success. + */ +int playlist_dequeue(Playlist *pl, struct Entry *entry); + +/** + * Like playlist_dequeue, but returns -1 when the playlist is empty instead of + * locking the thread. Returns 0 on success. + */ +int playlist_try_dequeue(Playlist *pl, struct Entry *entry); + +int playlist_insert (Playlist *pl, unsigned int id, const char *entry); +int playlist_remove (Playlist *pl, unsigned int id, struct Entry *entry); +int playlist_list (Playlist *pl, struct StateInfo *list); + +#endif // PLAYLIST_H_ diff --git a/streamer.c b/streamer.c new file mode 100644 index 0000000..f02efed --- /dev/null +++ b/streamer.c @@ -0,0 +1,235 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "common.h" +#include "playlist.h" +#include "streamer.h" +#include "transmuxer.h" + +/* ===== Definitions ===== */ +#define PRIORITY -10 +struct Streamer { + pthread_t worker_thread; + pthread_mutex_t worker_ready_mutex; + pthread_cond_t worker_ready_cond; + int worker_ready_flag; + + enum State state; + pthread_mutex_t state_mutex; + + HLSRemuxer *remuxer; + Playlist *playlist; + char *current_song; +}; + +/* ===== Thread-local storage ===== */ +static pthread_key_t streamer_worker_key; +static pthread_once_t streamer_worker_once = PTHREAD_ONCE_INIT; + +static void streamer_worker_key_create(void) +{ + pthread_key_create(&streamer_worker_key, NULL); +} + + +/* ====== Streamer initialisation ===== */ +static void *streamer_worker (void *t); + +Streamer *streamer_init (const struct StreamerOpt *opts) +{ + Streamer *s = NULL; + Playlist *p = NULL; + HLSRemuxer *r = NULL; + + pthread_once(&streamer_worker_once, streamer_worker_key_create); + + if ((s = malloc(sizeof(*s))) == NULL) + goto error; + if ((p = playlist_init()) == NULL) + goto error; + if ((r = hls_remuxer_init(opts)) == NULL) + goto error; + + if (pthread_create(&s->worker_thread, NULL, streamer_worker, s)) + goto error; + + s->playlist = p; + s->remuxer = r; + s->state = STATE_STANDBY; + s->state_mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER; + return s; + +error: + hls_remuxer_free(r); + playlist_free(p); + free(s); + return NULL; +} + +int streamer_push(Streamer *s, const char *url, unsigned int index) +{ + return (index == PLAYLIST_END) + ? playlist_enqueue(s->playlist, url) + : playlist_insert(s->playlist, index, url); +} + +int streamer_pop(Streamer *s, unsigned int id) +{ + int err; + struct Entry entry; + + if ((err = playlist_remove(s->playlist, id, &entry))) + return err; + free(entry.file); + return 0; +} + +void streamer_skip(Streamer *s) +{ + /* 'skip' only halt the current song, which is then replaced by the streamer + * worker, either immediately, or as soon as a song gets pushed. + */ + hls_remuxer_halt(s->remuxer); +} + +void streamer_pause(Streamer *s) +{ + pthread_mutex_lock(&s->state_mutex); + if (s->state == STATE_PLAYING) { + hls_remuxer_pause(s->remuxer); + s->state = STATE_PAUSED; + } + pthread_mutex_unlock(&s->state_mutex); +} + +void streamer_resume(Streamer *s) +{ + pthread_mutex_lock(&s->state_mutex); + if (s->state == STATE_PAUSED) { + hls_remuxer_resume(s->remuxer); + s->state = STATE_PLAYING; + } + pthread_mutex_unlock(&s->state_mutex); +} + +int streamer_info(Streamer *streamer, struct StateInfo *info) +{ + pthread_mutex_lock(&streamer->state_mutex); + info->state = streamer->state; + + if ((info->current_song = strdup(streamer->current_song)) == NULL) { + pthread_mutex_unlock(&streamer->state_mutex); + return -1; + } + + if (playlist_list(streamer->playlist, info)) { + pthread_mutex_unlock(&streamer->state_mutex); + free(info->current_song); + return -1; + } + + pthread_mutex_unlock(&streamer->state_mutex); + return 0; +} + +void streamer_free (Streamer *s) +{ + pthread_kill(s->worker_thread, SIGTERM); + hls_remuxer_halt(s->remuxer); + pthread_join(s->worker_thread, NULL); + playlist_free(s->playlist); + free(s); +} + + +static void streamer_worker_sighandler(int sig) +{ + Streamer *streamer; + streamer = pthread_getspecific(streamer_worker_key); + + pthread_mutex_lock(&streamer->state_mutex); + streamer->state = STATE_TERM; + pthread_mutex_unlock(&streamer->state_mutex); +} + + +static inline int streamer_worker_conf_priority(void) +{ + errno = 0; + setpriority(PRIO_PROCESS, 0, PRIORITY); + return errno ? -1 : 0; +} + +static inline int streamer_worker_conf_sighandling(void) +{ + sigset_t set; + struct sigaction sigact; + + /* Configure SIGTERM interuptor */ + sigact.sa_handler = streamer_worker_sighandler; + sigact.sa_flags = 0; + sigemptyset(&sigact.sa_mask); + sigaction(SIGTERM, &sigact, NULL); // term + sigaction(SIGUSR1, &sigact, NULL); // play/pause + sigaction(SIGUSR2, &sigact, NULL); // skip + + /* Unblock SIGTERM */ + sigemptyset(&set); + sigaddset(&set, SIGTERM); + sigaddset(&set, SIGUSR1); + sigaddset(&set, SIGUSR2); + pthread_sigmask(SIG_UNBLOCK, &set, NULL); +} + +static void *streamer_worker(void *arg) +{ + Streamer *s; + struct Entry next; + + pthread_setspecific(streamer_worker_key, (s = arg)); + + /* Configure worker */ + pthread_mutex_lock(&s->worker_ready_mutex); + streamer_worker_conf_priority(); + streamer_worker_conf_sighandling(); + s->worker_ready_flag = 1; + pthread_cond_broadcast(&s->worker_ready_cond); + pthread_mutex_unlock(&s->worker_ready_mutex); + + /* Main loop */ + while (s->state != ) { + /* Attempt to fetch the next song, or go into standby mode */ + if (playlist_try_dequeue(s->playlist, &next)) { + puts("The playlist is empty, going into standby mode."); + pthread_mutex_lock(&s->state_mutex); + s->state = STATE_STANDBY; + pthread_mutex_unlock(&s->state_mutex); + playlist_dequeue(s->playlist, &next); + } + + /* Update the current state */ + pthread_mutex_lock(&s->state_mutex); + printf("Next song: [%i] %s\n", next.id, next.file); + s->current_song = next.file; + s->state = STATE_PLAYING; + pthread_mutex_unlock(&s->state_mutex); + + /* Play the song entirely */ + hls_remuxer_play(s->remuxer, next.file); + + /* Discard current_song, but keep the current 'playing' state. */ + pthread_mutex_lock(&s->state_mutex); + free(s->current_song); + s->current_song = NULL; + pthread_mutex_unlock(&s->state_mutex); + } + + return NULL; +} diff --git a/streamer.h b/streamer.h new file mode 100644 index 0000000..b10a2af --- /dev/null +++ b/streamer.h @@ -0,0 +1,22 @@ +#ifndef STREAMING_H_ +#define STREAMING_H_ + +#include "common.h" + +/** + * An opaque data structure used to initialize and control a threaded HLS + * streamer. Please review 'struct StreamerOpt' for runtime initialisation + * parameters. + */ +typedef struct Streamer Streamer; + +Streamer *streamer_init (const struct StreamerOpt *opt); +int streamer_push (Streamer *streamer, const char *url, unsigned int id); +int streamer_pop (Streamer *streamer, unsigned int id); +void streamer_skip (Streamer *streamer); +void streamer_pause (Streamer *streamer); +void streamer_resume (Streamer *streamer); +void streamer_free (Streamer *streamer); +int streamer_info (Streamer *streamer, struct StateInfo *info); + +#endif // STREAMING_H_ diff --git a/testmuxer.c b/testmuxer.c new file mode 100644 index 0000000..69b693c --- /dev/null +++ b/testmuxer.c @@ -0,0 +1,21 @@ +#include "common.h" +#include "transmuxer.h" +#include +#include + +int main(int argc, char **argv) +{ + struct StreamerOpt opts = { + .filename = "test/stream.m3u8" + }; + + if (argc != 2) { + fprintf(stderr, "usage: %s FILE\n", argv[0]); + return EXIT_FAILURE; + } + + HLSRemuxer *remuxer = hls_remuxer_init(&opts); + hls_remuxer_play(remuxer, argv[1]); + + return EXIT_SUCCESS; +} diff --git a/transmuxer.c b/transmuxer.c new file mode 100644 index 0000000..9dcf005 --- /dev/null +++ b/transmuxer.c @@ -0,0 +1,283 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "audio/muxer.h" +#include "common.h" +#include "transmuxer.h" + +#define NANOSEC (uint64_t)1000000000 +#define MIN(a, b) ((a < b) ? a : b) + +enum PacketType { + PKT_AUDIO, + PKT_VIDEO, +}; + +struct HLSRemuxer { + enum State state; + pthread_cond_t resume; + + /* Input objects */ + AVFormatContext *demuxer; + const AVStream *audio_stream; + const AVStream *video_stream; + int audio_stream_index; + int video_stream_index; + + /* Output object pointers */ + long long next_sample; + AVFormatContext *hls_writer; + AudioMuxer *audio_muxer; + + struct timespec start_time; + + /* overhead: The target lenght in nanosecond of the output's stream + * buffer. + */ + uint64_t overhead; +}; + +static void hls_nanosleep (uint64_t t); +static uint64_t hls_nanotime (void); +static int hls_remuxer_init_input (struct HLSRemuxer *remuxer, const char *file); + +static void timespec_add_nano (struct timespec *ts, uint64_t nanoseconds); +static int timespec_is_ahead (const struct timespec *a, const struct timespec *b); +static int remuxer_send_audio (HLSRemuxer *remuxer, AVPacket *audio_packet); +static int remuxer_send_video (HLSRemuxer *remuxer, AVPacket *audio_packet); +static int remuxer_wait_target_ts (HLSRemuxer *remuxer); +static int remuxer_get_overrun (HLSRemuxer *remuxer, struct timespec *until); + +HLSRemuxer *hls_remuxer_init (const struct StreamerOpt *opts) +{ + AVStream *stream; + HLSRemuxer *remuxer; + AVFormatContext *hls_muxer; + AudioMuxer *audio_muxer; + AVDictionary *m; + + if ((remuxer = malloc(sizeof(HLSRemuxer))) == NULL) + return NULL; + + av_log_set_level(AV_LOG_VERBOSE); + + int err; + + if ((err = avformat_alloc_output_context2(&hls_muxer, NULL, "hls", opts->filename)) < 0) { + fprintf(stderr, "Failed to allocate output context: %s\n", av_err2str(err)); + return NULL; + } + + struct AudioMuxerOpt audio_opts = { + .codec_id = AV_CODEC_ID_MP3, + .sample_rate = 44100, + .sample_format = AV_SAMPLE_FMT_S16P, + .opts_encoder = NULL, + .opts_muxer = NULL, + .ch_layout = AV_CHANNEL_LAYOUT_STEREO + }; + + if (audiomuxer_init(&audio_muxer, hls_muxer, &audio_opts)) + goto error; + + // TODO: Pass muxer options + if ((err = avformat_write_header(hls_muxer, NULL)) < 0) { + fprintf(stderr, "Failed to write output file: %s\n", av_err2str(err)); + goto error; + } + + clock_gettime(CLOCK_MONOTONIC, &remuxer->start_time); + remuxer->hls_writer = hls_muxer; + remuxer->audio_muxer = audio_muxer; + remuxer->overhead = 10 * NANOSEC; // NOTE: hls_time * hls_list_size * NANOSEC + remuxer->resume = (pthread_cond_t) PTHREAD_COND_INITIALIZER; + return remuxer; + +error: + audiomuxer_free(audio_muxer); + avformat_free_context(hls_muxer); + free(remuxer); + return NULL; +} + +void hls_remuxer_play(HLSRemuxer *remuxer, const char *file) +{ + int err; + AVPacket *pkt; + uint64_t next_second, next_nanosec; + + pkt = av_packet_alloc(); + + if (hls_remuxer_init_input(remuxer, file)) + return; + + int64_t pts; + while (1) { + /* Demux packet */ + switch (err = av_read_frame(remuxer->demuxer, pkt)) { + case 0: + break; + default: + fprintf(stderr, "Error: %s\n", av_err2str(err)); + case AVERROR_EOF: + fprintf(stderr, "Last packet ts: %f\n", (double)(pts * + remuxer->audio_stream->time_base.num) / + (remuxer->audio_stream->time_base.den * 60)); + goto exit; + } + pts = pkt->pts; + + /* Dispatch packet muxer */ + if (pkt->stream_index == remuxer->audio_stream_index) + err = audiomuxer_send_packet(remuxer->audio_muxer, pkt); + else + continue; + + if (err) { + fprintf(stderr, "Failed to send packet: %s\n", av_err2str(err)); + break; + } + + /* Prevent overrun */ + remuxer_wait_target_ts(remuxer); + } + +exit: + av_packet_free(&pkt); + return; +} + +void hls_remuxer_pause(HLSRemuxer *remuxer) +{ + // TODO: mutex? + remuxer->state = STATE_PAUSED; +} + +void hls_remuxer_resume(HLSRemuxer *remuxer) +{ + pthread_cond_broadcast(&remuxer->resume); +} + +void hls_remuxer_halt(HLSRemuxer *remuxer) +{ + // TODO: mutex? + remuxer->state = STATE_STANDBY; +} + +void hls_remuxer_free(HLSRemuxer *remuxer) +{ + if (remuxer == NULL) return; + av_write_trailer(remuxer->hls_writer); + audiomuxer_free(remuxer->audio_muxer); + free(remuxer); +} + +static int hls_remuxer_init_input(struct HLSRemuxer *remuxer, const char *file) +{ + int err; + int audio_stream_index; + int video_stream_index; + const AVStream *audio_stream; + const AVStream *video_stream; + AVFormatContext *demuxer; + + demuxer = NULL; + + if ((err = avformat_open_input(&demuxer, file, NULL, NULL))) { // TODO: Options + fprintf(stderr, "Failed to open '%s': %s\n", file, av_err2str(err)); + goto error; + } + + if ((err = avformat_find_stream_info(demuxer, NULL)) < 0) { // TODO: Options + fprintf(stderr, "Failed to find stream info of '%s': %s\n", file, av_err2str(err)); + goto error; + } + + audio_stream_index = av_find_best_stream(demuxer, AVMEDIA_TYPE_AUDIO, -1, -1, NULL, 0); + video_stream_index = av_find_best_stream(demuxer, AVMEDIA_TYPE_VIDEO, -1, -1, NULL, 0); + + audio_stream = audio_stream_index >= 0 ? demuxer->streams[audio_stream_index] : NULL; + video_stream = video_stream_index >= 0 ? demuxer->streams[video_stream_index] : NULL; + if (audio_stream == NULL && video_stream == NULL) { + fprintf(stderr, "Failed to find audio stream index of '%s'\n", file); + err = AVERROR_STREAM_NOT_FOUND; + goto error; + } + + if ((err = audiomuxer_conf(remuxer->audio_muxer, audio_stream))) + goto error; + + remuxer->demuxer = demuxer; + remuxer->audio_stream = audio_stream; + remuxer->video_stream = video_stream; + remuxer->audio_stream_index = audio_stream_index; + remuxer->video_stream_index = video_stream_index; + return 0; + +error: + avformat_close_input(&demuxer); + return err; +} + +static void timespec_add_nano(struct timespec *ts, uint64_t nanoseconds) +{ + uint64_t new_nano; + + new_nano = ts->tv_nsec + nanoseconds; + ts->tv_nsec = new_nano % NANOSEC; + ts->tv_sec += new_nano / NANOSEC; +} + + +/** + * Block the thread if the remuxer is getting close to overruning (writing HLS + * segments and discarding previous ones faster than clients can read them). + * This is done by checking if all stream[1] have more data ready than needed to + * fill the buffer delay (2 seconds by default) and calling clock_nanosleep to + * wait off the overrun[2]. + * + * - [1] We only have an audio stream right now. + * - [2] TODO: Implement instead of waiting 2 seconds + */ +static int remuxer_wait_target_ts (HLSRemuxer *remuxer) +{ + uint64_t video_runtime, audio_runtime, min_runtime; + struct timespec target_ts; + + audio_runtime = audiomuxer_get_runtime(remuxer->audio_muxer); + video_runtime = INT64_MAX; + min_runtime = MIN(audio_runtime, video_runtime); + assert(min_runtime == audio_runtime); + if (min_runtime < remuxer->overhead) + return 0; + min_runtime -= remuxer->overhead; + + target_ts = remuxer->start_time; + timespec_add_nano(&target_ts, min_runtime); + + return clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &target_ts, NULL); +} + diff --git a/transmuxer.h b/transmuxer.h new file mode 100644 index 0000000..190e6e6 --- /dev/null +++ b/transmuxer.h @@ -0,0 +1,17 @@ +#ifndef TRANSMUXER_H_ +#define TRANSMUXER_H_ + +// TODO: Rename file to "remuxer.c/h" + +#include "common.h" + +typedef struct HLSRemuxer HLSRemuxer; + +HLSRemuxer *hls_remuxer_init (const struct StreamerOpt *opts); +void hls_remuxer_play (HLSRemuxer *remuxer, const char *file); +void hls_remuxer_pause (HLSRemuxer *remuxer); +void hls_remuxer_resume (HLSRemuxer *remuxer); +void hls_remuxer_halt (HLSRemuxer *remuxer); +void hls_remuxer_free (HLSRemuxer *remuxer); + +#endif // TRANSMUXER_H_ -- cgit v1.2.3